Sklearn Pipeline: guia completo para construir pipelines de ML em Python
Atualizado em
Você tem um projeto de machine learning com cinco etapas de pré-processamento, três operações de feature engineering e um modelo final. Cada etapa é um bloco de código separado. Você ajusta o scaler no dataset inteiro e depois divide em treino e teste. Sua one-hot encoding cria colunas diferentes no treinamento e em produção. Meses depois, alguém altera a estratégia de imputação, mas esquece de atualizar o script de deployment.
Essa é a realidade da maioria dos codebases de ML. Pipelines manuais de pré-processamento são frágeis, propensos a erros e uma fonte constante de data leakage -- a causa isolada mais comum de modelos que vão muito bem em notebooks, mas falham em dados reais. Quando você ajusta um StandardScaler em todo o dataset antes da divisão, estatísticas do conjunto de teste vazam para o treino. Quando você codifica variáveis categóricas fora de um workflow unificado, o descompasso entre treino e teste fica invisível até a produção quebrar.
O Pipeline do Scikit-learn resolve esses problemas encadeando pré-processamento e modelagem em um único objeto. Uma chamada para fit() treina tudo. Uma chamada para predict() transforma e prevê. Sem data leakage. Sem transformações incompatíveis. Um único objeto para salvar, carregar e implantar. Este guia cobre tudo o que você precisa para construir sklearn pipelines de qualidade de produção, desde o uso básico até transformadores personalizados e padrões reais de implantação.
Por que Pipelines Importam
O Problema de Data Leakage
Data leakage acontece quando informações fora do conjunto de treinamento influenciam o modelo durante o treinamento. A forma mais comum no pré-processamento se parece com isto:
# WRONG: Data leakage -- scaler sees test data
from sklearn.preprocessing import StandardScaler
from sklearn.model_selection import train_test_split
scaler = StandardScaler()
X_scaled = scaler.fit_transform(X) # Fitted on ALL data, including test
X_train, X_test, y_train, y_test = train_test_split(X_scaled, y, test_size=0.2)
# X_test was already influenced by scaler statistics computed on the full datasetO scaler calcula média e desvio padrão com base em todo o dataset, incluindo as amostras de teste. Sua avaliação no conjunto de teste agora fica otimista porque o modelo “viu” indiretamente informações dessas amostras durante o pré-processamento.
A abordagem correta:
# CORRECT: No leakage -- scaler fitted only on training data
X_train, X_test, y_train, y_test = train_test_split(X, y, test_size=0.2)
scaler = StandardScaler()
X_train_scaled = scaler.fit_transform(X_train) # Fit only on training
X_test_scaled = scaler.transform(X_test) # Transform only, no fittingIsso funciona, mas a abordagem manual rapidamente fica difícil de gerenciar. Com cinco etapas de pré-processamento, você precisa acompanhar cinco objetos ajustados e lembrar a chamada correta de fit_transform versus transform para cada um. Um Pipeline faz isso automaticamente.
Organização do Código
Além do leakage, pipelines resolvem um problema de organização de código. Compare estas duas abordagens:
| Aspecto | Pré-processamento manual | sklearn Pipeline |
|---|---|---|
| Risco de data leakage | Alto -- fácil chamar fit_transform em dados de teste | Nenhum -- o pipeline força fit/transform corretos |
| Linhas de código para treino + previsão | 10-30 linhas por ambiente | 2 linhas (fit, predict) |
| Implantação em produção | Serializar cada transformador separadamente, reconstruir a ordem | Serializar um objeto com joblib |
| Cross-validation | Precisa refazer manualmente todas as etapas por fold | cross_val_score cuida de tudo |
| Hyperparameter tuning | Iterar manualmente sobre pré-processamento + parâmetros do modelo | GridSearchCV ajusta todos os parâmetros juntos |
| Reprodutibilidade | Depende da ordem de execução no notebook | Determinística -- mesmo objeto, mesmo resultado |
| Debugging | Imprimir shapes após cada etapa, manualmente | pipeline.named_steps para inspeção |
Uso Básico de Pipeline
A classe Pipeline recebe uma lista de tuplas (nome, transformer). Todas as etapas, exceto a última, devem implementar fit e transform. A última etapa pode ser qualquer estimador (classificador, regressor ou transformador).
from sklearn.pipeline import Pipeline
from sklearn.preprocessing import StandardScaler
from sklearn.linear_model import LogisticRegression
# Create pipeline with named steps
pipe = Pipeline([
('scaler', StandardScaler()),
('classifier', LogisticRegression(max_iter=1000))
])Ajuste e Previsão
Quando você chama pipe.fit(X_train, y_train), o pipeline:
- Chama
scaler.fit_transform(X_train, y_train)-- ajusta o scaler e transforma os dados de treino - Passa os dados transformados para
classifier.fit(X_transformed, y_train)
Quando você chama pipe.predict(X_test), o pipeline:
- Chama
scaler.transform(X_test)-- transforma apenas, sem ajustar - Passa os dados transformados para
classifier.predict(X_transformed)
from sklearn.pipeline import Pipeline
from sklearn.preprocessing import StandardScaler
from sklearn.linear_model import LogisticRegression
from sklearn.model_selection import train_test_split
from sklearn.datasets import load_iris
# Load data
iris = load_iris()
X, y = iris.data, iris.target
# Split -- see our guide on train_test_split for details:
# /topics/Scikit-Learn/sklearn-train-test-split
X_train, X_test, y_train, y_test = train_test_split(
X, y, test_size=0.2, random_state=42, stratify=y
)
# Build and train pipeline
pipe = Pipeline([
('scaler', StandardScaler()),
('classifier', LogisticRegression(max_iter=1000, random_state=42))
])
pipe.fit(X_train, y_train)
# Evaluate
accuracy = pipe.score(X_test, y_test)
print(f"Test accuracy: {accuracy:.4f}")
# Test accuracy: 1.0000Acessando Etapas Individuais
Você pode inspecionar qualquer etapa pelo nome:
# Access scaler parameters after fitting
scaler = pipe.named_steps['scaler']
print(f"Feature means: {scaler.mean_}")
print(f"Feature stds: {scaler.scale_}")
# Access the classifier
clf = pipe.named_steps['classifier']
print(f"Coefficients shape: {clf.coef_.shape}")
print(f"Classes: {clf.classes_}")Você também pode usar indexação:
# Access by index
first_step = pipe[0] # StandardScaler
last_step = pipe[-1] # LogisticRegression
# Slice the pipeline (returns a new Pipeline)
preprocessing = pipe[:-1] # Just the scaler
X_test_transformed = preprocessing.transform(X_test)
print(f"Transformed shape: {X_test_transformed.shape}")make_pipeline: O Atalho
Quando você não precisa de nomes personalizados para as etapas, make_pipeline os gera automaticamente a partir dos nomes das classes:
from sklearn.pipeline import make_pipeline
from sklearn.preprocessing import StandardScaler
from sklearn.decomposition import PCA
from sklearn.svm import SVC
# Equivalent to Pipeline([('standardscaler', StandardScaler()),
# ('pca', PCA(n_components=2)),
# ('svc', SVC())])
pipe = make_pipeline(StandardScaler(), PCA(n_components=2), SVC())
print(pipe.named_steps)
# {'standardscaler': StandardScaler(), 'pca': PCA(n_components=2), 'svc': SVC()}Os nomes gerados automaticamente são o nome da classe em minúsculas. Se você usar o mesmo transformador duas vezes, make_pipeline acrescenta um número:
from sklearn.pipeline import make_pipeline
from sklearn.preprocessing import StandardScaler, PolynomialFeatures
pipe = make_pipeline(PolynomialFeatures(2), StandardScaler(), PolynomialFeatures(3))
print(list(pipe.named_steps.keys()))
# ['polynomialfeatures-1', 'standardscaler', 'polynomialfeatures-2']Pipeline vs make_pipeline
| Recurso | Pipeline | make_pipeline |
|---|---|---|
| Nomes personalizados das etapas | Sim -- você escolhe | Não -- gerados automaticamente |
| Legibilidade para pipelines grandes | Melhor -- nomes descritivos | Pior -- nomes genéricos |
| Sintaxe de hyperparameter tuning | stepname__param com seus nomes | classname__param com nomes automáticos |
| Brevidade do código | Mais verboso | Mais conciso |
| Melhor para | Pipelines de produção, tuning | Prototipagem rápida |
Use Pipeline quando você planeja ajustar hiperparâmetros ou quando nomes claros melhoram a legibilidade. Use make_pipeline para experimentos rápidos.
Etapas Comuns de Pré-processamento
Aqui estão os transformadores mais usados em pipelines do sklearn:
Variáveis Numéricas
from sklearn.pipeline import Pipeline
from sklearn.preprocessing import StandardScaler, MinMaxScaler, RobustScaler
from sklearn.impute import SimpleImputer
# Scale to zero mean, unit variance
numeric_pipe = Pipeline([
('imputer', SimpleImputer(strategy='median')),
('scaler', StandardScaler())
])| Transformador | O que Faz | Quando Usar |
|---|---|---|
StandardScaler | Centraliza para média=0, escala para desvio padrão=1 | Escolha padrão para a maioria dos algoritmos |
MinMaxScaler | Escala para a faixa [0, 1] | Redes neurais, algoritmos sensíveis à magnitude |
RobustScaler | Usa mediana e IQR, robusto a outliers | Dados com outliers significativos |
SimpleImputer | Preenche valores ausentes (mean, median, most_frequent, constant) | Tratamento de dados ausentes |
PolynomialFeatures | Gera features polinomiais e interações | Adicionar não linearidade a modelos lineares |
PowerTransformer | Aplica transformação Yeo-Johnson ou Box-Cox | Distribuições enviesadas |
Variáveis Categóricas
from sklearn.pipeline import Pipeline
from sklearn.preprocessing import OneHotEncoder, OrdinalEncoder, LabelEncoder
from sklearn.impute import SimpleImputer
# One-hot encode categorical features
categorical_pipe = Pipeline([
('imputer', SimpleImputer(strategy='most_frequent')),
('encoder', OneHotEncoder(handle_unknown='ignore', sparse_output=False))
])| Transformador | O que Faz | Quando Usar |
|---|---|---|
OneHotEncoder | Cria colunas binárias para cada categoria | Categorias nominais (sem ordem) |
OrdinalEncoder | Mapeia categorias para inteiros | Categorias ordinais (low/medium/high) |
TargetEncoder | Codifica usando estatísticas da variável-alvo | Features de alta cardinalidade (scikit-learn 1.3+) |
ColumnTransformer para Tipos de Dados Mistos
Datasets reais têm colunas numéricas e categóricas. O ColumnTransformer aplica transformações diferentes a subconjuntos diferentes de colunas em paralelo:
from sklearn.compose import ColumnTransformer
from sklearn.pipeline import Pipeline
from sklearn.preprocessing import StandardScaler, OneHotEncoder
from sklearn.impute import SimpleImputer
from sklearn.linear_model import LogisticRegression
import pandas as pd
import numpy as np
# Sample data with mixed types
data = pd.DataFrame({
'age': [25, 30, np.nan, 45, 50],
'income': [40000, 55000, 60000, np.nan, 90000],
'city': ['NYC', 'LA', 'NYC', 'Chicago', 'LA'],
'education': ['BS', 'MS', 'PhD', 'BS', 'MS'],
'purchased': [0, 1, 1, 0, 1]
})
X = data.drop('purchased', axis=1)
y = data['purchased']
# Define column groups
numeric_features = ['age', 'income']
categorical_features = ['city', 'education']
# Build sub-pipelines for each column type
numeric_transformer = Pipeline([
('imputer', SimpleImputer(strategy='median')),
('scaler', StandardScaler())
])
categorical_transformer = Pipeline([
('imputer', SimpleImputer(strategy='most_frequent')),
('encoder', OneHotEncoder(handle_unknown='ignore', sparse_output=False))
])
# Combine with ColumnTransformer
preprocessor = ColumnTransformer(
transformers=[
('num', numeric_transformer, numeric_features),
('cat', categorical_transformer, categorical_features)
]
)
# Full pipeline: preprocessing + model
pipeline = Pipeline([
('preprocessor', preprocessor),
('classifier', LogisticRegression(max_iter=1000))
])
pipeline.fit(X, y)
print(f"Pipeline fitted successfully")
print(f"Predictions: {pipeline.predict(X)}")Obtendo Nomes das Features Após a Transformação
Depois de ajustar um ColumnTransformer, você pode recuperar os nomes das features transformadas:
# After fitting the pipeline
pipeline.fit(X, y)
# Get feature names from the preprocessor step
feature_names = pipeline.named_steps['preprocessor'].get_feature_names_out()
print(f"Transformed features: {feature_names}")
# ['num__age', 'num__income', 'cat__city_Chicago', 'cat__city_LA',
# 'cat__city_NYC', 'cat__education_BS', 'cat__education_MS', 'cat__education_PhD']Lidando com Colunas Restantes
Por padrão, ColumnTransformer remove colunas não especificadas em nenhum transformador. Controle isso com o parâmetro remainder:
preprocessor = ColumnTransformer(
transformers=[
('num', numeric_transformer, numeric_features),
('cat', categorical_transformer, categorical_features)
],
remainder='passthrough' # Keep unspecified columns as-is
# remainder='drop' # Default: drop unspecified columns
)Pipeline com GridSearchCV
Um dos recursos mais poderosos dos pipelines do sklearn é a integração fluida com hyperparameter tuning. Use a sintaxe stepname__parameter para referenciar parâmetros dentro das etapas do pipeline:
from sklearn.pipeline import Pipeline
from sklearn.preprocessing import StandardScaler
from sklearn.decomposition import PCA
from sklearn.svm import SVC
from sklearn.model_selection import GridSearchCV, train_test_split
from sklearn.datasets import load_breast_cancer
# Load data
cancer = load_breast_cancer()
X_train, X_test, y_train, y_test = train_test_split(
cancer.data, cancer.target, test_size=0.2, random_state=42, stratify=cancer.target
)
# Build pipeline
pipe = Pipeline([
('scaler', StandardScaler()),
('pca', PCA()),
('svc', SVC())
])
# Define parameter grid
# Use stepname__param syntax to access nested parameters
param_grid = {
'pca__n_components': [5, 10, 15, 20],
'svc__C': [0.1, 1, 10, 100],
'svc__kernel': ['rbf', 'linear'],
'svc__gamma': ['scale', 'auto']
}
# Run grid search
grid = GridSearchCV(
pipe,
param_grid,
cv=5,
scoring='accuracy',
n_jobs=-1,
verbose=1
)
grid.fit(X_train, y_train)
print(f"Best parameters: {grid.best_params_}")
print(f"Best CV score: {grid.best_score_:.4f}")
print(f"Test score: {grid.score(X_test, y_test):.4f}")Ajustando Parâmetros do ColumnTransformer
Para pipelines aninhados com ColumnTransformer, encadeie os nomes das etapas com duplo underscore:
# Accessing nested parameters:
# pipeline step 'preprocessor' -> transformer 'num' -> step 'imputer' -> parameter 'strategy'
param_grid = {
'preprocessor__num__imputer__strategy': ['mean', 'median'],
'preprocessor__cat__encoder__handle_unknown': ['ignore', 'infrequent_if_exist'],
'classifier__C': [0.1, 1, 10]
}Usando RandomizedSearchCV para Grandes Espaços de Busca
Quando a grade de parâmetros é grande, RandomizedSearchCV amostra um número fixo de combinações:
from sklearn.model_selection import RandomizedSearchCV
from scipy.stats import uniform, randint
param_distributions = {
'pca__n_components': randint(5, 25),
'svc__C': uniform(0.1, 100),
'svc__kernel': ['rbf', 'linear', 'poly'],
'svc__gamma': uniform(0.001, 1)
}
random_search = RandomizedSearchCV(
pipe,
param_distributions,
n_iter=50, # Sample 50 combinations
cv=5,
scoring='accuracy',
n_jobs=-1,
random_state=42
)
random_search.fit(X_train, y_train)
print(f"Best parameters: {random_search.best_params_}")
print(f"Best CV score: {random_search.best_score_:.4f}")Transformadores Personalizados
FunctionTransformer: Etapas Personalizadas Rápidas
Para transformações simples e sem estado, use FunctionTransformer:
from sklearn.preprocessing import FunctionTransformer
from sklearn.pipeline import Pipeline
import numpy as np
# Log transform (adding 1 to avoid log(0))
log_transformer = FunctionTransformer(
func=np.log1p,
inverse_func=np.expm1 # Optional inverse for inverse_transform
)
pipe = Pipeline([
('log', log_transformer),
('scaler', StandardScaler())
])
# Works with pipeline fit/transform
X_sample = np.array([[1, 10, 100], [2, 20, 200]])
X_transformed = pipe.fit_transform(X_sample)
print(f"Original: {X_sample[0]}")
print(f"Transformed: {X_transformed[0]}")Classe de Transformador Personalizado
Para transformações com estado (as que aprendem parâmetros a partir dos dados), crie uma classe que herde de BaseEstimator e TransformerMixin:
from sklearn.base import BaseEstimator, TransformerMixin
import numpy as np
class OutlierClipper(BaseEstimator, TransformerMixin):
"""Clips values beyond a specified number of standard deviations."""
def __init__(self, n_std=3):
self.n_std = n_std
def fit(self, X, y=None):
# Learn the boundaries from training data
self.mean_ = np.mean(X, axis=0)
self.std_ = np.std(X, axis=0)
self.lower_ = self.mean_ - self.n_std * self.std_
self.upper_ = self.mean_ + self.n_std * self.std_
return self # Always return self from fit
def transform(self, X):
# Apply learned boundaries to any data
X_clipped = np.clip(X, self.lower_, self.upper_)
return X_clippedUse-o em um pipeline como qualquer transformador nativo:
from sklearn.pipeline import Pipeline
from sklearn.preprocessing import StandardScaler
from sklearn.linear_model import LogisticRegression
from sklearn.datasets import load_iris
from sklearn.model_selection import train_test_split
iris = load_iris()
X_train, X_test, y_train, y_test = train_test_split(
iris.data, iris.target, test_size=0.2, random_state=42
)
pipe = Pipeline([
('clip', OutlierClipper(n_std=2)),
('scaler', StandardScaler()),
('classifier', LogisticRegression(max_iter=1000))
])
pipe.fit(X_train, y_train)
print(f"Accuracy: {pipe.score(X_test, y_test):.4f}")
# The n_std parameter works with GridSearchCV
from sklearn.model_selection import GridSearchCV
grid = GridSearchCV(
pipe,
{'clip__n_std': [1.5, 2, 2.5, 3]},
cv=5
)
grid.fit(X_train, y_train)
print(f"Best n_std: {grid.best_params_['clip__n_std']}")Transformador Personalizado para Feature Engineering
Um exemplo mais prático -- criar features de interação a partir de colunas específicas:
from sklearn.base import BaseEstimator, TransformerMixin
import pandas as pd
import numpy as np
class FeatureInteraction(BaseEstimator, TransformerMixin):
"""Creates multiplication interactions between specified column pairs."""
def __init__(self, interaction_pairs=None):
self.interaction_pairs = interaction_pairs
def fit(self, X, y=None):
# Store column names if DataFrame
if isinstance(X, pd.DataFrame):
self.feature_names_in_ = X.columns.tolist()
else:
self.feature_names_in_ = [f"x{i}" for i in range(X.shape[1])]
return self
def transform(self, X):
X_df = pd.DataFrame(X, columns=self.feature_names_in_) if not isinstance(X, pd.DataFrame) else X.copy()
if self.interaction_pairs:
for col_a, col_b in self.interaction_pairs:
name = f"{col_a}_x_{col_b}"
X_df[name] = X_df[col_a] * X_df[col_b]
return X_df.values
def get_feature_names_out(self, input_features=None):
names = list(self.feature_names_in_)
if self.interaction_pairs:
for col_a, col_b in self.interaction_pairs:
names.append(f"{col_a}_x_{col_b}")
return np.array(names)FeatureUnion: Feature Engineering em Paralelo
Enquanto Pipeline encadeia etapas sequencialmente, FeatureUnion executa transformadores em paralelo e concatena suas saídas horizontalmente:
from sklearn.pipeline import Pipeline, FeatureUnion
from sklearn.preprocessing import StandardScaler, PolynomialFeatures
from sklearn.decomposition import PCA
from sklearn.linear_model import LogisticRegression
from sklearn.datasets import load_iris
from sklearn.model_selection import train_test_split
iris = load_iris()
X_train, X_test, y_train, y_test = train_test_split(
iris.data, iris.target, test_size=0.2, random_state=42
)
# Create parallel feature branches
feature_union = FeatureUnion([
('scaled', StandardScaler()), # Original features, scaled
('pca', PCA(n_components=2)), # 2 PCA components
('poly', PolynomialFeatures(degree=2, include_bias=False)) # Polynomial features
])
# Combine into a full pipeline
pipe = Pipeline([
('features', feature_union),
('classifier', LogisticRegression(max_iter=1000, random_state=42))
])
pipe.fit(X_train, y_train)
# Check the total number of features
X_transformed = feature_union.fit_transform(X_train)
print(f"Original features: {X_train.shape[1]}")
print(f"After FeatureUnion: {X_transformed.shape[1]}")
print(f"Test accuracy: {pipe.score(X_test, y_test):.4f}")FeatureUnion vs ColumnTransformer
| Recurso | FeatureUnion | ColumnTransformer |
|---|---|---|
| Entrada | Todas as colunas vão para todos os transformadores | Colunas específicas para transformadores específicos |
| Saída | Concatena horizontalmente | Concatena horizontalmente |
| Caso de uso | Múltiplas representações das mesmas features | Tipos diferentes de features exigem processamento diferente |
| Seleção de colunas | Não pode selecionar -- opera em todas as colunas | Especificação de colunas embutida |
| Alternativa moderna | Frequentemente substituído por ColumnTransformer | Preferido na maioria dos casos |
No scikit-learn moderno, ColumnTransformer resolve a maioria dos casos em que antes se usava FeatureUnion. FeatureUnion continua útil quando você quer múltiplas representações do mesmo conjunto de features (por exemplo, valores brutos + PCA + features polinomiais).
Salvando e Carregando Pipelines
Uma das maiores vantagens dos pipelines é a simplicidade de deployment. Em vez de serializar cada transformador e modelo separadamente, você salva um único objeto:
import joblib
from sklearn.pipeline import Pipeline
from sklearn.preprocessing import StandardScaler
from sklearn.ensemble import RandomForestClassifier
from sklearn.datasets import load_iris
from sklearn.model_selection import train_test_split
# Train a pipeline
iris = load_iris()
X_train, X_test, y_train, y_test = train_test_split(
iris.data, iris.target, test_size=0.2, random_state=42
)
pipe = Pipeline([
('scaler', StandardScaler()),
('classifier', RandomForestClassifier(n_estimators=100, random_state=42))
])
pipe.fit(X_train, y_train)
# Save the entire pipeline -- one file
joblib.dump(pipe, 'model_pipeline.joblib')
print(f"Pipeline saved")
# Load and predict -- no preprocessing code needed
loaded_pipe = joblib.load('model_pipeline.joblib')
predictions = loaded_pipe.predict(X_test)
accuracy = loaded_pipe.score(X_test, y_test)
print(f"Loaded pipeline accuracy: {accuracy:.4f}")Versionando Pipelines
Para produção, inclua metadados junto com o pipeline:
import joblib
import datetime
import sklearn
artifact = {
'pipeline': pipe,
'training_date': datetime.datetime.now().isoformat(),
'sklearn_version': sklearn.__version__,
'feature_names': list(iris.feature_names),
'target_names': list(iris.target_names),
'training_accuracy': pipe.score(X_train, y_train),
'test_accuracy': pipe.score(X_test, y_test),
'n_training_samples': len(X_train)
}
joblib.dump(artifact, 'model_artifact_v1.joblib')
# Later, load and validate
loaded = joblib.load('model_artifact_v1.joblib')
print(f"Model trained on: {loaded['training_date']}")
print(f"Sklearn version: {loaded['sklearn_version']}")
print(f"Test accuracy: {loaded['test_accuracy']:.4f}")
# Use the pipeline
loaded_pipe = loaded['pipeline']
predictions = loaded_pipe.predict(X_test[:3])Usando pickle (Alternativa)
joblib é preferido para objetos sklearn porque lida de forma eficiente com grandes arrays NumPy. O pickle padrão também funciona:
import pickle
# Save
with open('pipeline.pkl', 'wb') as f:
pickle.dump(pipe, f)
# Load
with open('pipeline.pkl', 'rb') as f:
loaded = pickle.load(f)Exemplo Real: Pipeline Completo de Classificação
Aqui está um pipeline completo e de qualidade de produção para uma tarefa de classificação com tipos de features mistos. Ele usa um Random Forest classifier com ColumnTransformer e termina com um evaluation report. Este exemplo usa o padrão de dataset no estilo Titanic que a maioria dos profissionais de ML encontra:
import pandas as pd
import numpy as np
from sklearn.compose import ColumnTransformer
from sklearn.pipeline import Pipeline
from sklearn.preprocessing import StandardScaler, OneHotEncoder
from sklearn.impute import SimpleImputer
from sklearn.ensemble import RandomForestClassifier
from sklearn.model_selection import (
train_test_split, cross_val_score, GridSearchCV
)
from sklearn.metrics import classification_report, accuracy_score
# -- Create a realistic dataset with mixed types and missing values --
np.random.seed(42)
n = 1000
data = pd.DataFrame({
'age': np.random.normal(35, 12, n),
'income': np.random.lognormal(10.5, 0.8, n),
'credit_score': np.random.normal(650, 80, n),
'years_employed': np.random.exponential(5, n),
'department': np.random.choice(['Engineering', 'Sales', 'Marketing', 'HR', 'Finance'], n),
'education': np.random.choice(['High School', 'Bachelor', 'Master', 'PhD'], n),
'city': np.random.choice(['NYC', 'LA', 'Chicago', 'Houston', 'Phoenix', 'Dallas'], n),
'promoted': np.random.binomial(1, 0.3, n)
})
# Introduce missing values (realistic pattern)
for col in ['age', 'income', 'credit_score']:
mask = np.random.random(n) < 0.05
data.loc[mask, col] = np.nan
for col in ['department', 'education']:
mask = np.random.random(n) < 0.03
data.loc[mask, col] = np.nan
print(f"Dataset shape: {data.shape}")
print(f"Missing values:\n{data.isnull().sum()}")
print(f"Target distribution:\n{data['promoted'].value_counts(normalize=True)}")# -- Define features and target --
X = data.drop('promoted', axis=1)
y = data['promoted']
# Split the data (see /topics/Scikit-Learn/sklearn-train-test-split for details)
X_train, X_test, y_train, y_test = train_test_split(
X, y, test_size=0.2, random_state=42, stratify=y
)
print(f"Training set: {X_train.shape[0]} samples")
print(f"Test set: {X_test.shape[0]} samples")# -- Define column groups --
numeric_features = ['age', 'income', 'credit_score', 'years_employed']
categorical_features = ['department', 'education', 'city']
# -- Build preprocessing pipelines for each column type --
numeric_transformer = Pipeline([
('imputer', SimpleImputer(strategy='median')),
('scaler', StandardScaler())
])
categorical_transformer = Pipeline([
('imputer', SimpleImputer(strategy='most_frequent')),
('encoder', OneHotEncoder(handle_unknown='ignore', sparse_output=False))
])
# -- Combine column transformers --
preprocessor = ColumnTransformer(
transformers=[
('num', numeric_transformer, numeric_features),
('cat', categorical_transformer, categorical_features)
],
remainder='drop' # Explicitly drop unlisted columns
)
# -- Full pipeline: preprocessing + classifier --
# (see /topics/Scikit-Learn/sklearn-random-forest for more on RandomForest)
pipeline = Pipeline([
('preprocessor', preprocessor),
('classifier', RandomForestClassifier(
n_estimators=200,
max_depth=10,
min_samples_leaf=5,
class_weight='balanced',
random_state=42,
n_jobs=-1
))
])# -- Cross-validation first --
cv_scores = cross_val_score(pipeline, X_train, y_train, cv=5, scoring='accuracy')
print(f"Cross-validation scores: {cv_scores}")
print(f"Mean CV accuracy: {cv_scores.mean():.4f} (+/- {cv_scores.std() * 2:.4f})")
# -- Fit on full training set --
pipeline.fit(X_train, y_train)
# -- Evaluate on test set --
y_pred = pipeline.predict(X_test)
print(f"\nTest accuracy: {accuracy_score(y_test, y_pred):.4f}")
# For detailed evaluation, see /topics/Scikit-Learn/sklearn-confusion-matrix
print(f"\nClassification Report:\n{classification_report(y_test, y_pred)}")# -- Hyperparameter tuning --
param_grid = {
'preprocessor__num__imputer__strategy': ['mean', 'median'],
'classifier__n_estimators': [100, 200, 300],
'classifier__max_depth': [5, 10, 15, None],
'classifier__min_samples_leaf': [3, 5, 10]
}
grid_search = GridSearchCV(
pipeline,
param_grid,
cv=5,
scoring='accuracy',
n_jobs=-1,
verbose=1
)
grid_search.fit(X_train, y_train)
print(f"\nBest parameters: {grid_search.best_params_}")
print(f"Best CV score: {grid_search.best_score_:.4f}")
print(f"Test score: {grid_search.score(X_test, y_test):.4f}")# -- Inspect the best pipeline --
best_pipeline = grid_search.best_estimator_
# Get feature names after transformation
feature_names = best_pipeline.named_steps['preprocessor'].get_feature_names_out()
print(f"\nTransformed feature count: {len(feature_names)}")
# Get feature importances from the classifier
importances = best_pipeline.named_steps['classifier'].feature_importances_
feature_importance = pd.DataFrame({
'feature': feature_names,
'importance': importances
}).sort_values('importance', ascending=False)
print(f"\nTop 10 features:")
print(feature_importance.head(10).to_string(index=False))# -- Save the final pipeline --
import joblib
joblib.dump(best_pipeline, 'promotion_predictor.joblib')
print("Pipeline saved to promotion_predictor.joblib")
# -- Production usage --
loaded = joblib.load('promotion_predictor.joblib')
# Predict on new data -- same format as original DataFrame
new_employee = pd.DataFrame({
'age': [28],
'income': [65000],
'credit_score': [720],
'years_employed': [3.5],
'department': ['Engineering'],
'education': ['Master'],
'city': ['NYC']
})
prediction = loaded.predict(new_employee)
probability = loaded.predict_proba(new_employee)
print(f"\nNew employee prediction: {'Promoted' if prediction[0] else 'Not promoted'}")
print(f"Probability: {probability[0][1]:.2%}")Este exemplo demonstra todos os padrões principais de pipeline: tipos mistos de features, tratamento de valores ausentes, cross-validation, hyperparameter tuning, inspeção de features e serialização para produção.
Pipeline com Diferentes Tipos de Modelos
O mesmo pipeline de pré-processamento pode servir a modelos diferentes. Troque apenas a última etapa:
from sklearn.pipeline import Pipeline
from sklearn.linear_model import LogisticRegression
from sklearn.ensemble import RandomForestClassifier, GradientBoostingClassifier
from sklearn.svm import SVC
from sklearn.model_selection import cross_val_score
# Reuse the preprocessor from the previous example
models = {
'Logistic Regression': LogisticRegression(max_iter=1000, random_state=42),
'Random Forest': RandomForestClassifier(n_estimators=100, random_state=42),
'Gradient Boosting': GradientBoostingClassifier(n_estimators=100, random_state=42),
'SVM': SVC(kernel='rbf', random_state=42)
}
results = {}
for name, model in models.items():
pipe = Pipeline([
('preprocessor', preprocessor),
('classifier', model)
])
scores = cross_val_score(pipe, X_train, y_train, cv=5, scoring='accuracy')
results[name] = {
'mean': scores.mean(),
'std': scores.std()
}
print(f"{name:25s} | Accuracy: {scores.mean():.4f} +/- {scores.std():.4f}")Você também pode definir dinamicamente a etapa do estimador:
# Replace the classifier in an existing pipeline
pipeline.set_params(classifier=GradientBoostingClassifier(n_estimators=200))
pipeline.fit(X_train, y_train)
print(f"Gradient Boosting test accuracy: {pipeline.score(X_test, y_test):.4f}")Pipelines em Regressão
Pipelines funcionam da mesma forma para tarefas de regressão. Para detalhes sobre linear regression com sklearn, veja nosso sklearn linear regression guide.
from sklearn.pipeline import Pipeline
from sklearn.compose import ColumnTransformer
from sklearn.preprocessing import StandardScaler, OneHotEncoder, PolynomialFeatures
from sklearn.impute import SimpleImputer
from sklearn.linear_model import Ridge
from sklearn.model_selection import cross_val_score, train_test_split
import pandas as pd
import numpy as np
# Simulated housing data
np.random.seed(42)
n = 500
housing = pd.DataFrame({
'sqft': np.random.normal(1500, 400, n),
'bedrooms': np.random.choice([1, 2, 3, 4, 5], n),
'age': np.random.uniform(0, 50, n),
'neighborhood': np.random.choice(['downtown', 'suburbs', 'rural'], n),
'condition': np.random.choice(['poor', 'fair', 'good', 'excellent'], n),
})
housing['price'] = (
housing['sqft'] * 200
+ housing['bedrooms'] * 15000
- housing['age'] * 1000
+ np.random.normal(0, 20000, n)
)
X = housing.drop('price', axis=1)
y = housing['price']
X_train, X_test, y_train, y_test = train_test_split(X, y, test_size=0.2, random_state=42)
# Regression pipeline
numeric_features = ['sqft', 'bedrooms', 'age']
categorical_features = ['neighborhood', 'condition']
preprocessor = ColumnTransformer([
('num', Pipeline([
('imputer', SimpleImputer(strategy='median')),
('poly', PolynomialFeatures(degree=2, include_bias=False)),
('scaler', StandardScaler())
]), numeric_features),
('cat', Pipeline([
('imputer', SimpleImputer(strategy='most_frequent')),
('encoder', OneHotEncoder(handle_unknown='ignore', sparse_output=False))
]), categorical_features)
])
reg_pipeline = Pipeline([
('preprocessor', preprocessor),
('regressor', Ridge(alpha=1.0))
])
cv_scores = cross_val_score(reg_pipeline, X_train, y_train, cv=5, scoring='r2')
print(f"Cross-validation R2: {cv_scores.mean():.4f} +/- {cv_scores.std():.4f}")
reg_pipeline.fit(X_train, y_train)
print(f"Test R2: {reg_pipeline.score(X_test, y_test):.4f}")Pulando Etapas com passthrough e None
Você pode pular etapas condicionalmente definindo-as como 'passthrough' ou None:
from sklearn.pipeline import Pipeline
from sklearn.preprocessing import StandardScaler
from sklearn.decomposition import PCA
from sklearn.svm import SVC
from sklearn.model_selection import GridSearchCV
from sklearn.datasets import load_breast_cancer
cancer = load_breast_cancer()
X_train, X_test, y_train, y_test = train_test_split(
cancer.data, cancer.target, test_size=0.2, random_state=42
)
pipe = Pipeline([
('scaler', StandardScaler()),
('reduce_dim', PCA()),
('classifier', SVC())
])
# Grid search can toggle steps on/off
param_grid = [
{
'reduce_dim': [PCA(5), PCA(10), PCA(15)],
'classifier__C': [1, 10]
},
{
'reduce_dim': ['passthrough'], # Skip PCA entirely
'classifier__C': [1, 10]
}
]
grid = GridSearchCV(pipe, param_grid, cv=5, n_jobs=-1)
grid.fit(X_train, y_train)
print(f"Best params: {grid.best_params_}")
print(f"Best score: {grid.best_score_:.4f}")Fazendo Cache de Etapas do Pipeline
Ao ajustar hiperparâmetros, etapas intermediárias podem ser recalculadas de forma redundante. Ative o cache para evitar isso:
from sklearn.pipeline import Pipeline
from sklearn.preprocessing import StandardScaler
from sklearn.decomposition import PCA
from sklearn.svm import SVC
from tempfile import mkdtemp
from shutil import rmtree
# Create a temporary cache directory
cachedir = mkdtemp()
pipe = Pipeline(
[
('scaler', StandardScaler()),
('pca', PCA(n_components=10)),
('svc', SVC())
],
memory=cachedir # Cache intermediate transformations
)
# During GridSearchCV, the scaler and PCA results are cached
# Only recomputed when their parameters change
# This speeds up searches where only the final estimator params change
# Clean up when done
# rmtree(cachedir)Explorando Dados Antes de Construir Pipelines
Antes de construir um pipeline, você precisa entender suas features: distribuições, padrões de valores ausentes, correlações e possíveis transformações. PyGWalker (opens in a new tab) permite transformar qualquer DataFrame do Pandas em uma interface interativa de exploração visual diretamente em notebooks Jupyter:
import pandas as pd
import pygwalker as pyg
# Explore your dataset interactively before building the pipeline
# Drag features to axes, create histograms, scatter plots, box plots
walker = pyg.walk(data)Esse tipo de exploração visual ajuda você a decidir quais features precisam de scaling, quais têm outliers que precisam ser limitados e quais variáveis categóricas têm alta cardinalidade. Você pode identificar padrões de valores ausentes e entender distribuições antes de escrever uma única linha de código de pipeline.
Para iterar no seu fluxo completo de experimentação de pipeline -- testar diferentes estratégias de pré-processamento, comparar desempenho de modelos e acompanhar resultados -- o RunCell (opens in a new tab) oferece um ambiente Jupyter com IA, onde um agente ajuda na geração de código, debugging e gestão de experimentos.
Armadilhas Comuns e Dicas de Debugging
Armadilha 1: Esquecer de Usar o Pipeline na Previsão
# WRONG: Preprocessing manually, predicting with just the model
X_test_scaled = scaler.transform(X_test)
predictions = pipeline.named_steps['classifier'].predict(X_test_scaled)
# CORRECT: Let the pipeline handle everything
predictions = pipeline.predict(X_test)Armadilha 2: Ajustar o Pré-processamento Fora do Pipeline
# WRONG: This defeats the purpose of the pipeline
scaler = StandardScaler()
X_train_scaled = scaler.fit_transform(X_train)
pipe = Pipeline([('classifier', LogisticRegression())])
pipe.fit(X_train_scaled, y_train)
# Now you must remember to manually scale at prediction time
# CORRECT: Include preprocessing in the pipeline
pipe = Pipeline([
('scaler', StandardScaler()),
('classifier', LogisticRegression())
])
pipe.fit(X_train, y_train) # Pipeline handles the scalingArmadilha 3: Nomes de Parâmetros Errados no GridSearchCV
A sintaxe stepname__param deve corresponder exatamente aos nomes das etapas do seu pipeline:
pipe = Pipeline([
('my_scaler', StandardScaler()),
('my_clf', LogisticRegression())
])
# WRONG: Using the class name instead of the step name
# param_grid = {'StandardScaler__with_mean': [True, False]} # KeyError
# CORRECT: Using the step name you defined
param_grid = {'my_scaler__with_mean': [True, False], 'my_clf__C': [0.1, 1, 10]}Armadilha 4: Mudanças na Ordem das Colunas
Ao usar ColumnTransformer com DataFrames do Pandas (que você pode carregar com pandas read_csv), a ordem das colunas na saída depende da ordem dos transformadores, não do DataFrame original:
# The output order is: numeric features first, then categorical
# This matters if you manually inspect transformed data
preprocessor = ColumnTransformer([
('num', numeric_transformer, numeric_features), # These come first
('cat', categorical_transformer, categorical_features) # These come second
])Debugging de Saídas Intermediárias
Para inspecionar o que acontece em cada etapa:
# Method 1: Transform step by step
pipe.fit(X_train, y_train)
X_after_preprocessor = pipe.named_steps['preprocessor'].transform(X_test)
print(f"Shape after preprocessing: {X_after_preprocessor.shape}")
print(f"Sample values:\n{X_after_preprocessor[:2]}")
# Method 2: Slice the pipeline
preprocessing_pipe = pipe[:-1] # Everything except the classifier
X_transformed = preprocessing_pipe.transform(X_test)
print(f"Transformed shape: {X_transformed.shape}")
# Method 3: Use set_config for verbose output
from sklearn import set_config
set_config(transform_output="pandas") # Get DataFrames from transformers
# Now transform outputs include column names -- easier to debugDebugging de Incompatibilidades de Shape
# Print shapes at each stage to find where things break
print(f"Input shape: {X_train.shape}")
for name, step in pipe.named_steps.items():
if hasattr(step, 'transform'):
# Check if the step has been fitted
try:
X_train = step.transform(X_train)
print(f"After '{name}': {X_train.shape}")
except Exception as e:
print(f"Error at '{name}': {e}")
breakReferência de Métodos do Pipeline
| Método | Descrição |
|---|---|
fit(X, y) | Ajusta todos os transformadores e o estimador final |
predict(X) | Transforma X por todas as etapas e depois prevê com o estimador final |
predict_proba(X) | Transforma e obtém estimativas de probabilidade (apenas classificadores) |
transform(X) | Transforma X por todas as etapas (se a última etapa for um transformador) |
fit_transform(X, y) | Ajusta e transforma em uma única chamada |
fit_predict(X, y) | Ajusta e prevê em uma única chamada |
score(X, y) | Transforma e calcula a pontuação (accuracy para classificadores, R2 para regressors) |
set_params(**params) | Define parâmetros usando a sintaxe stepname__param |
get_params() | Obtém todos os parâmetros |
named_steps | Acesso tipo dicionário às etapas do pipeline |
[i] or [name] | Acessa uma etapa por índice ou nome |
[start:end] | Faz slice para criar um sub-pipeline |
FAQ
Qual é a diferença entre Pipeline e make_pipeline?
Pipeline exige que você forneça tuplas (name, estimator) para cada etapa, dando controle explícito sobre os nomes das etapas. make_pipeline aceita instâncias de estimadores diretamente e gera nomes automaticamente a partir das classes (em minúsculas). Use Pipeline quando precisar de nomes descritivos ou quando planejar ajustar hiperparâmetros com GridSearchCV. Use make_pipeline para prototipagem rápida.
O sklearn Pipeline evita data leakage?
Sim. Quando você chama pipeline.fit(X_train, y_train), cada transformador é ajustado apenas nos dados de treinamento. Durante cross-validation com cross_val_score ou GridSearchCV, o pipeline refaz todos os passos em cada fold de treino, garantindo que nenhum dado do fold de teste vaze para o pré-processamento. Essa é a principal vantagem sobre o pré-processamento manual.
Posso usar Pipeline com modelos de deep learning?
Pipelines do Scikit-learn funcionam com qualquer estimador que siga a API do sklearn (implemente fit, predict e, opcionalmente, transform). Bibliotecas como scikeras fornecem wrappers compatíveis com sklearn para modelos Keras, permitindo seu uso em pipelines. XGBoost e LightGBM também oferecem interfaces compatíveis com sklearn.
Como lidar com nomes de features após ColumnTransformer?
Chame pipeline.named_steps['preprocessor'].get_feature_names_out() depois de ajustar. Isso retorna um array de nomes de features com prefixos indicando qual transformador as produziu (por exemplo, num__age, cat__city_NYC). Isso funciona no scikit-learn 1.0 e posteriores.
Posso ter vários modelos em um único Pipeline?
Não. Um Pipeline é uma sequência linear em que a última etapa é o estimador. Se quiser comparar vários modelos, crie pipelines separados com as mesmas etapas de pré-processamento, mas com estimadores finais diferentes. Você pode automatizar isso iterando sobre um dicionário de modelos e criando um pipeline para cada um.
Como pulo uma etapa em um Pipeline durante o GridSearchCV?
Defina a etapa como 'passthrough' na grade de parâmetros. Por exemplo: {'reduce_dim': ['passthrough']} irá pular completamente a etapa reduce_dim durante aquela iteração da busca. Você também pode definir uma etapa como None, mas 'passthrough' é a abordagem recomendada.
O que acontece se o Pipeline encontrar categorias não vistas no momento da previsão?
Se você usar OneHotEncoder com handle_unknown='ignore' dentro do seu pipeline, categorias não vistas são codificadas como todos zeros. Sem essa configuração, o pipeline gera um erro. Sempre defina handle_unknown='ignore' em pipelines de produção onde novos valores categóricos podem aparecer.
Conclusão
O Sklearn Pipeline transforma código de ML bagunçado e propenso a erros em workflows limpos e reproduzíveis. Ao encadear pré-processamento e modelagem em um único objeto, você elimina data leakage, simplifica a implantação e torna trivial o ajuste de hiperparâmetros em todo o workflow.
Comece pelo básico: envolva seu scaler e seu modelo em um Pipeline. Avance para ColumnTransformer quando tiver tipos de features mistos. Use GridSearchCV para ajustar parâmetros de todas as etapas do pipeline simultaneamente. Crie transformadores personalizados quando as opções nativas não cobrirem suas necessidades de feature engineering.
O investimento em aprender pipelines traz retorno imediato. Seus resultados de cross-validation se tornam confiáveis porque o pré-processamento é reajustado a cada fold. Sua implantação em produção se resume a um único joblib.dump e joblib.load. E seu codebase se torna mais fácil de manter porque toda a lógica de transformação e previsão vive em um único objeto inspecionável.
Guias Relacionados
- Sklearn Linear Regression -- construindo modelos de regressão que se encaixam naturalmente em pipelines
- Sklearn Confusion Matrix -- avaliando a saída de classificação do seu pipeline
- Sklearn Random Forest -- um classificador poderoso para usar como etapa final do pipeline
- Pandas read_csv -- carregando dados CSV em DataFrames antes de alimentá-los em um pipeline