Skip to content

PySparkのJoinとBroadcast: 毎回正しいJoinを選ぶ

Updated on

良くないjoinは、行を消したり、カラムを重複させたり、skewしたキーが衝突したときにデータを爆発させたりします。明確なjoinの選択と賢いbroadcast利用によって、結果の正しさとジョブの高速性を両立できます。

PySparkは複数のjoinタイプと、shuffleの挙動を制御するためのbroadcastヒントを提供しています。このガイドでは、用途ごとに適切なjoinの選び方、重なり合うカラム名の扱い方、そして小さいテーブルを各executorへ配布(broadcast)すべきタイミングを説明します。

Python Pandas Dataframeからノーコードで素早くデータ可視化を作りたいですか?

PyGWalker は可視化付きのExploratory Data Analysis向けPythonライブラリです。PyGWalker (opens in a new tab) を使うと、pandas dataframe(およびpolars dataframe)を、Jupyter Notebook上でのデータ分析・可視化向けに、tableauのような対話的UIへ変換できます。

PyGWalker for Data visualization (opens in a new tab)

Joinタイプ早見表

Type残す行向いている用途
inner両方に存在する行一般的な付加・フィルタ
left / right左/右の全行 + マッチした行一方のテーブルを必ず残したいとき
fullすべての行、欠損側はnullデータ監査や網羅性チェック
left_semi右側にマッチがある左の行のみ(右のカラムは返さない)存在チェックのフィルタ
left_anti右側にマッチがない左の行のみ除外用のanti-join

サンプルデータ

from pyspark.sql import SparkSession, functions as F
from pyspark.sql.functions import broadcast
 
spark = SparkSession.builder.appName("joins-broadcast").getOrCreate()
 
users = spark.createDataFrame(
    [(1, "Alice", "US"), (2, "Bob", "CA"), (3, "Cara", "US")],
    "user_id INT, name STRING, country STRING",
)
 
orders = spark.createDataFrame(
    [(1, "A-100", 120.0), (1, "A-101", 80.0), (3, "A-102", 50.0)],
    "user_id INT, order_id STRING, amount DOUBLE",
)

Inner / Left joinと明示的な条件

inner_join = users.join(orders, on="user_id", how="inner")
 
left_join = users.join(orders, on="user_id", how="left")
  • 意図しないcross joinを避けるため、on を明示することを推奨します。
  • 行数を確認しましょう: innerは行数を減らし、leftはすべてのusers行を保持します。

重複カラム名の扱い

joined = (
    users.join(orders, on="user_id", how="left")
    .withColumnRenamed("name", "user_name")
)
  • join後は、重複するカラムを安定した名前にrenameしておきます。
  • あるいは、join前に必要なカラムだけをselectする方法もあります。

フィルタ用途のSemi / Anti join

with_orders = users.join(orders, "user_id", "left_semi")
without_orders = users.join(orders, "user_id", "left_anti")
  • left_semi は、注文が存在する場合のusersの行だけを返し、右テーブルのカラムは返しません。
  • left_anti は、注文を持たないusersの行を返します。除外条件の実装として効率的です。

Skew回避のための小さいテーブルのBroadcast

small_dim = spark.createDataFrame(
    [("US", "United States"), ("CA", "Canada")],
    "country STRING, country_name STRING",
)
 
joined_dim = users.join(broadcast(small_dim), "country", "left")
  • 小さい側のテーブルがexecutorメモリに十分収まる(数十MB程度まで)場合にbroadcastを使います。
  • broadcastすると、そのテーブル側のshuffleを省略でき、joinを高速化しつつskewの影響を減らせます。

Broadcastすべきでないケース

  • 大きい、あるいはサイズが読めないテーブルのbroadcastは避けてください。executorのメモリ不足を招きます。
  • キー分布がほぼ一様で、テーブルサイズも同程度であれば、通常のshuffle joinで問題ありません。

ありがちな落とし穴と対処

  • 意図しないcross join: on を必ず指定します。cartesian productの場合はSparkが警告します。
  • カラムの二重定義: join前にselectするか、join後にrenameして、後続処理で混乱しないようにします。
  • Skewしたキー: 小さいlookupテーブルはbroadcastし、事実テーブル同士で極端なskewがある場合はsaltingも検討します。
  • 型の不一致: joinキーは事前に同じ型へcastし、マッチしない(空になる)joinを防ぎます。

最小構成のエンドツーエンドパターン

result = (
    users
    .join(broadcast(small_dim), "country", "left")
    .join(orders, "user_id", "left")
    .select(
        "user_id",
        F.col("name").alias("user_name"),
        "country_name",
        "order_id",
        "amount",
    )
)

このパターンでは、joinキーを明示し、次元テーブルの付加にbroadcastを使い、集計やエクスポートにそのまま使える、整理されたカラム名で結果を返しています。