PySpark UDF vs Pandas UDF vs mapInPandas: どれを使うべき?
PySpark でカスタムロジックが必要になったとき、通常は次の 3 つのいずれかを選びます。
- 通常の Python UDF (
udf(...)) - Pandas UDF (
@pandas_udf) mapInPandas(DataFrame → Pandas DataFrames の iterator)
どれも「Spark 上で Python を動かす」ことはできますが、パフォーマンス、柔軟性、そして Spark の最適化をどれだけ維持できるかという点で挙動が大きく異なります。
このガイドでは、実用的な意思決定フレームワークと、そのままコピーできる例を紹介します。
メンタルモデル:3 つの違いは何か?
1) 通常の UDF(行ごとの Python)
- Spark が列データを Python の worker プロセスへ送ります。
- 関数は 1 行ずつ 実行されます。
- 多くの場合 最も遅い です。
- Spark のオプティマイザやコード生成を阻害することがあります。
使いどころ: ロジックが単純、データが小さい、または速度が重要でない場合。
2) Pandas UDF(Arrow によるベクトル化バッチ)
- Spark は Apache Arrow を使い、Python に 列指向のバッチ でデータを送ります。
- 関数は Pandas Series / DataFrames に対して(ベクトル化して)動作します。
- 通常の UDF より大幅に高速なことが多いです。
使いどころ: カスタムの列ロジックが必要で、性能も良くしたい場合。
3) mapInPandas(パーティション単位のバッチで完全に制御)
- Spark はパーティションのチャンクごとに 1 回関数を呼び、Pandas DataFrames の iterator を渡します。
- 複数列にまたがるロジック、複雑な変換、行の増減(row expansion)まで扱えます。
- Spark の並列性を保ったまま、Python で「ミニ ETL」を行うのに向きます。
使いどころ: 「1 列入力 → 1 列出力」という形に収まらない 複雑な変換 が必要な場合。
クイック意思決定表
| 必要なこと… | 最適な選択 |
|---|---|
| 単純なカスタム変換、低ボリューム | Regular UDF |
| 列単位の変換、中〜大規模データ | Pandas UDF |
| 複雑なロジック:複数列、複数出力行、pandas 内での join、重い Python ライブラリ | mapInPandas |
| 可能なら最大の性能 | 組み込みの Spark SQL 関数(3 つすべてを避ける) |
ルール:組み込み Spark 関数 > Pandas UDF > mapInPandas > 通常の UDF(一般的な傾向であり、絶対ではありません)。
例のデータセット
from pyspark.sql import SparkSession
from pyspark.sql import functions as F
from pyspark.sql.types import StringType, DoubleType, StructType, StructField, LongType
spark = SparkSession.builder.getOrCreate()
df = spark.createDataFrame(
[(" Alice ", "US", 10),
("bob", "UK", 3),
(None, "US", 7)],
["name", "country", "visits"]
)Regular UDF の例(シンプルだが遅い)
from pyspark.sql.functions import udf
from pyspark.sql.types import StringType
def clean_name(x):
if x is None:
return None
return x.strip().lower()
clean_name_udf = udf(clean_name, StringType())
df_udf = df.withColumn("name_clean", clean_name_udf("name"))
df_udf.show()Pros
- 理解しやすい
- どこでも動く
Cons
- 行ごとの Python オーバーヘッド
- Spark が積極的な最適化をしにくくなることが多い
Pandas UDF の例(ベクトル化、通常は高速)
import pandas as pd
from pyspark.sql.functions import pandas_udf
@pandas_udf("string")
def clean_name_vec(s: pd.Series) -> pd.Series:
return s.str.strip().str.lower()
df_pandas_udf = df.withColumn("name_clean", clean_name_vec("name"))
df_pandas_udf.show()Pros
- バッチ処理 + ベクトル化
- 大きな列に対してスループットが大幅に向上しやすい
Cons
- Arrow サポートと互換性のある環境が必要
- 依然として Python 側であり、組み込み関数ほど最適化されない
mapInPandas の例(最も柔軟)
ユースケース:派生列を複数出力 + カスタムルール
たとえば次を作りたいとします:
- クレンジングした name
- country と visits に基づく score
- bucket ラベル
import pandas as pd
def transform(pdf_iter):
for pdf in pdf_iter:
pdf["name_clean"] = pdf["name"].astype("string").str.strip().str.lower()
pdf["visits"] = pdf["visits"].fillna(0).astype("float64")
pdf["score"] = pdf["visits"] * pdf["country"].eq("US").astype("float64").add(1.0) # US -> 2.0x, else 1.0x
pdf["bucket"] = pd.cut(pdf["visits"], bins=[-1, 0, 5, 999999], labels=["none", "low", "high"])
yield pdf
out_schema = "name string, country string, visits long, name_clean string, score double, bucket string"
df_map = df.mapInPandas(transform, schema=out_schema)
df_map.show()Pros
- 非常に柔軟
- 「パーティションごとの pandas パイプライン」に向く
- 行の増減、複数出力の計算、外部ライブラリ呼び出しも可能(注意して使う)
Cons
- schema を正しく定義する必要がある
- 偏り(skew)や巨大パーティションをうっかり作りやすい
- 依然として Python/Arrow のオーバーヘッドはある
パフォーマンスはどう考える?
完璧なベンチマークがなくても、次の実用的な目安で十分に選べます。
Regular UDF がたいてい最悪になりやすいのは:
- 数千万行規模
- Spark がネイティブにできる単純変換
- filter/join の中で使う場合
Pandas UDF が輝くのは:
- 1 列または複数列を、ベクトル化演算で変換する場合
- pandas/numpy で効率よく書けるロジックの場合
mapInPandas が最適なのは:
- Spark SQL で書くのがつらい多段の変換が必要
- 複数列を一度に作りたい
- 行の増減(row expansion)や複雑な条件分岐が必要
正しさと schema の落とし穴
Pandas UDF
- 出力は宣言した型と完全に一致している必要があります。
- Null/NaN が出ることがあるため、明示的に扱ってください。
mapInPandas
- 出力 DataFrame は schema と一致する必要があります(列名 + dtype + 順序)。
- Python の
objectdtype に注意し、string/float などへ明示的に cast してください。
「避けるべき」一覧(よくあるアンチパターン)
- 基本的な文字列操作に Regular UDF を使う(
lower,trim, regex)→ Spark の組み込み関数を使う。 - UDF の中でネットワーク API を呼ぶ → 遅く、不安定で、安全にリトライしづらい。
- Pandas UDF/mapInPandas の中で 行ごとの重い Python ループ → ベクトル化の利点を潰します。
- 型が一貫しない値を返す(int のときもあれば string のときもある)→ 実行時エラーや null の原因。
推奨の意思決定フロー
-
Spark の組み込み関数でできる? → 組み込み関数を使う。
-
列単位の変換(入出力の行数が同じ)? → Pandas UDF を使う。
-
複数列ロジック、複数出力列、または行の増減が必要? →
mapInPandasを使う。 -
小さなデータ / 素早いプロトタイプ? → Regular UDF でも許容。
最後のヒント:Spark のプランで検証する
フルのベンチマークをしなくても、次のようにして多くが分かります:
df_pandas_udf.explain(True)関数を追加した後に Spark の最適化が弱くなっているのが見えたら、組み込み関数を試すか、構成を見直すサインです。