Skip to content
Tópicos
Scikit-Learn
Sklearn Pipeline: guia completo para construir pipelines de ML em Python

Sklearn Pipeline: guia completo para construir pipelines de ML em Python

Atualizado em

Você tem um projeto de machine learning com cinco etapas de pré-processamento, três operações de feature engineering e um modelo final. Cada etapa é um bloco de código separado. Você ajusta o scaler no dataset inteiro e depois divide em treino e teste. Sua one-hot encoding cria colunas diferentes no treinamento e em produção. Meses depois, alguém altera a estratégia de imputação, mas esquece de atualizar o script de deployment.

Essa é a realidade da maioria dos codebases de ML. Pipelines manuais de pré-processamento são frágeis, propensos a erros e uma fonte constante de data leakage -- a causa isolada mais comum de modelos que vão muito bem em notebooks, mas falham em dados reais. Quando você ajusta um StandardScaler em todo o dataset antes da divisão, estatísticas do conjunto de teste vazam para o treino. Quando você codifica variáveis categóricas fora de um workflow unificado, o descompasso entre treino e teste fica invisível até a produção quebrar.

O Pipeline do Scikit-learn resolve esses problemas encadeando pré-processamento e modelagem em um único objeto. Uma chamada para fit() treina tudo. Uma chamada para predict() transforma e prevê. Sem data leakage. Sem transformações incompatíveis. Um único objeto para salvar, carregar e implantar. Este guia cobre tudo o que você precisa para construir sklearn pipelines de qualidade de produção, desde o uso básico até transformadores personalizados e padrões reais de implantação.

Por que Pipelines Importam

O Problema de Data Leakage

Data leakage acontece quando informações fora do conjunto de treinamento influenciam o modelo durante o treinamento. A forma mais comum no pré-processamento se parece com isto:

# WRONG: Data leakage -- scaler sees test data
from sklearn.preprocessing import StandardScaler
from sklearn.model_selection import train_test_split
 
scaler = StandardScaler()
X_scaled = scaler.fit_transform(X)  # Fitted on ALL data, including test
 
X_train, X_test, y_train, y_test = train_test_split(X_scaled, y, test_size=0.2)
# X_test was already influenced by scaler statistics computed on the full dataset

O scaler calcula média e desvio padrão com base em todo o dataset, incluindo as amostras de teste. Sua avaliação no conjunto de teste agora fica otimista porque o modelo “viu” indiretamente informações dessas amostras durante o pré-processamento.

A abordagem correta:

# CORRECT: No leakage -- scaler fitted only on training data
X_train, X_test, y_train, y_test = train_test_split(X, y, test_size=0.2)
 
scaler = StandardScaler()
X_train_scaled = scaler.fit_transform(X_train)  # Fit only on training
X_test_scaled = scaler.transform(X_test)         # Transform only, no fitting

Isso funciona, mas a abordagem manual rapidamente fica difícil de gerenciar. Com cinco etapas de pré-processamento, você precisa acompanhar cinco objetos ajustados e lembrar a chamada correta de fit_transform versus transform para cada um. Um Pipeline faz isso automaticamente.

Organização do Código

Além do leakage, pipelines resolvem um problema de organização de código. Compare estas duas abordagens:

AspectoPré-processamento manualsklearn Pipeline
Risco de data leakageAlto -- fácil chamar fit_transform em dados de testeNenhum -- o pipeline força fit/transform corretos
Linhas de código para treino + previsão10-30 linhas por ambiente2 linhas (fit, predict)
Implantação em produçãoSerializar cada transformador separadamente, reconstruir a ordemSerializar um objeto com joblib
Cross-validationPrecisa refazer manualmente todas as etapas por foldcross_val_score cuida de tudo
Hyperparameter tuningIterar manualmente sobre pré-processamento + parâmetros do modeloGridSearchCV ajusta todos os parâmetros juntos
ReprodutibilidadeDepende da ordem de execução no notebookDeterminística -- mesmo objeto, mesmo resultado
DebuggingImprimir shapes após cada etapa, manualmentepipeline.named_steps para inspeção

Uso Básico de Pipeline

A classe Pipeline recebe uma lista de tuplas (nome, transformer). Todas as etapas, exceto a última, devem implementar fit e transform. A última etapa pode ser qualquer estimador (classificador, regressor ou transformador).

from sklearn.pipeline import Pipeline
from sklearn.preprocessing import StandardScaler
from sklearn.linear_model import LogisticRegression
 
# Create pipeline with named steps
pipe = Pipeline([
    ('scaler', StandardScaler()),
    ('classifier', LogisticRegression(max_iter=1000))
])

Ajuste e Previsão

Quando você chama pipe.fit(X_train, y_train), o pipeline:

  1. Chama scaler.fit_transform(X_train, y_train) -- ajusta o scaler e transforma os dados de treino
  2. Passa os dados transformados para classifier.fit(X_transformed, y_train)

Quando você chama pipe.predict(X_test), o pipeline:

  1. Chama scaler.transform(X_test) -- transforma apenas, sem ajustar
  2. Passa os dados transformados para classifier.predict(X_transformed)
from sklearn.pipeline import Pipeline
from sklearn.preprocessing import StandardScaler
from sklearn.linear_model import LogisticRegression
from sklearn.model_selection import train_test_split
from sklearn.datasets import load_iris
 
# Load data
iris = load_iris()
X, y = iris.data, iris.target
 
# Split -- see our guide on train_test_split for details:
# /topics/Scikit-Learn/sklearn-train-test-split
X_train, X_test, y_train, y_test = train_test_split(
    X, y, test_size=0.2, random_state=42, stratify=y
)
 
# Build and train pipeline
pipe = Pipeline([
    ('scaler', StandardScaler()),
    ('classifier', LogisticRegression(max_iter=1000, random_state=42))
])
 
pipe.fit(X_train, y_train)
 
# Evaluate
accuracy = pipe.score(X_test, y_test)
print(f"Test accuracy: {accuracy:.4f}")
# Test accuracy: 1.0000

