Skip to content
Temas
Scikit-Learn
Sklearn Pipeline: Guía completa para construir ML pipelines en Python

Sklearn Pipeline: Guía completa para construir ML pipelines en Python

Actualizado el

Tienes un proyecto de machine learning con cinco pasos de preprocesamiento, tres operaciones de feature engineering y un modelo final. Cada paso es un bloque de código separado. Haces fit de tu scaler sobre el dataset completo y luego divides en train y test. Tu one-hot encoding crea distintas columnas en entrenamiento que en producción. Meses después, alguien cambia la estrategia de imputación pero olvida actualizar el script de despliegue.

Esta es la realidad de la mayoría de los codebases de ML. Los pipelines manuales de preprocesamiento son frágiles, propensos a errores y una fuente constante de data leakage -- la causa más común de modelos que funcionan genial en notebooks pero fallan en datos reales. Cuando haces fit de un StandardScaler sobre todo el dataset antes de dividir, las estadísticas del conjunto de test se filtran al entrenamiento. Cuando codificas variables categóricas fuera de un flujo unificado, el skew entre train y test pasa desapercibido hasta que producción se rompe.

Pipeline de Scikit-learn resuelve estos problemas encadenando el preprocesamiento y el modelado en un único objeto. Una llamada a fit() entrena todo. Una llamada a predict() transforma y predice. Sin data leakage. Sin transformaciones desalineadas. Un solo objeto para guardar, cargar y desplegar. Esta guía cubre todo lo que necesitas para construir sklearn pipelines de calidad de producción, desde el uso básico hasta custom transformers y patrones reales de despliegue.

Por qué importan los Pipelines

El problema de Data Leakage

Data leakage ocurre cuando información fuera del conjunto de entrenamiento influye en el modelo durante el entrenamiento. La forma más común en preprocesamiento se ve así:

# WRONG: Data leakage -- scaler sees test data
from sklearn.preprocessing import StandardScaler
from sklearn.model_selection import train_test_split
 
scaler = StandardScaler()
X_scaled = scaler.fit_transform(X)  # Fitted on ALL data, including test
 
X_train, X_test, y_train, y_test = train_test_split(X_scaled, y, test_size=0.2)
# X_test was already influenced by scaler statistics computed on the full dataset

El scaler calcula la media y la desviación estándar usando todo el dataset, incluidos los samples de test. Tu evaluación sobre el test set ahora es demasiado optimista porque el modelo “vio” indirectamente información de esas muestras durante el preprocesamiento.

La forma correcta:

# CORRECT: No leakage -- scaler fitted only on training data
X_train, X_test, y_train, y_test = train_test_split(X, y, test_size=0.2)
 
scaler = StandardScaler()
X_train_scaled = scaler.fit_transform(X_train)  # Fit only on training
X_test_scaled = scaler.transform(X_test)         # Transform only, no fitting

Esto funciona, pero el enfoque manual se vuelve inmanejable rápidamente. Con cinco pasos de preprocesamiento, tienes que seguir cinco objetos ajustados y recordar la llamada correcta a fit_transform frente a transform para cada uno. Un Pipeline lo maneja automáticamente.

Organización del código

Más allá de la fuga de datos, los pipelines resuelven un problema de organización del código. Compara estos dos enfoques:

AspectoPreprocesamiento manualsklearn Pipeline
Riesgo de data leakageAlto -- es fácil llamar fit_transform sobre datos de testNinguno -- el pipeline fuerza el flujo correcto fit/transform
Líneas de código para train + predict10-30 líneas por entorno2 líneas (fit, predict)
Despliegue en producciónSerializar cada transformer por separado, reconstruir el ordenSerializar un solo objeto con joblib
Cross-validationHay que refitar manualmente todos los pasos por foldcross_val_score maneja todo
Hyperparameter tuningIterar manualmente sobre preprocesamiento + parámetros del modeloGridSearchCV ajusta todos los parámetros juntos
ReproducibilidadDepende del orden de ejecución en el notebookDeterminista -- mismo objeto, mismo resultado
DebuggingImprimir shapes tras cada paso, manualmentepipeline.named_steps para inspección

Uso básico de Pipeline

La clase Pipeline toma una lista de tuplas (name, transformer). Todos los pasos excepto el último deben implementar fit y transform. El último paso puede ser cualquier estimator (classifier, regressor o transformer).

from sklearn.pipeline import Pipeline
from sklearn.preprocessing import StandardScaler
from sklearn.linear_model import LogisticRegression
 
# Create pipeline with named steps
pipe = Pipeline([
    ('scaler', StandardScaler()),
    ('classifier', LogisticRegression(max_iter=1000))
])

Fit y predicción

Cuando llamas pipe.fit(X_train, y_train), el pipeline:

  1. Llama a scaler.fit_transform(X_train, y_train) -- ajusta el scaler y transforma los datos de entrenamiento
  2. Pasa los datos transformados a classifier.fit(X_transformed, y_train)

Cuando llamas pipe.predict(X_test), el pipeline:

  1. Llama a scaler.transform(X_test) -- transforma únicamente, sin ajustar
  2. Pasa los datos transformados a classifier.predict(X_transformed)
from sklearn.pipeline import Pipeline
from sklearn.preprocessing import StandardScaler
from sklearn.linear_model import LogisticRegression
from sklearn.model_selection import train_test_split
from sklearn.datasets import load_iris
 
# Load data
iris = load_iris()
X, y = iris.data, iris.target
 
# Split -- see our guide on train_test_split for details:
# /topics/Scikit-Learn/sklearn-train-test-split
X_train, X_test, y_train, y_test = train_test_split(
    X, y, test_size=0.2, random_state=42, stratify=y
)
 
# Build and train pipeline
pipe = Pipeline([
    ('scaler', StandardScaler()),
    ('classifier', LogisticRegression(max_iter=1000, random_state=42))
])
 
pipe.fit(X_train, y_train)
 
