PySpark UDF 튜토리얼(초보자 친화): pyspark udf란 무엇이고, 왜/어떻게 사용하는가
PySpark는 빅데이터를 빠르게 처리하는 데 강점이 있습니다—하지만 Spark가 이해하지 못하는 로직을 작성하는 순간 문제가 생깁니다. 이때 UDF(User Defined Function) 가 필요합니다. UDF를 사용하면 Spark DataFrame에서 커스텀 Python 코드를 실행할 수 있습니다.
다만 주의할 점이 있습니다. UDF는 조심해서 쓰지 않으면 Spark를 더 느리게 만들 수 있습니다. 이 튜토리얼에서는 UDF를 언제 써야 하는지, 올바르게 작성하는 방법, 그리고 더 빠른 대안(내장 함수나 Pandas UDF 등)을 어떻게 선택할지 설명합니다.
UDF가 해결하는 문제
DataFrame 컬럼에 커스텀 로직을 적용하고 싶다고 해봅시다:
- 도메인 특화 파싱(이상한 ID 형식, 커스텀 규칙)
- 커스텀 점수/스코어링 함수
- Spark SQL 내장 함수로 커버되지 않는 텍스트 정규화
- 복잡한 규칙을 사용한 값 매핑
Spark의 내장 함수는 JVM에서 최적화되어 있습니다. 하지만 순수 Python 로직은 자동으로 “Spark 네이티브”가 되지 않습니다.
해결 옵션(가장 빠름 → 가장 느림)
실무에서는 보통 다음 우선순위를 권장합니다:
- Spark 내장 함수 (
pyspark.sql.functions) ✅ 가장 빠름 - SQL 표현식 (
expr,when,regexp_extract) ✅ 빠름 - Pandas UDF (Apache Arrow로 벡터화) ✅ 대체로 빠름
- 일반 Python UDF (행 단위 Python 실행) ⚠️ 대체로 가장 느림
이 글은 (3)과 (4) 를 중심으로 다루며, 언제 피해야 하는지에 대한 강한 가이드를 제공합니다.
준비: 가지고 놀 작은 DataFrame 만들기
from pyspark.sql import SparkSession
from pyspark.sql import functions as F
from pyspark.sql.types import StringType, IntegerType, DoubleType
spark = SparkSession.builder.getOrCreate()
df = spark.createDataFrame(
[
(" Alice ", "US", 10),
("bob", "UK", 3),
(None, "US", 7),
],
["name", "country", "visits"]
)
df.show()예제 1: 간단한 Python UDF(문자열 정리)
목표
공백을 제거하고 소문자로 바꾸며, null도 안전하게 처리합니다.
from pyspark.sql.functions import udf
def clean_name(x: str) -> str:
if x is None:
return None
return x.strip().lower()
clean_name_udf = udf(clean_name, StringType())
df2 = df.withColumn("name_clean", clean_name_udf(F.col("name")))
df2.show()참고
- 반환 타입(
StringType())을 반드시 제공해야 합니다. - Python UDF는 Python 워커에서 행 단위로 실행됩니다(대용량에서는 느릴 수 있음).
예제 2: 가능하면 내장 함수를 우선 사용하기
앞의 UDF는 내장 함수로 대체할 수 있습니다(더 빠름):
df_fast = df.withColumn(
"name_clean",
F.lower(F.trim(F.col("name")))
)
df_fast.show()경험칙: F.*로 표현할 수 있다면 그걸 쓰세요.
예제 3: 여러 입력을 받는 UDF(커스텀 스코어링)
country + visits를 기반으로 점수를 만들어봅시다.
def score(country: str, visits: int) -> float:
if country is None or visits is None:
return 0.0
bonus = 1.5 if country == "US" else 1.0
return float(visits) * bonus
score_udf = udf(score, DoubleType())
df3 = df.withColumn("score", score_udf(F.col("country"), F.col("visits")))
df3.show()예제 4: Spark SQL용 UDF 등록하기
SQL에서 사용하고 싶다면:
spark.udf.register("score_udf_sql", score, DoubleType())
df.createOrReplaceTempView("t")
spark.sql("""
SELECT name, country, visits, score_udf_sql(country, visits) AS score
FROM t
""").show()흔한 실수(그리고 피하는 방법)
1) 잘못된 타입을 반환하기
Spark가 DoubleType()을 기대하는데 때때로 문자열을 반환하면 런타임 에러가 나거나 null이 생길 수 있습니다.
2) 조인/필터에서 불필요하게 UDF를 사용하기
UDF는 Spark 최적화를 막을 수 있습니다. 파생 컬럼을 먼저 계산하거나, 내장 함수를 사용해보세요.
3) null 처리를 빼먹기
입력은 언제든 None일 수 있다고 가정해야 합니다.
더 빠른 대안: Pandas UDF(벡터화)
Pandas UDF는 데이터를 한 번에 배치로 처리(벡터화)하므로, 일반 UDF보다 훨씬 빠른 경우가 많습니다.
예제 5: 이름 정리를 위한 Pandas UDF
import pandas as pd
from pyspark.sql.functions import pandas_udf
@pandas_udf("string")
def clean_name_pandas(s: pd.Series) -> pd.Series:
return s.str.strip().str.lower()
df4 = df.withColumn("name_clean", clean_name_pandas(F.col("name")))
df4.show()Pandas UDF가 도움이 되는 경우
- pandas에서 로직을 구현하는 편이 더 쉬울 때
- 벡터화된 연산이 필요할 때
- 내장 함수만으로는 부족하고 성능도 중요할 때
예제 6: 원소별 스코어링을 위한 Pandas UDF
@pandas_udf("double")
def score_pandas(country: pd.Series, visits: pd.Series) -> pd.Series:
bonus = country.eq("US").astype("float64") * 0.5 + 1.0
visits_filled = visits.fillna(0).astype("float64")
return visits_filled * bonus
df5 = df.withColumn("score", score_pandas(F.col("country"), F.col("visits")))
df5.show()실전 베스트 프랙티스 체크리스트
- ✅ 내장 함수를 먼저 시도하세요 (
F.lower,F.when,F.regexp_extract등) - ✅ 커스텀 로직이 필요하면, 일반 UDF보다 Pandas UDF를 먼저 고려하세요
- ✅ 항상 올바른 반환 타입을 지정하세요
- ✅ null(
None)을 명시적으로 처리하세요 - ✅ UDF 로직은 순수하고 결정적이어야 합니다(같은 입력 → 같은 출력)
- ⚠️ 가능하면 필터/조인에서 UDF를 피하세요(최적화가 줄어들 수 있음)
- ⚠️ 측정하세요: 샘플 데이터셋에서 UDF 사용 전/후 실행 시간을 비교하세요
일반 Python UDF를 써야 하는 경우
다음과 같은 경우에는 일반 UDF를 사용해도 됩니다:
- 데이터가 아주 크지 않거나(또는) 성능이 핵심이 아닐 때
- 내장 함수로 표현하기 어려운 로직일 때
- pandas/Arrow를 사용할 수 없거나 변환에 적합하지 않을 때
성능이 중요하다면, 먼저 내장 함수 또는 Pandas UDF를 선택하세요.