Skip to content

PySpark로 CSV와 Parquet 읽기·쓰기: 신뢰할 수 있는 IO 가이드

Updated on

파일을 잘못 로드하거나, 스키마가 변하거나, 쓰기 작업이 기존의 정상 데이터를 덮어쓰면 데이터 프로젝트는 쉽게 멈춰 버립니다. PySpark는 CSV와 Parquet 전반에 걸쳐 일관된 읽기/쓰기 옵션을 제공해 이런 문제를 해결하지만, 다양한 데이터셋을 다루는 팀에게 기본값이 예상 밖으로 동작할 때도 많습니다.

PySpark의 DataFrameReader / DataFrameWriter API는 예측 가능한 데이터 적재와 내보내기를 제공합니다. 적절한 포맷과 저장 모드를 선택하고, 스키마를 명시적으로 제어하며, 파티셔닝을 전략적으로 적용하면 파이프라인을 안정적이고 성능 좋게 유지할 수 있습니다.

Python Pandas Dataframe에서 코드 한 줄도 작성하지 않고 빠르게 Data Visualization을 만들고 싶나요?

PyGWalker는 시각화를 통한 Exploratory Data Analysis를 위한 Python 라이브러리입니다. PyGWalker (opens in a new tab)는 pandas dataframe(및 polars dataframe)을 시각적 탐색을 위한 tableau 대체 User Interface로 변환하여, Jupyter Notebook에서의 데이터 분석 및 시각화 워크플로우를 단순화할 수 있습니다.

PyGWalker for Data visualization (opens in a new tab)

CSV vs Parquet 한눈에 비교

TopicCSVParquet
Schema제공하지 않으면 추론; 타입 드리프트가 잦음파일에 스키마 저장; 타입이 안정적
Compression기본적으로 없음; 파일 크기 큼컬럼 기반 + 압축; 더 작고 빠름
Read speed로우 기반; 폭 넓은 스캔에 느림컬럼 기반; 특정 컬럼 선택 시 빠름
Best for외부 도구와의 포맷 호환분석, 반복 조회, 파티션된 데이터 레이크

빠른 시작 설정

from pyspark.sql import SparkSession
 
spark = SparkSession.builder.appName("io-guide").getOrCreate()

제어력을 갖춘 CSV 읽기

df_csv = (
    spark.read
    .option("header", True)
    .option("inferSchema", False)
    .schema("id INT, name STRING, ts TIMESTAMP")
    .option("delimiter", ",")
    .option("mode", "DROPMALFORMED")
    .csv("s3://bucket/raw/users/*.csv")
)
  • header는 첫 번째 행에서 컬럼 이름을 읽어옵니다.
  • 스키마를 명시하면 타입 변화에 덜 취약해지고, 읽기 속도도 빨라집니다.
  • mode 선택지: PERMISSIVE(기본값), DROPMALFORMED, FAILFAST.

Parquet 읽기

df_parquet = spark.read.parquet("s3://bucket/curated/users/")
  • Parquet는 스키마와 통계 정보를 저장하므로 inferSchema 옵션이 필요 없습니다.
  • 컬럼 프루닝(column pruning)과 프레디케이트 푸시다운(predicate pushdown)으로 성능을 높일 수 있습니다.

안전하게 CSV 쓰기

(
    df_csv
    .repartition(1)  # 필요 시 출력 샤드 개수 줄이기
    .write
    .option("header", True)
    .option("delimiter", ",")
    .mode("overwrite")
    .csv("s3://bucket/exports/users_csv")
)
  • repartition으로 파일 개수를 제어하되, 지나치게 큰 단일 파일은 피하세요.
  • 아래 표를 참고해 mode를 신중하게 선택합니다.

파티션을 활용한 Parquet 쓰기

(
    df_parquet
    .write
    .mode("append")
    .partitionBy("country", "ingest_date")
    .parquet("s3://bucket/warehouse/users_parquet")
)
  • partitionBy는 자주 사용하는 필터 컬럼 기준으로 디렉터리를 생성해 쿼리 속도를 높여줍니다.
  • 파티션 키는 카디널리티가 너무 높지 않도록 선택해, 수많은 작은 파일 생성을 피하세요.

Save mode 동작 방식

modeBehavior
error / errorifexists기본값; 경로가 이미 존재하면 실패
overwrite해당 경로의 기존 데이터를 교체
append기존 경로에 새 파일을 추가
ignore경로가 이미 존재하면 쓰기를 생략

스키마 진화(schema evolution) 팁

  • 스키마가 변할 가능성이 크다면 Parquet를 선호하고, 호환 가능한 변경에는 mergeSchema를 활용하세요.
  • CSV의 경우 스키마 버전을 명시적으로 관리하고, 쓰기 전에 컬럼을 검증하는 절차를 두는 편이 좋습니다.
  • 컬럼을 추가할 때는 새로운 스키마로 Parquet를 쓰고, 모든 리더가 최신 테이블 정의를 사용하도록 맞춰야 합니다.

자주 발생하는 문제와 해결책

  • 구분자 불일치: delimiter를 명시적으로 설정하고, 기본값에 의존하지 않습니다.
  • 잘못된 타입: 운영 환경에서는 inferSchema=True 대신 schema를 직접 지정합니다.
  • 너무 많은 작은 파일: 쓰기 전에 coalesce 또는 repartition을 사용하고, 필요하면 maxRecordsPerFile도 고려합니다.
  • 실수로 overwrite: 스테이징 경로(임시 경로)에 먼저 쓴 뒤, 스토리지 계층이 지원한다면 원자적으로 이동시키는 전략을 쓰세요.

최소 예제: 엔드 투 엔드 레시피

schema = "user_id BIGINT, event STRING, ts TIMESTAMP, country STRING"
 
df = (
    spark.read
    .option("header", True)
    .schema(schema)
    .csv("s3://bucket/raw/events/*.csv")
)
 
cleaned = df.dropna(subset=["user_id", "event"]).withColumnRenamed("ts", "event_ts")
 
(
    cleaned
    .write
    .mode("append")
    .partitionBy("country")
    .parquet("s3://bucket/warehouse/events_parquet")
)