PySpark groupBy y agregación: resúmenes precisos a escala
Updated on
Los reportes fallan cuando los agregados cuentan dos veces, se saltan grupos null o esconden problemas de cardinalidad. groupBy y agg de PySpark mantienen los rollups correctos, pero solo cuando se eligen bien las funciones y los alias.
Esta guía muestra patrones fiables de agregación: cálculos multi-métrica, opciones de conteo distinto, manejo de grupos null y ordenación de resultados para uso posterior.
¿Quieres crear rápidamente Data Visualization desde un DataFrame de Python Pandas sin escribir código?
PyGWalker es una librería de Python para Exploratory Data Analysis con visualización. PyGWalker (opens in a new tab) puede simplificar tu flujo de análisis y visualización de datos en Jupyter Notebook, convirtiendo tu pandas dataframe (y polars dataframe) en una interfaz de usuario tipo tableau para exploración visual.
Opciones de agregación
| Necesidad | Función | Notas |
|---|---|---|
| Conteos (incluye nulls) | count("*") | Rápido; cuenta filas |
| Conteo distinto | countDistinct(col) | Exacto pero más pesado |
| Distinto aproximado | approx_count_distinct(col, rsd) | Error ajustable; más rápido |
| Sum/avg con seguridad frente a null | sum, avg | Ignoran null por defecto |
| Múltiples métricas en una pasada | agg con alias | Mantiene nombres estables |
Configuración
from pyspark.sql import SparkSession, functions as F
spark = SparkSession.builder.appName("groupby-agg").getOrCreate()
sales = spark.createDataFrame(
[
("2025-11-01", "US", "mobile", 120.0, "A123"),
("2025-11-01", "US", "web", 80.0, "A124"),
("2025-11-01", "CA", "mobile", None, "A125"),
("2025-11-02", "US", "mobile", 200.0, "A126"),
("2025-11-02", None, "web", 50.0, "A127"),
],
"date STRING, country STRING, channel STRING, revenue DOUBLE, order_id STRING",
)Multi-agregación con alias
daily = (
sales.groupBy("date", "country")
.agg(
F.count("*").alias("orders"),
F.sum("revenue").alias("revenue_sum"),
F.avg("revenue").alias("revenue_avg"),
)
)- Los alias mantienen estables los nombres de columnas aguas abajo.
- Las filas con
revenuenull se ignoran ensum/avg.
Conteos distintos y aproximados
unique_orders = (
sales.groupBy("date")
.agg(
F.countDistinct("order_id").alias("orders_exact"),
F.approx_count_distinct("order_id", 0.02).alias("orders_approx"),
)
)- Usa
approx_count_distinctcuando el rendimiento importa y un pequeño error es aceptable.
Filtrar agregados con lógica tipo HAVING
high_rev = daily.where(F.col("revenue_sum") > 150)- Filtra después de la agregación para imitar
HAVINGde SQL.
Ordenar resultados
ranked = daily.orderBy(F.col("revenue_sum").desc(), "date")- Ordena después de agregar; Spark no garantiza el orden sin
orderBy.
Manejo de claves de grupo null
null_country_stats = (
sales.groupBy("country")
.agg(F.count("*").alias("rows"))
)groupByconserva claves null; decide si imputar (fillna) antes de agregar.
Pivots para tablas cruzadas simples
pivoted = (
sales.groupBy("date")
.pivot("channel", ["mobile", "web"])
.agg(F.sum("revenue"))
)- Restringe los valores de pivot para evitar explosión de columnas.
Rollup mínimo de extremo a extremo
summary = (
sales
.groupBy("date", "country")
.agg(
F.count("*").alias("orders"),
F.sum("revenue").alias("revenue_sum"),
F.countDistinct("order_id").alias("unique_orders"),
)
.orderBy("date", "country")
)Esto produce métricas diarias estables, respeta las decisiones sobre cómo tratar países null y mantiene nombres de columnas listos para joins posteriores o exportaciones.
