Sklearn Pipeline: Complete Guide to Building ML Pipelines in Python

You have a machine learning project with five preprocessing steps, three feature engineering operations, and a final model. Each step is a separate block of code. You fit your scaler on the full dataset, then split into train and test. Your one-hot encoding creates different columns in training than in production. Months later, someone changes the imputation strategy but forgets to update the deployment script.

This is the reality of most ML codebases. Manual preprocessing pipelines are fragile, error-prone, and a constant source of data leakage -- one of the most common reasons models perform great in notebooks but fail on live data. When you fit a StandardScaler on the entire dataset before splitting, test set statistics bleed into training. When you encode categorical features outside a unified workflow, train-test skew stays invisible until production breaks.

Scikit-learn's Pipeline solves these problems by chaining preprocessing and modeling into a single object. One call to fit() trains everything. One call to predict() transforms and predicts. No data leakage. No mismatched transformations. One object to save, load, and deploy. This guide covers everything you need to build production-quality sklearn pipelines, from basic usage through custom transformers and real-world deployment patterns.

Why Pipelines Matter

The Data Leakage Problem

Data leakage happens when information from outside the training set influences the model during training. The most common form in preprocessing looks like this:

# WRONG: Data leakage -- scaler sees test data
from sklearn.preprocessing import StandardScaler
from sklearn.model_selection import train_test_split
 
scaler = StandardScaler()
X_scaled = scaler.fit_transform(X)  # Fitted on ALL data, including test
 
X_train, X_test, y_train, y_test = train_test_split(X_scaled, y, test_size=0.2)
# X_test was already influenced by scaler statistics computed on the full dataset

The scaler computes mean and standard deviation from the entire dataset, including the test samples. Your test set evaluation is now optimistic because the model indirectly "saw" information from those samples during preprocessing.

The correct approach:

# CORRECT: No leakage -- scaler fitted only on training data
X_train, X_test, y_train, y_test = train_test_split(X, y, test_size=0.2)
 
scaler = StandardScaler()
X_train_scaled = scaler.fit_transform(X_train)  # Fit only on training
X_test_scaled = scaler.transform(X_test)         # Transform only, no fitting

This works, but the manual approach gets unwieldy fast. With five preprocessing steps, you track five fitted objects and remember the correct fit_transform vs transform call for each. A Pipeline handles this automatically.
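The manual steps above collapse into a pipeline like this (a minimal sketch on synthetic data, not tied to any particular dataset):

```python
import numpy as np
from sklearn.pipeline import Pipeline
from sklearn.preprocessing import StandardScaler
from sklearn.linear_model import LogisticRegression
from sklearn.model_selection import train_test_split

# Synthetic data for illustration only
rng = np.random.default_rng(0)
X = rng.normal(size=(200, 4))
y = (X[:, 0] + X[:, 1] > 0).astype(int)

X_train, X_test, y_train, y_test = train_test_split(X, y, test_size=0.2, random_state=42)

pipe = Pipeline([
    ('scaler', StandardScaler()),
    ('classifier', LogisticRegression(max_iter=1000))
])

pipe.fit(X_train, y_train)          # scaler is fitted on training data only
score = pipe.score(X_test, y_test)  # test data is transformed, never fitted on
```

One `fit`, one `score`; there is no way to accidentally call `fit_transform` on the test set.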

Code Organization

Beyond leakage, pipelines solve a code organization problem. Compare these two approaches:

| Aspect | Manual Preprocessing | sklearn Pipeline |
|---|---|---|
| Data leakage risk | High -- easy to call fit_transform on test data | None -- pipeline enforces correct fit/transform |
| Code lines for train + predict | 10-30 lines per environment | 2 lines (fit, predict) |
| Deploying to production | Serialize each transformer separately, reconstruct order | Serialize one object with joblib |
| Cross-validation | Must manually refit all steps per fold | cross_val_score handles everything |
| Hyperparameter tuning | Manually loop through preprocessing + model params | GridSearchCV tunes all params together |
| Reproducibility | Depends on execution order in notebook | Deterministic -- same object, same result |
| Debugging | Print shapes after each step, manually | pipeline.named_steps for inspection |

Basic Pipeline Usage

The Pipeline class takes a list of (name, transformer) tuples. All steps except the last must implement fit and transform. The last step can be any estimator (classifier, regressor, or transformer).

from sklearn.pipeline import Pipeline
from sklearn.preprocessing import StandardScaler
from sklearn.linear_model import LogisticRegression
 
# Create pipeline with named steps
pipe = Pipeline([
    ('scaler', StandardScaler()),
    ('classifier', LogisticRegression(max_iter=1000))
])

Fitting and Predicting

When you call pipe.fit(X_train, y_train), the pipeline:

  1. Calls scaler.fit_transform(X_train, y_train) -- fits the scaler and transforms training data
  2. Passes the transformed data to classifier.fit(X_transformed, y_train)

When you call pipe.predict(X_test), the pipeline:

  1. Calls scaler.transform(X_test) -- transforms only, no fitting
  2. Passes the transformed data to classifier.predict(X_transformed)
from sklearn.pipeline import Pipeline
from sklearn.preprocessing import StandardScaler
from sklearn.linear_model import LogisticRegression
from sklearn.model_selection import train_test_split
from sklearn.datasets import load_iris
 
# Load data
iris = load_iris()
X, y = iris.data, iris.target
 
# Split -- see our guide on train_test_split for details:
# /topics/Scikit-Learn/sklearn-train-test-split
X_train, X_test, y_train, y_test = train_test_split(
    X, y, test_size=0.2, random_state=42, stratify=y
)
 
# Build and train pipeline
pipe = Pipeline([
    ('scaler', StandardScaler()),
    ('classifier', LogisticRegression(max_iter=1000, random_state=42))
])
 
pipe.fit(X_train, y_train)
 
# Evaluate
accuracy = pipe.score(X_test, y_test)
print(f"Test accuracy: {accuracy:.4f}")
# Test accuracy: 1.0000

Accessing Individual Steps

You can inspect any step by name:

# Access scaler parameters after fitting
scaler = pipe.named_steps['scaler']
print(f"Feature means: {scaler.mean_}")
print(f"Feature stds:  {scaler.scale_}")
 
# Access the classifier
clf = pipe.named_steps['classifier']
print(f"Coefficients shape: {clf.coef_.shape}")
print(f"Classes: {clf.classes_}")

You can also use indexing:

# Access by index
first_step = pipe[0]   # StandardScaler
last_step = pipe[-1]    # LogisticRegression
 
# Slice the pipeline (returns a new Pipeline)
preprocessing = pipe[:-1]  # Just the scaler
X_test_transformed = preprocessing.transform(X_test)
print(f"Transformed shape: {X_test_transformed.shape}")

make_pipeline: The Shorthand

When you do not need custom step names, make_pipeline generates them automatically from the class names:

from sklearn.pipeline import make_pipeline
from sklearn.preprocessing import StandardScaler
from sklearn.decomposition import PCA
from sklearn.svm import SVC
 
# Equivalent to Pipeline([('standardscaler', StandardScaler()),
#                          ('pca', PCA(n_components=2)),
#                          ('svc', SVC())])
pipe = make_pipeline(StandardScaler(), PCA(n_components=2), SVC())
 
print(pipe.named_steps)
# {'standardscaler': StandardScaler(), 'pca': PCA(n_components=2), 'svc': SVC()}

Auto-generated names are the lowercase class name. If you use the same transformer twice, make_pipeline appends a number:

from sklearn.pipeline import make_pipeline
from sklearn.preprocessing import StandardScaler, PolynomialFeatures
 
pipe = make_pipeline(PolynomialFeatures(2), StandardScaler(), PolynomialFeatures(3))
print(list(pipe.named_steps.keys()))
# ['polynomialfeatures-1', 'standardscaler', 'polynomialfeatures-2']

Pipeline vs make_pipeline

| Feature | Pipeline | make_pipeline |
|---|---|---|
| Custom step names | Yes -- you choose them | No -- auto-generated |
| Readability for large pipelines | Better -- descriptive names | Worse -- generic names |
| Hyperparameter tuning syntax | stepname__param with your names | classname__param with auto names |
| Code brevity | More verbose | More concise |
| Best for | Production pipelines, tuning | Quick prototyping |

Use Pipeline when you plan to tune hyperparameters or when clear step names improve readability. Use make_pipeline for quick experiments.
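A quick way to see the naming difference is to compare get_params() on two equivalent pipelines (illustrative snippet):

```python
from sklearn.pipeline import Pipeline, make_pipeline
from sklearn.preprocessing import StandardScaler
from sklearn.linear_model import LogisticRegression

named = Pipeline([('scaler', StandardScaler()), ('clf', LogisticRegression())])
auto = make_pipeline(StandardScaler(), LogisticRegression())

# With Pipeline you tune 'clf__C'; with make_pipeline it's 'logisticregression__C'
print('clf__C' in named.get_params())                # True
print('logisticregression__C' in auto.get_params())  # True
```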

Common Preprocessing Steps

Here are the most frequently used transformers in sklearn pipelines:

Numeric Features

from sklearn.pipeline import Pipeline
from sklearn.preprocessing import StandardScaler, MinMaxScaler, RobustScaler
from sklearn.impute import SimpleImputer
 
# Scale to zero mean, unit variance
numeric_pipe = Pipeline([
    ('imputer', SimpleImputer(strategy='median')),
    ('scaler', StandardScaler())
])

| Transformer | What It Does | When to Use |
|---|---|---|
| StandardScaler | Centers to mean=0, scales to std=1 | Default choice for most algorithms |
| MinMaxScaler | Scales to [0, 1] range | Neural networks, algorithms sensitive to magnitude |
| RobustScaler | Uses median and IQR, robust to outliers | Data with significant outliers |
| SimpleImputer | Fills missing values (mean, median, most_frequent, constant) | Missing data handling |
| PolynomialFeatures | Generates polynomial and interaction features | Adding nonlinearity to linear models |
| PowerTransformer | Applies Yeo-Johnson or Box-Cox transform | Skewed distributions |
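As a quick illustration of the last row, PowerTransformer pulls a right-skewed sample toward symmetry. This is a sketch on synthetic lognormal data; the skewness helper is ad hoc, not part of scikit-learn:

```python
import numpy as np
from sklearn.preprocessing import PowerTransformer

rng = np.random.default_rng(42)
skewed = rng.lognormal(mean=0.0, sigma=1.0, size=(1000, 1))  # right-skewed

def skewness(a):
    """Sample skewness (third standardized moment)."""
    a = a.ravel()
    return float(np.mean(((a - a.mean()) / a.std()) ** 3))

pt = PowerTransformer(method='yeo-johnson')  # the default method
unskewed = pt.fit_transform(skewed)

print(f"Skew before: {skewness(skewed):.2f}")   # strongly positive
print(f"Skew after:  {skewness(unskewed):.2f}")  # close to zero
```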

Categorical Features

from sklearn.pipeline import Pipeline
from sklearn.preprocessing import OneHotEncoder, OrdinalEncoder
from sklearn.impute import SimpleImputer
 
# One-hot encode categorical features
categorical_pipe = Pipeline([
    ('imputer', SimpleImputer(strategy='most_frequent')),
    ('encoder', OneHotEncoder(handle_unknown='ignore', sparse_output=False))
])

| Transformer | What It Does | When to Use |
|---|---|---|
| OneHotEncoder | Creates binary columns for each category | Nominal categories (no order) |
| OrdinalEncoder | Maps categories to integers | Ordinal categories (low/medium/high) |
| TargetEncoder | Encodes using target variable statistics | High-cardinality features (scikit-learn 1.3+) |
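One detail worth a sketch: OrdinalEncoder sorts categories alphabetically by default, so pass an explicit categories list whenever the order carries meaning:

```python
import numpy as np
from sklearn.preprocessing import OrdinalEncoder

# Without explicit categories, the default alphabetical sort would give
# 'high' < 'low' < 'medium', breaking the intended ordering
enc = OrdinalEncoder(categories=[['low', 'medium', 'high']])
X = np.array([['medium'], ['low'], ['high']])
codes = enc.fit_transform(X).ravel()
print(codes)  # [1. 0. 2.]
```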

ColumnTransformer for Mixed Data Types

Real datasets have both numeric and categorical columns. ColumnTransformer applies different transformations to different column subsets in parallel:

from sklearn.compose import ColumnTransformer
from sklearn.pipeline import Pipeline
from sklearn.preprocessing import StandardScaler, OneHotEncoder
from sklearn.impute import SimpleImputer
from sklearn.linear_model import LogisticRegression
import pandas as pd
import numpy as np
 
# Sample data with mixed types
data = pd.DataFrame({
    'age': [25, 30, np.nan, 45, 50],
    'income': [40000, 55000, 60000, np.nan, 90000],
    'city': ['NYC', 'LA', 'NYC', 'Chicago', 'LA'],
    'education': ['BS', 'MS', 'PhD', 'BS', 'MS'],
    'purchased': [0, 1, 1, 0, 1]
})
 
X = data.drop('purchased', axis=1)
y = data['purchased']
 
# Define column groups
numeric_features = ['age', 'income']
categorical_features = ['city', 'education']
 
# Build sub-pipelines for each column type
numeric_transformer = Pipeline([
    ('imputer', SimpleImputer(strategy='median')),
    ('scaler', StandardScaler())
])
 
categorical_transformer = Pipeline([
    ('imputer', SimpleImputer(strategy='most_frequent')),
    ('encoder', OneHotEncoder(handle_unknown='ignore', sparse_output=False))
])
 
# Combine with ColumnTransformer
preprocessor = ColumnTransformer(
    transformers=[
        ('num', numeric_transformer, numeric_features),
        ('cat', categorical_transformer, categorical_features)
    ]
)
 
# Full pipeline: preprocessing + model
pipeline = Pipeline([
    ('preprocessor', preprocessor),
    ('classifier', LogisticRegression(max_iter=1000))
])
 
pipeline.fit(X, y)
print(f"Pipeline fitted successfully")
print(f"Predictions: {pipeline.predict(X)}")

Getting Feature Names After Transformation

After fitting a ColumnTransformer, you can retrieve the transformed feature names:

# After fitting the pipeline
pipeline.fit(X, y)
 
# Get feature names from the preprocessor step
feature_names = pipeline.named_steps['preprocessor'].get_feature_names_out()
print(f"Transformed features: {feature_names}")
# ['num__age', 'num__income', 'cat__city_Chicago', 'cat__city_LA',
#  'cat__city_NYC', 'cat__education_BS', 'cat__education_MS', 'cat__education_PhD']

Handling Remainder Columns

By default, ColumnTransformer drops columns not specified in any transformer. Control this with the remainder parameter:

preprocessor = ColumnTransformer(
    transformers=[
        ('num', numeric_transformer, numeric_features),
        ('cat', categorical_transformer, categorical_features)
    ],
    remainder='passthrough'  # Keep unspecified columns as-is
    # remainder='drop'       # Default: drop unspecified columns
)

Pipeline with GridSearchCV

One of the most powerful features of sklearn pipelines is seamless integration with hyperparameter tuning. Use the stepname__parameter syntax to reference parameters inside pipeline steps:

from sklearn.pipeline import Pipeline
from sklearn.preprocessing import StandardScaler
from sklearn.decomposition import PCA
from sklearn.svm import SVC
from sklearn.model_selection import GridSearchCV, train_test_split
from sklearn.datasets import load_breast_cancer
 
# Load data
cancer = load_breast_cancer()
X_train, X_test, y_train, y_test = train_test_split(
    cancer.data, cancer.target, test_size=0.2, random_state=42, stratify=cancer.target
)
 
# Build pipeline
pipe = Pipeline([
    ('scaler', StandardScaler()),
    ('pca', PCA()),
    ('svc', SVC())
])
 
# Define parameter grid
# Use stepname__param syntax to access nested parameters
param_grid = {
    'pca__n_components': [5, 10, 15, 20],
    'svc__C': [0.1, 1, 10, 100],
    'svc__kernel': ['rbf', 'linear'],
    'svc__gamma': ['scale', 'auto']
}
 
# Run grid search
grid = GridSearchCV(
    pipe,
    param_grid,
    cv=5,
    scoring='accuracy',
    n_jobs=-1,
    verbose=1
)
 
grid.fit(X_train, y_train)
 
print(f"Best parameters: {grid.best_params_}")
print(f"Best CV score:   {grid.best_score_:.4f}")
print(f"Test score:      {grid.score(X_test, y_test):.4f}")

Tuning ColumnTransformer Parameters

For nested pipelines with ColumnTransformer, chain the step names with double underscores:

# Accessing nested parameters:
# pipeline step 'preprocessor' -> transformer 'num' -> step 'imputer' -> parameter 'strategy'
param_grid = {
    'preprocessor__num__imputer__strategy': ['mean', 'median'],
    'preprocessor__cat__encoder__handle_unknown': ['ignore', 'infrequent_if_exist'],
    'classifier__C': [0.1, 1, 10]
}
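Rather than guessing the double-underscore path, you can list it: get_params() exposes every nested parameter name. A sketch with a minimal nested pipeline:

```python
from sklearn.pipeline import Pipeline
from sklearn.compose import ColumnTransformer
from sklearn.impute import SimpleImputer
from sklearn.linear_model import LogisticRegression

pre = ColumnTransformer([('num', Pipeline([('imputer', SimpleImputer())]), [0, 1])])
pipe = Pipeline([('preprocessor', pre), ('classifier', LogisticRegression())])

# Filter get_params() keys to find the exact grid-search name
tunable = [k for k in pipe.get_params() if 'strategy' in k]
print(tunable)
# ['preprocessor__num__imputer__strategy']
```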

Using RandomizedSearchCV for Large Search Spaces

When the parameter grid is large, RandomizedSearchCV samples a fixed number of parameter combinations:

from sklearn.model_selection import RandomizedSearchCV
from scipy.stats import uniform, randint
 
param_distributions = {
    'pca__n_components': randint(5, 25),
    'svc__C': uniform(0.1, 100),
    'svc__kernel': ['rbf', 'linear', 'poly'],
    'svc__gamma': uniform(0.001, 1)
}
 
random_search = RandomizedSearchCV(
    pipe,
    param_distributions,
    n_iter=50,       # Sample 50 combinations
    cv=5,
    scoring='accuracy',
    n_jobs=-1,
    random_state=42
)
 
random_search.fit(X_train, y_train)
print(f"Best parameters: {random_search.best_params_}")
print(f"Best CV score:   {random_search.best_score_:.4f}")

Custom Transformers

FunctionTransformer: Quick Custom Steps

For simple stateless transformations, use FunctionTransformer:

from sklearn.preprocessing import FunctionTransformer, StandardScaler
from sklearn.pipeline import Pipeline
import numpy as np
 
# Log transform (adding 1 to avoid log(0))
log_transformer = FunctionTransformer(
    func=np.log1p,
    inverse_func=np.expm1  # Optional inverse for inverse_transform
)
 
pipe = Pipeline([
    ('log', log_transformer),
    ('scaler', StandardScaler())
])
 
# Works with pipeline fit/transform
X_sample = np.array([[1, 10, 100], [2, 20, 200]])
X_transformed = pipe.fit_transform(X_sample)
print(f"Original:    {X_sample[0]}")
print(f"Transformed: {X_transformed[0]}")

Custom Transformer Class

For stateful transformations (those that learn parameters from data), create a class that inherits from BaseEstimator and TransformerMixin:

from sklearn.base import BaseEstimator, TransformerMixin
import numpy as np
 
class OutlierClipper(BaseEstimator, TransformerMixin):
    """Clips values beyond a specified number of standard deviations."""
 
    def __init__(self, n_std=3):
        self.n_std = n_std
 
    def fit(self, X, y=None):
        # Learn the boundaries from training data
        self.mean_ = np.mean(X, axis=0)
        self.std_ = np.std(X, axis=0)
        self.lower_ = self.mean_ - self.n_std * self.std_
        self.upper_ = self.mean_ + self.n_std * self.std_
        return self  # Always return self from fit
 
    def transform(self, X):
        # Apply learned boundaries to any data
        X_clipped = np.clip(X, self.lower_, self.upper_)
        return X_clipped

Use it in a pipeline like any built-in transformer:

from sklearn.pipeline import Pipeline
from sklearn.preprocessing import StandardScaler
from sklearn.linear_model import LogisticRegression
from sklearn.datasets import load_iris
from sklearn.model_selection import train_test_split
 
iris = load_iris()
X_train, X_test, y_train, y_test = train_test_split(
    iris.data, iris.target, test_size=0.2, random_state=42
)
 
pipe = Pipeline([
    ('clip', OutlierClipper(n_std=2)),
    ('scaler', StandardScaler()),
    ('classifier', LogisticRegression(max_iter=1000))
])
 
pipe.fit(X_train, y_train)
print(f"Accuracy: {pipe.score(X_test, y_test):.4f}")
 
# The n_std parameter works with GridSearchCV
from sklearn.model_selection import GridSearchCV
 
grid = GridSearchCV(
    pipe,
    {'clip__n_std': [1.5, 2, 2.5, 3]},
    cv=5
)
grid.fit(X_train, y_train)
print(f"Best n_std: {grid.best_params_['clip__n_std']}")

Custom Transformer for Feature Engineering

A more practical example -- creating interaction features from specific columns:

from sklearn.base import BaseEstimator, TransformerMixin
import pandas as pd
import numpy as np
 
class FeatureInteraction(BaseEstimator, TransformerMixin):
    """Creates multiplication interactions between specified column pairs."""
 
    def __init__(self, interaction_pairs=None):
        self.interaction_pairs = interaction_pairs
 
    def fit(self, X, y=None):
        # Store column names if DataFrame
        if isinstance(X, pd.DataFrame):
            self.feature_names_in_ = X.columns.tolist()
        else:
            self.feature_names_in_ = [f"x{i}" for i in range(X.shape[1])]
        return self
 
    def transform(self, X):
        X_df = pd.DataFrame(X, columns=self.feature_names_in_) if not isinstance(X, pd.DataFrame) else X.copy()
 
        if self.interaction_pairs:
            for col_a, col_b in self.interaction_pairs:
                name = f"{col_a}_x_{col_b}"
                X_df[name] = X_df[col_a] * X_df[col_b]
 
        return X_df.values
 
    def get_feature_names_out(self, input_features=None):
        names = list(self.feature_names_in_)
        if self.interaction_pairs:
            for col_a, col_b in self.interaction_pairs:
                names.append(f"{col_a}_x_{col_b}")
        return np.array(names)

FeatureUnion: Parallel Feature Engineering

While Pipeline chains steps sequentially, FeatureUnion runs transformers in parallel and concatenates their outputs horizontally:

from sklearn.pipeline import Pipeline, FeatureUnion
from sklearn.preprocessing import StandardScaler, PolynomialFeatures
from sklearn.decomposition import PCA
from sklearn.linear_model import LogisticRegression
from sklearn.datasets import load_iris
from sklearn.model_selection import train_test_split
 
iris = load_iris()
X_train, X_test, y_train, y_test = train_test_split(
    iris.data, iris.target, test_size=0.2, random_state=42
)
 
# Create parallel feature branches
feature_union = FeatureUnion([
    ('scaled', StandardScaler()),             # Original features, scaled
    ('pca', PCA(n_components=2)),             # 2 PCA components
    ('poly', PolynomialFeatures(degree=2, include_bias=False))  # Polynomial features
])
 
# Combine into a full pipeline
pipe = Pipeline([
    ('features', feature_union),
    ('classifier', LogisticRegression(max_iter=1000, random_state=42))
])
 
pipe.fit(X_train, y_train)
 
# Check the total number of features
X_transformed = feature_union.fit_transform(X_train)
print(f"Original features:    {X_train.shape[1]}")
print(f"After FeatureUnion:   {X_transformed.shape[1]}")
print(f"Test accuracy:        {pipe.score(X_test, y_test):.4f}")

FeatureUnion vs ColumnTransformer

| Feature | FeatureUnion | ColumnTransformer |
|---|---|---|
| Input | All columns go to all transformers | Specific columns to specific transformers |
| Output | Concatenates horizontally | Concatenates horizontally |
| Use case | Multiple representations of the same features | Different types of features need different processing |
| Column selection | Cannot select -- operates on all columns | Built-in column specification |
| Modern alternative | Often replaced by ColumnTransformer | Preferred for most use cases |

In modern scikit-learn, ColumnTransformer handles most cases where FeatureUnion was previously used. FeatureUnion remains useful when you want multiple representations of the same feature set (e.g., raw values + PCA + polynomial features).
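To make the comparison concrete, here is a sketch of ColumnTransformer reproducing the FeatureUnion pattern by routing the same columns through several transformers (column lists may overlap):

```python
import numpy as np
from sklearn.compose import ColumnTransformer
from sklearn.preprocessing import StandardScaler
from sklearn.decomposition import PCA

X = np.random.default_rng(0).normal(size=(50, 4))
all_cols = [0, 1, 2, 3]

# Both branches see every column; outputs are concatenated horizontally
multi_view = ColumnTransformer([
    ('scaled', StandardScaler(), all_cols),
    ('pca', PCA(n_components=2), all_cols),
])

X_out = multi_view.fit_transform(X)
print(X_out.shape)  # (50, 6): 4 scaled columns + 2 PCA components
```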

Saving and Loading Pipelines

One of the biggest advantages of pipelines is deployment simplicity. Instead of serializing each transformer and model separately, you save one object:

import joblib
from sklearn.pipeline import Pipeline
from sklearn.preprocessing import StandardScaler
from sklearn.ensemble import RandomForestClassifier
from sklearn.datasets import load_iris
from sklearn.model_selection import train_test_split
 
# Train a pipeline
iris = load_iris()
X_train, X_test, y_train, y_test = train_test_split(
    iris.data, iris.target, test_size=0.2, random_state=42
)
 
pipe = Pipeline([
    ('scaler', StandardScaler()),
    ('classifier', RandomForestClassifier(n_estimators=100, random_state=42))
])
pipe.fit(X_train, y_train)
 
# Save the entire pipeline -- one file
joblib.dump(pipe, 'model_pipeline.joblib')
print(f"Pipeline saved")
 
# Load and predict -- no preprocessing code needed
loaded_pipe = joblib.load('model_pipeline.joblib')
predictions = loaded_pipe.predict(X_test)
accuracy = loaded_pipe.score(X_test, y_test)
print(f"Loaded pipeline accuracy: {accuracy:.4f}")

Versioning Pipelines

For production, include metadata alongside the pipeline:

import joblib
import datetime
import sklearn
 
artifact = {
    'pipeline': pipe,
    'training_date': datetime.datetime.now().isoformat(),
    'sklearn_version': sklearn.__version__,
    'feature_names': list(iris.feature_names),
    'target_names': list(iris.target_names),
    'training_accuracy': pipe.score(X_train, y_train),
    'test_accuracy': pipe.score(X_test, y_test),
    'n_training_samples': len(X_train)
}
 
joblib.dump(artifact, 'model_artifact_v1.joblib')
 
# Later, load and validate
loaded = joblib.load('model_artifact_v1.joblib')
print(f"Model trained on: {loaded['training_date']}")
print(f"Sklearn version:  {loaded['sklearn_version']}")
print(f"Test accuracy:    {loaded['test_accuracy']:.4f}")
 
# Use the pipeline
loaded_pipe = loaded['pipeline']
predictions = loaded_pipe.predict(X_test[:3])

Using pickle (Alternative)

joblib is preferred for sklearn objects because it efficiently handles large NumPy arrays. Standard pickle also works:

import pickle
 
# Save
with open('pipeline.pkl', 'wb') as f:
    pickle.dump(pipe, f)
 
# Load
with open('pipeline.pkl', 'rb') as f:
    loaded = pickle.load(f)

Real-World Example: Complete Classification Pipeline

Here is a complete, production-quality pipeline for a classification task with mixed feature types. It combines a ColumnTransformer with a Random Forest classifier and ends with a full evaluation report. The synthetic employee dataset below reproduces the mixed numeric/categorical columns and missing values that most ML practitioners encounter:

import pandas as pd
import numpy as np
from sklearn.compose import ColumnTransformer
from sklearn.pipeline import Pipeline
from sklearn.preprocessing import StandardScaler, OneHotEncoder
from sklearn.impute import SimpleImputer
from sklearn.ensemble import RandomForestClassifier
from sklearn.model_selection import (
    train_test_split, cross_val_score, GridSearchCV
)
from sklearn.metrics import classification_report, accuracy_score
 
# -- Create a realistic dataset with mixed types and missing values --
np.random.seed(42)
n = 1000
 
data = pd.DataFrame({
    'age': np.random.normal(35, 12, n),
    'income': np.random.lognormal(10.5, 0.8, n),
    'credit_score': np.random.normal(650, 80, n),
    'years_employed': np.random.exponential(5, n),
    'department': np.random.choice(['Engineering', 'Sales', 'Marketing', 'HR', 'Finance'], n),
    'education': np.random.choice(['High School', 'Bachelor', 'Master', 'PhD'], n),
    'city': np.random.choice(['NYC', 'LA', 'Chicago', 'Houston', 'Phoenix', 'Dallas'], n),
    'promoted': np.random.binomial(1, 0.3, n)
})
 
# Introduce missing values (realistic pattern)
for col in ['age', 'income', 'credit_score']:
    mask = np.random.random(n) < 0.05
    data.loc[mask, col] = np.nan
 
for col in ['department', 'education']:
    mask = np.random.random(n) < 0.03
    data.loc[mask, col] = np.nan
 
print(f"Dataset shape: {data.shape}")
print(f"Missing values:\n{data.isnull().sum()}")
print(f"Target distribution:\n{data['promoted'].value_counts(normalize=True)}")
# -- Define features and target --
X = data.drop('promoted', axis=1)
y = data['promoted']
 
# Split the data (see /topics/Scikit-Learn/sklearn-train-test-split for details)
X_train, X_test, y_train, y_test = train_test_split(
    X, y, test_size=0.2, random_state=42, stratify=y
)
 
print(f"Training set: {X_train.shape[0]} samples")
print(f"Test set:     {X_test.shape[0]} samples")
# -- Define column groups --
numeric_features = ['age', 'income', 'credit_score', 'years_employed']
categorical_features = ['department', 'education', 'city']
 
# -- Build preprocessing pipelines for each column type --
numeric_transformer = Pipeline([
    ('imputer', SimpleImputer(strategy='median')),
    ('scaler', StandardScaler())
])
 
categorical_transformer = Pipeline([
    ('imputer', SimpleImputer(strategy='most_frequent')),
    ('encoder', OneHotEncoder(handle_unknown='ignore', sparse_output=False))
])
 
# -- Combine column transformers --
preprocessor = ColumnTransformer(
    transformers=[
        ('num', numeric_transformer, numeric_features),
        ('cat', categorical_transformer, categorical_features)
    ],
    remainder='drop'  # Explicitly drop unlisted columns
)
 
# -- Full pipeline: preprocessing + classifier --
# (see /topics/Scikit-Learn/sklearn-random-forest for more on RandomForest)
pipeline = Pipeline([
    ('preprocessor', preprocessor),
    ('classifier', RandomForestClassifier(
        n_estimators=200,
        max_depth=10,
        min_samples_leaf=5,
        class_weight='balanced',
        random_state=42,
        n_jobs=-1
    ))
])
# -- Cross-validation first --
cv_scores = cross_val_score(pipeline, X_train, y_train, cv=5, scoring='accuracy')
print(f"Cross-validation scores: {cv_scores}")
print(f"Mean CV accuracy: {cv_scores.mean():.4f} (+/- {cv_scores.std() * 2:.4f})")
 
# -- Fit on full training set --
pipeline.fit(X_train, y_train)
 
# -- Evaluate on test set --
y_pred = pipeline.predict(X_test)
print(f"\nTest accuracy: {accuracy_score(y_test, y_pred):.4f}")
# For detailed evaluation, see /topics/Scikit-Learn/sklearn-confusion-matrix
print(f"\nClassification Report:\n{classification_report(y_test, y_pred)}")
# -- Hyperparameter tuning --
param_grid = {
    'preprocessor__num__imputer__strategy': ['mean', 'median'],
    'classifier__n_estimators': [100, 200, 300],
    'classifier__max_depth': [5, 10, 15, None],
    'classifier__min_samples_leaf': [3, 5, 10]
}
 
grid_search = GridSearchCV(
    pipeline,
    param_grid,
    cv=5,
    scoring='accuracy',
    n_jobs=-1,
    verbose=1
)
 
grid_search.fit(X_train, y_train)
 
print(f"\nBest parameters: {grid_search.best_params_}")
print(f"Best CV score:   {grid_search.best_score_:.4f}")
print(f"Test score:      {grid_search.score(X_test, y_test):.4f}")
# -- Inspect the best pipeline --
best_pipeline = grid_search.best_estimator_
 
# Get feature names after transformation
feature_names = best_pipeline.named_steps['preprocessor'].get_feature_names_out()
print(f"\nTransformed feature count: {len(feature_names)}")
 
# Get feature importances from the classifier
importances = best_pipeline.named_steps['classifier'].feature_importances_
feature_importance = pd.DataFrame({
    'feature': feature_names,
    'importance': importances
}).sort_values('importance', ascending=False)
 
print(f"\nTop 10 features:")
print(feature_importance.head(10).to_string(index=False))
# -- Save the final pipeline --
import joblib
 
joblib.dump(best_pipeline, 'promotion_predictor.joblib')
print("Pipeline saved to promotion_predictor.joblib")
 
# -- Production usage --
loaded = joblib.load('promotion_predictor.joblib')
 
# Predict on new data -- same format as original DataFrame
new_employee = pd.DataFrame({
    'age': [28],
    'income': [65000],
    'credit_score': [720],
    'years_employed': [3.5],
    'department': ['Engineering'],
    'education': ['Master'],
    'city': ['NYC']
})
 
prediction = loaded.predict(new_employee)
probability = loaded.predict_proba(new_employee)
print(f"\nNew employee prediction: {'Promoted' if prediction[0] else 'Not promoted'}")
print(f"Probability: {probability[0][1]:.2%}")

This example demonstrates every key pipeline pattern: mixed feature types, missing value handling, cross-validation, hyperparameter tuning, feature inspection, and production serialization.

Pipeline with Different Model Types

The same preprocessing pipeline can serve different models. Swap only the last step:

from sklearn.pipeline import Pipeline
from sklearn.linear_model import LogisticRegression
from sklearn.ensemble import RandomForestClassifier, GradientBoostingClassifier
from sklearn.svm import SVC
from sklearn.model_selection import cross_val_score
 
# Reuse the preprocessor from the previous example
models = {
    'Logistic Regression': LogisticRegression(max_iter=1000, random_state=42),
    'Random Forest': RandomForestClassifier(n_estimators=100, random_state=42),
    'Gradient Boosting': GradientBoostingClassifier(n_estimators=100, random_state=42),
    'SVM': SVC(kernel='rbf', random_state=42)
}
 
results = {}
for name, model in models.items():
    pipe = Pipeline([
        ('preprocessor', preprocessor),
        ('classifier', model)
    ])
    scores = cross_val_score(pipe, X_train, y_train, cv=5, scoring='accuracy')
    results[name] = {
        'mean': scores.mean(),
        'std': scores.std()
    }
    print(f"{name:25s} | Accuracy: {scores.mean():.4f} +/- {scores.std():.4f}")

You can also dynamically set the estimator step:

# Replace the classifier in an existing pipeline
pipeline.set_params(classifier=GradientBoostingClassifier(n_estimators=200))
pipeline.fit(X_train, y_train)
print(f"Gradient Boosting test accuracy: {pipeline.score(X_test, y_test):.4f}")

Pipelines in Regression

Pipelines work identically for regression tasks. For details on linear regression with sklearn, see our sklearn linear regression guide.

from sklearn.pipeline import Pipeline
from sklearn.compose import ColumnTransformer
from sklearn.preprocessing import StandardScaler, OneHotEncoder, PolynomialFeatures
from sklearn.impute import SimpleImputer
from sklearn.linear_model import Ridge
from sklearn.model_selection import cross_val_score, train_test_split
import pandas as pd
import numpy as np
 
# Simulated housing data
np.random.seed(42)
n = 500
housing = pd.DataFrame({
    'sqft': np.random.normal(1500, 400, n),
    'bedrooms': np.random.choice([1, 2, 3, 4, 5], n),
    'age': np.random.uniform(0, 50, n),
    'neighborhood': np.random.choice(['downtown', 'suburbs', 'rural'], n),
    'condition': np.random.choice(['poor', 'fair', 'good', 'excellent'], n),
})
housing['price'] = (
    housing['sqft'] * 200
    + housing['bedrooms'] * 15000
    - housing['age'] * 1000
    + np.random.normal(0, 20000, n)
)
 
X = housing.drop('price', axis=1)
y = housing['price']
X_train, X_test, y_train, y_test = train_test_split(X, y, test_size=0.2, random_state=42)
 
# Regression pipeline
numeric_features = ['sqft', 'bedrooms', 'age']
categorical_features = ['neighborhood', 'condition']
 
preprocessor = ColumnTransformer([
    ('num', Pipeline([
        ('imputer', SimpleImputer(strategy='median')),
        ('poly', PolynomialFeatures(degree=2, include_bias=False)),
        ('scaler', StandardScaler())
    ]), numeric_features),
    ('cat', Pipeline([
        ('imputer', SimpleImputer(strategy='most_frequent')),
        ('encoder', OneHotEncoder(handle_unknown='ignore', sparse_output=False))
    ]), categorical_features)
])
 
reg_pipeline = Pipeline([
    ('preprocessor', preprocessor),
    ('regressor', Ridge(alpha=1.0))
])
 
cv_scores = cross_val_score(reg_pipeline, X_train, y_train, cv=5, scoring='r2')
print(f"Cross-validation R2: {cv_scores.mean():.4f} +/- {cv_scores.std():.4f}")
 
reg_pipeline.fit(X_train, y_train)
print(f"Test R2: {reg_pipeline.score(X_test, y_test):.4f}")

Skipping Steps with passthrough and None

You can conditionally skip steps by setting them to 'passthrough' or None:

from sklearn.pipeline import Pipeline
from sklearn.preprocessing import StandardScaler
from sklearn.decomposition import PCA
from sklearn.svm import SVC
from sklearn.model_selection import GridSearchCV, train_test_split
from sklearn.datasets import load_breast_cancer
 
cancer = load_breast_cancer()
X_train, X_test, y_train, y_test = train_test_split(
    cancer.data, cancer.target, test_size=0.2, random_state=42
)
 
pipe = Pipeline([
    ('scaler', StandardScaler()),
    ('reduce_dim', PCA()),
    ('classifier', SVC())
])
 
# Grid search can toggle steps on/off
param_grid = [
    {
        'reduce_dim': [PCA(5), PCA(10), PCA(15)],
        'classifier__C': [1, 10]
    },
    {
        'reduce_dim': ['passthrough'],  # Skip PCA entirely
        'classifier__C': [1, 10]
    }
]
 
grid = GridSearchCV(pipe, param_grid, cv=5, n_jobs=-1)
grid.fit(X_train, y_train)
print(f"Best params: {grid.best_params_}")
print(f"Best score:  {grid.best_score_:.4f}")

Caching Pipeline Steps

When tuning hyperparameters, intermediate steps may be recomputed redundantly. Enable caching to avoid this:

from sklearn.pipeline import Pipeline
from sklearn.preprocessing import StandardScaler
from sklearn.decomposition import PCA
from sklearn.svm import SVC
from tempfile import mkdtemp
from shutil import rmtree
 
# Create a temporary cache directory
cachedir = mkdtemp()
 
pipe = Pipeline(
    [
        ('scaler', StandardScaler()),
        ('pca', PCA(n_components=10)),
        ('svc', SVC())
    ],
    memory=cachedir  # Cache intermediate transformations
)
 
# During GridSearchCV, the scaler and PCA results are cached
# Only recomputed when their parameters change
# This speeds up searches where only the final estimator params change
 
# Clean up when done
# rmtree(cachedir)
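To see the payoff end to end, here is a self-contained sketch (breast cancer data and the C grid are illustrative): because only the final estimator's parameter varies, each fold's scaler and PCA fit is computed once and reused across all three candidates.

```python
from shutil import rmtree
from tempfile import mkdtemp

from sklearn.datasets import load_breast_cancer
from sklearn.decomposition import PCA
from sklearn.model_selection import GridSearchCV
from sklearn.pipeline import Pipeline
from sklearn.preprocessing import StandardScaler
from sklearn.svm import SVC

X, y = load_breast_cancer(return_X_y=True)
cachedir = mkdtemp()

pipe = Pipeline([
    ('scaler', StandardScaler()),
    ('pca', PCA(n_components=10)),
    ('svc', SVC())
], memory=cachedir)

# Only the final estimator's parameter varies, so the cached
# scaler/PCA transformations are reused across grid candidates
grid = GridSearchCV(pipe, {'svc__C': [0.1, 1, 10]}, cv=3)
grid.fit(X, y)
print(f"Best C: {grid.best_params_['svc__C']}")

rmtree(cachedir)  # Remove the cache directory when finished
```

The larger the grid over the final estimator, the more the cache saves, since the expensive early transformations amortize across every candidate.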

Exploring Data Before Building Pipelines

Before constructing a pipeline, you need to understand your features: their distributions, missing value patterns, correlations, and potential transformations. PyGWalker lets you turn any Pandas DataFrame into an interactive visual exploration interface directly in Jupyter notebooks:

import pandas as pd
import pygwalker as pyg
 
# Explore your dataset interactively before building the pipeline
# Drag features to axes, create histograms, scatter plots, box plots
walker = pyg.walk(df)  # df is any Pandas DataFrame

This kind of visual exploration helps you decide which features need scaling, which have outliers that need clipping, and which categorical features have high cardinality. You can identify missing value patterns and understand feature distributions before writing a single line of pipeline code.

For iterating on your full pipeline experimentation workflow -- testing different preprocessing strategies, comparing model performance, and tracking results -- RunCell provides an AI-powered Jupyter environment where an agent assists with code generation, debugging, and experiment management.

Common Pitfalls and Debugging Tips

Pitfall 1: Forgetting to Use the Pipeline for Prediction

# WRONG: Preprocessing manually, predicting with just the model
X_test_scaled = scaler.transform(X_test)
predictions = pipeline.named_steps['classifier'].predict(X_test_scaled)
 
# CORRECT: Let the pipeline handle everything
predictions = pipeline.predict(X_test)

Pitfall 2: Fitting Preprocessing Outside the Pipeline

# WRONG: This defeats the purpose of the pipeline
scaler = StandardScaler()
X_train_scaled = scaler.fit_transform(X_train)
pipe = Pipeline([('classifier', LogisticRegression())])
pipe.fit(X_train_scaled, y_train)
# Now you must remember to manually scale at prediction time
 
# CORRECT: Include preprocessing in the pipeline
pipe = Pipeline([
    ('scaler', StandardScaler()),
    ('classifier', LogisticRegression())
])
pipe.fit(X_train, y_train)  # Pipeline handles the scaling

Pitfall 3: Wrong Parameter Names in GridSearchCV

The stepname__param syntax must match your pipeline step names exactly:

pipe = Pipeline([
    ('my_scaler', StandardScaler()),
    ('my_clf', LogisticRegression())
])
 
# WRONG: Using the class name instead of the step name
# param_grid = {'StandardScaler__with_mean': [True, False]}  # KeyError
 
# CORRECT: Using the step name you defined
param_grid = {'my_scaler__with_mean': [True, False], 'my_clf__C': [0.1, 1, 10]}

Pitfall 4: Column Order Changes

When using ColumnTransformer with Pandas DataFrames (which you can load with pandas read_csv), the column order in the output depends on the order of transformers, not the original DataFrame:

# The output order is: numeric features first, then categorical
# This matters if you manually inspect transformed data
preprocessor = ColumnTransformer([
    ('num', numeric_transformer, numeric_features),     # These come first
    ('cat', categorical_transformer, categorical_features)  # These come second
])

Debugging Intermediate Outputs

To inspect what happens at each step:

# Method 1: Transform step by step
pipe.fit(X_train, y_train)
X_after_preprocessor = pipe.named_steps['preprocessor'].transform(X_test)
print(f"Shape after preprocessing: {X_after_preprocessor.shape}")
print(f"Sample values:\n{X_after_preprocessor[:2]}")
 
# Method 2: Slice the pipeline
preprocessing_pipe = pipe[:-1]  # Everything except the classifier
X_transformed = preprocessing_pipe.transform(X_test)
print(f"Transformed shape: {X_transformed.shape}")
 
# Method 3: Use set_config to get pandas output
from sklearn import set_config
set_config(transform_output="pandas")  # Transformers return DataFrames (scikit-learn 1.2+)
# Now transformed outputs keep their column names -- easier to debug

Debugging Shape Mismatches

# Print shapes at each stage to find where things break
# (assumes pipe has already been fitted)
X_step = X_train
print(f"Input shape: {X_step.shape}")

for name, step in pipe.named_steps.items():
    if hasattr(step, 'transform'):
        try:
            X_step = step.transform(X_step)
            print(f"After '{name}': {X_step.shape}")
        except Exception as e:
            print(f"Error at '{name}': {e}")
            break

Pipeline Method Reference

| Method | Description |
| --- | --- |
| fit(X, y) | Fit all transformers and the final estimator |
| predict(X) | Transform X through all steps, then predict with the final estimator |
| predict_proba(X) | Transform and get probability estimates (classifiers only) |
| transform(X) | Transform X through all steps (if the last step is a transformer) |
| fit_transform(X, y) | Fit and transform in one call |
| fit_predict(X, y) | Fit and predict in one call |
| score(X, y) | Transform and score (accuracy for classifiers, R2 for regressors) |
| set_params(**params) | Set parameters using stepname__param syntax |
| get_params() | Get all parameters |
| named_steps | Dictionary-like access to pipeline steps |
| [i] or [name] | Access step by index or name |
| [start:end] | Slice to create a sub-pipeline |

FAQ

What is the difference between Pipeline and make_pipeline?

Pipeline requires you to provide (name, estimator) tuples for each step, giving you explicit control over step names. make_pipeline accepts bare estimator instances and auto-generates names from the class names (lowercased). Use Pipeline when you need descriptive names or plan to tune hyperparameters with GridSearchCV. Use make_pipeline for quick prototyping.
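A minimal sketch of the naming difference -- the step names printed here are the ones make_pipeline generates automatically:

```python
from sklearn.linear_model import LogisticRegression
from sklearn.pipeline import make_pipeline
from sklearn.preprocessing import StandardScaler

pipe = make_pipeline(StandardScaler(), LogisticRegression())

# Auto-generated step names are the lowercased class names
print([name for name, _ in pipe.steps])
# ['standardscaler', 'logisticregression']
```

Those auto-generated names are what a param grid must reference, e.g. {'logisticregression__C': [0.1, 1, 10]}.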

Does sklearn Pipeline prevent data leakage?

Yes. When you call pipeline.fit(X_train, y_train), each transformer is fitted only on the training data. During cross-validation with cross_val_score or GridSearchCV, the pipeline refits all steps on each training fold, ensuring no test fold data leaks into the preprocessing. This is the primary advantage over manual preprocessing.

Can I use Pipeline with deep learning models?

Scikit-learn pipelines work with any estimator that follows the sklearn API (implements fit, predict, and optionally transform). Libraries like scikeras provide sklearn-compatible wrappers for Keras models, allowing them to be used in pipelines. XGBoost and LightGBM also provide sklearn-compatible interfaces.

How do I handle feature names after ColumnTransformer?

Call pipeline.named_steps['preprocessor'].get_feature_names_out() after fitting. This returns an array of feature names with prefixes indicating which transformer produced them (e.g., num__age, cat__city_NYC). This works in scikit-learn 1.0 and later.

Can I have multiple models in one Pipeline?

No. A Pipeline is a linear sequence where the last step is the estimator. If you want to compare multiple models, create separate pipelines with the same preprocessing steps but different final estimators. You can automate this by looping over a dictionary of models and creating a pipeline for each.

How do I skip a step in a Pipeline during GridSearchCV?

Set the step to 'passthrough' in the parameter grid. For example: {'reduce_dim': ['passthrough']} will skip the reduce_dim step entirely during that grid search iteration. You can also set a step to None, but 'passthrough' is the recommended approach.

What happens if the Pipeline encounters unseen categories at prediction time?

If you use OneHotEncoder with handle_unknown='ignore' inside your pipeline, unseen categories are encoded as all zeros. Without this setting, the pipeline raises an error. Always set handle_unknown='ignore' in production pipelines where new category values are possible.

Conclusion

Sklearn Pipeline transforms messy, error-prone ML code into clean, reproducible workflows. By chaining preprocessing and modeling into a single object, you eliminate data leakage, simplify deployment, and make hyperparameter tuning across the entire workflow trivial.

Start with the basics: wrap your scaler and model in a Pipeline. Graduate to ColumnTransformer when you have mixed feature types. Use GridSearchCV to tune parameters across every pipeline step simultaneously. Build custom transformers when the built-in options do not cover your feature engineering needs.

The investment in learning pipelines pays off immediately. Your cross-validation results become trustworthy because preprocessing is refitted per fold. Your production deployment becomes a single joblib.dump and joblib.load. And your codebase becomes maintainable because the entire transformation and prediction logic lives in one inspectable object.
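That single-artifact deployment story can be sketched in a few lines; the dataset and file path here are illustrative:

```python
import os
import tempfile

import joblib
from sklearn.datasets import load_iris
from sklearn.linear_model import LogisticRegression
from sklearn.pipeline import Pipeline
from sklearn.preprocessing import StandardScaler

X, y = load_iris(return_X_y=True)
pipeline = Pipeline([
    ('scaler', StandardScaler()),
    ('classifier', LogisticRegression(max_iter=1000))
])
pipeline.fit(X, y)

# Persist preprocessing and model together as one artifact
path = os.path.join(tempfile.gettempdir(), 'model.joblib')
joblib.dump(pipeline, path)

# Later, in the serving process: one load, one predict
loaded = joblib.load(path)
print(loaded.predict(X[:3]))
```

Because the scaler travels with the model, the serving code never needs to know how the features were transformed during training.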

Related Guides
