PySpark UDF vs Pandas UDF vs mapInPandas:应该用哪个?
当你需要在 PySpark 中实现自定义逻辑时,通常会在三种工具里选一个:
- 普通 Python UDF (
udf(...)) - Pandas UDF (
@pandas_udf) mapInPandas(DataFrame → Pandas DataFrames 的迭代器)
它们都能“在 Spark 上运行 Python”,但在性能、灵活性,以及你能保留多少 Spark 优化能力方面差异很大。
这篇指南提供一个实用的决策框架,并附上可直接复制的示例。
心智模型:这三者之间到底变了什么?
1) 普通 UDF(逐行 Python)
- Spark 会把列数据发送到 Python worker 进程。
- 你的函数会一行一行地执行。
- 往往是最慢的。
- 可能会阻断 Spark 的优化器与代码生成。
**适用场景:**逻辑简单、数据量小,或者速度不重要。
2) Pandas UDF(通过 Arrow 的向量化批处理)
- Spark 使用 Apache Arrow 以列式批次把数据传给 Python。
- 你的函数运行在 Pandas Series / DataFrames 上(向量化)。
- 通常比普通 UDF 快得多。
**适用场景:**需要自定义列逻辑,同时希望更好的性能。
3) mapInPandas(按分区批次的完全控制)
- Spark 会按分区分批调用你的函数,传入一个Pandas DataFrames 的迭代器。
- 你可以做多列逻辑、复杂变换,甚至行扩展(row expansion)。
- 很适合在 Python 里做“迷你 ETL”,同时仍由 Spark 负责并行。
适用场景:需要复杂变换,不符合“单列输入 → 单列输出”的形态。
快速决策表
| 你的需求… | 最佳选择 |
|---|---|
| 简单自定义变换、数据量低 | 普通 UDF |
| 按列变换、中/大数据量 | Pandas UDF |
| 复杂逻辑:多列、多输出行、在 pandas 内做 join、依赖较重的 Python libs | mapInPandas |
| 在可行前提下追求极致性能 | Spark SQL 内置函数(避免使用这三者) |
经验法则:Spark 内置函数 > Pandas UDF > mapInPandas > 普通 UDF(通常如此,但不绝对)。
示例数据集
from pyspark.sql import SparkSession
from pyspark.sql import functions as F
from pyspark.sql.types import StringType, DoubleType, StructType, StructField, LongType
spark = SparkSession.builder.getOrCreate()
df = spark.createDataFrame(
[(" Alice ", "US", 10),
("bob", "UK", 3),
(None, "US", 7)],
["name", "country", "visits"]
)普通 UDF 示例(简单但慢)
from pyspark.sql.functions import udf
from pyspark.sql.types import StringType
def clean_name(x):
if x is None:
return None
return x.strip().lower()
clean_name_udf = udf(clean_name, StringType())
df_udf = df.withColumn("name_clean", clean_name_udf("name"))
df_udf.show()优点
- 最容易理解
- 到处都能用
缺点
- Python 逐行处理的额外开销
- 经常会阻止 Spark 做更激进的优化
Pandas UDF 示例(向量化,通常更快)
import pandas as pd
from pyspark.sql.functions import pandas_udf
@pandas_udf("string")
def clean_name_vec(s: pd.Series) -> pd.Series:
return s.str.strip().str.lower()
df_pandas_udf = df.withColumn("name_clean", clean_name_vec("name"))
df_pandas_udf.show()优点
- 批处理 + 向量化
- 在大列数据上吞吐量更高
缺点
- 需要 Arrow 支持与兼容的运行环境
- 仍然在 Python 侧执行,仍不如内置函数那么可优化
mapInPandas 示例(最灵活)
用例:输出多个派生列 + 自定义规则
比如你想要:
- 清洗后的 name
- 基于 country 与 visits 的 score
- 分桶标签(bucket)
import pandas as pd
def transform(pdf_iter):
for pdf in pdf_iter:
pdf["name_clean"] = pdf["name"].astype("string").str.strip().str.lower()
pdf["visits"] = pdf["visits"].fillna(0).astype("float64")
pdf["score"] = pdf["visits"] * pdf["country"].eq("US").astype("float64").add(1.0) # US -> 2.0x, else 1.0x
pdf["bucket"] = pd.cut(pdf["visits"], bins=[-1, 0, 5, 999999], labels=["none", "low", "high"])
yield pdf
out_schema = "name string, country string, visits long, name_clean string, score double, bucket string"
df_map = df.mapInPandas(transform, schema=out_schema)
df_map.show()优点
- 灵活性极强
- 非常适合“按分区执行的 pandas 流水线”
- 可以扩展行、同时计算多个输出、调用外部库(需谨慎)
缺点
- 必须正确声明 schema
- 更容易不小心造成数据倾斜 / 超大分区
- 仍有 Python/Arrow 传输与执行开销
性能到底如何选?
你不需要做完美的 benchmark 才能选对。可以用这些实用经验来判断:
普通 UDF 通常最糟的情况:
- 上千万行级别的数据
- 本来 Spark 原生就能做的简单变换
- 在 filter/join 中使用
Pandas UDF 发挥优势的情况:
- 对一列或多列做向量化变换
- 能用 pandas/numpy 把逻辑写得足够“向量化/批处理友好”
mapInPandas 最适合的情况:
- 需要多步变换,用 Spark SQL 写会非常痛苦
- 想一次生成多个列
- 需要行扩展或复杂的条件逻辑
正确性与 schema 常见坑
Pandas UDF
- 输出必须与声明类型严格一致。
- 可能出现 Null/NaN;要显式处理。
mapInPandas
- 输出 DataFrame 必须匹配 schema:列名 + dtypes + 顺序。
- 小心 Python 的
objectdtype;要显式 cast 成 string/float 等。
“尽量避免”的清单(常见反模式)
- 用普通 UDF 做基础字符串操作(
lower、trim、regex)→ 用 Spark 内置函数。 - 在 UDF 里调用网络 API → 会慢、不稳定、也很难安全重试。
- 在 Pandas UDF/mapInPandas 里做逐行 Python 循环 → 会抹掉向量化的优势。
- 返回不一致类型(有时 int、有时 string)→ 运行时失败 / 产生 null。
推荐决策流程
-
Spark 内置函数能做吗? → 用内置函数。
-
是按列变换(输入输出行数一致)吗? → 用 Pandas UDF。
-
需要多列逻辑、多输出列,或需要行扩展吗? → 用
mapInPandas。 -
数据小 / 快速原型? → 普通 UDF 可以接受。
最后一个建议:用 Spark 执行计划验证
即使不做完整 benchmark,你也能从下面的信息学到很多:
df_pandas_udf.explain(True)如果你看到加了自定义函数后 Spark 的优化变少了,这是一个信号:要么尽量改用内置函数,要么重构实现方式。