Sklearn Pipeline: Vollständiger Leitfaden zum Erstellen von ML-Pipelines in Python
Aktualisiert am
Sie haben ein Machine-Learning-Projekt mit fünf Preprocessing-Schritten, drei Feature-Engineering-Operationen und einem finalen Modell. Jeder Schritt ist ein separater Codeblock. Sie fitten Ihren Scaler auf dem gesamten Datensatz und splitten dann in Train und Test. Ihr One-Hot-Encoding erzeugt in Training und Produktion unterschiedliche Spalten. Monate später ändert jemand die Imputationsstrategie, vergisst aber, das Deployment-Skript zu aktualisieren.
Das ist die Realität der meisten ML-Codebases. Manuelle Preprocessing-Pipelines sind fragil, fehleranfällig und eine ständige Quelle für Data Leakage -- die häufigste Ursache dafür, dass Modelle in Notebooks großartig funktionieren, aber auf Live-Daten scheitern. Wenn Sie einen StandardScaler auf dem gesamten Datensatz fitten, bevor Sie splitten, gelangen Test-Set-Statistiken in das Training. Wenn Sie kategoriale Features außerhalb eines einheitlichen Workflows kodieren, bleibt Train-Test-Skew unsichtbar, bis die Produktion bricht.
Die Pipeline von Scikit-learn löst diese Probleme, indem sie Preprocessing und Modellierung zu einem einzigen Objekt verknüpft. Ein Aufruf von fit() trainiert alles. Ein Aufruf von predict() transformiert und sagt voraus. Kein Data Leakage. Keine nicht zusammenpassenden Transformationen. Ein Objekt zum Speichern, Laden und Deployen. Dieser Leitfaden deckt alles ab, was Sie brauchen, um produktionsreife sklearn-Pipelines zu bauen — von den Grundlagen über benutzerdefinierte Transformer bis hin zu realen Deployment-Patterns.
Warum Pipelines wichtig sind
Das Problem des Data Leakage
Data Leakage entsteht, wenn Informationen außerhalb des Trainingssets das Modell während des Trainings beeinflussen. Die häufigste Form im Preprocessing sieht so aus:
# WRONG: Data leakage -- scaler sees test data
from sklearn.preprocessing import StandardScaler
from sklearn.model_selection import train_test_split
scaler = StandardScaler()
X_scaled = scaler.fit_transform(X) # Fitted on ALL data, including test
X_train, X_test, y_train, y_test = train_test_split(X_scaled, y, test_size=0.2)
# X_test was already influenced by scaler statistics computed on the full datasetDer Scaler berechnet Mittelwert und Standardabweichung aus dem gesamten Datensatz, einschließlich der Test-Samples. Ihre Test-Set-Evaluation ist dadurch zu optimistisch, weil das Modell indirekt Informationen aus diesen Samples während des Preprocessings „gesehen“ hat.
Der korrekte Ansatz:
# CORRECT: No leakage -- scaler fitted only on training data
X_train, X_test, y_train, y_test = train_test_split(X, y, test_size=0.2)
scaler = StandardScaler()
X_train_scaled = scaler.fit_transform(X_train) # Fit only on training
X_test_scaled = scaler.transform(X_test) # Transform only, no fittingDas funktioniert, aber der manuelle Ansatz wird schnell unhandlich. Bei fünf Preprocessing-Schritten verwalten Sie fünf fitten Objekte und müssen sich merken, wann fit_transform und wann transform zu verwenden ist. Eine Pipeline erledigt das automatisch.
Code-Organisation
Über Leakage hinaus lösen Pipelines ein Problem der Code-Organisation. Vergleichen Sie diese beiden Ansätze:
| Aspekt | Manuelles Preprocessing | sklearn Pipeline |
|---|---|---|
| Risiko für Data Leakage | Hoch -- leicht versehentlich fit_transform auf Testdaten | Keines -- Pipeline erzwingt korrektes fit/transform |
| Codezeilen für Train + Predict | 10-30 Zeilen pro Umgebung | 2 Zeilen (fit, predict) |
| Deployment in Produktion | Jeden Transformer separat serialisieren, Reihenfolge rekonstruieren | Ein Objekt mit joblib serialisieren |
| Cross-Validation | Alle Schritte pro Fold manuell neu fitten | cross_val_score übernimmt alles |
| Hyperparameter-Tuning | Preprocessing + Modellparameter manuell durchlaufen | GridSearchCV optimiert alle Parameter gemeinsam |
| Reproduzierbarkeit | Hängt von der Ausführungsreihenfolge im Notebook ab | Deterministisch -- gleiches Objekt, gleiches Ergebnis |
| Debugging | Shapes nach jedem Schritt ausgeben, manuell | pipeline.named_steps zur Inspektion |
Grundlegende Pipeline-Nutzung
Die Klasse Pipeline nimmt eine Liste von (name, transformer)-Tuples entgegen. Alle Schritte außer dem letzten müssen fit und transform implementieren. Der letzte Schritt kann ein beliebiger Estimator sein (Classifier, Regressor oder Transformer).
from sklearn.pipeline import Pipeline
from sklearn.preprocessing import StandardScaler
from sklearn.linear_model import LogisticRegression
# Create pipeline with named steps
pipe = Pipeline([
('scaler', StandardScaler()),
('classifier', LogisticRegression(max_iter=1000))
])Fitten und Vorhersagen
Wenn Sie pipe.fit(X_train, y_train) aufrufen, macht die Pipeline Folgendes:
- Ruft
scaler.fit_transform(X_train, y_train)auf -- fitten des Scalers und Transformation der Trainingsdaten - Übergibt die transformierten Daten an
classifier.fit(X_transformed, y_train)
Wenn Sie pipe.predict(X_test) aufrufen, macht die Pipeline Folgendes:
- Ruft
scaler.transform(X_test)auf -- nur transformieren, nicht fitten - Übergibt die transformierten Daten an
classifier.predict(X_transformed)
from sklearn.pipeline import Pipeline
from sklearn.preprocessing import StandardScaler
from sklearn.linear_model import LogisticRegression
from sklearn.model_selection import train_test_split
from sklearn.datasets import load_iris
# Load data
iris = load_iris()
X, y = iris.data, iris.target
# Split -- see our guide on train_test_split for details:
# /topics/Scikit-Learn/sklearn-train-test-split
X_train, X_test, y_train, y_test = train_test_split(
X, y, test_size=0.2, random_state=42, stratify=y
)
# Build and train pipeline
pipe = Pipeline([
('scaler', StandardScaler()),
('classifier', LogisticRegression(max_iter=1000, random_state=42))
])
pipe.fit(X_train, y_train)
# Evaluate
accuracy = pipe.score(X_test, y_test)
print(f"Test accuracy: {accuracy:.4f}")
# Test accuracy: 1.0000Zugriff auf einzelne Schritte
Sie können jeden Schritt nach Namen inspizieren:
# Access scaler parameters after fitting
scaler = pipe.named_steps['scaler']
print(f"Feature means: {scaler.mean_}")
print(f"Feature stds: {scaler.scale_}")
# Access the classifier
clf = pipe.named_steps['classifier']
print(f"Coefficients shape: {clf.coef_.shape}")
print(f"Classes: {clf.classes_}")Sie können auch per Index zugreifen:
# Access by index
first_step = pipe[0] # StandardScaler
last_step = pipe[-1] # LogisticRegression
# Slice the pipeline (returns a new Pipeline)
preprocessing = pipe[:-1] # Just the scaler
X_test_transformed = preprocessing.transform(X_test)
print(f"Transformed shape: {X_test_transformed.shape}")make_pipeline: Die Kurzform
Wenn Sie keine benutzerdefinierten Schrittnamen benötigen, erzeugt make_pipeline sie automatisch aus den Klassennamen:
from sklearn.pipeline import make_pipeline
from sklearn.preprocessing import StandardScaler
from sklearn.decomposition import PCA
from sklearn.svm import SVC
# Equivalent to Pipeline([('standardscaler', StandardScaler()),
# ('pca', PCA(n_components=2)),
# ('svc', SVC())])
pipe = make_pipeline(StandardScaler(), PCA(n_components=2), SVC())
print(pipe.named_steps)
# {'standardscaler': StandardScaler(), 'pca': PCA(n_components=2), 'svc': SVC()}Automatisch erzeugte Namen sind der Klassenname in Kleinbuchstaben. Wenn Sie denselben Transformer zweimal verwenden, hängt make_pipeline eine Zahl an:
from sklearn.pipeline import make_pipeline
from sklearn.preprocessing import StandardScaler, PolynomialFeatures
pipe = make_pipeline(PolynomialFeatures(2), StandardScaler(), PolynomialFeatures(3))
print(list(pipe.named_steps.keys()))
# ['polynomialfeatures-1', 'standardscaler', 'polynomialfeatures-2']Pipeline vs make_pipeline
| Feature | Pipeline | make_pipeline |
|---|---|---|
| Benutzerdefinierte Schrittnamen | Ja -- Sie wählen sie | Nein -- automatisch erzeugt |
| Lesbarkeit für große Pipelines | Besser -- beschreibende Namen | Schlechter -- generische Namen |
| Syntax für Hyperparameter-Tuning | stepname__param mit Ihren Namen | classname__param mit Auto-Namen |
| Kürze des Codes | Ausführlicher | Knapper |
| Am besten für | Produktionspipelines, Tuning | Schnelles Prototyping |
Verwenden Sie Pipeline, wenn Sie Hyperparameter tunen oder klare Schrittnamen die Lesbarkeit verbessern. Verwenden Sie make_pipeline für schnelle Experimente.
Häufige Preprocessing-Schritte
Hier sind die am häufigsten verwendeten Transformer in sklearn-Pipelines:
Numerische Features
from sklearn.pipeline import Pipeline
from sklearn.preprocessing import StandardScaler, MinMaxScaler, RobustScaler
from sklearn.impute import SimpleImputer
# Scale to zero mean, unit variance
numeric_pipe = Pipeline([
('imputer', SimpleImputer(strategy='median')),
('scaler', StandardScaler())
])| Transformer | Was es macht | Wann verwenden |
|---|---|---|
StandardScaler | Zentriert auf Mittelwert=0, skaliert auf Std=1 | Standardwahl für die meisten Algorithmen |
MinMaxScaler | Skaliert auf den Bereich [0, 1] | Neuronale Netze, Algorithmen, die empfindlich auf Größenordnungen reagieren |
RobustScaler | Verwendet Median und IQR, robust gegenüber Ausreißern | Daten mit starken Ausreißern |
SimpleImputer | Füllt fehlende Werte (mean, median, most_frequent, constant) | Umgang mit fehlenden Daten |
PolynomialFeatures | Erzeugt polynomielle und Interaktions-Features | Nichtlinearität für lineare Modelle hinzufügen |
PowerTransformer | Wendet Yeo-Johnson- oder Box-Cox-Transformation an | Schiefe Verteilungen |
Kategoriale Features
from sklearn.pipeline import Pipeline
from sklearn.preprocessing import OneHotEncoder, OrdinalEncoder, LabelEncoder
from sklearn.impute import SimpleImputer
# One-hot encode categorical features
categorical_pipe = Pipeline([
('imputer', SimpleImputer(strategy='most_frequent')),
('encoder', OneHotEncoder(handle_unknown='ignore', sparse_output=False))
])| Transformer | Was es macht | Wann verwenden |
|---|---|---|
OneHotEncoder | Erstellt binäre Spalten für jede Kategorie | Nominale Kategorien (keine Ordnung) |
OrdinalEncoder | Ordnet Kategorien ganze Zahlen zu | Ordinale Kategorien (low/medium/high) |
TargetEncoder | Kodiert mithilfe von Zielvariablen-Statistiken | Features mit hoher Kardinalität (scikit-learn 1.3+) |
ColumnTransformer für gemischte Datentypen
Echte Datensätze enthalten sowohl numerische als auch kategoriale Spalten. ColumnTransformer wendet unterschiedliche Transformationen parallel auf verschiedene Spaltengruppen an:
from sklearn.compose import ColumnTransformer
from sklearn.pipeline import Pipeline
from sklearn.preprocessing import StandardScaler, OneHotEncoder
from sklearn.impute import SimpleImputer
from sklearn.linear_model import LogisticRegression
import pandas as pd
import numpy as np
# Sample data with mixed types
data = pd.DataFrame({
'age': [25, 30, np.nan, 45, 50],
'income': [40000, 55000, 60000, np.nan, 90000],
'city': ['NYC', 'LA', 'NYC', 'Chicago', 'LA'],
'education': ['BS', 'MS', 'PhD', 'BS', 'MS'],
'purchased': [0, 1, 1, 0, 1]
})
X = data.drop('purchased', axis=1)
y = data['purchased']
# Define column groups
numeric_features = ['age', 'income']
categorical_features = ['city', 'education']
# Build sub-pipelines for each column type
numeric_transformer = Pipeline([
('imputer', SimpleImputer(strategy='median')),
('scaler', StandardScaler())
])
categorical_transformer = Pipeline([
('imputer', SimpleImputer(strategy='most_frequent')),
('encoder', OneHotEncoder(handle_unknown='ignore', sparse_output=False))
])
# Combine with ColumnTransformer
preprocessor = ColumnTransformer(
transformers=[
('num', numeric_transformer, numeric_features),
('cat', categorical_transformer, categorical_features)
]
)
# Full pipeline: preprocessing + model
pipeline = Pipeline([
('preprocessor', preprocessor),
('classifier', LogisticRegression(max_iter=1000))
])
pipeline.fit(X, y)
print(f"Pipeline fitted successfully")
print(f"Predictions: {pipeline.predict(X)}")Feature-Namen nach der Transformation abrufen
Nach dem Fitten eines ColumnTransformer können Sie die transformierten Feature-Namen abrufen:
# After fitting the pipeline
pipeline.fit(X, y)
# Get feature names from the preprocessor step
feature_names = pipeline.named_steps['preprocessor'].get_feature_names_out()
print(f"Transformed features: {feature_names}")
# ['num__age', 'num__income', 'cat__city_Chicago', 'cat__city_LA',
# 'cat__city_NYC', 'cat__education_BS', 'cat__education_MS', 'cat__education_PhD']Umgang mit Restspalten
Standardmäßig verwirft ColumnTransformer Spalten, die in keinem Transformer spezifiziert sind. Steuern Sie das mit dem Parameter remainder:
preprocessor = ColumnTransformer(
transformers=[
('num', numeric_transformer, numeric_features),
('cat', categorical_transformer, categorical_features)
],
remainder='passthrough' # Keep unspecified columns as-is
# remainder='drop' # Default: drop unspecified columns
)Pipeline mit GridSearchCV
Eine der stärksten Funktionen von sklearn-Pipelines ist die nahtlose Integration in Hyperparameter-Tuning. Verwenden Sie die Syntax stepname__parameter, um Parameter innerhalb von Pipeline-Schritten anzusprechen:
from sklearn.pipeline import Pipeline
from sklearn.preprocessing import StandardScaler
from sklearn.decomposition import PCA
from sklearn.svm import SVC
from sklearn.model_selection import GridSearchCV, train_test_split
from sklearn.datasets import load_breast_cancer
# Load data
cancer = load_breast_cancer()
X_train, X_test, y_train, y_test = train_test_split(
cancer.data, cancer.target, test_size=0.2, random_state=42, stratify=cancer.target
)
# Build pipeline
pipe = Pipeline([
('scaler', StandardScaler()),
('pca', PCA()),
('svc', SVC())
])
# Define parameter grid
# Use stepname__param syntax to access nested parameters
param_grid = {
'pca__n_components': [5, 10, 15, 20],
'svc__C': [0.1, 1, 10, 100],
'svc__kernel': ['rbf', 'linear'],
'svc__gamma': ['scale', 'auto']
}
# Run grid search
grid = GridSearchCV(
pipe,
param_grid,
cv=5,
scoring='accuracy',
n_jobs=-1,
verbose=1
)
grid.fit(X_train, y_train)
print(f"Best parameters: {grid.best_params_}")
print(f"Best CV score: {grid.best_score_:.4f}")
print(f"Test score: {grid.score(X_test, y_test):.4f}")Tuning von ColumnTransformer-Parametern
Für verschachtelte Pipelines mit ColumnTransformer verketten Sie die Schrittnamen mit doppelten Unterstrichen:
# Accessing nested parameters:
# pipeline step 'preprocessor' -> transformer 'num' -> step 'imputer' -> parameter 'strategy'
param_grid = {
'preprocessor__num__imputer__strategy': ['mean', 'median'],
'preprocessor__cat__encoder__handle_unknown': ['ignore', 'infrequent_if_exist'],
'classifier__C': [0.1, 1, 10]
}RandomizedSearchCV für große Suchräume
Wenn der Parameterraum groß ist, samelt RandomizedSearchCV eine feste Anzahl von Parameterkombinationen:
from sklearn.model_selection import RandomizedSearchCV
from scipy.stats import uniform, randint
param_distributions = {
'pca__n_components': randint(5, 25),
'svc__C': uniform(0.1, 100),
'svc__kernel': ['rbf', 'linear', 'poly'],
'svc__gamma': uniform(0.001, 1)
}
random_search = RandomizedSearchCV(
pipe,
param_distributions,
n_iter=50, # Sample 50 combinations
cv=5,
scoring='accuracy',
n_jobs=-1,
random_state=42
)
random_search.fit(X_train, y_train)
print(f"Best parameters: {random_search.best_params_}")
print(f"Best CV score: {random_search.best_score_:.4f}")Benutzerdefinierte Transformer
FunctionTransformer: Schnelle benutzerdefinierte Schritte
Für einfache zustandslose Transformationen verwenden Sie FunctionTransformer:
from sklearn.preprocessing import FunctionTransformer
from sklearn.pipeline import Pipeline
import numpy as np
# Log transform (adding 1 to avoid log(0))
log_transformer = FunctionTransformer(
func=np.log1p,
inverse_func=np.expm1 # Optional inverse for inverse_transform
)
pipe = Pipeline([
('log', log_transformer),
('scaler', StandardScaler())
])
# Works with pipeline fit/transform
X_sample = np.array([[1, 10, 100], [2, 20, 200]])
X_transformed = pipe.fit_transform(X_sample)
print(f"Original: {X_sample[0]}")
print(f"Transformed: {X_transformed[0]}")Benutzerdefinierte Transformer-Klasse
Für zustandsbehaftete Transformationen (also solche, die Parameter aus Daten lernen) erstellen Sie eine Klasse, die von BaseEstimator und TransformerMixin erbt:
from sklearn.base import BaseEstimator, TransformerMixin
import numpy as np
class OutlierClipper(BaseEstimator, TransformerMixin):
"""Clips values beyond a specified number of standard deviations."""
def __init__(self, n_std=3):
self.n_std = n_std
def fit(self, X, y=None):
# Learn the boundaries from training data
self.mean_ = np.mean(X, axis=0)
self.std_ = np.std(X, axis=0)
self.lower_ = self.mean_ - self.n_std * self.std_
self.upper_ = self.mean_ + self.n_std * self.std_
return self # Always return self from fit
def transform(self, X):
# Apply learned boundaries to any data
X_clipped = np.clip(X, self.lower_, self.upper_)
return X_clippedVerwenden Sie ihn wie jeden eingebauten Transformer in einer Pipeline:
from sklearn.pipeline import Pipeline
from sklearn.preprocessing import StandardScaler
from sklearn.linear_model import LogisticRegression
from sklearn.datasets import load_iris
from sklearn.model_selection import train_test_split
iris = load_iris()
X_train, X_test, y_train, y_test = train_test_split(
iris.data, iris.target, test_size=0.2, random_state=42
)
pipe = Pipeline([
('clip', OutlierClipper(n_std=2)),
('scaler', StandardScaler()),
('classifier', LogisticRegression(max_iter=1000))
])
pipe.fit(X_train, y_train)
print(f"Accuracy: {pipe.score(X_test, y_test):.4f}")
# The n_std parameter works with GridSearchCV
from sklearn.model_selection import GridSearchCV
grid = GridSearchCV(
pipe,
{'clip__n_std': [1.5, 2, 2.5, 3]},
cv=5
)
grid.fit(X_train, y_train)
print(f"Best n_std: {grid.best_params_['clip__n_std']}")Benutzerdefinierter Transformer für Feature Engineering
Ein praxisnäheres Beispiel -- das Erstellen von Interaktions-Features aus bestimmten Spalten:
from sklearn.base import BaseEstimator, TransformerMixin
import pandas as pd
import numpy as np
class FeatureInteraction(BaseEstimator, TransformerMixin):
"""Creates multiplication interactions between specified column pairs."""
def __init__(self, interaction_pairs=None):
self.interaction_pairs = interaction_pairs
def fit(self, X, y=None):
# Store column names if DataFrame
if isinstance(X, pd.DataFrame):
self.feature_names_in_ = X.columns.tolist()
else:
self.feature_names_in_ = [f"x{i}" for i in range(X.shape[1])]
return self
def transform(self, X):
X_df = pd.DataFrame(X, columns=self.feature_names_in_) if not isinstance(X, pd.DataFrame) else X.copy()
if self.interaction_pairs:
for col_a, col_b in self.interaction_pairs:
name = f"{col_a}_x_{col_b}"
X_df[name] = X_df[col_a] * X_df[col_b]
return X_df.values
def get_feature_names_out(self, input_features=None):
names = list(self.feature_names_in_)
if self.interaction_pairs:
for col_a, col_b in self.interaction_pairs:
names.append(f"{col_a}_x_{col_b}")
return np.array(names)FeatureUnion: Paralleles Feature Engineering
Während Pipeline Schritte sequenziell verknüpft, führt FeatureUnion Transformer parallel aus und konkateniert deren Ausgaben horizontal:
from sklearn.pipeline import Pipeline, FeatureUnion
from sklearn.preprocessing import StandardScaler, PolynomialFeatures
from sklearn.decomposition import PCA
from sklearn.linear_model import LogisticRegression
from sklearn.datasets import load_iris
from sklearn.model_selection import train_test_split
iris = load_iris()
X_train, X_test, y_train, y_test = train_test_split(
iris.data, iris.target, test_size=0.2, random_state=42
)
# Create parallel feature branches
feature_union = FeatureUnion([
('scaled', StandardScaler()), # Original features, scaled
('pca', PCA(n_components=2)), # 2 PCA components
('poly', PolynomialFeatures(degree=2, include_bias=False)) # Polynomial features
])
# Combine into a full pipeline
pipe = Pipeline([
('features', feature_union),
('classifier', LogisticRegression(max_iter=1000, random_state=42))
])
pipe.fit(X_train, y_train)
# Check the total number of features
X_transformed = feature_union.fit_transform(X_train)
print(f"Original features: {X_train.shape[1]}")
print(f"After FeatureUnion: {X_transformed.shape[1]}")
print(f"Test accuracy: {pipe.score(X_test, y_test):.4f}")FeatureUnion vs ColumnTransformer
| Feature | FeatureUnion | ColumnTransformer |
|---|---|---|
| Input | Alle Spalten gehen an alle Transformer | Bestimmte Spalten an bestimmte Transformer |
| Output | Konkateniert horizontal | Konkateniert horizontal |
| Use Case | Mehrere Repräsentationen derselben Features | Unterschiedliche Feature-Typen benötigen unterschiedliche Verarbeitung |
| Spaltenauswahl | Nicht möglich -- arbeitet auf allen Spalten | Eingebaute Spaltenauswahl |
| Moderne Alternative | Oft durch ColumnTransformer ersetzt | Für die meisten Anwendungsfälle bevorzugt |
In modernem scikit-learn deckt ColumnTransformer die meisten Fälle ab, für die früher FeatureUnion verwendet wurde. FeatureUnion bleibt nützlich, wenn Sie mehrere Repräsentationen desselben Feature-Sets möchten, z. B. Rohwerte + PCA + polynomiale Features.
Pipelines speichern und laden
Einer der größten Vorteile von Pipelines ist die Einfachheit beim Deployment. Statt jeden Transformer und das Modell separat zu serialisieren, speichern Sie ein einziges Objekt:
import joblib
from sklearn.pipeline import Pipeline
from sklearn.preprocessing import StandardScaler
from sklearn.ensemble import RandomForestClassifier
from sklearn.datasets import load_iris
from sklearn.model_selection import train_test_split
# Train a pipeline
iris = load_iris()
X_train, X_test, y_train, y_test = train_test_split(
iris.data, iris.target, test_size=0.2, random_state=42
)
pipe = Pipeline([
('scaler', StandardScaler()),
('classifier', RandomForestClassifier(n_estimators=100, random_state=42))
])
pipe.fit(X_train, y_train)
# Save the entire pipeline -- one file
joblib.dump(pipe, 'model_pipeline.joblib')
print(f"Pipeline saved")
# Load and predict -- no preprocessing code needed
loaded_pipe = joblib.load('model_pipeline.joblib')
predictions = loaded_pipe.predict(X_test)
accuracy = loaded_pipe.score(X_test, y_test)
print(f"Loaded pipeline accuracy: {accuracy:.4f}")Pipelines versionieren
Für Produktion sollten Sie Metadaten zusammen mit der Pipeline speichern:
import joblib
import datetime
import sklearn
artifact = {
'pipeline': pipe,
'training_date': datetime.datetime.now().isoformat(),
'sklearn_version': sklearn.__version__,
'feature_names': list(iris.feature_names),
'target_names': list(iris.target_names),
'training_accuracy': pipe.score(X_train, y_train),
'test_accuracy': pipe.score(X_test, y_test),
'n_training_samples': len(X_train)
}
joblib.dump(artifact, 'model_artifact_v1.joblib')
# Later, load and validate
loaded = joblib.load('model_artifact_v1.joblib')
print(f"Model trained on: {loaded['training_date']}")
print(f"Sklearn version: {loaded['sklearn_version']}")
print(f"Test accuracy: {loaded['test_accuracy']:.4f}")
# Use the pipeline
loaded_pipe = loaded['pipeline']
predictions = loaded_pipe.predict(X_test[:3])pickle verwenden (Alternative)
joblib ist für sklearn-Objekte bevorzugt, weil es große NumPy-Arrays effizient verarbeitet. Standard-pickle funktioniert ebenfalls:
import pickle
# Save
with open('pipeline.pkl', 'wb') as f:
pickle.dump(pipe, f)
# Load
with open('pipeline.pkl', 'rb') as f:
loaded = pickle.load(f)Praxisbeispiel: Vollständige Klassifikations-Pipeline
Hier ist eine vollständige, produktionsreife Pipeline für eine Klassifikationsaufgabe mit gemischten Feature-Typen. Sie verwendet einen Random Forest Classifier mit ColumnTransformer und endet mit einem vollständigen Evaluation Report. Dieses Beispiel verwendet das typische Titanic-Datensatzmuster, dem viele ML-Praktiker begegnen:
import pandas as pd
import numpy as np
from sklearn.compose import ColumnTransformer
from sklearn.pipeline import Pipeline
from sklearn.preprocessing import StandardScaler, OneHotEncoder
from sklearn.impute import SimpleImputer
from sklearn.ensemble import RandomForestClassifier
from sklearn.model_selection import (
train_test_split, cross_val_score, GridSearchCV
)
from sklearn.metrics import classification_report, accuracy_score
# -- Create a realistic dataset with mixed types and missing values --
np.random.seed(42)
n = 1000
data = pd.DataFrame({
'age': np.random.normal(35, 12, n),
'income': np.random.lognormal(10.5, 0.8, n),
'credit_score': np.random.normal(650, 80, n),
'years_employed': np.random.exponential(5, n),
'department': np.random.choice(['Engineering', 'Sales', 'Marketing', 'HR', 'Finance'], n),
'education': np.random.choice(['High School', 'Bachelor', 'Master', 'PhD'], n),
'city': np.random.choice(['NYC', 'LA', 'Chicago', 'Houston', 'Phoenix', 'Dallas'], n),
'promoted': np.random.binomial(1, 0.3, n)
})
# Introduce missing values (realistic pattern)
for col in ['age', 'income', 'credit_score']:
mask = np.random.random(n) < 0.05
data.loc[mask, col] = np.nan
for col in ['department', 'education']:
mask = np.random.random(n) < 0.03
data.loc[mask, col] = np.nan
print(f"Dataset shape: {data.shape}")
print(f"Missing values:\n{data.isnull().sum()}")
print(f"Target distribution:\n{data['promoted'].value_counts(normalize=True)}")# -- Define features and target --
X = data.drop('promoted', axis=1)
y = data['promoted']
# Split the data (see /topics/Scikit-Learn/sklearn-train-test-split for details)
X_train, X_test, y_train, y_test = train_test_split(
X, y, test_size=0.2, random_state=42, stratify=y
)
print(f"Training set: {X_train.shape[0]} samples")
print(f"Test set: {X_test.shape[0]} samples")# -- Define column groups --
numeric_features = ['age', 'income', 'credit_score', 'years_employed']
categorical_features = ['department', 'education', 'city']
# -- Build preprocessing pipelines for each column type --
numeric_transformer = Pipeline([
('imputer', SimpleImputer(strategy='median')),
('scaler', StandardScaler())
])
categorical_transformer = Pipeline([
('imputer', SimpleImputer(strategy='most_frequent')),
('encoder', OneHotEncoder(handle_unknown='ignore', sparse_output=False))
])
# -- Combine column transformers --
preprocessor = ColumnTransformer(
transformers=[
('num', numeric_transformer, numeric_features),
('cat', categorical_transformer, categorical_features)
],
remainder='drop' # Explicitly drop unlisted columns
)
# -- Full pipeline: preprocessing + classifier --
# (see /topics/Scikit-Learn/sklearn-random-forest for more on RandomForest)
pipeline = Pipeline([
('preprocessor', preprocessor),
('classifier', RandomForestClassifier(
n_estimators=200,
max_depth=10,
min_samples_leaf=5,
class_weight='balanced',
random_state=42,
n_jobs=-1
))
])# -- Cross-validation first --
cv_scores = cross_val_score(pipeline, X_train, y_train, cv=5, scoring='accuracy')
print(f"Cross-validation scores: {cv_scores}")
print(f"Mean CV accuracy: {cv_scores.mean():.4f} (+/- {cv_scores.std() * 2:.4f})")
# -- Fit on full training set --
pipeline.fit(X_train, y_train)
# -- Evaluate on test set --
y_pred = pipeline.predict(X_test)
print(f"\nTest accuracy: {accuracy_score(y_test, y_pred):.4f}")
# For detailed evaluation, see /topics/Scikit-Learn/sklearn-confusion-matrix
print(f"\nClassification Report:\n{classification_report(y_test, y_pred)}")# -- Hyperparameter tuning --
param_grid = {
'preprocessor__num__imputer__strategy': ['mean', 'median'],
'classifier__n_estimators': [100, 200, 300],
'classifier__max_depth': [5, 10, 15, None],
'classifier__min_samples_leaf': [3, 5, 10]
}
grid_search = GridSearchCV(
pipeline,
param_grid,
cv=5,
scoring='accuracy',
n_jobs=-1,
verbose=1
)
grid_search.fit(X_train, y_train)
print(f"\nBest parameters: {grid_search.best_params_}")
print(f"Best CV score: {grid_search.best_score_:.4f}")
print(f"Test score: {grid_search.score(X_test, y_test):.4f}")# -- Inspect the best pipeline --
best_pipeline = grid_search.best_estimator_
# Get feature names after transformation
feature_names = best_pipeline.named_steps['preprocessor'].get_feature_names_out()
print(f"\nTransformed feature count: {len(feature_names)}")
# Get feature importances from the classifier
importances = best_pipeline.named_steps['classifier'].feature_importances_
feature_importance = pd.DataFrame({
'feature': feature_names,
'importance': importances
}).sort_values('importance', ascending=False)
print(f"\nTop 10 features:")
print(feature_importance.head(10).to_string(index=False))# -- Save the final pipeline --
import joblib
joblib.dump(best_pipeline, 'promotion_predictor.joblib')
print("Pipeline saved to promotion_predictor.joblib")
# -- Production usage --
loaded = joblib.load('promotion_predictor.joblib')
# Predict on new data -- same format as original DataFrame
new_employee = pd.DataFrame({
'age': [28],
'income': [65000],
'credit_score': [720],
'years_employed': [3.5],
'department': ['Engineering'],
'education': ['Master'],
'city': ['NYC']
})
prediction = loaded.predict(new_employee)
probability = loaded.predict_proba(new_employee)
print(f"\nNew employee prediction: {'Promoted' if prediction[0] else 'Not promoted'}")
print(f"Probability: {probability[0][1]:.2%}")Dieses Beispiel demonstriert jedes wichtige Pipeline-Muster: gemischte Feature-Typen, Umgang mit fehlenden Werten, Cross-Validation, Hyperparameter-Tuning, Feature-Inspektion und Produktions-Serialisierung.
Pipeline mit unterschiedlichen Modelltypen
Dasselbe Preprocessing kann für verschiedene Modelle verwendet werden. Tauschen Sie nur den letzten Schritt aus:
from sklearn.pipeline import Pipeline
from sklearn.linear_model import LogisticRegression
from sklearn.ensemble import RandomForestClassifier, GradientBoostingClassifier
from sklearn.svm import SVC
from sklearn.model_selection import cross_val_score
# Reuse the preprocessor from the previous example
models = {
'Logistic Regression': LogisticRegression(max_iter=1000, random_state=42),
'Random Forest': RandomForestClassifier(n_estimators=100, random_state=42),
'Gradient Boosting': GradientBoostingClassifier(n_estimators=100, random_state=42),
'SVM': SVC(kernel='rbf', random_state=42)
}
results = {}
for name, model in models.items():
pipe = Pipeline([
('preprocessor', preprocessor),
('classifier', model)
])
scores = cross_val_score(pipe, X_train, y_train, cv=5, scoring='accuracy')
results[name] = {
'mean': scores.mean(),
'std': scores.std()
}
print(f"{name:25s} | Accuracy: {scores.mean():.4f} +/- {scores.std():.4f}")Sie können den Estimator-Schritt auch dynamisch setzen:
# Replace the classifier in an existing pipeline
pipeline.set_params(classifier=GradientBoostingClassifier(n_estimators=200))
pipeline.fit(X_train, y_train)
print(f"Gradient Boosting test accuracy: {pipeline.score(X_test, y_test):.4f}")Pipelines in der Regression
Pipelines funktionieren für Regressionsaufgaben genauso. Details zur linearen Regression mit sklearn finden Sie in unserem Leitfaden zur sklearn linearen Regression.
from sklearn.pipeline import Pipeline
from sklearn.compose import ColumnTransformer
from sklearn.preprocessing import StandardScaler, OneHotEncoder, PolynomialFeatures
from sklearn.impute import SimpleImputer
from sklearn.linear_model import Ridge
from sklearn.model_selection import cross_val_score, train_test_split
import pandas as pd
import numpy as np
# Simulated housing data
np.random.seed(42)
n = 500
housing = pd.DataFrame({
'sqft': np.random.normal(1500, 400, n),
'bedrooms': np.random.choice([1, 2, 3, 4, 5], n),
'age': np.random.uniform(0, 50, n),
'neighborhood': np.random.choice(['downtown', 'suburbs', 'rural'], n),
'condition': np.random.choice(['poor', 'fair', 'good', 'excellent'], n),
})
housing['price'] = (
housing['sqft'] * 200
+ housing['bedrooms'] * 15000
- housing['age'] * 1000
+ np.random.normal(0, 20000, n)
)
X = housing.drop('price', axis=1)
y = housing['price']
X_train, X_test, y_train, y_test = train_test_split(X, y, test_size=0.2, random_state=42)
# Regression pipeline
numeric_features = ['sqft', 'bedrooms', 'age']
categorical_features = ['neighborhood', 'condition']
preprocessor = ColumnTransformer([
('num', Pipeline([
('imputer', SimpleImputer(strategy='median')),
('poly', PolynomialFeatures(degree=2, include_bias=False)),
('scaler', StandardScaler())
]), numeric_features),
('cat', Pipeline([
('imputer', SimpleImputer(strategy='most_frequent')),
('encoder', OneHotEncoder(handle_unknown='ignore', sparse_output=False))
]), categorical_features)
])
reg_pipeline = Pipeline([
('preprocessor', preprocessor),
('regressor', Ridge(alpha=1.0))
])
cv_scores = cross_val_score(reg_pipeline, X_train, y_train, cv=5, scoring='r2')
print(f"Cross-validation R2: {cv_scores.mean():.4f} +/- {cv_scores.std():.4f}")
reg_pipeline.fit(X_train, y_train)
print(f"Test R2: {reg_pipeline.score(X_test, y_test):.4f}")Schritte mit passthrough und None überspringen
Sie können Schritte bedingt überspringen, indem Sie sie auf 'passthrough' oder None setzen:
from sklearn.pipeline import Pipeline
from sklearn.preprocessing import StandardScaler
from sklearn.decomposition import PCA
from sklearn.svm import SVC
from sklearn.model_selection import GridSearchCV
from sklearn.datasets import load_breast_cancer
cancer = load_breast_cancer()
X_train, X_test, y_train, y_test = train_test_split(
cancer.data, cancer.target, test_size=0.2, random_state=42
)
pipe = Pipeline([
('scaler', StandardScaler()),
('reduce_dim', PCA()),
('classifier', SVC())
])
# Grid search can toggle steps on/off
param_grid = [
{
'reduce_dim': [PCA(5), PCA(10), PCA(15)],
'classifier__C': [1, 10]
},
{
'reduce_dim': ['passthrough'], # Skip PCA entirely
'classifier__C': [1, 10]
}
]
grid = GridSearchCV(pipe, param_grid, cv=5, n_jobs=-1)
grid.fit(X_train, y_train)
print(f"Best params: {grid.best_params_}")
print(f"Best score: {grid.best_score_:.4f}")Caching von Pipeline-Schritten
Beim Tuning von Hyperparametern können Zwischenschritte redundantly neu berechnet werden. Aktivieren Sie Caching, um das zu vermeiden:
from sklearn.pipeline import Pipeline
from sklearn.preprocessing import StandardScaler
from sklearn.decomposition import PCA
from sklearn.svm import SVC
from tempfile import mkdtemp
from shutil import rmtree
# Create a temporary cache directory
cachedir = mkdtemp()
pipe = Pipeline(
[
('scaler', StandardScaler()),
('pca', PCA(n_components=10)),
('svc', SVC())
],
memory=cachedir # Cache intermediate transformations
)
# During GridSearchCV, the scaler and PCA results are cached
# Only recomputed when their parameters change
# This speeds up searches where only the final estimator params change
# Clean up when done
# rmtree(cachedir)Daten vor dem Aufbau von Pipelines erkunden
Bevor Sie eine Pipeline konstruieren, müssen Sie Ihre Features verstehen: Verteilungen, Muster fehlender Werte, Korrelationen und mögliche Transformationen. PyGWalker (opens in a new tab) ermöglicht es Ihnen, jedes Pandas-DataFrame direkt in Jupyter-Notebooks in eine interaktive visuelle Erkundungsoberfläche zu verwandeln:
import pandas as pd
import pygwalker as pyg
# Explore your dataset interactively before building the pipeline
# Drag features to axes, create histograms, scatter plots, box plots
walker = pyg.walk(data)Diese Art der visuellen Exploration hilft Ihnen zu entscheiden, welche Features skaliert werden müssen, welche Ausreißer haben und gekappt werden sollten und welche kategorialen Features eine hohe Kardinalität aufweisen. Sie können Muster fehlender Werte erkennen und Feature-Verteilungen verstehen, bevor Sie eine einzige Zeile Pipeline-Code schreiben.
Für die Iteration in Ihrem gesamten Experimentier-Workflow -- verschiedene Preprocessing-Strategien testen, Modellleistung vergleichen und Ergebnisse nachverfolgen -- bietet RunCell (opens in a new tab) eine KI-gestützte Jupyter-Umgebung, in der ein Agent bei Codegenerierung, Debugging und Experimentverwaltung unterstützt.
Häufige Fallstricke und Debugging-Tipps
Fallstrick 1: Vergessen, die Pipeline für Vorhersagen zu verwenden
# WRONG: Preprocessing manually, predicting with just the model
X_test_scaled = scaler.transform(X_test)
predictions = pipeline.named_steps['classifier'].predict(X_test_scaled)
# CORRECT: Let the pipeline handle everything
predictions = pipeline.predict(X_test)Fallstrick 2: Preprocessing außerhalb der Pipeline fitten
# WRONG: This defeats the purpose of the pipeline
scaler = StandardScaler()
X_train_scaled = scaler.fit_transform(X_train)
pipe = Pipeline([('classifier', LogisticRegression())])
pipe.fit(X_train_scaled, y_train)
# Now you must remember to manually scale at prediction time
# CORRECT: Include preprocessing in the pipeline
pipe = Pipeline([
('scaler', StandardScaler()),
('classifier', LogisticRegression())
])
pipe.fit(X_train, y_train) # Pipeline handles the scalingFallstrick 3: Falsche Parameternamen in GridSearchCV
Die Syntax stepname__param muss exakt zu Ihren Pipeline-Schrittnamen passen:
pipe = Pipeline([
('my_scaler', StandardScaler()),
('my_clf', LogisticRegression())
])
# WRONG: Using the class name instead of the step name
# param_grid = {'StandardScaler__with_mean': [True, False]} # KeyError
# CORRECT: Using the step name you defined
param_grid = {'my_scaler__with_mean': [True, False], 'my_clf__C': [0.1, 1, 10]}Fallstrick 4: Spaltenreihenfolge ändert sich
Wenn Sie ColumnTransformer mit Pandas DataFrames verwenden (die Sie mit pandas read_csv laden können), hängt die Spaltenreihenfolge im Output von der Reihenfolge der Transformer ab, nicht vom ursprünglichen DataFrame:
# The output order is: numeric features first, then categorical
# This matters if you manually inspect transformed data
preprocessor = ColumnTransformer([
('num', numeric_transformer, numeric_features), # These come first
('cat', categorical_transformer, categorical_features) # These come second
])Debugging von Zwischenoutputs
Um zu sehen, was in jedem Schritt passiert:
# Method 1: Transform step by step
pipe.fit(X_train, y_train)
X_after_preprocessor = pipe.named_steps['preprocessor'].transform(X_test)
print(f"Shape after preprocessing: {X_after_preprocessor.shape}")
print(f"Sample values:\n{X_after_preprocessor[:2]}")
# Method 2: Slice the pipeline
preprocessing_pipe = pipe[:-1] # Everything except the classifier
X_transformed = preprocessing_pipe.transform(X_test)
print(f"Transformed shape: {X_transformed.shape}")
# Method 3: Use set_config for verbose output
from sklearn import set_config
set_config(transform_output="pandas") # Get DataFrames from transformers
# Now transform outputs include column names -- easier to debugDebugging von Shape-Mismatches
# Print shapes at each stage to find where things break
print(f"Input shape: {X_train.shape}")
for name, step in pipe.named_steps.items():
if hasattr(step, 'transform'):
# Check if the step has been fitted
try:
X_train = step.transform(X_train)
print(f"After '{name}': {X_train.shape}")
except Exception as e:
print(f"Error at '{name}': {e}")
breakPipeline-Methodenreferenz
| Methode | Beschreibung |
|---|---|
fit(X, y) | Alle Transformer und den finalen Estimator fitten |
predict(X) | X durch alle Schritte transformieren, dann mit dem finalen Estimator vorhersagen |
predict_proba(X) | Transformieren und Wahrscheinlichkeiten zurückgeben (nur Classifier) |
transform(X) | X durch alle Schritte transformieren (wenn der letzte Schritt ein Transformer ist) |
fit_transform(X, y) | Fit und Transform in einem Aufruf |
fit_predict(X, y) | Fit und Vorhersage in einem Aufruf |
score(X, y) | Transformieren und bewerten (Accuracy für Classifier, R2 für Regressoren) |
set_params(**params) | Parameter mit stepname__param-Syntax setzen |
get_params() | Alle Parameter abrufen |
named_steps | Dictionary-artiger Zugriff auf Pipeline-Schritte |
[i] oder [name] | Schritt per Index oder Name abrufen |
[start:end] | Slicing, um eine Sub-Pipeline zu erstellen |
FAQ
Was ist der Unterschied zwischen Pipeline und make_pipeline?
Pipeline verlangt, dass Sie für jeden Schritt (name, estimator)-Tuples angeben, wodurch Sie explizite Kontrolle über die Schrittnamen erhalten. make_pipeline akzeptiert direkte Estimator-Instanzen und erzeugt Namen automatisch aus den Klassennamen (in Kleinbuchstaben). Verwenden Sie Pipeline, wenn Sie beschreibende Namen benötigen oder Hyperparameter mit GridSearchCV tunen möchten. Verwenden Sie make_pipeline für schnelles Prototyping.
Verhindert sklearn Pipeline Data Leakage?
Ja. Wenn Sie pipeline.fit(X_train, y_train) aufrufen, werden alle Transformer nur auf den Trainingsdaten gefittet. Bei Cross-Validation mit cross_val_score oder GridSearchCV fitten sich alle Schritte auf jedem Trainings-Fold neu, sodass keine Test-Fold-Daten in das Preprocessing einfließen. Das ist der Hauptvorteil gegenüber manuellem Preprocessing.
Kann ich Pipeline mit Deep-Learning-Modellen verwenden?
Scikit-learn-Pipelines funktionieren mit jedem Estimator, der der sklearn-API folgt (implementiert fit, predict und optional transform). Bibliotheken wie scikeras stellen sklearn-kompatible Wrapper für Keras-Modelle bereit, sodass sie in Pipelines verwendet werden können. Auch XGBoost und LightGBM bieten sklearn-kompatible Schnittstellen.
Wie gehe ich mit Feature-Namen nach ColumnTransformer um?
Rufen Sie nach dem Fitten pipeline.named_steps['preprocessor'].get_feature_names_out() auf. Das gibt ein Array von Feature-Namen mit Präfixen zurück, die anzeigen, welcher Transformer sie erzeugt hat (z. B. num__age, cat__city_NYC). Das funktioniert in scikit-learn 1.0 und neuer.
Kann ich mehrere Modelle in einer Pipeline haben?
Nein. Eine Pipeline ist eine lineare Sequenz, bei der der letzte Schritt der Estimator ist. Wenn Sie mehrere Modelle vergleichen möchten, erstellen Sie separate Pipelines mit denselben Preprocessing-Schritten, aber unterschiedlichen finalen Estimators. Das können Sie automatisieren, indem Sie über ein Modell-Dictionary iterieren und für jedes Modell eine Pipeline erstellen.
Wie überspringe ich einen Schritt in einer Pipeline während GridSearchCV?
Setzen Sie den Schritt im Parameter-Gitter auf 'passthrough'. Zum Beispiel überspringt {'reduce_dim': ['passthrough']} den reduce_dim-Schritt in dieser Grid-Search-Iteration vollständig. Sie können einen Schritt auch auf None setzen, aber 'passthrough' ist der empfohlene Ansatz.
Was passiert, wenn die Pipeline bei der Vorhersage auf unbekannte Kategorien trifft?
Wenn Sie OneHotEncoder mit handle_unknown='ignore' innerhalb Ihrer Pipeline verwenden, werden unbekannte Kategorien als lauter Nullen kodiert. Ohne diese Einstellung wirft die Pipeline einen Fehler. Setzen Sie in produktiven Pipelines, in denen neue Kategorien auftreten können, immer handle_unknown='ignore'.
Fazit
Sklearn Pipeline verwandelt chaotischen, fehleranfälligen ML-Code in saubere, reproduzierbare Workflows. Indem Preprocessing und Modellierung in einem einzigen Objekt verknüpft werden, eliminieren Sie Data Leakage, vereinfachen das Deployment und machen Hyperparameter-Tuning über den gesamten Workflow hinweg trivial.
Beginnen Sie mit den Grundlagen: Verpacken Sie Ihren Scaler und Ihr Modell in eine Pipeline. Wechseln Sie zu ColumnTransformer, wenn Sie gemischte Feature-Typen haben. Verwenden Sie GridSearchCV, um Parameter über alle Pipeline-Schritte hinweg gleichzeitig zu optimieren. Erstellen Sie benutzerdefinierte Transformer, wenn die eingebauten Optionen Ihre Feature-Engineering-Anforderungen nicht abdecken.
Die Investition in das Erlernen von Pipelines zahlt sich sofort aus. Ihre Cross-Validation-Ergebnisse werden vertrauenswürdig, weil das Preprocessing pro Fold neu gefittet wird. Ihr Produktions-Deployment wird zu einem einzigen joblib.dump und joblib.load. Und Ihre Codebase wird wartbar, weil die gesamte Transformations- und Vorhersagelogik in einem inspizierbaren Objekt lebt.
Verwandte Leitfäden
- Sklearn Linear Regression -- Regression-Modelle bauen, die sich natürlich in Pipelines einfügen
- Sklearn Confusion Matrix -- die Klassifikationsausgabe Ihrer Pipeline bewerten
- Sklearn Random Forest -- ein leistungsstarker Classifier als letzter Pipeline-Schritt
- Pandas read_csv -- CSV-Daten in DataFrames laden, bevor sie einer Pipeline zugeführt werden