Skip to content

PySpark Joins und Broadcast: Jedes Mal den richtigen Join wählen

Updated on

Fehlerhafte Joins verstecken Zeilen, duplizieren Spalten oder lassen Daten explodieren, wenn schiefe (skewed) Schlüssel kollidieren. Klare Join-Entscheidungen und sinnvoller Broadcast-Einsatz halten Ergebnisse korrekt und Jobs schnell.

PySpark bietet mehrere Join-Typen plus Broadcast-Hints, um Shuffle-Verhalten zu steuern. Diese Anleitung zeigt den passenden Join je Anwendungsfall, wie man überlappende Spaltennamen behandelt und wann man eine kleine Tabelle auf alle Executor verteilt.

Möchtest du schnell Data Visualization aus einem Python Pandas Dataframe ohne Code erstellen?

PyGWalker ist eine Python-Bibliothek für Exploratory Data Analysis mit Visualization. PyGWalker (opens in a new tab) kann deinen Jupyter Notebook Workflow für Data Analysis und Data Visualization vereinfachen, indem es deinen pandas dataframe (und polars dataframe) in ein tableau-alternatives User Interface für visuelle Exploration verwandelt.

PyGWalker for Data visualization (opens in a new tab)

Schneller Überblick über Join-Typen

TypeBehältAm besten geeignet für
innerÜbereinstimmungen in beidenStandard‑Enrichment oder Filter
left / rightAlle left/right + ÜbereinstimmungenEine Seite unabhängig vom Match erhalten
fullAlle Zeilen, Nulls bei fehlendenAudits und Vollständigkeitsprüfungen
left_semiLeft‑Zeilen mit Match, keine Spalten von rightExistenz‑Filterung
left_antiLeft‑Zeilen ohne MatchAnti‑Join für Ausschlüsse

Beispieldaten

from pyspark.sql import SparkSession, functions as F
from pyspark.sql.functions import broadcast
 
spark = SparkSession.builder.appName("joins-broadcast").getOrCreate()
 
users = spark.createDataFrame(
    [(1, "Alice", "US"), (2, "Bob", "CA"), (3, "Cara", "US")],
    "user_id INT, name STRING, country STRING",
)
 
orders = spark.createDataFrame(
    [(1, "A-100", 120.0), (1, "A-101", 80.0), (3, "A-102", 50.0)],
    "user_id INT, order_id STRING, amount DOUBLE",
)

Inner- und Left-Joins mit klaren Bedingungen

inner_join = users.join(orders, on="user_id", how="inner")
 
left_join = users.join(orders, on="user_id", how="left")
  • Bevorzuge ein explizites on, um versehentliche Cross Joins zu vermeiden.
  • Zeilenanzahl prüfen: Inner Join reduziert Zeilen; Left Join behält alle users.

Umgang mit doppelten Spaltennamen

joined = (
    users.join(orders, on="user_id", how="left")
    .withColumnRenamed("name", "user_name")
)
  • Nach dem Join überlappende Spalten auf stabile Namen umbenennen.
  • Alternativ vor dem Join nur benötigte Spalten auswählen.

Semi- und Anti-Joins als Filter

with_orders = users.join(orders, "user_id", "left_semi")
without_orders = users.join(orders, "user_id", "left_anti")
  • left_semi liefert nur Spalten aus users, wenn eine Order existiert.
  • left_anti liefert users ohne Orders; effizient für Ausschlusslogik.

Kleine Tabellen per Broadcast verteilen, um Skew zu vermeiden

small_dim = spark.createDataFrame(
    [("US", "United States"), ("CA", "Canada")],
    "country STRING, country_name STRING",
)
 
joined_dim = users.join(broadcast(small_dim), "country", "left")
  • Broadcast nutzen, wenn die kleinere Seite bequem in den Executor‑Speicher passt (typisch einige dutzend MB).
  • Broadcasting überspringt den Shuffle für diese Seite, beschleunigt Joins und reduziert die Auswirkungen von Skew.

Wann man nicht broadcasten sollte

  • Kein Broadcast für große oder unbeschränkt wachsende Tabellen; Gefahr von Out‑of‑Memory auf Executoren.
  • Nicht broadcasten, wenn Schlüssel gleichmäßig verteilt sind und Tabellengrößen ähnlich sind; ein normaler Shuffle Join ist dann ausreichend.

Häufige Fallstricke und Gegenmaßnahmen

  • Unbeabsichtigter Cross Join: sicherstellen, dass on gesetzt ist; Spark warnt bei kartesischen Produkten.
  • Doppelte Spalten: vor oder nach dem Join umbenennen oder selektieren, um nachgelagerte Verbraucher nicht zu verwirren.
  • Skewed Keys: kleine Lookup-Tabellen broadcasten; bei extrem schiefen Fact‑zu‑Fact‑Joins zusätzlich „Salting“ erwägen.
  • Typinkonsistenzen: Join‑Keys vor dem Join auf übereinstimmende Typen casten, um leere Matches zu vermeiden.

Minimaler End‑to‑End‑Pattern

result = (
    users
    .join(broadcast(small_dim), "country", "left")
    .join(orders, "user_id", "left")
    .select(
        "user_id",
        F.col("name").alias("user_name"),
        "country_name",
        "order_id",
        "amount",
    )
)

Dieses Pattern hält Join-Keys explizit, erledigt Dimension‑Enrichment per Broadcast und liefert saubere Spaltennamen, bereit für Aggregation oder Export.