Skip to content

PySpark groupBy y agregación: resúmenes precisos a escala

Updated on

Los reportes fallan cuando los agregados cuentan dos veces, se saltan grupos null o esconden problemas de cardinalidad. groupBy y agg de PySpark mantienen los rollups correctos, pero solo cuando se eligen bien las funciones y los alias.

Esta guía muestra patrones fiables de agregación: cálculos multi-métrica, opciones de conteo distinto, manejo de grupos null y ordenación de resultados para uso posterior.

¿Quieres crear rápidamente Data Visualization desde un DataFrame de Python Pandas sin escribir código?

PyGWalker es una librería de Python para Exploratory Data Analysis con visualización. PyGWalker (opens in a new tab) puede simplificar tu flujo de análisis y visualización de datos en Jupyter Notebook, convirtiendo tu pandas dataframe (y polars dataframe) en una interfaz de usuario tipo tableau para exploración visual.

PyGWalker for Data visualization (opens in a new tab)

Opciones de agregación

NecesidadFunciónNotas
Conteos (incluye nulls)count("*")Rápido; cuenta filas
Conteo distintocountDistinct(col)Exacto pero más pesado
Distinto aproximadoapprox_count_distinct(col, rsd)Error ajustable; más rápido
Sum/avg con seguridad frente a nullsum, avgIgnoran null por defecto
Múltiples métricas en una pasadaagg con aliasMantiene nombres estables

Configuración

from pyspark.sql import SparkSession, functions as F
 
spark = SparkSession.builder.appName("groupby-agg").getOrCreate()
 
sales = spark.createDataFrame(
    [
        ("2025-11-01", "US", "mobile", 120.0, "A123"),
        ("2025-11-01", "US", "web",    80.0,  "A124"),
        ("2025-11-01", "CA", "mobile", None,  "A125"),
        ("2025-11-02", "US", "mobile", 200.0, "A126"),
        ("2025-11-02", None, "web",    50.0,  "A127"),
    ],
    "date STRING, country STRING, channel STRING, revenue DOUBLE, order_id STRING",
)

Multi-agregación con alias

daily = (
    sales.groupBy("date", "country")
    .agg(
        F.count("*").alias("orders"),
        F.sum("revenue").alias("revenue_sum"),
        F.avg("revenue").alias("revenue_avg"),
    )
)
  • Los alias mantienen estables los nombres de columnas aguas abajo.
  • Las filas con revenue null se ignoran en sum/avg.

Conteos distintos y aproximados

unique_orders = (
    sales.groupBy("date")
    .agg(
        F.countDistinct("order_id").alias("orders_exact"),
        F.approx_count_distinct("order_id", 0.02).alias("orders_approx"),
    )
)
  • Usa approx_count_distinct cuando el rendimiento importa y un pequeño error es aceptable.

Filtrar agregados con lógica tipo HAVING

high_rev = daily.where(F.col("revenue_sum") > 150)
  • Filtra después de la agregación para imitar HAVING de SQL.

Ordenar resultados

ranked = daily.orderBy(F.col("revenue_sum").desc(), "date")
  • Ordena después de agregar; Spark no garantiza el orden sin orderBy.

Manejo de claves de grupo null

null_country_stats = (
    sales.groupBy("country")
    .agg(F.count("*").alias("rows"))
)
  • groupBy conserva claves null; decide si imputar (fillna) antes de agregar.

Pivots para tablas cruzadas simples

pivoted = (
    sales.groupBy("date")
    .pivot("channel", ["mobile", "web"])
    .agg(F.sum("revenue"))
)
  • Restringe los valores de pivot para evitar explosión de columnas.

Rollup mínimo de extremo a extremo

summary = (
    sales
    .groupBy("date", "country")
    .agg(
        F.count("*").alias("orders"),
        F.sum("revenue").alias("revenue_sum"),
        F.countDistinct("order_id").alias("unique_orders"),
    )
    .orderBy("date", "country")
)

Esto produce métricas diarias estables, respeta las decisiones sobre cómo tratar países null y mantiene nombres de columnas listos para joins posteriores o exportaciones.