Skip to content
Thèmes
Scikit-Learn
Sklearn Pipeline: Guide complet pour construire des pipelines ML en Python

Sklearn Pipeline: Guide complet pour construire des pipelines ML en Python

Mis à jour le

Vous avez un projet de machine learning avec cinq étapes de prétraitement, trois opérations d’ingénierie de features et un modèle final. Chaque étape est un bloc de code séparé. Vous ajustez votre scaler sur l’ensemble du dataset, puis vous faites le split en train et test. Votre one-hot encoding crée des colonnes différentes en entraînement et en production. Des mois plus tard, quelqu’un modifie la stratégie d’imputation mais oublie de mettre à jour le script de déploiement.

C’est la réalité de la plupart des codebases ML. Les pipelines de prétraitement manuels sont fragiles, sujets aux erreurs, et constituent une source constante de data leakage -- la cause la plus fréquente de modèles qui fonctionnent très bien dans des notebooks mais échouent sur des données réelles. Lorsque vous ajustez un StandardScaler sur tout le dataset avant le split, les statistiques du jeu de test fuient dans l’entraînement. Lorsque vous encodez des variables catégorielles en dehors d’un workflow unifié, le décalage train-test devient invisible jusqu’à ce que la production casse.

Le Pipeline de Scikit-learn résout ces problèmes en chaînant le prétraitement et la modélisation dans un seul objet. Un seul appel à fit() entraîne tout. Un seul appel à predict() transforme puis prédit. Pas de data leakage. Pas de transformations incompatibles. Un seul objet à sauvegarder, charger et déployer. Ce guide couvre tout ce dont vous avez besoin pour construire des pipelines sklearn de qualité production, de l’utilisation basique aux transformers personnalisés et aux patterns de déploiement du monde réel.

Pourquoi les Pipelines sont importants

Le problème de data leakage

Le data leakage se produit lorsque des informations extérieures au jeu d’entraînement influencent le modèle pendant l’entraînement. La forme la plus courante dans le prétraitement ressemble à ceci :

# WRONG: Data leakage -- scaler sees test data
from sklearn.preprocessing import StandardScaler
from sklearn.model_selection import train_test_split
 
scaler = StandardScaler()
X_scaled = scaler.fit_transform(X)  # Fitted on ALL data, including test
 
X_train, X_test, y_train, y_test = train_test_split(X_scaled, y, test_size=0.2)
# X_test was already influenced by scaler statistics computed on the full dataset

Le scaler calcule la moyenne et l’écart-type sur l’ensemble du dataset, y compris les échantillons de test. L’évaluation de votre jeu de test devient alors trop optimiste, car le modèle a indirectement "vu" des informations issues de ces échantillons pendant le prétraitement.

L’approche correcte :

# CORRECT: No leakage -- scaler fitted only on training data
X_train, X_test, y_train, y_test = train_test_split(X, y, test_size=0.2)
 
scaler = StandardScaler()
X_train_scaled = scaler.fit_transform(X_train)  # Fit only on training
X_test_scaled = scaler.transform(X_test)         # Transform only, no fitting

Cela fonctionne, mais l’approche manuelle devient vite ingérable. Avec cinq étapes de prétraitement, vous devez suivre cinq objets ajustés et vous souvenir du bon appel fit_transform ou transform pour chacun. Un Pipeline gère cela automatiquement.

Organisation du code

Au-delà du leakage, les pipelines résolvent un problème d’organisation du code. Comparez ces deux approches :

AspectPrétraitement manuelsklearn Pipeline
Risque de data leakageÉlevé -- facile d’appeler fit_transform sur les données de testAucun -- le pipeline impose le bon enchaînement fit/transform
Lignes de code pour train + predict10-30 lignes par environnement2 lignes (fit, predict)
Déploiement en productionSérialiser chaque transformer séparément, reconstruire l’ordreSérialiser un seul objet avec joblib
Cross-validationRefit manuel de toutes les étapes à chaque foldcross_val_score gère tout
Réglage des hyperparamètresBoucles manuelles sur le prétraitement + les paramètres du modèleGridSearchCV optimise tous les paramètres ensemble
ReproductibilitéDépend de l’ordre d’exécution dans le notebookDéterministe -- même objet, même résultat
DébogageImprimer les shapes après chaque étape, manuellementpipeline.named_steps pour l’inspection

Utilisation de base d’un Pipeline

La classe Pipeline prend une liste de tuples (name, transformer). Toutes les étapes sauf la dernière doivent implémenter fit et transform. La dernière étape peut être n’importe quel estimateur (classifier, regressor ou transformer).

from sklearn.pipeline import Pipeline
from sklearn.preprocessing import StandardScaler
from sklearn.linear_model import LogisticRegression
 
# Create pipeline with named steps
pipe = Pipeline([
    ('scaler', StandardScaler()),
    ('classifier', LogisticRegression(max_iter=1000))
])

Fit et Predict

Lorsque vous appelez pipe.fit(X_train, y_train), le pipeline :

  1. Appelle scaler.fit_transform(X_train, y_train) -- ajuste le scaler et transforme les données d’entraînement
  2. Passe les données transformées à classifier.fit(X_transformed, y_train)

Lorsque vous appelez pipe.predict(X_test), le pipeline :

  1. Appelle scaler.transform(X_test) -- transforme seulement, sans ré-ajustement
  2. Passe les données transformées à classifier.predict(X_transformed)
from sklearn.pipeline import Pipeline
from sklearn.preprocessing import StandardScaler
from sklearn.linear_model import LogisticRegression
from sklearn.model_selection import train_test_split
from sklearn.datasets import load_iris
 
# Load data
iris = load_iris()
X, y = iris.data, iris.target
 
# Split -- see our guide on train_test_split for details:
# /topics/Scikit-Learn/sklearn-train-test-split
X_train, X_test, y_train, y_test = train_test_split(
    X, y, test_size=0.2, random_state=42, stratify=y
)
 
# Build and train pipeline
pipe = Pipeline([
    ('scaler', StandardScaler()),
    ('classifier', LogisticRegression(max_iter=1000, random_state=42))
])
 
pipe.fit(X_train, y_train)
 
