PySparkのJoinとBroadcast: 毎回正しいJoinを選ぶ
Updated on
良くないjoinは、行を消したり、カラムを重複させたり、skewしたキーが衝突したときにデータを爆発させたりします。明確なjoinの選択と賢いbroadcast利用によって、結果の正しさとジョブの高速性を両立できます。
PySparkは複数のjoinタイプと、shuffleの挙動を制御するためのbroadcastヒントを提供しています。このガイドでは、用途ごとに適切なjoinの選び方、重なり合うカラム名の扱い方、そして小さいテーブルを各executorへ配布(broadcast)すべきタイミングを説明します。
Python Pandas Dataframeからノーコードで素早くデータ可視化を作りたいですか?
PyGWalker は可視化付きのExploratory Data Analysis向けPythonライブラリです。PyGWalker (opens in a new tab) を使うと、pandas dataframe(およびpolars dataframe)を、Jupyter Notebook上でのデータ分析・可視化向けに、tableauのような対話的UIへ変換できます。
Joinタイプ早見表
| Type | 残す行 | 向いている用途 |
|---|---|---|
inner | 両方に存在する行 | 一般的な付加・フィルタ |
left / right | 左/右の全行 + マッチした行 | 一方のテーブルを必ず残したいとき |
full | すべての行、欠損側はnull | データ監査や網羅性チェック |
left_semi | 右側にマッチがある左の行のみ(右のカラムは返さない) | 存在チェックのフィルタ |
left_anti | 右側にマッチがない左の行のみ | 除外用のanti-join |
サンプルデータ
from pyspark.sql import SparkSession, functions as F
from pyspark.sql.functions import broadcast
spark = SparkSession.builder.appName("joins-broadcast").getOrCreate()
users = spark.createDataFrame(
[(1, "Alice", "US"), (2, "Bob", "CA"), (3, "Cara", "US")],
"user_id INT, name STRING, country STRING",
)
orders = spark.createDataFrame(
[(1, "A-100", 120.0), (1, "A-101", 80.0), (3, "A-102", 50.0)],
"user_id INT, order_id STRING, amount DOUBLE",
)Inner / Left joinと明示的な条件
inner_join = users.join(orders, on="user_id", how="inner")
left_join = users.join(orders, on="user_id", how="left")- 意図しないcross joinを避けるため、
onを明示することを推奨します。 - 行数を確認しましょう: innerは行数を減らし、leftはすべての
users行を保持します。
重複カラム名の扱い
joined = (
users.join(orders, on="user_id", how="left")
.withColumnRenamed("name", "user_name")
)- join後は、重複するカラムを安定した名前にrenameしておきます。
- あるいは、join前に必要なカラムだけをselectする方法もあります。
フィルタ用途のSemi / Anti join
with_orders = users.join(orders, "user_id", "left_semi")
without_orders = users.join(orders, "user_id", "left_anti")left_semiは、注文が存在する場合のusersの行だけを返し、右テーブルのカラムは返しません。left_antiは、注文を持たないusersの行を返します。除外条件の実装として効率的です。
Skew回避のための小さいテーブルのBroadcast
small_dim = spark.createDataFrame(
[("US", "United States"), ("CA", "Canada")],
"country STRING, country_name STRING",
)
joined_dim = users.join(broadcast(small_dim), "country", "left")- 小さい側のテーブルがexecutorメモリに十分収まる(数十MB程度まで)場合にbroadcastを使います。
- broadcastすると、そのテーブル側のshuffleを省略でき、joinを高速化しつつskewの影響を減らせます。
Broadcastすべきでないケース
- 大きい、あるいはサイズが読めないテーブルのbroadcastは避けてください。executorのメモリ不足を招きます。
- キー分布がほぼ一様で、テーブルサイズも同程度であれば、通常のshuffle joinで問題ありません。
ありがちな落とし穴と対処
- 意図しないcross join:
onを必ず指定します。cartesian productの場合はSparkが警告します。 - カラムの二重定義: join前にselectするか、join後にrenameして、後続処理で混乱しないようにします。
- Skewしたキー: 小さいlookupテーブルはbroadcastし、事実テーブル同士で極端なskewがある場合はsaltingも検討します。
- 型の不一致: joinキーは事前に同じ型へcastし、マッチしない(空になる)joinを防ぎます。
最小構成のエンドツーエンドパターン
result = (
users
.join(broadcast(small_dim), "country", "left")
.join(orders, "user_id", "left")
.select(
"user_id",
F.col("name").alias("user_name"),
"country_name",
"order_id",
"amount",
)
)このパターンでは、joinキーを明示し、次元テーブルの付加にbroadcastを使い、集計やエクスポートにそのまま使える、整理されたカラム名で結果を返しています。
