Skip to content
トピック
Scikit-Learn
Sklearn Pipeline: PythonでMLパイプラインを構築する完全ガイド

Sklearn Pipeline: PythonでMLパイプラインを構築する完全ガイド

更新日

機械学習プロジェクトに、5段階の前処理、3つの特徴量エンジニアリング操作、そして最終モデルがあるとします。各ステップは別々のコードブロックです。スケーラーを全データセットに対して fit してから train/test 分割を行い、ワンホットエンコーディングによって学習時と本番環境で異なる列が作られてしまう。数か月後、誰かが補完戦略を変更したのに、デプロイスクリプトの更新を忘れてしまう。

これが多くの ML コードベースの現実です。手動の前処理パイプラインは脆弱で、ミスを誘発しやすく、データリーケージの絶え間ない原因になります。データリーケージは、ノートブックでは高精度なのに実運用では失敗するモデルの、最も一般的な原因です。StandardScaler を分割前の全データセットに対して fit すると、テストセットの統計情報が学習に漏れ込みます。統一されたワークフローの外でカテゴリ特徴量をエンコードすると、train-test のずれは本番で壊れるまで見えません。

Scikit-learn の Pipeline は、前処理とモデリングを 1 つのオブジェクトに連結することで、これらの問題を解決します。fit() を 1 回呼ぶだけで全体を学習し、predict() を 1 回呼ぶだけで変換と予測を行います。データリーケージなし。変換の不一致なし。保存・読み込み・デプロイが 1 つのオブジェクトで完結します。このガイドでは、基本的な使い方からカスタムトランスフォーマー、実運用のデプロイパターンまで、sklearn パイプライン構築に必要なすべてを解説します。

なぜパイプラインが重要なのか

データリーケージの問題

データリーケージは、学習データの外側にある情報が学習中にモデルへ影響を与えるときに発生します。前処理で最も一般的な形は次のようなものです。

# WRONG: Data leakage -- scaler sees test data
from sklearn.preprocessing import StandardScaler
from sklearn.model_selection import train_test_split
 
scaler = StandardScaler()
X_scaled = scaler.fit_transform(X)  # Fitted on ALL data, including test
 
X_train, X_test, y_train, y_test = train_test_split(X_scaled, y, test_size=0.2)
# X_test was already influenced by scaler statistics computed on the full dataset

このスケーラーは、テストサンプルを含むデータセット全体から平均と標準偏差を計算しています。そのため、前処理の段階でそのサンプルに関する情報を間接的に「見て」しまっており、テスト評価が楽観的になります。

正しい方法は次のとおりです。

# CORRECT: No leakage -- scaler fitted only on training data
X_train, X_test, y_train, y_test = train_test_split(X, y, test_size=0.2)
 
scaler = StandardScaler()
X_train_scaled = scaler.fit_transform(X_train)  # Fit only on training
X_test_scaled = scaler.transform(X_test)         # Transform only, no fitting

これでも動きますが、手動方式はすぐに扱いづらくなります。5つの前処理があれば、5つの学習済みオブジェクトを管理し、それぞれで fit_transformtransform のどちらを使うべきか覚えておく必要があります。Pipeline を使えば、これを自動化できます。

コードの整理

リーケージ以外にも、パイプラインはコードの整理という課題を解決します。次の 2 つのアプローチを比べてみましょう。

観点手動前処理sklearn Pipeline
データリーケージのリスク高い -- test データに fit_transform してしまいやすいなし -- pipeline が正しい fit/transform を強制
学習 + 予測のコード行数環境ごとに 10〜30 行2 行 (fit, predict)
本番デプロイ各トランスフォーマーを個別に保存し、順序を再構築joblib で 1 つのオブジェクトを保存
クロスバリデーション各 fold で手動再学習が必要cross_val_score がすべて処理
ハイパーパラメータ調整前処理 + モデルのパラメータを手動ループGridSearchCV が全パラメータを同時に調整
再現性notebook の実行順序に依存決定的 -- 同じオブジェクトなら同じ結果
デバッグ各段階で shape を手動で表示pipeline.named_steps で確認可能

基本的な Pipeline の使い方

Pipeline クラスは (name, transformer) のタプルのリストを受け取ります。最後のステップ以外は fittransform を実装している必要があります。最後のステップは、分類器・回帰器・トランスフォーマーのいずれでも構いません。

from sklearn.pipeline import Pipeline
from sklearn.preprocessing import StandardScaler
from sklearn.linear_model import LogisticRegression
 
# Create pipeline with named steps
pipe = Pipeline([
    ('scaler', StandardScaler()),
    ('classifier', LogisticRegression(max_iter=1000))
])

学習と予測

pipe.fit(X_train, y_train) を呼ぶと、Pipeline は次の処理を行います。

  1. scaler.fit_transform(X_train, y_train) を呼ぶ -- スケーラーを学習し、学習データを変換
  2. 変換後のデータを classifier.fit(X_transformed, y_train) に渡す

pipe.predict(X_test) を呼ぶと、Pipeline は次の処理を行います。

  1. scaler.transform(X_test) を呼ぶ -- 変換のみ、学習はしない
  2. 変換後のデータを classifier.predict(X_transformed) に渡す
from sklearn.pipeline import Pipeline
from sklearn.preprocessing import StandardScaler
from sklearn.linear_model import LogisticRegression
from sklearn.model_selection import train_test_split
from sklearn.datasets import load_iris
 
# Load data
iris = load_iris()
X, y = iris.data, iris.target
 
# Split -- see our guide on train_test_split for details:
# /topics/Scikit-Learn/sklearn-train-test-split
X_train, X_test, y_train, y_test = train_test_split(
    X, y, test_size=0.2, random_state=42, stratify=y
)
 
# Build and train pipeline
pipe = Pipeline([
    ('scaler', StandardScaler()),
    ('classifier', LogisticRegression(max_iter=1000, random_state=42))
])
 
pipe.fit(X_train, y_train)
 
