PySpark로 CSV와 Parquet 읽기·쓰기: 신뢰할 수 있는 IO 가이드
Updated on
파일을 잘못 로드하거나, 스키마가 변하거나, 쓰기 작업이 기존의 정상 데이터를 덮어쓰면 데이터 프로젝트는 쉽게 멈춰 버립니다. PySpark는 CSV와 Parquet 전반에 걸쳐 일관된 읽기/쓰기 옵션을 제공해 이런 문제를 해결하지만, 다양한 데이터셋을 다루는 팀에게 기본값이 예상 밖으로 동작할 때도 많습니다.
PySpark의 DataFrameReader / DataFrameWriter API는 예측 가능한 데이터 적재와 내보내기를 제공합니다. 적절한 포맷과 저장 모드를 선택하고, 스키마를 명시적으로 제어하며, 파티셔닝을 전략적으로 적용하면 파이프라인을 안정적이고 성능 좋게 유지할 수 있습니다.
Python Pandas Dataframe에서 코드 한 줄도 작성하지 않고 빠르게 Data Visualization을 만들고 싶나요?
PyGWalker는 시각화를 통한 Exploratory Data Analysis를 위한 Python 라이브러리입니다. PyGWalker (opens in a new tab)는 pandas dataframe(및 polars dataframe)을 시각적 탐색을 위한 tableau 대체 User Interface로 변환하여, Jupyter Notebook에서의 데이터 분석 및 시각화 워크플로우를 단순화할 수 있습니다.
CSV vs Parquet 한눈에 비교
| Topic | CSV | Parquet |
|---|---|---|
| Schema | 제공하지 않으면 추론; 타입 드리프트가 잦음 | 파일에 스키마 저장; 타입이 안정적 |
| Compression | 기본적으로 없음; 파일 크기 큼 | 컬럼 기반 + 압축; 더 작고 빠름 |
| Read speed | 로우 기반; 폭 넓은 스캔에 느림 | 컬럼 기반; 특정 컬럼 선택 시 빠름 |
| Best for | 외부 도구와의 포맷 호환 | 분석, 반복 조회, 파티션된 데이터 레이크 |
빠른 시작 설정
from pyspark.sql import SparkSession
spark = SparkSession.builder.appName("io-guide").getOrCreate()제어력을 갖춘 CSV 읽기
df_csv = (
spark.read
.option("header", True)
.option("inferSchema", False)
.schema("id INT, name STRING, ts TIMESTAMP")
.option("delimiter", ",")
.option("mode", "DROPMALFORMED")
.csv("s3://bucket/raw/users/*.csv")
)header는 첫 번째 행에서 컬럼 이름을 읽어옵니다.- 스키마를 명시하면 타입 변화에 덜 취약해지고, 읽기 속도도 빨라집니다.
mode선택지:PERMISSIVE(기본값),DROPMALFORMED,FAILFAST.
Parquet 읽기
df_parquet = spark.read.parquet("s3://bucket/curated/users/")- Parquet는 스키마와 통계 정보를 저장하므로
inferSchema옵션이 필요 없습니다. - 컬럼 프루닝(column pruning)과 프레디케이트 푸시다운(predicate pushdown)으로 성능을 높일 수 있습니다.
안전하게 CSV 쓰기
(
df_csv
.repartition(1) # 필요 시 출력 샤드 개수 줄이기
.write
.option("header", True)
.option("delimiter", ",")
.mode("overwrite")
.csv("s3://bucket/exports/users_csv")
)repartition으로 파일 개수를 제어하되, 지나치게 큰 단일 파일은 피하세요.- 아래 표를 참고해
mode를 신중하게 선택합니다.
파티션을 활용한 Parquet 쓰기
(
df_parquet
.write
.mode("append")
.partitionBy("country", "ingest_date")
.parquet("s3://bucket/warehouse/users_parquet")
)partitionBy는 자주 사용하는 필터 컬럼 기준으로 디렉터리를 생성해 쿼리 속도를 높여줍니다.- 파티션 키는 카디널리티가 너무 높지 않도록 선택해, 수많은 작은 파일 생성을 피하세요.
Save mode 동작 방식
| mode | Behavior |
|---|---|
error / errorifexists | 기본값; 경로가 이미 존재하면 실패 |
overwrite | 해당 경로의 기존 데이터를 교체 |
append | 기존 경로에 새 파일을 추가 |
ignore | 경로가 이미 존재하면 쓰기를 생략 |
스키마 진화(schema evolution) 팁
- 스키마가 변할 가능성이 크다면 Parquet를 선호하고, 호환 가능한 변경에는
mergeSchema를 활용하세요. - CSV의 경우 스키마 버전을 명시적으로 관리하고, 쓰기 전에 컬럼을 검증하는 절차를 두는 편이 좋습니다.
- 컬럼을 추가할 때는 새로운 스키마로 Parquet를 쓰고, 모든 리더가 최신 테이블 정의를 사용하도록 맞춰야 합니다.
자주 발생하는 문제와 해결책
- 구분자 불일치:
delimiter를 명시적으로 설정하고, 기본값에 의존하지 않습니다. - 잘못된 타입: 운영 환경에서는
inferSchema=True대신schema를 직접 지정합니다. - 너무 많은 작은 파일: 쓰기 전에
coalesce또는repartition을 사용하고, 필요하면maxRecordsPerFile도 고려합니다. - 실수로 overwrite: 스테이징 경로(임시 경로)에 먼저 쓴 뒤, 스토리지 계층이 지원한다면 원자적으로 이동시키는 전략을 쓰세요.
최소 예제: 엔드 투 엔드 레시피
schema = "user_id BIGINT, event STRING, ts TIMESTAMP, country STRING"
df = (
spark.read
.option("header", True)
.schema(schema)
.csv("s3://bucket/raw/events/*.csv")
)
cleaned = df.dropna(subset=["user_id", "event"]).withColumnRenamed("ts", "event_ts")
(
cleaned
.write
.mode("append")
.partitionBy("country")
.parquet("s3://bucket/warehouse/events_parquet")
)