PySpark UDF 教程(适合初学者):pyspark udf 是什么、为什么用、以及如何使用
PySpark 非常擅长快速处理大数据——直到你写了 Spark “看不懂”的业务逻辑。这时就需要 UDF(User Defined Function,用户自定义函数):它允许你在 Spark DataFrame 上运行自定义 Python 代码。
但需要注意:如果使用不当,UDF 也可能让 Spark 变慢。本教程会说明什么时候该用 UDF、如何正确编写,以及如何选择更快的替代方案(例如内置函数或 Pandas UDF)。
UDF 解决的问题是什么
你有一个 DataFrame 列,想应用自定义逻辑,例如:
- 领域特定解析(奇怪的 ID、定制规则)
- 自定义打分函数
- 内置 Spark SQL 函数未覆盖的文本规范化
- 用复杂规则进行映射/转换
Spark 的内置函数在 JVM 里做了深度优化。但纯 Python 逻辑不会自动变成“Spark 原生”的可优化执行计划。
解决方案的选择(最快 → 最慢)
实践中建议优先顺序如下:
- Spark 内置函数(
pyspark.sql.functions)✅ 最快 - SQL 表达式(
expr,when,regexp_extract)✅ 快 - Pandas UDF(基于 Apache Arrow 的向量化)✅ 通常很快
- 普通 Python UDF(逐行 Python)⚠️ 通常最慢
本文重点讲 (3) 和 (4),并强调哪些场景应尽量避免使用它们。
准备:创建一个用于演示的小 DataFrame
from pyspark.sql import SparkSession
from pyspark.sql import functions as F
from pyspark.sql.types import StringType, IntegerType, DoubleType
spark = SparkSession.builder.getOrCreate()
df = spark.createDataFrame(
[
(" Alice ", "US", 10),
("bob", "UK", 3),
(None, "US", 7),
],
["name", "country", "visits"]
)
df.show()示例 1:一个简单的 Python UDF(字符串清洗)
目标
去掉空格、转小写,并安全处理空值。
from pyspark.sql.functions import udf
def clean_name(x: str) -> str:
if x is None:
return None
return x.strip().lower()
clean_name_udf = udf(clean_name, StringType())
df2 = df.withColumn("name_clean", clean_name_udf(F.col("name")))
df2.show()说明
- 你必须提供返回类型(
StringType())。 - Python UDF 在 Python worker 中逐行执行(在大数据上可能较慢)。
示例 2:尽可能优先使用内置函数
上面的 UDF 可以用内置函数替换(更快):
df_fast = df.withColumn(
"name_clean",
F.lower(F.trim(F.col("name")))
)
df_fast.show()经验法则:如果能用 F.* 表达,就用内置函数。
示例 3:带多个输入参数的 UDF(自定义打分)
基于 country + visits 计算一个分数。
def score(country: str, visits: int) -> float:
if country is None or visits is None:
return 0.0
bonus = 1.5 if country == "US" else 1.0
return float(visits) * bonus
score_udf = udf(score, DoubleType())
df3 = df.withColumn("score", score_udf(F.col("country"), F.col("visits")))
df3.show()示例 4:为 Spark SQL 注册 UDF
如果你想在 SQL 里使用:
spark.udf.register("score_udf_sql", score, DoubleType())
df.createOrReplaceTempView("t")
spark.sql("""
SELECT name, country, visits, score_udf_sql(country, visits) AS score
FROM t
""").show()常见错误(以及如何避免)
1)返回了错误的类型
如果 Spark 期望 DoubleType(),但你有时返回字符串,会导致运行时报错或结果变成 null。
2)在 join/filter 中不必要地使用 UDF
UDF 可能阻断 Spark 的优化。尽量先计算出派生列,或者改用内置函数。
3)忘记处理空值
永远假设输入可能是 None。
更快的替代方案:Pandas UDF(向量化)
Pandas UDF 会按批次处理数据(向量化),通常比普通 UDF 更快。
示例 5:用于清洗 name 的 Pandas UDF
import pandas as pd
from pyspark.sql.functions import pandas_udf
@pandas_udf("string")
def clean_name_pandas(s: pd.Series) -> pd.Series:
return s.str.strip().str.lower()
df4 = df.withColumn("name_clean", clean_name_pandas(F.col("name")))
df4.show()Pandas UDF 适用场景
- 用 pandas 写逻辑更自然
- 需要向量化操作
- 性能重要且内置函数不够用
示例 6:用于逐元素打分的 Pandas UDF
@pandas_udf("double")
def score_pandas(country: pd.Series, visits: pd.Series) -> pd.Series:
bonus = country.eq("US").astype("float64") * 0.5 + 1.0
visits_filled = visits.fillna(0).astype("float64")
return visits_filled * bonus
df5 = df.withColumn("score", score_pandas(F.col("country"), F.col("visits")))
df5.show()实用最佳实践清单
- ✅ 优先尝试内置函数(
F.lower,F.when,F.regexp_extract等) - ✅ 必须写自定义逻辑时,先考虑 Pandas UDF 再考虑普通 UDF
- ✅ 始终指定正确的返回类型
- ✅ 显式处理空值(
None) - ✅ 保持 UDF 逻辑纯粹且确定(同输入 → 同输出)
- ⚠️ 尽量避免在 filter/join 中使用 UDF(会降低可优化程度)
- ⚠️ 做性能对比:在抽样数据上比较使用/不使用 UDF 的运行时间
什么时候你“应该”使用普通 Python UDF
以下情况可以使用普通 UDF:
- 数据量不大(或性能不是关键)
- 逻辑很难用内置函数表达
- pandas/Arrow 不可用或不适合该转换
如果性能重要,优先选择 内置函数 或 Pandas UDF。