Skip to content

Tutorial de PySpark UDF (Apto para principiantes): qué es, por qué y cómo usar pyspark udf

PySpark es excelente para procesar big data rápidamente—hasta que escribes lógica que Spark no entiende. Ahí es donde entra una UDF (User Defined Function): te permite ejecutar código Python personalizado sobre DataFrames de Spark.

La pega: las UDF también pueden hacer que Spark sea más lento si se usan sin cuidado. Este tutorial muestra cuándo usar UDF, cómo escribirlas correctamente y cómo elegir alternativas más rápidas (como funciones integradas o Pandas UDFs).


El problema que resuelven las UDF

Tienes una columna de un DataFrame y quieres aplicar lógica personalizada:

  • parsing específico del dominio (IDs raros, reglas personalizadas)
  • funciones de scoring personalizadas
  • normalización de texto que no está cubierta por funciones integradas de Spark SQL
  • mapeo de valores usando una regla compleja

Las funciones integradas de Spark están optimizadas en la JVM. Pero la lógica Python pura no es automáticamente “nativa de Spark”.


Las opciones de solución (más rápido → más lento)

En la práctica, prioriza este orden:

  1. Funciones integradas de Spark (pyspark.sql.functions) ✅ más rápido
  2. Expresiones SQL (expr, when, regexp_extract) ✅ rápido
  3. Pandas UDFs (vectorizadas con Apache Arrow) ✅ a menudo rápido
  4. UDFs regulares de Python (Python fila por fila) ⚠️ a menudo lo más lento

Este post se centra en (3) y (4), con una guía clara sobre cuándo evitarlas.


Setup: un DataFrame pequeño para practicar

from pyspark.sql import SparkSession
from pyspark.sql import functions as F
from pyspark.sql.types import StringType, IntegerType, DoubleType
 
spark = SparkSession.builder.getOrCreate()
 
df = spark.createDataFrame(
    [
        (" Alice  ", "US", 10),
        ("bob", "UK", 3),
        (None, "US", 7),
    ],
    ["name", "country", "visits"]
)
 
df.show()

Ejemplo 1: Una UDF simple de Python (limpieza de strings)

Objetivo

Recortar espacios, pasar a minúsculas y manejar nulos de forma segura.

from pyspark.sql.functions import udf
 
def clean_name(x: str) -> str:
    if x is None:
        return None
    return x.strip().lower()
 
clean_name_udf = udf(clean_name, StringType())
 
df2 = df.withColumn("name_clean", clean_name_udf(F.col("name")))
df2.show()

Notas

  • Debes proporcionar un tipo de retorno (StringType()).
  • Una UDF de Python se ejecuta fila por fila en workers de Python (puede ser lenta en datos grandes).

Ejemplo 2: Prefiere funciones integradas cuando sea posible

La UDF anterior puede reemplazarse por funciones integradas (más rápido):

df_fast = df.withColumn(
    "name_clean",
    F.lower(F.trim(F.col("name")))
)
df_fast.show()

Regla práctica: si puedes expresarlo con F.*, haz eso.


Ejemplo 3: UDF con múltiples entradas (scoring personalizado)

Creemos un score basado en country + visits.

def score(country: str, visits: int) -> float:
    if country is None or visits is None:
        return 0.0
    bonus = 1.5 if country == "US" else 1.0
    return float(visits) * bonus
 
score_udf = udf(score, DoubleType())
 
df3 = df.withColumn("score", score_udf(F.col("country"), F.col("visits")))
df3.show()

Ejemplo 4: Registrar una UDF para Spark SQL

Si quieres usarla en SQL:

spark.udf.register("score_udf_sql", score, DoubleType())
 
df.createOrReplaceTempView("t")
spark.sql("""
  SELECT name, country, visits, score_udf_sql(country, visits) AS score
  FROM t
""").show()

Errores comunes (y cómo evitarlos)

1) Devolver el tipo incorrecto

Si Spark espera DoubleType() y a veces devuelves un string, obtendrás errores en tiempo de ejecución o nulls.

2) Usar UDFs en joins/filters innecesariamente

Las UDF pueden bloquear optimizaciones de Spark. Intenta calcular primero una columna derivada, o usa funciones integradas.

3) Olvidar el manejo de nulos

Asume siempre que las entradas pueden ser None.


Alternativa más rápida: Pandas UDF (vectorizada)

Una Pandas UDF procesa lotes de datos de una vez (vectorizada), a menudo mucho más rápido que una UDF regular.

Ejemplo 5: Pandas UDF para limpieza de nombres

import pandas as pd
from pyspark.sql.functions import pandas_udf
 
@pandas_udf("string")
def clean_name_pandas(s: pd.Series) -> pd.Series:
    return s.str.strip().str.lower()
 
df4 = df.withColumn("name_clean", clean_name_pandas(F.col("name")))
df4.show()

Cuándo ayudan las Pandas UDFs

  • la lógica es más sencilla en pandas
  • necesitas operaciones vectorizadas
  • el rendimiento importa y las funciones integradas no son suficientes

Ejemplo 6: Pandas UDF para scoring elemento a elemento

@pandas_udf("double")
def score_pandas(country: pd.Series, visits: pd.Series) -> pd.Series:
    bonus = country.eq("US").astype("float64") * 0.5 + 1.0
    visits_filled = visits.fillna(0).astype("float64")
    return visits_filled * bonus
 
df5 = df.withColumn("score", score_pandas(F.col("country"), F.col("visits")))
df5.show()

Checklist de buenas prácticas (práctico)

  • ✅ Prueba primero funciones integradas (F.lower, F.when, F.regexp_extract, etc.)
  • ✅ Si necesitas lógica personalizada, considera Pandas UDF antes que una UDF regular
  • ✅ Especifica siempre los tipos de retorno correctos
  • ✅ Maneja los nulos (None) explícitamente
  • ✅ Mantén la lógica de la UDF pura y determinista (misma entrada → misma salida)
  • ⚠️ Evita UDFs en filters/joins cuando sea posible (pueden reducir optimizaciones)
  • ⚠️ Mide: compara el runtime con/sin UDF en un dataset de muestra

Cuándo deberías usar una UDF regular de Python

Usa una UDF regular cuando:

  • el tamaño de datos no es enorme (o el rendimiento no es crítico)
  • la lógica es difícil de expresar con funciones integradas
  • pandas/Arrow no está disponible o no encaja con la transformación

Si el rendimiento importa, recurre primero a funciones integradas o Pandas UDFs.