# Evaluate
accuracy = pipe.score(X_test, y_test)
print(f"Test accuracy: {accuracy:.4f}")
# Test accuracy: 1.0000

Acceder a pasos individuales

Puedes inspeccionar cualquier paso por nombre:

# Access scaler parameters after fitting
scaler = pipe.named_steps['scaler']
print(f"Feature means: {scaler.mean_}")
print(f"Feature stds:  {scaler.scale_}")
 
# Access the classifier
clf = pipe.named_steps['classifier']
print(f"Coefficients shape: {clf.coef_.shape}")
print(f"Classes: {clf.classes_}")

También puedes usar indexación:

# Access by index
first_step = pipe[0]   # StandardScaler
last_step = pipe[-1]    # LogisticRegression
 
# Slice the pipeline (returns a new Pipeline)
preprocessing = pipe[:-1]  # Just the scaler
X_test_transformed = preprocessing.transform(X_test)
print(f"Transformed shape: {X_test_transformed.shape}")

make_pipeline: el atajo

Cuando no necesitas nombres personalizados para los pasos, make_pipeline genera nombres automáticamente a partir de los nombres de clase:

from sklearn.pipeline import make_pipeline
from sklearn.preprocessing import StandardScaler
from sklearn.decomposition import PCA
from sklearn.svm import SVC
 
# Equivalent to Pipeline([('standardscaler', StandardScaler()),
#                          ('pca', PCA(n_components=2)),
#                          ('svc', SVC())])
pipe = make_pipeline(StandardScaler(), PCA(n_components=2), SVC())
 
print(pipe.named_steps)
# {'standardscaler': StandardScaler(), 'pca': PCA(n_components=2), 'svc': SVC()}

Los nombres auto-generados son el nombre de la clase en minúsculas. Si usas el mismo transformer dos veces, make_pipeline añade un número:

from sklearn.pipeline import make_pipeline
from sklearn.preprocessing import StandardScaler, PolynomialFeatures
 
pipe = make_pipeline(PolynomialFeatures(2), StandardScaler(), PolynomialFeatures(3))
print(list(pipe.named_steps.keys()))
# ['polynomialfeatures-1', 'standardscaler', 'polynomialfeatures-2']

Pipeline vs make_pipeline

FeaturePipelinemake_pipeline
Nombres de paso personalizadosSí -- tú los eligesNo -- auto-generados
Legibilidad en pipelines grandesMejor -- nombres descriptivosPeor -- nombres genéricos
Sintaxis para hyperparameter tuningstepname__param con tus nombresclassname__param con nombres automáticos
Brevedad del códigoMás verbosoMás conciso
Mejor paraPipelines de producción, tuningPrototipado rápido

Usa Pipeline cuando planeas ajustar hiperparámetros o cuando nombres claros mejoran la legibilidad. Usa make_pipeline para experimentos rápidos.

Pasos de preprocesamiento comunes

Estos son los transformers más utilizados en pipelines de sklearn:

Características numéricas

from sklearn.pipeline import Pipeline
from sklearn.preprocessing import StandardScaler, MinMaxScaler, RobustScaler
from sklearn.impute import SimpleImputer
 
# Scale to zero mean, unit variance
numeric_pipe = Pipeline([
    ('imputer', SimpleImputer(strategy='median')),
    ('scaler', StandardScaler())
])
TransformerQué haceCuándo usarlo
StandardScalerCentra en media=0, escala a std=1Opción por defecto para la mayoría de algoritmos
MinMaxScalerEscala al rango [0, 1]Redes neuronales, algoritmos sensibles a la magnitud
RobustScalerUsa mediana e IQR, robusto ante outliersDatos con outliers significativos
SimpleImputerRellena valores faltantes (mean, median, most_frequent, constant)Manejo de missing data
PolynomialFeaturesGenera features polinómicas e interaccionesAñadir no linealidad a modelos lineales
PowerTransformerAplica transformación Yeo-Johnson o Box-CoxDistribuciones sesgadas

Características categóricas

from sklearn.pipeline import Pipeline
from sklearn.preprocessing import OneHotEncoder, OrdinalEncoder, LabelEncoder
from sklearn.impute import SimpleImputer
 
# One-hot encode categorical features
categorical_pipe = Pipeline([
    ('imputer', SimpleImputer(strategy='most_frequent')),
    ('encoder', OneHotEncoder(handle_unknown='ignore', sparse_output=False))
])
TransformerQué haceCuándo usarlo
OneHotEncoderCrea columnas binarias para cada categoríaCategorías nominales (sin orden)
OrdinalEncoderMapea categorías a enterosCategorías ordinales (low/medium/high)
TargetEncoderCodifica usando estadísticas de la variable objetivoFeatures de alta cardinalidad (scikit-learn 1.3+)

ColumnTransformer para tipos de datos mixtos

Los datasets reales tienen columnas numéricas y categóricas. ColumnTransformer aplica distintas transformaciones a diferentes subconjuntos de columnas en paralelo:

from sklearn.compose import ColumnTransformer
from sklearn.pipeline import Pipeline
from sklearn.preprocessing import StandardScaler, OneHotEncoder
from sklearn.impute import SimpleImputer
from sklearn.linear_model import LogisticRegression
import pandas as pd
import numpy as np
 
# Sample data with mixed types
data = pd.DataFrame({
    'age': [25, 30, np.nan, 45, 50],
    'income': [40000, 55000, 60000, np.nan, 90000],
    'city': ['NYC', 'LA', 'NYC', 'Chicago', 'LA'],
    'education': ['BS', 'MS', 'PhD', 'BS', 'MS'],
    'purchased': [0, 1, 1, 0, 1]
})
 
X = data.drop('purchased', axis=1)
y = data['purchased']
 
# Define column groups
numeric_features = ['age', 'income']
categorical_features = ['city', 'education']
 
# Build sub-pipelines for each column type
numeric_transformer = Pipeline([
    ('imputer', SimpleImputer(strategy='median')),
    ('scaler', StandardScaler())
])
 
