Skip to content

PySpark UDF vs Pandas UDF vs mapInPandas: 무엇을 사용해야 할까?

PySpark에서 커스텀 로직이 필요할 때, 보통 다음 세 가지 중 하나를 선택하게 됩니다:

  • 일반 Python UDF (udf(...))
  • Pandas UDF (@pandas_udf)
  • mapInPandas (DataFrame → Pandas DataFrame 이터레이터)

셋 다 “Spark에서 Python을 실행”할 수는 있지만, 성능, 유연성, 그리고 Spark 최적화를 얼마나 유지하느냐 측면에서 동작 방식이 크게 다릅니다.

이 가이드는 실무에서 바로 쓸 수 있는 의사결정 기준과, 그대로 복사해 사용할 수 있는 예제를 제공합니다.


멘탈 모델: 세 가지 사이에서 무엇이 달라질까?

1) 일반 UDF (행 단위 Python)

  • Spark가 컬럼 데이터를 Python 워커 프로세스로 전달합니다.
  • 함수는 한 번에 한 행씩 실행됩니다.
  • 보통 가장 느립니다.
  • Spark의 옵티마이저와 코드 생성 최적화를 막을 수 있습니다.

이럴 때 사용: 로직이 단순하고, 데이터가 작거나, 속도가 중요하지 않을 때.


2) Pandas UDF (Arrow 기반 벡터화 배치)

  • Spark가 Apache Arrow를 통해 데이터를 컬럼 기반 배치로 Python에 보냅니다.
  • 함수는 Pandas Series / DataFrames에서 (벡터화로) 실행됩니다.
  • 일반 UDF보다 대체로 훨씬 빠릅니다.

이럴 때 사용: 커스텀 컬럼 로직이 필요하고 성능도 챙기고 싶을 때.


3) mapInPandas (파티션 배치 단위로 완전한 제어)

  • Spark가 파티션 청크마다 함수를 한 번 호출하며, Pandas DataFrames 이터레이터를 제공합니다.
  • 멀티 컬럼 로직, 복잡한 변환, 심지어 행 확장까지 처리할 수 있습니다.
  • Spark의 병렬성은 유지하면서 Python에서 “미니 ETL”을 하기 좋습니다.

이럴 때 사용: “한 컬럼 입력 → 한 컬럼 출력” 형태에 맞지 않는 복잡한 변환이 필요할 때.


빠른 의사결정 테이블

이런 게 필요하다면…가장 좋은 선택
간단한 커스텀 변환, 데이터 양이 적음Regular UDF
컬럼 단위 변환, 중/대용량 데이터Pandas UDF
복잡한 로직: 여러 컬럼, 여러 출력 행, pandas 내부 join, 무거운 Python 라이브러리mapInPandas
가능하면 최대 성능Spark SQL 내장 함수 (셋 다 피하기)

규칙: Spark 내장 함수 > Pandas UDF > mapInPandas > 일반 UDF (대개 그렇지만 절대적인 건 아닙니다).


예제 데이터셋

from pyspark.sql import SparkSession
from pyspark.sql import functions as F
from pyspark.sql.types import StringType, DoubleType, StructType, StructField, LongType
 
spark = SparkSession.builder.getOrCreate()
 
df = spark.createDataFrame(
    [(" Alice  ", "US", 10),
     ("bob", "UK", 3),
     (None, "US", 7)],
    ["name", "country", "visits"]
)

일반 UDF 예제 (단순하지만 느림)

from pyspark.sql.functions import udf
from pyspark.sql.types import StringType
 
def clean_name(x):
    if x is None:
        return None
    return x.strip().lower()
 
clean_name_udf = udf(clean_name, StringType())
 
df_udf = df.withColumn("name_clean", clean_name_udf("name"))
df_udf.show()

장점

  • 이해하기 가장 쉽습니다
  • 어디서나 동작합니다

단점

  • Python을 행 단위로 호출하는 오버헤드
  • Spark가 공격적인 최적화를 수행하는 것을 막는 경우가 많음

Pandas UDF 예제 (벡터화, 보통 더 빠름)

import pandas as pd
from pyspark.sql.functions import pandas_udf
 
@pandas_udf("string")
def clean_name_vec(s: pd.Series) -> pd.Series:
    return s.str.strip().str.lower()
 
df_pandas_udf = df.withColumn("name_clean", clean_name_vec("name"))
df_pandas_udf.show()