# Evaluate
accuracy = pipe.score(X_test, y_test)
print(f"Test accuracy: {accuracy:.4f}")
# Test accuracy: 1.0000

Accéder aux étapes individuelles

Vous pouvez inspecter n’importe quelle étape par son nom :

# Access scaler parameters after fitting
scaler = pipe.named_steps['scaler']
print(f"Feature means: {scaler.mean_}")
print(f"Feature stds:  {scaler.scale_}")
 
# Access the classifier
clf = pipe.named_steps['classifier']
print(f"Coefficients shape: {clf.coef_.shape}")
print(f"Classes: {clf.classes_}")

Vous pouvez aussi utiliser l’indexation :

# Access by index
first_step = pipe[0]   # StandardScaler
last_step = pipe[-1]    # LogisticRegression
 
# Slice the pipeline (returns a new Pipeline)
preprocessing = pipe[:-1]  # Just the scaler
X_test_transformed = preprocessing.transform(X_test)
print(f"Transformed shape: {X_test_transformed.shape}")

make_pipeline : le raccourci

Lorsque vous n’avez pas besoin de noms d’étapes personnalisés, make_pipeline génère automatiquement les noms à partir des noms de classes :

from sklearn.pipeline import make_pipeline
from sklearn.preprocessing import StandardScaler
from sklearn.decomposition import PCA
from sklearn.svm import SVC
 
# Equivalent to Pipeline([('standardscaler', StandardScaler()),
#                          ('pca', PCA(n_components=2)),
#                          ('svc', SVC())])
pipe = make_pipeline(StandardScaler(), PCA(n_components=2), SVC())
 
print(pipe.named_steps)
# {'standardscaler': StandardScaler(), 'pca': PCA(n_components=2), 'svc': SVC()}

Les noms générés automatiquement correspondent au nom de la classe en minuscules. Si vous utilisez deux fois le même transformer, make_pipeline ajoute un numéro :

from sklearn.pipeline import make_pipeline
from sklearn.preprocessing import StandardScaler, PolynomialFeatures
 
pipe = make_pipeline(PolynomialFeatures(2), StandardScaler(), PolynomialFeatures(3))
print(list(pipe.named_steps.keys()))
# ['polynomialfeatures-1', 'standardscaler', 'polynomialfeatures-2']

Pipeline vs make_pipeline

FonctionnalitéPipelinemake_pipeline
Noms d’étapes personnalisésOui -- vous les choisissezNon -- générés automatiquement
Lisibilité pour les grands pipelinesMeilleure -- noms descriptifsMoins bonne -- noms génériques
Syntaxe de tuning d’hyperparamètresstepname__param avec vos nomsclassname__param avec les noms auto
Concision du codePlus verbeuxPlus concis
Idéal pourPipelines de production, tuningPrototypage rapide

Utilisez Pipeline lorsque vous prévoyez d’optimiser des hyperparamètres ou lorsque des noms d’étapes clairs améliorent la lisibilité. Utilisez make_pipeline pour des expériences rapides.

Étapes de prétraitement courantes

Voici les transformers les plus fréquemment utilisés dans les pipelines sklearn :

Variables numériques

from sklearn.pipeline import Pipeline
from sklearn.preprocessing import StandardScaler, MinMaxScaler, RobustScaler
from sklearn.impute import SimpleImputer
 
# Scale to zero mean, unit variance
numeric_pipe = Pipeline([
    ('imputer', SimpleImputer(strategy='median')),
    ('scaler', StandardScaler())
])
TransformerRôleQuand l’utiliser
StandardScalerCentre à moyenne=0, échelle écart-type=1Choix par défaut pour la plupart des algorithmes
MinMaxScalerRamène à l’intervalle [0, 1]Réseaux neuronaux, algorithmes sensibles à l’échelle
RobustScalerUtilise la médiane et l’IQR, robuste aux outliersDonnées avec des valeurs aberrantes importantes
SimpleImputerRemplit les valeurs manquantes (mean, median, most_frequent, constant)Gestion des données manquantes
PolynomialFeaturesGénère des features polynomiales et d’interactionAjouter de la non-linéarité aux modèles linéaires
PowerTransformerApplique une transformation Yeo-Johnson ou Box-CoxDistributions asymétriques

Variables catégorielles

from sklearn.pipeline import Pipeline
from sklearn.preprocessing import OneHotEncoder, OrdinalEncoder, LabelEncoder
from sklearn.impute import SimpleImputer
 
# One-hot encode categorical features
categorical_pipe = Pipeline([
    ('imputer', SimpleImputer(strategy='most_frequent')),
    ('encoder', OneHotEncoder(handle_unknown='ignore', sparse_output=False))
])
TransformerRôleQuand l’utiliser
OneHotEncoderCrée des colonnes binaires pour chaque catégorieCatégories nominales (sans ordre)
OrdinalEncoderMappe les catégories vers des entiersCatégories ordinales (low/medium/high)
TargetEncoderEncode à l’aide de statistiques de la variable cibleVariables à forte cardinalité (scikit-learn 1.3+)

ColumnTransformer pour les types de données mixtes

Les datasets réels contiennent à la fois des colonnes numériques et catégorielles. ColumnTransformer applique différentes transformations à différents sous-ensembles de colonnes en parallèle :

from sklearn.compose import ColumnTransformer
from sklearn.pipeline import Pipeline
from sklearn.preprocessing import StandardScaler, OneHotEncoder
from sklearn.impute import SimpleImputer
from sklearn.linear_model import LogisticRegression
import pandas as pd
import numpy as np
 
# Sample data with mixed types
data = pd.DataFrame({
    'age': [25, 30, np.nan, 45, 50],
    'income': [40000, 55000, 60000, np.nan, 90000],
    'city': ['NYC', 'LA', 'NYC', 'Chicago', 'LA'],
    'education': ['BS', 'MS', 'PhD', 'BS', 'MS'],
    'purchased': [0, 1, 1, 0, 1]
})
 
X = data.drop('purchased', axis=1)
y = data['purchased']
 
# Define column groups
numeric_features = ['age', 'income']
categorical_features = ['city', 'education']
 
