PySpark Select, Filter, and withColumn: 핵심 DataFrame 레시피
Updated on
지저분한 DataFrame은 모든 파이프라인을 느리게 만듭니다. 잘못된 컬럼 선택, 안전하지 않은 필터, 깨지기 쉬운 캐스팅은 조용히 에러를 만들어 냅니다. 팀이 Spark 잡을 예측 가능하게 유지하려면, 컬럼 선택·필터링·파생 컬럼 생성에 대한 반복 가능한 패턴이 필요합니다.
PySpark의 select, filter/where, withColumn API는 변환을 명시적이고 타입 안전하며 테스트 가능하게 만들어 이런 문제를 해결합니다. 이 가이드는 핵심 패턴을 정리하고, 자주 발생하는 함정을 피하는 방법을 보여 줍니다.
Python Pandas Dataframe에서 코드 없이 빠르게 Data Visualization을 만들고 싶나요?
PyGWalker는 Visualization을 통한 Exploratory Data Analysis를 위한 Python 라이브러리입니다. PyGWalker (opens in a new tab)는 pandas dataframe(및 polars dataframe)을 시각적 탐색을 위한 tableau 대체 User Interface로 변환해, Jupyter Notebook에서의 데이터 분석과 시각화 워크플로우를 단순화해 줍니다.
독자를 위한 로드맵
| Task | API | 언제 사용할까 |
|---|---|---|
| 컬럼 선택/이름 변경 | select, alias | 필요한 컬럼만 유지; 단순할 땐 selectExpr보다 선호 |
| 행 필터링 | filter / where | 두 API는 동일; 조건을 &, ` |
| 파생/조건부 컬럼 | withColumn, when/otherwise | 로직을 적용해 컬럼 추가 또는 대체 |
| SQL 스타일 표현식 | selectExpr, expr | 여러 import 없이 빠르게 산술·SQL 함수 사용 |
| 안전한 캐스팅 | cast, try_cast (Spark 3.5+) | 잘못된 값에서 에러 없이 타입 강제 |
환경 설정과 샘플 데이터
from pyspark.sql import SparkSession
from pyspark.sql import functions as F
spark = SparkSession.builder.appName("select-filter-withcolumn").getOrCreate()
df = spark.createDataFrame(
[
(1, "Alice", "2025-11-01", "premium", "42"),
(2, "Bob", "2025-11-02", "basic", "x"),
(3, "Cara", "2025-11-02", None, "7"),
],
"id INT, name STRING, signup_date STRING, tier STRING, score STRING",
)필요한 것만 Select 하기
slim = df.select(
"id",
F.col("name").alias("customer_name"),
F.to_date("signup_date").alias("signup_dt"),
)select를 사용하면 projection이 명시적으로 드러나고, 불필요하게 넓은 스캔을 피할 수 있습니다.- 읽기 좋은 이름이 필요할 땐
alias를 사용합니다.
Filter / where: 동일한 API
active = slim.where(
(F.col("signup_dt") >= "2025-11-01")
& (F.col("customer_name") != "Bob")
)filter와where는 완전히 동일하므로, 가독성을 기준으로 하나를 골라 일관되게 사용하면 됩니다.- 조건을 조합할 땐
&와|를 사용하고, 각 조건은 괄호로 감싸는 것이 안전합니다. - Null 체크는
isNull()/isNotNull()을 사용해 예상치 못한 동작을 방지합니다.
withColumn으로 파생/조건부 값 만들기
scored = active.withColumn(
"score_int",
F.when(F.col("score").rlike("^[0-9]+$"), F.col("score").cast("int"))
.otherwise(None),
)withColumn은 컬럼을 추가하거나 기존 컬럼을 대체합니다. 실수로 덮어쓰지 않도록 컬럼 이름 충돌에 주의합니다.when/otherwise를 사용하면 분기 로직을 명확하게 표현할 수 있습니다.
빠른 표현식을 위한 selectExpr
expr_df = df.selectExpr(
"id",
"upper(name) AS name_upper",
"to_date(signup_date) AS signup_dt",
"CASE WHEN tier = 'premium' THEN 1 ELSE 0 END AS is_premium",
)- 여러
functions를 import하지 않고 SQL 스타일 계산을 빠르게 작성할 때 유용합니다. - 복잡한 로직은 가독성과 테스트 용이성을 위해 일반적인
withColumn체인으로 분리하는 편이 좋습니다.
안전한 캐스팅 패턴
typed = (
df
.withColumn("score_int", F.col("score").cast("int"))
.withColumn("signup_ts", F.to_timestamp("signup_date"))
)- 명시적인 캐스팅을 선호하고, CSV 같은 소스의 스키마 추론에 지나치게 의존하지 않는 것이 좋습니다.
- Spark 3.5+에서는
try_cast를 사용하면 잘못된 값에서 실패 대신 null을 반환하게 할 수 있습니다.
데이터 품질을 위한 빠른 점검
from pyspark.sql import functions as F
bad_counts = df.select(
F.sum(F.col("score").rlike("^[0-9]+$").cast("int")).alias("valid_scores"),
F.sum(F.col("score").rlike("^[^0-9]").cast("int")).alias("invalid_scores"),
)- 쓰기 전 작은 aggregation으로 검증하면 문제를 초기에 드러낼 수 있습니다.
- 주요 컬럼의 결측 비율을 보려면
count와isNull을 조합해 사용합니다.
흔한 함정과 해결책
- 괄호 없는 조건 조합:
&/|를 사용할 때는 항상 각 부울 조건을 괄호로 감쌉니다. - 의도치 않은 컬럼 덮어쓰기:
df.columns를 확인하거나withColumn에서 새 이름을 사용해 충돌을 피합니다. - 파싱되지 않은 문자열 날짜: 비교 전에 반드시
to_date/to_timestamp로 변환합니다. - Null에 민감한 비교 연산: 세 값 논리(three-valued logic)로 인한 의외의 결과를 피하려면
isNull/isNotNull을 활용합니다.
최소 예제 파이프라인
clean = (
df
.select("id", "name", "signup_date", "tier", "score")
.where(F.col("tier").isNotNull())
.withColumn("signup_dt", F.to_date("signup_date"))
.withColumn(
"score_int",
F.when(F.col("score").rlike("^[0-9]+$"), F.col("score").cast("int")),
)
)이 패턴은 선택을 최소화하고, 필터를 명시적으로 표현하며, 다운스트림에 쓰기 전에 파생 컬럼에 올바른 타입을 지정해 둡니다.
