UDF PySpark vs UDF Pandas vs mapInPandas : lequel devriez-vous utiliser ?
Quand vous avez besoin de logique personnalisée dans PySpark, vous allez généralement utiliser l’un de ces trois outils :
- UDF Python classique (
udf(...)) - UDF Pandas (
@pandas_udf) mapInPandas(DataFrame → itérateur de DataFrames Pandas)
Ils peuvent tous « exécuter du Python sur Spark », mais ils se comportent très différemment en termes de performances, de flexibilité et du niveau d’optimisation Spark que vous conservez.
Ce guide vous propose un cadre de décision pratique, ainsi que des exemples prêts à copier.
Modèle mental : qu’est-ce qui change entre les trois ?
1) UDF classique (Python ligne par ligne)
- Spark envoie les colonnes à des processus workers Python.
- Votre fonction s’exécute une ligne à la fois.
- Souvent la plus lente.
- Peut bloquer l’optimiseur et la génération de code de Spark.
À utiliser quand : la logique est simple, les données sont petites, ou la vitesse n’a pas d’importance.
2) UDF Pandas (lots vectorisés via Arrow)
- Spark envoie les données à Python sous forme de lots colonnaires via Apache Arrow.
- Votre fonction s’exécute sur des Series / DataFrames Pandas (vectorisé).
- Généralement beaucoup plus rapide qu’une UDF classique.
À utiliser quand : vous avez besoin d’une logique personnalisée sur des colonnes et voulez de meilleures performances.
3) mapInPandas (contrôle complet par lot de partition)
- Spark appelle votre fonction une fois par fragment de partition, en fournissant un itérateur de DataFrames Pandas.
- Vous pouvez faire de la logique multi-colonnes, des transformations complexes, voire une expansion de lignes.
- Très utile pour des étapes de « mini ETL » en Python tout en restant parallélisé par Spark.
À utiliser quand : vous avez besoin de transformations complexes qui ne rentrent pas dans le modèle « une colonne en entrée → une colonne en sortie ».
Tableau de décision rapide
| Vous avez besoin de… | Meilleur choix |
|---|---|
| Transformation personnalisée simple, faible volume | UDF classique |
| Transformation par colonne, données moyennes/grandes | UDF Pandas |
| Logique complexe : plusieurs colonnes, plusieurs lignes en sortie, jointures dans pandas, libs Python lourdes | mapInPandas |
| Performances maximales si possible | Fonctions Spark SQL intégrées (éviter les trois) |
Règle : fonctions Spark intégrées > UDF Pandas > mapInPandas > UDF classique (en général, pas absolu).
Jeu de données d’exemple
from pyspark.sql import SparkSession
from pyspark.sql import functions as F
from pyspark.sql.types import StringType, DoubleType, StructType, StructField, LongType
spark = SparkSession.builder.getOrCreate()
df = spark.createDataFrame(
[(" Alice ", "US", 10),
("bob", "UK", 3),
(None, "US", 7)],
["name", "country", "visits"]
)Exemple d’UDF classique (simple mais lent)
from pyspark.sql.functions import udf
from pyspark.sql.types import StringType
def clean_name(x):
if x is None:
return None
return x.strip().lower()
clean_name_udf = udf(clean_name, StringType())
df_udf = df.withColumn("name_clean", clean_name_udf("name"))
df_udf.show()Avantages
- Le plus simple à comprendre
- Fonctionne partout
Inconvénients
- Surcoût Python ligne par ligne
- Empêche souvent Spark de faire des optimisations agressives
Exemple d’UDF Pandas (vectorisé, généralement plus rapide)
import pandas as pd
from pyspark.sql.functions import pandas_udf
@pandas_udf("string")
def clean_name_vec(s: pd.Series) -> pd.Series:
return s.str.strip().str.lower()
df_pandas_udf = df.withColumn("name_clean", clean_name_vec("name"))
df_pandas_udf.show()Avantages
- Traitement par lots + vectorisation
- Débit bien meilleur sur de grandes colonnes
Inconvénients
- Nécessite le support Arrow et un environnement compatible
- Reste côté Python, et reste moins optimisable que les fonctions intégrées
Exemple mapInPandas (le plus flexible)
Cas d’usage : produire plusieurs colonnes dérivées + règles personnalisées
Peut-être que vous voulez :
- nom nettoyé
- score basé sur le pays et le nombre de visites
- catégories (buckets)
import pandas as pd
def transform(pdf_iter):
for pdf in pdf_iter:
pdf["name_clean"] = pdf["name"].astype("string").str.strip().str.lower()
pdf["visits"] = pdf["visits"].fillna(0).astype("float64")
pdf["score"] = pdf["visits"] * pdf["country"].eq("US").astype("float64").add(1.0) # US -> 2.0x, else 1.0x
pdf["bucket"] = pd.cut(pdf["visits"], bins=[-1, 0, 5, 999999], labels=["none", "low", "high"])
yield pdf
out_schema = "name string, country string, visits long, name_clean string, score double, bucket string"
df_map = df.mapInPandas(transform, schema=out_schema)
df_map.show()Avantages
- Extrêmement flexible
- Idéal pour des « pipelines pandas par partition »
- Peut étendre les lignes, calculer plusieurs sorties, appeler des bibliothèques externes (avec prudence)
Inconvénients
- Vous devez définir le schéma correctement
- Plus de risques de créer du skew / des partitions trop volumineuses
- Toujours un surcoût Python/Arrow
Et côté performances ?
Vous n’avez pas besoin de benchmarks parfaits pour faire le bon choix. Utilisez plutôt ces heuristiques pratiques :
L’UDF classique est généralement la pire option quand :
- dizaines de millions de lignes
- transformations simples que Spark pourrait faire nativement
- utilisée dans des filtres/jointures
L’UDF Pandas est excellente quand :
- vous transformez une ou plusieurs colonnes avec des opérations vectorisées
- vous pouvez écrire la logique efficacement avec pandas/numpy
mapInPandas est le meilleur choix quand :
- vous avez besoin de transformations en plusieurs étapes qui seraient pénibles en Spark SQL
- vous voulez créer plusieurs colonnes d’un coup
- vous avez besoin d’une expansion de lignes ou d’une logique conditionnelle complexe
Pièges de justesse (correctness) et de schéma
UDF Pandas
- La sortie doit correspondre exactement au type déclaré.
- Des nulls/NaN peuvent apparaître ; gérez-les explicitement.
mapInPandas
- Le DataFrame de sortie doit correspondre au schéma : noms de colonnes + dtypes + ordre.
- Attention au dtype Python
object; convertissez explicitement en string/float.
La liste « à éviter » (anti-patterns courants)
- Utiliser une UDF classique pour des opérations de base sur les strings (
lower,trim, regex) → utilisez les fonctions Spark intégrées. - Appeler des APIs réseau dans des UDFs → ce sera lent, instable, et difficile à rejouer en toute sécurité.
- Boucles Python lourdes ligne par ligne dans UDF Pandas/mapInPandas → ça détruit les bénéfices de la vectorisation.
- Renvoyer des types incohérents (parfois int, parfois string) → erreurs à l’exécution / nulls.
Flux de décision recommandé
-
Les fonctions Spark intégrées peuvent-elles le faire ? → Utilisez les fonctions intégrées.
-
Est-ce une transformation « par colonne » (même nombre de lignes en entrée/sortie) ? → Utilisez une UDF Pandas.
-
Avez-vous besoin de logique multi-colonnes, de plusieurs colonnes en sortie, ou d’une expansion de lignes ? → Utilisez
mapInPandas. -
Petites données / prototype rapide ? → Une UDF classique est acceptable.
Un dernier conseil : validez avec les plans Spark
Même sans benchmark complet, vous pouvez apprendre beaucoup avec :
df_pandas_udf.explain(True)Si vous voyez que Spark optimise moins après l’ajout de votre fonction, c’est un indice pour essayer des fonctions intégrées ou restructurer votre logique.