# Evaluate
accuracy = pipe.score(X_test, y_test)
print(f"Test accuracy: {accuracy:.4f}")
# Test accuracy: 1.0000

各ステップへのアクセス

名前で各ステップを確認できます。

# Access scaler parameters after fitting
scaler = pipe.named_steps['scaler']
print(f"Feature means: {scaler.mean_}")
print(f"Feature stds:  {scaler.scale_}")
 
# Access the classifier
clf = pipe.named_steps['classifier']
print(f"Coefficients shape: {clf.coef_.shape}")
print(f"Classes: {clf.classes_}")

インデックスでもアクセスできます。

# Access by index
first_step = pipe[0]   # StandardScaler
last_step = pipe[-1]    # LogisticRegression
 
# Slice the pipeline (returns a new Pipeline)
preprocessing = pipe[:-1]  # Just the scaler
X_test_transformed = preprocessing.transform(X_test)
print(f"Transformed shape: {X_test_transformed.shape}")

make_pipeline: 省略記法

カスタムなステップ名が不要な場合、make_pipeline はクラス名から自動的に名前を生成します。

from sklearn.pipeline import make_pipeline
from sklearn.preprocessing import StandardScaler
from sklearn.decomposition import PCA
from sklearn.svm import SVC
 
# Equivalent to Pipeline([('standardscaler', StandardScaler()),
#                          ('pca', PCA(n_components=2)),
#                          ('svc', SVC())])
pipe = make_pipeline(StandardScaler(), PCA(n_components=2), SVC())
 
print(pipe.named_steps)
# {'standardscaler': StandardScaler(), 'pca': PCA(n_components=2), 'svc': SVC()}

自動生成される名前はクラス名を小文字にしたものです。同じトランスフォーマーを 2 回使うと、make_pipeline は番号を付けます。

from sklearn.pipeline import make_pipeline
from sklearn.preprocessing import StandardScaler, PolynomialFeatures
 
pipe = make_pipeline(PolynomialFeatures(2), StandardScaler(), PolynomialFeatures(3))
print(list(pipe.named_steps.keys()))
# ['polynomialfeatures-1', 'standardscaler', 'polynomialfeatures-2']

Pipeline と make_pipeline の比較

機能Pipelinemake_pipeline
カスタムなステップ名あり -- 自分で指定なし -- 自動生成
大規模パイプラインでの可読性高い -- 説明的な名前低い -- 汎用的な名前
ハイパーパラメータ調整の記法自分の名前で stepname__param自動名で classname__param
コードの簡潔さやや冗長より簡潔
向いている用途本番パイプライン、チューニングクイックな試作

ハイパーパラメータ調整を行う予定がある場合や、明確なステップ名が可読性を高める場合は Pipeline を使いましょう。素早い実験なら make_pipeline が便利です。

よく使う前処理ステップ

sklearn パイプラインでよく使うトランスフォーマーを紹介します。

数値特徴量

from sklearn.pipeline import Pipeline
from sklearn.preprocessing import StandardScaler, MinMaxScaler, RobustScaler
from sklearn.impute import SimpleImputer
 
# Scale to zero mean, unit variance
numeric_pipe = Pipeline([
    ('imputer', SimpleImputer(strategy='median')),
    ('scaler', StandardScaler())
])
トランスフォーマー何をするか使う場面
StandardScaler平均=0、標準偏差=1 に変換ほとんどのアルゴリズムのデフォルト候補
MinMaxScaler[0, 1] の範囲にスケーリングニューラルネットワーク、スケールに敏感なアルゴリズム
RobustScaler中央値と IQR を使用し、外れ値に強い外れ値が多いデータ
SimpleImputer欠損値を補完する(mean, median, most_frequent, constant)欠損データの処理
PolynomialFeatures多項式特徴量と交互作用特徴量を生成線形モデルに非線形性を加える
PowerTransformerYeo-Johnson または Box-Cox 変換を適用歪んだ分布

カテゴリ特徴量

from sklearn.pipeline import Pipeline
from sklearn.preprocessing import OneHotEncoder, OrdinalEncoder, LabelEncoder
from sklearn.impute import SimpleImputer
 
# One-hot encode categorical features
categorical_pipe = Pipeline([
    ('imputer', SimpleImputer(strategy='most_frequent')),
    ('encoder', OneHotEncoder(handle_unknown='ignore', sparse_output=False))
])
トランスフォーマー何をするか使う場面
OneHotEncoder各カテゴリごとに二値列を作る順序のない名義カテゴリ
OrdinalEncoderカテゴリを整数にマップする順序のあるカテゴリ(low/medium/high)
TargetEncoder目的変数統計を使ってエンコード高カーディナリティ特徴量(scikit-learn 1.3+)

ColumnTransformer で混在データ型を扱う

実際のデータセットには、数値列とカテゴリ列の両方があります。ColumnTransformer は、異なる列グループに対して別々の変換を並列で適用します。

from sklearn.compose import ColumnTransformer
from sklearn.pipeline import Pipeline
from sklearn.preprocessing import StandardScaler, OneHotEncoder
from sklearn.impute import SimpleImputer
from sklearn.linear_model import LogisticRegression
import pandas as pd
import numpy as np
 
# Sample data with mixed types
data = pd.DataFrame({
    'age': [25, 30, np.nan, 45, 50],
    'income': [40000, 55000, 60000, np.nan, 90000],
    'city': ['NYC', 'LA', 'NYC', 'Chicago', 'LA'],
    'education': ['BS', 'MS', 'PhD', 'BS', 'MS'],
    'purchased': [0, 1, 1, 0, 1]
})
 
X = data.drop('purchased', axis=1)
y = data['purchased']
 
# Define column groups
numeric_features = ['age', 'income']
categorical_features = ['city', 'education']
 
# Build sub-pipelines for each column type
numeric_transformer = Pipeline([
    ('imputer', SimpleImputer(strategy='median')),
    ('scaler', StandardScaler())
])
 