Acessando Etapas Individuais

Você pode inspecionar qualquer etapa pelo nome:

# Access scaler parameters after fitting
scaler = pipe.named_steps['scaler']
print(f"Feature means: {scaler.mean_}")
print(f"Feature stds:  {scaler.scale_}")
 
# Access the classifier
clf = pipe.named_steps['classifier']
print(f"Coefficients shape: {clf.coef_.shape}")
print(f"Classes: {clf.classes_}")

Você também pode usar indexação:

# Access by index
first_step = pipe[0]   # StandardScaler
last_step = pipe[-1]    # LogisticRegression
 
# Slice the pipeline (returns a new Pipeline)
preprocessing = pipe[:-1]  # Just the scaler
X_test_transformed = preprocessing.transform(X_test)
print(f"Transformed shape: {X_test_transformed.shape}")

make_pipeline: O Atalho

Quando você não precisa de nomes personalizados para as etapas, make_pipeline os gera automaticamente a partir dos nomes das classes:

from sklearn.pipeline import make_pipeline
from sklearn.preprocessing import StandardScaler
from sklearn.decomposition import PCA
from sklearn.svm import SVC
 
# Equivalent to Pipeline([('standardscaler', StandardScaler()),
#                          ('pca', PCA(n_components=2)),
#                          ('svc', SVC())])
pipe = make_pipeline(StandardScaler(), PCA(n_components=2), SVC())
 
print(pipe.named_steps)
# {'standardscaler': StandardScaler(), 'pca': PCA(n_components=2), 'svc': SVC()}

Os nomes gerados automaticamente são o nome da classe em minúsculas. Se você usar o mesmo transformador duas vezes, make_pipeline acrescenta um número:

from sklearn.pipeline import make_pipeline
from sklearn.preprocessing import StandardScaler, PolynomialFeatures
 
pipe = make_pipeline(PolynomialFeatures(2), StandardScaler(), PolynomialFeatures(3))
print(list(pipe.named_steps.keys()))
# ['polynomialfeatures-1', 'standardscaler', 'polynomialfeatures-2']

Pipeline vs make_pipeline

RecursoPipelinemake_pipeline
Nomes personalizados das etapasSim -- você escolheNão -- gerados automaticamente
Legibilidade para pipelines grandesMelhor -- nomes descritivosPior -- nomes genéricos
Sintaxe de hyperparameter tuningstepname__param com seus nomesclassname__param com nomes automáticos
Brevidade do códigoMais verbosoMais conciso
Melhor paraPipelines de produção, tuningPrototipagem rápida

Use Pipeline quando você planeja ajustar hiperparâmetros ou quando nomes claros melhoram a legibilidade. Use make_pipeline para experimentos rápidos.

Etapas Comuns de Pré-processamento

Aqui estão os transformadores mais usados em pipelines do sklearn:

Variáveis Numéricas

from sklearn.pipeline import Pipeline
from sklearn.preprocessing import StandardScaler, MinMaxScaler, RobustScaler
from sklearn.impute import SimpleImputer
 
# Scale to zero mean, unit variance
numeric_pipe = Pipeline([
    ('imputer', SimpleImputer(strategy='median')),
    ('scaler', StandardScaler())
])
TransformadorO que FazQuando Usar
StandardScalerCentraliza para média=0, escala para desvio padrão=1Escolha padrão para a maioria dos algoritmos
MinMaxScalerEscala para a faixa [0, 1]Redes neurais, algoritmos sensíveis à magnitude
RobustScalerUsa mediana e IQR, robusto a outliersDados com outliers significativos
SimpleImputerPreenche valores ausentes (mean, median, most_frequent, constant)Tratamento de dados ausentes
PolynomialFeaturesGera features polinomiais e interaçõesAdicionar não linearidade a modelos lineares
PowerTransformerAplica transformação Yeo-Johnson ou Box-CoxDistribuições enviesadas

Variáveis Categóricas

from sklearn.pipeline import Pipeline
from sklearn.preprocessing import OneHotEncoder, OrdinalEncoder, LabelEncoder
from sklearn.impute import SimpleImputer
 
# One-hot encode categorical features
categorical_pipe = Pipeline([
    ('imputer', SimpleImputer(strategy='most_frequent')),
    ('encoder', OneHotEncoder(handle_unknown='ignore', sparse_output=False))
])
TransformadorO que FazQuando Usar
OneHotEncoderCria colunas binárias para cada categoriaCategorias nominais (sem ordem)
OrdinalEncoderMapeia categorias para inteirosCategorias ordinais (low/medium/high)
TargetEncoderCodifica usando estatísticas da variável-alvoFeatures de alta cardinalidade (scikit-learn 1.3+)

ColumnTransformer para Tipos de Dados Mistos

Datasets reais têm colunas numéricas e categóricas. O ColumnTransformer aplica transformações diferentes a subconjuntos diferentes de colunas em paralelo:

from sklearn.compose import ColumnTransformer
from sklearn.pipeline import Pipeline
from sklearn.preprocessing import StandardScaler, OneHotEncoder
from sklearn.impute import SimpleImputer
from sklearn.linear_model import LogisticRegression
import pandas as pd
import numpy as np
 
# Sample data with mixed types
data = pd.DataFrame({
    'age': [25, 30, np.nan, 45, 50],
    'income': [40000, 55000, 60000, np.nan, 90000],
    'city': ['NYC', 'LA', 'NYC', 'Chicago', 'LA'],
    'education': ['BS', 'MS', 'PhD', 'BS', 'MS'],
    'purchased': [0, 1, 1, 0, 1]
})
 
X = data.drop('purchased', axis=1)
y = data['purchased']
 
# Define column groups
numeric_features = ['age', 'income']
categorical_features = ['city', 'education']
 
# Build sub-pipelines for each column type
numeric_transformer = Pipeline([
    ('imputer', SimpleImputer(strategy='median')),
    ('scaler', StandardScaler())
])
 
