Skip to content

PySpark UDF 튜토리얼(초보자 친화): pyspark udf란 무엇이고, 왜/어떻게 사용하는가

PySpark는 빅데이터를 빠르게 처리하는 데 강점이 있습니다—하지만 Spark가 이해하지 못하는 로직을 작성하는 순간 문제가 생깁니다. 이때 UDF(User Defined Function) 가 필요합니다. UDF를 사용하면 Spark DataFrame에서 커스텀 Python 코드를 실행할 수 있습니다.

다만 주의할 점이 있습니다. UDF는 조심해서 쓰지 않으면 Spark를 더 느리게 만들 수 있습니다. 이 튜토리얼에서는 UDF를 언제 써야 하는지, 올바르게 작성하는 방법, 그리고 더 빠른 대안(내장 함수나 Pandas UDF 등)을 어떻게 선택할지 설명합니다.


UDF가 해결하는 문제

DataFrame 컬럼에 커스텀 로직을 적용하고 싶다고 해봅시다:

  • 도메인 특화 파싱(이상한 ID 형식, 커스텀 규칙)
  • 커스텀 점수/스코어링 함수
  • Spark SQL 내장 함수로 커버되지 않는 텍스트 정규화
  • 복잡한 규칙을 사용한 값 매핑

Spark의 내장 함수는 JVM에서 최적화되어 있습니다. 하지만 순수 Python 로직은 자동으로 “Spark 네이티브”가 되지 않습니다.


해결 옵션(가장 빠름 → 가장 느림)

실무에서는 보통 다음 우선순위를 권장합니다:

  1. Spark 내장 함수 (pyspark.sql.functions) ✅ 가장 빠름
  2. SQL 표현식 (expr, when, regexp_extract) ✅ 빠름
  3. Pandas UDF (Apache Arrow로 벡터화) ✅ 대체로 빠름
  4. 일반 Python UDF (행 단위 Python 실행) ⚠️ 대체로 가장 느림

이 글은 (3)과 (4) 를 중심으로 다루며, 언제 피해야 하는지에 대한 강한 가이드를 제공합니다.


준비: 가지고 놀 작은 DataFrame 만들기

from pyspark.sql import SparkSession
from pyspark.sql import functions as F
from pyspark.sql.types import StringType, IntegerType, DoubleType
 
spark = SparkSession.builder.getOrCreate()
 
df = spark.createDataFrame(
    [
        (" Alice  ", "US", 10),
        ("bob", "UK", 3),
        (None, "US", 7),
    ],
    ["name", "country", "visits"]
)
 
df.show()

예제 1: 간단한 Python UDF(문자열 정리)

목표

공백을 제거하고 소문자로 바꾸며, null도 안전하게 처리합니다.

from pyspark.sql.functions import udf
 
def clean_name(x: str) -> str:
    if x is None:
        return None
    return x.strip().lower()
 
clean_name_udf = udf(clean_name, StringType())
 
df2 = df.withColumn("name_clean", clean_name_udf(F.col("name")))
df2.show()

참고

  • 반환 타입(StringType())을 반드시 제공해야 합니다.
  • Python UDF는 Python 워커에서 행 단위로 실행됩니다(대용량에서는 느릴 수 있음).

예제 2: 가능하면 내장 함수를 우선 사용하기

앞의 UDF는 내장 함수로 대체할 수 있습니다(더 빠름):

df_fast = df.withColumn(
    "name_clean",
    F.lower(F.trim(F.col("name")))
)
df_fast.show()

경험칙: F.*로 표현할 수 있다면 그걸 쓰세요.


예제 3: 여러 입력을 받는 UDF(커스텀 스코어링)

country + visits를 기반으로 점수를 만들어봅시다.

def score(country: str, visits: int) -> float:
    if country is None or visits is None:
        return 0.0
    bonus = 1.5 if country == "US" else 1.0
    return float(visits) * bonus
 
score_udf = udf(score, DoubleType())
 
df3 = df.withColumn("score", score_udf(F.col("country"), F.col("visits")))
df3.show()

예제 4: Spark SQL용 UDF 등록하기

SQL에서 사용하고 싶다면:

spark.udf.register("score_udf_sql", score, DoubleType())
 
df.createOrReplaceTempView("t")
spark.sql("""
  SELECT name, country, visits, score_udf_sql(country, visits) AS score
  FROM t
""").show()

흔한 실수(그리고 피하는 방법)

1) 잘못된 타입을 반환하기

Spark가 DoubleType()을 기대하는데 때때로 문자열을 반환하면 런타임 에러가 나거나 null이 생길 수 있습니다.

2) 조인/필터에서 불필요하게 UDF를 사용하기

UDF는 Spark 최적화를 막을 수 있습니다. 파생 컬럼을 먼저 계산하거나, 내장 함수를 사용해보세요.

3) null 처리를 빼먹기

입력은 언제든 None일 수 있다고 가정해야 합니다.


더 빠른 대안: Pandas UDF(벡터화)

Pandas UDF는 데이터를 한 번에 배치로 처리(벡터화)하므로, 일반 UDF보다 훨씬 빠른 경우가 많습니다.

예제 5: 이름 정리를 위한 Pandas UDF

import pandas as pd
from pyspark.sql.functions import pandas_udf
 
@pandas_udf("string")
def clean_name_pandas(s: pd.Series) -> pd.Series:
    return s.str.strip().str.lower()
 
df4 = df.withColumn("name_clean", clean_name_pandas(F.col("name")))
df4.show()

Pandas UDF가 도움이 되는 경우

  • pandas에서 로직을 구현하는 편이 더 쉬울 때
  • 벡터화된 연산이 필요할 때
  • 내장 함수만으로는 부족하고 성능도 중요할 때

예제 6: 원소별 스코어링을 위한 Pandas UDF

@pandas_udf("double")
def score_pandas(country: pd.Series, visits: pd.Series) -> pd.Series:
    bonus = country.eq("US").astype("float64") * 0.5 + 1.0
    visits_filled = visits.fillna(0).astype("float64")
    return visits_filled * bonus
 
df5 = df.withColumn("score", score_pandas(F.col("country"), F.col("visits")))
df5.show()

실전 베스트 프랙티스 체크리스트

  • ✅ 내장 함수를 먼저 시도하세요 (F.lower, F.when, F.regexp_extract 등)
  • ✅ 커스텀 로직이 필요하면, 일반 UDF보다 Pandas UDF를 먼저 고려하세요
  • ✅ 항상 올바른 반환 타입을 지정하세요
  • ✅ null(None)을 명시적으로 처리하세요
  • ✅ UDF 로직은 순수하고 결정적이어야 합니다(같은 입력 → 같은 출력)
  • ⚠️ 가능하면 필터/조인에서 UDF를 피하세요(최적화가 줄어들 수 있음)
  • ⚠️ 측정하세요: 샘플 데이터셋에서 UDF 사용 전/후 실행 시간을 비교하세요

일반 Python UDF를 써야 하는 경우

다음과 같은 경우에는 일반 UDF를 사용해도 됩니다:

  • 데이터가 아주 크지 않거나(또는) 성능이 핵심이 아닐 때
  • 내장 함수로 표현하기 어려운 로직일 때
  • pandas/Arrow를 사용할 수 없거나 변환에 적합하지 않을 때

성능이 중요하다면, 먼저 내장 함수 또는 Pandas UDF를 선택하세요.