PySpark Joins and Broadcast: Choisir le bon type de jointure à chaque fois
Updated on
De mauvaises jointures masquent des lignes, dupliquent des colonnes ou font exploser le volume de données lorsque des clés déséquilibrées (skewed) se rencontrent. Des choix de jointures clairs et un usage maîtrisé du broadcast garantissent des résultats corrects et des jobs rapides.
PySpark propose plusieurs types de jointures ainsi que des hints de broadcast pour contrôler le comportement de shuffle. Ce guide montre quel type de jointure utiliser selon le besoin, comment gérer les noms de colonnes qui se chevauchent, et quand pousser une petite table vers chaque exécuteur.
Vous voulez créer rapidement des visualisations de données à partir d’un DataFrame Pandas en Python, sans code ?
PyGWalker est une bibliothèque Python pour l’Exploratory Data Analysis avec visualisation. PyGWalker (opens in a new tab) peut simplifier votre flux de travail d’analyse et de visualisation de données dans Jupyter Notebook, en transformant votre pandas dataframe (et polars dataframe) en une interface utilisateur de type Tableau pour l’exploration visuelle.
Guide rapide des types de jointures
| Type | Conserve | À utiliser pour |
|---|---|---|
inner | Les correspondances des deux côtés | Enrichissement standard ou filtrage |
left / right | Tout le côté gauche/droit + correspondances | Préserver un côté, qu’il y ait correspondance ou non |
full | Toutes les lignes, null pour les manquantes | Audits et contrôles de complétude |
left_semi | Lignes de gauche avec correspondance, aucune colonne de droite | Filtrage d’existence |
left_anti | Lignes de gauche sans correspondance | Anti-join pour les exclusions |
Données d’exemple
from pyspark.sql import SparkSession, functions as F
from pyspark.sql.functions import broadcast
spark = SparkSession.builder.appName("joins-broadcast").getOrCreate()
users = spark.createDataFrame(
[(1, "Alice", "US"), (2, "Bob", "CA"), (3, "Cara", "US")],
"user_id INT, name STRING, country STRING",
)
orders = spark.createDataFrame(
[(1, "A-100", 120.0), (1, "A-101", 80.0), (3, "A-102", 50.0)],
"user_id INT, order_id STRING, amount DOUBLE",
)Jointures inner et left avec conditions explicites
inner_join = users.join(orders, on="user_id", how="inner")
left_join = users.join(orders, on="user_id", how="left")- Préférez un
onexplicite pour éviter les cross joins accidentelles. - Surveillez les comptes de lignes : une inner jointure réduit le nombre de lignes ; une left jointure préserve tous les
users.
Gestion des noms de colonnes dupliqués
joined = (
users.join(orders, on="user_id", how="left")
.withColumnRenamed("name", "user_name")
)- Après la jointure, renommez les colonnes qui se chevauchent vers des noms stables.
- Autre approche : sélectionner uniquement les colonnes nécessaires avant la jointure.
Jointures semi et anti pour le filtrage
with_orders = users.join(orders, "user_id", "left_semi")
without_orders = users.join(orders, "user_id", "left_anti")left_semirenvoie uniquement les colonnes deuserslorsqu’une commande existe.left_antirenvoie lesuserssans commandes ; efficace pour les exclusions.
Diffuser (broadcast) les petites tables pour éviter le skew
small_dim = spark.createDataFrame(
[("US", "United States"), ("CA", "Canada")],
"country STRING, country_name STRING",
)
joined_dim = users.join(broadcast(small_dim), "country", "left")- Utilisez le broadcast lorsque le côté le plus petit tient confortablement en mémoire sur les exécuteurs (quelques dizaines de Mo).
- Le broadcast supprime le shuffle pour ce côté, accélère la jointure et atténue l’impact du skew.
Quand ne pas utiliser le broadcast
- Évitez de diffuser de grandes tables ou de taille non bornée : risque de manque de mémoire sur les exécuteurs.
- N’utilisez pas le broadcast si les clés sont uniformément réparties et que les tailles des tables sont similaires ; une jointure avec shuffle classique convient.
Pièges fréquents et correctifs
- Cross join involontaire : vérifiez que
onest bien défini ; Spark avertit en cas de produit cartésien. - Colonnes dupliquées : renommez ou sélectionnez avant la jointure pour ne pas perturber les consommateurs en aval.
- Clés déséquilibrées (skewed) : diffusez les petites tables de lookup ; envisagez le salting pour des jointures fact-to-fact fortement déséquilibrées.
- Types incompatibles : caste(z) les clés vers des types identiques avant la jointure pour éviter les correspondances vides.
Schéma minimal de bout en bout
result = (
users
.join(broadcast(small_dim), "country", "left")
.join(orders, "user_id", "left")
.select(
"user_id",
F.col("name").alias("user_name"),
"country_name",
"order_id",
"amount",
)
)Ce schéma garde des clés de jointure explicites, gère l’enrichissement par dimension via broadcast, et renvoie des noms de colonnes propres, prêts pour l’agrégation ou l’export.
