PySpark Joins and Broadcast: Pick the Right Join Every Time

Bad joins hide rows, duplicate columns, or explode data when skewed keys collide. Clear join choices and smart broadcast use keep results correct and jobs fast.

PySpark offers multiple join types plus broadcast hints to control shuffle behavior. This guide shows the right join per need, how to manage overlapping column names, and when to push a small table to every executor.

Join type quick guide

Type         | Keeps                                     | Best for
inner        | Matches in both                           | Standard enrich or filter
left / right | All left/right rows plus matches          | Preserve one side regardless of match
full         | All rows, nulls for missing               | Audits and completeness checks
left_semi    | Left rows with a match, no right columns  | Existence filtering
left_anti    | Left rows with no match                   | Anti-join for exclusions

Sample data

from pyspark.sql import SparkSession, functions as F
from pyspark.sql.functions import broadcast
 
spark = SparkSession.builder.appName("joins-broadcast").getOrCreate()
 
users = spark.createDataFrame(
    [(1, "Alice", "US"), (2, "Bob", "CA"), (3, "Cara", "US")],
    "user_id INT, name STRING, country STRING",
)
 
orders = spark.createDataFrame(
    [(1, "A-100", 120.0), (1, "A-101", 80.0), (3, "A-102", 50.0)],
    "user_id INT, order_id STRING, amount DOUBLE",
)

Inner and left joins with clear conditions

inner_join = users.join(orders, on="user_id", how="inner")
 
left_join = users.join(orders, on="user_id", how="left")
  • Prefer an explicit on= key to avoid accidental cross joins.
  • Review row counts: inner drops unmatched users; left preserves all of them (quick check below).
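
As a quick sanity check on the sample data, the counts in the comments follow directly from the rows defined above:

# Inner keeps only matched rows: 3 order rows for users 1 and 3.
print(inner_join.count())  # 3
# Left preserves every user: Bob (user 2) appears with null order columns.
print(left_join.count())   # 4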

Handling duplicate column names

joined = (
    users.join(orders, on="user_id", how="left")
    .withColumnRenamed("name", "user_name")
)
  • After the join, rename overlapping columns to stable names.
  • Alternatively, select only the needed columns before joining, or disambiguate with DataFrame aliases (sketch below).
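
When both sides genuinely share a non-key column, DataFrame aliases keep every reference unambiguous. A minimal sketch on the sample tables; note the explicit equality condition keeps both user_id columns, hence the aliased select:

u = users.alias("u")
o = orders.alias("o")
resolved = (
    u.join(o, F.col("u.user_id") == F.col("o.user_id"), "left")
    .select(
        F.col("u.user_id"),
        F.col("u.name").alias("user_name"),
        F.col("o.order_id"),
        F.col("o.amount"),
    )
)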

Semi and anti joins for filters

with_orders = users.join(orders, "user_id", "left_semi")
without_orders = users.join(orders, "user_id", "left_anti")
  • left_semi returns users rows that have a matching order, with only users' columns.
  • left_anti returns users with no orders; efficient for exclusions.
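
On the sample data the split is easy to verify; the expected rows in the comments follow from the tables defined above:

with_orders.show()     # Alice and Cara (users 1 and 3) have orders
without_orders.show()  # Bob (user 2) has none; only users' columns appear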

Broadcast small tables to avoid skew

small_dim = spark.createDataFrame(
    [("US", "United States"), ("CA", "Canada")],
    "country STRING, country_name STRING",
)
 
joined_dim = users.join(broadcast(small_dim), "country", "left")
  • Broadcast when the smaller side comfortably fits in executor memory (tens of MBs).
  • Broadcasting skips shuffle for that side, speeding joins and reducing skew impact.
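
Note that Spark also broadcasts automatically below a configurable size threshold (10 MB by default); the broadcast() hint simply forces the choice. A minimal sketch for raising that threshold:

# Tables smaller than this threshold are broadcast automatically.
spark.conf.set("spark.sql.autoBroadcastJoinThreshold", 50 * 1024 * 1024)  # 50 MB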

When not to broadcast

  • Avoid broadcasting large or unbounded tables; risk out-of-memory on executors.
  • Do not broadcast if keys are uniformly distributed and table sizes are similar; regular shuffle join is fine.
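
If automatic broadcasts are causing executor memory pressure, they can be switched off entirely; explicit broadcast() hints keep working:

# -1 disables automatic broadcast joins; hints still apply.
spark.conf.set("spark.sql.autoBroadcastJoinThreshold", -1)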

Common pitfalls and fixes

  • Unintended cross join: always pass on; a missing join condition plans a cartesian product (Spark warns or fails depending on version and configuration).
  • Duplicate columns: rename or select pre-join to avoid confusing downstream consumers.
  • Skewed keys: broadcast small lookup tables; for extremely skewed fact-to-fact joins, consider salting (sketch after this list).
  • Type mismatches: cast keys to matching types before joining to prevent empty matches.
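
For the fact-to-fact case, a minimal salting sketch; fact_a, fact_b, and the key k are hypothetical stand-ins for two large, skewed tables:

# Hypothetical example: spread a hot key across N buckets.
N = 8  # number of salt buckets; tune to the observed skew
salted_a = fact_a.withColumn("salt", (F.rand() * N).cast("int"))
# Replicate the other side once per salt value so every bucket finds its match.
salted_b = fact_b.withColumn("salt", F.explode(F.array(*[F.lit(i) for i in range(N)])))
desalted = (
    salted_a.join(salted_b, on=["k", "salt"], how="inner")
    .drop("salt")
)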

Minimal end-to-end pattern

result = (
    users
    .join(broadcast(small_dim), "country", "left")
    .join(orders, "user_id", "left")
    .select(
        "user_id",
        F.col("name").alias("user_name"),
        "country_name",
        "order_id",
        "amount",
    )
)

This pattern keeps join keys explicit, handles dimension enrichment via broadcast, and returns clean column names ready for aggregation or export.
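
As a quick downstream check, a minimal aggregation over the result:

# Total order amount per country name; users without orders contribute nulls.
result.groupBy("country_name").agg(F.sum("amount").alias("total_amount")).show()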