# Build sub-pipelines for each column type
numeric_transformer = Pipeline([
    ('imputer', SimpleImputer(strategy='median')),
    ('scaler', StandardScaler())
])
 
categorical_transformer = Pipeline([
    ('imputer', SimpleImputer(strategy='most_frequent')),
    ('encoder', OneHotEncoder(handle_unknown='ignore', sparse_output=False))
])
 
# Combine with ColumnTransformer
preprocessor = ColumnTransformer(
    transformers=[
        ('num', numeric_transformer, numeric_features),
        ('cat', categorical_transformer, categorical_features)
    ]
)
 
# Full pipeline: preprocessing + model
pipeline = Pipeline([
    ('preprocessor', preprocessor),
    ('classifier', LogisticRegression(max_iter=1000))
])
 
pipeline.fit(X, y)
print(f"Pipeline fitted successfully")
print(f"Predictions: {pipeline.predict(X)}")

Obtenir les noms de features après transformation

Après avoir ajusté un ColumnTransformer, vous pouvez récupérer les noms des features transformées :

# After fitting the pipeline
pipeline.fit(X, y)
 
# Get feature names from the preprocessor step
feature_names = pipeline.named_steps['preprocessor'].get_feature_names_out()
print(f"Transformed features: {feature_names}")
# ['num__age', 'num__income', 'cat__city_Chicago', 'cat__city_LA',
#  'cat__city_NYC', 'cat__education_BS', 'cat__education_MS', 'cat__education_PhD']

Gérer les colonnes restantes

Par défaut, ColumnTransformer supprime les colonnes non spécifiées dans un transformer. Contrôlez cela avec le paramètre remainder :

preprocessor = ColumnTransformer(
    transformers=[
        ('num', numeric_transformer, numeric_features),
        ('cat', categorical_transformer, categorical_features)
    ],
    remainder='passthrough'  # Keep unspecified columns as-is
    # remainder='drop'       # Default: drop unspecified columns
)

Pipeline avec GridSearchCV

L’une des fonctionnalités les plus puissantes des pipelines sklearn est leur intégration transparente avec l’optimisation d’hyperparamètres. Utilisez la syntaxe stepname__parameter pour référencer les paramètres à l’intérieur des étapes du pipeline :

from sklearn.pipeline import Pipeline
from sklearn.preprocessing import StandardScaler
from sklearn.decomposition import PCA
from sklearn.svm import SVC
from sklearn.model_selection import GridSearchCV, train_test_split
from sklearn.datasets import load_breast_cancer
 
# Load data
cancer = load_breast_cancer()
X_train, X_test, y_train, y_test = train_test_split(
    cancer.data, cancer.target, test_size=0.2, random_state=42, stratify=cancer.target
)
 
# Build pipeline
pipe = Pipeline([
    ('scaler', StandardScaler()),
    ('pca', PCA()),
    ('svc', SVC())
])
 
# Define parameter grid
# Use stepname__param syntax to access nested parameters
param_grid = {
    'pca__n_components': [5, 10, 15, 20],
    'svc__C': [0.1, 1, 10, 100],
    'svc__kernel': ['rbf', 'linear'],
    'svc__gamma': ['scale', 'auto']
}
 
# Run grid search
grid = GridSearchCV(
    pipe,
    param_grid,
    cv=5,
    scoring='accuracy',
    n_jobs=-1,
    verbose=1
)
 
grid.fit(X_train, y_train)
 
print(f"Best parameters: {grid.best_params_}")
print(f"Best CV score:   {grid.best_score_:.4f}")
print(f"Test score:      {grid.score(X_test, y_test):.4f}")

Ajuster les paramètres de ColumnTransformer

Pour des pipelines imbriqués avec ColumnTransformer, enchaînez les noms d’étapes avec des double underscores :

# Accessing nested parameters:
# pipeline step 'preprocessor' -> transformer 'num' -> step 'imputer' -> parameter 'strategy'
param_grid = {
    'preprocessor__num__imputer__strategy': ['mean', 'median'],
    'preprocessor__cat__encoder__handle_unknown': ['ignore', 'infrequent_if_exist'],
    'classifier__C': [0.1, 1, 10]
}

Utiliser RandomizedSearchCV pour de grands espaces de recherche

Lorsque la grille de paramètres est très grande, RandomizedSearchCV échantillonne un nombre fixe de combinaisons de paramètres :

from sklearn.model_selection import RandomizedSearchCV
from scipy.stats import uniform, randint
 
param_distributions = {
    'pca__n_components': randint(5, 25),
    'svc__C': uniform(0.1, 100),
    'svc__kernel': ['rbf', 'linear', 'poly'],
    'svc__gamma': uniform(0.001, 1)
}
 
random_search = RandomizedSearchCV(
    pipe,
    param_distributions,
    n_iter=50,       # Sample 50 combinations
    cv=5,
    scoring='accuracy',
    n_jobs=-1,
    random_state=42
)
 
random_search.fit(X_train, y_train)
print(f"Best parameters: {random_search.best_params_}")
print(f"Best CV score:   {random_search.best_score_:.4f}")

Transformers personnalisés

FunctionTransformer : étapes personnalisées rapides

Pour des transformations simples et sans état, utilisez FunctionTransformer :

from sklearn.preprocessing import FunctionTransformer
from sklearn.pipeline import Pipeline
import numpy as np
 
# Log transform (adding 1 to avoid log(0))
log_transformer = FunctionTransformer(
    func=np.log1p,
    inverse_func=np.expm1  # Optional inverse for inverse_transform
)
 
pipe = Pipeline([
    ('log', log_transformer),
    ('scaler', StandardScaler())
])
 
# Works with pipeline fit/transform
X_sample = np.array([[1, 10, 100], [2, 20, 200]])
X_transformed = pipe.fit_transform(X_sample)
print(f"Original:    {X_sample[0]}")
print(f"Transformed: {X_transformed[0]}")

Classe de transformer personnalisée

Pour des transformations avec état (celles qui apprennent des paramètres à partir des données), créez une classe qui hérite de BaseEstimator et TransformerMixin :

from sklearn.base import BaseEstimator, TransformerMixin
import numpy as np
 
