Skip to content

Joins y Broadcast en PySpark: Elige el Join Correcto Siempre

Updated on

Los joins mal definidos pueden ocultar filas, duplicar columnas o hacer explotar el volumen de datos cuando hay skew en las keys. Elegir claramente el tipo de join y usar bien el broadcast mantiene los resultados correctos y los jobs rápidos.

PySpark ofrece varios tipos de join más hints de broadcast para controlar el comportamiento del shuffle. Esta guía muestra qué join usar en cada caso, cómo manejar nombres de columnas solapados y cuándo enviar una tabla pequeña a todos los ejecutores.

¿Quieres crear rápidamente visualizaciones de datos desde un DataFrame de Python Pandas sin escribir código?

PyGWalker es una librería de Python para Exploratory Data Analysis con visualización. PyGWalker (opens in a new tab) puede simplificar tu flujo de análisis y visualización de datos en Jupyter Notebook, convirtiendo tu pandas dataframe (y polars dataframe) en una interfaz tipo tableau para exploración visual.

PyGWalker for Data visualization (opens in a new tab)

Guía rápida de tipos de join

TypeConservaMejor uso
innerCoincidencias en ambos ladosEnriquecimiento estándar o filtrado
left / rightTodo el lado izquierdo/derecho + coincidenciasPreservar un lado aunque no haya match
fullTodas las filas, nulls donde falteAuditorías y checks de completitud
left_semiFilas de la izquierda con match, sin columnas de la derechaFiltrado por existencia
left_antiFilas de la izquierda sin matchAnti-join para exclusiones

Datos de ejemplo

from pyspark.sql import SparkSession, functions as F
from pyspark.sql.functions import broadcast
 
spark = SparkSession.builder.appName("joins-broadcast").getOrCreate()
 
users = spark.createDataFrame(
    [(1, "Alice", "US"), (2, "Bob", "CA"), (3, "Cara", "US")],
    "user_id INT, name STRING, country STRING",
)
 
orders = spark.createDataFrame(
    [(1, "A-100", 120.0), (1, "A-101", 80.0), (3, "A-102", 50.0)],
    "user_id INT, order_id STRING, amount DOUBLE",
)

Joins inner y left con condiciones claras

inner_join = users.join(orders, on="user_id", how="inner")
 
left_join = users.join(orders, on="user_id", how="left")
  • Prefiere un on explícito para evitar cross joins accidentales.
  • Revisa los conteos de filas: el inner reduce filas; el left preserva todos los users.

Manejo de nombres de columnas duplicados

joined = (
    users.join(orders, on="user_id", how="left")
    .withColumnRenamed("name", "user_name")
)
  • Después del join, renombra las columnas que se solapan a nombres estables.
  • Alternativamente, selecciona solo las columnas necesarias antes de hacer el join.

Joins semi y anti para filtros

with_orders = users.join(orders, "user_id", "left_semi")
without_orders = users.join(orders, "user_id", "left_anti")
  • left_semi devuelve solo las columnas de users cuando existe un pedido.
  • left_anti devuelve los users sin pedidos; es eficiente para exclusiones.

Broadcast de tablas pequeñas para evitar skew

small_dim = spark.createDataFrame(
    [("US", "United States"), ("CA", "Canada")],
    "country STRING, country_name STRING",
)
 
joined_dim = users.join(broadcast(small_dim), "country", "left")
  • Haz broadcast cuando el lado pequeño quepa cómodamente en la memoria de los ejecutores (decenas de MB).
  • El broadcast evita el shuffle de ese lado, acelera los joins y reduce el impacto del skew.

Cuándo no hacer broadcast

  • Evita hacer broadcast de tablas grandes o no acotadas; hay riesgo de out-of-memory en los ejecutores.
  • No hagas broadcast si las keys están uniformemente distribuidas y los tamaños de tablas son similares; un shuffle join normal es suficiente.

Errores comunes y cómo corregirlos

  • Cross join no intencionado: asegúrate de definir on; Spark avisará sobre cartesian products.
  • Columnas duplicadas: renombra o selecciona antes del join para no confundir a los consumidores posteriores.
  • Keys con skew: haz broadcast de tablas de lookup pequeñas; considera salting para joins entre tablas de hechos con skew extremo.
  • Desajustes de tipos: castea las keys a tipos compatibles antes del join para evitar matches vacíos.

Patrón mínimo de extremo a extremo

result = (
    users
    .join(broadcast(small_dim), "country", "left")
    .join(orders, "user_id", "left")
    .select(
        "user_id",
        F.col("name").alias("user_name"),
        "country_name",
        "order_id",
        "amount",
    )
)

Este patrón mantiene las keys de join explícitas, maneja el enriquecimiento de dimensiones mediante broadcast y devuelve nombres de columnas limpios, listos para agregación o exportación.