장점

  • 배치 처리 + 벡터화
  • 큰 컬럼에서 처리량(throughput)이 훨씬 좋음

단점

  • Arrow 지원 및 호환되는 환경이 필요
  • 여전히 Python 측 실행이므로 내장 함수만큼 최적화되진 않음

mapInPandas 예제 (가장 유연함)

사용 사례: 여러 파생 컬럼 출력 + 커스텀 룰 적용

예를 들어 다음을 만들고 싶다고 해봅시다:

  • 정리된 이름
  • country와 visits 기반 점수
  • 구간(bucket) 라벨
import pandas as pd
 
def transform(pdf_iter):
    for pdf in pdf_iter:
        pdf["name_clean"] = pdf["name"].astype("string").str.strip().str.lower()
        pdf["visits"] = pdf["visits"].fillna(0).astype("float64")
        pdf["score"] = pdf["visits"] * pdf["country"].eq("US").astype("float64").add(1.0)  # US -> 2.0x, else 1.0x
        pdf["bucket"] = pd.cut(pdf["visits"], bins=[-1, 0, 5, 999999], labels=["none", "low", "high"])
        yield pdf
 
out_schema = "name string, country string, visits long, name_clean string, score double, bucket string"
 
df_map = df.mapInPandas(transform, schema=out_schema)
df_map.show()

장점

  • 매우 유연함
  • “파티션 단위 pandas 파이프라인”에 적합
  • (주의해서) 행 확장, 다중 출력 계산, 외부 라이브러리 호출까지 가능

단점

  • 스키마를 정확히 정의해야 함
  • 스큐(skew)나 과도하게 큰 파티션을 실수로 만들 가능성이 큼
  • 여전히 Python/Arrow 오버헤드는 존재

성능은 어떻게 볼까?

완벽한 벤치마크가 없어도, 아래 실무용 휴리스틱으로 대체로 올바른 선택을 할 수 있습니다:

일반 UDF가 보통 최악인 경우

  • 수천만 행 규모
  • Spark가 기본 제공으로 처리할 수 있는 단순 변환
  • filter/join 내부에서 사용

Pandas UDF가 빛나는 경우

  • 하나 이상의 컬럼을 벡터화 연산으로 변환할 때
  • pandas/numpy로 효율적으로 로직을 작성할 수 있을 때

mapInPandas가 최적인 경우

  • Spark SQL로는 작성이 번거로운 다단계 변환이 필요할 때
  • 여러 컬럼을 한 번에 생성하고 싶을 때
  • 행 확장이나 복잡한 조건 로직이 필요할 때

정확성 및 스키마 주의사항(gotchas)

Pandas UDF

  • 출력은 선언한 타입과 정확히 일치해야 합니다.
  • null/NaN이 나타날 수 있으므로 명시적으로 처리하세요.

mapInPandas

  • 출력 DataFrame은 스키마와 일치해야 합니다: 컬럼명 + dtype + 순서.
  • Python object dtype에 주의하세요. string/float로 명시적 캐스팅을 권장합니다.

“피해야 할 것” 리스트 (흔한 안티패턴)

  • 기본적인 문자열 연산에 일반 UDF 사용 (lower, trim, regex) → Spark 내장 함수를 사용하세요.
  • UDF 안에서 네트워크 API 호출 → 느리고, 불안정하며, 안전한 재시도가 어렵습니다.
  • Pandas UDF/mapInPandas 내부에서 행 단위 Python 루프를 과하게 사용 → 벡터화 이점을 망칩니다.
  • 일관되지 않은 타입 반환(어떤 때는 int, 어떤 때는 string) → 런타임 실패 / null 발생 가능.

권장 의사결정 흐름

  1. Spark 내장 함수로 가능한가? → 내장 함수 사용.

  2. 컬럼 단위 변환인가(입출력 행 수가 동일)? → Pandas UDF 사용.

  3. 멀티 컬럼 로직, 여러 출력 컬럼, 또는 행 확장이 필요한가? → mapInPandas 사용.

  4. 데이터가 작거나 빠른 프로토타이핑인가? → 일반 UDF도 허용.


마지막 팁: Spark 실행 계획으로 검증하기

전체 벤치마크를 하지 않더라도, 아래처럼 확인하면 많은 힌트를 얻을 수 있습니다:

df_pandas_udf.explain(True)

함수를 추가한 뒤 Spark가 최적화를 덜 하는 것으로 보인다면, 내장 함수로 바꾸거나 구조를 재구성해보라는 신호입니다.