Skip to content

PySpark UDF 教程(适合初学者):pyspark udf 是什么、为什么用、以及如何使用

PySpark 非常擅长快速处理大数据——直到你写了 Spark “看不懂”的业务逻辑。这时就需要 UDF(User Defined Function,用户自定义函数):它允许你在 Spark DataFrame 上运行自定义 Python 代码

但需要注意:如果使用不当,UDF 也可能让 Spark 变慢。本教程会说明什么时候该用 UDF、如何正确编写,以及如何选择更快的替代方案(例如内置函数或 Pandas UDF)。


UDF 解决的问题是什么

你有一个 DataFrame 列,想应用自定义逻辑,例如:

  • 领域特定解析(奇怪的 ID、定制规则)
  • 自定义打分函数
  • 内置 Spark SQL 函数未覆盖的文本规范化
  • 用复杂规则进行映射/转换

Spark 的内置函数在 JVM 里做了深度优化。但纯 Python 逻辑不会自动变成“Spark 原生”的可优化执行计划。


解决方案的选择(最快 → 最慢)

实践中建议优先顺序如下:

  1. Spark 内置函数pyspark.sql.functions)✅ 最快
  2. SQL 表达式expr, when, regexp_extract)✅ 快
  3. Pandas UDF(基于 Apache Arrow 的向量化)✅ 通常很快
  4. 普通 Python UDF(逐行 Python)⚠️ 通常最慢

本文重点讲 (3) 和 (4),并强调哪些场景应尽量避免使用它们。


准备:创建一个用于演示的小 DataFrame

from pyspark.sql import SparkSession
from pyspark.sql import functions as F
from pyspark.sql.types import StringType, IntegerType, DoubleType
 
spark = SparkSession.builder.getOrCreate()
 
df = spark.createDataFrame(
    [
        (" Alice  ", "US", 10),
        ("bob", "UK", 3),
        (None, "US", 7),
    ],
    ["name", "country", "visits"]
)
 
df.show()

示例 1:一个简单的 Python UDF(字符串清洗)

目标

去掉空格、转小写,并安全处理空值。

from pyspark.sql.functions import udf
 
def clean_name(x: str) -> str:
    if x is None:
        return None
    return x.strip().lower()
 
clean_name_udf = udf(clean_name, StringType())
 
df2 = df.withColumn("name_clean", clean_name_udf(F.col("name")))
df2.show()

说明

  • 必须提供返回类型(StringType())。
  • Python UDF 在 Python worker 中逐行执行(在大数据上可能较慢)。

示例 2:尽可能优先使用内置函数

上面的 UDF 可以用内置函数替换(更快):

df_fast = df.withColumn(
    "name_clean",
    F.lower(F.trim(F.col("name")))
)
df_fast.show()

经验法则:如果能用 F.* 表达,就用内置函数。


示例 3:带多个输入参数的 UDF(自定义打分)

基于 country + visits 计算一个分数。

def score(country: str, visits: int) -> float:
    if country is None or visits is None:
        return 0.0
    bonus = 1.5 if country == "US" else 1.0
    return float(visits) * bonus
 
score_udf = udf(score, DoubleType())
 
df3 = df.withColumn("score", score_udf(F.col("country"), F.col("visits")))
df3.show()

示例 4:为 Spark SQL 注册 UDF

如果你想在 SQL 里使用:

spark.udf.register("score_udf_sql", score, DoubleType())
 
df.createOrReplaceTempView("t")
spark.sql("""
  SELECT name, country, visits, score_udf_sql(country, visits) AS score
  FROM t
""").show()

常见错误(以及如何避免)

1)返回了错误的类型

如果 Spark 期望 DoubleType(),但你有时返回字符串,会导致运行时报错或结果变成 null。

2)在 join/filter 中不必要地使用 UDF

UDF 可能阻断 Spark 的优化。尽量先计算出派生列,或者改用内置函数。

3)忘记处理空值

永远假设输入可能是 None


更快的替代方案:Pandas UDF(向量化)

Pandas UDF 会按批次处理数据(向量化),通常比普通 UDF 更快。

示例 5:用于清洗 name 的 Pandas UDF

import pandas as pd
from pyspark.sql.functions import pandas_udf
 
@pandas_udf("string")
def clean_name_pandas(s: pd.Series) -> pd.Series:
    return s.str.strip().str.lower()
 
df4 = df.withColumn("name_clean", clean_name_pandas(F.col("name")))
df4.show()

Pandas UDF 适用场景

  • 用 pandas 写逻辑更自然
  • 需要向量化操作
  • 性能重要且内置函数不够用

示例 6:用于逐元素打分的 Pandas UDF

@pandas_udf("double")
def score_pandas(country: pd.Series, visits: pd.Series) -> pd.Series:
    bonus = country.eq("US").astype("float64") * 0.5 + 1.0
    visits_filled = visits.fillna(0).astype("float64")
    return visits_filled * bonus
 
df5 = df.withColumn("score", score_pandas(F.col("country"), F.col("visits")))
df5.show()

实用最佳实践清单

  • ✅ 优先尝试内置函数(F.lower, F.when, F.regexp_extract 等)
  • ✅ 必须写自定义逻辑时,先考虑 Pandas UDF 再考虑普通 UDF
  • ✅ 始终指定正确的返回类型
  • ✅ 显式处理空值(None
  • ✅ 保持 UDF 逻辑纯粹且确定(同输入 → 同输出)
  • ⚠️ 尽量避免在 filter/join 中使用 UDF(会降低可优化程度)
  • ⚠️ 做性能对比:在抽样数据上比较使用/不使用 UDF 的运行时间

什么时候你“应该”使用普通 Python UDF

以下情况可以使用普通 UDF:

  • 数据量不大(或性能不是关键)
  • 逻辑很难用内置函数表达
  • pandas/Arrow 不可用或不适合该转换

如果性能重要,优先选择 内置函数Pandas UDF