categorical_transformer = Pipeline([
    ('imputer', SimpleImputer(strategy='most_frequent')),
    ('encoder', OneHotEncoder(handle_unknown='ignore', sparse_output=False))
])
 
# Combine with ColumnTransformer
preprocessor = ColumnTransformer(
    transformers=[
        ('num', numeric_transformer, numeric_features),
        ('cat', categorical_transformer, categorical_features)
    ]
)
 
# Full pipeline: preprocessing + model
pipeline = Pipeline([
    ('preprocessor', preprocessor),
    ('classifier', LogisticRegression(max_iter=1000))
])
 
pipeline.fit(X, y)
print(f"Pipeline fitted successfully")
print(f"Predictions: {pipeline.predict(X)}")

Obtendo Nomes das Features Após a Transformação

Depois de ajustar um ColumnTransformer, você pode recuperar os nomes das features transformadas:

# After fitting the pipeline
pipeline.fit(X, y)
 
# Get feature names from the preprocessor step
feature_names = pipeline.named_steps['preprocessor'].get_feature_names_out()
print(f"Transformed features: {feature_names}")
# ['num__age', 'num__income', 'cat__city_Chicago', 'cat__city_LA',
#  'cat__city_NYC', 'cat__education_BS', 'cat__education_MS', 'cat__education_PhD']

Lidando com Colunas Restantes

Por padrão, ColumnTransformer remove colunas não especificadas em nenhum transformador. Controle isso com o parâmetro remainder:

preprocessor = ColumnTransformer(
    transformers=[
        ('num', numeric_transformer, numeric_features),
        ('cat', categorical_transformer, categorical_features)
    ],
    remainder='passthrough'  # Keep unspecified columns as-is
    # remainder='drop'       # Default: drop unspecified columns
)

Pipeline com GridSearchCV

Um dos recursos mais poderosos dos pipelines do sklearn é a integração fluida com hyperparameter tuning. Use a sintaxe stepname__parameter para referenciar parâmetros dentro das etapas do pipeline:

from sklearn.pipeline import Pipeline
from sklearn.preprocessing import StandardScaler
from sklearn.decomposition import PCA
from sklearn.svm import SVC
from sklearn.model_selection import GridSearchCV, train_test_split
from sklearn.datasets import load_breast_cancer
 
# Load data
cancer = load_breast_cancer()
X_train, X_test, y_train, y_test = train_test_split(
    cancer.data, cancer.target, test_size=0.2, random_state=42, stratify=cancer.target
)
 
# Build pipeline
pipe = Pipeline([
    ('scaler', StandardScaler()),
    ('pca', PCA()),
    ('svc', SVC())
])
 
# Define parameter grid
# Use stepname__param syntax to access nested parameters
param_grid = {
    'pca__n_components': [5, 10, 15, 20],
    'svc__C': [0.1, 1, 10, 100],
    'svc__kernel': ['rbf', 'linear'],
    'svc__gamma': ['scale', 'auto']
}
 
# Run grid search
grid = GridSearchCV(
    pipe,
    param_grid,
    cv=5,
    scoring='accuracy',
    n_jobs=-1,
    verbose=1
)
 
grid.fit(X_train, y_train)
 
print(f"Best parameters: {grid.best_params_}")
print(f"Best CV score:   {grid.best_score_:.4f}")
print(f"Test score:      {grid.score(X_test, y_test):.4f}")

Ajustando Parâmetros do ColumnTransformer

Para pipelines aninhados com ColumnTransformer, encadeie os nomes das etapas com duplo underscore:

# Accessing nested parameters:
# pipeline step 'preprocessor' -> transformer 'num' -> step 'imputer' -> parameter 'strategy'
param_grid = {
    'preprocessor__num__imputer__strategy': ['mean', 'median'],
    'preprocessor__cat__encoder__handle_unknown': ['ignore', 'infrequent_if_exist'],
    'classifier__C': [0.1, 1, 10]
}

Usando RandomizedSearchCV para Grandes Espaços de Busca

Quando a grade de parâmetros é grande, RandomizedSearchCV amostra um número fixo de combinações:

from sklearn.model_selection import RandomizedSearchCV
from scipy.stats import uniform, randint
 
param_distributions = {
    'pca__n_components': randint(5, 25),
    'svc__C': uniform(0.1, 100),
    'svc__kernel': ['rbf', 'linear', 'poly'],
    'svc__gamma': uniform(0.001, 1)
}
 
random_search = RandomizedSearchCV(
    pipe,
    param_distributions,
    n_iter=50,       # Sample 50 combinations
    cv=5,
    scoring='accuracy',
    n_jobs=-1,
    random_state=42
)
 
random_search.fit(X_train, y_train)
print(f"Best parameters: {random_search.best_params_}")
print(f"Best CV score:   {random_search.best_score_:.4f}")

Transformadores Personalizados

FunctionTransformer: Etapas Personalizadas Rápidas

Para transformações simples e sem estado, use FunctionTransformer:

from sklearn.preprocessing import FunctionTransformer
from sklearn.pipeline import Pipeline
import numpy as np
 
# Log transform (adding 1 to avoid log(0))
log_transformer = FunctionTransformer(
    func=np.log1p,
    inverse_func=np.expm1  # Optional inverse for inverse_transform
)
 
pipe = Pipeline([
    ('log', log_transformer),
    ('scaler', StandardScaler())
])
 
# Works with pipeline fit/transform
X_sample = np.array([[1, 10, 100], [2, 20, 200]])
X_transformed = pipe.fit_transform(X_sample)
print(f"Original:    {X_sample[0]}")
print(f"Transformed: {X_transformed[0]}")

Classe de Transformador Personalizado

Para transformações com estado (as que aprendem parâmetros a partir dos dados), crie uma classe que herde de BaseEstimator e TransformerMixin:

from sklearn.base import BaseEstimator, TransformerMixin
import numpy as np
 
