Sklearn Pipeline: Python에서 ML 파이프라인을 구축하는 완전 가이드
업데이트
당신에게는 다섯 단계의 preprocessing, 세 개의 feature engineering 작업, 그리고 최종 model이 있는 machine learning 프로젝트가 있습니다. 각 단계는 별도의 code block입니다. scaler를 전체 dataset에 fit한 다음 train/test로 split합니다. one-hot encoding은 training과 production에서 서로 다른 columns를 만들어냅니다. 몇 달 뒤 누군가 imputation strategy를 바꾸지만 deployment script는 업데이트하는 것을 잊습니다.
이것이 대부분의 ML codebase의 현실입니다. 수동 preprocessing pipeline은 취약하고 오류가 발생하기 쉬우며, data leakage의 끊임없는 원인이 됩니다. data leakage는 notebook에서는 훌륭하게 작동하지만 live data에서는 실패하는 model의 가장 흔한 원인입니다. StandardScaler를 split 전에 전체 dataset에 fit하면 test set statistics가 training에 섞여 들어갑니다. 통합된 workflow 밖에서 categorical feature를 encode하면 train-test skew는 production이 깨질 때까지 보이지 않습니다.
Scikit-learn의 Pipeline은 preprocessing과 modeling을 하나의 object로 연결함으로써 이런 문제를 해결합니다. fit() 한 번이면 모든 것이 학습됩니다. predict() 한 번이면 transform과 prediction이 수행됩니다. data leakage 없음. transformation 불일치 없음. 저장, 로드, 배포를 위한 단일 object. 이 가이드는 기본 사용법부터 custom transformer, 실제 production deployment 패턴까지 production-quality sklearn pipeline을 구축하는 데 필요한 모든 것을 다룹니다.
Pipeline이 중요한 이유
Data Leakage 문제
Data leakage는 training set 외부의 정보가 training 중 model에 영향을 줄 때 발생합니다. preprocessing에서 가장 흔한 형태는 다음과 같습니다:
# WRONG: Data leakage -- scaler sees test data
from sklearn.preprocessing import StandardScaler
from sklearn.model_selection import train_test_split
scaler = StandardScaler()
X_scaled = scaler.fit_transform(X) # Fitted on ALL data, including test
X_train, X_test, y_train, y_test = train_test_split(X_scaled, y, test_size=0.2)
# X_test was already influenced by scaler statistics computed on the full datasetscaler는 test sample을 포함한 전체 dataset에서 mean과 standard deviation을 계산합니다. 따라서 preprocessing 과정에서 해당 sample들의 정보를 간접적으로 "본" 셈이 되므로 test set evaluation은 지나치게 낙관적으로 나옵니다.
올바른 방법은 다음과 같습니다:
# CORRECT: No leakage -- scaler fitted only on training data
X_train, X_test, y_train, y_test = train_test_split(X, y, test_size=0.2)
scaler = StandardScaler()
X_train_scaled = scaler.fit_transform(X_train) # Fit only on training
X_test_scaled = scaler.transform(X_test) # Transform only, no fitting이 방법은 맞지만, 수동 접근은 금방 다루기 어려워집니다. 다섯 개 preprocessing step이 있으면 다섯 개 fitted object를 추적하고 각 step마다 fit_transform과 transform을 정확히 기억해야 합니다. Pipeline은 이를 자동으로 처리합니다.
Code Organization
Leakage를 넘어서, pipeline은 code organization 문제도 해결합니다. 두 가지 접근 방식을 비교해 보세요:
| Aspect | Manual Preprocessing | sklearn Pipeline |
|---|---|---|
| Data leakage risk | High -- test data에 fit_transform을 호출하기 쉬움 | None -- pipeline이 올바른 fit/transform을 강제 |
| Train + predict를 위한 code lines | environment당 10-30 lines | 2 lines (fit, predict) |
| Production 배포 | 각 transformer를 개별 serialize하고 순서를 재구성 | joblib로 하나의 object serialize |
| Cross-validation | 각 fold마다 모든 step을 수동으로 refit해야 함 | cross_val_score가 모두 처리 |
| Hyperparameter tuning | preprocessing + model params를 직접 loop | GridSearchCV가 모든 params를 함께 tuning |
| Reproducibility | notebook 실행 순서에 의존 | Deterministic -- 같은 object, 같은 결과 |
| Debugging | 각 step마다 shape를 출력하며 수동 확인 | 점검을 위한 pipeline.named_steps |
Basic Pipeline Usage
Pipeline class는 (name, transformer) tuple의 list를 받습니다. 마지막 step을 제외한 모든 step은 fit과 transform을 구현해야 합니다. 마지막 step은 classifier, regressor 또는 transformer일 수 있습니다.
from sklearn.pipeline import Pipeline
from sklearn.preprocessing import StandardScaler
from sklearn.linear_model import LogisticRegression
# Create pipeline with named steps
pipe = Pipeline([
('scaler', StandardScaler()),
('classifier', LogisticRegression(max_iter=1000))
])Fitting and Predicting
pipe.fit(X_train, y_train)를 호출하면 pipeline은:
scaler.fit_transform(X_train, y_train)를 호출합니다 -- scaler를 fit하고 training data를 transform합니다.- 변환된 데이터를
classifier.fit(X_transformed, y_train)에 전달합니다.
pipe.predict(X_test)를 호출하면 pipeline은:
scaler.transform(X_test)를 호출합니다 -- transform만 수행하고 fit은 하지 않습니다.- 변환된 데이터를
classifier.predict(X_transformed)에 전달합니다.
from sklearn.pipeline import Pipeline
from sklearn.preprocessing import StandardScaler
from sklearn.linear_model import LogisticRegression
from sklearn.model_selection import train_test_split
from sklearn.datasets import load_iris
# Load data
iris = load_iris()
X, y = iris.data, iris.target
# Split -- see our guide on train_test_split for details:
# /topics/Scikit-Learn/sklearn-train-test-split
X_train, X_test, y_train, y_test = train_test_split(
X, y, test_size=0.2, random_state=42, stratify=y
)
# Build and train pipeline
pipe = Pipeline([
('scaler', StandardScaler()),
('classifier', LogisticRegression(max_iter=1000, random_state=42))
])
pipe.fit(X_train, y_train)
# Evaluate
accuracy = pipe.score(X_test, y_test)
print(f"Test accuracy: {accuracy:.4f}")
# Test accuracy: 1.0000Individual Step 접근하기
이름으로 어떤 step이든 검사할 수 있습니다:
# Access scaler parameters after fitting
scaler = pipe.named_steps['scaler']
print(f"Feature means: {scaler.mean_}")
print(f"Feature stds: {scaler.scale_}")
# Access the classifier
clf = pipe.named_steps['classifier']
print(f"Coefficients shape: {clf.coef_.shape}")
print(f"Classes: {clf.classes_}")인덱싱도 사용할 수 있습니다:
# Access by index
first_step = pipe[0] # StandardScaler
last_step = pipe[-1] # LogisticRegression
# Slice the pipeline (returns a new Pipeline)
preprocessing = pipe[:-1] # Just the scaler
X_test_transformed = preprocessing.transform(X_test)
print(f"Transformed shape: {X_test_transformed.shape}")make_pipeline: Shorthand
custom step name이 필요하지 않다면 make_pipeline이 class name에서 자동으로 이름을 생성합니다:
from sklearn.pipeline import make_pipeline
from sklearn.preprocessing import StandardScaler
from sklearn.decomposition import PCA
from sklearn.svm import SVC
# Equivalent to Pipeline([('standardscaler', StandardScaler()),
# ('pca', PCA(n_components=2)),
# ('svc', SVC())])
pipe = make_pipeline(StandardScaler(), PCA(n_components=2), SVC())
print(pipe.named_steps)
# {'standardscaler': StandardScaler(), 'pca': PCA(n_components=2), 'svc': SVC()}자동 생성된 이름은 class name을 소문자로 바꾼 것입니다. 같은 transformer를 두 번 사용하면 make_pipeline은 숫자를 덧붙입니다:
from sklearn.pipeline import make_pipeline
from sklearn.preprocessing import StandardScaler, PolynomialFeatures
pipe = make_pipeline(PolynomialFeatures(2), StandardScaler(), PolynomialFeatures(3))
print(list(pipe.named_steps.keys()))
# ['polynomialfeatures-1', 'standardscaler', 'polynomialfeatures-2']Pipeline vs make_pipeline
| Feature | Pipeline | make_pipeline |
|---|---|---|
| Custom step names | Yes -- 직접 지정 가능 | No -- 자동 생성 |
| Large pipeline 가독성 | Better -- 설명적인 이름 | Worse -- 일반적인 이름 |
| Hyperparameter tuning syntax | stepname__param 형태, 직접 이름 사용 | classname__param 형태, 자동 이름 사용 |
| Code brevity | 더 verbose | 더 concise |
| Best for | Production pipeline, tuning | 빠른 prototyping |
hyperparameter를 tuning할 계획이 있거나 명확한 step name이 가독성에 도움이 된다면 Pipeline을 사용하세요. 빠른 실험에는 make_pipeline을 사용하세요.
Common Preprocessing Steps
아래는 sklearn pipeline에서 가장 자주 사용되는 transformer들입니다:
Numeric Features
from sklearn.pipeline import Pipeline
from sklearn.preprocessing import StandardScaler, MinMaxScaler, RobustScaler
from sklearn.impute import SimpleImputer
# Scale to zero mean, unit variance
numeric_pipe = Pipeline([
('imputer', SimpleImputer(strategy='median')),
('scaler', StandardScaler())
])| Transformer | What It Does | When to Use |
|---|---|---|
StandardScaler | mean=0, std=1이 되도록 중심화 및 scaling | 대부분의 algorithm에서 기본 선택 |
MinMaxScaler | 값을 [0, 1] 범위로 scaling | Neural network, magnitude에 민감한 algorithm |
RobustScaler | median과 IQR을 사용하며 outlier에 강함 | 큰 outlier가 있는 data |
SimpleImputer | missing value 채우기 (mean, median, most_frequent, constant) | missing data 처리 |
PolynomialFeatures | polynomial 및 interaction feature 생성 | linear model에 nonlinearity 추가 |
PowerTransformer | Yeo-Johnson 또는 Box-Cox transform 적용 | skewed distribution |
Categorical Features
from sklearn.pipeline import Pipeline
from sklearn.preprocessing import OneHotEncoder, OrdinalEncoder, LabelEncoder
from sklearn.impute import SimpleImputer
# One-hot encode categorical features
categorical_pipe = Pipeline([
('imputer', SimpleImputer(strategy='most_frequent')),
('encoder', OneHotEncoder(handle_unknown='ignore', sparse_output=False))
])| Transformer | What It Does | When to Use |
|---|---|---|
OneHotEncoder | 각 category마다 binary column 생성 | Nominal category (순서 없음) |
OrdinalEncoder | category를 integer로 매핑 | Ordinal category (low/medium/high) |
TargetEncoder | target variable statistics를 사용해 encode | High-cardinality feature (scikit-learn 1.3+) |
ColumnTransformer for Mixed Data Types
실제 dataset에는 numeric column과 categorical column이 모두 있습니다. ColumnTransformer는 서로 다른 column subset에 서로 다른 transformation을 병렬로 적용합니다:
from sklearn.compose import ColumnTransformer
from sklearn.pipeline import Pipeline
from sklearn.preprocessing import StandardScaler, OneHotEncoder
from sklearn.impute import SimpleImputer
from sklearn.linear_model import LogisticRegression
import pandas as pd
import numpy as np
# Sample data with mixed types
data = pd.DataFrame({
'age': [25, 30, np.nan, 45, 50],
'income': [40000, 55000, 60000, np.nan, 90000],
'city': ['NYC', 'LA', 'NYC', 'Chicago', 'LA'],
'education': ['BS', 'MS', 'PhD', 'BS', 'MS'],
'purchased': [0, 1, 1, 0, 1]
})
X = data.drop('purchased', axis=1)
y = data['purchased']
# Define column groups
numeric_features = ['age', 'income']
categorical_features = ['city', 'education']
# Build sub-pipelines for each column type
numeric_transformer = Pipeline([
('imputer', SimpleImputer(strategy='median')),
('scaler', StandardScaler())
])
categorical_transformer = Pipeline([
('imputer', SimpleImputer(strategy='most_frequent')),
('encoder', OneHotEncoder(handle_unknown='ignore', sparse_output=False))
])
# Combine with ColumnTransformer
preprocessor = ColumnTransformer(
transformers=[
('num', numeric_transformer, numeric_features),
('cat', categorical_transformer, categorical_features)
]
)
# Full pipeline: preprocessing + model
pipeline = Pipeline([
('preprocessor', preprocessor),
('classifier', LogisticRegression(max_iter=1000))
])
pipeline.fit(X, y)
print(f"Pipeline fitted successfully")
print(f"Predictions: {pipeline.predict(X)}")Transformation 후 Feature Name 얻기
ColumnTransformer를 fit한 뒤 변환된 feature name을 가져올 수 있습니다:
# After fitting the pipeline
pipeline.fit(X, y)
# Get feature names from the preprocessor step
feature_names = pipeline.named_steps['preprocessor'].get_feature_names_out()
print(f"Transformed features: {feature_names}")
# ['num__age', 'num__income', 'cat__city_Chicago', 'cat__city_LA',
# 'cat__city_NYC', 'cat__education_BS', 'cat__education_MS', 'cat__education_PhD']Remainder Columns 처리하기
기본적으로 ColumnTransformer는 어떤 transformer에도 지정되지 않은 column을 drop합니다. remainder parameter로 이를 제어할 수 있습니다:
preprocessor = ColumnTransformer(
transformers=[
('num', numeric_transformer, numeric_features),
('cat', categorical_transformer, categorical_features)
],
remainder='passthrough' # Keep unspecified columns as-is
# remainder='drop' # Default: drop unspecified columns
)Pipeline with GridSearchCV
sklearn pipeline의 가장 강력한 기능 중 하나는 hyperparameter tuning과의 매끄러운 통합입니다. pipeline step 내부의 parameter를 참조하려면 stepname__parameter syntax를 사용하세요:
from sklearn.pipeline import Pipeline
from sklearn.preprocessing import StandardScaler
from sklearn.decomposition import PCA
from sklearn.svm import SVC
from sklearn.model_selection import GridSearchCV, train_test_split
from sklearn.datasets import load_breast_cancer
# Load data
cancer = load_breast_cancer()
X_train, X_test, y_train, y_test = train_test_split(
cancer.data, cancer.target, test_size=0.2, random_state=42, stratify=cancer.target
)
# Build pipeline
pipe = Pipeline([
('scaler', StandardScaler()),
('pca', PCA()),
('svc', SVC())
])
# Define parameter grid
# Use stepname__param syntax to access nested parameters
param_grid = {
'pca__n_components': [5, 10, 15, 20],
'svc__C': [0.1, 1, 10, 100],
'svc__kernel': ['rbf', 'linear'],
'svc__gamma': ['scale', 'auto']
}
# Run grid search
grid = GridSearchCV(
pipe,
param_grid,
cv=5,
scoring='accuracy',
n_jobs=-1,
verbose=1
)
grid.fit(X_train, y_train)
print(f"Best parameters: {grid.best_params_}")
print(f"Best CV score: {grid.best_score_:.4f}")
print(f"Test score: {grid.score(X_test, y_test):.4f}")ColumnTransformer Parameters Tuning
중첩된 pipeline과 ColumnTransformer의 parameter를 tuning하려면 step name을 double underscore로 이어 붙이면 됩니다:
# Accessing nested parameters:
# pipeline step 'preprocessor' -> transformer 'num' -> step 'imputer' -> parameter 'strategy'
param_grid = {
'preprocessor__num__imputer__strategy': ['mean', 'median'],
'preprocessor__cat__encoder__handle_unknown': ['ignore', 'infrequent_if_exist'],
'classifier__C': [0.1, 1, 10]
}Large Search Space에는 RandomizedSearchCV 사용하기
parameter grid가 크면 RandomizedSearchCV가 고정된 개수의 parameter 조합을 샘플링합니다:
from sklearn.model_selection import RandomizedSearchCV
from scipy.stats import uniform, randint
param_distributions = {
'pca__n_components': randint(5, 25),
'svc__C': uniform(0.1, 100),
'svc__kernel': ['rbf', 'linear', 'poly'],
'svc__gamma': uniform(0.001, 1)
}
random_search = RandomizedSearchCV(
pipe,
param_distributions,
n_iter=50, # Sample 50 combinations
cv=5,
scoring='accuracy',
n_jobs=-1,
random_state=42
)
random_search.fit(X_train, y_train)
print(f"Best parameters: {random_search.best_params_}")
print(f"Best CV score: {random_search.best_score_:.4f}")Custom Transformers
FunctionTransformer: 간단한 Custom Step
간단하고 state가 없는 변환에는 FunctionTransformer를 사용하세요:
from sklearn.preprocessing import FunctionTransformer
from sklearn.pipeline import Pipeline
import numpy as np
# Log transform (adding 1 to avoid log(0))
log_transformer = FunctionTransformer(
func=np.log1p,
inverse_func=np.expm1 # Optional inverse for inverse_transform
)
pipe = Pipeline([
('log', log_transformer),
('scaler', StandardScaler())
])
# Works with pipeline fit/transform
X_sample = np.array([[1, 10, 100], [2, 20, 200]])
X_transformed = pipe.fit_transform(X_sample)
print(f"Original: {X_sample[0]}")
print(f"Transformed: {X_transformed[0]}")Custom Transformer Class
stateful transformation(데이터로부터 parameter를 학습하는 경우)에는 BaseEstimator와 TransformerMixin을 상속하는 class를 만드세요:
from sklearn.base import BaseEstimator, TransformerMixin
import numpy as np
class OutlierClipper(BaseEstimator, TransformerMixin):
"""Clips values beyond a specified number of standard deviations."""
def __init__(self, n_std=3):
self.n_std = n_std
def fit(self, X, y=None):
# Learn the boundaries from training data
self.mean_ = np.mean(X, axis=0)
self.std_ = np.std(X, axis=0)
self.lower_ = self.mean_ - self.n_std * self.std_
self.upper_ = self.mean_ + self.n_std * self.std_
return self # Always return self from fit
def transform(self, X):
# Apply learned boundaries to any data
X_clipped = np.clip(X, self.lower_, self.upper_)
return X_clippedbuilt-in transformer처럼 pipeline에서 사용할 수 있습니다:
from sklearn.pipeline import Pipeline
from sklearn.preprocessing import StandardScaler
from sklearn.linear_model import LogisticRegression
from sklearn.datasets import load_iris
from sklearn.model_selection import train_test_split
iris = load_iris()
X_train, X_test, y_train, y_test = train_test_split(
iris.data, iris.target, test_size=0.2, random_state=42
)
pipe = Pipeline([
('clip', OutlierClipper(n_std=2)),
('scaler', StandardScaler()),
('classifier', LogisticRegression(max_iter=1000))
])
pipe.fit(X_train, y_train)
print(f"Accuracy: {pipe.score(X_test, y_test):.4f}")
# The n_std parameter works with GridSearchCV
from sklearn.model_selection import GridSearchCV
grid = GridSearchCV(
pipe,
{'clip__n_std': [1.5, 2, 2.5, 3]},
cv=5
)
grid.fit(X_train, y_train)
print(f"Best n_std: {grid.best_params_['clip__n_std']}")Feature Engineering을 위한 Custom Transformer
더 실용적인 예시로, 특정 column들 사이의 interaction feature를 생성하는 경우입니다:
from sklearn.base import BaseEstimator, TransformerMixin
import pandas as pd
import numpy as np
class FeatureInteraction(BaseEstimator, TransformerMixin):
"""Creates multiplication interactions between specified column pairs."""
def __init__(self, interaction_pairs=None):
self.interaction_pairs = interaction_pairs
def fit(self, X, y=None):
# Store column names if DataFrame
if isinstance(X, pd.DataFrame):
self.feature_names_in_ = X.columns.tolist()
else:
self.feature_names_in_ = [f"x{i}" for i in range(X.shape[1])]
return self
def transform(self, X):
X_df = pd.DataFrame(X, columns=self.feature_names_in_) if not isinstance(X, pd.DataFrame) else X.copy()
if self.interaction_pairs:
for col_a, col_b in self.interaction_pairs:
name = f"{col_a}_x_{col_b}"
X_df[name] = X_df[col_a] * X_df[col_b]
return X_df.values
def get_feature_names_out(self, input_features=None):
names = list(self.feature_names_in_)
if self.interaction_pairs:
for col_a, col_b in self.interaction_pairs:
names.append(f"{col_a}_x_{col_b}")
return np.array(names)FeatureUnion: Parallel Feature Engineering
Pipeline이 step을 순차적으로 연결하는 반면, FeatureUnion은 transformer를 병렬로 실행하고 그 출력을 가로로 concatenation합니다:
from sklearn.pipeline import Pipeline, FeatureUnion
from sklearn.preprocessing import StandardScaler, PolynomialFeatures
from sklearn.decomposition import PCA
from sklearn.linear_model import LogisticRegression
from sklearn.datasets import load_iris
from sklearn.model_selection import train_test_split
iris = load_iris()
X_train, X_test, y_train, y_test = train_test_split(
iris.data, iris.target, test_size=0.2, random_state=42
)
# Create parallel feature branches
feature_union = FeatureUnion([
('scaled', StandardScaler()), # Original features, scaled
('pca', PCA(n_components=2)), # 2 PCA components
('poly', PolynomialFeatures(degree=2, include_bias=False)) # Polynomial features
])
# Combine into a full pipeline
pipe = Pipeline([
('features', feature_union),
('classifier', LogisticRegression(max_iter=1000, random_state=42))
])
pipe.fit(X_train, y_train)
# Check the total number of features
X_transformed = feature_union.fit_transform(X_train)
print(f"Original features: {X_train.shape[1]}")
print(f"After FeatureUnion: {X_transformed.shape[1]}")
print(f"Test accuracy: {pipe.score(X_test, y_test):.4f}")FeatureUnion vs ColumnTransformer
| Feature | FeatureUnion | ColumnTransformer |
|---|---|---|
| Input | All columns go to all transformers | Specific columns to specific transformers |
| Output | Concatenates horizontally | Concatenates horizontally |
| Use case | 동일한 features의 여러 representation | 서로 다른 type의 feature에 서로 다른 processing 필요 |
| Column selection | Cannot select -- operates on all columns | Built-in column specification |
| Modern alternative | Often replaced by ColumnTransformer | 대부분의 use case에서 preferred |
현대 scikit-learn에서는 이전에 FeatureUnion이 사용되던 대부분의 경우를 ColumnTransformer가 처리합니다. FeatureUnion은 동일한 feature set의 여러 representation이 필요할 때(예: raw values + PCA + polynomial features) 여전히 유용합니다.
Saving and Loading Pipelines
pipeline의 가장 큰 장점 중 하나는 deployment의 단순성입니다. 각 transformer와 model을 따로 serialize하는 대신 하나의 object만 저장하면 됩니다:
import joblib
from sklearn.pipeline import Pipeline
from sklearn.preprocessing import StandardScaler
from sklearn.ensemble import RandomForestClassifier
from sklearn.datasets import load_iris
from sklearn.model_selection import train_test_split
# Train a pipeline
iris = load_iris()
X_train, X_test, y_train, y_test = train_test_split(
iris.data, iris.target, test_size=0.2, random_state=42
)
pipe = Pipeline([
('scaler', StandardScaler()),
('classifier', RandomForestClassifier(n_estimators=100, random_state=42))
])
pipe.fit(X_train, y_train)
# Save the entire pipeline -- one file
joblib.dump(pipe, 'model_pipeline.joblib')
print(f"Pipeline saved")
# Load and predict -- no preprocessing code needed
loaded_pipe = joblib.load('model_pipeline.joblib')
predictions = loaded_pipe.predict(X_test)
accuracy = loaded_pipe.score(X_test, y_test)
print(f"Loaded pipeline accuracy: {accuracy:.4f}")Versioning Pipelines
production에서는 pipeline과 함께 metadata를 포함하세요:
import joblib
import datetime
import sklearn
artifact = {
'pipeline': pipe,
'training_date': datetime.datetime.now().isoformat(),
'sklearn_version': sklearn.__version__,
'feature_names': list(iris.feature_names),
'target_names': list(iris.target_names),
'training_accuracy': pipe.score(X_train, y_train),
'test_accuracy': pipe.score(X_test, y_test),
'n_training_samples': len(X_train)
}
joblib.dump(artifact, 'model_artifact_v1.joblib')
# Later, load and validate
loaded = joblib.load('model_artifact_v1.joblib')
print(f"Model trained on: {loaded['training_date']}")
print(f"Sklearn version: {loaded['sklearn_version']}")
print(f"Test accuracy: {loaded['test_accuracy']:.4f}")
# Use the pipeline
loaded_pipe = loaded['pipeline']
predictions = loaded_pipe.predict(X_test[:3])pickle 사용하기 (대안)
joblib은 큰 NumPy array를 효율적으로 처리하기 때문에 sklearn object에 더 선호됩니다. 표준 pickle도 동작합니다:
import pickle
# Save
with open('pipeline.pkl', 'wb') as f:
pickle.dump(pipe, f)
# Load
with open('pipeline.pkl', 'rb') as f:
loaded = pickle.load(f)Real-World Example: Complete Classification Pipeline
여기서는 mixed feature type을 가진 classification task를 위한 완전한 production-quality pipeline을 보여줍니다. ColumnTransformer와 함께 Random Forest classifier를 사용하고, 마지막에는 전체 evaluation report로 끝납니다. 이 예시는 대부분의 ML practitioner가 접하는 Titanic 스타일의 dataset 패턴을 사용합니다:
import pandas as pd
import numpy as np
from sklearn.compose import ColumnTransformer
from sklearn.pipeline import Pipeline
from sklearn.preprocessing import StandardScaler, OneHotEncoder
from sklearn.impute import SimpleImputer
from sklearn.ensemble import RandomForestClassifier
from sklearn.model_selection import (
train_test_split, cross_val_score, GridSearchCV
)
from sklearn.metrics import classification_report, accuracy_score
# -- Create a realistic dataset with mixed types and missing values --
np.random.seed(42)
n = 1000
data = pd.DataFrame({
'age': np.random.normal(35, 12, n),
'income': np.random.lognormal(10.5, 0.8, n),
'credit_score': np.random.normal(650, 80, n),
'years_employed': np.random.exponential(5, n),
'department': np.random.choice(['Engineering', 'Sales', 'Marketing', 'HR', 'Finance'], n),
'education': np.random.choice(['High School', 'Bachelor', 'Master', 'PhD'], n),
'city': np.random.choice(['NYC', 'LA', 'Chicago', 'Houston', 'Phoenix', 'Dallas'], n),
'promoted': np.random.binomial(1, 0.3, n)
})
# Introduce missing values (realistic pattern)
for col in ['age', 'income', 'credit_score']:
mask = np.random.random(n) < 0.05
data.loc[mask, col] = np.nan
for col in ['department', 'education']:
mask = np.random.random(n) < 0.03
data.loc[mask, col] = np.nan
print(f"Dataset shape: {data.shape}")
print(f"Missing values:\n{data.isnull().sum()}")
print(f"Target distribution:\n{data['promoted'].value_counts(normalize=True)}")# -- Define features and target --
X = data.drop('promoted', axis=1)
y = data['promoted']
# Split the data (see /topics/Scikit-Learn/sklearn-train-test-split for details)
X_train, X_test, y_train, y_test = train_test_split(
X, y, test_size=0.2, random_state=42, stratify=y
)
print(f"Training set: {X_train.shape[0]} samples")
print(f"Test set: {X_test.shape[0]} samples")# -- Define column groups --
numeric_features = ['age', 'income', 'credit_score', 'years_employed']
categorical_features = ['department', 'education', 'city']
# -- Build preprocessing pipelines for each column type --
numeric_transformer = Pipeline([
('imputer', SimpleImputer(strategy='median')),
('scaler', StandardScaler())
])
categorical_transformer = Pipeline([
('imputer', SimpleImputer(strategy='most_frequent')),
('encoder', OneHotEncoder(handle_unknown='ignore', sparse_output=False))
])
# -- Combine column transformers --
preprocessor = ColumnTransformer(
transformers=[
('num', numeric_transformer, numeric_features),
('cat', categorical_transformer, categorical_features)
],
remainder='drop' # Explicitly drop unlisted columns
)
# -- Full pipeline: preprocessing + classifier --
# (see /topics/Scikit-Learn/sklearn-random-forest for more on RandomForest)
pipeline = Pipeline([
('preprocessor', preprocessor),
('classifier', RandomForestClassifier(
n_estimators=200,
max_depth=10,
min_samples_leaf=5,
class_weight='balanced',
random_state=42,
n_jobs=-1
))
])# -- Cross-validation first --
cv_scores = cross_val_score(pipeline, X_train, y_train, cv=5, scoring='accuracy')
print(f"Cross-validation scores: {cv_scores}")
print(f"Mean CV accuracy: {cv_scores.mean():.4f} (+/- {cv_scores.std() * 2:.4f})")
# -- Fit on full training set --
pipeline.fit(X_train, y_train)
# -- Evaluate on test set --
y_pred = pipeline.predict(X_test)
print(f"\nTest accuracy: {accuracy_score(y_test, y_pred):.4f}")
# For detailed evaluation, see /topics/Scikit-Learn/sklearn-confusion-matrix
print(f"\nClassification Report:\n{classification_report(y_test, y_pred)}")# -- Hyperparameter tuning --
param_grid = {
'preprocessor__num__imputer__strategy': ['mean', 'median'],
'classifier__n_estimators': [100, 200, 300],
'classifier__max_depth': [5, 10, 15, None],
'classifier__min_samples_leaf': [3, 5, 10]
}
grid_search = GridSearchCV(
pipeline,
param_grid,
cv=5,
scoring='accuracy',
n_jobs=-1,
verbose=1
)
grid_search.fit(X_train, y_train)
print(f"\nBest parameters: {grid_search.best_params_}")
print(f"Best CV score: {grid_search.best_score_:.4f}")
print(f"Test score: {grid_search.score(X_test, y_test):.4f}")# -- Inspect the best pipeline --
best_pipeline = grid_search.best_estimator_
# Get feature names after transformation
feature_names = best_pipeline.named_steps['preprocessor'].get_feature_names_out()
print(f"\nTransformed feature count: {len(feature_names)}")
# Get feature importances from the classifier
importances = best_pipeline.named_steps['classifier'].feature_importances_
feature_importance = pd.DataFrame({
'feature': feature_names,
'importance': importances
}).sort_values('importance', ascending=False)
print(f"\nTop 10 features:")
print(feature_importance.head(10).to_string(index=False))# -- Save the final pipeline --
import joblib
joblib.dump(best_pipeline, 'promotion_predictor.joblib')
print("Pipeline saved to promotion_predictor.joblib")
# -- Production usage --
loaded = joblib.load('promotion_predictor.joblib')
# Predict on new data -- same format as original DataFrame
new_employee = pd.DataFrame({
'age': [28],
'income': [65000],
'credit_score': [720],
'years_employed': [3.5],
'department': ['Engineering'],
'education': ['Master'],
'city': ['NYC']
})
prediction = loaded.predict(new_employee)
probability = loaded.predict_proba(new_employee)
print(f"\nNew employee prediction: {'Promoted' if prediction[0] else 'Not promoted'}")
print(f"Probability: {probability[0][1]:.2%}")이 예시는 mixed feature type, missing value handling, cross-validation, hyperparameter tuning, feature inspection, production serialization 등 모든 핵심 pipeline 패턴을 보여줍니다.
Different Model Types와 함께 Pipeline 사용하기
같은 preprocessing pipeline을 여러 model에 사용할 수 있습니다. 마지막 step만 교체하면 됩니다:
from sklearn.pipeline import Pipeline
from sklearn.linear_model import LogisticRegression
from sklearn.ensemble import RandomForestClassifier, GradientBoostingClassifier
from sklearn.svm import SVC
from sklearn.model_selection import cross_val_score
# Reuse the preprocessor from the previous example
models = {
'Logistic Regression': LogisticRegression(max_iter=1000, random_state=42),
'Random Forest': RandomForestClassifier(n_estimators=100, random_state=42),
'Gradient Boosting': GradientBoostingClassifier(n_estimators=100, random_state=42),
'SVM': SVC(kernel='rbf', random_state=42)
}
results = {}
for name, model in models.items():
pipe = Pipeline([
('preprocessor', preprocessor),
('classifier', model)
])
scores = cross_val_score(pipe, X_train, y_train, cv=5, scoring='accuracy')
results[name] = {
'mean': scores.mean(),
'std': scores.std()
}
print(f"{name:25s} | Accuracy: {scores.mean():.4f} +/- {scores.std():.4f}")기존 pipeline의 estimator step을 동적으로 설정할 수도 있습니다:
# Replace the classifier in an existing pipeline
pipeline.set_params(classifier=GradientBoostingClassifier(n_estimators=200))
pipeline.fit(X_train, y_train)
print(f"Gradient Boosting test accuracy: {pipeline.score(X_test, y_test):.4f}")Regression에서의 Pipeline
Pipeline은 regression task에서도 동일하게 동작합니다. sklearn의 linear regression에 대한 자세한 내용은 sklearn linear regression guide를 참고하세요.
from sklearn.pipeline import Pipeline
from sklearn.compose import ColumnTransformer
from sklearn.preprocessing import StandardScaler, OneHotEncoder, PolynomialFeatures
from sklearn.impute import SimpleImputer
from sklearn.linear_model import Ridge
from sklearn.model_selection import cross_val_score, train_test_split
import pandas as pd
import numpy as np
# Simulated housing data
np.random.seed(42)
n = 500
housing = pd.DataFrame({
'sqft': np.random.normal(1500, 400, n),
'bedrooms': np.random.choice([1, 2, 3, 4, 5], n),
'age': np.random.uniform(0, 50, n),
'neighborhood': np.random.choice(['downtown', 'suburbs', 'rural'], n),
'condition': np.random.choice(['poor', 'fair', 'good', 'excellent'], n),
})
housing['price'] = (
housing['sqft'] * 200
+ housing['bedrooms'] * 15000
- housing['age'] * 1000
+ np.random.normal(0, 20000, n)
)
X = housing.drop('price', axis=1)
y = housing['price']
X_train, X_test, y_train, y_test = train_test_split(X, y, test_size=0.2, random_state=42)
# Regression pipeline
numeric_features = ['sqft', 'bedrooms', 'age']
categorical_features = ['neighborhood', 'condition']
preprocessor = ColumnTransformer([
('num', Pipeline([
('imputer', SimpleImputer(strategy='median')),
('poly', PolynomialFeatures(degree=2, include_bias=False)),
('scaler', StandardScaler())
]), numeric_features),
('cat', Pipeline([
('imputer', SimpleImputer(strategy='most_frequent')),
('encoder', OneHotEncoder(handle_unknown='ignore', sparse_output=False))
]), categorical_features)
])
reg_pipeline = Pipeline([
('preprocessor', preprocessor),
('regressor', Ridge(alpha=1.0))
])
cv_scores = cross_val_score(reg_pipeline, X_train, y_train, cv=5, scoring='r2')
print(f"Cross-validation R2: {cv_scores.mean():.4f} +/- {cv_scores.std():.4f}")
reg_pipeline.fit(X_train, y_train)
print(f"Test R2: {reg_pipeline.score(X_test, y_test):.4f}")passthrough와 None으로 Step 건너뛰기
step을 'passthrough' 또는 None으로 설정하여 조건부로 생략할 수 있습니다:
from sklearn.pipeline import Pipeline
from sklearn.preprocessing import StandardScaler
from sklearn.decomposition import PCA
from sklearn.svm import SVC
from sklearn.model_selection import GridSearchCV
from sklearn.datasets import load_breast_cancer
cancer = load_breast_cancer()
X_train, X_test, y_train, y_test = train_test_split(
cancer.data, cancer.target, test_size=0.2, random_state=42
)
pipe = Pipeline([
('scaler', StandardScaler()),
('reduce_dim', PCA()),
('classifier', SVC())
])
# Grid search can toggle steps on/off
param_grid = [
{
'reduce_dim': [PCA(5), PCA(10), PCA(15)],
'classifier__C': [1, 10]
},
{
'reduce_dim': ['passthrough'], # Skip PCA entirely
'classifier__C': [1, 10]
}
]
grid = GridSearchCV(pipe, param_grid, cv=5, n_jobs=-1)
grid.fit(X_train, y_train)
print(f"Best params: {grid.best_params_}")
print(f"Best score: {grid.best_score_:.4f}")Pipeline Step Caching
hyperparameter를 tuning할 때 intermediate step이 불필요하게 반복 계산될 수 있습니다. caching을 활성화하여 이를 방지할 수 있습니다:
from sklearn.pipeline import Pipeline
from sklearn.preprocessing import StandardScaler
from sklearn.decomposition import PCA
from sklearn.svm import SVC
from tempfile import mkdtemp
from shutil import rmtree
# Create a temporary cache directory
cachedir = mkdtemp()
pipe = Pipeline(
[
('scaler', StandardScaler()),
('pca', PCA(n_components=10)),
('svc', SVC())
],
memory=cachedir # Cache intermediate transformations
)
# During GridSearchCV, the scaler and PCA results are cached
# Only recomputed when their parameters change
# This speeds up searches where only the final estimator params change
# Clean up when done
# rmtree(cachedir)Pipeline을 만들기 전에 데이터 탐색하기
pipeline을 구성하기 전에 feature의 분포, missing value 패턴, correlation, 그리고 가능한 transformation을 이해해야 합니다. PyGWalker (opens in a new tab)는 Pandas DataFrame을 Jupyter notebook 안에서 바로 interactive visual exploration interface로 바꿔줍니다:
import pandas as pd
import pygwalker as pyg
# Explore your dataset interactively before building the pipeline
# Drag features to axes, create histograms, scatter plots, box plots
walker = pyg.walk(data)이러한 visual exploration은 어떤 feature를 scaling해야 하는지, 어떤 feature에 outlier clipping이 필요한지, 어떤 categorical feature가 high cardinality인지 결정하는 데 도움이 됩니다. pipeline code를 한 줄 쓰기도 전에 missing value pattern을 파악하고 feature distribution을 이해할 수 있습니다.
전체 pipeline experimentation workflow를 반복하면서 preprocessing strategy를 시험하고, model performance를 비교하고, 결과를 추적하려면 RunCell (opens in a new tab)이 code generation, debugging, experiment management를 지원하는 AI-powered Jupyter environment를 제공합니다.
Common Pitfalls and Debugging Tips
Pitfall 1: Prediction에 Pipeline을 사용하지 않기
# WRONG: Preprocessing manually, predicting with just the model
X_test_scaled = scaler.transform(X_test)
predictions = pipeline.named_steps['classifier'].predict(X_test_scaled)
# CORRECT: Let the pipeline handle everything
predictions = pipeline.predict(X_test)Pitfall 2: Pipeline 밖에서 Preprocessing을 Fit하기
# WRONG: This defeats the purpose of the pipeline
scaler = StandardScaler()
X_train_scaled = scaler.fit_transform(X_train)
pipe = Pipeline([('classifier', LogisticRegression())])
pipe.fit(X_train_scaled, y_train)
# Now you must remember to manually scale at prediction time
# CORRECT: Include preprocessing in the pipeline
pipe = Pipeline([
('scaler', StandardScaler()),
('classifier', LogisticRegression())
])
pipe.fit(X_train, y_train) # Pipeline handles the scalingPitfall 3: GridSearchCV에서 잘못된 Parameter Name 사용하기
stepname__param syntax는 pipeline step name과 정확히 일치해야 합니다:
pipe = Pipeline([
('my_scaler', StandardScaler()),
('my_clf', LogisticRegression())
])
# WRONG: Using the class name instead of the step name
# param_grid = {'StandardScaler__with_mean': [True, False]} # KeyError
# CORRECT: Using the step name you defined
param_grid = {'my_scaler__with_mean': [True, False], 'my_clf__C': [0.1, 1, 10]}Pitfall 4: Column Order Changes
ColumnTransformer를 Pandas DataFrame과 함께 사용할 때(pandas read_csv로 불러올 수 있습니다), output의 column order는 원본 DataFrame이 아니라 transformer의 순서에 따라 결정됩니다:
# The output order is: numeric features first, then categorical
# This matters if you manually inspect transformed data
preprocessor = ColumnTransformer([
('num', numeric_transformer, numeric_features), # These come first
('cat', categorical_transformer, categorical_features) # These come second
])Intermediate Output 디버깅
각 step에서 무엇이 일어나는지 확인하려면:
# Method 1: Transform step by step
pipe.fit(X_train, y_train)
X_after_preprocessor = pipe.named_steps['preprocessor'].transform(X_test)
print(f"Shape after preprocessing: {X_after_preprocessor.shape}")
print(f"Sample values:\n{X_after_preprocessor[:2]}")
# Method 2: Slice the pipeline
preprocessing_pipe = pipe[:-1] # Everything except the classifier
X_transformed = preprocessing_pipe.transform(X_test)
print(f"Transformed shape: {X_transformed.shape}")
# Method 3: Use set_config for verbose output
from sklearn import set_config
set_config(transform_output="pandas") # Get DataFrames from transformers
# Now transform outputs include column names -- easier to debugShape Mismatch 디버깅
# Print shapes at each stage to find where things break
print(f"Input shape: {X_train.shape}")
for name, step in pipe.named_steps.items():
if hasattr(step, 'transform'):
# Check if the step has been fitted
try:
X_train = step.transform(X_train)
print(f"After '{name}': {X_train.shape}")
except Exception as e:
print(f"Error at '{name}': {e}")
breakPipeline Method Reference
| Method | Description |
|---|---|
fit(X, y) | 모든 transformer와 final estimator를 fit |
predict(X) | 모든 step을 통해 X를 transform한 뒤 final estimator로 predict |
predict_proba(X) | transform 후 probability estimate 반환 (classifier만) |
transform(X) | 마지막 step이 transformer일 때, 모든 step을 통해 X를 transform |
fit_transform(X, y) | 한 번에 fit과 transform 수행 |
fit_predict(X, y) | 한 번에 fit과 predict 수행 |
score(X, y) | transform 후 score (classifier는 accuracy, regressor는 R2) |
set_params(**params) | stepname__param syntax로 parameter 설정 |
get_params() | 모든 parameter 가져오기 |
named_steps | pipeline step에 dictionary처럼 접근 |
[i] or [name] | index 또는 name으로 step 접근 |
[start:end] | sub-pipeline 생성을 위한 slice |
FAQ
Pipeline과 make_pipeline의 차이는 무엇인가요?
Pipeline은 각 step에 대해 (name, estimator) tuple을 제공해야 하므로 step name을 명시적으로 제어할 수 있습니다. make_pipeline은 estimator instance를 그대로 받아 class name을 기반으로 자동으로 이름을 생성합니다(소문자). GridSearchCV로 hyperparameter를 tuning하거나 설명적인 이름이 필요할 때는 Pipeline을 사용하세요. 빠른 prototyping에는 make_pipeline이 적합합니다.
sklearn Pipeline이 data leakage를 막아주나요?
네. pipeline.fit(X_train, y_train)를 호출하면 각 transformer는 training data에만 fit됩니다. cross_val_score 또는 GridSearchCV로 cross-validation을 수행할 때 pipeline은 각 training fold마다 모든 step을 다시 fit하므로 test fold data가 preprocessing에 섞이지 않습니다. 이것이 수동 preprocessing에 대한 가장 큰 장점입니다.
Pipeline을 deep learning model과 함께 사용할 수 있나요?
Scikit-learn pipeline은 sklearn API(fit, predict, 그리고 선택적으로 transform 구현)를 따르는 어떤 estimator와도 함께 사용할 수 있습니다. scikeras 같은 library는 Keras model을 sklearn-compatible wrapper로 제공하여 pipeline에서 사용할 수 있게 합니다. XGBoost와 LightGBM도 sklearn-compatible interface를 제공합니다.
ColumnTransformer 이후 feature name은 어떻게 처리하나요?
fit 후 pipeline.named_steps['preprocessor'].get_feature_names_out()를 호출하세요. 그러면 어떤 transformer가 생성했는지를 나타내는 prefix가 붙은 feature name 배열이 반환됩니다(예: num__age, cat__city_NYC). 이는 scikit-learn 1.0 이상에서 동작합니다.
하나의 Pipeline에 여러 model을 넣을 수 있나요?
아니요. Pipeline은 마지막 step이 estimator인 linear sequence입니다. 여러 model을 비교하려면 preprocessing step은 같고 final estimator만 다른 별도의 pipeline을 만드세요. model dictionary를 loop 돌며 각 model마다 pipeline을 생성하는 방식으로 자동화할 수 있습니다.
GridSearchCV에서 Pipeline의 step을 어떻게 건너뛰나요?
parameter grid에서 step을 'passthrough'로 설정하세요. 예를 들어 {'reduce_dim': ['passthrough']}는 grid search iteration 동안 reduce_dim step을 완전히 건너뜁니다. None도 사용할 수 있지만 'passthrough'가 권장됩니다.
Prediction 시 Pipeline이 unseen category를 만나면 어떻게 되나요?
Pipeline 내부의 OneHotEncoder에 handle_unknown='ignore'를 설정하면 unseen category는 모두 0으로 encode됩니다. 이 설정이 없으면 pipeline은 error를 발생시킵니다. production pipeline에서 새 category 값이 등장할 가능성이 있다면 항상 handle_unknown='ignore'를 설정하세요.
결론
Sklearn Pipeline은 복잡하고 오류가 잦은 ML code를 깔끔하고 재현 가능한 workflow로 바꿉니다. preprocessing과 modeling을 하나의 object로 연결함으로써 data leakage를 제거하고, deployment를 단순화하며, 전체 workflow에 걸친 hyperparameter tuning을 훨씬 쉽게 만듭니다.
기본부터 시작하세요: scaler와 model을 Pipeline으로 감싸세요. mixed feature type이 있다면 ColumnTransformer로 발전시키세요. pipeline의 모든 step에 걸쳐 parameter를 동시에 tuning하려면 GridSearchCV를 사용하세요. built-in 옵션이 feature engineering 요구를 충족하지 못할 때는 custom transformer를 만드세요.
pipeline을 배우는 데 드는 투자는 즉시 보상을 줍니다. preprocessing이 fold마다 다시 fit되므로 cross-validation 결과를 신뢰할 수 있게 됩니다. production deployment는 단일 joblib.dump와 joblib.load로 끝납니다. 그리고 전체 transformation 및 prediction logic이 하나의 inspect 가능한 object 안에 들어 있으므로 codebase는 유지보수하기 쉬워집니다.
관련 가이드
- Sklearn Linear Regression -- pipeline에 자연스럽게 들어가는 regression model 구축
- Sklearn Confusion Matrix -- pipeline의 classification output 평가
- Sklearn Random Forest -- final pipeline step으로 사용할 수 있는 강력한 classifier
- Pandas read_csv -- pipeline에 넣기 전에 CSV data를 DataFrame으로 로드하기