PySpark groupBy und Aggregation: Präzise Zusammenfassungen im großen Maßstab
Updated on
Reports werden unzuverlässig, wenn Aggregationen doppelt zählen, Null‑Gruppen auslassen oder Kardinalitätsprobleme verschleiern. PySpark groupBy und agg halten Rollups korrekt – aber nur, wenn passende Funktionen und aussagekräftige Aliases verwendet werden.
Diese Anleitung zeigt robuste Aggregationsmuster: Berechnung mehrerer Metriken in einem Durchlauf, Optionen für distinct Counts, Umgang mit Null‑Gruppen und Sortierung der Ergebnisse für die Weiterverarbeitung.
Möchtest du schnell Data Visualization aus einem Python Pandas Dataframe ganz ohne Code erstellen?
PyGWalker ist eine Python‑Bibliothek für Exploratory Data Analysis mit Visualization. PyGWalker (opens in a new tab) kann deinen Jupyter Notebook‑Workflow für Datenanalyse und Data Visualization vereinfachen, indem es deinen pandas dataframe (und polars dataframe) in ein tableau‑alternatives User Interface für visuelle Exploration verwandelt.
Aggregationsoptionen
| Bedarf | Funktion | Hinweise |
|---|---|---|
| Anzahl (inklusive Nulls) | count("*") | Schnell; zählt Zeilen |
| Distinct Count | countDistinct(col) | Exakt, aber ressourcenintensiver |
| Approximativer Distinct Count | approx_count_distinct(col, rsd) | Einstellbarer Fehler; schneller |
| Summe/Avg mit Null‑Sicherheit | sum, avg | Ignorieren Null‑Werte standardmäßig |
| Mehrere Metriken in einem Durchlauf | agg mit Aliases | Hält Spaltennamen stabil |
Setup
from pyspark.sql import SparkSession, functions as F
spark = SparkSession.builder.appName("groupby-agg").getOrCreate()
sales = spark.createDataFrame(
[
("2025-11-01", "US", "mobile", 120.0, "A123"),
("2025-11-01", "US", "web", 80.0, "A124"),
("2025-11-01", "CA", "mobile", None, "A125"),
("2025-11-02", "US", "mobile", 200.0, "A126"),
("2025-11-02", None, "web", 50.0, "A127"),
],
"date STRING, country STRING, channel STRING, revenue DOUBLE, order_id STRING",
)Multi‑Aggregation mit Aliases
daily = (
sales.groupBy("date", "country")
.agg(
F.count("*").alias("orders"),
F.sum("revenue").alias("revenue_sum"),
F.avg("revenue").alias("revenue_avg"),
)
)- Aliases halten Spaltennamen für nachgelagerte Schritte stabil.
- Zeilen mit Null in
revenuewerden vonsum/avgignoriert.
Distinct und approximative Counts
unique_orders = (
sales.groupBy("date")
.agg(
F.countDistinct("order_id").alias("orders_exact"),
F.approx_count_distinct("order_id", 0.02).alias("orders_approx"),
)
)- Verwende
approx_count_distinct, wenn Performance wichtiger ist und ein kleiner Fehler tolerierbar ist.
Aggregate filtern mit Having‑ähnlicher Logik
high_rev = daily.where(F.col("revenue_sum") > 150)- Nach der Aggregation filtern, um SQL
HAVINGnachzuahmen.
Ergebnisse sortieren
ranked = daily.orderBy(F.col("revenue_sum").desc(), "date")- Nach der Aggregation sortieren; Spark garantiert ohne
orderBykeine Reihenfolge.
Umgang mit Null‑Gruppenschlüsseln
null_country_stats = (
sales.groupBy("country")
.agg(F.count("*").alias("rows"))
)groupBybehält Null‑Keys bei; entscheide, ob du vor der Aggregation imputieren (fillna) möchtest.
Pivots für einfache Kreuztabellen
pivoted = (
sales.groupBy("date")
.pivot("channel", ["mobile", "web"])
.agg(F.sum("revenue"))
)- Pivot‑Werte begrenzen, um eine Explosion der Spaltenanzahl zu vermeiden.
Minimaler End‑to‑End‑Rollup
summary = (
sales
.groupBy("date", "country")
.agg(
F.count("*").alias("orders"),
F.sum("revenue").alias("revenue_sum"),
F.countDistinct("order_id").alias("unique_orders"),
)
.orderBy("date", "country")
)Dies erzeugt stabile tägliche Kennzahlen, erhält die getroffenen Entscheidungen zum Umgang mit Null‑Ländern und hält Spaltennamen für nachgelagerte Joins oder Exporte bereit.