class OutlierClipper(BaseEstimator, TransformerMixin):
    """Clips values beyond a specified number of standard deviations."""
 
    def __init__(self, n_std=3):
        self.n_std = n_std
 
    def fit(self, X, y=None):
        # Learn the boundaries from training data
        self.mean_ = np.mean(X, axis=0)
        self.std_ = np.std(X, axis=0)
        self.lower_ = self.mean_ - self.n_std * self.std_
        self.upper_ = self.mean_ + self.n_std * self.std_
        return self  # Always return self from fit
 
    def transform(self, X):
        # Apply learned boundaries to any data
        X_clipped = np.clip(X, self.lower_, self.upper_)
        return X_clipped

Utilisez-le dans un pipeline comme n’importe quel transformer intégré :

from sklearn.pipeline import Pipeline
from sklearn.preprocessing import StandardScaler
from sklearn.linear_model import LogisticRegression
from sklearn.datasets import load_iris
from sklearn.model_selection import train_test_split
 
iris = load_iris()
X_train, X_test, y_train, y_test = train_test_split(
    iris.data, iris.target, test_size=0.2, random_state=42
)
 
pipe = Pipeline([
    ('clip', OutlierClipper(n_std=2)),
    ('scaler', StandardScaler()),
    ('classifier', LogisticRegression(max_iter=1000))
])
 
pipe.fit(X_train, y_train)
print(f"Accuracy: {pipe.score(X_test, y_test):.4f}")
 
# The n_std parameter works with GridSearchCV
from sklearn.model_selection import GridSearchCV
 
grid = GridSearchCV(
    pipe,
    {'clip__n_std': [1.5, 2, 2.5, 3]},
    cv=5
)
grid.fit(X_train, y_train)
print(f"Best n_std: {grid.best_params_['clip__n_std']}")

Transformer personnalisé pour l’ingénierie de features

Un exemple plus pratique -- créer des features d’interaction à partir de colonnes spécifiques :

from sklearn.base import BaseEstimator, TransformerMixin
import pandas as pd
import numpy as np
 
class FeatureInteraction(BaseEstimator, TransformerMixin):
    """Creates multiplication interactions between specified column pairs."""
 
    def __init__(self, interaction_pairs=None):
        self.interaction_pairs = interaction_pairs
 
    def fit(self, X, y=None):
        # Store column names if DataFrame
        if isinstance(X, pd.DataFrame):
            self.feature_names_in_ = X.columns.tolist()
        else:
            self.feature_names_in_ = [f"x{i}" for i in range(X.shape[1])]
        return self
 
    def transform(self, X):
        X_df = pd.DataFrame(X, columns=self.feature_names_in_) if not isinstance(X, pd.DataFrame) else X.copy()
 
        if self.interaction_pairs:
            for col_a, col_b in self.interaction_pairs:
                name = f"{col_a}_x_{col_b}"
                X_df[name] = X_df[col_a] * X_df[col_b]
 
        return X_df.values
 
    def get_feature_names_out(self, input_features=None):
        names = list(self.feature_names_in_)
        if self.interaction_pairs:
            for col_a, col_b in self.interaction_pairs:
                names.append(f"{col_a}_x_{col_b}")
        return np.array(names)

FeatureUnion : ingénierie de features en parallèle

Alors que Pipeline enchaîne les étapes séquentiellement, FeatureUnion exécute les transformers en parallèle et concatène leurs sorties horizontalement :

from sklearn.pipeline import Pipeline, FeatureUnion
from sklearn.preprocessing import StandardScaler, PolynomialFeatures
from sklearn.decomposition import PCA
from sklearn.linear_model import LogisticRegression
from sklearn.datasets import load_iris
from sklearn.model_selection import train_test_split
 
iris = load_iris()
X_train, X_test, y_train, y_test = train_test_split(
    iris.data, iris.target, test_size=0.2, random_state=42
)
 
# Create parallel feature branches
feature_union = FeatureUnion([
    ('scaled', StandardScaler()),             # Original features, scaled
    ('pca', PCA(n_components=2)),             # 2 PCA components
    ('poly', PolynomialFeatures(degree=2, include_bias=False))  # Polynomial features
])
 
# Combine into a full pipeline
pipe = Pipeline([
    ('features', feature_union),
    ('classifier', LogisticRegression(max_iter=1000, random_state=42))
])
 
pipe.fit(X_train, y_train)
 
# Check the total number of features
X_transformed = feature_union.fit_transform(X_train)
print(f"Original features:    {X_train.shape[1]}")
print(f"After FeatureUnion:   {X_transformed.shape[1]}")
print(f"Test accuracy:        {pipe.score(X_test, y_test):.4f}")

FeatureUnion vs ColumnTransformer

FonctionnalitéFeatureUnionColumnTransformer
EntréeToutes les colonnes vont à tous les transformersColonnes spécifiques vers transformers spécifiques
SortieConcatène horizontalementConcatène horizontalement
Cas d’usagePlusieurs représentations des mêmes featuresDifférents types de features nécessitent des traitements différents
Sélection de colonnesImpossible -- fonctionne sur toutes les colonnesSpécification de colonnes intégrée
Alternative moderneSouvent remplacé par ColumnTransformerPréféré dans la plupart des cas

Dans le scikit-learn moderne, ColumnTransformer couvre la plupart des cas où FeatureUnion était utilisé auparavant. FeatureUnion reste utile lorsque vous voulez plusieurs représentations du même ensemble de features (par exemple, valeurs brutes + PCA + features polynomiales).

Sauvegarder et charger des Pipelines

L’un des plus grands avantages des pipelines est la simplicité du déploiement. Au lieu de sérialiser séparément chaque transformer et le modèle, vous sauvegardez un seul objet :

import joblib
from sklearn.pipeline import Pipeline
from sklearn.preprocessing import StandardScaler
from sklearn.ensemble import RandomForestClassifier
from sklearn.datasets import load_iris
from sklearn.model_selection import train_test_split
 
# Train a pipeline
iris = load_iris()
X_train, X_test, y_train, y_test = train_test_split(
    iris.data, iris.target, test_size=0.2, random_state=42
)
 
