Tutoriel PySpark UDF (adapté aux débutants) : quoi, pourquoi et comment utiliser pyspark udf
PySpark est excellent pour traiter de la big data rapidement — jusqu’au moment où vous écrivez une logique que Spark ne comprend pas. C’est là qu’intervient une UDF (User Defined Function) : elle vous permet d’exécuter du code Python personnalisé sur des DataFrames Spark.
Le piège : les UDF peuvent aussi rendre Spark plus lent si elles sont utilisées sans précaution. Ce tutoriel explique quand utiliser des UDF, comment les écrire correctement, et comment choisir des alternatives plus rapides (comme les fonctions intégrées ou les Pandas UDF).
Le problème que les UDF résolvent
Vous avez une colonne de DataFrame et vous voulez appliquer une logique personnalisée :
- parsing spécifique au domaine (IDs bizarres, règles personnalisées)
- fonctions de scoring personnalisées
- normalisation de texte non couverte par les fonctions Spark SQL intégrées
- mapping de valeurs via une règle complexe
Les fonctions intégrées de Spark sont optimisées dans la JVM. Mais une logique en Python pur n’est pas automatiquement “native Spark”.
Les options de solution (de la plus rapide à la plus lente)
En pratique, privilégiez cet ordre :
- Fonctions Spark intégrées (
pyspark.sql.functions) ✅ les plus rapides - Expressions SQL (
expr,when,regexp_extract) ✅ rapides - Pandas UDFs (vectorisées avec Apache Arrow) ✅ souvent rapides
- UDFs Python classiques (Python ligne par ligne) ⚠️ souvent les plus lentes
Cet article se concentre sur (3) et (4), avec des recommandations fortes sur les cas où il vaut mieux les éviter.
Setup : un petit DataFrame pour s’exercer
from pyspark.sql import SparkSession
from pyspark.sql import functions as F
from pyspark.sql.types import StringType, IntegerType, DoubleType
spark = SparkSession.builder.getOrCreate()
df = spark.createDataFrame(
[
(" Alice ", "US", 10),
("bob", "UK", 3),
(None, "US", 7),
],
["name", "country", "visits"]
)
df.show()Exemple 1 : une UDF Python simple (nettoyage de chaîne)
Objectif
Supprimer les espaces, passer en minuscules, et gérer les nulls en toute sécurité.
from pyspark.sql.functions import udf
def clean_name(x: str) -> str:
if x is None:
return None
return x.strip().lower()
clean_name_udf = udf(clean_name, StringType())
df2 = df.withColumn("name_clean", clean_name_udf(F.col("name")))
df2.show()Notes
- Vous devez fournir un type de retour (
StringType()). - Une UDF Python s’exécute ligne par ligne dans des workers Python (peut être lente sur de gros volumes).
Exemple 2 : préférez les fonctions intégrées quand c’est possible
La UDF précédente peut être remplacée par des fonctions intégrées (plus rapide) :
df_fast = df.withColumn(
"name_clean",
F.lower(F.trim(F.col("name")))
)
df_fast.show()Règle générale : si vous pouvez l’exprimer avec F.*, faites-le.
Exemple 3 : UDF avec plusieurs entrées (scoring personnalisé)
Créons un score basé sur country + visits.
def score(country: str, visits: int) -> float:
if country is None or visits is None:
return 0.0
bonus = 1.5 if country == "US" else 1.0
return float(visits) * bonus
score_udf = udf(score, DoubleType())
df3 = df.withColumn("score", score_udf(F.col("country"), F.col("visits")))
df3.show()Exemple 4 : enregistrer une UDF pour Spark SQL
Si vous voulez l’utiliser en SQL :
spark.udf.register("score_udf_sql", score, DoubleType())
df.createOrReplaceTempView("t")
spark.sql("""
SELECT name, country, visits, score_udf_sql(country, visits) AS score
FROM t
""").show()Erreurs courantes (et comment les éviter)
1) Renvoyer le mauvais type
Si Spark attend DoubleType() et que vous renvoyez parfois une string, vous aurez des erreurs à l’exécution ou des nulls.
2) Utiliser des UDF dans des joins/filters sans nécessité
Les UDF peuvent bloquer les optimisations de Spark. Essayez plutôt de calculer d’abord une colonne dérivée, ou d’utiliser des fonctions intégrées.
3) Oublier la gestion des valeurs nulles
Supposez toujours que les entrées peuvent être None.
Alternative plus rapide : Pandas UDF (vectorisée)
Une Pandas UDF traite des lots (batches) de données d’un coup (vectorisée), souvent beaucoup plus rapidement qu’une UDF classique.
Exemple 5 : Pandas UDF pour nettoyer les noms
import pandas as pd
from pyspark.sql.functions import pandas_udf
@pandas_udf("string")
def clean_name_pandas(s: pd.Series) -> pd.Series:
return s.str.strip().str.lower()
df4 = df.withColumn("name_clean", clean_name_pandas(F.col("name")))
df4.show()Quand les Pandas UDFs aident
- la logique est plus simple en pandas
- vous avez besoin d’opérations vectorisées
- la performance compte et les fonctions intégrées ne suffisent pas
Exemple 6 : Pandas UDF pour un scoring élément par élément
@pandas_udf("double")
def score_pandas(country: pd.Series, visits: pd.Series) -> pd.Series:
bonus = country.eq("US").astype("float64") * 0.5 + 1.0
visits_filled = visits.fillna(0).astype("float64")
return visits_filled * bonus
df5 = df.withColumn("score", score_pandas(F.col("country"), F.col("visits")))
df5.show()Checklist de bonnes pratiques (pratique)
- ✅ Essayez d’abord les fonctions intégrées (
F.lower,F.when,F.regexp_extract, etc.) - ✅ Si une logique personnalisée est nécessaire, envisagez une Pandas UDF avant une UDF classique
- ✅ Spécifiez toujours les bons types de retour
- ✅ Gérez explicitement les nulls (
None) - ✅ Gardez une logique d’UDF pure et déterministe (même entrée → même sortie)
- ⚠️ Évitez les UDF dans les filters/joins quand c’est possible (elles peuvent réduire les optimisations)
- ⚠️ Mesurez : comparez le temps d’exécution avec/sans UDF sur un dataset d’échantillon
Quand vous devriez utiliser une UDF Python classique
Utilisez une UDF classique quand :
- le volume de données n’est pas énorme (ou la performance n’est pas critique)
- la logique est difficile à exprimer avec des fonctions intégrées
- pandas/Arrow n’est pas disponible ou ne convient pas à la transformation
Si la performance compte, privilégiez d’abord les fonctions intégrées ou les Pandas UDFs.