Skip to content

PySpark groupBy und Aggregation: Präzise Zusammenfassungen im großen Maßstab

Updated on

Reports werden unzuverlässig, wenn Aggregationen doppelt zählen, Null‑Gruppen auslassen oder Kardinalitätsprobleme verschleiern. PySpark groupBy und agg halten Rollups korrekt – aber nur, wenn passende Funktionen und aussagekräftige Aliases verwendet werden.

Diese Anleitung zeigt robuste Aggregationsmuster: Berechnung mehrerer Metriken in einem Durchlauf, Optionen für distinct Counts, Umgang mit Null‑Gruppen und Sortierung der Ergebnisse für die Weiterverarbeitung.

Möchtest du schnell Data Visualization aus einem Python Pandas Dataframe ganz ohne Code erstellen?

PyGWalker ist eine Python‑Bibliothek für Exploratory Data Analysis mit Visualization. PyGWalker (opens in a new tab) kann deinen Jupyter Notebook‑Workflow für Datenanalyse und Data Visualization vereinfachen, indem es deinen pandas dataframe (und polars dataframe) in ein tableau‑alternatives User Interface für visuelle Exploration verwandelt.

PyGWalker for Data visualization (opens in a new tab)

Aggregationsoptionen

BedarfFunktionHinweise
Anzahl (inklusive Nulls)count("*")Schnell; zählt Zeilen
Distinct CountcountDistinct(col)Exakt, aber ressourcenintensiver
Approximativer Distinct Countapprox_count_distinct(col, rsd)Einstellbarer Fehler; schneller
Summe/Avg mit Null‑Sicherheitsum, avgIgnorieren Null‑Werte standardmäßig
Mehrere Metriken in einem Durchlaufagg mit AliasesHält Spaltennamen stabil

Setup

from pyspark.sql import SparkSession, functions as F
 
spark = SparkSession.builder.appName("groupby-agg").getOrCreate()
 
sales = spark.createDataFrame(
    [
        ("2025-11-01", "US", "mobile", 120.0, "A123"),
        ("2025-11-01", "US", "web",    80.0,  "A124"),
        ("2025-11-01", "CA", "mobile", None,  "A125"),
        ("2025-11-02", "US", "mobile", 200.0, "A126"),
        ("2025-11-02", None, "web",    50.0,  "A127"),
    ],
    "date STRING, country STRING, channel STRING, revenue DOUBLE, order_id STRING",
)

Multi‑Aggregation mit Aliases

daily = (
    sales.groupBy("date", "country")
    .agg(
        F.count("*").alias("orders"),
        F.sum("revenue").alias("revenue_sum"),
        F.avg("revenue").alias("revenue_avg"),
    )
)
  • Aliases halten Spaltennamen für nachgelagerte Schritte stabil.
  • Zeilen mit Null in revenue werden von sum/avg ignoriert.

Distinct und approximative Counts

unique_orders = (
    sales.groupBy("date")
    .agg(
        F.countDistinct("order_id").alias("orders_exact"),
        F.approx_count_distinct("order_id", 0.02).alias("orders_approx"),
    )
)
  • Verwende approx_count_distinct, wenn Performance wichtiger ist und ein kleiner Fehler tolerierbar ist.

Aggregate filtern mit Having‑ähnlicher Logik

high_rev = daily.where(F.col("revenue_sum") > 150)
  • Nach der Aggregation filtern, um SQL HAVING nachzuahmen.

Ergebnisse sortieren

ranked = daily.orderBy(F.col("revenue_sum").desc(), "date")
  • Nach der Aggregation sortieren; Spark garantiert ohne orderBy keine Reihenfolge.

Umgang mit Null‑Gruppenschlüsseln

null_country_stats = (
    sales.groupBy("country")
    .agg(F.count("*").alias("rows"))
)
  • groupBy behält Null‑Keys bei; entscheide, ob du vor der Aggregation imputieren (fillna) möchtest.

Pivots für einfache Kreuztabellen

pivoted = (
    sales.groupBy("date")
    .pivot("channel", ["mobile", "web"])
    .agg(F.sum("revenue"))
)
  • Pivot‑Werte begrenzen, um eine Explosion der Spaltenanzahl zu vermeiden.

Minimaler End‑to‑End‑Rollup

summary = (
    sales
    .groupBy("date", "country")
    .agg(
        F.count("*").alias("orders"),
        F.sum("revenue").alias("revenue_sum"),
        F.countDistinct("order_id").alias("unique_orders"),
    )
    .orderBy("date", "country")
)

Dies erzeugt stabile tägliche Kennzahlen, erhält die getroffenen Entscheidungen zum Umgang mit Null‑Ländern und hält Spaltennamen für nachgelagerte Joins oder Exporte bereit.