PySpark groupBy e Aggregation: Resumos Precisos em Escala
Updated on
Relatórios falham quando agregações fazem dupla contagem, ignoram grupos nulos ou escondem problemas de cardinalidade. O groupBy e o agg do PySpark mantêm os rollups corretos, mas apenas quando são escolhidas as funções e aliases adequados.
Este guia mostra padrões confiáveis de agregação: cálculo de múltiplas métricas em uma única passada, opções de contagem distinta, tratamento de grupos nulos e ordenação de resultados para uso posterior.
Quer criar rapidamente Data Visualization a partir de um DataFrame do Python Pandas sem escrever código?
PyGWalker é uma biblioteca Python para Exploratory Data Analysis com Visualização. O PyGWalker (opens in a new tab) pode simplificar o fluxo de trabalho de análise e visualização de dados no seu Jupyter Notebook, transformando seu pandas dataframe (e polars dataframe) em uma interface tipo tableau para exploração visual.
Opções de agregação
| Necessidade | Função | Observações |
|---|---|---|
| Contagens (inclui nulls) | count("*") | Rápido; conta linhas |
| Contagem distinta | countDistinct(col) | Exato, mas mais pesado |
| Distinto aproximado | approx_count_distinct(col, rsd) | Erro ajustável; mais rápido |
| Soma/média com segurança para null | sum, avg | Ignoram nulls por padrão |
| Múltiplas métricas em uma passada | agg com aliases | Mantém nomes estáveis |
Configuração
from pyspark.sql import SparkSession, functions as F
spark = SparkSession.builder.appName("groupby-agg").getOrCreate()
sales = spark.createDataFrame(
[
("2025-11-01", "US", "mobile", 120.0, "A123"),
("2025-11-01", "US", "web", 80.0, "A124"),
("2025-11-01", "CA", "mobile", None, "A125"),
("2025-11-02", "US", "mobile", 200.0, "A126"),
("2025-11-02", None, "web", 50.0, "A127"),
],
"date STRING, country STRING, channel STRING, revenue DOUBLE, order_id STRING",
)Multi‑agregação com aliases
daily = (
sales.groupBy("date", "country")
.agg(
F.count("*").alias("orders"),
F.sum("revenue").alias("revenue_sum"),
F.avg("revenue").alias("revenue_avg"),
)
)- Aliases mantêm estáveis os nomes das colunas usadas depois.
- Linhas com
revenuenulo são ignoradas porsum/avg.
Contagens distintas e aproximadas
unique_orders = (
sales.groupBy("date")
.agg(
F.countDistinct("order_id").alias("orders_exact"),
F.approx_count_distinct("order_id", 0.02).alias("orders_approx"),
)
)- Use
approx_count_distinctquando desempenho for importante e um pequeno erro for aceitável.
Filtrando agregados com lógica tipo HAVING
high_rev = daily.where(F.col("revenue_sum") > 150)- Faça o filtro após a agregação para imitar o
HAVINGdo SQL.
Ordenando resultados
ranked = daily.orderBy(F.col("revenue_sum").desc(), "date")- Ordene após a agregação; o Spark não garante ordenação sem
orderBy.
Tratando chaves de grupo nulas
null_country_stats = (
sales.groupBy("country")
.agg(F.count("*").alias("rows"))
)groupBymantém chaves nulas; decida se deve imputar (fillna) antes da agregação.
Pivots para cross‑tabs simples
pivoted = (
sales.groupBy("date")
.pivot("channel", ["mobile", "web"])
.agg(F.sum("revenue"))
)- Restrinja os valores do pivot para evitar explosão de colunas.
Rollup mínimo de ponta a ponta
summary = (
sales
.groupBy("date", "country")
.agg(
F.count("*").alias("orders"),
F.sum("revenue").alias("revenue_sum"),
F.countDistinct("order_id").alias("unique_orders"),
)
.orderBy("date", "country")
)Isso produz métricas diárias estáveis, preserva as decisões sobre tratamento de country nulo e mantém os nomes das colunas prontos para joins ou exportações posteriores.
