PySpark CSV und Parquet lesen und schreiben: Zuverlässiger IO-Guide
Updated on
Datenprojekte geraten ins Stocken, wenn Dateien falsch geladen werden, Schemas abweichen oder Schreibvorgänge gute Daten überschreiben. PySpark löst diese Probleme mit konsistenten Lese-/Schreiboptionen für CSV und Parquet, aber die Standardwerte können Teams überraschen, die mit heterogenen Datensätzen arbeiten.
Die DataFrameReader/DataFrameWriter‑APIs von PySpark bieten vorhersehbare Ingestion und Export. Durch die Wahl des passenden Formats und Save Modes, ergänzt um Schemakontrolle und sinnvolle Partitionierung, bleiben Pipelines stabil und performant.
Willst du schnell Datenvisualisierung aus einem Python Pandas DataFrame ohne Code erstellen?
PyGWalker ist eine Python‑Bibliothek für Exploratory Data Analysis mit Visualisierung. PyGWalker (opens in a new tab) kann deinen Data‑Analysis‑ und Data‑Visualization‑Workflow in Jupyter Notebooks vereinfachen, indem er deinen pandas dataframe (und polars dataframe) in ein tableau‑alternatives User Interface für visuelle Exploration verwandelt.
CSV vs Parquet im Überblick
| Thema | CSV | Parquet |
|---|---|---|
| Schema | Wird inferiert, falls nicht angegeben; Typ-Drift häufig | Schema im File gespeichert; stabile Typen |
| Kompression | Standardmäßig keine; größere Dateien | Spaltenbasiert + Kompression; kleiner, schneller |
| Lesegeschwindigkeit | Zeilenbasiert; langsamer bei breiten Scans | Spaltenbasiert; schnellere Spaltenselektion |
| Am besten geeignet für | Austausch mit externen Tools | Analytics, wiederholte Reads, partitionierte Data Lakes |
Quickstart‑Setup
from pyspark.sql import SparkSession
spark = SparkSession.builder.appName("io-guide").getOrCreate()CSV mit Kontrolle lesen
df_csv = (
spark.read
.option("header", True)
.option("inferSchema", False)
.schema("id INT, name STRING, ts TIMESTAMP")
.option("delimiter", ",")
.option("mode", "DROPMALFORMED")
.csv("s3://bucket/raw/users/*.csv")
)headerliest Spaltennamen aus der ersten Zeile.- Gib ein Schema an, um Typüberraschungen zu vermeiden und Reads zu beschleunigen.
mode‑Optionen:PERMISSIVE(Standard),DROPMALFORMED,FAILFAST.
Parquet lesen
df_parquet = spark.read.parquet("s3://bucket/curated/users/")- Parquet speichert Schema und Statistiken;
inferSchemaist überflüssig. - Column Pruning und Predicate Pushdown verbessern die Performance.
CSV sicher schreiben
(
df_csv
.repartition(1) # reduce output shards when needed
.write
.option("header", True)
.option("delimiter", ",")
.mode("overwrite")
.csv("s3://bucket/exports/users_csv")
)- Nutze
repartition, um die Anzahl der Dateien zu steuern; vermeide einzelne riesige Files. - Wähle
modemit Bedacht (siehe Tabelle unten).
Parquet mit Partitionen schreiben
(
df_parquet
.write
.mode("append")
.partitionBy("country", "ingest_date")
.parquet("s3://bucket/warehouse/users_parquet")
)partitionByerzeugt Verzeichnisse für häufige Filter und beschleunigt Abfragen.- Halte Partition Keys niedrig kardinal, um viele sehr kleine Dateien zu vermeiden.
Verhalten der Save Modes
| mode | Verhalten |
|---|---|
error / errorifexists | Standard; schlägt fehl, wenn Pfad existiert |
overwrite | Ersetzt bestehende Daten am Pfad |
append | Fügt neue Dateien zu einem bestehenden Pfad hinzu |
ignore | Überspringt den Write, wenn der Pfad existiert |
Tipps zur Schema‑Evolution
- Bevorzuge Parquet, wenn Schema‑Evolution zu erwarten ist; nutze
mergeSchemafür kompatible Änderungen. - Für CSV: Versioniere Schemas explizit und validiere Spalten vor dem Schreiben.
- Beim Hinzufügen von Spalten: Schreibe Parquet mit dem neuen Schema und stelle sicher, dass Reader die aktuellste Table‑Definition verwenden.
Häufige Fallstricke und Abhilfen
- Falsche Delimiter: setze
delimiterexplizit; verlasse dich nicht auf Defaults. - Falsche Datentypen: gib
schemaanstattinferSchema=Truein produktiven Umgebungen. - Zu viele kleine Dateien:
coalesceoderrepartitionvor dem Write; erwägemaxRecordsPerFile. - Unbeabsichtigtes Overwrite: schreibe zunächst in einen temporären Pfad und verschiebe anschließend atomar, sofern dein Storage‑Layer das unterstützt.
Minimales End‑to‑End‑Rezept
schema = "user_id BIGINT, event STRING, ts TIMESTAMP, country STRING"
df = (
spark.read
.option("header", True)
.schema(schema)
.csv("s3://bucket/raw/events/*.csv")
)
cleaned = df.dropna(subset=["user_id", "event"]).withColumnRenamed("ts", "event_ts")
(
cleaned
.write
.mode("append")
.partitionBy("country")
.parquet("s3://bucket/warehouse/events_parquet")
)