PySpark Joins und Broadcast: Jedes Mal den richtigen Join wählen
Updated on
Fehlerhafte Joins verstecken Zeilen, duplizieren Spalten oder lassen Daten explodieren, wenn schiefe (skewed) Schlüssel kollidieren. Klare Join-Entscheidungen und sinnvoller Broadcast-Einsatz halten Ergebnisse korrekt und Jobs schnell.
PySpark bietet mehrere Join-Typen plus Broadcast-Hints, um Shuffle-Verhalten zu steuern. Diese Anleitung zeigt den passenden Join je Anwendungsfall, wie man überlappende Spaltennamen behandelt und wann man eine kleine Tabelle auf alle Executor verteilt.
Möchtest du schnell Data Visualization aus einem Python Pandas Dataframe ohne Code erstellen?
PyGWalker ist eine Python-Bibliothek für Exploratory Data Analysis mit Visualization. PyGWalker (opens in a new tab) kann deinen Jupyter Notebook Workflow für Data Analysis und Data Visualization vereinfachen, indem es deinen pandas dataframe (und polars dataframe) in ein tableau-alternatives User Interface für visuelle Exploration verwandelt.
Schneller Überblick über Join-Typen
| Type | Behält | Am besten geeignet für |
|---|---|---|
inner | Übereinstimmungen in beiden | Standard‑Enrichment oder Filter |
left / right | Alle left/right + Übereinstimmungen | Eine Seite unabhängig vom Match erhalten |
full | Alle Zeilen, Nulls bei fehlenden | Audits und Vollständigkeitsprüfungen |
left_semi | Left‑Zeilen mit Match, keine Spalten von right | Existenz‑Filterung |
left_anti | Left‑Zeilen ohne Match | Anti‑Join für Ausschlüsse |
Beispieldaten
from pyspark.sql import SparkSession, functions as F
from pyspark.sql.functions import broadcast
spark = SparkSession.builder.appName("joins-broadcast").getOrCreate()
users = spark.createDataFrame(
[(1, "Alice", "US"), (2, "Bob", "CA"), (3, "Cara", "US")],
"user_id INT, name STRING, country STRING",
)
orders = spark.createDataFrame(
[(1, "A-100", 120.0), (1, "A-101", 80.0), (3, "A-102", 50.0)],
"user_id INT, order_id STRING, amount DOUBLE",
)Inner- und Left-Joins mit klaren Bedingungen
inner_join = users.join(orders, on="user_id", how="inner")
left_join = users.join(orders, on="user_id", how="left")- Bevorzuge ein explizites
on, um versehentliche Cross Joins zu vermeiden. - Zeilenanzahl prüfen: Inner Join reduziert Zeilen; Left Join behält alle
users.
Umgang mit doppelten Spaltennamen
joined = (
users.join(orders, on="user_id", how="left")
.withColumnRenamed("name", "user_name")
)- Nach dem Join überlappende Spalten auf stabile Namen umbenennen.
- Alternativ vor dem Join nur benötigte Spalten auswählen.
Semi- und Anti-Joins als Filter
with_orders = users.join(orders, "user_id", "left_semi")
without_orders = users.join(orders, "user_id", "left_anti")left_semiliefert nur Spalten aususers, wenn eine Order existiert.left_antiliefertusersohne Orders; effizient für Ausschlusslogik.
Kleine Tabellen per Broadcast verteilen, um Skew zu vermeiden
small_dim = spark.createDataFrame(
[("US", "United States"), ("CA", "Canada")],
"country STRING, country_name STRING",
)
joined_dim = users.join(broadcast(small_dim), "country", "left")- Broadcast nutzen, wenn die kleinere Seite bequem in den Executor‑Speicher passt (typisch einige dutzend MB).
- Broadcasting überspringt den Shuffle für diese Seite, beschleunigt Joins und reduziert die Auswirkungen von Skew.
Wann man nicht broadcasten sollte
- Kein Broadcast für große oder unbeschränkt wachsende Tabellen; Gefahr von Out‑of‑Memory auf Executoren.
- Nicht broadcasten, wenn Schlüssel gleichmäßig verteilt sind und Tabellengrößen ähnlich sind; ein normaler Shuffle Join ist dann ausreichend.
Häufige Fallstricke und Gegenmaßnahmen
- Unbeabsichtigter Cross Join: sicherstellen, dass
ongesetzt ist; Spark warnt bei kartesischen Produkten. - Doppelte Spalten: vor oder nach dem Join umbenennen oder selektieren, um nachgelagerte Verbraucher nicht zu verwirren.
- Skewed Keys: kleine Lookup-Tabellen broadcasten; bei extrem schiefen Fact‑zu‑Fact‑Joins zusätzlich „Salting“ erwägen.
- Typinkonsistenzen: Join‑Keys vor dem Join auf übereinstimmende Typen casten, um leere Matches zu vermeiden.
Minimaler End‑to‑End‑Pattern
result = (
users
.join(broadcast(small_dim), "country", "left")
.join(orders, "user_id", "left")
.select(
"user_id",
F.col("name").alias("user_name"),
"country_name",
"order_id",
"amount",
)
)Dieses Pattern hält Join-Keys explizit, erledigt Dimension‑Enrichment per Broadcast und liefert saubere Spaltennamen, bereit für Aggregation oder Export.