class OutlierClipper(BaseEstimator, TransformerMixin):
    """Clips values beyond a specified number of standard deviations."""
 
    def __init__(self, n_std=3):
        self.n_std = n_std
 
    def fit(self, X, y=None):
        # Learn the boundaries from training data
        self.mean_ = np.mean(X, axis=0)
        self.std_ = np.std(X, axis=0)
        self.lower_ = self.mean_ - self.n_std * self.std_
        self.upper_ = self.mean_ + self.n_std * self.std_
        return self  # Always return self from fit
 
    def transform(self, X):
        # Apply learned boundaries to any data
        X_clipped = np.clip(X, self.lower_, self.upper_)
        return X_clipped

Use-o em um pipeline como qualquer transformador nativo:

from sklearn.pipeline import Pipeline
from sklearn.preprocessing import StandardScaler
from sklearn.linear_model import LogisticRegression
from sklearn.datasets import load_iris
from sklearn.model_selection import train_test_split
 
iris = load_iris()
X_train, X_test, y_train, y_test = train_test_split(
    iris.data, iris.target, test_size=0.2, random_state=42
)
 
pipe = Pipeline([
    ('clip', OutlierClipper(n_std=2)),
    ('scaler', StandardScaler()),
    ('classifier', LogisticRegression(max_iter=1000))
])
 
pipe.fit(X_train, y_train)
print(f"Accuracy: {pipe.score(X_test, y_test):.4f}")
 
# The n_std parameter works with GridSearchCV
from sklearn.model_selection import GridSearchCV
 
grid = GridSearchCV(
    pipe,
    {'clip__n_std': [1.5, 2, 2.5, 3]},
    cv=5
)
grid.fit(X_train, y_train)
print(f"Best n_std: {grid.best_params_['clip__n_std']}")

Transformador Personalizado para Feature Engineering

Um exemplo mais prático -- criar features de interação a partir de colunas específicas:

from sklearn.base import BaseEstimator, TransformerMixin
import pandas as pd
import numpy as np
 
class FeatureInteraction(BaseEstimator, TransformerMixin):
    """Creates multiplication interactions between specified column pairs."""
 
    def __init__(self, interaction_pairs=None):
        self.interaction_pairs = interaction_pairs
 
    def fit(self, X, y=None):
        # Store column names if DataFrame
        if isinstance(X, pd.DataFrame):
            self.feature_names_in_ = X.columns.tolist()
        else:
            self.feature_names_in_ = [f"x{i}" for i in range(X.shape[1])]
        return self
 
    def transform(self, X):
        X_df = pd.DataFrame(X, columns=self.feature_names_in_) if not isinstance(X, pd.DataFrame) else X.copy()
 
        if self.interaction_pairs:
            for col_a, col_b in self.interaction_pairs:
                name = f"{col_a}_x_{col_b}"
                X_df[name] = X_df[col_a] * X_df[col_b]
 
        return X_df.values
 
    def get_feature_names_out(self, input_features=None):
        names = list(self.feature_names_in_)
        if self.interaction_pairs:
            for col_a, col_b in self.interaction_pairs:
                names.append(f"{col_a}_x_{col_b}")
        return np.array(names)

FeatureUnion: Feature Engineering em Paralelo

Enquanto Pipeline encadeia etapas sequencialmente, FeatureUnion executa transformadores em paralelo e concatena suas saídas horizontalmente:

from sklearn.pipeline import Pipeline, FeatureUnion
from sklearn.preprocessing import StandardScaler, PolynomialFeatures
from sklearn.decomposition import PCA
from sklearn.linear_model import LogisticRegression
from sklearn.datasets import load_iris
from sklearn.model_selection import train_test_split
 
iris = load_iris()
X_train, X_test, y_train, y_test = train_test_split(
    iris.data, iris.target, test_size=0.2, random_state=42
)
 
# Create parallel feature branches
feature_union = FeatureUnion([
    ('scaled', StandardScaler()),             # Original features, scaled
    ('pca', PCA(n_components=2)),             # 2 PCA components
    ('poly', PolynomialFeatures(degree=2, include_bias=False))  # Polynomial features
])
 
# Combine into a full pipeline
pipe = Pipeline([
    ('features', feature_union),
    ('classifier', LogisticRegression(max_iter=1000, random_state=42))
])
 
pipe.fit(X_train, y_train)
 
# Check the total number of features
X_transformed = feature_union.fit_transform(X_train)
print(f"Original features:    {X_train.shape[1]}")
print(f"After FeatureUnion:   {X_transformed.shape[1]}")
print(f"Test accuracy:        {pipe.score(X_test, y_test):.4f}")

FeatureUnion vs ColumnTransformer

RecursoFeatureUnionColumnTransformer
EntradaTodas as colunas vão para todos os transformadoresColunas específicas para transformadores específicos
SaídaConcatena horizontalmenteConcatena horizontalmente
Caso de usoMúltiplas representações das mesmas featuresTipos diferentes de features exigem processamento diferente
Seleção de colunasNão pode selecionar -- opera em todas as colunasEspecificação de colunas embutida
Alternativa modernaFrequentemente substituído por ColumnTransformerPreferido na maioria dos casos

No scikit-learn moderno, ColumnTransformer resolve a maioria dos casos em que antes se usava FeatureUnion. FeatureUnion continua útil quando você quer múltiplas representações do mesmo conjunto de features (por exemplo, valores brutos + PCA + features polinomiais).

Salvando e Carregando Pipelines

Uma das maiores vantagens dos pipelines é a simplicidade de deployment. Em vez de serializar cada transformador e modelo separadamente, você salva um único objeto:

import joblib
from sklearn.pipeline import Pipeline
from sklearn.preprocessing import StandardScaler
from sklearn.ensemble import RandomForestClassifier
from sklearn.datasets import load_iris
from sklearn.model_selection import train_test_split
 
# Train a pipeline
iris = load_iris()
X_train, X_test, y_train, y_test = train_test_split(
    iris.data, iris.target, test_size=0.2, random_state=42
)
 
pipe = Pipeline([
    ('scaler', StandardScaler()),
    ('classifier', RandomForestClassifier(n_estimators=100, random_state=42))
])
pipe.fit(X_train, y_train)
 
