PySpark Joins and Broadcast: 每次都选对 Join 类型
Updated on
糟糕的 join 会让行被悄悄丢失、列被重复、或者在倾斜的 key 上发生数据爆炸。清晰地选择 join 类型并合理使用 broadcast,能保持结果正确、作业高效。
PySpark 提供多种 join 类型,以及用于控制 shuffle 行为的 broadcast 提示。本文说明在不同需求下应选择哪种 join,如何管理重名列,以及什么时候把小表广播到所有 executor。
想在不写代码的情况下,快速从 Python Pandas DataFrame 创建数据可视化?
PyGWalker 是一个用于可视化探索式数据分析的 Python 库。PyGWalker (opens in a new tab) 可以简化你在 Jupyter Notebook 中的数据分析与可视化流程,把 pandas / polars dataframe 变成类似 Tableau 的可视化交互界面。
Join 类型速查表
| Type | 保留的行 | 适用场景 |
|---|---|---|
inner | 两边都匹配的行 | 标准的数据丰富或基于关联的过滤 |
left / right | 保留所有左/右表行 + 匹配行 | 不论是否匹配都要保留一侧数据 |
full | 两表所有行,缺失部分用 null | 审计、完整性检查 |
left_semi | 仅返回左表中有匹配的行,不带右表列 | 存在性过滤(存在即通过) |
left_anti | 仅返回左表中没有匹配的行 | 排除型过滤(反 join) |
示例数据
from pyspark.sql import SparkSession, functions as F
from pyspark.sql.functions import broadcast
spark = SparkSession.builder.appName("joins-broadcast").getOrCreate()
users = spark.createDataFrame(
[(1, "Alice", "US"), (2, "Bob", "CA"), (3, "Cara", "US")],
"user_id INT, name STRING, country STRING",
)
orders = spark.createDataFrame(
[(1, "A-100", 120.0), (1, "A-101", 80.0), (3, "A-102", 50.0)],
"user_id INT, order_id STRING, amount DOUBLE",
)使用清晰条件的 inner / left join
inner_join = users.join(orders, on="user_id", how="inner")
left_join = users.join(orders, on="user_id", how="left")- 优先使用显式的
on条件,避免意外的笛卡尔积(cross join)。 - 留意行数变化:inner 会减少行数;left 会保留所有
users行。
处理重复列名
joined = (
users.join(orders, on="user_id", how="left")
.withColumnRenamed("name", "user_name")
)- join 之后,对重名列进行重命名,得到稳定且可读的字段名。
- 或者在 join 之前,用
select只保留需要参与 join 或输出的列。
用 semi / anti join 做过滤
with_orders = users.join(orders, "user_id", "left_semi")
without_orders = users.join(orders, "user_id", "left_anti")left_semi:只返回users的列,且这些用户在orders中至少有一条记录。left_anti:只返回没有任何订单记录的users;对排除型过滤更高效。
广播小表以避免倾斜
small_dim = spark.createDataFrame(
[("US", "United States"), ("CA", "Canada")],
"country STRING, country_name STRING",
)
joined_dim = users.join(broadcast(small_dim), "country", "left")- 当较小表能轻松放进 executor 内存(通常是几十 MB)时,可以广播它。
- Broadcast 可以让这一侧跳过 shuffle,显著加速 join,并减轻数据倾斜的影响。
什么时候不要 broadcast
- 不要广播很大或大小不确定的表,否则可能导致 executor 内存溢出。
- 如果两表大小相近且 key 分布比较均匀,正常的 shuffle join 已经足够,不一定需要 broadcast。
常见坑与解决思路
- 意外的 cross join:务必设置
on条件;Spark 对笛卡尔积会给出警告。 - 重复列名:在 join 前用
select精简列,或在 join 后重命名,避免下游逻辑混乱。 - Key 倾斜:对小维表采用 broadcast;对于事实表对事实表且极端倾斜的 join,可考虑“加盐”(salting)方案。
- 类型不匹配:在 join 前把 key 显式
cast成一致类型,避免因为类型不同导致完全匹配不上。
最简端到端示例模式
result = (
users
.join(broadcast(small_dim), "country", "left")
.join(orders, "user_id", "left")
.select(
"user_id",
F.col("name").alias("user_name"),
"country_name",
"order_id",
"amount",
)
)这种模式让 join key 明确可读,利用 broadcast 做维度表丰富,并输出清晰稳定的列名,方便后续聚合或导出。
