PySpark groupBy et agrégation : des synthèses fiables à l’échelle
Updated on
Les rapports se dégradent lorsque les agrégats comptent en double, ignorent les groupes null ou masquent les problèmes de cardinalité. groupBy et agg de PySpark permettent de garder des rollups fiables, mais seulement si l’on choisit les bonnes fonctions et les bons alias.
Ce guide présente des schémas d’agrégation robustes : calculs multi-métriques, options de comptage distinct, gestion des groupes null, et tri des résultats pour un usage en aval.
Vous voulez créer rapidement des visualisations de données à partir d’un DataFrame Pandas en Python, sans code ?
PyGWalker est une bibliothèque Python pour l’analyse exploratoire de données avec visualisation. PyGWalker (opens in a new tab) peut simplifier votre flux de travail d’analyse et de visualisation dans Jupyter Notebook, en transformant votre pandas dataframe (et polars dataframe) en une interface utilisateur de type tableau pour l’exploration visuelle.
Choix d’agrégation
| Besoin | Fonction | Remarques |
|---|---|---|
| Comptage (inclut les null) | count("*") | Rapide ; compte les lignes |
| Comptage distinct | countDistinct(col) | Exact mais plus coûteux |
| Distinct approximatif | approx_count_distinct(col, rsd) | Erreur paramétrable ; plus rapide |
| Somme/moyenne avec sécurité vis‑à‑vis des null | sum, avg | Ignorent les null par défaut |
| Multi-métriques en un seul passage | agg avec alias | Garde des noms stables |
Mise en place
from pyspark.sql import SparkSession, functions as F
spark = SparkSession.builder.appName("groupby-agg").getOrCreate()
sales = spark.createDataFrame(
[
("2025-11-01", "US", "mobile", 120.0, "A123"),
("2025-11-01", "US", "web", 80.0, "A124"),
("2025-11-01", "CA", "mobile", None, "A125"),
("2025-11-02", "US", "mobile", 200.0, "A126"),
("2025-11-02", None, "web", 50.0, "A127"),
],
"date STRING, country STRING, channel STRING, revenue DOUBLE, order_id STRING",
)Multi-agrégation avec alias
daily = (
sales.groupBy("date", "country")
.agg(
F.count("*").alias("orders"),
F.sum("revenue").alias("revenue_sum"),
F.avg("revenue").alias("revenue_avg"),
)
)- Les alias stabilisent les noms de colonnes pour les traitements en aval.
- Les lignes avec
revenuenul sont ignorées parsum/avg.
Comptages distincts et approximatifs
unique_orders = (
sales.groupBy("date")
.agg(
F.countDistinct("order_id").alias("orders_exact"),
F.approx_count_distinct("order_id", 0.02).alias("orders_approx"),
)
)- Utilisez
approx_count_distinctquand la performance prime et qu’une petite marge d’erreur est acceptable.
Filtrer les agrégats avec une logique type HAVING
high_rev = daily.where(F.col("revenue_sum") > 150)- Filtrez après l’agrégation pour imiter
HAVINGen SQL.
Trier les résultats
ranked = daily.orderBy(F.col("revenue_sum").desc(), "date")- Triez après l’agrégation ; Spark ne garantit pas l’ordre sans
orderBy.
Gestion des clés de groupe null
null_country_stats = (
sales.groupBy("country")
.agg(F.count("*").alias("rows"))
)groupByconserve les clés null ; décidez si vous voulez les imputer (fillna) avant l’agrégation.
Pivots pour des tableaux croisés simples
pivoted = (
sales.groupBy("date")
.pivot("channel", ["mobile", "web"])
.agg(F.sum("revenue"))
)- Limitez explicitement les valeurs de pivot pour éviter une explosion du nombre de colonnes.
Rollup minimal de bout en bout
summary = (
sales
.groupBy("date", "country")
.agg(
F.count("*").alias("orders"),
F.sum("revenue").alias("revenue_sum"),
F.countDistinct("order_id").alias("unique_orders"),
)
.orderBy("date", "country")
)Cela produit des métriques journalières stables, respecte vos décisions de traitement pour les pays nuls, et garde des noms de colonnes prêts pour des jointures ou des exports en aval.
