Skip to content

Joins e Broadcast em PySpark: Escolha Sempre o Join Certo

Updated on

Joins mal feitos escondem linhas, duplicam colunas ou fazem os dados explodirem quando chaves enviesadas (skewed) colidem. Escolhas claras de join e uso inteligente de broadcast mantêm os resultados corretos e os jobs rápidos.

PySpark oferece vários tipos de join, além de hints de broadcast para controlar o comportamento de shuffle. Este guia mostra o join certo para cada necessidade, como lidar com nomes de colunas sobrepostos e quando enviar uma tabela pequena para todos os executores.

Quer criar rapidamente Data Visualization a partir de um DataFrame do Python Pandas sem escrever código?

PyGWalker é uma biblioteca Python para Exploratory Data Analysis com Visualização. O PyGWalker (opens in a new tab) pode simplificar o fluxo de análise e visualização de dados no seu Jupyter Notebook, transformando seu pandas dataframe (e polars dataframe) em uma interface do usuário alternativa ao Tableau para exploração visual.

PyGWalker for Data visualization (opens in a new tab)

Guia rápido de tipos de join

TypeKeepsBest for
innerLinhas que casam em ambos os ladosEnriquecimento ou filtro padrão
left / rightTodas as linhas da esquerda/direita + correspondênciasPreservar um dos lados independentemente de haver match
fullTodas as linhas, null onde não há matchAuditorias e checagens de completude
left_semiLinhas da esquerda com match, sem colunas da direitaFiltro por existência
left_antiLinhas da esquerda sem matchAnti-join para exclusões

Dados de exemplo

from pyspark.sql import SparkSession, functions as F
from pyspark.sql.functions import broadcast
 
spark = SparkSession.builder.appName("joins-broadcast").getOrCreate()
 
users = spark.createDataFrame(
    [(1, "Alice", "US"), (2, "Bob", "CA"), (3, "Cara", "US")],
    "user_id INT, name STRING, country STRING",
)
 
orders = spark.createDataFrame(
    [(1, "A-100", 120.0), (1, "A-101", 80.0), (3, "A-102", 50.0)],
    "user_id INT, order_id STRING, amount DOUBLE",
)

Inner e left joins com condições claras

inner_join = users.join(orders, on="user_id", how="inner")
 
left_join = users.join(orders, on="user_id", how="left")
  • Prefira on explícito para evitar cross joins acidentais.
  • Revise as contagens de linhas: o inner reduz linhas; o left preserva todas as de users.

Tratando nomes de colunas duplicados

joined = (
    users.join(orders, on="user_id", how="left")
    .withColumnRenamed("name", "user_name")
)
  • Após o join, renomeie colunas sobrepostas para nomes estáveis.
  • Alternativamente, selecione apenas as colunas necessárias antes de fazer o join.

Semi e anti joins para filtros

with_orders = users.join(orders, "user_id", "left_semi")
without_orders = users.join(orders, "user_id", "left_anti")
  • left_semi retorna apenas colunas de users quando existe um pedido.
  • left_anti retorna users sem pedidos; eficiente para exclusões.

Fazer broadcast de tabelas pequenas para evitar skew

small_dim = spark.createDataFrame(
    [("US", "United States"), ("CA", "Canada")],
    "country STRING, country_name STRING",
)
 
joined_dim = users.join(broadcast(small_dim), "country", "left")
  • Faça broadcast quando o lado menor cabe confortavelmente na memória de cada executor (dezenas de MB).
  • O broadcast evita shuffle desse lado, acelerando joins e reduzindo o impacto de skew.

Quando não fazer broadcast

  • Evite broadcast de tabelas grandes ou sem limite claro de tamanho; há risco de falta de memória nos executores.
  • Não faça broadcast se as chaves forem bem distribuídas e os tamanhos das tabelas forem semelhantes; o shuffle join padrão é suficiente.

Armadilhas comuns e como corrigir

  • Cross join indesejado: garanta que o parâmetro on esteja definido; o Spark irá avisar sobre produtos cartesianos.
  • Colunas duplicadas: renomeie ou selecione antes do join para evitar confundir consumidores downstream.
  • Chaves enviesadas (skewed): faça broadcast de tabelas de lookup pequenas; considere salting para joins entre fatos extremamente enviesados.
  • Tipos incompatíveis: faça cast das chaves para tipos compatíveis antes do join para evitar matches vazios.

Padrão mínimo de ponta a ponta

result = (
    users
    .join(broadcast(small_dim), "country", "left")
    .join(orders, "user_id", "left")
    .select(
        "user_id",
        F.col("name").alias("user_name"),
        "country_name",
        "order_id",
        "amount",
    )
)

Esse padrão mantém as chaves de join explícitas, faz o enriquecimento de dimensões via broadcast e retorna nomes de colunas limpos, prontos para agregação ou exportação.