Skip to content

PySpark groupBy et agrégation : des synthèses fiables à l’échelle

Updated on

Les rapports se dégradent lorsque les agrégats comptent en double, ignorent les groupes null ou masquent les problèmes de cardinalité. groupBy et agg de PySpark permettent de garder des rollups fiables, mais seulement si l’on choisit les bonnes fonctions et les bons alias.

Ce guide présente des schémas d’agrégation robustes : calculs multi-métriques, options de comptage distinct, gestion des groupes null, et tri des résultats pour un usage en aval.

Vous voulez créer rapidement des visualisations de données à partir d’un DataFrame Pandas en Python, sans code ?

PyGWalker est une bibliothèque Python pour l’analyse exploratoire de données avec visualisation. PyGWalker (opens in a new tab) peut simplifier votre flux de travail d’analyse et de visualisation dans Jupyter Notebook, en transformant votre pandas dataframe (et polars dataframe) en une interface utilisateur de type tableau pour l’exploration visuelle.

PyGWalker for Data visualization (opens in a new tab)

Choix d’agrégation

BesoinFonctionRemarques
Comptage (inclut les null)count("*")Rapide ; compte les lignes
Comptage distinctcountDistinct(col)Exact mais plus coûteux
Distinct approximatifapprox_count_distinct(col, rsd)Erreur paramétrable ; plus rapide
Somme/moyenne avec sécurité vis‑à‑vis des nullsum, avgIgnorent les null par défaut
Multi-métriques en un seul passageagg avec aliasGarde des noms stables

Mise en place

from pyspark.sql import SparkSession, functions as F
 
spark = SparkSession.builder.appName("groupby-agg").getOrCreate()
 
sales = spark.createDataFrame(
    [
        ("2025-11-01", "US", "mobile", 120.0, "A123"),
        ("2025-11-01", "US", "web",    80.0,  "A124"),
        ("2025-11-01", "CA", "mobile", None,  "A125"),
        ("2025-11-02", "US", "mobile", 200.0, "A126"),
        ("2025-11-02", None, "web",    50.0,  "A127"),
    ],
    "date STRING, country STRING, channel STRING, revenue DOUBLE, order_id STRING",
)

Multi-agrégation avec alias

daily = (
    sales.groupBy("date", "country")
    .agg(
        F.count("*").alias("orders"),
        F.sum("revenue").alias("revenue_sum"),
        F.avg("revenue").alias("revenue_avg"),
    )
)
  • Les alias stabilisent les noms de colonnes pour les traitements en aval.
  • Les lignes avec revenue nul sont ignorées par sum/avg.

Comptages distincts et approximatifs

unique_orders = (
    sales.groupBy("date")
    .agg(
        F.countDistinct("order_id").alias("orders_exact"),
        F.approx_count_distinct("order_id", 0.02).alias("orders_approx"),
    )
)
  • Utilisez approx_count_distinct quand la performance prime et qu’une petite marge d’erreur est acceptable.

Filtrer les agrégats avec une logique type HAVING

high_rev = daily.where(F.col("revenue_sum") > 150)
  • Filtrez après l’agrégation pour imiter HAVING en SQL.

Trier les résultats

ranked = daily.orderBy(F.col("revenue_sum").desc(), "date")
  • Triez après l’agrégation ; Spark ne garantit pas l’ordre sans orderBy.

Gestion des clés de groupe null

null_country_stats = (
    sales.groupBy("country")
    .agg(F.count("*").alias("rows"))
)
  • groupBy conserve les clés null ; décidez si vous voulez les imputer (fillna) avant l’agrégation.

Pivots pour des tableaux croisés simples

pivoted = (
    sales.groupBy("date")
    .pivot("channel", ["mobile", "web"])
    .agg(F.sum("revenue"))
)
  • Limitez explicitement les valeurs de pivot pour éviter une explosion du nombre de colonnes.

Rollup minimal de bout en bout

summary = (
    sales
    .groupBy("date", "country")
    .agg(
        F.count("*").alias("orders"),
        F.sum("revenue").alias("revenue_sum"),
        F.countDistinct("order_id").alias("unique_orders"),
    )
    .orderBy("date", "country")
)

Cela produit des métriques journalières stables, respecte vos décisions de traitement pour les pays nuls, et garde des noms de colonnes prêts pour des jointures ou des exports en aval.