Joins e Broadcast em PySpark: Escolha Sempre o Join Certo
Updated on
Joins mal feitos escondem linhas, duplicam colunas ou fazem os dados explodirem quando chaves enviesadas (skewed) colidem. Escolhas claras de join e uso inteligente de broadcast mantêm os resultados corretos e os jobs rápidos.
PySpark oferece vários tipos de join, além de hints de broadcast para controlar o comportamento de shuffle. Este guia mostra o join certo para cada necessidade, como lidar com nomes de colunas sobrepostos e quando enviar uma tabela pequena para todos os executores.
Quer criar rapidamente Data Visualization a partir de um DataFrame do Python Pandas sem escrever código?
PyGWalker é uma biblioteca Python para Exploratory Data Analysis com Visualização. O PyGWalker (opens in a new tab) pode simplificar o fluxo de análise e visualização de dados no seu Jupyter Notebook, transformando seu pandas dataframe (e polars dataframe) em uma interface do usuário alternativa ao Tableau para exploração visual.
Guia rápido de tipos de join
| Type | Keeps | Best for |
|---|---|---|
inner | Linhas que casam em ambos os lados | Enriquecimento ou filtro padrão |
left / right | Todas as linhas da esquerda/direita + correspondências | Preservar um dos lados independentemente de haver match |
full | Todas as linhas, null onde não há match | Auditorias e checagens de completude |
left_semi | Linhas da esquerda com match, sem colunas da direita | Filtro por existência |
left_anti | Linhas da esquerda sem match | Anti-join para exclusões |
Dados de exemplo
from pyspark.sql import SparkSession, functions as F
from pyspark.sql.functions import broadcast
spark = SparkSession.builder.appName("joins-broadcast").getOrCreate()
users = spark.createDataFrame(
[(1, "Alice", "US"), (2, "Bob", "CA"), (3, "Cara", "US")],
"user_id INT, name STRING, country STRING",
)
orders = spark.createDataFrame(
[(1, "A-100", 120.0), (1, "A-101", 80.0), (3, "A-102", 50.0)],
"user_id INT, order_id STRING, amount DOUBLE",
)Inner e left joins com condições claras
inner_join = users.join(orders, on="user_id", how="inner")
left_join = users.join(orders, on="user_id", how="left")- Prefira
onexplícito para evitar cross joins acidentais. - Revise as contagens de linhas: o
innerreduz linhas; oleftpreserva todas as deusers.
Tratando nomes de colunas duplicados
joined = (
users.join(orders, on="user_id", how="left")
.withColumnRenamed("name", "user_name")
)- Após o join, renomeie colunas sobrepostas para nomes estáveis.
- Alternativamente, selecione apenas as colunas necessárias antes de fazer o join.
Semi e anti joins para filtros
with_orders = users.join(orders, "user_id", "left_semi")
without_orders = users.join(orders, "user_id", "left_anti")left_semiretorna apenas colunas deusersquando existe um pedido.left_antiretornauserssem pedidos; eficiente para exclusões.
Fazer broadcast de tabelas pequenas para evitar skew
small_dim = spark.createDataFrame(
[("US", "United States"), ("CA", "Canada")],
"country STRING, country_name STRING",
)
joined_dim = users.join(broadcast(small_dim), "country", "left")- Faça broadcast quando o lado menor cabe confortavelmente na memória de cada executor (dezenas de MB).
- O broadcast evita shuffle desse lado, acelerando joins e reduzindo o impacto de skew.
Quando não fazer broadcast
- Evite broadcast de tabelas grandes ou sem limite claro de tamanho; há risco de falta de memória nos executores.
- Não faça broadcast se as chaves forem bem distribuídas e os tamanhos das tabelas forem semelhantes; o shuffle join padrão é suficiente.
Armadilhas comuns e como corrigir
- Cross join indesejado: garanta que o parâmetro
onesteja definido; o Spark irá avisar sobre produtos cartesianos. - Colunas duplicadas: renomeie ou selecione antes do join para evitar confundir consumidores downstream.
- Chaves enviesadas (skewed): faça broadcast de tabelas de lookup pequenas; considere salting para joins entre fatos extremamente enviesados.
- Tipos incompatíveis: faça
castdas chaves para tipos compatíveis antes do join para evitar matches vazios.
Padrão mínimo de ponta a ponta
result = (
users
.join(broadcast(small_dim), "country", "left")
.join(orders, "user_id", "left")
.select(
"user_id",
F.col("name").alias("user_name"),
"country_name",
"order_id",
"amount",
)
)Esse padrão mantém as chaves de join explícitas, faz o enriquecimento de dimensões via broadcast e retorna nomes de colunas limpos, prontos para agregação ou exportação.