pipe = Pipeline([
    ('scaler', StandardScaler()),
    ('classifier', RandomForestClassifier(n_estimators=100, random_state=42))
])
pipe.fit(X_train, y_train)
 
# Save the entire pipeline -- one file
joblib.dump(pipe, 'model_pipeline.joblib')
print(f"Pipeline saved")
 
# Load and predict -- no preprocessing code needed
loaded_pipe = joblib.load('model_pipeline.joblib')
predictions = loaded_pipe.predict(X_test)
accuracy = loaded_pipe.score(X_test, y_test)
print(f"Loaded pipeline accuracy: {accuracy:.4f}")

Versionner les Pipelines

En production, incluez des métadonnées en plus du pipeline :

import joblib
import datetime
import sklearn
 
artifact = {
    'pipeline': pipe,
    'training_date': datetime.datetime.now().isoformat(),
    'sklearn_version': sklearn.__version__,
    'feature_names': list(iris.feature_names),
    'target_names': list(iris.target_names),
    'training_accuracy': pipe.score(X_train, y_train),
    'test_accuracy': pipe.score(X_test, y_test),
    'n_training_samples': len(X_train)
}
 
joblib.dump(artifact, 'model_artifact_v1.joblib')
 
# Later, load and validate
loaded = joblib.load('model_artifact_v1.joblib')
print(f"Model trained on: {loaded['training_date']}")
print(f"Sklearn version:  {loaded['sklearn_version']}")
print(f"Test accuracy:    {loaded['test_accuracy']:.4f}")
 
# Use the pipeline
loaded_pipe = loaded['pipeline']
predictions = loaded_pipe.predict(X_test[:3])

Utiliser pickle (alternative)

joblib est préférable pour les objets sklearn, car il gère efficacement les grands tableaux NumPy. pickle standard fonctionne aussi :

import pickle
 
# Save
with open('pipeline.pkl', 'wb') as f:
    pickle.dump(pipe, f)
 
# Load
with open('pipeline.pkl', 'rb') as f:
    loaded = pickle.load(f)

Exemple réel : pipeline complet de classification

Voici un pipeline complet, de qualité production, pour une tâche de classification avec des types de features mixtes. Il utilise un Random Forest classifier avec ColumnTransformer et se termine par un rapport d’évaluation complet. Cet exemple utilise le schéma de dataset de type Titanic que rencontrent la plupart des praticiens ML :

import pandas as pd
import numpy as np
from sklearn.compose import ColumnTransformer
from sklearn.pipeline import Pipeline
from sklearn.preprocessing import StandardScaler, OneHotEncoder
from sklearn.impute import SimpleImputer
from sklearn.ensemble import RandomForestClassifier
from sklearn.model_selection import (
    train_test_split, cross_val_score, GridSearchCV
)
from sklearn.metrics import classification_report, accuracy_score
 
# -- Create a realistic dataset with mixed types and missing values --
np.random.seed(42)
n = 1000
 
data = pd.DataFrame({
    'age': np.random.normal(35, 12, n),
    'income': np.random.lognormal(10.5, 0.8, n),
    'credit_score': np.random.normal(650, 80, n),
    'years_employed': np.random.exponential(5, n),
    'department': np.random.choice(['Engineering', 'Sales', 'Marketing', 'HR', 'Finance'], n),
    'education': np.random.choice(['High School', 'Bachelor', 'Master', 'PhD'], n),
    'city': np.random.choice(['NYC', 'LA', 'Chicago', 'Houston', 'Phoenix', 'Dallas'], n),
    'promoted': np.random.binomial(1, 0.3, n)
})
 
# Introduce missing values (realistic pattern)
for col in ['age', 'income', 'credit_score']:
    mask = np.random.random(n) < 0.05
    data.loc[mask, col] = np.nan
 
for col in ['department', 'education']:
    mask = np.random.random(n) < 0.03
    data.loc[mask, col] = np.nan
 
print(f"Dataset shape: {data.shape}")
print(f"Missing values:\n{data.isnull().sum()}")
print(f"Target distribution:\n{data['promoted'].value_counts(normalize=True)}")
# -- Define features and target --
X = data.drop('promoted', axis=1)
y = data['promoted']
 
# Split the data (see /topics/Scikit-Learn/sklearn-train-test-split for details)
X_train, X_test, y_train, y_test = train_test_split(
    X, y, test_size=0.2, random_state=42, stratify=y
)
 
print(f"Training set: {X_train.shape[0]} samples")
print(f"Test set:     {X_test.shape[0]} samples")
# -- Define column groups --
numeric_features = ['age', 'income', 'credit_score', 'years_employed']
categorical_features = ['department', 'education', 'city']
 
# -- Build preprocessing pipelines for each column type --
numeric_transformer = Pipeline([
    ('imputer', SimpleImputer(strategy='median')),
    ('scaler', StandardScaler())
])
 
categorical_transformer = Pipeline([
    ('imputer', SimpleImputer(strategy='most_frequent')),
    ('encoder', OneHotEncoder(handle_unknown='ignore', sparse_output=False))
])
 
# -- Combine column transformers --
preprocessor = ColumnTransformer(
    transformers=[
        ('num', numeric_transformer, numeric_features),
        ('cat', categorical_transformer, categorical_features)
    ],
    remainder='drop'  # Explicitly drop unlisted columns
)
 
# -- Full pipeline: preprocessing + classifier --
# (see /topics/Scikit-Learn/sklearn-random-forest for more on RandomForest)
pipeline = Pipeline([
    ('preprocessor', preprocessor),
    ('classifier', RandomForestClassifier(
        n_estimators=200,
        max_depth=10,
        min_samples_leaf=5,
        class_weight='balanced',
        random_state=42,
        n_jobs=-1
    ))
])
# -- Cross-validation first --
cv_scores = cross_val_score(pipeline, X_train, y_train, cv=5, scoring='accuracy')
print(f"Cross-validation scores: {cv_scores}")
print(f"Mean CV accuracy: {cv_scores.mean():.4f} (+/- {cv_scores.std() * 2:.4f})")
 
# -- Fit on full training set --
pipeline.fit(X_train, y_train)
 