categorical_transformer = Pipeline([
    ('imputer', SimpleImputer(strategy='most_frequent')),
    ('encoder', OneHotEncoder(handle_unknown='ignore', sparse_output=False))
])
 
# Combine with ColumnTransformer
preprocessor = ColumnTransformer(
    transformers=[
        ('num', numeric_transformer, numeric_features),
        ('cat', categorical_transformer, categorical_features)
    ]
)
 
# Full pipeline: preprocessing + model
pipeline = Pipeline([
    ('preprocessor', preprocessor),
    ('classifier', LogisticRegression(max_iter=1000))
])
 
pipeline.fit(X, y)
print(f"Pipeline fitted successfully")
print(f"Predictions: {pipeline.predict(X)}")

Obtener nombres de features después de la transformación

Después de ajustar un ColumnTransformer, puedes obtener los nombres de las features transformadas:

# After fitting the pipeline
pipeline.fit(X, y)
 
# Get feature names from the preprocessor step
feature_names = pipeline.named_steps['preprocessor'].get_feature_names_out()
print(f"Transformed features: {feature_names}")
# ['num__age', 'num__income', 'cat__city_Chicago', 'cat__city_LA',
#  'cat__city_NYC', 'cat__education_BS', 'cat__education_MS', 'cat__education_PhD']

Manejo de columnas restantes

Por defecto, ColumnTransformer elimina las columnas no especificadas en ningún transformer. Esto se controla con el parámetro remainder:

preprocessor = ColumnTransformer(
    transformers=[
        ('num', numeric_transformer, numeric_features),
        ('cat', categorical_transformer, categorical_features)
    ],
    remainder='passthrough'  # Keep unspecified columns as-is
    # remainder='drop'       # Default: drop unspecified columns
)

Pipeline con GridSearchCV

Una de las características más potentes de los pipelines de sklearn es la integración fluida con hyperparameter tuning. Usa la sintaxis stepname__parameter para referenciar parámetros dentro de los pasos del pipeline:

from sklearn.pipeline import Pipeline
from sklearn.preprocessing import StandardScaler
from sklearn.decomposition import PCA
from sklearn.svm import SVC
from sklearn.model_selection import GridSearchCV, train_test_split
from sklearn.datasets import load_breast_cancer
 
# Load data
cancer = load_breast_cancer()
X_train, X_test, y_train, y_test = train_test_split(
    cancer.data, cancer.target, test_size=0.2, random_state=42, stratify=cancer.target
)
 
# Build pipeline
pipe = Pipeline([
    ('scaler', StandardScaler()),
    ('pca', PCA()),
    ('svc', SVC())
])
 
# Define parameter grid
# Use stepname__param syntax to access nested parameters
param_grid = {
    'pca__n_components': [5, 10, 15, 20],
    'svc__C': [0.1, 1, 10, 100],
    'svc__kernel': ['rbf', 'linear'],
    'svc__gamma': ['scale', 'auto']
}
 
# Run grid search
grid = GridSearchCV(
    pipe,
    param_grid,
    cv=5,
    scoring='accuracy',
    n_jobs=-1,
    verbose=1
)
 
grid.fit(X_train, y_train)
 
print(f"Best parameters: {grid.best_params_}")
print(f"Best CV score:   {grid.best_score_:.4f}")
print(f"Test score:      {grid.score(X_test, y_test):.4f}")

Ajustar parámetros de ColumnTransformer

Para pipelines anidados con ColumnTransformer, encadena los nombres de los pasos con dobles guiones bajos:

# Accessing nested parameters:
# pipeline step 'preprocessor' -> transformer 'num' -> step 'imputer' -> parameter 'strategy'
param_grid = {
    'preprocessor__num__imputer__strategy': ['mean', 'median'],
    'preprocessor__cat__encoder__handle_unknown': ['ignore', 'infrequent_if_exist'],
    'classifier__C': [0.1, 1, 10]
}

Usar RandomizedSearchCV para espacios de búsqueda grandes

Cuando la cuadrícula de parámetros es muy grande, RandomizedSearchCV toma una cantidad fija de combinaciones al azar:

from sklearn.model_selection import RandomizedSearchCV
from scipy.stats import uniform, randint
 
param_distributions = {
    'pca__n_components': randint(5, 25),
    'svc__C': uniform(0.1, 100),
    'svc__kernel': ['rbf', 'linear', 'poly'],
    'svc__gamma': uniform(0.001, 1)
}
 
random_search = RandomizedSearchCV(
    pipe,
    param_distributions,
    n_iter=50,       # Sample 50 combinations
    cv=5,
    scoring='accuracy',
    n_jobs=-1,
    random_state=42
)
 
random_search.fit(X_train, y_train)
print(f"Best parameters: {random_search.best_params_}")
print(f"Best CV score:   {random_search.best_score_:.4f}")

Custom Transformers

FunctionTransformer: pasos custom rápidos

Para transformaciones simples sin estado, usa FunctionTransformer:

from sklearn.preprocessing import FunctionTransformer
from sklearn.pipeline import Pipeline
import numpy as np
 
# Log transform (adding 1 to avoid log(0))
log_transformer = FunctionTransformer(
    func=np.log1p,
    inverse_func=np.expm1  # Optional inverse for inverse_transform
)
 
pipe = Pipeline([
    ('log', log_transformer),
    ('scaler', StandardScaler())
])
 
# Works with pipeline fit/transform
X_sample = np.array([[1, 10, 100], [2, 20, 200]])
X_transformed = pipe.fit_transform(X_sample)
print(f"Original:    {X_sample[0]}")
print(f"Transformed: {X_transformed[0]}")

Clase de custom transformer

Para transformaciones con estado (las que aprenden parámetros a partir de los datos), crea una clase que herede de BaseEstimator y TransformerMixin:

from sklearn.base import BaseEstimator, TransformerMixin
import numpy as np
 
