PySpark 조인과 브로드캐스트: 매번 올바른 조인 선택하기
Updated on
잘못된 조인은 행을 숨기거나, 컬럼을 중복해서 생성하거나, 스키유가 있는 키가 충돌할 때 데이터가 폭증하게 만듭니다. 명확한 조인 선택과 적절한 브로드캐스트 사용은 결과의 정확성과 잡의 성능을 모두 지켜줍니다.
PySpark는 여러 조인 타입과 함께 셔플 동작을 제어하기 위한 브로드캐스트 힌트를 제공합니다. 이 가이드는 상황별로 어떤 조인을 써야 하는지, 겹치는 컬럼 이름을 어떻게 다뤄야 하는지, 작은 테이블을 언제 실행기(executor)에 브로드캐스트해야 하는지를 다룹니다.
Python Pandas Dataframe에서 코드 없이 빠르게 Data Visualization을 만들고 싶나요?
PyGWalker는 시각화를 활용한 Exploratory Data Analysis를 위한 Python 라이브러리입니다. PyGWalker (opens in a new tab)는 pandas dataframe(및 polars dataframe)을 시각적 탐색을 위한 tableau 대체 UI로 바꿔 줌으로써, Jupyter Notebook에서의 데이터 분석과 데이터 시각화 워크플로를 단순화합니다.
조인 타입 빠른 요약
| Type | 유지되는 행 | 적합한 용도 |
|---|---|---|
inner | 양쪽에 모두 매칭되는 행 | 표준적인 데이터 결합(enrich) 또는 필터링 |
left / right | 왼쪽/오른쪽 전체 + 매칭되는 행 | 한쪽 테이블을 매칭 여부와 무관하게 유지해야 할 때 |
full | 모든 행, 매칭 안 된 쪽은 null | 데이터 감사(audit), 완전성(completeness) 점검 |
left_semi | 매칭이 있는 왼쪽 행만, 오른쪽 컬럼은 없음 | 존재 여부 기반 필터링 |
left_anti | 매칭이 없는 왼쪽 행만 | 제외(anti-join) 로직 구현 |
샘플 데이터
from pyspark.sql import SparkSession, functions as F
from pyspark.sql.functions import broadcast
spark = SparkSession.builder.appName("joins-broadcast").getOrCreate()
users = spark.createDataFrame(
[(1, "Alice", "US"), (2, "Bob", "CA"), (3, "Cara", "US")],
"user_id INT, name STRING, country STRING",
)
orders = spark.createDataFrame(
[(1, "A-100", 120.0), (1, "A-101", 80.0), (3, "A-102", 50.0)],
"user_id INT, order_id STRING, amount DOUBLE",
)Inner / Left 조인과 명시적인 조인 조건
inner_join = users.join(orders, on="user_id", how="inner")
left_join = users.join(orders, on="user_id", how="left")- 예기치 않은 cross join을 피하려면
on을 명시적으로 사용하는 편이 좋습니다. - 행 수를 확인하세요: inner 조인은 행 수를 줄이고, left 조인은 모든
users행을 유지합니다.
중복 컬럼 이름 처리
joined = (
users.join(orders, on="user_id", how="left")
.withColumnRenamed("name", "user_name")
)- 조인 후에 겹치는 컬럼들은 의미 있는 고정 이름으로 rename 하는 것이 좋습니다.
- 또는, 조인 전에 필요한 컬럼만
select해서 가져오는 방식도 가능합니다.
필터용 Semi / Anti 조인
with_orders = users.join(orders, "user_id", "left_semi")
without_orders = users.join(orders, "user_id", "left_anti")left_semi는 주문이 존재하는 경우에만users의 행을 반환하며, 오른쪽 테이블의 컬럼은 포함하지 않습니다.left_anti는 주문이 전혀 없는users만 반환하며, 제외 조건을 구현할 때 효율적입니다.
스키유를 피하기 위한 작은 테이블 브로드캐스트
small_dim = spark.createDataFrame(
[("US", "United States"), ("CA", "Canada")],
"country STRING, country_name STRING",
)
joined_dim = users.join(broadcast(small_dim), "country", "left")- 작은 쪽 테이블이 실행기 메모리에 여유 있게 들어갈 정도(수십 MB 수준)일 때 브로드캐스트를 고려합니다.
- 브로드캐스트하면 그쪽 테이블에 대한 셔플을 건너뛰기 때문에 조인이 빨라지고, 키 스키유의 영향을 줄일 수 있습니다.
브로드캐스트를 하면 안 되는 경우
- 큰 테이블이나 상한이 불명확한 테이블은 브로드캐스트를 피해야 합니다. 실행기에서 out-of-memory가 날 수 있습니다.
- 조인 키 분포가 고르게 퍼져 있고, 두 테이블의 크기가 비슷하다면 굳이 브로드캐스트할 필요가 없습니다. 일반 셔플 조인으로 충분합니다.
자주 발생하는 문제와 해결책
- 의도치 않은 cross join:
on을 반드시 지정하세요. cartesian product가 발생하면 Spark가 경고를 표시합니다. - 중복 컬럼: 조인 전후로
select/rename을 사용해 컬럼 이름 충돌을 제거하여, 이후 소비자 코드에서 혼란이 없도록 합니다. - 스키유 키(skewed keys): 작은 lookup 테이블은 브로드캐스트하고, fact-to-fact 조인에서 극단적으로 스키유된 키가 있다면 salting 같은 기법을 고려합니다.
- 타입 불일치: 조인 키의 데이터 타입을 사전에 맞춰 캐스팅하여, 눈에 안 보이는 불일치로 인해 매칭이 전혀 안 나오는 상황을 피합니다.
최소한의 엔드 투 엔드 패턴
result = (
users
.join(broadcast(small_dim), "country", "left")
.join(orders, "user_id", "left")
.select(
"user_id",
F.col("name").alias("user_name"),
"country_name",
"order_id",
"amount",
)
)이 패턴은 조인 키를 명시적으로 지정하고, 차원 테이블 enrichment에는 브로드캐스트를 사용하며, 집계나 export에 바로 사용할 수 있는 깔끔한 컬럼 이름을 반환하도록 구성한 예시입니다.
