Skip to content

Tutorial de PySpark UDF (para iniciantes): O que é, por que usar e como usar pyspark udf

PySpark é ótimo para processar big data rapidamente — até você escrever uma lógica que o Spark não entende. É aí que entra uma UDF (User Defined Function): ela permite executar código Python personalizado em DataFrames do Spark.

O porém: UDFs também podem deixar o Spark mais lento se usadas sem cuidado. Este tutorial mostra quando usar UDFs, como escrevê-las corretamente e como escolher alternativas mais rápidas (como funções embutidas ou Pandas UDFs).


O problema que as UDFs resolvem

Você tem uma coluna de um DataFrame e quer aplicar uma lógica personalizada:

  • parsing específico do domínio (IDs estranhos, regras personalizadas)
  • funções de pontuação personalizadas
  • normalização de texto não coberta pelas funções embutidas do Spark SQL
  • mapeamento de valores usando uma regra complexa

As funções embutidas do Spark são otimizadas na JVM. Mas lógica puramente em Python não é automaticamente “nativa do Spark”.


As opções de solução (mais rápida → mais lenta)

Na prática, prefira esta ordem:

  1. Funções embutidas do Spark (pyspark.sql.functions) ✅ mais rápido
  2. Expressões SQL (expr, when, regexp_extract) ✅ rápido
  3. Pandas UDFs (vetorizadas com Apache Arrow) ✅ muitas vezes rápido
  4. UDFs Python regulares (Python linha a linha) ⚠️ geralmente a mais lenta

Este post foca em (3) e (4), com orientações fortes sobre quando evitá-las.


Setup: um pequeno DataFrame para praticar

from pyspark.sql import SparkSession
from pyspark.sql import functions as F
from pyspark.sql.types import StringType, IntegerType, DoubleType
 
spark = SparkSession.builder.getOrCreate()
 
df = spark.createDataFrame(
    [
        (" Alice  ", "US", 10),
        ("bob", "UK", 3),
        (None, "US", 7),
    ],
    ["name", "country", "visits"]
)
 
df.show()

Exemplo 1: Uma UDF Python simples (limpeza de string)

Objetivo

Remover espaços, transformar em minúsculas e lidar com nulls com segurança.

from pyspark.sql.functions import udf
 
def clean_name(x: str) -> str:
    if x is None:
        return None
    return x.strip().lower()
 
clean_name_udf = udf(clean_name, StringType())
 
df2 = df.withColumn("name_clean", clean_name_udf(F.col("name")))
df2.show()

Observações

  • Você precisa fornecer um tipo de retorno (StringType()).
  • A UDF Python roda linha a linha em workers Python (pode ser lenta em big data).

Exemplo 2: Prefira funções embutidas quando possível

A UDF anterior pode ser substituída por funções embutidas (mais rápido):

df_fast = df.withColumn(
    "name_clean",
    F.lower(F.trim(F.col("name")))
)
df_fast.show()

Regra geral: se você consegue expressar com F.*, faça isso.


Exemplo 3: UDF com múltiplas entradas (pontuação personalizada)

Vamos criar uma pontuação baseada em country + visits.

def score(country: str, visits: int) -> float:
    if country is None or visits is None:
        return 0.0
    bonus = 1.5 if country == "US" else 1.0
    return float(visits) * bonus
 
score_udf = udf(score, DoubleType())
 
df3 = df.withColumn("score", score_udf(F.col("country"), F.col("visits")))
df3.show()

Exemplo 4: Registrar uma UDF para Spark SQL

Se você quiser usá-la em SQL:

spark.udf.register("score_udf_sql", score, DoubleType())
 
df.createOrReplaceTempView("t")
spark.sql("""
  SELECT name, country, visits, score_udf_sql(country, visits) AS score
  FROM t
""").show()

Erros comuns (e como evitá-los)

1) Retornar o tipo errado

Se o Spark espera DoubleType() e você retorna uma string às vezes, você terá erros em runtime ou nulls.

2) Usar UDFs em joins/filters desnecessariamente

UDFs podem bloquear otimizações do Spark. Tente calcular uma coluna derivada primeiro, ou use funções embutidas.

3) Esquecer o tratamento de null

Sempre assuma que as entradas podem ser None.


Alternativa mais rápida: Pandas UDF (vetorizada)

Uma Pandas UDF processa lotes de dados de uma vez (vetorizada), muitas vezes bem mais rápido do que uma UDF regular.

Exemplo 5: Pandas UDF para limpeza de nome

import pandas as pd
from pyspark.sql.functions import pandas_udf
 
@pandas_udf("string")
def clean_name_pandas(s: pd.Series) -> pd.Series:
    return s.str.strip().str.lower()
 
df4 = df.withColumn("name_clean", clean_name_pandas(F.col("name")))
df4.show()

Quando Pandas UDFs ajudam

  • a lógica é mais fácil em pandas
  • você precisa de operações vetorizadas
  • performance importa e as funções embutidas não são suficientes

Exemplo 6: Pandas UDF para pontuação elemento a elemento

@pandas_udf("double")
def score_pandas(country: pd.Series, visits: pd.Series) -> pd.Series:
    bonus = country.eq("US").astype("float64") * 0.5 + 1.0
    visits_filled = visits.fillna(0).astype("float64")
    return visits_filled * bonus
 
df5 = df.withColumn("score", score_pandas(F.col("country"), F.col("visits")))
df5.show()

Checklist de boas práticas (na prática)

  • ✅ Tente funções embutidas primeiro (F.lower, F.when, F.regexp_extract, etc.)
  • ✅ Se precisar de lógica personalizada, considere Pandas UDF antes de UDF regular
  • ✅ Sempre especifique os tipos de retorno corretos
  • ✅ Trate nulls (None) explicitamente
  • ✅ Mantenha a lógica da UDF pura e determinística (mesma entrada → mesma saída)
  • ⚠️ Evite UDFs em filters/joins quando possível (podem reduzir otimizações)
  • ⚠️ Meça: compare o tempo de execução com/sem UDF em um dataset de amostra

Quando você deve usar uma UDF Python regular

Use uma UDF regular quando:

  • o tamanho dos dados não é enorme (ou performance não é crítica)
  • a lógica é difícil de expressar com funções embutidas
  • pandas/Arrow não está disponível ou não se encaixa na transformação

Se performance importa, prefira funções embutidas ou Pandas UDFs primeiro.