categorical_transformer = Pipeline([
    ('imputer', SimpleImputer(strategy='most_frequent')),
    ('encoder', OneHotEncoder(handle_unknown='ignore', sparse_output=False))
])
 
# Combine with ColumnTransformer
preprocessor = ColumnTransformer(
    transformers=[
        ('num', numeric_transformer, numeric_features),
        ('cat', categorical_transformer, categorical_features)
    ]
)
 
# Full pipeline: preprocessing + model
pipeline = Pipeline([
    ('preprocessor', preprocessor),
    ('classifier', LogisticRegression(max_iter=1000))
])
 
pipeline.fit(X, y)
print(f"Pipeline fitted successfully")
print(f"Predictions: {pipeline.predict(X)}")

変換後の特徴量名を取得する

ColumnTransformer を fit した後、変換後の特徴量名を取得できます。

# After fitting the pipeline
pipeline.fit(X, y)
 
# Get feature names from the preprocessor step
feature_names = pipeline.named_steps['preprocessor'].get_feature_names_out()
print(f"Transformed features: {feature_names}")
# ['num__age', 'num__income', 'cat__city_Chicago', 'cat__city_LA',
#  'cat__city_NYC', 'cat__education_BS', 'cat__education_MS', 'cat__education_PhD']

残りの列の扱い

デフォルトでは、ColumnTransformer はどのトランスフォーマーにも指定されていない列を削除します。これは remainder パラメータで制御できます。

preprocessor = ColumnTransformer(
    transformers=[
        ('num', numeric_transformer, numeric_features),
        ('cat', categorical_transformer, categorical_features)
    ],
    remainder='passthrough'  # Keep unspecified columns as-is
    # remainder='drop'       # Default: drop unspecified columns
)

GridSearchCV と組み合わせた Pipeline

sklearn パイプラインの最も強力な機能の 1 つは、ハイパーパラメータ調整とのシームレスな統合です。stepname__parameter 記法を使って、パイプライン内部のパラメータを参照します。

from sklearn.pipeline import Pipeline
from sklearn.preprocessing import StandardScaler
from sklearn.decomposition import PCA
from sklearn.svm import SVC
from sklearn.model_selection import GridSearchCV, train_test_split
from sklearn.datasets import load_breast_cancer
 
# Load data
cancer = load_breast_cancer()
X_train, X_test, y_train, y_test = train_test_split(
    cancer.data, cancer.target, test_size=0.2, random_state=42, stratify=cancer.target
)
 
# Build pipeline
pipe = Pipeline([
    ('scaler', StandardScaler()),
    ('pca', PCA()),
    ('svc', SVC())
])
 
# Define parameter grid
# Use stepname__param syntax to access nested parameters
param_grid = {
    'pca__n_components': [5, 10, 15, 20],
    'svc__C': [0.1, 1, 10, 100],
    'svc__kernel': ['rbf', 'linear'],
    'svc__gamma': ['scale', 'auto']
}
 
# Run grid search
grid = GridSearchCV(
    pipe,
    param_grid,
    cv=5,
    scoring='accuracy',
    n_jobs=-1,
    verbose=1
)
 
grid.fit(X_train, y_train)
 
print(f"Best parameters: {grid.best_params_}")
print(f"Best CV score:   {grid.best_score_:.4f}")
print(f"Test score:      {grid.score(X_test, y_test):.4f}")

ColumnTransformer のパラメータを調整する

ColumnTransformer を含むネストされたパイプラインでは、ステップ名を二重アンダースコアで連結します。

# Accessing nested parameters:
# pipeline step 'preprocessor' -> transformer 'num' -> step 'imputer' -> parameter 'strategy'
param_grid = {
    'preprocessor__num__imputer__strategy': ['mean', 'median'],
    'preprocessor__cat__encoder__handle_unknown': ['ignore', 'infrequent_if_exist'],
    'classifier__C': [0.1, 1, 10]
}

大きな探索空間には RandomizedSearchCV を使う

パラメータグリッドが大きい場合、RandomizedSearchCV は指定回数だけパラメータ組み合わせをサンプリングします。

from sklearn.model_selection import RandomizedSearchCV
from scipy.stats import uniform, randint
 
param_distributions = {
    'pca__n_components': randint(5, 25),
    'svc__C': uniform(0.1, 100),
    'svc__kernel': ['rbf', 'linear', 'poly'],
    'svc__gamma': uniform(0.001, 1)
}
 
random_search = RandomizedSearchCV(
    pipe,
    param_distributions,
    n_iter=50,       # Sample 50 combinations
    cv=5,
    scoring='accuracy',
    n_jobs=-1,
    random_state=42
)
 
random_search.fit(X_train, y_train)
print(f"Best parameters: {random_search.best_params_}")
print(f"Best CV score:   {random_search.best_score_:.4f}")

カスタムトランスフォーマー

FunctionTransformer: 簡単なカスタム処理

状態を持たない単純な変換には FunctionTransformer を使います。

from sklearn.preprocessing import FunctionTransformer
from sklearn.pipeline import Pipeline
import numpy as np
 
# Log transform (adding 1 to avoid log(0))
log_transformer = FunctionTransformer(
    func=np.log1p,
    inverse_func=np.expm1  # Optional inverse for inverse_transform
)
 
pipe = Pipeline([
    ('log', log_transformer),
    ('scaler', StandardScaler())
])
 
# Works with pipeline fit/transform
X_sample = np.array([[1, 10, 100], [2, 20, 200]])
X_transformed = pipe.fit_transform(X_sample)
print(f"Original:    {X_sample[0]}")
print(f"Transformed: {X_transformed[0]}")

カスタムトランスフォーマークラス

データからパラメータを学習するような状態を持つ変換には、BaseEstimatorTransformerMixin を継承するクラスを作成します。

from sklearn.base import BaseEstimator, TransformerMixin
import numpy as np
 
