Skip to content

PySpark groupBy 和聚合:在大规模数据下获得精确汇总

Updated on

当汇总计算出现重复计数、遗漏空值分组、或掩盖基数(cardinality)问题时,报表就会“失真”。PySpark 的 groupByagg 能保持汇总结果的准确性,但前提是选择了合适的聚合函数和列别名。

本文介绍一套可靠的聚合模式:多指标一次性计算、不同的去重计数方案、如何处理分组键中的 null,以及如何为下游使用排序结果。

想在 Python Pandas DataFrame 上“零代码”快速创建数据可视化?

PyGWalker 是一个用于可视化 Exploratory Data Analysis 的 Python 库。PyGWalker (opens in a new tab) 可以简化你在 Jupyter Notebook 中的数据分析与可视化流程,把你的 pandas dataframe(以及 polars dataframe)转换成类似 Tableau 的可视化探索界面。

PyGWalker for Data visualization (opens in a new tab)

聚合方式选择

需求函数说明
计数(包含 null)count("*")快,按行计数
去重计数countDistinct(col)精确但更耗资源
近似去重计数approx_count_distinct(col, rsd)可调误差,更快
对 null 安全的 sum/avgsum, avg默认忽略 null
一次完成多指标统计带别名的 agg保持列名稳定

环境准备

from pyspark.sql import SparkSession, functions as F
 
spark = SparkSession.builder.appName("groupby-agg").getOrCreate()
 
sales = spark.createDataFrame(
    [
        ("2025-11-01", "US", "mobile", 120.0, "A123"),
        ("2025-11-01", "US", "web",    80.0,  "A124"),
        ("2025-11-01", "CA", "mobile", None,  "A125"),
        ("2025-11-02", "US", "mobile", 200.0, "A126"),
        ("2025-11-02", None, "web",    50.0,  "A127"),
    ],
    "date STRING, country STRING, channel STRING, revenue DOUBLE, order_id STRING",
)

多重聚合与别名

daily = (
    sales.groupBy("date", "country")
    .agg(
        F.count("*").alias("orders"),
        F.sum("revenue").alias("revenue_sum"),
        F.avg("revenue").alias("revenue_avg"),
    )
)
  • 使用别名可以让下游使用的列名保持稳定。
  • sum / avg 会自动忽略 revenue 为 null 的行。

精确去重计数与近似去重计数

unique_orders = (
    sales.groupBy("date")
    .agg(
        F.countDistinct("order_id").alias("orders_exact"),
        F.approx_count_distinct("order_id", 0.02).alias("orders_approx"),
    )
)
  • 当性能优先且允许运算有少量误差时,可以使用 approx_count_distinct

使用类似 HAVING 的条件过滤聚合结果

high_rev = daily.where(F.col("revenue_sum") > 150)
  • 在聚合完成之后再过滤,行为类似 SQL 中的 HAVING

结果排序

ranked = daily.orderBy(F.col("revenue_sum").desc(), "date")
  • 聚合之后再排序;如果不使用 orderBy,Spark 不保证结果顺序。

处理分组键中的 null

null_country_stats = (
    sales.groupBy("country")
    .agg(F.count("*").alias("rows"))
)
  • groupBy 会保留为 null 的分组键;在聚合前要决定是否用 fillna 做填补。

使用 pivot 做简单交叉表

pivoted = (
    sales.groupBy("date")
    .pivot("channel", ["mobile", "web"])
    .agg(F.sum("revenue"))
)
  • 限定 pivot 的取值范围,防止列数量爆炸式增长。

一个精简的端到端汇总示例

summary = (
    sales
    .groupBy("date", "country")
    .agg(
        F.count("*").alias("orders"),
        F.sum("revenue").alias("revenue_sum"),
        F.countDistinct("order_id").alias("unique_orders"),
    )
    .orderBy("date", "country")
)

这个示例能生成稳定的日度指标,同时保留对 country 中 null 的处理决策,并让列名在下游 join 或导出时保持一致。