PySpark Lecture et Écriture CSV et Parquet : Guide d’E/S Fiables
Updated on
Les projets data se bloquent lorsque les fichiers sont mal chargés, que les schémas dérivent ou que les écritures écrasent des données valides. PySpark résout ces problèmes avec des options de lecture/écriture cohérentes entre CSV et Parquet, mais les valeurs par défaut peuvent surprendre les équipes qui gèrent des jeux de données variés.
Les API DataFrameReader / DataFrameWriter de PySpark offrent une ingestion et une exportation prévisibles. En choisissant le bon format et le bon mode de sauvegarde, en ajoutant du contrôle de schéma et en partitionnant intelligemment, les équipes gardent des pipelines stables et performants.
Vous voulez créer rapidement des visualisations de données à partir d’un DataFrame pandas Python sans coder ?
PyGWalker est une bibliothèque Python pour l’analyse exploratoire de données avec visualisation. PyGWalker (opens in a new tab) peut simplifier votre flux de travail d’analyse et de visualisation de données dans Jupyter Notebook, en transformant votre pandas dataframe (et polars dataframe) en une interface utilisateur alternative à Tableau pour l’exploration visuelle.
CSV vs Parquet en un coup d’œil
| Sujet | CSV | Parquet |
|---|---|---|
| Schéma | Inféré sauf si fourni ; dérive de types fréquente | Schéma stocké dans le fichier ; types stables |
| Compression | Aucune par défaut ; fichiers plus volumineux | Colonnaire + compression ; plus petit, plus rapide |
| Vitesse de lecture | Orienté lignes ; plus lent pour les larges scans | Colonnaire ; sélection de colonnes plus rapide |
| Idéal pour | Échange avec des outils externes | Analytics, lectures répétées, data lakes partitionnés |
Mise en place rapide
from pyspark.sql import SparkSession
spark = SparkSession.builder.appName("io-guide").getOrCreate()Lecture de CSV avec contrôle
df_csv = (
spark.read
.option("header", True)
.option("inferSchema", False)
.schema("id INT, name STRING, ts TIMESTAMP")
.option("delimiter", ",")
.option("mode", "DROPMALFORMED")
.csv("s3://bucket/raw/users/*.csv")
)headerlit les noms de colonnes depuis la première ligne.- Fournissez un schéma pour éviter les surprises de type et accélérer les lectures.
- Choix pour
mode:PERMISSIVE(par défaut),DROPMALFORMED,FAILFAST.
Lecture de Parquet
df_parquet = spark.read.parquet("s3://bucket/curated/users/")- Parquet stocke le schéma et des statistiques ;
inferSchemaest inutile. - Le « column pruning » et le « predicate pushdown » améliorent les performances.
Écriture de CSV en toute sécurité
(
df_csv
.repartition(1) # reduce output shards when needed
.write
.option("header", True)
.option("delimiter", ",")
.mode("overwrite")
.csv("s3://bucket/exports/users_csv")
)- Utilisez
repartitionpour contrôler le nombre de fichiers ; évitez un unique fichier énorme. - Choisissez
modeavec soin (voir le tableau ci-dessous).
Écriture de Parquet avec partitions
(
df_parquet
.write
.mode("append")
.partitionBy("country", "ingest_date")
.parquet("s3://bucket/warehouse/users_parquet")
)partitionBycrée des répertoires pour les filtres fréquents afin d’accélérer les requêtes.- Gardez des clés de partition à faible cardinalité pour éviter la prolifération de petits fichiers.
Comportement des modes de sauvegarde
| mode | Comportement |
|---|---|
error / errorifexists | Par défaut ; échec si le chemin existe |
overwrite | Remplace les données existantes au chemin indiqué |
append | Ajoute de nouveaux fichiers au chemin existant |
ignore | Ignore l’écriture si le chemin existe |
Conseils pour l’évolution de schéma
- Préférez Parquet lorsque l’évolution de schéma est attendue ; exploitez
mergeSchemapour des mises à jour compatibles. - Pour CSV, gérez explicitement des versions de schémas et validez les colonnes avant les écritures.
- Lors de l’ajout de colonnes, écrivez en Parquet avec le nouveau schéma et assurez-vous que les lecteurs utilisent la dernière définition de table.
Pièges fréquents et correctifs
- Délimiteurs non conformes : définissez
delimiterexplicitement ; ne comptez pas sur les valeurs par défaut. - Types incorrects : fournissez un
schemaplutôt queinferSchema=Trueen production. - Trop de petits fichiers : utilisez
coalesceourepartitionavant l’écriture ; envisagezmaxRecordsPerFile. - Écrasement accidentel : écrivez d’abord vers un chemin temporaire, puis déplacez de façon atomique si votre couche de stockage le permet.
Recette minimale de bout en bout
schema = "user_id BIGINT, event STRING, ts TIMESTAMP, country STRING"
df = (
spark.read
.option("header", True)
.schema(schema)
.csv("s3://bucket/raw/events/*.csv")
)
cleaned = df.dropna(subset=["user_id", "event"]).withColumnRenamed("ts", "event_ts")
(
cleaned
.write
.mode("append")
.partitionBy("country")
.parquet("s3://bucket/warehouse/events_parquet")
)