UDF de PySpark vs UDF de Pandas vs mapInPandas: ¿Cuál deberías usar?
Cuando necesitas lógica personalizada en PySpark, normalmente recurrirás a una de estas tres herramientas:
- UDF normal de Python (
udf(...)) - UDF de Pandas (
@pandas_udf) mapInPandas(DataFrame → iterador de DataFrames de Pandas)
Las tres pueden “ejecutar Python sobre Spark”, pero se comportan de forma muy diferente en rendimiento, flexibilidad y cuánto de la optimización de Spark conservas.
Esta guía te da un marco de decisión práctico, además de ejemplos que puedes copiar.
Modelo mental: ¿qué cambia entre las tres?
1) UDF normal (Python fila por fila)
- Spark envía columnas a procesos worker de Python.
- Tu función se ejecuta una fila a la vez.
- A menudo es la más lenta.
- Puede bloquear el optimizador de Spark y la generación de código.
Úsala cuando: la lógica es simple, los datos son pocos o la velocidad no importa.
2) UDF de Pandas (lotes vectorizados vía Arrow)
- Spark envía datos a Python en lotes columnares usando Apache Arrow.
- Tu función opera sobre Series / DataFrames de Pandas (vectorizado).
- Normalmente es mucho más rápida que una UDF normal.
Úsala cuando: necesitas lógica personalizada a nivel de columna y quieres mejor rendimiento.
3) mapInPandas (control total por lote de partición)
- Spark llama a tu función una vez por chunk de partición, dándote un iterador de DataFrames de Pandas.
- Puedes hacer lógica multi-columna, transformaciones complejas e incluso expansión de filas.
- Es excelente para pasos tipo “mini ETL” en Python manteniendo la paralelización de Spark.
Úsala cuando: necesitas transformaciones complejas que no encajan en la forma “una columna entra → una columna sale”.
Tabla rápida de decisión
| Necesitas… | Mejor opción |
|---|---|
| Transformación personalizada simple, bajo volumen | UDF normal |
| Transformación por columna, datos medianos/grandes | UDF de Pandas |
| Lógica compleja: múltiples columnas, múltiples filas de salida, joins dentro de pandas, libs pesadas de Python | mapInPandas |
| Máximo rendimiento si es posible | Funciones integradas de Spark SQL (evita las tres) |
Regla: Funciones integradas de Spark > UDF de Pandas > mapInPandas > UDF normal (típico, no absoluto).
Dataset de ejemplo
from pyspark.sql import SparkSession
from pyspark.sql import functions as F
from pyspark.sql.types import StringType, DoubleType, StructType, StructField, LongType
spark = SparkSession.builder.getOrCreate()
df = spark.createDataFrame(
[(" Alice ", "US", 10),
("bob", "UK", 3),
(None, "US", 7)],
["name", "country", "visits"]
)Ejemplo de UDF normal (simple pero lenta)
from pyspark.sql.functions import udf
from pyspark.sql.types import StringType
def clean_name(x):
if x is None:
return None
return x.strip().lower()
clean_name_udf = udf(clean_name, StringType())
df_udf = df.withColumn("name_clean", clean_name_udf("name"))
df_udf.show()Pros
- La más fácil de entender
- Funciona en todas partes
Contras
- Sobrecoste de Python fila por fila
- A menudo impide que Spark haga optimizaciones agresivas
Ejemplo de UDF de Pandas (vectorizada, normalmente más rápida)
import pandas as pd
from pyspark.sql.functions import pandas_udf
@pandas_udf("string")
def clean_name_vec(s: pd.Series) -> pd.Series:
return s.str.strip().str.lower()
df_pandas_udf = df.withColumn("name_clean", clean_name_vec("name"))
df_pandas_udf.show()Pros
- Procesamiento por lotes + vectorización
- Mucho mejor rendimiento/throughput en columnas grandes
Contras
- Requiere soporte de Arrow y un entorno compatible
- Sigue siendo del lado de Python; aún es menos optimizable que las funciones integradas
Ejemplo de mapInPandas (el más flexible)
Caso de uso: salida de múltiples columnas derivadas + reglas personalizadas
Quizá quieres:
- nombre limpiado
- puntuación basada en país y visitas
- etiquetas por rangos (buckets)
import pandas as pd
def transform(pdf_iter):
for pdf in pdf_iter:
pdf["name_clean"] = pdf["name"].astype("string").str.strip().str.lower()
pdf["visits"] = pdf["visits"].fillna(0).astype("float64")
pdf["score"] = pdf["visits"] * pdf["country"].eq("US").astype("float64").add(1.0) # US -> 2.0x, else 1.0x
pdf["bucket"] = pd.cut(pdf["visits"], bins=[-1, 0, 5, 999999], labels=["none", "low", "high"])
yield pdf
out_schema = "name string, country string, visits long, name_clean string, score double, bucket string"
df_map = df.mapInPandas(transform, schema=out_schema)
df_map.show()Pros
- Extremadamente flexible
- Ideal para “pipelines de pandas por partición”
- Puede expandir filas, calcular múltiples salidas, llamar a librerías externas (con cuidado)
Contras
- Debes definir el schema correctamente
- Más posibilidades de crear sesgo (skew) / particiones grandes accidentalmente
- Sigue existiendo el overhead de Python/Arrow
¿Y el rendimiento?
No necesitas benchmarks perfectos para elegir bien. Usa estas heurísticas prácticas:
La UDF normal suele ser la peor cuando:
- decenas de millones de filas
- transformaciones simples que Spark podría hacer de forma nativa
- se usa dentro de filtros/joins
La UDF de Pandas destaca cuando:
- estás transformando una o más columnas con operaciones vectorizadas
- puedes escribir la lógica eficientemente con pandas/numpy
mapInPandas es mejor cuando:
- necesitas transformaciones de varios pasos que serían dolorosas en Spark SQL
- quieres crear múltiples columnas a la vez
- necesitas expansión de filas o lógica condicional compleja
Correctitud y “gotchas” de schema
UDF de Pandas
- La salida debe coincidir exactamente con el tipo declarado.
- Pueden aparecer nulls/NaNs; manéjalos explícitamente.
mapInPandas
- El DataFrame de salida debe coincidir con el schema: nombres de columnas + dtypes + orden.
- Ten cuidado con el dtype
objectde Python; convierte a string/float explícitamente.
Lista de “evita esto” (anti-patrones comunes)
- Usar UDF normal para operaciones básicas de strings (
lower,trim, regex) → usa funciones integradas de Spark. - Llamar APIs de red dentro de UDFs → será lento, inestable y difícil de reintentar de forma segura.
- Bucles pesados por fila en Python dentro de UDF de Pandas/mapInPandas → destruye los beneficios de la vectorización.
- Devolver tipos inconsistentes (a veces int, a veces string) → fallos en runtime / nulls.
Flujo de decisión recomendado
-
¿Las funciones integradas de Spark pueden hacerlo? → Usa las integradas.
-
¿Es una transformación por columna (mismo número de filas de entrada/salida)? → Usa UDF de Pandas.
-
¿Necesitas lógica multi-columna, múltiples columnas de salida o expansión de filas? → Usa
mapInPandas. -
¿Son pocos datos / un prototipo rápido? → La UDF normal es aceptable.
Un tip final: valida con los planes de Spark
Incluso sin benchmarking completo, puedes aprender mucho con:
df_pandas_udf.explain(True)Si ves que Spark hace menos optimización después de añadir tu función, es una pista para probar funciones integradas o reestructurar.