class OutlierClipper(BaseEstimator, TransformerMixin):
    """Clips values beyond a specified number of standard deviations."""
 
    def __init__(self, n_std=3):
        self.n_std = n_std
 
    def fit(self, X, y=None):
        # Learn the boundaries from training data
        self.mean_ = np.mean(X, axis=0)
        self.std_ = np.std(X, axis=0)
        self.lower_ = self.mean_ - self.n_std * self.std_
        self.upper_ = self.mean_ + self.n_std * self.std_
        return self  # Always return self from fit
 
    def transform(self, X):
        # Apply learned boundaries to any data
        X_clipped = np.clip(X, self.lower_, self.upper_)
        return X_clipped

組み込みトランスフォーマーと同じようにパイプラインで使えます。

from sklearn.pipeline import Pipeline
from sklearn.preprocessing import StandardScaler
from sklearn.linear_model import LogisticRegression
from sklearn.datasets import load_iris
from sklearn.model_selection import train_test_split
 
iris = load_iris()
X_train, X_test, y_train, y_test = train_test_split(
    iris.data, iris.target, test_size=0.2, random_state=42
)
 
pipe = Pipeline([
    ('clip', OutlierClipper(n_std=2)),
    ('scaler', StandardScaler()),
    ('classifier', LogisticRegression(max_iter=1000))
])
 
pipe.fit(X_train, y_train)
print(f"Accuracy: {pipe.score(X_test, y_test):.4f}")
 
# The n_std parameter works with GridSearchCV
from sklearn.model_selection import GridSearchCV
 
grid = GridSearchCV(
    pipe,
    {'clip__n_std': [1.5, 2, 2.5, 3]},
    cv=5
)
grid.fit(X_train, y_train)
print(f"Best n_std: {grid.best_params_['clip__n_std']}")

特徴量エンジニアリング用のカスタムトランスフォーマー

より実用的な例として、特定の列同士の交互作用特徴量を作成する例です。

from sklearn.base import BaseEstimator, TransformerMixin
import pandas as pd
import numpy as np
 
class FeatureInteraction(BaseEstimator, TransformerMixin):
    """Creates multiplication interactions between specified column pairs."""
 
    def __init__(self, interaction_pairs=None):
        self.interaction_pairs = interaction_pairs
 
    def fit(self, X, y=None):
        # Store column names if DataFrame
        if isinstance(X, pd.DataFrame):
            self.feature_names_in_ = X.columns.tolist()
        else:
            self.feature_names_in_ = [f"x{i}" for i in range(X.shape[1])]
        return self
 
    def transform(self, X):
        X_df = pd.DataFrame(X, columns=self.feature_names_in_) if not isinstance(X, pd.DataFrame) else X.copy()
 
        if self.interaction_pairs:
            for col_a, col_b in self.interaction_pairs:
                name = f"{col_a}_x_{col_b}"
                X_df[name] = X_df[col_a] * X_df[col_b]
 
        return X_df.values
 
    def get_feature_names_out(self, input_features=None):
        names = list(self.feature_names_in_)
        if self.interaction_pairs:
            for col_a, col_b in self.interaction_pairs:
                names.append(f"{col_a}_x_{col_b}")
        return np.array(names)

FeatureUnion: 並列の特徴量エンジニアリング

Pipeline がステップを直列に連結するのに対し、FeatureUnion はトランスフォーマーを並列に実行し、その出力を横方向に結合します。

from sklearn.pipeline import Pipeline, FeatureUnion
from sklearn.preprocessing import StandardScaler, PolynomialFeatures
from sklearn.decomposition import PCA
from sklearn.linear_model import LogisticRegression
from sklearn.datasets import load_iris
from sklearn.model_selection import train_test_split
 
iris = load_iris()
X_train, X_test, y_train, y_test = train_test_split(
    iris.data, iris.target, test_size=0.2, random_state=42
)
 
# Create parallel feature branches
feature_union = FeatureUnion([
    ('scaled', StandardScaler()),             # Original features, scaled
    ('pca', PCA(n_components=2)),             # 2 PCA components
    ('poly', PolynomialFeatures(degree=2, include_bias=False))  # Polynomial features
])
 
# Combine into a full pipeline
pipe = Pipeline([
    ('features', feature_union),
    ('classifier', LogisticRegression(max_iter=1000, random_state=42))
])
 
pipe.fit(X_train, y_train)
 
# Check the total number of features
X_transformed = feature_union.fit_transform(X_train)
print(f"Original features:    {X_train.shape[1]}")
print(f"After FeatureUnion:   {X_transformed.shape[1]}")
print(f"Test accuracy:        {pipe.score(X_test, y_test):.4f}")

FeatureUnion と ColumnTransformer の比較

機能FeatureUnionColumnTransformer
入力すべての列をすべてのトランスフォーマーへ特定の列を特定のトランスフォーマーへ
出力横方向に結合横方向に結合
使いどころ同じ特徴量の複数表現異なる種類の特徴量に異なる処理が必要な場合
列選択できない -- すべての列に適用列指定が組み込みで可能
現代的な代替多くの場合 ColumnTransformer に置き換えほとんどの用途で推奨

現在の scikit-learn では、以前 FeatureUnion が使われていたケースの多くを ColumnTransformer が担当します。FeatureUnion は、同じ特徴量セットに対して複数の表現を作りたい場合(例: 生値 + PCA + 多項式特徴量)に依然として有用です。

Pipeline の保存と読み込み

Pipeline の大きな利点の 1 つは、デプロイの簡単さです。各トランスフォーマーとモデルを別々に保存するのではなく、1 つのオブジェクトとして保存できます。

import joblib
from sklearn.pipeline import Pipeline
from sklearn.preprocessing import StandardScaler
from sklearn.ensemble import RandomForestClassifier
from sklearn.datasets import load_iris
from sklearn.model_selection import train_test_split
 
# Train a pipeline
iris = load_iris()
X_train, X_test, y_train, y_test = train_test_split(
    iris.data, iris.target, test_size=0.2, random_state=42
)
 