class OutlierClipper(BaseEstimator, TransformerMixin):
    """Clips values beyond a specified number of standard deviations."""
 
    def __init__(self, n_std=3):
        self.n_std = n_std
 
    def fit(self, X, y=None):
        # Learn the boundaries from training data
        self.mean_ = np.mean(X, axis=0)
        self.std_ = np.std(X, axis=0)
        self.lower_ = self.mean_ - self.n_std * self.std_
        self.upper_ = self.mean_ + self.n_std * self.std_
        return self  # Always return self from fit
 
    def transform(self, X):
        # Apply learned boundaries to any data
        X_clipped = np.clip(X, self.lower_, self.upper_)
        return X_clipped

Úsalo en un pipeline como cualquier transformer integrado:

from sklearn.pipeline import Pipeline
from sklearn.preprocessing import StandardScaler
from sklearn.linear_model import LogisticRegression
from sklearn.datasets import load_iris
from sklearn.model_selection import train_test_split
 
iris = load_iris()
X_train, X_test, y_train, y_test = train_test_split(
    iris.data, iris.target, test_size=0.2, random_state=42
)
 
pipe = Pipeline([
    ('clip', OutlierClipper(n_std=2)),
    ('scaler', StandardScaler()),
    ('classifier', LogisticRegression(max_iter=1000))
])
 
pipe.fit(X_train, y_train)
print(f"Accuracy: {pipe.score(X_test, y_test):.4f}")
 
# The n_std parameter works with GridSearchCV
from sklearn.model_selection import GridSearchCV
 
grid = GridSearchCV(
    pipe,
    {'clip__n_std': [1.5, 2, 2.5, 3]},
    cv=5
)
grid.fit(X_train, y_train)
print(f"Best n_std: {grid.best_params_['clip__n_std']}")

Custom transformer para feature engineering

Un ejemplo más práctico: crear features de interacción a partir de columnas específicas:

from sklearn.base import BaseEstimator, TransformerMixin
import pandas as pd
import numpy as np
 
class FeatureInteraction(BaseEstimator, TransformerMixin):
    """Creates multiplication interactions between specified column pairs."""
 
    def __init__(self, interaction_pairs=None):
        self.interaction_pairs = interaction_pairs
 
    def fit(self, X, y=None):
        # Store column names if DataFrame
        if isinstance(X, pd.DataFrame):
            self.feature_names_in_ = X.columns.tolist()
        else:
            self.feature_names_in_ = [f"x{i}" for i in range(X.shape[1])]
        return self
 
    def transform(self, X):
        X_df = pd.DataFrame(X, columns=self.feature_names_in_) if not isinstance(X, pd.DataFrame) else X.copy()
 
        if self.interaction_pairs:
            for col_a, col_b in self.interaction_pairs:
                name = f"{col_a}_x_{col_b}"
                X_df[name] = X_df[col_a] * X_df[col_b]
 
        return X_df.values
 
    def get_feature_names_out(self, input_features=None):
        names = list(self.feature_names_in_)
        if self.interaction_pairs:
            for col_a, col_b in self.interaction_pairs:
                names.append(f"{col_a}_x_{col_b}")
        return np.array(names)

FeatureUnion: feature engineering en paralelo

Mientras Pipeline encadena pasos secuencialmente, FeatureUnion ejecuta transformers en paralelo y concatena sus outputs horizontalmente:

from sklearn.pipeline import Pipeline, FeatureUnion
from sklearn.preprocessing import StandardScaler, PolynomialFeatures
from sklearn.decomposition import PCA
from sklearn.linear_model import LogisticRegression
from sklearn.datasets import load_iris
from sklearn.model_selection import train_test_split
 
iris = load_iris()
X_train, X_test, y_train, y_test = train_test_split(
    iris.data, iris.target, test_size=0.2, random_state=42
)
 
# Create parallel feature branches
feature_union = FeatureUnion([
    ('scaled', StandardScaler()),             # Original features, scaled
    ('pca', PCA(n_components=2)),             # 2 PCA components
    ('poly', PolynomialFeatures(degree=2, include_bias=False))  # Polynomial features
])
 
# Combine into a full pipeline
pipe = Pipeline([
    ('features', feature_union),
    ('classifier', LogisticRegression(max_iter=1000, random_state=42))
])
 
pipe.fit(X_train, y_train)
 
# Check the total number of features
X_transformed = feature_union.fit_transform(X_train)
print(f"Original features:    {X_train.shape[1]}")
print(f"After FeatureUnion:   {X_transformed.shape[1]}")
print(f"Test accuracy:        {pipe.score(X_test, y_test):.4f}")

FeatureUnion vs ColumnTransformer

FeatureFeatureUnionColumnTransformer
InputTodas las columnas van a todos los transformersColumnas específicas a transformers específicos
OutputConcatena horizontalmenteConcatena horizontalmente
Caso de usoMúltiples representaciones del mismo feature setDistintos tipos de features necesitan distinto procesamiento
Selección de columnasNo puede seleccionar -- opera sobre todas las columnasEspecificación de columnas integrada
Alternativa modernaA menudo reemplazado por ColumnTransformerPreferido para la mayoría de casos de uso

En el scikit-learn moderno, ColumnTransformer cubre la mayoría de los casos donde antes se usaba FeatureUnion. FeatureUnion sigue siendo útil cuando quieres múltiples representaciones del mismo conjunto de features (por ejemplo, valores crudos + PCA + features polinómicas).

Guardar y cargar pipelines

Una de las mayores ventajas de los pipelines es la simplicidad en despliegue. En lugar de serializar cada transformer y el modelo por separado, guardas un solo objeto:

import joblib
from sklearn.pipeline import Pipeline
from sklearn.preprocessing import StandardScaler
from sklearn.ensemble import RandomForestClassifier
from sklearn.datasets import load_iris
from sklearn.model_selection import train_test_split
 
# Train a pipeline
iris = load_iris()
X_train, X_test, y_train, y_test = train_test_split(
    iris.data, iris.target, test_size=0.2, random_state=42
)
 
pipe = Pipeline([
    ('scaler', StandardScaler()),
    ('classifier', RandomForestClassifier(n_estimators=100, random_state=42))
])
pipe.fit(X_train, y_train)
 
