Skip to content

PySpark Joins and Broadcast: Choisir le bon type de jointure à chaque fois

Updated on

De mauvaises jointures masquent des lignes, dupliquent des colonnes ou font exploser le volume de données lorsque des clés déséquilibrées (skewed) se rencontrent. Des choix de jointures clairs et un usage maîtrisé du broadcast garantissent des résultats corrects et des jobs rapides.

PySpark propose plusieurs types de jointures ainsi que des hints de broadcast pour contrôler le comportement de shuffle. Ce guide montre quel type de jointure utiliser selon le besoin, comment gérer les noms de colonnes qui se chevauchent, et quand pousser une petite table vers chaque exécuteur.

Vous voulez créer rapidement des visualisations de données à partir d’un DataFrame Pandas en Python, sans code ?

PyGWalker est une bibliothèque Python pour l’Exploratory Data Analysis avec visualisation. PyGWalker (opens in a new tab) peut simplifier votre flux de travail d’analyse et de visualisation de données dans Jupyter Notebook, en transformant votre pandas dataframe (et polars dataframe) en une interface utilisateur de type Tableau pour l’exploration visuelle.

PyGWalker for Data visualization (opens in a new tab)

Guide rapide des types de jointures

TypeConserveÀ utiliser pour
innerLes correspondances des deux côtésEnrichissement standard ou filtrage
left / rightTout le côté gauche/droit + correspondancesPréserver un côté, qu’il y ait correspondance ou non
fullToutes les lignes, null pour les manquantesAudits et contrôles de complétude
left_semiLignes de gauche avec correspondance, aucune colonne de droiteFiltrage d’existence
left_antiLignes de gauche sans correspondanceAnti-join pour les exclusions

Données d’exemple

from pyspark.sql import SparkSession, functions as F
from pyspark.sql.functions import broadcast
 
spark = SparkSession.builder.appName("joins-broadcast").getOrCreate()
 
users = spark.createDataFrame(
    [(1, "Alice", "US"), (2, "Bob", "CA"), (3, "Cara", "US")],
    "user_id INT, name STRING, country STRING",
)
 
orders = spark.createDataFrame(
    [(1, "A-100", 120.0), (1, "A-101", 80.0), (3, "A-102", 50.0)],
    "user_id INT, order_id STRING, amount DOUBLE",
)

Jointures inner et left avec conditions explicites

inner_join = users.join(orders, on="user_id", how="inner")
 
left_join = users.join(orders, on="user_id", how="left")
  • Préférez un on explicite pour éviter les cross joins accidentelles.
  • Surveillez les comptes de lignes : une inner jointure réduit le nombre de lignes ; une left jointure préserve tous les users.

Gestion des noms de colonnes dupliqués

joined = (
    users.join(orders, on="user_id", how="left")
    .withColumnRenamed("name", "user_name")
)
  • Après la jointure, renommez les colonnes qui se chevauchent vers des noms stables.
  • Autre approche : sélectionner uniquement les colonnes nécessaires avant la jointure.

Jointures semi et anti pour le filtrage

with_orders = users.join(orders, "user_id", "left_semi")
without_orders = users.join(orders, "user_id", "left_anti")
  • left_semi renvoie uniquement les colonnes de users lorsqu’une commande existe.
  • left_anti renvoie les users sans commandes ; efficace pour les exclusions.

Diffuser (broadcast) les petites tables pour éviter le skew

small_dim = spark.createDataFrame(
    [("US", "United States"), ("CA", "Canada")],
    "country STRING, country_name STRING",
)
 
joined_dim = users.join(broadcast(small_dim), "country", "left")
  • Utilisez le broadcast lorsque le côté le plus petit tient confortablement en mémoire sur les exécuteurs (quelques dizaines de Mo).
  • Le broadcast supprime le shuffle pour ce côté, accélère la jointure et atténue l’impact du skew.

Quand ne pas utiliser le broadcast

  • Évitez de diffuser de grandes tables ou de taille non bornée : risque de manque de mémoire sur les exécuteurs.
  • N’utilisez pas le broadcast si les clés sont uniformément réparties et que les tailles des tables sont similaires ; une jointure avec shuffle classique convient.

Pièges fréquents et correctifs

  • Cross join involontaire : vérifiez que on est bien défini ; Spark avertit en cas de produit cartésien.
  • Colonnes dupliquées : renommez ou sélectionnez avant la jointure pour ne pas perturber les consommateurs en aval.
  • Clés déséquilibrées (skewed) : diffusez les petites tables de lookup ; envisagez le salting pour des jointures fact-to-fact fortement déséquilibrées.
  • Types incompatibles : caste(z) les clés vers des types identiques avant la jointure pour éviter les correspondances vides.

Schéma minimal de bout en bout

result = (
    users
    .join(broadcast(small_dim), "country", "left")
    .join(orders, "user_id", "left")
    .select(
        "user_id",
        F.col("name").alias("user_name"),
        "country_name",
        "order_id",
        "amount",
    )
)

Ce schéma garde des clés de jointure explicites, gère l’enrichissement par dimension via broadcast, et renvoie des noms de colonnes propres, prêts pour l’agrégation ou l’export.