pipe = Pipeline([
    ('scaler', StandardScaler()),
    ('classifier', RandomForestClassifier(n_estimators=100, random_state=42))
])
pipe.fit(X_train, y_train)
 
# Save the entire pipeline -- one file
joblib.dump(pipe, 'model_pipeline.joblib')
print(f"Pipeline saved")
 
# Load and predict -- no preprocessing code needed
loaded_pipe = joblib.load('model_pipeline.joblib')
predictions = loaded_pipe.predict(X_test)
accuracy = loaded_pipe.score(X_test, y_test)
print(f"Loaded pipeline accuracy: {accuracy:.4f}")

Pipeline のバージョン管理

本番環境では、Pipeline に加えてメタデータも保存しましょう。

import joblib
import datetime
import sklearn
 
artifact = {
    'pipeline': pipe,
    'training_date': datetime.datetime.now().isoformat(),
    'sklearn_version': sklearn.__version__,
    'feature_names': list(iris.feature_names),
    'target_names': list(iris.target_names),
    'training_accuracy': pipe.score(X_train, y_train),
    'test_accuracy': pipe.score(X_test, y_test),
    'n_training_samples': len(X_train)
}
 
joblib.dump(artifact, 'model_artifact_v1.joblib')
 
# Later, load and validate
loaded = joblib.load('model_artifact_v1.joblib')
print(f"Model trained on: {loaded['training_date']}")
print(f"Sklearn version:  {loaded['sklearn_version']}")
print(f"Test accuracy:    {loaded['test_accuracy']:.4f}")
 
# Use the pipeline
loaded_pipe = loaded['pipeline']
predictions = loaded_pipe.predict(X_test[:3])

pickle を使う場合(代替)

joblib は大きな NumPy 配列を効率よく扱えるため、sklearn オブジェクトには推奨されます。標準の pickle でも動作します。

import pickle
 
# Save
with open('pipeline.pkl', 'wb') as f:
    pickle.dump(pipe, f)
 
# Load
with open('pipeline.pkl', 'rb') as f:
    loaded = pickle.load(f)

実践例: 完全な分類パイプライン

以下は、混在する特徴量型を扱う分類タスク向けの、実運用レベルの完全なパイプラインです。ColumnTransformerRandom Forest classifier を使い、最後に完全な evaluation report を出します。この例では、多くの ML 実務者が遭遇する Titanic 風のデータセットパターンを使います。

import pandas as pd
import numpy as np
from sklearn.compose import ColumnTransformer
from sklearn.pipeline import Pipeline
from sklearn.preprocessing import StandardScaler, OneHotEncoder
from sklearn.impute import SimpleImputer
from sklearn.ensemble import RandomForestClassifier
from sklearn.model_selection import (
    train_test_split, cross_val_score, GridSearchCV
)
from sklearn.metrics import classification_report, accuracy_score
 
# -- Create a realistic dataset with mixed types and missing values --
np.random.seed(42)
n = 1000
 
data = pd.DataFrame({
    'age': np.random.normal(35, 12, n),
    'income': np.random.lognormal(10.5, 0.8, n),
    'credit_score': np.random.normal(650, 80, n),
    'years_employed': np.random.exponential(5, n),
    'department': np.random.choice(['Engineering', 'Sales', 'Marketing', 'HR', 'Finance'], n),
    'education': np.random.choice(['High School', 'Bachelor', 'Master', 'PhD'], n),
    'city': np.random.choice(['NYC', 'LA', 'Chicago', 'Houston', 'Phoenix', 'Dallas'], n),
    'promoted': np.random.binomial(1, 0.3, n)
})
 
# Introduce missing values (realistic pattern)
for col in ['age', 'income', 'credit_score']:
    mask = np.random.random(n) < 0.05
    data.loc[mask, col] = np.nan
 
for col in ['department', 'education']:
    mask = np.random.random(n) < 0.03
    data.loc[mask, col] = np.nan
 
print(f"Dataset shape: {data.shape}")
print(f"Missing values:\n{data.isnull().sum()}")
print(f"Target distribution:\n{data['promoted'].value_counts(normalize=True)}")
# -- Define features and target --
X = data.drop('promoted', axis=1)
y = data['promoted']
 
# Split the data (see /topics/Scikit-Learn/sklearn-train-test-split for details)
X_train, X_test, y_train, y_test = train_test_split(
    X, y, test_size=0.2, random_state=42, stratify=y
)
 
print(f"Training set: {X_train.shape[0]} samples")
print(f"Test set:     {X_test.shape[0]} samples")
# -- Define column groups --
numeric_features = ['age', 'income', 'credit_score', 'years_employed']
categorical_features = ['department', 'education', 'city']
 
# -- Build preprocessing pipelines for each column type --
numeric_transformer = Pipeline([
    ('imputer', SimpleImputer(strategy='median')),
    ('scaler', StandardScaler())
])
 
categorical_transformer = Pipeline([
    ('imputer', SimpleImputer(strategy='most_frequent')),
    ('encoder', OneHotEncoder(handle_unknown='ignore', sparse_output=False))
])
 
# -- Combine column transformers --
preprocessor = ColumnTransformer(
    transformers=[
        ('num', numeric_transformer, numeric_features),
        ('cat', categorical_transformer, categorical_features)
    ],
    remainder='drop'  # Explicitly drop unlisted columns
)
 
# -- Full pipeline: preprocessing + classifier --
# (see /topics/Scikit-Learn/sklearn-random-forest for more on RandomForest)
pipeline = Pipeline([
    ('preprocessor', preprocessor),
    ('classifier', RandomForestClassifier(
        n_estimators=200,
        max_depth=10,
        min_samples_leaf=5,
        class_weight='balanced',
        random_state=42,
        n_jobs=-1
    ))
])
# -- Cross-validation first --
cv_scores = cross_val_score(pipeline, X_train, y_train, cv=5, scoring='accuracy')
print(f"Cross-validation scores: {cv_scores}")
print(f"Mean CV accuracy: {cv_scores.mean():.4f} (+/- {cv_scores.std() * 2:.4f})")
 
