Skip to content

PySpark CSV und Parquet lesen und schreiben: Zuverlässiger IO-Guide

Updated on

Datenprojekte geraten ins Stocken, wenn Dateien falsch geladen werden, Schemas abweichen oder Schreibvorgänge gute Daten überschreiben. PySpark löst diese Probleme mit konsistenten Lese-/Schreiboptionen für CSV und Parquet, aber die Standardwerte können Teams überraschen, die mit heterogenen Datensätzen arbeiten.

Die DataFrameReader/DataFrameWriter‑APIs von PySpark bieten vorhersehbare Ingestion und Export. Durch die Wahl des passenden Formats und Save Modes, ergänzt um Schemakontrolle und sinnvolle Partitionierung, bleiben Pipelines stabil und performant.

Willst du schnell Datenvisualisierung aus einem Python Pandas DataFrame ohne Code erstellen?

PyGWalker ist eine Python‑Bibliothek für Exploratory Data Analysis mit Visualisierung. PyGWalker (opens in a new tab) kann deinen Data‑Analysis‑ und Data‑Visualization‑Workflow in Jupyter Notebooks vereinfachen, indem er deinen pandas dataframe (und polars dataframe) in ein tableau‑alternatives User Interface für visuelle Exploration verwandelt.

PyGWalker for Data visualization (opens in a new tab)

CSV vs Parquet im Überblick

ThemaCSVParquet
SchemaWird inferiert, falls nicht angegeben; Typ-Drift häufigSchema im File gespeichert; stabile Typen
KompressionStandardmäßig keine; größere DateienSpaltenbasiert + Kompression; kleiner, schneller
LesegeschwindigkeitZeilenbasiert; langsamer bei breiten ScansSpaltenbasiert; schnellere Spaltenselektion
Am besten geeignet fürAustausch mit externen ToolsAnalytics, wiederholte Reads, partitionierte Data Lakes

Quickstart‑Setup

from pyspark.sql import SparkSession
 
spark = SparkSession.builder.appName("io-guide").getOrCreate()

CSV mit Kontrolle lesen

df_csv = (
    spark.read
    .option("header", True)
    .option("inferSchema", False)
    .schema("id INT, name STRING, ts TIMESTAMP")
    .option("delimiter", ",")
    .option("mode", "DROPMALFORMED")
    .csv("s3://bucket/raw/users/*.csv")
)
  • header liest Spaltennamen aus der ersten Zeile.
  • Gib ein Schema an, um Typüberraschungen zu vermeiden und Reads zu beschleunigen.
  • mode‑Optionen: PERMISSIVE (Standard), DROPMALFORMED, FAILFAST.

Parquet lesen

df_parquet = spark.read.parquet("s3://bucket/curated/users/")
  • Parquet speichert Schema und Statistiken; inferSchema ist überflüssig.
  • Column Pruning und Predicate Pushdown verbessern die Performance.

CSV sicher schreiben

(
    df_csv
    .repartition(1)  # reduce output shards when needed
    .write
    .option("header", True)
    .option("delimiter", ",")
    .mode("overwrite")
    .csv("s3://bucket/exports/users_csv")
)
  • Nutze repartition, um die Anzahl der Dateien zu steuern; vermeide einzelne riesige Files.
  • Wähle mode mit Bedacht (siehe Tabelle unten).

Parquet mit Partitionen schreiben

(
    df_parquet
    .write
    .mode("append")
    .partitionBy("country", "ingest_date")
    .parquet("s3://bucket/warehouse/users_parquet")
)
  • partitionBy erzeugt Verzeichnisse für häufige Filter und beschleunigt Abfragen.
  • Halte Partition Keys niedrig kardinal, um viele sehr kleine Dateien zu vermeiden.

Verhalten der Save Modes

modeVerhalten
error / errorifexistsStandard; schlägt fehl, wenn Pfad existiert
overwriteErsetzt bestehende Daten am Pfad
appendFügt neue Dateien zu einem bestehenden Pfad hinzu
ignoreÜberspringt den Write, wenn der Pfad existiert

Tipps zur Schema‑Evolution

  • Bevorzuge Parquet, wenn Schema‑Evolution zu erwarten ist; nutze mergeSchema für kompatible Änderungen.
  • Für CSV: Versioniere Schemas explizit und validiere Spalten vor dem Schreiben.
  • Beim Hinzufügen von Spalten: Schreibe Parquet mit dem neuen Schema und stelle sicher, dass Reader die aktuellste Table‑Definition verwenden.

Häufige Fallstricke und Abhilfen

  • Falsche Delimiter: setze delimiter explizit; verlasse dich nicht auf Defaults.
  • Falsche Datentypen: gib schema anstatt inferSchema=True in produktiven Umgebungen.
  • Zu viele kleine Dateien: coalesce oder repartition vor dem Write; erwäge maxRecordsPerFile.
  • Unbeabsichtigtes Overwrite: schreibe zunächst in einen temporären Pfad und verschiebe anschließend atomar, sofern dein Storage‑Layer das unterstützt.

Minimales End‑to‑End‑Rezept

schema = "user_id BIGINT, event STRING, ts TIMESTAMP, country STRING"
 
df = (
    spark.read
    .option("header", True)
    .schema(schema)
    .csv("s3://bucket/raw/events/*.csv")
)
 
cleaned = df.dropna(subset=["user_id", "event"]).withColumnRenamed("ts", "event_ts")
 
(
    cleaned
    .write
    .mode("append")
    .partitionBy("country")
    .parquet("s3://bucket/warehouse/events_parquet")
)