# Save the entire pipeline -- one file
joblib.dump(pipe, 'model_pipeline.joblib')
print(f"Pipeline saved")
 
# Load and predict -- no preprocessing code needed
loaded_pipe = joblib.load('model_pipeline.joblib')
predictions = loaded_pipe.predict(X_test)
accuracy = loaded_pipe.score(X_test, y_test)
print(f"Loaded pipeline accuracy: {accuracy:.4f}")

Versionando Pipelines

Para produção, inclua metadados junto com o pipeline:

import joblib
import datetime
import sklearn
 
artifact = {
    'pipeline': pipe,
    'training_date': datetime.datetime.now().isoformat(),
    'sklearn_version': sklearn.__version__,
    'feature_names': list(iris.feature_names),
    'target_names': list(iris.target_names),
    'training_accuracy': pipe.score(X_train, y_train),
    'test_accuracy': pipe.score(X_test, y_test),
    'n_training_samples': len(X_train)
}
 
joblib.dump(artifact, 'model_artifact_v1.joblib')
 
# Later, load and validate
loaded = joblib.load('model_artifact_v1.joblib')
print(f"Model trained on: {loaded['training_date']}")
print(f"Sklearn version:  {loaded['sklearn_version']}")
print(f"Test accuracy:    {loaded['test_accuracy']:.4f}")
 
# Use the pipeline
loaded_pipe = loaded['pipeline']
predictions = loaded_pipe.predict(X_test[:3])

Usando pickle (Alternativa)

joblib é preferido para objetos sklearn porque lida de forma eficiente com grandes arrays NumPy. O pickle padrão também funciona:

import pickle
 
# Save
with open('pipeline.pkl', 'wb') as f:
    pickle.dump(pipe, f)
 
# Load
with open('pipeline.pkl', 'rb') as f:
    loaded = pickle.load(f)

Exemplo Real: Pipeline Completo de Classificação

Aqui está um pipeline completo e de qualidade de produção para uma tarefa de classificação com tipos de features mistos. Ele usa um Random Forest classifier com ColumnTransformer e termina com um evaluation report. Este exemplo usa o padrão de dataset no estilo Titanic que a maioria dos profissionais de ML encontra:

import pandas as pd
import numpy as np
from sklearn.compose import ColumnTransformer
from sklearn.pipeline import Pipeline
from sklearn.preprocessing import StandardScaler, OneHotEncoder
from sklearn.impute import SimpleImputer
from sklearn.ensemble import RandomForestClassifier
from sklearn.model_selection import (
    train_test_split, cross_val_score, GridSearchCV
)
from sklearn.metrics import classification_report, accuracy_score
 
# -- Create a realistic dataset with mixed types and missing values --
np.random.seed(42)
n = 1000
 
data = pd.DataFrame({
    'age': np.random.normal(35, 12, n),
    'income': np.random.lognormal(10.5, 0.8, n),
    'credit_score': np.random.normal(650, 80, n),
    'years_employed': np.random.exponential(5, n),
    'department': np.random.choice(['Engineering', 'Sales', 'Marketing', 'HR', 'Finance'], n),
    'education': np.random.choice(['High School', 'Bachelor', 'Master', 'PhD'], n),
    'city': np.random.choice(['NYC', 'LA', 'Chicago', 'Houston', 'Phoenix', 'Dallas'], n),
    'promoted': np.random.binomial(1, 0.3, n)
})
 
# Introduce missing values (realistic pattern)
for col in ['age', 'income', 'credit_score']:
    mask = np.random.random(n) < 0.05
    data.loc[mask, col] = np.nan
 
for col in ['department', 'education']:
    mask = np.random.random(n) < 0.03
    data.loc[mask, col] = np.nan
 
print(f"Dataset shape: {data.shape}")
print(f"Missing values:\n{data.isnull().sum()}")
print(f"Target distribution:\n{data['promoted'].value_counts(normalize=True)}")
# -- Define features and target --
X = data.drop('promoted', axis=1)
y = data['promoted']
 
# Split the data (see /topics/Scikit-Learn/sklearn-train-test-split for details)
X_train, X_test, y_train, y_test = train_test_split(
    X, y, test_size=0.2, random_state=42, stratify=y
)
 
print(f"Training set: {X_train.shape[0]} samples")
print(f"Test set:     {X_test.shape[0]} samples")
# -- Define column groups --
numeric_features = ['age', 'income', 'credit_score', 'years_employed']
categorical_features = ['department', 'education', 'city']
 
# -- Build preprocessing pipelines for each column type --
numeric_transformer = Pipeline([
    ('imputer', SimpleImputer(strategy='median')),
    ('scaler', StandardScaler())
])
 
categorical_transformer = Pipeline([
    ('imputer', SimpleImputer(strategy='most_frequent')),
    ('encoder', OneHotEncoder(handle_unknown='ignore', sparse_output=False))
])
 
# -- Combine column transformers --
preprocessor = ColumnTransformer(
    transformers=[
        ('num', numeric_transformer, numeric_features),
        ('cat', categorical_transformer, categorical_features)
    ],
    remainder='drop'  # Explicitly drop unlisted columns
)
 
# -- Full pipeline: preprocessing + classifier --
# (see /topics/Scikit-Learn/sklearn-random-forest for more on RandomForest)
pipeline = Pipeline([
    ('preprocessor', preprocessor),
    ('classifier', RandomForestClassifier(
        n_estimators=200,
        max_depth=10,
        min_samples_leaf=5,
        class_weight='balanced',
        random_state=42,
        n_jobs=-1
    ))
])
# -- Cross-validation first --
cv_scores = cross_val_score(pipeline, X_train, y_train, cv=5, scoring='accuracy')
print(f"Cross-validation scores: {cv_scores}")
print(f"Mean CV accuracy: {cv_scores.mean():.4f} (+/- {cv_scores.std() * 2:.4f})")
 
# -- Fit on full training set --
pipeline.fit(X_train, y_train)
 