# -- Fit on full training set --
pipeline.fit(X_train, y_train)
 
# -- Evaluate on test set --
y_pred = pipeline.predict(X_test)
print(f"\nTest accuracy: {accuracy_score(y_test, y_pred):.4f}")
# For detailed evaluation, see /topics/Scikit-Learn/sklearn-confusion-matrix
print(f"\nClassification Report:\n{classification_report(y_test, y_pred)}")
# -- Hyperparameter tuning --
param_grid = {
    'preprocessor__num__imputer__strategy': ['mean', 'median'],
    'classifier__n_estimators': [100, 200, 300],
    'classifier__max_depth': [5, 10, 15, None],
    'classifier__min_samples_leaf': [3, 5, 10]
}
 
grid_search = GridSearchCV(
    pipeline,
    param_grid,
    cv=5,
    scoring='accuracy',
    n_jobs=-1,
    verbose=1
)
 
grid_search.fit(X_train, y_train)
 
print(f"\nBest parameters: {grid_search.best_params_}")
print(f"Best CV score:   {grid_search.best_score_:.4f}")
print(f"Test score:      {grid_search.score(X_test, y_test):.4f}")
# -- Inspect the best pipeline --
best_pipeline = grid_search.best_estimator_
 
# Get feature names after transformation
feature_names = best_pipeline.named_steps['preprocessor'].get_feature_names_out()
print(f"\nTransformed feature count: {len(feature_names)}")
 
# Get feature importances from the classifier
importances = best_pipeline.named_steps['classifier'].feature_importances_
feature_importance = pd.DataFrame({
    'feature': feature_names,
    'importance': importances
}).sort_values('importance', ascending=False)
 
print(f"\nTop 10 features:")
print(feature_importance.head(10).to_string(index=False))
# -- Save the final pipeline --
import joblib
 
joblib.dump(best_pipeline, 'promotion_predictor.joblib')
print("Pipeline saved to promotion_predictor.joblib")
 
# -- Production usage --
loaded = joblib.load('promotion_predictor.joblib')
 
# Predict on new data -- same format as original DataFrame
new_employee = pd.DataFrame({
    'age': [28],
    'income': [65000],
    'credit_score': [720],
    'years_employed': [3.5],
    'department': ['Engineering'],
    'education': ['Master'],
    'city': ['NYC']
})
 
prediction = loaded.predict(new_employee)
probability = loaded.predict_proba(new_employee)
print(f"\nNew employee prediction: {'Promoted' if prediction[0] else 'Not promoted'}")
print(f"Probability: {probability[0][1]:.2%}")

この例は、混在する特徴量型、欠損値処理、クロスバリデーション、ハイパーパラメータ調整、特徴量の確認、本番用シリアライズといった、主要なパイプラインパターンをすべて示しています。

異なるモデル प्रकारでの Pipeline

同じ前処理パイプラインを別のモデルにも使えます。最後のステップだけ差し替えればOKです。

from sklearn.pipeline import Pipeline
from sklearn.linear_model import LogisticRegression
from sklearn.ensemble import RandomForestClassifier, GradientBoostingClassifier
from sklearn.svm import SVC
from sklearn.model_selection import cross_val_score
 
# Reuse the preprocessor from the previous example
models = {
    'Logistic Regression': LogisticRegression(max_iter=1000, random_state=42),
    'Random Forest': RandomForestClassifier(n_estimators=100, random_state=42),
    'Gradient Boosting': GradientBoostingClassifier(n_estimators=100, random_state=42),
    'SVM': SVC(kernel='rbf', random_state=42)
}
 
results = {}
for name, model in models.items():
    pipe = Pipeline([
        ('preprocessor', preprocessor),
        ('classifier', model)
    ])
    scores = cross_val_score(pipe, X_train, y_train, cv=5, scoring='accuracy')
    results[name] = {
        'mean': scores.mean(),
        'std': scores.std()
    }
    print(f"{name:25s} | Accuracy: {scores.mean():.4f} +/- {scores.std():.4f}")

既存のパイプラインの estimator を動的に差し替えることもできます。

# Replace the classifier in an existing pipeline
pipeline.set_params(classifier=GradientBoostingClassifier(n_estimators=200))
pipeline.fit(X_train, y_train)
print(f"Gradient Boosting test accuracy: {pipeline.score(X_test, y_test):.4f}")

回帰における Pipeline

Pipeline は回帰タスクでも同様に使えます。sklearn での線形回帰については、sklearn linear regression guide を参照してください。

from sklearn.pipeline import Pipeline
from sklearn.compose import ColumnTransformer
from sklearn.preprocessing import StandardScaler, OneHotEncoder, PolynomialFeatures
from sklearn.impute import SimpleImputer
from sklearn.linear_model import Ridge
from sklearn.model_selection import cross_val_score, train_test_split
import pandas as pd
import numpy as np
 
# Simulated housing data
np.random.seed(42)
n = 500
housing = pd.DataFrame({
    'sqft': np.random.normal(1500, 400, n),
    'bedrooms': np.random.choice([1, 2, 3, 4, 5], n),
    'age': np.random.uniform(0, 50, n),
    'neighborhood': np.random.choice(['downtown', 'suburbs', 'rural'], n),
    'condition': np.random.choice(['poor', 'fair', 'good', 'excellent'], n),
})
housing['price'] = (
    housing['sqft'] * 200
    + housing['bedrooms'] * 15000
    - housing['age'] * 1000
    + np.random.normal(0, 20000, n)
)
 
X = housing.drop('price', axis=1)
y = housing['price']
X_train, X_test, y_train, y_test = train_test_split(X, y, test_size=0.2, random_state=42)
 
# Regression pipeline
numeric_features = ['sqft', 'bedrooms', 'age']
categorical_features = ['neighborhood', 'condition']
 