# -- Evaluate on test set --
y_pred = pipeline.predict(X_test)
print(f"\nTest accuracy: {accuracy_score(y_test, y_pred):.4f}")
# For detailed evaluation, see /topics/Scikit-Learn/sklearn-confusion-matrix
print(f"\nClassification Report:\n{classification_report(y_test, y_pred)}")
# -- Hyperparameter tuning --
param_grid = {
    'preprocessor__num__imputer__strategy': ['mean', 'median'],
    'classifier__n_estimators': [100, 200, 300],
    'classifier__max_depth': [5, 10, 15, None],
    'classifier__min_samples_leaf': [3, 5, 10]
}
 
grid_search = GridSearchCV(
    pipeline,
    param_grid,
    cv=5,
    scoring='accuracy',
    n_jobs=-1,
    verbose=1
)
 
grid_search.fit(X_train, y_train)
 
print(f"\nBest parameters: {grid_search.best_params_}")
print(f"Best CV score:   {grid_search.best_score_:.4f}")
print(f"Test score:      {grid_search.score(X_test, y_test):.4f}")
# -- Inspect the best pipeline --
best_pipeline = grid_search.best_estimator_
 
# Get feature names after transformation
feature_names = best_pipeline.named_steps['preprocessor'].get_feature_names_out()
print(f"\nTransformed feature count: {len(feature_names)}")
 
# Get feature importances from the classifier
importances = best_pipeline.named_steps['classifier'].feature_importances_
feature_importance = pd.DataFrame({
    'feature': feature_names,
    'importance': importances
}).sort_values('importance', ascending=False)
 
print(f"\nTop 10 features:")
print(feature_importance.head(10).to_string(index=False))
# -- Save the final pipeline --
import joblib
 
joblib.dump(best_pipeline, 'promotion_predictor.joblib')
print("Pipeline saved to promotion_predictor.joblib")
 
# -- Production usage --
loaded = joblib.load('promotion_predictor.joblib')
 
# Predict on new data -- same format as original DataFrame
new_employee = pd.DataFrame({
    'age': [28],
    'income': [65000],
    'credit_score': [720],
    'years_employed': [3.5],
    'department': ['Engineering'],
    'education': ['Master'],
    'city': ['NYC']
})
 
prediction = loaded.predict(new_employee)
probability = loaded.predict_proba(new_employee)
print(f"\nNew employee prediction: {'Promoted' if prediction[0] else 'Not promoted'}")
print(f"Probability: {probability[0][1]:.2%}")

Cet exemple démontre tous les patterns clés des pipelines : types de features mixtes, gestion des valeurs manquantes, cross-validation, tuning d’hyperparamètres, inspection des features et sérialisation pour la production.

Pipeline avec différents types de modèles

Le même pipeline de prétraitement peut servir différents modèles. Il suffit de remplacer la dernière étape :

from sklearn.pipeline import Pipeline
from sklearn.linear_model import LogisticRegression
from sklearn.ensemble import RandomForestClassifier, GradientBoostingClassifier
from sklearn.svm import SVC
from sklearn.model_selection import cross_val_score
 
# Reuse the preprocessor from the previous example
models = {
    'Logistic Regression': LogisticRegression(max_iter=1000, random_state=42),
    'Random Forest': RandomForestClassifier(n_estimators=100, random_state=42),
    'Gradient Boosting': GradientBoostingClassifier(n_estimators=100, random_state=42),
    'SVM': SVC(kernel='rbf', random_state=42)
}
 
results = {}
for name, model in models.items():
    pipe = Pipeline([
        ('preprocessor', preprocessor),
        ('classifier', model)
    ])
    scores = cross_val_score(pipe, X_train, y_train, cv=5, scoring='accuracy')
    results[name] = {
        'mean': scores.mean(),
        'std': scores.std()
    }
    print(f"{name:25s} | Accuracy: {scores.mean():.4f} +/- {scores.std():.4f}")

Vous pouvez aussi définir dynamiquement l’estimateur final :

# Replace the classifier in an existing pipeline
pipeline.set_params(classifier=GradientBoostingClassifier(n_estimators=200))
pipeline.fit(X_train, y_train)
print(f"Gradient Boosting test accuracy: {pipeline.score(X_test, y_test):.4f}")

Pipelines en régression

Les pipelines fonctionnent exactement de la même manière pour les tâches de régression. Pour plus de détails sur la régression linéaire avec sklearn, consultez notre guide sklearn linear regression.

from sklearn.pipeline import Pipeline
from sklearn.compose import ColumnTransformer
from sklearn.preprocessing import StandardScaler, OneHotEncoder, PolynomialFeatures
from sklearn.impute import SimpleImputer
from sklearn.linear_model import Ridge
from sklearn.model_selection import cross_val_score, train_test_split
import pandas as pd
import numpy as np
 
# Simulated housing data
np.random.seed(42)
n = 500
housing = pd.DataFrame({
    'sqft': np.random.normal(1500, 400, n),
    'bedrooms': np.random.choice([1, 2, 3, 4, 5], n),
    'age': np.random.uniform(0, 50, n),
    'neighborhood': np.random.choice(['downtown', 'suburbs', 'rural'], n),
    'condition': np.random.choice(['poor', 'fair', 'good', 'excellent'], n),
})
housing['price'] = (
    housing['sqft'] * 200
    + housing['bedrooms'] * 15000
    - housing['age'] * 1000
    + np.random.normal(0, 20000, n)
)
 
X = housing.drop('price', axis=1)
y = housing['price']
X_train, X_test, y_train, y_test = train_test_split(X, y, test_size=0.2, random_state=42)
 
# Regression pipeline
numeric_features = ['sqft', 'bedrooms', 'age']
categorical_features = ['neighborhood', 'condition']
 
preprocessor = ColumnTransformer([
    ('num', Pipeline([
        ('imputer', SimpleImputer(strategy='median')),
        ('poly', PolynomialFeatures(degree=2, include_bias=False)),
        ('scaler', StandardScaler())
    ]), numeric_features),
    ('cat', Pipeline([
        ('imputer', SimpleImputer(strategy='most_frequent')),
        ('encoder', OneHotEncoder(handle_unknown='ignore', sparse_output=False))
    ]), categorical_features)
])
 