# -- Evaluate on test set --
y_pred = pipeline.predict(X_test)
print(f"\nTest accuracy: {accuracy_score(y_test, y_pred):.4f}")
# For detailed evaluation, see /topics/Scikit-Learn/sklearn-confusion-matrix
print(f"\nClassification Report:\n{classification_report(y_test, y_pred)}")
# -- Hyperparameter tuning --
param_grid = {
    'preprocessor__num__imputer__strategy': ['mean', 'median'],
    'classifier__n_estimators': [100, 200, 300],
    'classifier__max_depth': [5, 10, 15, None],
    'classifier__min_samples_leaf': [3, 5, 10]
}
 
grid_search = GridSearchCV(
    pipeline,
    param_grid,
    cv=5,
    scoring='accuracy',
    n_jobs=-1,
    verbose=1
)
 
grid_search.fit(X_train, y_train)
 
print(f"\nBest parameters: {grid_search.best_params_}")
print(f"Best CV score:   {grid_search.best_score_:.4f}")
print(f"Test score:      {grid_search.score(X_test, y_test):.4f}")
# -- Inspect the best pipeline --
best_pipeline = grid_search.best_estimator_
 
# Get feature names after transformation
feature_names = best_pipeline.named_steps['preprocessor'].get_feature_names_out()
print(f"\nTransformed feature count: {len(feature_names)}")
 
# Get feature importances from the classifier
importances = best_pipeline.named_steps['classifier'].feature_importances_
feature_importance = pd.DataFrame({
    'feature': feature_names,
    'importance': importances
}).sort_values('importance', ascending=False)
 
print(f"\nTop 10 features:")
print(feature_importance.head(10).to_string(index=False))
# -- Save the final pipeline --
import joblib
 
joblib.dump(best_pipeline, 'promotion_predictor.joblib')
print("Pipeline saved to promotion_predictor.joblib")
 
# -- Production usage --
loaded = joblib.load('promotion_predictor.joblib')
 
# Predict on new data -- same format as original DataFrame
new_employee = pd.DataFrame({
    'age': [28],
    'income': [65000],
    'credit_score': [720],
    'years_employed': [3.5],
    'department': ['Engineering'],
    'education': ['Master'],
    'city': ['NYC']
})
 
prediction = loaded.predict(new_employee)
probability = loaded.predict_proba(new_employee)
print(f"\nNew employee prediction: {'Promoted' if prediction[0] else 'Not promoted'}")
print(f"Probability: {probability[0][1]:.2%}")

Este exemplo demonstra todos os padrões principais de pipeline: tipos mistos de features, tratamento de valores ausentes, cross-validation, hyperparameter tuning, inspeção de features e serialização para produção.

Pipeline com Diferentes Tipos de Modelos

O mesmo pipeline de pré-processamento pode servir a modelos diferentes. Troque apenas a última etapa:

from sklearn.pipeline import Pipeline
from sklearn.linear_model import LogisticRegression
from sklearn.ensemble import RandomForestClassifier, GradientBoostingClassifier
from sklearn.svm import SVC
from sklearn.model_selection import cross_val_score
 
# Reuse the preprocessor from the previous example
models = {
    'Logistic Regression': LogisticRegression(max_iter=1000, random_state=42),
    'Random Forest': RandomForestClassifier(n_estimators=100, random_state=42),
    'Gradient Boosting': GradientBoostingClassifier(n_estimators=100, random_state=42),
    'SVM': SVC(kernel='rbf', random_state=42)
}
 
results = {}
for name, model in models.items():
    pipe = Pipeline([
        ('preprocessor', preprocessor),
        ('classifier', model)
    ])
    scores = cross_val_score(pipe, X_train, y_train, cv=5, scoring='accuracy')
    results[name] = {
        'mean': scores.mean(),
        'std': scores.std()
    }
    print(f"{name:25s} | Accuracy: {scores.mean():.4f} +/- {scores.std():.4f}")

Você também pode definir dinamicamente a etapa do estimador:

# Replace the classifier in an existing pipeline
pipeline.set_params(classifier=GradientBoostingClassifier(n_estimators=200))
pipeline.fit(X_train, y_train)
print(f"Gradient Boosting test accuracy: {pipeline.score(X_test, y_test):.4f}")

Pipelines em Regressão

Pipelines funcionam da mesma forma para tarefas de regressão. Para detalhes sobre linear regression com sklearn, veja nosso sklearn linear regression guide.

from sklearn.pipeline import Pipeline
from sklearn.compose import ColumnTransformer
from sklearn.preprocessing import StandardScaler, OneHotEncoder, PolynomialFeatures
from sklearn.impute import SimpleImputer
from sklearn.linear_model import Ridge
from sklearn.model_selection import cross_val_score, train_test_split
import pandas as pd
import numpy as np
 
# Simulated housing data
np.random.seed(42)
n = 500
housing = pd.DataFrame({
    'sqft': np.random.normal(1500, 400, n),
    'bedrooms': np.random.choice([1, 2, 3, 4, 5], n),
    'age': np.random.uniform(0, 50, n),
    'neighborhood': np.random.choice(['downtown', 'suburbs', 'rural'], n),
    'condition': np.random.choice(['poor', 'fair', 'good', 'excellent'], n),
})
housing['price'] = (
    housing['sqft'] * 200
    + housing['bedrooms'] * 15000
    - housing['age'] * 1000
    + np.random.normal(0, 20000, n)
)
 
X = housing.drop('price', axis=1)
y = housing['price']
X_train, X_test, y_train, y_test = train_test_split(X, y, test_size=0.2, random_state=42)
 
# Regression pipeline
numeric_features = ['sqft', 'bedrooms', 'age']
categorical_features = ['neighborhood', 'condition']
 
preprocessor = ColumnTransformer([
    ('num', Pipeline([
        ('imputer', SimpleImputer(strategy='median')),
        ('poly', PolynomialFeatures(degree=2, include_bias=False)),
        ('scaler', StandardScaler())
    ]), numeric_features),
    ('cat', Pipeline([
        ('imputer', SimpleImputer(strategy='most_frequent')),
        ('encoder', OneHotEncoder(handle_unknown='ignore', sparse_output=False))
    ]), categorical_features)
])
 
