PySpark groupBy 和聚合:在大规模数据下获得精确汇总
Updated on
当汇总计算出现重复计数、遗漏空值分组、或掩盖基数(cardinality)问题时,报表就会“失真”。PySpark 的 groupBy 和 agg 能保持汇总结果的准确性,但前提是选择了合适的聚合函数和列别名。
本文介绍一套可靠的聚合模式:多指标一次性计算、不同的去重计数方案、如何处理分组键中的 null,以及如何为下游使用排序结果。
想在 Python Pandas DataFrame 上“零代码”快速创建数据可视化?
PyGWalker 是一个用于可视化 Exploratory Data Analysis 的 Python 库。PyGWalker (opens in a new tab) 可以简化你在 Jupyter Notebook 中的数据分析与可视化流程,把你的 pandas dataframe(以及 polars dataframe)转换成类似 Tableau 的可视化探索界面。
聚合方式选择
| 需求 | 函数 | 说明 |
|---|---|---|
| 计数(包含 null) | count("*") | 快,按行计数 |
| 去重计数 | countDistinct(col) | 精确但更耗资源 |
| 近似去重计数 | approx_count_distinct(col, rsd) | 可调误差,更快 |
| 对 null 安全的 sum/avg | sum, avg | 默认忽略 null |
| 一次完成多指标统计 | 带别名的 agg | 保持列名稳定 |
环境准备
from pyspark.sql import SparkSession, functions as F
spark = SparkSession.builder.appName("groupby-agg").getOrCreate()
sales = spark.createDataFrame(
[
("2025-11-01", "US", "mobile", 120.0, "A123"),
("2025-11-01", "US", "web", 80.0, "A124"),
("2025-11-01", "CA", "mobile", None, "A125"),
("2025-11-02", "US", "mobile", 200.0, "A126"),
("2025-11-02", None, "web", 50.0, "A127"),
],
"date STRING, country STRING, channel STRING, revenue DOUBLE, order_id STRING",
)多重聚合与别名
daily = (
sales.groupBy("date", "country")
.agg(
F.count("*").alias("orders"),
F.sum("revenue").alias("revenue_sum"),
F.avg("revenue").alias("revenue_avg"),
)
)- 使用别名可以让下游使用的列名保持稳定。
sum/avg会自动忽略revenue为 null 的行。
精确去重计数与近似去重计数
unique_orders = (
sales.groupBy("date")
.agg(
F.countDistinct("order_id").alias("orders_exact"),
F.approx_count_distinct("order_id", 0.02).alias("orders_approx"),
)
)- 当性能优先且允许运算有少量误差时,可以使用
approx_count_distinct。
使用类似 HAVING 的条件过滤聚合结果
high_rev = daily.where(F.col("revenue_sum") > 150)- 在聚合完成之后再过滤,行为类似 SQL 中的
HAVING。
结果排序
ranked = daily.orderBy(F.col("revenue_sum").desc(), "date")- 聚合之后再排序;如果不使用
orderBy,Spark 不保证结果顺序。
处理分组键中的 null
null_country_stats = (
sales.groupBy("country")
.agg(F.count("*").alias("rows"))
)groupBy会保留为 null 的分组键;在聚合前要决定是否用fillna做填补。
使用 pivot 做简单交叉表
pivoted = (
sales.groupBy("date")
.pivot("channel", ["mobile", "web"])
.agg(F.sum("revenue"))
)- 限定 pivot 的取值范围,防止列数量爆炸式增长。
一个精简的端到端汇总示例
summary = (
sales
.groupBy("date", "country")
.agg(
F.count("*").alias("orders"),
F.sum("revenue").alias("revenue_sum"),
F.countDistinct("order_id").alias("unique_orders"),
)
.orderBy("date", "country")
)这个示例能生成稳定的日度指标,同时保留对 country 中 null 的处理决策,并让列名在下游 join 或导出时保持一致。