reg_pipeline = Pipeline([
    ('preprocessor', preprocessor),
    ('regressor', Ridge(alpha=1.0))
])
 
cv_scores = cross_val_score(reg_pipeline, X_train, y_train, cv=5, scoring='r2')
print(f"Cross-validation R2: {cv_scores.mean():.4f} +/- {cv_scores.std():.4f}")
 
reg_pipeline.fit(X_train, y_train)
print(f"Test R2: {reg_pipeline.score(X_test, y_test):.4f}")

Ignorer des étapes avec passthrough et None

Vous pouvez sauter conditionnellement des étapes en les définissant à 'passthrough' ou None :

from sklearn.pipeline import Pipeline
from sklearn.preprocessing import StandardScaler
from sklearn.decomposition import PCA
from sklearn.svm import SVC
from sklearn.model_selection import GridSearchCV
from sklearn.datasets import load_breast_cancer
 
cancer = load_breast_cancer()
X_train, X_test, y_train, y_test = train_test_split(
    cancer.data, cancer.target, test_size=0.2, random_state=42
)
 
pipe = Pipeline([
    ('scaler', StandardScaler()),
    ('reduce_dim', PCA()),
    ('classifier', SVC())
])
 
# Grid search can toggle steps on/off
param_grid = [
    {
        'reduce_dim': [PCA(5), PCA(10), PCA(15)],
        'classifier__C': [1, 10]
    },
    {
        'reduce_dim': ['passthrough'],  # Skip PCA entirely
        'classifier__C': [1, 10]
    }
]
 
grid = GridSearchCV(pipe, param_grid, cv=5, n_jobs=-1)
grid.fit(X_train, y_train)
print(f"Best params: {grid.best_params_}")
print(f"Best score:  {grid.best_score_:.4f}")

Mise en cache des étapes du Pipeline

Lors du tuning d’hyperparamètres, certaines étapes intermédiaires peuvent être recalculées inutilement. Activez le cache pour éviter cela :

from sklearn.pipeline import Pipeline
from sklearn.preprocessing import StandardScaler
from sklearn.decomposition import PCA
from sklearn.svm import SVC
from tempfile import mkdtemp
from shutil import rmtree
 
# Create a temporary cache directory
cachedir = mkdtemp()
 
pipe = Pipeline(
    [
        ('scaler', StandardScaler()),
        ('pca', PCA(n_components=10)),
        ('svc', SVC())
    ],
    memory=cachedir  # Cache intermediate transformations
)
 
# During GridSearchCV, the scaler and PCA results are cached
# Only recomputed when their parameters change
# This speeds up searches where only the final estimator params change
 
# Clean up when done
# rmtree(cachedir)

Explorer les données avant de construire des pipelines

Avant de construire un pipeline, vous devez comprendre vos features : leurs distributions, les patterns de valeurs manquantes, les corrélations et les transformations possibles. PyGWalker (opens in a new tab) vous permet de transformer n’importe quel DataFrame Pandas en interface interactive d’exploration visuelle directement dans les notebooks Jupyter :

import pandas as pd
import pygwalker as pyg
 
# Explore your dataset interactively before building the pipeline
# Drag features to axes, create histograms, scatter plots, box plots
walker = pyg.walk(data)

Ce type d’exploration visuelle vous aide à décider quelles features doivent être normalisées, lesquelles contiennent des outliers à clipper, et quelles variables catégorielles ont une cardinalité élevée. Vous pouvez identifier les patterns de valeurs manquantes et comprendre les distributions des features avant d’écrire une seule ligne de code de pipeline.

Pour itérer sur votre workflow complet d’expérimentation de pipeline -- tester différentes stratégies de prétraitement, comparer les performances des modèles et suivre les résultats -- RunCell (opens in a new tab) fournit un environnement Jupyter alimenté par l’IA où un agent assiste la génération de code, le débogage et la gestion des expériences.

Pièges courants et conseils de débogage

Piège 1 : oublier d’utiliser le Pipeline pour la prédiction

# WRONG: Preprocessing manually, predicting with just the model
X_test_scaled = scaler.transform(X_test)
predictions = pipeline.named_steps['classifier'].predict(X_test_scaled)
 
# CORRECT: Let the pipeline handle everything
predictions = pipeline.predict(X_test)

Piège 2 : ajuster le prétraitement en dehors du Pipeline

# WRONG: This defeats the purpose of the pipeline
scaler = StandardScaler()
X_train_scaled = scaler.fit_transform(X_train)
pipe = Pipeline([('classifier', LogisticRegression())])
pipe.fit(X_train_scaled, y_train)
# Now you must remember to manually scale at prediction time
 
# CORRECT: Include preprocessing in the pipeline
pipe = Pipeline([
    ('scaler', StandardScaler()),
    ('classifier', LogisticRegression())
])
pipe.fit(X_train, y_train)  # Pipeline handles the scaling

Piège 3 : mauvais noms de paramètres dans GridSearchCV

La syntaxe stepname__param doit correspondre exactement aux noms des étapes de votre pipeline :

pipe = Pipeline([
    ('my_scaler', StandardScaler()),
    ('my_clf', LogisticRegression())
])
 
# WRONG: Using the class name instead of the step name
# param_grid = {'StandardScaler__with_mean': [True, False]}  # KeyError
 
# CORRECT: Using the step name you defined
param_grid = {'my_scaler__with_mean': [True, False], 'my_clf__C': [0.1, 1, 10]}

Piège 4 : changement de l’ordre des colonnes

Lorsque vous utilisez ColumnTransformer avec des DataFrames Pandas (que vous pouvez charger avec pandas read_csv), l’ordre des colonnes en sortie dépend de l’ordre des transformers, et non de celui du DataFrame d’origine :

# The output order is: numeric features first, then categorical
# This matters if you manually inspect transformed data
preprocessor = ColumnTransformer([
    ('num', numeric_transformer, numeric_features),     # These come first
    ('cat', categorical_transformer, categorical_features)  # These come second
])

Déboguer les sorties intermédiaires

Pour inspecter ce qui se passe à chaque étape :

