
Sklearn Pipeline: A Complete Guide to Building ML Pipelines in Python


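You are working on a machine learning project with five preprocessing steps, three feature engineering operations, and a final model. Each step is a separate piece of code. You fit the scaler on the entire dataset and only then split it into training and test sets. Your one-hot encoding produces different columns in training than in production. Months later, someone changes the missing-value imputation strategy but forgets to update the deployment script.

This is what most ML codebases really look like. Hand-written preprocessing pipelines are fragile and error-prone. They are also a persistent source of data leakage, which is one of the most common reasons a model performs well in a notebook but fails on real data. When you fit a StandardScaler on the whole dataset before splitting it, test-set statistics leak into training. When you encode categorical features outside a single unified workflow, mismatches between training and test data stay hidden until production breaks.

Scikit-learn's Pipeline solves these problems by chaining preprocessing and modeling into a single object. One fit() call trains every step. One predict() call applies every transformation and then predicts. Each transformer is fitted only on the data passed to fit(), so preprocessing cannot leak information from held-out data, and training and inference always apply the same transformations. A single object can be saved, loaded, and deployed. This guide covers everything you need to build production-grade sklearn pipelines, from basic usage to custom transformers and real-world deployment patterns.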

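Why Pipelines Matter

The Data Leakage Problem

Data leakage happens when information from outside the training set is used during training. Its most common form in preprocessing looks like this: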

# WRONG: Data leakage -- scaler sees test data
from sklearn.preprocessing import StandardScaler
from sklearn.model_selection import train_test_split
 
scaler = StandardScaler()
X_scaled = scaler.fit_transform(X)  # Fitted on ALL data, including test
 
X_train, X_test, y_train, y_test = train_test_split(X_scaled, y, test_size=0.2)
# X_test was already influenced by scaler statistics computed on the full dataset

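The scaler computes the mean and standard deviation over the entire dataset, test samples included. As a result, your test-set evaluation looks overly optimistic, because the model indirectly "saw" information about those samples during preprocessing.

The correct approach: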

# CORRECT: No leakage -- scaler fitted only on training data
X_train, X_test, y_train, y_test = train_test_split(X, y, test_size=0.2)
 
scaler = StandardScaler()
X_train_scaled = scaler.fit_transform(X_train)  # Fit only on training
X_test_scaled = scaler.transform(X_test)         # Transform only, no fitting

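This works, but the manual approach quickly becomes hard to manage. With five preprocessing steps, you have to track five fitted objects and remember whether each step calls fit_transform or transform. A pipeline handles all of this automatically.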
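The sketch below shows concretely what "automatically" means. It assumes the iris dataset, a logistic regression model, and a shuffled 5-fold StratifiedKFold, all chosen only for illustration. The manual loop refits the scaler on every training fold by hand. cross_val_score on a Pipeline does the same bookkeeping internally, so the two lists of fold scores should agree.

```python
import numpy as np
from sklearn.datasets import load_iris
from sklearn.linear_model import LogisticRegression
from sklearn.model_selection import StratifiedKFold, cross_val_score
from sklearn.pipeline import Pipeline
from sklearn.preprocessing import StandardScaler

X, y = load_iris(return_X_y=True)
# Fixed random_state so both approaches see exactly the same folds
cv = StratifiedKFold(n_splits=5, shuffle=True, random_state=0)

# Manual version: refit the scaler on each training fold only
manual_scores = []
for train_idx, val_idx in cv.split(X, y):
    scaler = StandardScaler()
    X_tr = scaler.fit_transform(X[train_idx])   # fit on the training fold
    X_val = scaler.transform(X[val_idx])        # transform only, no fitting
    clf = LogisticRegression(max_iter=1000).fit(X_tr, y[train_idx])
    manual_scores.append(clf.score(X_val, y[val_idx]))

# Pipeline version: cross_val_score refits every step inside each fold
pipe = Pipeline([
    ('scaler', StandardScaler()),
    ('classifier', LogisticRegression(max_iter=1000)),
])
pipe_scores = cross_val_score(pipe, X, y, cv=cv)

print(np.round(manual_scores, 4))
print(np.round(pipe_scores, 4))
```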

Code Organization

Beyond preventing leakage, pipelines also solve a code organization problem. Compare the two approaches:

| Aspect | Manual Preprocessing | sklearn Pipeline |
| --- | --- | --- |
| Data leakage risk | High -- easy to call fit_transform on test data | Low -- the pipeline enforces correct fit/transform |
| Code lines for train + predict | 10-30 lines per environment | 2 lines (fit, predict) |
| Deploying to production | Serialize each transformer separately, reconstruct the order | Serialize one object with joblib |
| Cross-validation | Must manually refit all steps per fold | cross_val_score handles everything |
| Hyperparameter tuning | Manually loop through preprocessing + model params | GridSearchCV tunes all params together |
| Reproducibility | Depends on execution order in the notebook | Deterministic -- same object, same result |
| Debugging | Print shapes after each step, manually | pipeline.named_steps for inspection |

Pipeline Basics

The Pipeline class takes a list of (name, transformer) tuples. Every step except the last must implement fit and transform. The last step can be any estimator (classifier, regressor, or transformer).

from sklearn.pipeline import Pipeline
from sklearn.preprocessing import StandardScaler
from sklearn.linear_model import LogisticRegression
 
# Create pipeline with named steps
pipe = Pipeline([
    ('scaler', StandardScaler()),
    ('classifier', LogisticRegression(max_iter=1000))
])

Fitting and Predicting

When you call pipe.fit(X_train, y_train), the pipeline:

  1. Calls scaler.fit_transform(X_train, y_train) -- fits the scaler and transforms the training data
  2. Passes the transformed data to classifier.fit(X_transformed, y_train)

When you call pipe.predict(X_test), the pipeline:

  1. Calls scaler.transform(X_test) -- transform only, no fitting
  2. Passes the transformed data to classifier.predict(X_transformed)
from sklearn.pipeline import Pipeline
from sklearn.preprocessing import StandardScaler
from sklearn.linear_model import LogisticRegression
from sklearn.model_selection import train_test_split
from sklearn.datasets import load_iris
 
# Load data
iris = load_iris()
X, y = iris.data, iris.target
 
# Split -- see our guide on train_test_split for details:
# /topics/Scikit-Learn/sklearn-train-test-split
X_train, X_test, y_train, y_test = train_test_split(
    X, y, test_size=0.2, random_state=42, stratify=y
)
 
# Build and train pipeline
pipe = Pipeline([
    ('scaler', StandardScaler()),
    ('classifier', LogisticRegression(max_iter=1000, random_state=42))
])
 
pipe.fit(X_train, y_train)
 
# Evaluate
accuracy = pipe.score(X_test, y_test)
print(f"Test accuracy: {accuracy:.4f}")
# Test accuracy: 1.0000
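You can check the two-step predict behaviour listed earlier by reproducing pipe.predict by hand with the fitted steps. The sketch below assumes the same iris split as above. The scaler only calls transform on the test data, and the classifier predicts on the result.

```python
import numpy as np
from sklearn.datasets import load_iris
from sklearn.linear_model import LogisticRegression
from sklearn.model_selection import train_test_split
from sklearn.pipeline import Pipeline
from sklearn.preprocessing import StandardScaler

X, y = load_iris(return_X_y=True)
X_train, X_test, y_train, y_test = train_test_split(
    X, y, test_size=0.2, random_state=42, stratify=y
)

pipe = Pipeline([
    ('scaler', StandardScaler()),
    ('classifier', LogisticRegression(max_iter=1000, random_state=42)),
]).fit(X_train, y_train)

# Reproduce pipe.predict manually using the already-fitted steps
scaler = pipe.named_steps['scaler']
clf = pipe.named_steps['classifier']
manual_pred = clf.predict(scaler.transform(X_test))   # transform only, no refit

same = np.array_equal(manual_pred, pipe.predict(X_test))
print(same)  # True
```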

Accessing Individual Steps

You can inspect any step by name:

# Access scaler parameters after fitting
scaler = pipe.named_steps['scaler']
print(f"Feature means: {scaler.mean_}")
print(f"Feature stds:  {scaler.scale_}")
 
# Access the classifier
clf = pipe.named_steps['classifier']
print(f"Coefficients shape: {clf.coef_.shape}")
print(f"Classes: {clf.classes_}")

You can also use indexing:

# Access by index
first_step = pipe[0]   # StandardScaler
last_step = pipe[-1]    # LogisticRegression
 
# Slice the pipeline (returns a new Pipeline)
preprocessing = pipe[:-1]  # Just the scaler
X_test_transformed = preprocessing.transform(X_test)
print(f"Transformed shape: {X_test_transformed.shape}")

make_pipeline: A More Concise Syntax

When you don't need custom step names, make_pipeline generates them automatically from the class names:

from sklearn.pipeline import make_pipeline
from sklearn.preprocessing import StandardScaler
from sklearn.decomposition import PCA
from sklearn.svm import SVC
 
# Equivalent to Pipeline([('standardscaler', StandardScaler()),
#                          ('pca', PCA(n_components=2)),
#                          ('svc', SVC())])
pipe = make_pipeline(StandardScaler(), PCA(n_components=2), SVC())
 
print(pipe.named_steps)
# {'standardscaler': StandardScaler(), 'pca': PCA(n_components=2), 'svc': SVC()}

The auto-generated names are the lowercased class names. If you use the same transformer more than once, make_pipeline appends a numeric suffix automatically:

from sklearn.pipeline import make_pipeline
from sklearn.preprocessing import StandardScaler, PolynomialFeatures
 
pipe = make_pipeline(PolynomialFeatures(2), StandardScaler(), PolynomialFeatures(3))
print(list(pipe.named_steps.keys()))
# ['polynomialfeatures-1', 'standardscaler', 'polynomialfeatures-2']

Pipeline vs make_pipeline

| Feature | Pipeline | make_pipeline |
| --- | --- | --- |
| Custom step names | Yes -- you choose them | No -- auto-generated |
| Readability for large pipelines | Better -- descriptive names | Worse -- generic names |
| Hyperparameter tuning syntax | stepname__param with your names | classname__param with auto names |
| Code brevity | More verbose | More concise |
| Best for | Production pipelines, tuning | Quick prototyping |

Use Pipeline when you plan to tune hyperparameters or when clear step names improve readability. Use make_pipeline for quick experiments.
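Whichever constructor you use, get_params() is a reliable way to find the exact stepname__param keys before you write a grid. The short sketch below uses LogisticRegression's C parameter as the example.

```python
from sklearn.linear_model import LogisticRegression
from sklearn.pipeline import Pipeline, make_pipeline
from sklearn.preprocessing import StandardScaler

auto = make_pipeline(StandardScaler(), LogisticRegression())
named = Pipeline([('scaler', StandardScaler()), ('clf', LogisticRegression())])

# get_params() (deep=True by default) lists every tunable key as stepname__param
auto_keys = [k for k in auto.get_params() if k.endswith('__C')]
named_keys = [k for k in named.get_params() if k.endswith('__C')]
print(auto_keys)   # ['logisticregression__C']
print(named_keys)  # ['clf__C']
```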

Common Preprocessing Steps

These are the transformers most commonly used in sklearn pipelines:

Numeric Features

from sklearn.pipeline import Pipeline
from sklearn.preprocessing import StandardScaler, MinMaxScaler, RobustScaler
from sklearn.impute import SimpleImputer
 
# Scale to zero mean, unit variance
numeric_pipe = Pipeline([
    ('imputer', SimpleImputer(strategy='median')),
    ('scaler', StandardScaler())
])
| Transformer | What It Does | When to Use |
| --- | --- | --- |
| StandardScaler | Centers to mean=0, scales to std=1 | Default choice for most algorithms |
| MinMaxScaler | Scales to [0, 1] range | Neural networks, algorithms sensitive to magnitude |
| RobustScaler | Uses median and IQR, robust to outliers | Data with significant outliers |
| SimpleImputer | Fills missing values (mean, median, most_frequent, constant) | Missing data handling |
| PolynomialFeatures | Generates polynomial and interaction features | Adding nonlinearity to linear models |
| PowerTransformer | Applies Yeo-Johnson or Box-Cox transform | Skewed distributions |
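Transformers such as SimpleImputer are stateful. The fill value is learned during fit and then reused unchanged on new data. The minimal sketch below uses a single toy column whose numbers are made up for illustration.

```python
import numpy as np
from sklearn.impute import SimpleImputer
from sklearn.pipeline import Pipeline
from sklearn.preprocessing import StandardScaler

X_train = np.array([[1.0], [2.0], [np.nan], [10.0]])
X_new = np.array([[np.nan]])

numeric_pipe = Pipeline([
    ('imputer', SimpleImputer(strategy='median')),
    ('scaler', StandardScaler()),
]).fit(X_train)

# The median is learned from the training data only: median(1, 2, 10) = 2
print(numeric_pipe.named_steps['imputer'].statistics_)  # [2.]

# A missing value in new data is filled with that stored median, then scaled
# with the mean/std the scaler learned on the imputed training column
print(numeric_pipe.transform(X_new))
```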

Categorical Features

from sklearn.pipeline import Pipeline
from sklearn.preprocessing import OneHotEncoder, OrdinalEncoder
from sklearn.impute import SimpleImputer
 
# One-hot encode categorical features
# (sparse_output was added in scikit-learn 1.2; older versions use sparse=False)
categorical_pipe = Pipeline([
    ('imputer', SimpleImputer(strategy='most_frequent')),
    ('encoder', OneHotEncoder(handle_unknown='ignore', sparse_output=False))
])
| Transformer | What It Does | When to Use |
| --- | --- | --- |
| OneHotEncoder | Creates binary columns for each category | Nominal categories (no order) |
| OrdinalEncoder | Maps categories to integers | Ordinal categories (low/medium/high) |
| TargetEncoder | Encodes using target variable statistics | High-cardinality features (scikit-learn 1.3+) |
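The handle_unknown='ignore' setting matters in production, because a category that never appeared in training would otherwise raise an error. The small sketch below uses hypothetical city values. The unseen category comes out as an all-zero row. The sketch calls .toarray() on the default sparse output, so it does not depend on the sparse_output parameter.

```python
import numpy as np
from sklearn.preprocessing import OneHotEncoder

train_cities = np.array([['NYC'], ['LA'], ['Chicago']])
encoder = OneHotEncoder(handle_unknown='ignore').fit(train_cities)

# Categories are stored in sorted order
print(encoder.categories_[0].tolist())  # ['Chicago', 'LA', 'NYC']

# 'Boston' never appeared in training: with handle_unknown='ignore'
# it is encoded as an all-zero row instead of raising an error
encoded = encoder.transform(np.array([['LA'], ['Boston']])).toarray()
print(encoded)
# [[0. 1. 0.]
#  [0. 0. 0.]]
```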

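Handling Mixed Data Types with ColumnTransformer

Real-world datasets usually contain both numeric and categorical columns. ColumnTransformer applies a different transformation to each subset of columns and concatenates the results side by side: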

from sklearn.compose import ColumnTransformer
from sklearn.pipeline import Pipeline
from sklearn.preprocessing import StandardScaler, OneHotEncoder
from sklearn.impute import SimpleImputer
from sklearn.linear_model import LogisticRegression
import pandas as pd
import numpy as np
 
# Sample data with mixed types
data = pd.DataFrame({
    'age': [25, 30, np.nan, 45, 50],
    'income': [40000, 55000, 60000, np.nan, 90000],
    'city': ['NYC', 'LA', 'NYC', 'Chicago', 'LA'],
    'education': ['BS', 'MS', 'PhD', 'BS', 'MS'],
    'purchased': [0, 1, 1, 0, 1]
})
 
X = data.drop('purchased', axis=1)
y = data['purchased']
 
# Define column groups
numeric_features = ['age', 'income']
categorical_features = ['city', 'education']
 
# Build sub-pipelines for each column type
numeric_transformer = Pipeline([
    ('imputer', SimpleImputer(strategy='median')),
    ('scaler', StandardScaler())
])
 
categorical_transformer = Pipeline([
    ('imputer', SimpleImputer(strategy='most_frequent')),
    ('encoder', OneHotEncoder(handle_unknown='ignore', sparse_output=False))
])
 
# Combine with ColumnTransformer
preprocessor = ColumnTransformer(
    transformers=[
        ('num', numeric_transformer, numeric_features),
        ('cat', categorical_transformer, categorical_features)
    ]
)
 
# Full pipeline: preprocessing + model
pipeline = Pipeline([
    ('preprocessor', preprocessor),
    ('classifier', LogisticRegression(max_iter=1000))
])
 
pipeline.fit(X, y)
print(f"Pipeline fitted successfully")
print(f"Predictions: {pipeline.predict(X)}")

Getting Transformed Feature Names

After fitting a ColumnTransformer, you can retrieve the names of the transformed features:

# After fitting the pipeline
pipeline.fit(X, y)
 
# Get feature names from the preprocessor step
feature_names = pipeline.named_steps['preprocessor'].get_feature_names_out()
print(f"Transformed features: {feature_names}")
# ['num__age', 'num__income', 'cat__city_Chicago', 'cat__city_LA',
#  'cat__city_NYC', 'cat__education_BS', 'cat__education_MS', 'cat__education_PhD']

Handling Remaining Columns

By default, ColumnTransformer drops every column that is not assigned to a transformer. You control this with the remainder parameter:

preprocessor = ColumnTransformer(
    transformers=[
        ('num', numeric_transformer, numeric_features),
        ('cat', categorical_transformer, categorical_features)
    ],
    remainder='passthrough'  # Keep unspecified columns as-is
    # remainder='drop'       # Default: drop unspecified columns
)

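Pipelines with GridSearchCV

One of the most powerful features of sklearn pipelines is their seamless integration with hyperparameter tuning. Use the stepname__parameter syntax to reference the parameters of steps inside the pipeline: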

from sklearn.pipeline import Pipeline
from sklearn.preprocessing import StandardScaler
from sklearn.decomposition import PCA
from sklearn.svm import SVC
from sklearn.model_selection import GridSearchCV, train_test_split
from sklearn.datasets import load_breast_cancer
 
# Load data
cancer = load_breast_cancer()
X_train, X_test, y_train, y_test = train_test_split(
    cancer.data, cancer.target, test_size=0.2, random_state=42, stratify=cancer.target
)
 
# Build pipeline
pipe = Pipeline([
    ('scaler', StandardScaler()),
    ('pca', PCA()),
    ('svc', SVC())
])
 
# Define parameter grid
# Use stepname__param syntax to access nested parameters
param_grid = {
    'pca__n_components': [5, 10, 15, 20],
    'svc__C': [0.1, 1, 10, 100],
    'svc__kernel': ['rbf', 'linear'],
    'svc__gamma': ['scale', 'auto']
}
 
# Run grid search
grid = GridSearchCV(
    pipe,
    param_grid,
    cv=5,
    scoring='accuracy',
    n_jobs=-1,
    verbose=1
)
 
grid.fit(X_train, y_train)
 
print(f"Best parameters: {grid.best_params_}")
print(f"Best CV score:   {grid.best_score_:.4f}")
print(f"Test score:      {grid.score(X_test, y_test):.4f}")

Tuning ColumnTransformer Parameters

For nested pipelines that contain a ColumnTransformer, chain the step names with double underscores:

# Accessing nested parameters:
# pipeline step 'preprocessor' -> transformer 'num' -> step 'imputer' -> parameter 'strategy'
param_grid = {
    'preprocessor__num__imputer__strategy': ['mean', 'median'],
    'preprocessor__cat__encoder__handle_unknown': ['ignore', 'infrequent_if_exist'],
    'classifier__C': [0.1, 1, 10]
}
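Before you launch a long search, you can confirm that a nested key is valid. Check that it appears in get_params(), then apply it once with set_params, which is how GridSearchCV applies each candidate. The sketch below rebuilds a small version of the earlier mixed-type pipeline with hypothetical column names (age, income, city).

```python
from sklearn.compose import ColumnTransformer
from sklearn.impute import SimpleImputer
from sklearn.linear_model import LogisticRegression
from sklearn.pipeline import Pipeline
from sklearn.preprocessing import OneHotEncoder, StandardScaler

numeric_transformer = Pipeline([('imputer', SimpleImputer(strategy='median')),
                                ('scaler', StandardScaler())])
categorical_transformer = Pipeline([('imputer', SimpleImputer(strategy='most_frequent')),
                                    ('encoder', OneHotEncoder(handle_unknown='ignore'))])
preprocessor = ColumnTransformer([('num', numeric_transformer, ['age', 'income']),
                                  ('cat', categorical_transformer, ['city'])])
pipeline = Pipeline([('preprocessor', preprocessor),
                     ('classifier', LogisticRegression(max_iter=1000))])

# Each '__' descends one level: pipeline step -> transformer -> sub-step -> parameter
key = 'preprocessor__num__imputer__strategy'
print(key in pipeline.get_params())  # True

# set_params updates the nested object in place
pipeline.set_params(**{key: 'mean'})
print(numeric_transformer.named_steps['imputer'].strategy)  # mean
```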

RandomizedSearchCV for Large Search Spaces

When the parameter space is large, RandomizedSearchCV samples a fixed number of parameter combinations at random:

from sklearn.model_selection import RandomizedSearchCV
from scipy.stats import uniform, randint
 
param_distributions = {
    'pca__n_components': randint(5, 25),
    'svc__C': uniform(0.1, 100),       # uniform(loc, scale) samples from [0.1, 100.1]
    'svc__kernel': ['rbf', 'linear', 'poly'],
    'svc__gamma': uniform(0.001, 1)
}
 
random_search = RandomizedSearchCV(
    pipe,
    param_distributions,
    n_iter=50,       # Sample 50 combinations
    cv=5,
    scoring='accuracy',
    n_jobs=-1,
    random_state=42
)
 
random_search.fit(X_train, y_train)
print(f"Best parameters: {random_search.best_params_}")
print(f"Best CV score:   {random_search.best_score_:.4f}")

Custom Transformers

FunctionTransformer: Quick Custom Steps

For simple stateless transformations, use FunctionTransformer:

from sklearn.preprocessing import FunctionTransformer, StandardScaler
from sklearn.pipeline import Pipeline
import numpy as np
 
# Log transform (adding 1 to avoid log(0))
log_transformer = FunctionTransformer(
    func=np.log1p,
    inverse_func=np.expm1  # Optional inverse for inverse_transform
)
 
pipe = Pipeline([
    ('log', log_transformer),
    ('scaler', StandardScaler())
])
 
# Works with pipeline fit/transform
X_sample = np.array([[1, 10, 100], [2, 20, 200]])
X_transformed = pipe.fit_transform(X_sample)
print(f"Original:    {X_sample[0]}")
print(f"Transformed: {X_transformed[0]}")
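Because inverse_func is provided, the transformer can also undo the log step. The minimal round-trip check below relies on the fact that FunctionTransformer learns nothing in fit, so fit_transform simply applies np.log1p.

```python
import numpy as np
from sklearn.preprocessing import FunctionTransformer

log_transformer = FunctionTransformer(func=np.log1p, inverse_func=np.expm1)

X_sample = np.array([[1.0, 10.0, 100.0], [2.0, 20.0, 200.0]])
X_log = log_transformer.fit_transform(X_sample)    # log(1 + x); fit learns nothing
X_back = log_transformer.inverse_transform(X_log)  # exp(x) - 1 undoes it

print(np.allclose(X_back, X_sample))  # True
```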

Custom Transformer Classes

For stateful transformations (ones that learn parameters from the data), create a class that inherits from BaseEstimator and TransformerMixin:

from sklearn.base import BaseEstimator, TransformerMixin
import numpy as np
 
class OutlierClipper(BaseEstimator, TransformerMixin):
    """Clips values beyond a specified number of standard deviations."""
 
    def __init__(self, n_std=3):
        self.n_std = n_std
 
    def fit(self, X, y=None):
        # Learn the boundaries from training data
        self.mean_ = np.mean(X, axis=0)
        self.std_ = np.std(X, axis=0)
        self.lower_ = self.mean_ - self.n_std * self.std_
        self.upper_ = self.mean_ + self.n_std * self.std_
        return self  # Always return self from fit
 
    def transform(self, X):
        # Apply learned boundaries to any data
        X_clipped = np.clip(X, self.lower_, self.upper_)
        return X_clipped

Use it in a pipeline just like a built-in transformer:

from sklearn.pipeline import Pipeline
from sklearn.preprocessing import StandardScaler
from sklearn.linear_model import LogisticRegression
from sklearn.datasets import load_iris
from sklearn.model_selection import train_test_split
 
iris = load_iris()
X_train, X_test, y_train, y_test = train_test_split(
    iris.data, iris.target, test_size=0.2, random_state=42
)
 
pipe = Pipeline([
    ('clip', OutlierClipper(n_std=2)),
    ('scaler', StandardScaler()),
    ('classifier', LogisticRegression(max_iter=1000))
])
 
pipe.fit(X_train, y_train)
print(f"Accuracy: {pipe.score(X_test, y_test):.4f}")
 
# The n_std parameter works with GridSearchCV
from sklearn.model_selection import GridSearchCV
 
grid = GridSearchCV(
    pipe,
    {'clip__n_std': [1.5, 2, 2.5, 3]},
    cv=5
)
grid.fit(X_train, y_train)
print(f"Best n_std: {grid.best_params_['clip__n_std']}")
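The reason to store __init__ arguments unchanged is that BaseEstimator derives get_params() from the __init__ signature. sklearn.base.clone, which GridSearchCV and cross_val_score use to create fresh copies, rebuilds the estimator from those parameters alone. The sketch below repeats a condensed OutlierClipper so that it runs on its own.

```python
import numpy as np
from sklearn.base import BaseEstimator, TransformerMixin, clone

class OutlierClipper(BaseEstimator, TransformerMixin):
    """Condensed copy of the clipper above, repeated so this snippet is self-contained."""

    def __init__(self, n_std=3):
        self.n_std = n_std            # store the argument unchanged, no validation here

    def fit(self, X, y=None):
        mean, std = np.mean(X, axis=0), np.std(X, axis=0)
        self.lower_ = mean - self.n_std * std   # fitted attributes end with '_'
        self.upper_ = mean + self.n_std * std
        return self

    def transform(self, X):
        return np.clip(X, self.lower_, self.upper_)

X = np.array([[0.0], [0.0], [0.0], [10.0]])
clipper = OutlierClipper(n_std=1).fit(X)

# BaseEstimator builds get_params() from the __init__ signature
print(clipper.get_params())  # {'n_std': 1}

# clone() copies the parameters but not the fitted state
fresh = clone(clipper)
print(fresh.get_params(), hasattr(fresh, 'lower_'))  # {'n_std': 1} False
```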

Custom Transformers for Feature Engineering

Here is a more practical example that creates interaction features for specified pairs of columns:

from sklearn.base import BaseEstimator, TransformerMixin
import pandas as pd
import numpy as np
 
class FeatureInteraction(BaseEstimator, TransformerMixin):
    """Creates multiplication interactions between specified column pairs."""
 
    def __init__(self, interaction_pairs=None):
        self.interaction_pairs = interaction_pairs
 
    def fit(self, X, y=None):
        # Store column names if DataFrame
        if isinstance(X, pd.DataFrame):
            self.feature_names_in_ = X.columns.tolist()
        else:
            self.feature_names_in_ = [f"x{i}" for i in range(X.shape[1])]
        return self
 
    def transform(self, X):
        X_df = pd.DataFrame(X, columns=self.feature_names_in_) if not isinstance(X, pd.DataFrame) else X.copy()
 
        if self.interaction_pairs:
            for col_a, col_b in self.interaction_pairs:
                name = f"{col_a}_x_{col_b}"
                X_df[name] = X_df[col_a] * X_df[col_b]
 
        return X_df.values
 
    def get_feature_names_out(self, input_features=None):
        names = list(self.feature_names_in_)
        if self.interaction_pairs:
            for col_a, col_b in self.interaction_pairs:
                names.append(f"{col_a}_x_{col_b}")
        return np.array(names)

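FeatureUnion: Parallel Feature Engineering

Pipeline chains steps sequentially. FeatureUnion, by contrast, applies several transformers to the same input in parallel and concatenates their outputs horizontally: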

from sklearn.pipeline import Pipeline, FeatureUnion
from sklearn.preprocessing import StandardScaler, PolynomialFeatures
from sklearn.decomposition import PCA
from sklearn.linear_model import LogisticRegression
from sklearn.datasets import load_iris
from sklearn.model_selection import train_test_split
 
iris = load_iris()
X_train, X_test, y_train, y_test = train_test_split(
    iris.data, iris.target, test_size=0.2, random_state=42
)
 
# Create parallel feature branches
feature_union = FeatureUnion([
    ('scaled', StandardScaler()),             # Original features, scaled
    ('pca', PCA(n_components=2)),             # 2 PCA components
    ('poly', PolynomialFeatures(degree=2, include_bias=False))  # Polynomial features
])
 
# Combine into a full pipeline
pipe = Pipeline([
    ('features', feature_union),
    ('classifier', LogisticRegression(max_iter=1000, random_state=42))
])
 
pipe.fit(X_train, y_train)
 
# Check the total number of features
X_transformed = feature_union.fit_transform(X_train)
print(f"Original features:    {X_train.shape[1]}")
print(f"After FeatureUnion:   {X_transformed.shape[1]}")
print(f"Test accuracy:        {pipe.score(X_test, y_test):.4f}")

FeatureUnion vs ColumnTransformer

| Feature | FeatureUnion | ColumnTransformer |
| --- | --- | --- |
| Input | All columns go to all transformers | Specific columns to specific transformers |
| Output | Concatenates horizontally | Concatenates horizontally |
| Use case | Multiple representations of the same features | Different types of features need different processing |
| Column selection | Cannot select -- operates on all columns | Built-in column specification |
| Modern alternative | Often replaced by ColumnTransformer | Preferred for most use cases |

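In modern scikit-learn, ColumnTransformer covers most of the use cases FeatureUnion once served. FeatureUnion is still a good fit when you want several representations of the same features (for example, raw values + PCA + polynomial features).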
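The sketch below illustrates the overlap. It builds the same two branches twice: once with FeatureUnion, and once with a ColumnTransformer that passes every column to each branch. It assumes the iris data and a full-SVD PCA so that the result is deterministic. The two outputs should match.

```python
import numpy as np
from sklearn.compose import ColumnTransformer
from sklearn.datasets import load_iris
from sklearn.decomposition import PCA
from sklearn.pipeline import FeatureUnion
from sklearn.preprocessing import StandardScaler

X, _ = load_iris(return_X_y=True)
all_cols = list(range(X.shape[1]))   # every column index: [0, 1, 2, 3]

union = FeatureUnion([
    ('scaled', StandardScaler()),
    ('pca', PCA(n_components=2, svd_solver='full')),
])

# The same two branches, each given the full column list explicitly
column_version = ColumnTransformer([
    ('scaled', StandardScaler(), all_cols),
    ('pca', PCA(n_components=2, svd_solver='full'), all_cols),
])

A = union.fit_transform(X)
B = column_version.fit_transform(X)
print(A.shape, B.shape)   # (150, 6) (150, 6)
print(np.allclose(A, B))  # True
```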

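Saving and Loading Pipelines

One of the biggest advantages of pipelines is simpler deployment. Instead of serializing every transformer and the model separately, you save a single object: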

import joblib
from sklearn.pipeline import Pipeline
from sklearn.preprocessing import StandardScaler
from sklearn.ensemble import RandomForestClassifier
from sklearn.datasets import load_iris
from sklearn.model_selection import train_test_split
 
# Train a pipeline
iris = load_iris()
X_train, X_test, y_train, y_test = train_test_split(
    iris.data, iris.target, test_size=0.2, random_state=42
)
 
pipe = Pipeline([
    ('scaler', StandardScaler()),
    ('classifier', RandomForestClassifier(n_estimators=100, random_state=42))
])
pipe.fit(X_train, y_train)
 
# Save the entire pipeline -- one file
joblib.dump(pipe, 'model_pipeline.joblib')
print(f"Pipeline saved")
 
# Load and predict -- no preprocessing code needed
loaded_pipe = joblib.load('model_pipeline.joblib')
predictions = loaded_pipe.predict(X_test)
accuracy = loaded_pipe.score(X_test, y_test)
print(f"Loaded pipeline accuracy: {accuracy:.4f}")

Versioning Pipelines

In production, save metadata alongside the pipeline:

import joblib
import datetime
import sklearn
 
artifact = {
    'pipeline': pipe,
    'training_date': datetime.datetime.now().isoformat(),
    'sklearn_version': sklearn.__version__,
    'feature_names': list(iris.feature_names),
    'target_names': list(iris.target_names),
    'training_accuracy': pipe.score(X_train, y_train),
    'test_accuracy': pipe.score(X_test, y_test),
    'n_training_samples': len(X_train)
}
 
joblib.dump(artifact, 'model_artifact_v1.joblib')
 
# Later, load and validate
loaded = joblib.load('model_artifact_v1.joblib')
print(f"Model trained on: {loaded['training_date']}")
print(f"Sklearn version:  {loaded['sklearn_version']}")
print(f"Test accuracy:    {loaded['test_accuracy']:.4f}")
 
# Use the pipeline
loaded_pipe = loaded['pipeline']
predictions = loaded_pipe.predict(X_test[:3])

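Using pickle (Alternative)

joblib is generally preferred for sklearn objects because it handles large NumPy arrays efficiently. The standard pickle module works as well. With either format, only load files from sources you trust, because unpickling can execute arbitrary code: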

import pickle
 
# Save
with open('pipeline.pkl', 'wb') as f:
    pickle.dump(pipe, f)
 
# Load
with open('pipeline.pkl', 'rb') as f:
    loaded = pickle.load(f)

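Real-World Example: A Complete Classification Pipeline

Below is a complete, production-grade pipeline for a classification task with mixed feature types. It combines a ColumnTransformer with a Random Forest classifier and ends with a full evaluation report. The example follows the Titanic-style dataset pattern that most ML practitioners have encountered: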

import pandas as pd
import numpy as np
from sklearn.compose import ColumnTransformer
from sklearn.pipeline import Pipeline
from sklearn.preprocessing import StandardScaler, OneHotEncoder
from sklearn.impute import SimpleImputer
from sklearn.ensemble import RandomForestClassifier
from sklearn.model_selection import (
    train_test_split, cross_val_score, GridSearchCV
)
from sklearn.metrics import classification_report, accuracy_score
 
# -- Create a realistic dataset with mixed types and missing values --
np.random.seed(42)
n = 1000
 
data = pd.DataFrame({
    'age': np.random.normal(35, 12, n),
    'income': np.random.lognormal(10.5, 0.8, n),
    'credit_score': np.random.normal(650, 80, n),
    'years_employed': np.random.exponential(5, n),
    'department': np.random.choice(['Engineering', 'Sales', 'Marketing', 'HR', 'Finance'], n),
    'education': np.random.choice(['High School', 'Bachelor', 'Master', 'PhD'], n),
    'city': np.random.choice(['NYC', 'LA', 'Chicago', 'Houston', 'Phoenix', 'Dallas'], n),
    'promoted': np.random.binomial(1, 0.3, n)
})
 
# Introduce missing values (realistic pattern)
for col in ['age', 'income', 'credit_score']:
    mask = np.random.random(n) < 0.05
    data.loc[mask, col] = np.nan
 
for col in ['department', 'education']:
    mask = np.random.random(n) < 0.03
    data.loc[mask, col] = np.nan
 
print(f"Dataset shape: {data.shape}")
print(f"Missing values:\n{data.isnull().sum()}")
print(f"Target distribution:\n{data['promoted'].value_counts(normalize=True)}")
# -- Define features and target --
X = data.drop('promoted', axis=1)
y = data['promoted']
 
# Split the data (see /topics/Scikit-Learn/sklearn-train-test-split for details)
X_train, X_test, y_train, y_test = train_test_split(
    X, y, test_size=0.2, random_state=42, stratify=y
)
 
print(f"Training set: {X_train.shape[0]} samples")
print(f"Test set:     {X_test.shape[0]} samples")
# -- Define column groups --
numeric_features = ['age', 'income', 'credit_score', 'years_employed']
categorical_features = ['department', 'education', 'city']
 
# -- Build preprocessing pipelines for each column type --
numeric_transformer = Pipeline([
    ('imputer', SimpleImputer(strategy='median')),
    ('scaler', StandardScaler())
])
 
categorical_transformer = Pipeline([
    ('imputer', SimpleImputer(strategy='most_frequent')),
    ('encoder', OneHotEncoder(handle_unknown='ignore', sparse_output=False))
])
 
# -- Combine column transformers --
preprocessor = ColumnTransformer(
    transformers=[
        ('num', numeric_transformer, numeric_features),
        ('cat', categorical_transformer, categorical_features)
    ],
    remainder='drop'  # Explicitly drop unlisted columns
)
 
# -- Full pipeline: preprocessing + classifier --
# (see /topics/Scikit-Learn/sklearn-random-forest for more on RandomForest)
pipeline = Pipeline([
    ('preprocessor', preprocessor),
    ('classifier', RandomForestClassifier(
        n_estimators=200,
        max_depth=10,
        min_samples_leaf=5,
        class_weight='balanced',
        random_state=42,
        n_jobs=-1
    ))
])
# -- Cross-validation first --
cv_scores = cross_val_score(pipeline, X_train, y_train, cv=5, scoring='accuracy')
print(f"Cross-validation scores: {cv_scores}")
print(f"Mean CV accuracy: {cv_scores.mean():.4f} (+/- {cv_scores.std() * 2:.4f})")
 
# -- Fit on full training set --
pipeline.fit(X_train, y_train)
 
# -- Evaluate on test set --
y_pred = pipeline.predict(X_test)
print(f"\nTest accuracy: {accuracy_score(y_test, y_pred):.4f}")
# For detailed evaluation, see /topics/Scikit-Learn/sklearn-confusion-matrix
print(f"\nClassification Report:\n{classification_report(y_test, y_pred)}")
# -- Hyperparameter tuning --
param_grid = {
    'preprocessor__num__imputer__strategy': ['mean', 'median'],
    'classifier__n_estimators': [100, 200, 300],
    'classifier__max_depth': [5, 10, 15, None],
    'classifier__min_samples_leaf': [3, 5, 10]
}
 
grid_search = GridSearchCV(
    pipeline,
    param_grid,
    cv=5,
    scoring='accuracy',
    n_jobs=-1,
    verbose=1
)
 
grid_search.fit(X_train, y_train)
 
print(f"\nBest parameters: {grid_search.best_params_}")
print(f"Best CV score:   {grid_search.best_score_:.4f}")
print(f"Test score:      {grid_search.score(X_test, y_test):.4f}")
# -- Inspect the best pipeline --
best_pipeline = grid_search.best_estimator_
 
# Get feature names after transformation
feature_names = best_pipeline.named_steps['preprocessor'].get_feature_names_out()
print(f"\nTransformed feature count: {len(feature_names)}")
 
# Get feature importances from the classifier
importances = best_pipeline.named_steps['classifier'].feature_importances_
feature_importance = pd.DataFrame({
    'feature': feature_names,
    'importance': importances
}).sort_values('importance', ascending=False)
 
print(f"\nTop 10 features:")
print(feature_importance.head(10).to_string(index=False))
# -- Save the final pipeline --
import joblib
 
joblib.dump(best_pipeline, 'promotion_predictor.joblib')
print("Pipeline saved to promotion_predictor.joblib")
 
# -- Production usage --
loaded = joblib.load('promotion_predictor.joblib')
 
# Predict on new data -- same format as original DataFrame
new_employee = pd.DataFrame({
    'age': [28],
    'income': [65000],
    'credit_score': [720],
    'years_employed': [3.5],
    'department': ['Engineering'],
    'education': ['Master'],
    'city': ['NYC']
})
 
prediction = loaded.predict(new_employee)
probability = loaded.predict_proba(new_employee)
print(f"\nNew employee prediction: {'Promoted' if prediction[0] else 'Not promoted'}")
print(f"Probability: {probability[0][1]:.2%}")

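This example demonstrates all the key pipeline patterns: mixed feature types, missing-value handling, cross-validation, hyperparameter tuning, feature inspection, and production serialization. The synthetic promoted label is drawn independently of the features. The scores therefore only show that the workflow runs end to end, not that the features are predictive.

Pipelines for Different Model Types

The same preprocessing pipeline can serve different models. You only need to swap the final step: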

from sklearn.pipeline import Pipeline
from sklearn.linear_model import LogisticRegression
from sklearn.ensemble import RandomForestClassifier, GradientBoostingClassifier
from sklearn.svm import SVC
from sklearn.model_selection import cross_val_score
 
# Reuse the preprocessor from the previous example
models = {
    'Logistic Regression': LogisticRegression(max_iter=1000, random_state=42),
    'Random Forest': RandomForestClassifier(n_estimators=100, random_state=42),
    'Gradient Boosting': GradientBoostingClassifier(n_estimators=100, random_state=42),
    'SVM': SVC(kernel='rbf', random_state=42)
}
 
results = {}
for name, model in models.items():
    pipe = Pipeline([
        ('preprocessor', preprocessor),
        ('classifier', model)
    ])
    scores = cross_val_score(pipe, X_train, y_train, cv=5, scoring='accuracy')
    results[name] = {
        'mean': scores.mean(),
        'std': scores.std()
    }
    print(f"{name:25s} | Accuracy: {scores.mean():.4f} +/- {scores.std():.4f}")

You can also swap the estimator step dynamically:

# Replace the classifier in an existing pipeline
pipeline.set_params(classifier=GradientBoostingClassifier(n_estimators=200))
pipeline.fit(X_train, y_train)
print(f"Gradient Boosting test accuracy: {pipeline.score(X_test, y_test):.4f}")

Pipelines for Regression

Pipelines work the same way for regression tasks. For more details on linear regression in sklearn, see our sklearn linear regression guide:

from sklearn.pipeline import Pipeline
from sklearn.compose import ColumnTransformer
from sklearn.preprocessing import StandardScaler, OneHotEncoder, PolynomialFeatures
from sklearn.impute import SimpleImputer
from sklearn.linear_model import Ridge
from sklearn.model_selection import cross_val_score, train_test_split
import pandas as pd
import numpy as np
 
# Simulated housing data
np.random.seed(42)
n = 500
housing = pd.DataFrame({
    'sqft': np.random.normal(1500, 400, n),
    'bedrooms': np.random.choice([1, 2, 3, 4, 5], n),
    'age': np.random.uniform(0, 50, n),
    'neighborhood': np.random.choice(['downtown', 'suburbs', 'rural'], n),
    'condition': np.random.choice(['poor', 'fair', 'good', 'excellent'], n),
})
housing['price'] = (
    housing['sqft'] * 200
    + housing['bedrooms'] * 15000
    - housing['age'] * 1000
    + np.random.normal(0, 20000, n)
)
 
X = housing.drop('price', axis=1)
y = housing['price']
X_train, X_test, y_train, y_test = train_test_split(X, y, test_size=0.2, random_state=42)
 
# Regression pipeline
numeric_features = ['sqft', 'bedrooms', 'age']
categorical_features = ['neighborhood', 'condition']
 
preprocessor = ColumnTransformer([
    ('num', Pipeline([
        ('imputer', SimpleImputer(strategy='median')),
        ('poly', PolynomialFeatures(degree=2, include_bias=False)),
        ('scaler', StandardScaler())
    ]), numeric_features),
    ('cat', Pipeline([
        ('imputer', SimpleImputer(strategy='most_frequent')),
        ('encoder', OneHotEncoder(handle_unknown='ignore', sparse_output=False))
    ]), categorical_features)
])
 
reg_pipeline = Pipeline([
    ('preprocessor', preprocessor),
    ('regressor', Ridge(alpha=1.0))
])
 
cv_scores = cross_val_score(reg_pipeline, X_train, y_train, cv=5, scoring='r2')
print(f"Cross-validation R2: {cv_scores.mean():.4f} +/- {cv_scores.std():.4f}")
 
reg_pipeline.fit(X_train, y_train)
print(f"Test R2: {reg_pipeline.score(X_test, y_test):.4f}")

Skipping Steps with passthrough and None

You can conditionally skip a step by setting it to 'passthrough' or None:

from sklearn.pipeline import Pipeline
from sklearn.preprocessing import StandardScaler
from sklearn.decomposition import PCA
from sklearn.svm import SVC
from sklearn.model_selection import GridSearchCV, train_test_split
from sklearn.datasets import load_breast_cancer
 
cancer = load_breast_cancer()
X_train, X_test, y_train, y_test = train_test_split(
    cancer.data, cancer.target, test_size=0.2, random_state=42
)
 
pipe = Pipeline([
    ('scaler', StandardScaler()),
    ('reduce_dim', PCA()),
    ('classifier', SVC())
])
 
# Grid search can toggle steps on/off
param_grid = [
    {
        'reduce_dim': [PCA(5), PCA(10), PCA(15)],
        'classifier__C': [1, 10]
    },
    {
        'reduce_dim': ['passthrough'],  # Skip PCA entirely
        'classifier__C': [1, 10]
    }
]
 
grid = GridSearchCV(pipe, param_grid, cv=5, n_jobs=-1)
grid.fit(X_train, y_train)
print(f"Best params: {grid.best_params_}")
print(f"Best score:  {grid.best_score_:.4f}")
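Setting a step to 'passthrough' keeps its name in the pipeline, so it stays tunable, while the data flows through unchanged. The quick sketch below uses iris to check that a pipeline with a disabled scaler predicts exactly like a pipeline with no scaler at all.

```python
import numpy as np
from sklearn.datasets import load_iris
from sklearn.linear_model import LogisticRegression
from sklearn.pipeline import Pipeline
from sklearn.preprocessing import StandardScaler

X, y = load_iris(return_X_y=True)

full = Pipeline([('scaler', StandardScaler()),
                 ('classifier', LogisticRegression(max_iter=1000))])
# Disable the scaler without removing the step (its name stays in the pipeline)
full.set_params(scaler='passthrough')

no_scaler = Pipeline([('classifier', LogisticRegression(max_iter=1000))])

same = np.array_equal(full.fit(X, y).predict(X), no_scaler.fit(X, y).predict(X))
print(same)  # True
```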

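Caching Pipeline Steps

During hyperparameter tuning, intermediate steps may be recomputed many times with the same parameters and data. Enabling caching avoids this repeated work: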

from sklearn.pipeline import Pipeline
from sklearn.preprocessing import StandardScaler
from sklearn.decomposition import PCA
from sklearn.svm import SVC
from tempfile import mkdtemp
from shutil import rmtree
 
# Create a temporary cache directory
cachedir = mkdtemp()
 
pipe = Pipeline(
    [
        ('scaler', StandardScaler()),
        ('pca', PCA(n_components=10)),
        ('svc', SVC())
    ],
    memory=cachedir  # Cache intermediate transformations
)
 
# During GridSearchCV, the scaler and PCA results are cached
# Only recomputed when their parameters change
# This speeds up searches where only the final estimator params change
 
# Clean up when done
# rmtree(cachedir)
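Here is a minimal sketch of caching during an actual search. It assumes the breast_cancer data and a grid over svc__C only. It uses TemporaryDirectory instead of mkdtemp plus rmtree, so the cache is removed automatically. Only the final estimator's parameter changes, so the fitted scaler and PCA for each fold can be reused from the cache across the C values.

```python
from tempfile import TemporaryDirectory

from sklearn.datasets import load_breast_cancer
from sklearn.decomposition import PCA
from sklearn.model_selection import GridSearchCV
from sklearn.pipeline import Pipeline
from sklearn.preprocessing import StandardScaler
from sklearn.svm import SVC

X, y = load_breast_cancer(return_X_y=True)

with TemporaryDirectory() as cachedir:      # deleted automatically on exit
    pipe = Pipeline(
        [('scaler', StandardScaler()),
         ('pca', PCA(n_components=10)),
         ('svc', SVC())],
        memory=cachedir,                    # cache fitted transformers on disk
    )
    # Only svc__C varies, so within each CV fold the cached
    # scaler + PCA fit can be reused for every C value
    grid = GridSearchCV(pipe, {'svc__C': [0.1, 1, 10]}, cv=3)
    grid.fit(X, y)

print(grid.best_params_)
```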

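Exploring Data Before Building a Pipeline

Before you build a pipeline, you need to understand your features: their distributions, missing-value patterns, correlations, and which transformations might suit them. PyGWalker turns any Pandas DataFrame into an interactive visual exploration interface directly inside a Jupyter notebook: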

import pandas as pd
import pygwalker as pyg
 
# Explore your dataset interactively before building the pipeline
# Drag features to axes, create histograms, scatter plots, box plots
walker = pyg.walk(data)

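This kind of visual exploration helps you decide which features need scaling, which have outliers worth clipping, and which categorical features have high cardinality. You can spot missing-value patterns and understand feature distributions before you write any pipeline code.

If you want to iterate on full pipeline experiments (testing different preprocessing strategies, comparing model performance, and tracking results), RunCell provides an AI-powered Jupyter environment where an agent helps with code generation, debugging, and experiment management.

Common Pitfalls and Debugging Tips

Pitfall 1: Forgetting to Use the Pipeline at Prediction Time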

# WRONG: Preprocessing manually, predicting with just the model
X_test_scaled = scaler.transform(X_test)
predictions = pipeline.named_steps['classifier'].predict(X_test_scaled)
 
# CORRECT: Let the pipeline handle everything
predictions = pipeline.predict(X_test)

Pitfall 2: Fitting Preprocessing Outside the Pipeline

# WRONG: This defeats the purpose of the pipeline
scaler = StandardScaler()
X_train_scaled = scaler.fit_transform(X_train)
pipe = Pipeline([('classifier', LogisticRegression())])
pipe.fit(X_train_scaled, y_train)
# Now you must remember to manually scale at prediction time
 
# CORRECT: Include preprocessing in the pipeline
pipe = Pipeline([
    ('scaler', StandardScaler()),
    ('classifier', LogisticRegression())
])
pipe.fit(X_train, y_train)  # Pipeline handles the scaling

Pitfall 3: Wrong Parameter Names in GridSearchCV

The stepname__param syntax must use the step names exactly as they are defined in the pipeline:

pipe = Pipeline([
    ('my_scaler', StandardScaler()),
    ('my_clf', LogisticRegression())
])
 
# WRONG: Using the class name instead of the step name
# param_grid = {'StandardScaler__with_mean': [True, False]}  # ValueError: Invalid parameter
 
# CORRECT: Using the step name you defined
param_grid = {'my_scaler__with_mean': [True, False], 'my_clf__C': [0.1, 1, 10]}
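When a grid key is rejected, listing the valid names is the quickest fix. This sketch reuses the step names above: get_params() returns every tunable key, and a wrong prefix fails in set_params (which GridSearchCV calls for each candidate) with a ValueError.

```python
from sklearn.linear_model import LogisticRegression
from sklearn.pipeline import Pipeline
from sklearn.preprocessing import StandardScaler

pipe = Pipeline([
    ("my_scaler", StandardScaler()),
    ("my_clf", LogisticRegression()),
])

# Every valid step-level name has the form "<step name>__<param>"
valid_names = sorted(k for k in pipe.get_params() if "__" in k)
print(valid_names[:3])

# A wrong prefix fails immediately, before any fitting happens
error_message = ""
try:
    pipe.set_params(StandardScaler__with_mean=False)
except ValueError as err:
    error_message = str(err)
    print("Invalid parameter:", error_message.splitlines()[0])
```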

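Pitfall 4: Changing Column Order

When you pass a Pandas DataFrame (for example, one loaded with pandas read_csv) to a ColumnTransformer, the column order of the output follows the order of the transformers, not the column order of the original DataFrame: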

# The output order is: numeric features first, then categorical
# This matters if you manually inspect transformed data
preprocessor = ColumnTransformer([
    ('num', numeric_transformer, numeric_features),     # These come first
    ('cat', categorical_transformer, categorical_features)  # These come second
])

Debugging Intermediate Outputs

To see what happens at each step:

# Method 1: Transform step by step
pipe.fit(X_train, y_train)
X_after_preprocessor = pipe.named_steps['preprocessor'].transform(X_test)
print(f"Shape after preprocessing: {X_after_preprocessor.shape}")
print(f"Sample values:\n{X_after_preprocessor[:2]}")
 
# Method 2: Slice the pipeline
preprocessing_pipe = pipe[:-1]  # Everything except the classifier
X_transformed = preprocessing_pipe.transform(X_test)
print(f"Transformed shape: {X_transformed.shape}")
 
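# Method 3: Make transformers return pandas DataFrames (scikit-learn 1.2+)
from sklearn import set_config
set_config(transform_output="pandas")  # Global setting for all later transform calls
# Transformed outputs now carry column names -- easier to debug
# (sparse outputs, e.g. OneHotEncoder's default, need sparse_output=False first)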

Debugging Shape Mismatches

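# Print shapes at each stage to find where things break
# (assumes pipe has already been fitted)
X_step = X_train  # separate name so X_train itself is not overwritten
print(f"Input shape: {X_step.shape}")
 
for name, step in pipe.steps[:-1]:  # every step except the final estimator
    if step is None or step == 'passthrough':
        continue  # skipped steps have nothing to transform
    try:
        X_step = step.transform(X_step)
        print(f"After '{name}': {X_step.shape}")
    except Exception as e:
        # e.g. NotFittedError, or ValueError on a feature-count mismatch
        print(f"Error at '{name}': {e}")
        break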
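The same loop can be wrapped in a small helper and exercised on a deliberately broken input. In this sketch, report_shapes is a hypothetical helper name, the data are random, and the "bug" is simulated by dropping a column before calling transform.

```python
import numpy as np
from sklearn.decomposition import PCA
from sklearn.linear_model import LogisticRegression
from sklearn.pipeline import Pipeline
from sklearn.preprocessing import StandardScaler


def report_shapes(pipe, X):
    """Push X through every fitted transformer step; record each shape or the first error."""
    report = [("input", X.shape)]
    for name, step in pipe.steps[:-1]:
        if step is None or step == "passthrough":
            continue
        try:
            X = step.transform(X)
            report.append((name, X.shape))
        except Exception as exc:  # e.g. ValueError on a feature-count mismatch
            report.append((name, f"error: {exc}"))
            break
    return report


rng = np.random.default_rng(0)
X_train = rng.normal(size=(50, 4))
y_train = (X_train[:, 0] > 0).astype(int)

pipe = Pipeline([
    ("scaler", StandardScaler()),
    ("pca", PCA(n_components=2)),
    ("classifier", LogisticRegression()),
])
pipe.fit(X_train, y_train)

# Healthy input: 4 columns in, 2 PCA components out
for entry in report_shapes(pipe, X_train[:5]):
    print(entry)

# Simulated bug: a column went missing upstream, so the scaler rejects the input
for entry in report_shapes(pipe, X_train[:5, :3]):
    print(entry)
```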

Pipeline Method Reference

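| Method | Description |
|---|---|
| fit(X, y) | Fit all transformers in sequence, then fit the final estimator |
| predict(X) | Transform X through all steps, then predict with the final estimator |
| predict_proba(X) | Transform X, then return class probabilities (only if the final estimator supports predict_proba) |
| transform(X) | Transform X through all steps (only if the last step is a transformer or 'passthrough') |
| fit_transform(X, y) | Fit all steps and return the transformed X in one call |
| fit_predict(X, y) | Fit and predict in one call (only if the final estimator implements fit_predict, e.g. clustering) |
| score(X, y) | Transform X, then call the final estimator's score (accuracy for classifiers, R² for regressors) |
| set_params(**params) | Set parameters using the stepname__param syntax |
| get_params() | Get all parameters, including nested stepname__param entries |
| named_steps | Dictionary-like access to pipeline steps by name |
| pipe[i] or pipe[name] | Access a step by index or by name |
| pipe[start:end] | Slice to create a sub-pipeline |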
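Most entries in the table can be exercised on one small pipeline. This sketch assumes the Iris dataset with a scaler, PCA and LogisticRegression; the step names are illustrative.

```python
from sklearn.datasets import load_iris
from sklearn.decomposition import PCA
from sklearn.linear_model import LogisticRegression
from sklearn.pipeline import Pipeline
from sklearn.preprocessing import StandardScaler

X, y = load_iris(return_X_y=True)

pipe = Pipeline([
    ("scaler", StandardScaler()),
    ("pca", PCA(n_components=2)),
    ("classifier", LogisticRegression(max_iter=1000)),
])

# set_params uses the stepname__param syntax
pipe.set_params(pca__n_components=3, classifier__C=0.5)
pipe.fit(X, y)

first_step = pipe[0]                # access by index
pca_step = pipe["pca"]              # access by name
same_pca = pipe.named_steps["pca"]  # dictionary-like access
sub_pipe = pipe[:2]                 # slice: scaler + pca only

print(type(first_step).__name__, pca_step is same_pca)
print(sub_pipe.transform(X).shape)      # 3 PCA components per row
print(pipe.predict_proba(X[:2]).shape)  # one probability column per class
print(f"Training accuracy: {pipe.score(X, y):.3f}")
```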

FAQ

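What is the difference between Pipeline and make_pipeline?

Pipeline requires a (name, estimator) tuple for each step, so you control the step names explicitly. make_pipeline takes the estimator instances directly and generates the names automatically from the lowercased class names. Use Pipeline when you want descriptive names or plan to tune with GridSearchCV; use make_pipeline for quick prototyping.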
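To see the auto-generated names, inspect named_steps. The generated names are also the prefixes you would use in set_params or in a GridSearchCV grid.

```python
from sklearn.linear_model import LogisticRegression
from sklearn.pipeline import make_pipeline
from sklearn.preprocessing import StandardScaler

pipe = make_pipeline(StandardScaler(), LogisticRegression())

step_names = list(pipe.named_steps)
print(step_names)  # ['standardscaler', 'logisticregression']

# Auto-generated names become the stepname__param prefixes
pipe.set_params(logisticregression__C=10)
```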

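Does an sklearn Pipeline prevent data leakage?

Yes, for every step inside it. When you call pipeline.fit(X_train, y_train), each transformer is fitted only on the training data. During cross-validation with cross_val_score or GridSearchCV, the pipeline refits every step on each training fold, so data from the held-out fold never leaks into preprocessing. This is its core advantage over manual preprocessing. Anything you fit before the data reaches the pipeline is not protected.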
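A minimal sketch of the leak-free workflow, assuming the breast-cancer dataset bundled with scikit-learn. For contrast it also runs the leaky pattern (scaler fitted on all rows before cross-validation). On a dataset like this the two scores may be very close; the difference that matters is methodological, so the gap should not be read as a general measure of leakage.

```python
from sklearn.datasets import load_breast_cancer
from sklearn.linear_model import LogisticRegression
from sklearn.model_selection import cross_val_score
from sklearn.pipeline import make_pipeline
from sklearn.preprocessing import StandardScaler

X, y = load_breast_cancer(return_X_y=True)

# Leak-free: cross_val_score clones and refits the whole pipeline per fold,
# so the scaler only ever sees that fold's training rows
pipe = make_pipeline(StandardScaler(), LogisticRegression(max_iter=1000))
scores = cross_val_score(pipe, X, y, cv=5)

# Leaky pattern for comparison: scaler fitted on ALL rows before CV
X_scaled_everywhere = StandardScaler().fit_transform(X)
leaky_scores = cross_val_score(LogisticRegression(max_iter=1000), X_scaled_everywhere, y, cv=5)

print(f"Pipeline CV accuracy: {scores.mean():.3f}")
print(f"Leaky CV accuracy:    {leaky_scores.mean():.3f}")
```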

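Can I use deep learning models in a Pipeline?

Yes, as long as the model follows the sklearn API (it implements fit and predict, and optionally transform). Libraries such as scikeras provide sklearn-compatible Keras wrappers that make Keras models usable in pipelines. XGBoost and LightGBM also provide sklearn-compatible interfaces.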

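How do I get feature names after a ColumnTransformer?

After fitting, call pipeline.named_steps['preprocessor'].get_feature_names_out(). It returns an array of feature names, each prefixed with the name of the transformer that produced it (for example, num__age or cat__city_NYC). This is available in scikit-learn 1.0 and later.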

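Can a single Pipeline contain multiple models?

Not as alternatives to each other. A Pipeline is a linear sequence whose last step is a single estimator. To compare several models, create one pipeline per model with the same preprocessing steps and a different final estimator; you can automate this by looping over a dictionary of models.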
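One way to automate the comparison, assuming the Iris dataset and three arbitrary candidate models. clone() gives every pipeline its own unfitted copy of the shared preprocessing step, so no fitted state is shared between models.

```python
from sklearn.base import clone
from sklearn.datasets import load_iris
from sklearn.ensemble import RandomForestClassifier
from sklearn.linear_model import LogisticRegression
from sklearn.model_selection import cross_val_score
from sklearn.pipeline import Pipeline
from sklearn.preprocessing import StandardScaler
from sklearn.svm import SVC

X, y = load_iris(return_X_y=True)

preprocessing = StandardScaler()  # shared, unfitted preprocessing definition

models = {
    "logreg": LogisticRegression(max_iter=1000),
    "svc": SVC(),
    "random_forest": RandomForestClassifier(random_state=0),
}

results = {}
for name, model in models.items():
    # clone() gives each pipeline an independent copy of the preprocessing step
    pipe = Pipeline([("preprocessor", clone(preprocessing)), ("classifier", model)])
    results[name] = cross_val_score(pipe, X, y, cv=5).mean()

for name, score in sorted(results.items(), key=lambda item: item[1], reverse=True):
    print(f"{name:>14}: {score:.3f}")
```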

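How do I skip a step in GridSearchCV?

Set that step to 'passthrough' in the parameter grid. For example, {'reduce_dim': ['passthrough']} skips the reduce_dim step entirely for those grid-search candidates. You can also set a step to None, but 'passthrough' is the recommended form.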
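Outside a grid search, the same switch can be made with set_params. This sketch assumes the Iris dataset; after the reduce_dim step is set to 'passthrough', the classifier is trained on all four scaled features.

```python
from sklearn.datasets import load_iris
from sklearn.decomposition import PCA
from sklearn.linear_model import LogisticRegression
from sklearn.pipeline import Pipeline
from sklearn.preprocessing import StandardScaler

X, y = load_iris(return_X_y=True)

pipe = Pipeline([
    ("scaler", StandardScaler()),
    ("reduce_dim", PCA(n_components=2)),
    ("classifier", LogisticRegression(max_iter=1000)),
])

# Same mechanism GridSearchCV applies for {'reduce_dim': ['passthrough']}
pipe.set_params(reduce_dim="passthrough")
pipe.fit(X, y)

print(pipe.named_steps["reduce_dim"])  # passthrough
print(pipe[-1].coef_.shape)            # (3, 4): 3 classes x 4 scaled features
```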

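What happens if the Pipeline encounters an unseen category at prediction time?

If the OneHotEncoder in your pipeline uses handle_unknown='ignore', unseen categories are encoded as all zeros. Otherwise (the default is handle_unknown='error'), the pipeline raises an error. In production, always set handle_unknown='ignore' whenever category values might change.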

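Conclusion

Sklearn Pipeline turns messy, error-prone ML code into a clean, reproducible workflow. By chaining preprocessing and modeling into a single object, you prevent preprocessing leakage, simplify deployment, and make hyperparameter tuning across the whole workflow straightforward.

Start with the basics: wrap a scaler and a model in a Pipeline. When you have mixed feature types, move up to ColumnTransformer. Use GridSearchCV to tune the parameters of every pipeline step at once. When the built-in options cannot meet your feature-engineering needs, build custom transformers.

Learning pipelines pays off immediately. Your cross-validation results become more trustworthy because preprocessing is refitted in every fold. Your production deployment shrinks to a single joblib.dump and joblib.load. And your codebase becomes easier to maintain because all of the transformation and prediction logic lives in one object you can inspect.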
