Skip to content

PySpark UDF vs Pandas UDF vs mapInPandas:应该用哪个?

当你需要在 PySpark 中实现自定义逻辑时,通常会在三种工具里选一个:

  • 普通 Python UDF (udf(...))
  • Pandas UDF (@pandas_udf)
  • mapInPandas(DataFrame → Pandas DataFrames 的迭代器)

它们都能“在 Spark 上运行 Python”,但在性能、灵活性,以及你能保留多少 Spark 优化能力方面差异很大。

这篇指南提供一个实用的决策框架,并附上可直接复制的示例。


心智模型:这三者之间到底变了什么?

1) 普通 UDF(逐行 Python)

  • Spark 会把列数据发送到 Python worker 进程。
  • 你的函数会一行一行地执行。
  • 往往是最慢的。
  • 可能会阻断 Spark 的优化器与代码生成。

**适用场景:**逻辑简单、数据量小,或者速度不重要。


2) Pandas UDF(通过 Arrow 的向量化批处理)

  • Spark 使用 Apache Arrow 以列式批次把数据传给 Python。
  • 你的函数运行在 Pandas Series / DataFrames 上(向量化)。
  • 通常比普通 UDF 快得多。

**适用场景:**需要自定义列逻辑,同时希望更好的性能。


3) mapInPandas(按分区批次的完全控制)

  • Spark 会按分区分批调用你的函数,传入一个Pandas DataFrames 的迭代器
  • 你可以做多列逻辑、复杂变换,甚至行扩展(row expansion)。
  • 很适合在 Python 里做“迷你 ETL”,同时仍由 Spark 负责并行。

适用场景:需要复杂变换,不符合“单列输入 → 单列输出”的形态。


快速决策表

你的需求…最佳选择
简单自定义变换、数据量低普通 UDF
按列变换、中/大数据量Pandas UDF
复杂逻辑:多列、多输出行、在 pandas 内做 join、依赖较重的 Python libsmapInPandas
在可行前提下追求极致性能Spark SQL 内置函数(避免使用这三者)

经验法则:Spark 内置函数 > Pandas UDF > mapInPandas > 普通 UDF(通常如此,但不绝对)。


示例数据集

from pyspark.sql import SparkSession
from pyspark.sql import functions as F
from pyspark.sql.types import StringType, DoubleType, StructType, StructField, LongType
 
spark = SparkSession.builder.getOrCreate()
 
df = spark.createDataFrame(
    [(" Alice  ", "US", 10),
     ("bob", "UK", 3),
     (None, "US", 7)],
    ["name", "country", "visits"]
)

普通 UDF 示例(简单但慢)

from pyspark.sql.functions import udf
from pyspark.sql.types import StringType
 
def clean_name(x):
    if x is None:
        return None
    return x.strip().lower()
 
clean_name_udf = udf(clean_name, StringType())
 
df_udf = df.withColumn("name_clean", clean_name_udf("name"))
df_udf.show()

优点

  • 最容易理解
  • 到处都能用

缺点

  • Python 逐行处理的额外开销
  • 经常会阻止 Spark 做更激进的优化

Pandas UDF 示例(向量化,通常更快)

import pandas as pd
from pyspark.sql.functions import pandas_udf
 
@pandas_udf("string")
def clean_name_vec(s: pd.Series) -> pd.Series:
    return s.str.strip().str.lower()
 
df_pandas_udf = df.withColumn("name_clean", clean_name_vec("name"))
df_pandas_udf.show()

优点

  • 批处理 + 向量化
  • 在大列数据上吞吐量更高

缺点

  • 需要 Arrow 支持与兼容的运行环境
  • 仍然在 Python 侧执行,仍不如内置函数那么可优化

mapInPandas 示例(最灵活)

用例:输出多个派生列 + 自定义规则

比如你想要:

  • 清洗后的 name
  • 基于 country 与 visits 的 score
  • 分桶标签(bucket)
import pandas as pd
 
def transform(pdf_iter):
    for pdf in pdf_iter:
        pdf["name_clean"] = pdf["name"].astype("string").str.strip().str.lower()
        pdf["visits"] = pdf["visits"].fillna(0).astype("float64")
        pdf["score"] = pdf["visits"] * pdf["country"].eq("US").astype("float64").add(1.0)  # US -> 2.0x, else 1.0x
        pdf["bucket"] = pd.cut(pdf["visits"], bins=[-1, 0, 5, 999999], labels=["none", "low", "high"])
        yield pdf
 
out_schema = "name string, country string, visits long, name_clean string, score double, bucket string"
 
df_map = df.mapInPandas(transform, schema=out_schema)
df_map.show()

优点

  • 灵活性极强
  • 非常适合“按分区执行的 pandas 流水线”
  • 可以扩展行、同时计算多个输出、调用外部库(需谨慎)

缺点

  • 必须正确声明 schema
  • 更容易不小心造成数据倾斜 / 超大分区
  • 仍有 Python/Arrow 传输与执行开销

性能到底如何选?

你不需要做完美的 benchmark 才能选对。可以用这些实用经验来判断:

普通 UDF 通常最糟的情况:

  • 上千万行级别的数据
  • 本来 Spark 原生就能做的简单变换
  • 在 filter/join 中使用

Pandas UDF 发挥优势的情况:

  • 对一列或多列做向量化变换
  • 能用 pandas/numpy 把逻辑写得足够“向量化/批处理友好”

mapInPandas 最适合的情况:

  • 需要多步变换,用 Spark SQL 写会非常痛苦
  • 想一次生成多个列
  • 需要行扩展或复杂的条件逻辑

正确性与 schema 常见坑

Pandas UDF

  • 输出必须与声明类型严格一致。
  • 可能出现 Null/NaN;要显式处理。

mapInPandas

  • 输出 DataFrame 必须匹配 schema:列名 + dtypes + 顺序。
  • 小心 Python 的 object dtype;要显式 cast 成 string/float 等。

“尽量避免”的清单(常见反模式)

  • 用普通 UDF 做基础字符串操作lowertrim、regex)→ 用 Spark 内置函数。
  • 在 UDF 里调用网络 API → 会慢、不稳定、也很难安全重试。
  • 在 Pandas UDF/mapInPandas 里做逐行 Python 循环 → 会抹掉向量化的优势。
  • 返回不一致类型(有时 int、有时 string)→ 运行时失败 / 产生 null。

推荐决策流程

  1. Spark 内置函数能做吗? → 用内置函数。

  2. 是按列变换(输入输出行数一致)吗? → 用 Pandas UDF

  3. 需要多列逻辑、多输出列,或需要行扩展吗? → 用 mapInPandas

  4. 数据小 / 快速原型? → 普通 UDF 可以接受。


最后一个建议:用 Spark 执行计划验证

即使不做完整 benchmark,你也能从下面的信息学到很多:

df_pandas_udf.explain(True)

如果你看到加了自定义函数后 Spark 的优化变少了,这是一个信号:要么尽量改用内置函数,要么重构实现方式。