# Save the entire pipeline -- one file
joblib.dump(pipe, 'model_pipeline.joblib')
print(f"Pipeline saved")
 
# Load and predict -- no preprocessing code needed
loaded_pipe = joblib.load('model_pipeline.joblib')
predictions = loaded_pipe.predict(X_test)
accuracy = loaded_pipe.score(X_test, y_test)
print(f"Loaded pipeline accuracy: {accuracy:.4f}")

Versionado de pipelines

Para producción, incluye metadata junto al pipeline:

import joblib
import datetime
import sklearn
 
artifact = {
    'pipeline': pipe,
    'training_date': datetime.datetime.now().isoformat(),
    'sklearn_version': sklearn.__version__,
    'feature_names': list(iris.feature_names),
    'target_names': list(iris.target_names),
    'training_accuracy': pipe.score(X_train, y_train),
    'test_accuracy': pipe.score(X_test, y_test),
    'n_training_samples': len(X_train)
}
 
joblib.dump(artifact, 'model_artifact_v1.joblib')
 
# Later, load and validate
loaded = joblib.load('model_artifact_v1.joblib')
print(f"Model trained on: {loaded['training_date']}")
print(f"Sklearn version:  {loaded['sklearn_version']}")
print(f"Test accuracy:    {loaded['test_accuracy']:.4f}")
 
# Use the pipeline
loaded_pipe = loaded['pipeline']
predictions = loaded_pipe.predict(X_test[:3])

Usar pickle (alternativa)

joblib es preferido para objetos de sklearn porque maneja eficientemente arrays grandes de NumPy. pickle estándar también funciona:

import pickle
 
# Save
with open('pipeline.pkl', 'wb') as f:
    pickle.dump(pipe, f)
 
# Load
with open('pipeline.pkl', 'rb') as f:
    loaded = pickle.load(f)

Ejemplo real: pipeline completo de clasificación

Aquí tienes un pipeline completo, listo para producción, para una tarea de clasificación con tipos de features mixtos. Usa un Random Forest classifier con ColumnTransformer y termina con un evaluation report completo. Este ejemplo usa el patrón de dataset estilo Titanic que la mayoría de los practicantes de ML conoce:

import pandas as pd
import numpy as np
from sklearn.compose import ColumnTransformer
from sklearn.pipeline import Pipeline
from sklearn.preprocessing import StandardScaler, OneHotEncoder
from sklearn.impute import SimpleImputer
from sklearn.ensemble import RandomForestClassifier
from sklearn.model_selection import (
    train_test_split, cross_val_score, GridSearchCV
)
from sklearn.metrics import classification_report, accuracy_score
 
# -- Create a realistic dataset with mixed types and missing values --
np.random.seed(42)
n = 1000
 
data = pd.DataFrame({
    'age': np.random.normal(35, 12, n),
    'income': np.random.lognormal(10.5, 0.8, n),
    'credit_score': np.random.normal(650, 80, n),
    'years_employed': np.random.exponential(5, n),
    'department': np.random.choice(['Engineering', 'Sales', 'Marketing', 'HR', 'Finance'], n),
    'education': np.random.choice(['High School', 'Bachelor', 'Master', 'PhD'], n),
    'city': np.random.choice(['NYC', 'LA', 'Chicago', 'Houston', 'Phoenix', 'Dallas'], n),
    'promoted': np.random.binomial(1, 0.3, n)
})
 
# Introduce missing values (realistic pattern)
for col in ['age', 'income', 'credit_score']:
    mask = np.random.random(n) < 0.05
    data.loc[mask, col] = np.nan
 
for col in ['department', 'education']:
    mask = np.random.random(n) < 0.03
    data.loc[mask, col] = np.nan
 
print(f"Dataset shape: {data.shape}")
print(f"Missing values:\n{data.isnull().sum()}")
print(f"Target distribution:\n{data['promoted'].value_counts(normalize=True)}")
# -- Define features and target --
X = data.drop('promoted', axis=1)
y = data['promoted']
 
# Split the data (see /topics/Scikit-Learn/sklearn-train-test-split for details)
X_train, X_test, y_train, y_test = train_test_split(
    X, y, test_size=0.2, random_state=42, stratify=y
)
 
print(f"Training set: {X_train.shape[0]} samples")
print(f"Test set:     {X_test.shape[0]} samples")
# -- Define column groups --
numeric_features = ['age', 'income', 'credit_score', 'years_employed']
categorical_features = ['department', 'education', 'city']
 
# -- Build preprocessing pipelines for each column type --
numeric_transformer = Pipeline([
    ('imputer', SimpleImputer(strategy='median')),
    ('scaler', StandardScaler())
])
 
categorical_transformer = Pipeline([
    ('imputer', SimpleImputer(strategy='most_frequent')),
    ('encoder', OneHotEncoder(handle_unknown='ignore', sparse_output=False))
])
 
# -- Combine column transformers --
preprocessor = ColumnTransformer(
    transformers=[
        ('num', numeric_transformer, numeric_features),
        ('cat', categorical_transformer, categorical_features)
    ],
    remainder='drop'  # Explicitly drop unlisted columns
)
 
# -- Full pipeline: preprocessing + classifier --
# (see /topics/Scikit-Learn/sklearn-random-forest for more on RandomForest)
pipeline = Pipeline([
    ('preprocessor', preprocessor),
    ('classifier', RandomForestClassifier(
        n_estimators=200,
        max_depth=10,
        min_samples_leaf=5,
        class_weight='balanced',
        random_state=42,
        n_jobs=-1
    ))
])
# -- Cross-validation first --
cv_scores = cross_val_score(pipeline, X_train, y_train, cv=5, scoring='accuracy')
print(f"Cross-validation scores: {cv_scores}")
print(f"Mean CV accuracy: {cv_scores.mean():.4f} (+/- {cv_scores.std() * 2:.4f})")
 