preprocessor = ColumnTransformer([
    ('num', Pipeline([
        ('imputer', SimpleImputer(strategy='median')),
        ('poly', PolynomialFeatures(degree=2, include_bias=False)),
        ('scaler', StandardScaler())
    ]), numeric_features),
    ('cat', Pipeline([
        ('imputer', SimpleImputer(strategy='most_frequent')),
        ('encoder', OneHotEncoder(handle_unknown='ignore', sparse_output=False))
    ]), categorical_features)
])
 
reg_pipeline = Pipeline([
    ('preprocessor', preprocessor),
    ('regressor', Ridge(alpha=1.0))
])
 
cv_scores = cross_val_score(reg_pipeline, X_train, y_train, cv=5, scoring='r2')
print(f"Cross-validation R2: {cv_scores.mean():.4f} +/- {cv_scores.std():.4f}")
 
reg_pipeline.fit(X_train, y_train)
print(f"Test R2: {reg_pipeline.score(X_test, y_test):.4f}")

passthroughNone でステップをスキップする

ステップは 'passthrough' または None に設定することで、条件付きでスキップできます。

from sklearn.pipeline import Pipeline
from sklearn.preprocessing import StandardScaler
from sklearn.decomposition import PCA
from sklearn.svm import SVC
from sklearn.model_selection import GridSearchCV
from sklearn.datasets import load_breast_cancer
 
cancer = load_breast_cancer()
X_train, X_test, y_train, y_test = train_test_split(
    cancer.data, cancer.target, test_size=0.2, random_state=42
)
 
pipe = Pipeline([
    ('scaler', StandardScaler()),
    ('reduce_dim', PCA()),
    ('classifier', SVC())
])
 
# Grid search can toggle steps on/off
param_grid = [
    {
        'reduce_dim': [PCA(5), PCA(10), PCA(15)],
        'classifier__C': [1, 10]
    },
    {
        'reduce_dim': ['passthrough'],  # Skip PCA entirely
        'classifier__C': [1, 10]
    }
]
 
grid = GridSearchCV(pipe, param_grid, cv=5, n_jobs=-1)
grid.fit(X_train, y_train)
print(f"Best params: {grid.best_params_}")
print(f"Best score:  {grid.best_score_:.4f}")

Pipeline ステップのキャッシュ

ハイパーパラメータ調整時には、中間ステップが不要に再計算されることがあります。キャッシュを有効にすると、これを避けられます。

from sklearn.pipeline import Pipeline
from sklearn.preprocessing import StandardScaler
from sklearn.decomposition import PCA
from sklearn.svm import SVC
from tempfile import mkdtemp
from shutil import rmtree
 
# Create a temporary cache directory
cachedir = mkdtemp()
 
pipe = Pipeline(
    [
        ('scaler', StandardScaler()),
        ('pca', PCA(n_components=10)),
        ('svc', SVC())
    ],
    memory=cachedir  # Cache intermediate transformations
)
 
# During GridSearchCV, the scaler and PCA results are cached
# Only recomputed when their parameters change
# This speeds up searches where only the final estimator params change
 
# Clean up when done
# rmtree(cachedir)

パイプラインを作る前にデータを探索する

パイプラインを構築する前に、特徴量の分布、欠損パターン、相関、変換の必要性を理解する必要があります。PyGWalker (opens in a new tab) を使うと、任意の Pandas DataFrame を Jupyter notebook 上で対話的に探索できる可視化インターフェースに変えられます。

import pandas as pd
import pygwalker as pyg
 
# Explore your dataset interactively before building the pipeline
# Drag features to axes, create histograms, scatter plots, box plots
walker = pyg.walk(data)

こうした可視的な探索を通して、どの特徴量にスケーリングが必要か、どの特徴量に外れ値クリッピングが必要か、どのカテゴリ特徴量が高カーディナリティかを判断できます。パイプラインコードを 1 行も書く前に、欠損値パターンや特徴量分布を把握できます。

パイプライン実験ワークフロー全体を繰り返し改善したい場合 -- 前処理戦略の比較、モデル性能の比較、結果の追跡など -- RunCell (opens in a new tab) は、コード生成・デバッグ・実験管理をエージェントが支援する AI 搭載の Jupyter 環境を提供します。

よくある落とし穴とデバッグのヒント

落とし穴 1: 予測時に Pipeline を使い忘れる

# WRONG: Preprocessing manually, predicting with just the model
X_test_scaled = scaler.transform(X_test)
predictions = pipeline.named_steps['classifier'].predict(X_test_scaled)
 
# CORRECT: Let the pipeline handle everything
predictions = pipeline.predict(X_test)

落とし穴 2: 前処理を Pipeline の外で fit する

# WRONG: This defeats the purpose of the pipeline
scaler = StandardScaler()
X_train_scaled = scaler.fit_transform(X_train)
pipe = Pipeline([('classifier', LogisticRegression())])
pipe.fit(X_train_scaled, y_train)
# Now you must remember to manually scale at prediction time
 
# CORRECT: Include preprocessing in the pipeline
pipe = Pipeline([
    ('scaler', StandardScaler()),
    ('classifier', LogisticRegression())
])
pipe.fit(X_train, y_train)  # Pipeline handles the scaling

落とし穴 3: GridSearchCV のパラメータ名が間違っている

stepname__param の記法は、Pipeline のステップ名と完全に一致している必要があります。

pipe = Pipeline([
    ('my_scaler', StandardScaler()),
    ('my_clf', LogisticRegression())
])
 
# WRONG: Using the class name instead of the step name
# param_grid = {'StandardScaler__with_mean': [True, False]}  # KeyError
 
# CORRECT: Using the step name you defined
param_grid = {'my_scaler__with_mean': [True, False], 'my_clf__C': [0.1, 1, 10]}

落とし穴 4: 列順が変わる

Pandas DataFrame で ColumnTransformer を使う場合(DataFrame は pandas read_csv で読み込めます)、出力列の順序は元の DataFrame ではなく、トランスフォーマーの順序に依存します。

