Skip to content

PySpark groupBy と Aggregation: スケールする正確なサマリー集計

Updated on

集計結果が二重カウントしていたり、null グループを取りこぼしていたり、カーディナリティの問題を隠してしまうと、レポーティングは一気に信頼できなくなります。PySpark の groupByagg はロールアップを正確に保てますが、そのためには適切な関数とエイリアスの選択が欠かせません。

このガイドでは、信頼できる集計パターンを紹介します。複数メトリクスの同時計算、distinct カウントの選択肢、null グループの扱い方、下流処理で使いやすい結果の並び替えまでカバーします。

コードを書かずに、Python Pandas Dataframe からすばやく Data Visualization を作りたいですか?

PyGWalker は Visualization 付きの Exploratory Data Analysis のための Python ライブラリです。PyGWalker (opens in a new tab) を使うと、pandas dataframe(および polars dataframe)を、可視化探索のための tableau 代替 UI に変換し、Jupyter Notebook でのデータ分析・データ可視化ワークフローをシンプルにできます。

PyGWalker for Data visualization (opens in a new tab)

集計関数の選び方

NeedFunctionNotes
件数(null を含む)count("*")高速・行数をカウント
Distinct 件数countDistinct(col)正確だが重め
近似 distinct 件数approx_count_distinct(col, rsd)誤差を調整可能・高速
null セーフな sum/avgsum, avgデフォルトで null を無視
1 パスで複数メトリクスagg + alias列名を安定させる

セットアップ

from pyspark.sql import SparkSession, functions as F
 
spark = SparkSession.builder.appName("groupby-agg").getOrCreate()
 
sales = spark.createDataFrame(
    [
        ("2025-11-01", "US", "mobile", 120.0, "A123"),
        ("2025-11-01", "US", "web",    80.0,  "A124"),
        ("2025-11-01", "CA", "mobile", None,  "A125"),
        ("2025-11-02", "US", "mobile", 200.0, "A126"),
        ("2025-11-02", None, "web",    50.0,  "A127"),
    ],
    "date STRING, country STRING, channel STRING, revenue DOUBLE, order_id STRING",
)

エイリアス付きの複数集計

daily = (
    sales.groupBy("date", "country")
    .agg(
        F.count("*").alias("orders"),
        F.sum("revenue").alias("revenue_sum"),
        F.avg("revenue").alias("revenue_avg"),
    )
)
  • エイリアスで下流処理にとって安定した列名を確保できます。
  • sum / avgrevenue が null の行を自動的に無視します。

Distinct と近似カウント

unique_orders = (
    sales.groupBy("date")
    .agg(
        F.countDistinct("order_id").alias("orders_exact"),
        F.approx_count_distinct("order_id", 0.02).alias("orders_approx"),
    )
)
  • パフォーマンス重視で、わずかな誤差を許容できる場合は approx_count_distinct を使います。

HAVING 相当の条件で集計結果をフィルタ

high_rev = daily.where(F.col("revenue_sum") > 150)
  • 集計後にフィルタして、SQL の HAVING と同じロジックを実現します。

結果の並び替え

ranked = daily.orderBy(F.col("revenue_sum").desc(), "date")
  • 集計後にソートします。orderBy を指定しない限り、Spark は順序を保証しません。

null グループキーの扱い

null_country_stats = (
    sales.groupBy("country")
    .agg(F.count("*").alias("rows"))
)
  • groupBy は null キーも保持します。集計前に fillna などで補完するかどうかを設計時に決めておきます。

シンプルなクロス集計のための pivot

pivoted = (
    sales.groupBy("date")
    .pivot("channel", ["mobile", "web"])
    .agg(F.sum("revenue"))
)
  • 列数が爆発するのを避けるため、pivot の値は明示的に制限しておきます。

最小限のエンドツーエンド集計例

summary = (
    sales
    .groupBy("date", "country")
    .agg(
        F.count("*").alias("orders"),
        F.sum("revenue").alias("revenue_sum"),
        F.countDistinct("order_id").alias("unique_orders"),
    )
    .orderBy("date", "country")
)

このパイプラインは、日次の安定したメトリクスを生成し、country の null の扱いを明示的に残したまま、下流での join やエクスポートに使いやすい列名を保ちます。