Skip to content

PySpark UDF チュートリアル(初心者向け):pyspark udf とは?なぜ必要?どう使う?

PySpark は大規模データを高速に処理するのが得意です——ただし、Spark が理解できないロジックを書いた瞬間に話が変わります。そこで登場するのが UDF(User Defined Function) です。UDF を使うと、Spark DataFrame 上で 独自の Python コード を実行できます。

ただし注意点もあります。UDF は使い方を誤ると Spark を 遅く してしまうことがあります。このチュートリアルでは、UDF を使うべき場面、正しい書き方、そしてより高速な代替手段(組み込み関数や Pandas UDF など)の選び方を解説します。


UDF が解決する問題

DataFrame の列に対して、独自のロジックを適用したいケースがあります。

  • ドメイン固有のパース(変な ID、独自ルール)
  • カスタムなスコアリング関数
  • 組み込みの Spark SQL 関数ではカバーできないテキスト正規化
  • 複雑なルールに基づく値のマッピング

Spark の組み込み関数は JVM 上で最適化されています。一方、純粋な Python ロジック は自動的に「Spark ネイティブ」にはなりません。


解決策の選択肢(最速 → 最遅)

実務では、基本的に次の優先順位がおすすめです。

  1. 組み込みの Spark 関数pyspark.sql.functions) ✅ 最速
  2. SQL 式expr, when, regexp_extract) ✅ 高速
  3. Pandas UDF(Apache Arrow によるベクトル化) ✅ 多くの場合高速
  4. 通常の Python UDF(行ごとに Python 実行) ⚠️ 多くの場合最も遅い

この記事は (3) と (4) に焦点を当てつつ、避けるべき場面についてもしっかりガイドします。


セットアップ:試しやすい小さな DataFrame

from pyspark.sql import SparkSession
from pyspark.sql import functions as F
from pyspark.sql.types import StringType, IntegerType, DoubleType
 
spark = SparkSession.builder.getOrCreate()
 
df = spark.createDataFrame(
    [
        (" Alice  ", "US", 10),
        ("bob", "UK", 3),
        (None, "US", 7),
    ],
    ["name", "country", "visits"]
)
 
df.show()

例1:シンプルな Python UDF(文字列のクリーニング)

目的

前後の空白を削除し、小文字化し、null も安全に扱う。

from pyspark.sql.functions import udf
 
def clean_name(x: str) -> str:
    if x is None:
        return None
    return x.strip().lower()
 
clean_name_udf = udf(clean_name, StringType())
 
df2 = df.withColumn("name_clean", clean_name_udf(F.col("name")))
df2.show()

メモ

  • 戻り値型(StringType())は 必ず 指定します。
  • Python UDF は Python ワーカーで行単位に実行されるため(大規模データでは)遅くなりがちです。

例2:可能なら組み込み関数を優先する

先ほどの UDF は組み込み関数で置き換えられます(より高速):

df_fast = df.withColumn(
    "name_clean",
    F.lower(F.trim(F.col("name")))
)
df_fast.show()

経験則:F.* で表現できるなら、そちらを使うのが基本です。


例3:複数入力の UDF(カスタムスコアリング)

country + visits からスコアを作ってみます。

def score(country: str, visits: int) -> float:
    if country is None or visits is None:
        return 0.0
    bonus = 1.5 if country == "US" else 1.0
    return float(visits) * bonus
 
score_udf = udf(score, DoubleType())
 
df3 = df.withColumn("score", score_udf(F.col("country"), F.col("visits")))
df3.show()

例4:Spark SQL 用に UDF を登録する

SQL で使いたい場合:

spark.udf.register("score_udf_sql", score, DoubleType())
 
df.createOrReplaceTempView("t")
spark.sql("""
  SELECT name, country, visits, score_udf_sql(country, visits) AS score
  FROM t
""").show()

よくあるミス(と回避方法)

1) 間違った型を返す

Spark 側が DoubleType() を期待しているのに、場合によって文字列を返してしまうと、実行時エラーや null になったりします。

2) join / filter で不要に UDF を使う

UDF は Spark の最適化を妨げることがあります。まず派生列を計算してから使う、または組み込み関数を使うことを検討してください。

3) null ハンドリングを忘れる

入力は常に None になり得る前提で書きます。


より速い代替:Pandas UDF(ベクトル化)

Pandas UDF はデータをまとめて(バッチで)処理するため(ベクトル化)、通常の UDF より高速なことが多いです。

例5:名前クリーニングの Pandas UDF

import pandas as pd
from pyspark.sql.functions import pandas_udf
 
@pandas_udf("string")
def clean_name_pandas(s: pd.Series) -> pd.Series:
    return s.str.strip().str.lower()
 
df4 = df.withColumn("name_clean", clean_name_pandas(F.col("name")))
df4.show()

Pandas UDF が効く場面

  • pandas の方がロジックを書きやすい
  • ベクトル化された操作が必要
  • パフォーマンスが重要で、組み込み関数では足りない

例6:要素ごとのスコアリングの Pandas UDF

@pandas_udf("double")
def score_pandas(country: pd.Series, visits: pd.Series) -> pd.Series:
    bonus = country.eq("US").astype("float64") * 0.5 + 1.0
    visits_filled = visits.fillna(0).astype("float64")
    return visits_filled * bonus
 
df5 = df.withColumn("score", score_pandas(F.col("country"), F.col("visits")))
df5.show()

実務で使えるベストプラクティス・チェックリスト

  • ✅ まず組み込み関数を試す(F.lower, F.when, F.regexp_extract など)
  • ✅ カスタムロジックが必要なら、通常の UDF の前に Pandas UDF を検討
  • ✅ 正しい戻り値型を必ず指定する
  • ✅ null(None)を明示的に扱う
  • ✅ UDF のロジックは純粋関数・決定的(同じ入力 → 同じ出力)に保つ
  • ⚠️ 可能なら filter / join 内で UDF を避ける(最適化が効きにくくなる)
  • ⚠️ 計測する:サンプルデータで UDF あり/なしの実行時間を比較する

通常の Python UDF を使うべき場面

通常の UDF を使うのが適しているのは次のような場合です。

  • データ量がそれほど大きくない(または性能要件が厳しくない)
  • 組み込み関数では表現しづらいロジックが必要
  • pandas / Arrow が利用できない、または変換に合わない

パフォーマンスが重要なら、まずは 組み込み関数、次に Pandas UDF を優先してください。