# The output order is: numeric features first, then categorical
# This matters if you manually inspect transformed data
preprocessor = ColumnTransformer([
    ('num', numeric_transformer, numeric_features),     # These come first
    ('cat', categorical_transformer, categorical_features)  # These come second
])

中間出力のデバッグ

各ステップで何が起きているか確認するには、次の方法があります。

# Method 1: Transform step by step
pipe.fit(X_train, y_train)
X_after_preprocessor = pipe.named_steps['preprocessor'].transform(X_test)
print(f"Shape after preprocessing: {X_after_preprocessor.shape}")
print(f"Sample values:\n{X_after_preprocessor[:2]}")
 
# Method 2: Slice the pipeline
preprocessing_pipe = pipe[:-1]  # Everything except the classifier
X_transformed = preprocessing_pipe.transform(X_test)
print(f"Transformed shape: {X_transformed.shape}")
 
# Method 3: Use set_config for verbose output
from sklearn import set_config
set_config(transform_output="pandas")  # Get DataFrames from transformers
# Now transform outputs include column names -- easier to debug

shape の不一致をデバッグする

# Print shapes at each stage to find where things break
print(f"Input shape: {X_train.shape}")
 
for name, step in pipe.named_steps.items():
    if hasattr(step, 'transform'):
        # Check if the step has been fitted
        try:
            X_train = step.transform(X_train)
            print(f"After '{name}': {X_train.shape}")
        except Exception as e:
            print(f"Error at '{name}': {e}")
            break

Pipeline のメソッドリファレンス

メソッド説明
fit(X, y)すべてのトランスフォーマーと最終 estimator を学習
predict(X)X をすべてのステップで変換し、最終 estimator で予測
predict_proba(X)変換して確率を返す(分類器のみ)
transform(X)X をすべてのステップで変換(最後がトランスフォーマーの場合)
fit_transform(X, y)学習と変換を 1 回で実行
fit_predict(X, y)学習と予測を 1 回で実行
score(X, y)変換してスコアリング(分類器は accuracy、回帰は R2)
set_params(**params)stepname__param 記法でパラメータを設定
get_params()すべてのパラメータを取得
named_stepsPipeline のステップへ辞書のようにアクセス
[i] or [name]インデックスまたは名前でステップにアクセス
[start:end]サブパイプラインを作成するためにスライス

FAQ

Pipeline と make_pipeline の違いは何ですか?

Pipeline では各ステップに (name, estimator) タプルを指定する必要があり、ステップ名を明示的に管理できます。make_pipeline は estimator をそのまま受け取り、クラス名を小文字化して名前を自動生成します。可読性の高い名前が必要な場合や、GridSearchCV でハイパーパラメータ調整を行う場合は Pipeline を使いましょう。素早い試作には make_pipeline が便利です。

sklearn Pipeline はデータリーケージを防ぎますか?

はい。pipeline.fit(X_train, y_train) を呼ぶと、各トランスフォーマーは学習データのみに対して fit されます。cross_val_scoreGridSearchCV でクロスバリデーションを行う場合も、Pipeline は各 training fold で全ステップを再 fit するため、test fold のデータが前処理計算へ漏れることはありません。これが手動前処理に対する主な利点です。

Pipeline は深層学習モデルでも使えますか?

Scikit-learn のパイプラインは、sklearn API に従う estimator(fitpredict、必要に応じて transform を実装)なら何でも使えます。scikeras のようなライブラリは Keras モデルを sklearn 互換でラップできるため、パイプライン内で使えます。XGBoost や LightGBM も sklearn 互換インターフェースを提供しています。

ColumnTransformer の後で特徴量名をどう取得しますか?

fit 後に pipeline.named_steps['preprocessor'].get_feature_names_out() を呼びます。これにより、どのトランスフォーマーが生成したかを示すプレフィックス付きの特徴量名の配列が返ります(例: num__age, cat__city_NYC)。これは scikit-learn 1.0 以降で利用できます。

1 つの Pipeline に複数モデルを入れられますか?

いいえ。Pipeline は線形なシーケンスで、最後のステップが estimator です。複数モデルを比較したい場合は、同じ前処理ステップを使い、最後の estimator だけ異なる複数の Pipeline を作成します。モデルの辞書をループしてそれぞれの Pipeline を作ると自動化できます。

GridSearchCV 中に Pipeline のステップをスキップするには?

パラメータグリッドでステップを 'passthrough' に設定します。たとえば {'reduce_dim': ['passthrough']} とすると、その grid search イテレーションでは reduce_dim ステップ全体をスキップします。None も設定できますが、推奨は 'passthrough' です。

予測時に未見のカテゴリが出たらどうなりますか?

OneHotEncoderhandle_unknown='ignore' を設定して Pipeline に組み込んでいれば、未見カテゴリはすべて 0 のベクトルとしてエンコードされます。これを設定しないとエラーになります。本番環境では新しいカテゴリ値が出る可能性があるため、常に handle_unknown='ignore' を設定しましょう。

結論

Sklearn Pipeline は、散らかっていてミスの起きやすい ML コードを、クリーンで再現可能なワークフローに変えます。前処理とモデリングを 1 つのオブジェクトに連結することで、データリーケージを防ぎ、デプロイを簡単にし、ワークフロー全体にわたるハイパーパラメータ調整を容易にします。

まずは基本から始めましょう。スケーラーとモデルを Pipeline に入れるところからです。特徴量型が混在するなら ColumnTransformer に進みます。Pipeline の各ステップをまとめて調整したいなら GridSearchCV を使いましょう。組み込みの選択肢では足りない特徴量エンジニアリングが必要なら、カスタムトランスフォーマーを作成します。

Pipeline を学ぶ投資はすぐに報われます。前処理が fold ごとに再学習されるため、クロスバリデーション結果を信頼できるようになります。本番デプロイは joblib.dumpjoblib.load だけで済みます。そして、変換と予測のロジック全体が 1 つの検査可能なオブジェクトにまとまるため、コードベースの保守性が高まります。

関連ガイド

📚