reg_pipeline = Pipeline([
    ('preprocessor', preprocessor),
    ('regressor', Ridge(alpha=1.0))
])
 
cv_scores = cross_val_score(reg_pipeline, X_train, y_train, cv=5, scoring='r2')
print(f"Cross-validation R2: {cv_scores.mean():.4f} +/- {cv_scores.std():.4f}")
 
reg_pipeline.fit(X_train, y_train)
print(f"Test R2: {reg_pipeline.score(X_test, y_test):.4f}")

Pulando Etapas com passthrough e None

Você pode pular etapas condicionalmente definindo-as como 'passthrough' ou None:

from sklearn.pipeline import Pipeline
from sklearn.preprocessing import StandardScaler
from sklearn.decomposition import PCA
from sklearn.svm import SVC
from sklearn.model_selection import GridSearchCV
from sklearn.datasets import load_breast_cancer
 
cancer = load_breast_cancer()
X_train, X_test, y_train, y_test = train_test_split(
    cancer.data, cancer.target, test_size=0.2, random_state=42
)
 
pipe = Pipeline([
    ('scaler', StandardScaler()),
    ('reduce_dim', PCA()),
    ('classifier', SVC())
])
 
# Grid search can toggle steps on/off
param_grid = [
    {
        'reduce_dim': [PCA(5), PCA(10), PCA(15)],
        'classifier__C': [1, 10]
    },
    {
        'reduce_dim': ['passthrough'],  # Skip PCA entirely
        'classifier__C': [1, 10]
    }
]
 
grid = GridSearchCV(pipe, param_grid, cv=5, n_jobs=-1)
grid.fit(X_train, y_train)
print(f"Best params: {grid.best_params_}")
print(f"Best score:  {grid.best_score_:.4f}")

Fazendo Cache de Etapas do Pipeline

Ao ajustar hiperparâmetros, etapas intermediárias podem ser recalculadas de forma redundante. Ative o cache para evitar isso:

from sklearn.pipeline import Pipeline
from sklearn.preprocessing import StandardScaler
from sklearn.decomposition import PCA
from sklearn.svm import SVC
from tempfile import mkdtemp
from shutil import rmtree
 
# Create a temporary cache directory
cachedir = mkdtemp()
 
pipe = Pipeline(
    [
        ('scaler', StandardScaler()),
        ('pca', PCA(n_components=10)),
        ('svc', SVC())
    ],
    memory=cachedir  # Cache intermediate transformations
)
 
# During GridSearchCV, the scaler and PCA results are cached
# Only recomputed when their parameters change
# This speeds up searches where only the final estimator params change
 
# Clean up when done
# rmtree(cachedir)

Explorando Dados Antes de Construir Pipelines

Antes de construir um pipeline, você precisa entender suas features: distribuições, padrões de valores ausentes, correlações e possíveis transformações. PyGWalker (opens in a new tab) permite transformar qualquer DataFrame do Pandas em uma interface interativa de exploração visual diretamente em notebooks Jupyter:

import pandas as pd
import pygwalker as pyg
 
# Explore your dataset interactively before building the pipeline
# Drag features to axes, create histograms, scatter plots, box plots
walker = pyg.walk(data)

Esse tipo de exploração visual ajuda você a decidir quais features precisam de scaling, quais têm outliers que precisam ser limitados e quais variáveis categóricas têm alta cardinalidade. Você pode identificar padrões de valores ausentes e entender distribuições antes de escrever uma única linha de código de pipeline.

Para iterar no seu fluxo completo de experimentação de pipeline -- testar diferentes estratégias de pré-processamento, comparar desempenho de modelos e acompanhar resultados -- o RunCell (opens in a new tab) oferece um ambiente Jupyter com IA, onde um agente ajuda na geração de código, debugging e gestão de experimentos.

Armadilhas Comuns e Dicas de Debugging

Armadilha 1: Esquecer de Usar o Pipeline na Previsão

# WRONG: Preprocessing manually, predicting with just the model
X_test_scaled = scaler.transform(X_test)
predictions = pipeline.named_steps['classifier'].predict(X_test_scaled)
 
# CORRECT: Let the pipeline handle everything
predictions = pipeline.predict(X_test)

Armadilha 2: Ajustar o Pré-processamento Fora do Pipeline

# WRONG: This defeats the purpose of the pipeline
scaler = StandardScaler()
X_train_scaled = scaler.fit_transform(X_train)
pipe = Pipeline([('classifier', LogisticRegression())])
pipe.fit(X_train_scaled, y_train)
# Now you must remember to manually scale at prediction time
 
# CORRECT: Include preprocessing in the pipeline
pipe = Pipeline([
    ('scaler', StandardScaler()),
    ('classifier', LogisticRegression())
])
pipe.fit(X_train, y_train)  # Pipeline handles the scaling

Armadilha 3: Nomes de Parâmetros Errados no GridSearchCV

A sintaxe stepname__param deve corresponder exatamente aos nomes das etapas do seu pipeline:

pipe = Pipeline([
    ('my_scaler', StandardScaler()),
    ('my_clf', LogisticRegression())
])
 
# WRONG: Using the class name instead of the step name
# param_grid = {'StandardScaler__with_mean': [True, False]}  # KeyError
 
# CORRECT: Using the step name you defined
param_grid = {'my_scaler__with_mean': [True, False], 'my_clf__C': [0.1, 1, 10]}

Armadilha 4: Mudanças na Ordem das Colunas

Ao usar ColumnTransformer com DataFrames do Pandas (que você pode carregar com pandas read_csv), a ordem das colunas na saída depende da ordem dos transformadores, não do DataFrame original:

# The output order is: numeric features first, then categorical
# This matters if you manually inspect transformed data
preprocessor = ColumnTransformer([
    ('num', numeric_transformer, numeric_features),     # These come first
    ('cat', categorical_transformer, categorical_features)  # These come second
])

