Skip to content

PySpark groupBy e Aggregation: Resumos Precisos em Escala

Updated on

Relatórios falham quando agregações fazem dupla contagem, ignoram grupos nulos ou escondem problemas de cardinalidade. O groupBy e o agg do PySpark mantêm os rollups corretos, mas apenas quando são escolhidas as funções e aliases adequados.

Este guia mostra padrões confiáveis de agregação: cálculo de múltiplas métricas em uma única passada, opções de contagem distinta, tratamento de grupos nulos e ordenação de resultados para uso posterior.

Quer criar rapidamente Data Visualization a partir de um DataFrame do Python Pandas sem escrever código?

PyGWalker é uma biblioteca Python para Exploratory Data Analysis com Visualização. O PyGWalker (opens in a new tab) pode simplificar o fluxo de trabalho de análise e visualização de dados no seu Jupyter Notebook, transformando seu pandas dataframe (e polars dataframe) em uma interface tipo tableau para exploração visual.

PyGWalker for Data visualization (opens in a new tab)

Opções de agregação

NecessidadeFunçãoObservações
Contagens (inclui nulls)count("*")Rápido; conta linhas
Contagem distintacountDistinct(col)Exato, mas mais pesado
Distinto aproximadoapprox_count_distinct(col, rsd)Erro ajustável; mais rápido
Soma/média com segurança para nullsum, avgIgnoram nulls por padrão
Múltiplas métricas em uma passadaagg com aliasesMantém nomes estáveis

Configuração

from pyspark.sql import SparkSession, functions as F
 
spark = SparkSession.builder.appName("groupby-agg").getOrCreate()
 
sales = spark.createDataFrame(
    [
        ("2025-11-01", "US", "mobile", 120.0, "A123"),
        ("2025-11-01", "US", "web",    80.0,  "A124"),
        ("2025-11-01", "CA", "mobile", None,  "A125"),
        ("2025-11-02", "US", "mobile", 200.0, "A126"),
        ("2025-11-02", None, "web",    50.0,  "A127"),
    ],
    "date STRING, country STRING, channel STRING, revenue DOUBLE, order_id STRING",
)

Multi‑agregação com aliases

daily = (
    sales.groupBy("date", "country")
    .agg(
        F.count("*").alias("orders"),
        F.sum("revenue").alias("revenue_sum"),
        F.avg("revenue").alias("revenue_avg"),
    )
)
  • Aliases mantêm estáveis os nomes das colunas usadas depois.
  • Linhas com revenue nulo são ignoradas por sum/avg.

Contagens distintas e aproximadas

unique_orders = (
    sales.groupBy("date")
    .agg(
        F.countDistinct("order_id").alias("orders_exact"),
        F.approx_count_distinct("order_id", 0.02).alias("orders_approx"),
    )
)
  • Use approx_count_distinct quando desempenho for importante e um pequeno erro for aceitável.

Filtrando agregados com lógica tipo HAVING

high_rev = daily.where(F.col("revenue_sum") > 150)
  • Faça o filtro após a agregação para imitar o HAVING do SQL.

Ordenando resultados

ranked = daily.orderBy(F.col("revenue_sum").desc(), "date")
  • Ordene após a agregação; o Spark não garante ordenação sem orderBy.

Tratando chaves de grupo nulas

null_country_stats = (
    sales.groupBy("country")
    .agg(F.count("*").alias("rows"))
)
  • groupBy mantém chaves nulas; decida se deve imputar (fillna) antes da agregação.

Pivots para cross‑tabs simples

pivoted = (
    sales.groupBy("date")
    .pivot("channel", ["mobile", "web"])
    .agg(F.sum("revenue"))
)
  • Restrinja os valores do pivot para evitar explosão de colunas.

Rollup mínimo de ponta a ponta

summary = (
    sales
    .groupBy("date", "country")
    .agg(
        F.count("*").alias("orders"),
        F.sum("revenue").alias("revenue_sum"),
        F.countDistinct("order_id").alias("unique_orders"),
    )
    .orderBy("date", "country")
)

Isso produz métricas diárias estáveis, preserva as decisões sobre tratamento de country nulo e mantém os nomes das colunas prontos para joins ou exportações posteriores.