PySpark UDF チュートリアル(初心者向け):pyspark udf とは?なぜ必要?どう使う?
PySpark は大規模データを高速に処理するのが得意です——ただし、Spark が理解できないロジックを書いた瞬間に話が変わります。そこで登場するのが UDF(User Defined Function) です。UDF を使うと、Spark DataFrame 上で 独自の Python コード を実行できます。
ただし注意点もあります。UDF は使い方を誤ると Spark を 遅く してしまうことがあります。このチュートリアルでは、UDF を使うべき場面、正しい書き方、そしてより高速な代替手段(組み込み関数や Pandas UDF など)の選び方を解説します。
UDF が解決する問題
DataFrame の列に対して、独自のロジックを適用したいケースがあります。
- ドメイン固有のパース(変な ID、独自ルール)
- カスタムなスコアリング関数
- 組み込みの Spark SQL 関数ではカバーできないテキスト正規化
- 複雑なルールに基づく値のマッピング
Spark の組み込み関数は JVM 上で最適化されています。一方、純粋な Python ロジック は自動的に「Spark ネイティブ」にはなりません。
解決策の選択肢(最速 → 最遅)
実務では、基本的に次の優先順位がおすすめです。
- 組み込みの Spark 関数(
pyspark.sql.functions) ✅ 最速 - SQL 式(
expr,when,regexp_extract) ✅ 高速 - Pandas UDF(Apache Arrow によるベクトル化) ✅ 多くの場合高速
- 通常の Python UDF(行ごとに Python 実行) ⚠️ 多くの場合最も遅い
この記事は (3) と (4) に焦点を当てつつ、避けるべき場面についてもしっかりガイドします。
セットアップ:試しやすい小さな DataFrame
from pyspark.sql import SparkSession
from pyspark.sql import functions as F
from pyspark.sql.types import StringType, IntegerType, DoubleType
spark = SparkSession.builder.getOrCreate()
df = spark.createDataFrame(
[
(" Alice ", "US", 10),
("bob", "UK", 3),
(None, "US", 7),
],
["name", "country", "visits"]
)
df.show()例1:シンプルな Python UDF(文字列のクリーニング)
目的
前後の空白を削除し、小文字化し、null も安全に扱う。
from pyspark.sql.functions import udf
def clean_name(x: str) -> str:
if x is None:
return None
return x.strip().lower()
clean_name_udf = udf(clean_name, StringType())
df2 = df.withColumn("name_clean", clean_name_udf(F.col("name")))
df2.show()メモ
- 戻り値型(
StringType())は 必ず 指定します。 - Python UDF は Python ワーカーで行単位に実行されるため(大規模データでは)遅くなりがちです。
例2:可能なら組み込み関数を優先する
先ほどの UDF は組み込み関数で置き換えられます(より高速):
df_fast = df.withColumn(
"name_clean",
F.lower(F.trim(F.col("name")))
)
df_fast.show()経験則:F.* で表現できるなら、そちらを使うのが基本です。
例3:複数入力の UDF(カスタムスコアリング)
country + visits からスコアを作ってみます。
def score(country: str, visits: int) -> float:
if country is None or visits is None:
return 0.0
bonus = 1.5 if country == "US" else 1.0
return float(visits) * bonus
score_udf = udf(score, DoubleType())
df3 = df.withColumn("score", score_udf(F.col("country"), F.col("visits")))
df3.show()例4:Spark SQL 用に UDF を登録する
SQL で使いたい場合:
spark.udf.register("score_udf_sql", score, DoubleType())
df.createOrReplaceTempView("t")
spark.sql("""
SELECT name, country, visits, score_udf_sql(country, visits) AS score
FROM t
""").show()よくあるミス(と回避方法)
1) 間違った型を返す
Spark 側が DoubleType() を期待しているのに、場合によって文字列を返してしまうと、実行時エラーや null になったりします。
2) join / filter で不要に UDF を使う
UDF は Spark の最適化を妨げることがあります。まず派生列を計算してから使う、または組み込み関数を使うことを検討してください。
3) null ハンドリングを忘れる
入力は常に None になり得る前提で書きます。
より速い代替:Pandas UDF(ベクトル化)
Pandas UDF はデータをまとめて(バッチで)処理するため(ベクトル化)、通常の UDF より高速なことが多いです。
例5:名前クリーニングの Pandas UDF
import pandas as pd
from pyspark.sql.functions import pandas_udf
@pandas_udf("string")
def clean_name_pandas(s: pd.Series) -> pd.Series:
return s.str.strip().str.lower()
df4 = df.withColumn("name_clean", clean_name_pandas(F.col("name")))
df4.show()Pandas UDF が効く場面
- pandas の方がロジックを書きやすい
- ベクトル化された操作が必要
- パフォーマンスが重要で、組み込み関数では足りない
例6:要素ごとのスコアリングの Pandas UDF
@pandas_udf("double")
def score_pandas(country: pd.Series, visits: pd.Series) -> pd.Series:
bonus = country.eq("US").astype("float64") * 0.5 + 1.0
visits_filled = visits.fillna(0).astype("float64")
return visits_filled * bonus
df5 = df.withColumn("score", score_pandas(F.col("country"), F.col("visits")))
df5.show()実務で使えるベストプラクティス・チェックリスト
- ✅ まず組み込み関数を試す(
F.lower,F.when,F.regexp_extractなど) - ✅ カスタムロジックが必要なら、通常の UDF の前に Pandas UDF を検討
- ✅ 正しい戻り値型を必ず指定する
- ✅ null(
None)を明示的に扱う - ✅ UDF のロジックは純粋関数・決定的(同じ入力 → 同じ出力)に保つ
- ⚠️ 可能なら filter / join 内で UDF を避ける(最適化が効きにくくなる)
- ⚠️ 計測する:サンプルデータで UDF あり/なしの実行時間を比較する
通常の Python UDF を使うべき場面
通常の UDF を使うのが適しているのは次のような場合です。
- データ量がそれほど大きくない(または性能要件が厳しくない)
- 組み込み関数では表現しづらいロジックが必要
- pandas / Arrow が利用できない、または変換に合わない
パフォーマンスが重要なら、まずは 組み込み関数、次に Pandas UDF を優先してください。