Tutorial de PySpark UDF (Apto para principiantes): qué es, por qué y cómo usar pyspark udf
PySpark es excelente para procesar big data rápidamente—hasta que escribes lógica que Spark no entiende. Ahí es donde entra una UDF (User Defined Function): te permite ejecutar código Python personalizado sobre DataFrames de Spark.
La pega: las UDF también pueden hacer que Spark sea más lento si se usan sin cuidado. Este tutorial muestra cuándo usar UDF, cómo escribirlas correctamente y cómo elegir alternativas más rápidas (como funciones integradas o Pandas UDFs).
El problema que resuelven las UDF
Tienes una columna de un DataFrame y quieres aplicar lógica personalizada:
- parsing específico del dominio (IDs raros, reglas personalizadas)
- funciones de scoring personalizadas
- normalización de texto que no está cubierta por funciones integradas de Spark SQL
- mapeo de valores usando una regla compleja
Las funciones integradas de Spark están optimizadas en la JVM. Pero la lógica Python pura no es automáticamente “nativa de Spark”.
Las opciones de solución (más rápido → más lento)
En la práctica, prioriza este orden:
- Funciones integradas de Spark (
pyspark.sql.functions) ✅ más rápido - Expresiones SQL (
expr,when,regexp_extract) ✅ rápido - Pandas UDFs (vectorizadas con Apache Arrow) ✅ a menudo rápido
- UDFs regulares de Python (Python fila por fila) ⚠️ a menudo lo más lento
Este post se centra en (3) y (4), con una guía clara sobre cuándo evitarlas.
Setup: un DataFrame pequeño para practicar
from pyspark.sql import SparkSession
from pyspark.sql import functions as F
from pyspark.sql.types import StringType, IntegerType, DoubleType
spark = SparkSession.builder.getOrCreate()
df = spark.createDataFrame(
[
(" Alice ", "US", 10),
("bob", "UK", 3),
(None, "US", 7),
],
["name", "country", "visits"]
)
df.show()Ejemplo 1: Una UDF simple de Python (limpieza de strings)
Objetivo
Recortar espacios, pasar a minúsculas y manejar nulos de forma segura.
from pyspark.sql.functions import udf
def clean_name(x: str) -> str:
if x is None:
return None
return x.strip().lower()
clean_name_udf = udf(clean_name, StringType())
df2 = df.withColumn("name_clean", clean_name_udf(F.col("name")))
df2.show()Notas
- Debes proporcionar un tipo de retorno (
StringType()). - Una UDF de Python se ejecuta fila por fila en workers de Python (puede ser lenta en datos grandes).
Ejemplo 2: Prefiere funciones integradas cuando sea posible
La UDF anterior puede reemplazarse por funciones integradas (más rápido):
df_fast = df.withColumn(
"name_clean",
F.lower(F.trim(F.col("name")))
)
df_fast.show()Regla práctica: si puedes expresarlo con F.*, haz eso.
Ejemplo 3: UDF con múltiples entradas (scoring personalizado)
Creemos un score basado en country + visits.
def score(country: str, visits: int) -> float:
if country is None or visits is None:
return 0.0
bonus = 1.5 if country == "US" else 1.0
return float(visits) * bonus
score_udf = udf(score, DoubleType())
df3 = df.withColumn("score", score_udf(F.col("country"), F.col("visits")))
df3.show()Ejemplo 4: Registrar una UDF para Spark SQL
Si quieres usarla en SQL:
spark.udf.register("score_udf_sql", score, DoubleType())
df.createOrReplaceTempView("t")
spark.sql("""
SELECT name, country, visits, score_udf_sql(country, visits) AS score
FROM t
""").show()Errores comunes (y cómo evitarlos)
1) Devolver el tipo incorrecto
Si Spark espera DoubleType() y a veces devuelves un string, obtendrás errores en tiempo de ejecución o nulls.
2) Usar UDFs en joins/filters innecesariamente
Las UDF pueden bloquear optimizaciones de Spark. Intenta calcular primero una columna derivada, o usa funciones integradas.
3) Olvidar el manejo de nulos
Asume siempre que las entradas pueden ser None.
Alternativa más rápida: Pandas UDF (vectorizada)
Una Pandas UDF procesa lotes de datos de una vez (vectorizada), a menudo mucho más rápido que una UDF regular.
Ejemplo 5: Pandas UDF para limpieza de nombres
import pandas as pd
from pyspark.sql.functions import pandas_udf
@pandas_udf("string")
def clean_name_pandas(s: pd.Series) -> pd.Series:
return s.str.strip().str.lower()
df4 = df.withColumn("name_clean", clean_name_pandas(F.col("name")))
df4.show()Cuándo ayudan las Pandas UDFs
- la lógica es más sencilla en pandas
- necesitas operaciones vectorizadas
- el rendimiento importa y las funciones integradas no son suficientes
Ejemplo 6: Pandas UDF para scoring elemento a elemento
@pandas_udf("double")
def score_pandas(country: pd.Series, visits: pd.Series) -> pd.Series:
bonus = country.eq("US").astype("float64") * 0.5 + 1.0
visits_filled = visits.fillna(0).astype("float64")
return visits_filled * bonus
df5 = df.withColumn("score", score_pandas(F.col("country"), F.col("visits")))
df5.show()Checklist de buenas prácticas (práctico)
- ✅ Prueba primero funciones integradas (
F.lower,F.when,F.regexp_extract, etc.) - ✅ Si necesitas lógica personalizada, considera Pandas UDF antes que una UDF regular
- ✅ Especifica siempre los tipos de retorno correctos
- ✅ Maneja los nulos (
None) explícitamente - ✅ Mantén la lógica de la UDF pura y determinista (misma entrada → misma salida)
- ⚠️ Evita UDFs en filters/joins cuando sea posible (pueden reducir optimizaciones)
- ⚠️ Mide: compara el runtime con/sin UDF en un dataset de muestra
Cuándo sí deberías usar una UDF regular de Python
Usa una UDF regular cuando:
- el tamaño de datos no es enorme (o el rendimiento no es crítico)
- la lógica es difícil de expresar con funciones integradas
- pandas/Arrow no está disponible o no encaja con la transformación
Si el rendimiento importa, recurre primero a funciones integradas o Pandas UDFs.