PySpark UDF vs Pandas UDF vs mapInPandas: 무엇을 사용해야 할까?
PySpark에서 커스텀 로직이 필요할 때, 보통 다음 세 가지 중 하나를 선택하게 됩니다:
- 일반 Python UDF (
udf(...)) - Pandas UDF (
@pandas_udf) mapInPandas(DataFrame → Pandas DataFrame 이터레이터)
셋 다 “Spark에서 Python을 실행”할 수는 있지만, 성능, 유연성, 그리고 Spark 최적화를 얼마나 유지하느냐 측면에서 동작 방식이 크게 다릅니다.
이 가이드는 실무에서 바로 쓸 수 있는 의사결정 기준과, 그대로 복사해 사용할 수 있는 예제를 제공합니다.
멘탈 모델: 세 가지 사이에서 무엇이 달라질까?
1) 일반 UDF (행 단위 Python)
- Spark가 컬럼 데이터를 Python 워커 프로세스로 전달합니다.
- 함수는 한 번에 한 행씩 실행됩니다.
- 보통 가장 느립니다.
- Spark의 옵티마이저와 코드 생성 최적화를 막을 수 있습니다.
이럴 때 사용: 로직이 단순하고, 데이터가 작거나, 속도가 중요하지 않을 때.
2) Pandas UDF (Arrow 기반 벡터화 배치)
- Spark가 Apache Arrow를 통해 데이터를 컬럼 기반 배치로 Python에 보냅니다.
- 함수는 Pandas Series / DataFrames에서 (벡터화로) 실행됩니다.
- 일반 UDF보다 대체로 훨씬 빠릅니다.
이럴 때 사용: 커스텀 컬럼 로직이 필요하고 성능도 챙기고 싶을 때.
3) mapInPandas (파티션 배치 단위로 완전한 제어)
- Spark가 파티션 청크마다 함수를 한 번 호출하며, Pandas DataFrames 이터레이터를 제공합니다.
- 멀티 컬럼 로직, 복잡한 변환, 심지어 행 확장까지 처리할 수 있습니다.
- Spark의 병렬성은 유지하면서 Python에서 “미니 ETL”을 하기 좋습니다.
이럴 때 사용: “한 컬럼 입력 → 한 컬럼 출력” 형태에 맞지 않는 복잡한 변환이 필요할 때.
빠른 의사결정 테이블
| 이런 게 필요하다면… | 가장 좋은 선택 |
|---|---|
| 간단한 커스텀 변환, 데이터 양이 적음 | Regular UDF |
| 컬럼 단위 변환, 중/대용량 데이터 | Pandas UDF |
| 복잡한 로직: 여러 컬럼, 여러 출력 행, pandas 내부 join, 무거운 Python 라이브러리 | mapInPandas |
| 가능하면 최대 성능 | Spark SQL 내장 함수 (셋 다 피하기) |
규칙: Spark 내장 함수 > Pandas UDF > mapInPandas > 일반 UDF (대개 그렇지만 절대적인 건 아닙니다).
예제 데이터셋
from pyspark.sql import SparkSession
from pyspark.sql import functions as F
from pyspark.sql.types import StringType, DoubleType, StructType, StructField, LongType
spark = SparkSession.builder.getOrCreate()
df = spark.createDataFrame(
[(" Alice ", "US", 10),
("bob", "UK", 3),
(None, "US", 7)],
["name", "country", "visits"]
)일반 UDF 예제 (단순하지만 느림)
from pyspark.sql.functions import udf
from pyspark.sql.types import StringType
def clean_name(x):
if x is None:
return None
return x.strip().lower()
clean_name_udf = udf(clean_name, StringType())
df_udf = df.withColumn("name_clean", clean_name_udf("name"))
df_udf.show()장점
- 이해하기 가장 쉽습니다
- 어디서나 동작합니다
단점
- Python을 행 단위로 호출하는 오버헤드
- Spark가 공격적인 최적화를 수행하는 것을 막는 경우가 많음
Pandas UDF 예제 (벡터화, 보통 더 빠름)
import pandas as pd
from pyspark.sql.functions import pandas_udf
@pandas_udf("string")
def clean_name_vec(s: pd.Series) -> pd.Series:
return s.str.strip().str.lower()
df_pandas_udf = df.withColumn("name_clean", clean_name_vec("name"))
df_pandas_udf.show()장점
- 배치 처리 + 벡터화
- 큰 컬럼에서 처리량(throughput)이 훨씬 좋음
단점
- Arrow 지원 및 호환되는 환경이 필요
- 여전히 Python 측 실행이므로 내장 함수만큼 최적화되진 않음
mapInPandas 예제 (가장 유연함)
사용 사례: 여러 파생 컬럼 출력 + 커스텀 룰 적용
예를 들어 다음을 만들고 싶다고 해봅시다:
- 정리된 이름
- country와 visits 기반 점수
- 구간(bucket) 라벨
import pandas as pd
def transform(pdf_iter):
for pdf in pdf_iter:
pdf["name_clean"] = pdf["name"].astype("string").str.strip().str.lower()
pdf["visits"] = pdf["visits"].fillna(0).astype("float64")
pdf["score"] = pdf["visits"] * pdf["country"].eq("US").astype("float64").add(1.0) # US -> 2.0x, else 1.0x
pdf["bucket"] = pd.cut(pdf["visits"], bins=[-1, 0, 5, 999999], labels=["none", "low", "high"])
yield pdf
out_schema = "name string, country string, visits long, name_clean string, score double, bucket string"
df_map = df.mapInPandas(transform, schema=out_schema)
df_map.show()장점
- 매우 유연함
- “파티션 단위 pandas 파이프라인”에 적합
- (주의해서) 행 확장, 다중 출력 계산, 외부 라이브러리 호출까지 가능
단점
- 스키마를 정확히 정의해야 함
- 스큐(skew)나 과도하게 큰 파티션을 실수로 만들 가능성이 큼
- 여전히 Python/Arrow 오버헤드는 존재
성능은 어떻게 볼까?
완벽한 벤치마크가 없어도, 아래 실무용 휴리스틱으로 대체로 올바른 선택을 할 수 있습니다:
일반 UDF가 보통 최악인 경우
- 수천만 행 규모
- Spark가 기본 제공으로 처리할 수 있는 단순 변환
- filter/join 내부에서 사용
Pandas UDF가 빛나는 경우
- 하나 이상의 컬럼을 벡터화 연산으로 변환할 때
- pandas/numpy로 효율적으로 로직을 작성할 수 있을 때
mapInPandas가 최적인 경우
- Spark SQL로는 작성이 번거로운 다단계 변환이 필요할 때
- 여러 컬럼을 한 번에 생성하고 싶을 때
- 행 확장이나 복잡한 조건 로직이 필요할 때
정확성 및 스키마 주의사항(gotchas)
Pandas UDF
- 출력은 선언한 타입과 정확히 일치해야 합니다.
- null/NaN이 나타날 수 있으므로 명시적으로 처리하세요.
mapInPandas
- 출력 DataFrame은 스키마와 일치해야 합니다: 컬럼명 + dtype + 순서.
- Python
objectdtype에 주의하세요. string/float로 명시적 캐스팅을 권장합니다.
“피해야 할 것” 리스트 (흔한 안티패턴)
- 기본적인 문자열 연산에 일반 UDF 사용 (
lower,trim, regex) → Spark 내장 함수를 사용하세요. - UDF 안에서 네트워크 API 호출 → 느리고, 불안정하며, 안전한 재시도가 어렵습니다.
- Pandas UDF/mapInPandas 내부에서 행 단위 Python 루프를 과하게 사용 → 벡터화 이점을 망칩니다.
- 일관되지 않은 타입 반환(어떤 때는 int, 어떤 때는 string) → 런타임 실패 / null 발생 가능.
권장 의사결정 흐름
-
Spark 내장 함수로 가능한가? → 내장 함수 사용.
-
컬럼 단위 변환인가(입출력 행 수가 동일)? → Pandas UDF 사용.
-
멀티 컬럼 로직, 여러 출력 컬럼, 또는 행 확장이 필요한가? →
mapInPandas사용. -
데이터가 작거나 빠른 프로토타이핑인가? → 일반 UDF도 허용.
마지막 팁: Spark 실행 계획으로 검증하기
전체 벤치마크를 하지 않더라도, 아래처럼 확인하면 많은 힌트를 얻을 수 있습니다:
df_pandas_udf.explain(True)함수를 추가한 뒤 Spark가 최적화를 덜 하는 것으로 보인다면, 내장 함수로 바꾸거나 구조를 재구성해보라는 신호입니다.