PySpark groupBy と Aggregation: スケールする正確なサマリー集計
Updated on
集計結果が二重カウントしていたり、null グループを取りこぼしていたり、カーディナリティの問題を隠してしまうと、レポーティングは一気に信頼できなくなります。PySpark の groupBy と agg はロールアップを正確に保てますが、そのためには適切な関数とエイリアスの選択が欠かせません。
このガイドでは、信頼できる集計パターンを紹介します。複数メトリクスの同時計算、distinct カウントの選択肢、null グループの扱い方、下流処理で使いやすい結果の並び替えまでカバーします。
コードを書かずに、Python Pandas Dataframe からすばやく Data Visualization を作りたいですか?
PyGWalker は Visualization 付きの Exploratory Data Analysis のための Python ライブラリです。PyGWalker (opens in a new tab) を使うと、pandas dataframe(および polars dataframe)を、可視化探索のための tableau 代替 UI に変換し、Jupyter Notebook でのデータ分析・データ可視化ワークフローをシンプルにできます。
集計関数の選び方
| Need | Function | Notes |
|---|---|---|
| 件数(null を含む) | count("*") | 高速・行数をカウント |
| Distinct 件数 | countDistinct(col) | 正確だが重め |
| 近似 distinct 件数 | approx_count_distinct(col, rsd) | 誤差を調整可能・高速 |
| null セーフな sum/avg | sum, avg | デフォルトで null を無視 |
| 1 パスで複数メトリクス | agg + alias | 列名を安定させる |
セットアップ
from pyspark.sql import SparkSession, functions as F
spark = SparkSession.builder.appName("groupby-agg").getOrCreate()
sales = spark.createDataFrame(
[
("2025-11-01", "US", "mobile", 120.0, "A123"),
("2025-11-01", "US", "web", 80.0, "A124"),
("2025-11-01", "CA", "mobile", None, "A125"),
("2025-11-02", "US", "mobile", 200.0, "A126"),
("2025-11-02", None, "web", 50.0, "A127"),
],
"date STRING, country STRING, channel STRING, revenue DOUBLE, order_id STRING",
)エイリアス付きの複数集計
daily = (
sales.groupBy("date", "country")
.agg(
F.count("*").alias("orders"),
F.sum("revenue").alias("revenue_sum"),
F.avg("revenue").alias("revenue_avg"),
)
)- エイリアスで下流処理にとって安定した列名を確保できます。
sum/avgはrevenueが null の行を自動的に無視します。
Distinct と近似カウント
unique_orders = (
sales.groupBy("date")
.agg(
F.countDistinct("order_id").alias("orders_exact"),
F.approx_count_distinct("order_id", 0.02).alias("orders_approx"),
)
)- パフォーマンス重視で、わずかな誤差を許容できる場合は
approx_count_distinctを使います。
HAVING 相当の条件で集計結果をフィルタ
high_rev = daily.where(F.col("revenue_sum") > 150)- 集計後にフィルタして、SQL の
HAVINGと同じロジックを実現します。
結果の並び替え
ranked = daily.orderBy(F.col("revenue_sum").desc(), "date")- 集計後にソートします。
orderByを指定しない限り、Spark は順序を保証しません。
null グループキーの扱い
null_country_stats = (
sales.groupBy("country")
.agg(F.count("*").alias("rows"))
)groupByは null キーも保持します。集計前にfillnaなどで補完するかどうかを設計時に決めておきます。
シンプルなクロス集計のための pivot
pivoted = (
sales.groupBy("date")
.pivot("channel", ["mobile", "web"])
.agg(F.sum("revenue"))
)- 列数が爆発するのを避けるため、pivot の値は明示的に制限しておきます。
最小限のエンドツーエンド集計例
summary = (
sales
.groupBy("date", "country")
.agg(
F.count("*").alias("orders"),
F.sum("revenue").alias("revenue_sum"),
F.countDistinct("order_id").alias("unique_orders"),
)
.orderBy("date", "country")
)このパイプラインは、日次の安定したメトリクスを生成し、country の null の扱いを明示的に残したまま、下流での join やエクスポートに使いやすい列名を保ちます。