# -- Fit on full training set --
pipeline.fit(X_train, y_train)
 
# -- Evaluate on test set --
y_pred = pipeline.predict(X_test)
print(f"\nTest accuracy: {accuracy_score(y_test, y_pred):.4f}")
# For detailed evaluation, see /topics/Scikit-Learn/sklearn-confusion-matrix
print(f"\nClassification Report:\n{classification_report(y_test, y_pred)}")
# -- Hyperparameter tuning --
param_grid = {
    'preprocessor__num__imputer__strategy': ['mean', 'median'],
    'classifier__n_estimators': [100, 200, 300],
    'classifier__max_depth': [5, 10, 15, None],
    'classifier__min_samples_leaf': [3, 5, 10]
}
 
grid_search = GridSearchCV(
    pipeline,
    param_grid,
    cv=5,
    scoring='accuracy',
    n_jobs=-1,
    verbose=1
)
 
grid_search.fit(X_train, y_train)
 
print(f"\nBest parameters: {grid_search.best_params_}")
print(f"Best CV score:   {grid_search.best_score_:.4f}")
print(f"Test score:      {grid_search.score(X_test, y_test):.4f}")
# -- Inspect the best pipeline --
best_pipeline = grid_search.best_estimator_
 
# Get feature names after transformation
feature_names = best_pipeline.named_steps['preprocessor'].get_feature_names_out()
print(f"\nTransformed feature count: {len(feature_names)}")
 
# Get feature importances from the classifier
importances = best_pipeline.named_steps['classifier'].feature_importances_
feature_importance = pd.DataFrame({
    'feature': feature_names,
    'importance': importances
}).sort_values('importance', ascending=False)
 
print(f"\nTop 10 features:")
print(feature_importance.head(10).to_string(index=False))
# -- Save the final pipeline --
import joblib
 
joblib.dump(best_pipeline, 'promotion_predictor.joblib')
print("Pipeline saved to promotion_predictor.joblib")
 
# -- Production usage --
loaded = joblib.load('promotion_predictor.joblib')
 
# Predict on new data -- same format as original DataFrame
new_employee = pd.DataFrame({
    'age': [28],
    'income': [65000],
    'credit_score': [720],
    'years_employed': [3.5],
    'department': ['Engineering'],
    'education': ['Master'],
    'city': ['NYC']
})
 
prediction = loaded.predict(new_employee)
probability = loaded.predict_proba(new_employee)
print(f"\nNew employee prediction: {'Promoted' if prediction[0] else 'Not promoted'}")
print(f"Probability: {probability[0][1]:.2%}")

Este ejemplo demuestra todos los patrones clave de pipeline: tipos de features mixtos, manejo de valores faltantes, cross-validation, ajuste de hiperparámetros, inspección de features y serialización para producción.

Pipeline con distintos tipos de modelo

El mismo pipeline de preprocesamiento puede servir para distintos modelos. Cambia solo el último paso:

from sklearn.pipeline import Pipeline
from sklearn.linear_model import LogisticRegression
from sklearn.ensemble import RandomForestClassifier, GradientBoostingClassifier
from sklearn.svm import SVC
from sklearn.model_selection import cross_val_score
 
# Reuse the preprocessor from the previous example
models = {
    'Logistic Regression': LogisticRegression(max_iter=1000, random_state=42),
    'Random Forest': RandomForestClassifier(n_estimators=100, random_state=42),
    'Gradient Boosting': GradientBoostingClassifier(n_estimators=100, random_state=42),
    'SVM': SVC(kernel='rbf', random_state=42)
}
 
results = {}
for name, model in models.items():
    pipe = Pipeline([
        ('preprocessor', preprocessor),
        ('classifier', model)
    ])
    scores = cross_val_score(pipe, X_train, y_train, cv=5, scoring='accuracy')
    results[name] = {
        'mean': scores.mean(),
        'std': scores.std()
    }
    print(f"{name:25s} | Accuracy: {scores.mean():.4f} +/- {scores.std():.4f}")

También puedes cambiar dinámicamente el estimator step:

# Replace the classifier in an existing pipeline
pipeline.set_params(classifier=GradientBoostingClassifier(n_estimators=200))
pipeline.fit(X_train, y_train)
print(f"Gradient Boosting test accuracy: {pipeline.score(X_test, y_test):.4f}")

Pipelines en regresión

Los pipelines funcionan igual para tareas de regresión. Para más detalles sobre linear regression con sklearn, consulta nuestra sklearn linear regression guide.

from sklearn.pipeline import Pipeline
from sklearn.compose import ColumnTransformer
from sklearn.preprocessing import StandardScaler, OneHotEncoder, PolynomialFeatures
from sklearn.impute import SimpleImputer
from sklearn.linear_model import Ridge
from sklearn.model_selection import cross_val_score, train_test_split
import pandas as pd
import numpy as np
 
# Simulated housing data
np.random.seed(42)
n = 500
housing = pd.DataFrame({
    'sqft': np.random.normal(1500, 400, n),
    'bedrooms': np.random.choice([1, 2, 3, 4, 5], n),
    'age': np.random.uniform(0, 50, n),
    'neighborhood': np.random.choice(['downtown', 'suburbs', 'rural'], n),
    'condition': np.random.choice(['poor', 'fair', 'good', 'excellent'], n),
})
housing['price'] = (
    housing['sqft'] * 200
    + housing['bedrooms'] * 15000
    - housing['age'] * 1000
    + np.random.normal(0, 20000, n)
)
 
X = housing.drop('price', axis=1)
y = housing['price']
X_train, X_test, y_train, y_test = train_test_split(X, y, test_size=0.2, random_state=42)
 
# Regression pipeline
numeric_features = ['sqft', 'bedrooms', 'age']
categorical_features = ['neighborhood', 'condition']
 
