PySpark Joins and Broadcast: Pick the Right Join Every Time
Bad joins hide rows, duplicate columns, or explode data when skewed keys collide. Clear join choices and smart broadcast use keep results correct and jobs fast.
PySpark offers multiple join types plus broadcast hints to control shuffle behavior. This guide shows the right join per need, how to manage overlapping column names, and when to push a small table to every executor.
Join type quick guide
| Type | Keeps | Best for |
|---|---|---|
| `inner` | Matches in both | Standard enrich or filter |
| `left` / `right` | All left/right rows plus matches | Preserve one side regardless of match |
| `full` | All rows, nulls for missing | Audits and completeness checks |
| `left_semi` | Left rows with a match; no columns from right | Existence filtering |
| `left_anti` | Left rows with no match | Anti-join for exclusions |
Sample data
```python
from pyspark.sql import SparkSession, functions as F
from pyspark.sql.functions import broadcast

spark = SparkSession.builder.appName("joins-broadcast").getOrCreate()

users = spark.createDataFrame(
    [(1, "Alice", "US"), (2, "Bob", "CA"), (3, "Cara", "US")],
    "user_id INT, name STRING, country STRING",
)

orders = spark.createDataFrame(
    [(1, "A-100", 120.0), (1, "A-101", 80.0), (3, "A-102", 50.0)],
    "user_id INT, order_id STRING, amount DOUBLE",
)
```

Inner and left joins with clear conditions
```python
inner_join = users.join(orders, on="user_id", how="inner")
left_join = users.join(orders, on="user_id", how="left")
```

- Prefer an explicit `on` to avoid accidental cross joins.
- Review row counts: inner reduces rows; left preserves all `users` rows (checked in the sketch below).
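A quick way to confirm this behavior on the sample data above (a sanity check, not something you would ship):

```python
# Inner: user 1 matches two orders, user 3 matches one -> 3 rows.
print(inner_join.count())  # 3
# Left: Bob (user 2) has no orders but is kept with null order columns -> 4 rows.
print(left_join.count())   # 4
```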
Handling duplicate column names
```python
joined = (
    users.join(orders, on="user_id", how="left")
    .withColumnRenamed("name", "user_name")
)
```

- After the join, rename overlapping columns to stable names.
- Alternatively, select only the needed columns before joining (see the alias sketch below).
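When both sides share several column names, aliasing each DataFrame and selecting qualified columns is one way to do that pre-selection; a minimal sketch on the sample data:

```python
u = users.alias("u")
o = orders.alias("o")

# Join on an explicit condition, then keep only qualified columns so no
# ambiguous names survive the join.
slim = (
    u.join(o, F.col("u.user_id") == F.col("o.user_id"), "left")
    .select(
        F.col("u.user_id"),
        F.col("u.name").alias("user_name"),
        F.col("o.order_id"),
        F.col("o.amount"),
    )
)
```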
Semi and anti joins for filters
```python
with_orders = users.join(orders, "user_id", "left_semi")
without_orders = users.join(orders, "user_id", "left_anti")
```

- `left_semi` returns only `users` columns when an order exists.
- `left_anti` returns `users` rows with no orders; efficient for exclusions (sample output below).
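On the sample data, only Bob lacks an order, so the anti join keeps a single row (output shown for illustration; display formatting may vary by Spark version):

```python
without_orders.show()
# +-------+----+-------+
# |user_id|name|country|
# +-------+----+-------+
# |      2| Bob|     CA|
# +-------+----+-------+
```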
Broadcast small tables to avoid skew
```python
small_dim = spark.createDataFrame(
    [("US", "United States"), ("CA", "Canada")],
    "country STRING, country_name STRING",
)

joined_dim = users.join(broadcast(small_dim), "country", "left")
```

- Broadcast when the smaller side comfortably fits in executor memory (tens of MB).
- Broadcasting skips the shuffle for that side, speeding up joins and reducing skew impact (verify with the plan check below).
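To confirm the hint took effect, inspect the physical plan; it should contain a BroadcastHashJoin rather than a SortMergeJoin. Note that Spark also auto-broadcasts tables smaller than `spark.sql.autoBroadcastJoinThreshold` (10 MB by default), so a small side may be broadcast even without the hint:

```python
# The physical plan should show BroadcastHashJoin for the dimension side.
joined_dim.explain()
```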
When not to broadcast
- Avoid broadcasting large or unbounded tables; they risk out-of-memory errors on executors.
- Do not broadcast if keys are evenly distributed and the tables are similar in size; a regular shuffle join is fine (a sketch for opting out of auto-broadcast follows).
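If Spark's size estimates are off and the optimizer auto-broadcasts a side that is too large, one option is to turn the automatic threshold off and broadcast only via explicit hints; a minimal sketch:

```python
# -1 disables automatic broadcast; explicit broadcast() hints still work.
spark.conf.set("spark.sql.autoBroadcastJoinThreshold", "-1")
```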
Common pitfalls and fixes
- Unintended cross join: ensure `on` is set; Spark will warn on Cartesian products.
- Duplicate columns: rename or select pre-join to avoid confusing downstream consumers.
- Skewed keys: broadcast small lookup tables; consider salting for extremely skewed fact-to-fact joins (a sketch follows this list).
- Type mismatches: cast keys to matching types before joining to prevent empty matches.
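Salting spreads a hot key across several synthetic sub-keys so no single task receives all of its rows. A minimal sketch, assuming two large hypothetical DataFrames `big_left` and `big_right` joined on `user_id`, with `SALT_BUCKETS` as a tuning knob you would size to the observed skew:

```python
SALT_BUCKETS = 8  # hypothetical; tune to the skew you observe

# Scatter the skewed side across random salt buckets.
left_salted = big_left.withColumn(
    "salt", (F.rand() * SALT_BUCKETS).cast("int")
)

# Replicate the other side once per bucket so every salted row finds a match.
right_salted = big_right.withColumn(
    "salt", F.explode(F.array(*[F.lit(i) for i in range(SALT_BUCKETS)]))
)

salted_join = left_salted.join(right_salted, ["user_id", "salt"])
```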
Minimal end-to-end pattern
```python
result = (
    users
    .join(broadcast(small_dim), "country", "left")
    .join(orders, "user_id", "left")
    .select(
        "user_id",
        F.col("name").alias("user_name"),
        "country_name",
        "order_id",
        "amount",
    )
)
```

This pattern keeps join keys explicit, handles dimension enrichment via broadcast, and returns clean column names ready for aggregation or export.
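On the sample data, the result looks like the following (row order is not guaranteed and display formatting varies by Spark version):

```python
result.show()
# +-------+---------+-------------+--------+------+
# |user_id|user_name| country_name|order_id|amount|
# +-------+---------+-------------+--------+------+
# |      1|    Alice|United States|   A-100| 120.0|
# |      1|    Alice|United States|   A-101|  80.0|
# |      2|      Bob|       Canada|    null|  null|
# |      3|     Cara|United States|   A-102|  50.0|
# +-------+---------+-------------+--------+------+
```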