Debugging de Saídas Intermediárias

Para inspecionar o que acontece em cada etapa:

# Method 1: Transform step by step
pipe.fit(X_train, y_train)
X_after_preprocessor = pipe.named_steps['preprocessor'].transform(X_test)
print(f"Shape after preprocessing: {X_after_preprocessor.shape}")
print(f"Sample values:\n{X_after_preprocessor[:2]}")
 
# Method 2: Slice the pipeline
preprocessing_pipe = pipe[:-1]  # Everything except the classifier
X_transformed = preprocessing_pipe.transform(X_test)
print(f"Transformed shape: {X_transformed.shape}")
 
# Method 3: Use set_config for verbose output
from sklearn import set_config
set_config(transform_output="pandas")  # Get DataFrames from transformers
# Now transform outputs include column names -- easier to debug

Debugging de Incompatibilidades de Shape

# Print shapes at each stage to find where things break
print(f"Input shape: {X_train.shape}")
 
for name, step in pipe.named_steps.items():
    if hasattr(step, 'transform'):
        # Check if the step has been fitted
        try:
            X_train = step.transform(X_train)
            print(f"After '{name}': {X_train.shape}")
        except Exception as e:
            print(f"Error at '{name}': {e}")
            break

Referência de Métodos do Pipeline

MétodoDescrição
fit(X, y)Ajusta todos os transformadores e o estimador final
predict(X)Transforma X por todas as etapas e depois prevê com o estimador final
predict_proba(X)Transforma e obtém estimativas de probabilidade (apenas classificadores)
transform(X)Transforma X por todas as etapas (se a última etapa for um transformador)
fit_transform(X, y)Ajusta e transforma em uma única chamada
fit_predict(X, y)Ajusta e prevê em uma única chamada
score(X, y)Transforma e calcula a pontuação (accuracy para classificadores, R2 para regressors)
set_params(**params)Define parâmetros usando a sintaxe stepname__param
get_params()Obtém todos os parâmetros
named_stepsAcesso tipo dicionário às etapas do pipeline
[i] or [name]Acessa uma etapa por índice ou nome
[start:end]Faz slice para criar um sub-pipeline

FAQ

Qual é a diferença entre Pipeline e make_pipeline?

Pipeline exige que você forneça tuplas (name, estimator) para cada etapa, dando controle explícito sobre os nomes das etapas. make_pipeline aceita instâncias de estimadores diretamente e gera nomes automaticamente a partir das classes (em minúsculas). Use Pipeline quando precisar de nomes descritivos ou quando planejar ajustar hiperparâmetros com GridSearchCV. Use make_pipeline para prototipagem rápida.

O sklearn Pipeline evita data leakage?

Sim. Quando você chama pipeline.fit(X_train, y_train), cada transformador é ajustado apenas nos dados de treinamento. Durante cross-validation com cross_val_score ou GridSearchCV, o pipeline refaz todos os passos em cada fold de treino, garantindo que nenhum dado do fold de teste vaze para o pré-processamento. Essa é a principal vantagem sobre o pré-processamento manual.

Posso usar Pipeline com modelos de deep learning?

Pipelines do Scikit-learn funcionam com qualquer estimador que siga a API do sklearn (implemente fit, predict e, opcionalmente, transform). Bibliotecas como scikeras fornecem wrappers compatíveis com sklearn para modelos Keras, permitindo seu uso em pipelines. XGBoost e LightGBM também oferecem interfaces compatíveis com sklearn.

Como lidar com nomes de features após ColumnTransformer?

Chame pipeline.named_steps['preprocessor'].get_feature_names_out() depois de ajustar. Isso retorna um array de nomes de features com prefixos indicando qual transformador as produziu (por exemplo, num__age, cat__city_NYC). Isso funciona no scikit-learn 1.0 e posteriores.

Posso ter vários modelos em um único Pipeline?

Não. Um Pipeline é uma sequência linear em que a última etapa é o estimador. Se quiser comparar vários modelos, crie pipelines separados com as mesmas etapas de pré-processamento, mas com estimadores finais diferentes. Você pode automatizar isso iterando sobre um dicionário de modelos e criando um pipeline para cada um.

Como pulo uma etapa em um Pipeline durante o GridSearchCV?

Defina a etapa como 'passthrough' na grade de parâmetros. Por exemplo: {'reduce_dim': ['passthrough']} irá pular completamente a etapa reduce_dim durante aquela iteração da busca. Você também pode definir uma etapa como None, mas 'passthrough' é a abordagem recomendada.

O que acontece se o Pipeline encontrar categorias não vistas no momento da previsão?

Se você usar OneHotEncoder com handle_unknown='ignore' dentro do seu pipeline, categorias não vistas são codificadas como todos zeros. Sem essa configuração, o pipeline gera um erro. Sempre defina handle_unknown='ignore' em pipelines de produção onde novos valores categóricos podem aparecer.

Conclusão

O Sklearn Pipeline transforma código de ML bagunçado e propenso a erros em workflows limpos e reproduzíveis. Ao encadear pré-processamento e modelagem em um único objeto, você elimina data leakage, simplifica a implantação e torna trivial o ajuste de hiperparâmetros em todo o workflow.

Comece pelo básico: envolva seu scaler e seu modelo em um Pipeline. Avance para ColumnTransformer quando tiver tipos de features mistos. Use GridSearchCV para ajustar parâmetros de todas as etapas do pipeline simultaneamente. Crie transformadores personalizados quando as opções nativas não cobrirem suas necessidades de feature engineering.

O investimento em aprender pipelines traz retorno imediato. Seus resultados de cross-validation se tornam confiáveis porque o pré-processamento é reajustado a cada fold. Sua implantação em produção se resume a um único joblib.dump e joblib.load. E seu codebase se torna mais fácil de manter porque toda a lógica de transformação e previsão vive em um único objeto inspecionável.

Guias Relacionados

📚