preprocessor = ColumnTransformer([
    ('num', Pipeline([
        ('imputer', SimpleImputer(strategy='median')),
        ('poly', PolynomialFeatures(degree=2, include_bias=False)),
        ('scaler', StandardScaler())
    ]), numeric_features),
    ('cat', Pipeline([
        ('imputer', SimpleImputer(strategy='most_frequent')),
        ('encoder', OneHotEncoder(handle_unknown='ignore', sparse_output=False))
    ]), categorical_features)
])
 
reg_pipeline = Pipeline([
    ('preprocessor', preprocessor),
    ('regressor', Ridge(alpha=1.0))
])
 
cv_scores = cross_val_score(reg_pipeline, X_train, y_train, cv=5, scoring='r2')
print(f"Cross-validation R2: {cv_scores.mean():.4f} +/- {cv_scores.std():.4f}")
 
reg_pipeline.fit(X_train, y_train)
print(f"Test R2: {reg_pipeline.score(X_test, y_test):.4f}")

Saltar pasos con passthrough y None

Puedes omitir pasos condicionalmente configurándolos como 'passthrough' o None:

from sklearn.pipeline import Pipeline
from sklearn.preprocessing import StandardScaler
from sklearn.decomposition import PCA
from sklearn.svm import SVC
from sklearn.model_selection import GridSearchCV
from sklearn.datasets import load_breast_cancer
 
cancer = load_breast_cancer()
X_train, X_test, y_train, y_test = train_test_split(
    cancer.data, cancer.target, test_size=0.2, random_state=42
)
 
pipe = Pipeline([
    ('scaler', StandardScaler()),
    ('reduce_dim', PCA()),
    ('classifier', SVC())
])
 
# Grid search can toggle steps on/off
param_grid = [
    {
        'reduce_dim': [PCA(5), PCA(10), PCA(15)],
        'classifier__C': [1, 10]
    },
    {
        'reduce_dim': ['passthrough'],  # Skip PCA entirely
        'classifier__C': [1, 10]
    }
]
 
grid = GridSearchCV(pipe, param_grid, cv=5, n_jobs=-1)
grid.fit(X_train, y_train)
print(f"Best params: {grid.best_params_}")
print(f"Best score:  {grid.best_score_:.4f}")

Cachear pasos del Pipeline

Cuando ajustas hiperparámetros, los pasos intermedios pueden recalcularse de forma redundante. Habilita el caching para evitarlo:

from sklearn.pipeline import Pipeline
from sklearn.preprocessing import StandardScaler
from sklearn.decomposition import PCA
from sklearn.svm import SVC
from tempfile import mkdtemp
from shutil import rmtree
 
# Create a temporary cache directory
cachedir = mkdtemp()
 
pipe = Pipeline(
    [
        ('scaler', StandardScaler()),
        ('pca', PCA(n_components=10)),
        ('svc', SVC())
    ],
    memory=cachedir  # Cache intermediate transformations
)
 
# During GridSearchCV, the scaler and PCA results are cached
# Only recomputed when their parameters change
# This speeds up searches where only the final estimator params change
 
# Clean up when done
# rmtree(cachedir)

Explorar datos antes de construir pipelines

Antes de construir un pipeline, necesitas entender tus features: sus distribuciones, patrones de valores faltantes, correlaciones y posibles transformaciones. PyGWalker (opens in a new tab) te permite convertir cualquier Pandas DataFrame en una interfaz interactiva de exploración visual directamente en notebooks de Jupyter:

import pandas as pd
import pygwalker as pyg
 
# Explore your dataset interactively before building the pipeline
# Drag features to axes, create histograms, scatter plots, box plots
walker = pyg.walk(data)

Este tipo de exploración visual te ayuda a decidir qué features necesitan escalado, cuáles tienen outliers que deben recortarse y cuáles features categóricas tienen alta cardinalidad. Puedes identificar patrones de valores faltantes y entender distribuciones antes de escribir una sola línea de código del pipeline.

Para iterar sobre tu flujo completo de experimentación de pipeline -- probar distintas estrategias de preprocesamiento, comparar rendimiento de modelos y registrar resultados -- RunCell (opens in a new tab) ofrece un entorno de Jupyter potenciado por IA donde un agente ayuda con generación de código, debugging y gestión de experimentos.

Errores comunes y consejos de debugging

Error 1: olvidar usar el Pipeline para predecir

# WRONG: Preprocessing manually, predicting with just the model
X_test_scaled = scaler.transform(X_test)
predictions = pipeline.named_steps['classifier'].predict(X_test_scaled)
 
# CORRECT: Let the pipeline handle everything
predictions = pipeline.predict(X_test)

Error 2: hacer fit del preprocesamiento fuera del pipeline

# WRONG: This defeats the purpose of the pipeline
scaler = StandardScaler()
X_train_scaled = scaler.fit_transform(X_train)
pipe = Pipeline([('classifier', LogisticRegression())])
pipe.fit(X_train_scaled, y_train)
# Now you must remember to manually scale at prediction time
 
# CORRECT: Include preprocessing in the pipeline
pipe = Pipeline([
    ('scaler', StandardScaler()),
    ('classifier', LogisticRegression())
])
pipe.fit(X_train, y_train)  # Pipeline handles the scaling

Error 3: nombres de parámetros incorrectos en GridSearchCV

La sintaxis stepname__param debe coincidir exactamente con los nombres de los pasos de tu pipeline:

pipe = Pipeline([
    ('my_scaler', StandardScaler()),
    ('my_clf', LogisticRegression())
])
 
# WRONG: Using the class name instead of the step name
# param_grid = {'StandardScaler__with_mean': [True, False]}  # KeyError
 
# CORRECT: Using the step name you defined
param_grid = {'my_scaler__with_mean': [True, False], 'my_clf__C': [0.1, 1, 10]}

Error 4: cambios en el orden de columnas

Cuando usas ColumnTransformer con Pandas DataFrames (que puedes cargar con pandas read_csv), el orden de columnas en la salida depende del orden de los transformers, no del DataFrame original:

# The output order is: numeric features first, then categorical
# This matters if you manually inspect transformed data
preprocessor = ColumnTransformer([
    ('num', numeric_transformer, numeric_features),     # These come first
    ('cat', categorical_transformer, categorical_features)  # These come second
])