# Method 1: Transform step by step
pipe.fit(X_train, y_train)
X_after_preprocessor = pipe.named_steps['preprocessor'].transform(X_test)
print(f"Shape after preprocessing: {X_after_preprocessor.shape}")
print(f"Sample values:\n{X_after_preprocessor[:2]}")
 
# Method 2: Slice the pipeline
preprocessing_pipe = pipe[:-1]  # Everything except the classifier
X_transformed = preprocessing_pipe.transform(X_test)
print(f"Transformed shape: {X_transformed.shape}")
 
# Method 3: Use set_config for verbose output
from sklearn import set_config
set_config(transform_output="pandas")  # Get DataFrames from transformers
# Now transform outputs include column names -- easier to debug

Déboguer les mismatches de forme

# Print shapes at each stage to find where things break
print(f"Input shape: {X_train.shape}")
 
for name, step in pipe.named_steps.items():
    if hasattr(step, 'transform'):
        # Check if the step has been fitted
        try:
            X_train = step.transform(X_train)
            print(f"After '{name}': {X_train.shape}")
        except Exception as e:
            print(f"Error at '{name}': {e}")
            break

Référence des méthodes du Pipeline

MéthodeDescription
fit(X, y)Ajuste tous les transformers et l’estimateur final
predict(X)Transforme X à travers toutes les étapes, puis prédit avec l’estimateur final
predict_proba(X)Transforme et retourne des probabilités (classifiers uniquement)
transform(X)Transforme X à travers toutes les étapes (si la dernière étape est un transformer)
fit_transform(X, y)Ajuste et transforme en un seul appel
fit_predict(X, y)Ajuste et prédit en un seul appel
score(X, y)Transforme et score (accuracy pour les classifiers, R2 pour les regresseurs)
set_params(**params)Définit les paramètres avec la syntaxe stepname__param
get_params()Retourne tous les paramètres
named_stepsAccès de type dictionnaire aux étapes du pipeline
[i] ou [name]Accès à une étape par index ou par nom
[start:end]Découper pour créer un sous-pipeline

FAQ

Quelle est la différence entre Pipeline et make_pipeline ?

Pipeline vous oblige à fournir des tuples (name, estimator) pour chaque étape, ce qui vous donne un contrôle explicite sur les noms des étapes. make_pipeline accepte des instances d’estimateurs seules et génère automatiquement les noms à partir des noms de classes (en minuscules). Utilisez Pipeline lorsque vous avez besoin de noms descriptifs ou que vous prévoyez d’optimiser des hyperparamètres avec GridSearchCV. Utilisez make_pipeline pour un prototypage rapide.

sklearn Pipeline empêche-t-il le data leakage ?

Oui. Lorsque vous appelez pipeline.fit(X_train, y_train), chaque transformer est ajusté uniquement sur les données d’entraînement. Pendant la cross-validation avec cross_val_score ou GridSearchCV, le pipeline réajuste toutes les étapes sur chaque fold d’entraînement, garantissant qu’aucune donnée du fold de test ne fuit dans le prétraitement. C’est l’avantage principal par rapport au prétraitement manuel.

Puis-je utiliser Pipeline avec des modèles deep learning ?

Les pipelines Scikit-learn fonctionnent avec tout estimateur qui suit l’API sklearn (implémente fit, predict et éventuellement transform). Des bibliothèques comme scikeras fournissent des wrappers compatibles sklearn pour les modèles Keras, ce qui permet de les utiliser dans des pipelines. XGBoost et LightGBM fournissent également des interfaces compatibles sklearn.

Comment gérer les noms de features après ColumnTransformer ?

Appelez pipeline.named_steps['preprocessor'].get_feature_names_out() après l’ajustement. Cela renvoie un tableau de noms de features avec des préfixes indiquant quel transformer les a produites (par exemple, num__age, cat__city_NYC). Cela fonctionne dans scikit-learn 1.0 et plus.

Peut-on avoir plusieurs modèles dans un seul Pipeline ?

Non. Un Pipeline est une séquence linéaire où la dernière étape est l’estimateur. Si vous voulez comparer plusieurs modèles, créez des pipelines séparés avec les mêmes étapes de prétraitement mais des estimateurs finaux différents. Vous pouvez automatiser cela en bouclant sur un dictionnaire de modèles et en créant un pipeline pour chacun.

Comment sauter une étape dans un Pipeline pendant GridSearchCV ?

Définissez l’étape à 'passthrough' dans la grille de paramètres. Par exemple : {'reduce_dim': ['passthrough']} ignorera entièrement l’étape reduce_dim pendant cette itération de grid search. Vous pouvez aussi définir une étape à None, mais 'passthrough' est l’approche recommandée.

Que se passe-t-il si le Pipeline rencontre des catégories inédites au moment de la prédiction ?

Si vous utilisez OneHotEncoder avec handle_unknown='ignore' dans votre pipeline, les catégories inédites sont encodées en zéros. Sans ce réglage, le pipeline lève une erreur. Définissez toujours handle_unknown='ignore' dans les pipelines de production où de nouvelles valeurs catégorielles sont possibles.

Conclusion

Sklearn Pipeline transforme un code ML désordonné et sujet aux erreurs en workflows clairs et reproductibles. En chaînant le prétraitement et la modélisation dans un seul objet, vous éliminez le data leakage, simplifiez le déploiement et rendez le tuning d’hyperparamètres sur l’ensemble du workflow trivial.

Commencez par les bases : enveloppez votre scaler et votre modèle dans un Pipeline. Passez à ColumnTransformer lorsque vous avez des types de features mixtes. Utilisez GridSearchCV pour ajuster les paramètres de chaque étape du pipeline simultanément. Construisez des transformers personnalisés lorsque les options intégrées ne couvrent pas vos besoins en ingénierie de features.

L’investissement pour apprendre les pipelines porte immédiatement ses fruits. Vos résultats de cross-validation deviennent fiables parce que le prétraitement est réajusté à chaque fold. Votre déploiement en production devient un simple joblib.dump et joblib.load. Et votre codebase devient plus maintenable parce que toute la logique de transformation et de prédiction vit dans un seul objet inspectable.

Guides associés

📚