Debugging de outputs intermedios

Para inspeccionar qué ocurre en cada paso:

# Method 1: Transform step by step
pipe.fit(X_train, y_train)
X_after_preprocessor = pipe.named_steps['preprocessor'].transform(X_test)
print(f"Shape after preprocessing: {X_after_preprocessor.shape}")
print(f"Sample values:\n{X_after_preprocessor[:2]}")
 
# Method 2: Slice the pipeline
preprocessing_pipe = pipe[:-1]  # Everything except the classifier
X_transformed = preprocessing_pipe.transform(X_test)
print(f"Transformed shape: {X_transformed.shape}")
 
# Method 3: Use set_config for verbose output
from sklearn import set_config
set_config(transform_output="pandas")  # Get DataFrames from transformers
# Now transform outputs include column names -- easier to debug

Debugging de desajustes de shapes

# Print shapes at each stage to find where things break
print(f"Input shape: {X_train.shape}")
 
for name, step in pipe.named_steps.items():
    if hasattr(step, 'transform'):
        # Check if the step has been fitted
        try:
            X_train = step.transform(X_train)
            print(f"After '{name}': {X_train.shape}")
        except Exception as e:
            print(f"Error at '{name}': {e}")
            break

Referencia de métodos de Pipeline

MethodDescription
fit(X, y)Ajusta todos los transformers y el estimator final
predict(X)Transforma X a través de todos los pasos y luego predice con el estimator final
predict_proba(X)Transforma y obtiene probabilidades (solo classifiers)
transform(X)Transforma X a través de todos los pasos (si el último paso es un transformer)
fit_transform(X, y)Ajusta y transforma en una sola llamada
fit_predict(X, y)Ajusta y predice en una sola llamada
score(X, y)Transforma y puntúa (accuracy para classifiers, R2 para regressors)
set_params(**params)Establece parámetros usando la sintaxis stepname__param
get_params()Obtiene todos los parámetros
named_stepsAcceso tipo diccionario a los pasos del pipeline
[i] o [name]Acceso al paso por índice o por nombre
[start:end]Slicing para crear un sub-pipeline

FAQ

¿Cuál es la diferencia entre Pipeline y make_pipeline?

Pipeline requiere que proporciones tuplas (name, estimator) para cada paso, dando control explícito sobre los nombres. make_pipeline acepta estimators directamente y genera nombres automáticamente a partir de las clases (en minúsculas). Usa Pipeline cuando necesites nombres descriptivos o planees ajustar hiperparámetros con GridSearchCV. Usa make_pipeline para prototipado rápido.

¿sklearn Pipeline evita data leakage?

Sí. Cuando llamas a pipeline.fit(X_train, y_train), cada transformer se ajusta solo con los datos de entrenamiento. Durante la cross-validation con cross_val_score o GridSearchCV, el pipeline refita todos los pasos en cada fold de entrenamiento, asegurando que ningún dato del fold de test se filtre al preprocesamiento. Esta es la principal ventaja frente al preprocesamiento manual.

¿Puedo usar Pipeline con modelos de deep learning?

Los pipelines de Scikit-learn funcionan con cualquier estimator que siga la API de sklearn (implemente fit, predict y opcionalmente transform). Librerías como scikeras proporcionan wrappers compatibles con sklearn para modelos Keras, permitiendo usarlos en pipelines. XGBoost y LightGBM también ofrecen interfaces compatibles con sklearn.

¿Cómo manejo los nombres de features después de ColumnTransformer?

Llama a pipeline.named_steps['preprocessor'].get_feature_names_out() después de ajustar. Esto devuelve un array de nombres de features con prefijos que indican qué transformer los produjo (por ejemplo, num__age, cat__city_NYC). Esto funciona en scikit-learn 1.0 y posteriores.

¿Puedo tener múltiples modelos en un solo Pipeline?

No. Un Pipeline es una secuencia lineal donde el último paso es el estimator. Si quieres comparar varios modelos, crea pipelines separados con los mismos pasos de preprocesamiento pero distintos estimators finales. Puedes automatizar esto recorriendo un diccionario de modelos y creando un pipeline para cada uno.

¿Cómo salto un paso en un Pipeline durante GridSearchCV?

Configura el paso como 'passthrough' en la cuadrícula de parámetros. Por ejemplo: {'reduce_dim': ['passthrough']} omitirá por completo el paso reduce_dim durante esa iteración de grid search. También puedes poner un paso en None, pero 'passthrough' es la opción recomendada.

¿Qué pasa si el Pipeline encuentra categorías no vistas en tiempo de predicción?

Si usas OneHotEncoder con handle_unknown='ignore' dentro de tu pipeline, las categorías no vistas se codifican como todo ceros. Sin esa configuración, el pipeline lanza un error. En pipelines de producción, siempre configura handle_unknown='ignore' cuando sea posible que aparezcan nuevos valores categóricos.

Conclusión

Sklearn Pipeline transforma código de ML desordenado y propenso a errores en workflows limpios y reproducibles. Al encadenar el preprocesamiento y el modelado en un solo objeto, eliminas data leakage, simplificas el despliegue y haces trivial el ajuste de hiperparámetros en todo el flujo.

Empieza por lo básico: envuelve tu scaler y tu modelo en un Pipeline. Pasa a ColumnTransformer cuando tengas tipos de features mixtos. Usa GridSearchCV para ajustar parámetros en todos los pasos del pipeline simultáneamente. Construye custom transformers cuando las opciones integradas no cubran tus necesidades de feature engineering.

La inversión en aprender pipelines se recupera de inmediato. Tus resultados de cross-validation se vuelven confiables porque el preprocesamiento se vuelve a ajustar por fold. Tu despliegue en producción se convierte en un único joblib.dump y joblib.load. Y tu codebase se vuelve mantenible porque toda la lógica de transformación y predicción vive en un solo objeto inspeccionable.

Guías relacionadas

📚