Skip to content

PySpark Lecture et Écriture CSV et Parquet : Guide d’E/S Fiables

Updated on

Les projets data se bloquent lorsque les fichiers sont mal chargés, que les schémas dérivent ou que les écritures écrasent des données valides. PySpark résout ces problèmes avec des options de lecture/écriture cohérentes entre CSV et Parquet, mais les valeurs par défaut peuvent surprendre les équipes qui gèrent des jeux de données variés.

Les API DataFrameReader / DataFrameWriter de PySpark offrent une ingestion et une exportation prévisibles. En choisissant le bon format et le bon mode de sauvegarde, en ajoutant du contrôle de schéma et en partitionnant intelligemment, les équipes gardent des pipelines stables et performants.

Vous voulez créer rapidement des visualisations de données à partir d’un DataFrame pandas Python sans coder ?

PyGWalker est une bibliothèque Python pour l’analyse exploratoire de données avec visualisation. PyGWalker (opens in a new tab) peut simplifier votre flux de travail d’analyse et de visualisation de données dans Jupyter Notebook, en transformant votre pandas dataframe (et polars dataframe) en une interface utilisateur alternative à Tableau pour l’exploration visuelle.

PyGWalker for Data visualization (opens in a new tab)

CSV vs Parquet en un coup d’œil

SujetCSVParquet
SchémaInféré sauf si fourni ; dérive de types fréquenteSchéma stocké dans le fichier ; types stables
CompressionAucune par défaut ; fichiers plus volumineuxColonnaire + compression ; plus petit, plus rapide
Vitesse de lectureOrienté lignes ; plus lent pour les larges scansColonnaire ; sélection de colonnes plus rapide
Idéal pourÉchange avec des outils externesAnalytics, lectures répétées, data lakes partitionnés

Mise en place rapide

from pyspark.sql import SparkSession
 
spark = SparkSession.builder.appName("io-guide").getOrCreate()

Lecture de CSV avec contrôle

df_csv = (
    spark.read
    .option("header", True)
    .option("inferSchema", False)
    .schema("id INT, name STRING, ts TIMESTAMP")
    .option("delimiter", ",")
    .option("mode", "DROPMALFORMED")
    .csv("s3://bucket/raw/users/*.csv")
)
  • header lit les noms de colonnes depuis la première ligne.
  • Fournissez un schéma pour éviter les surprises de type et accélérer les lectures.
  • Choix pour mode : PERMISSIVE (par défaut), DROPMALFORMED, FAILFAST.

Lecture de Parquet

df_parquet = spark.read.parquet("s3://bucket/curated/users/")
  • Parquet stocke le schéma et des statistiques ; inferSchema est inutile.
  • Le « column pruning » et le « predicate pushdown » améliorent les performances.

Écriture de CSV en toute sécurité

(
    df_csv
    .repartition(1)  # reduce output shards when needed
    .write
    .option("header", True)
    .option("delimiter", ",")
    .mode("overwrite")
    .csv("s3://bucket/exports/users_csv")
)
  • Utilisez repartition pour contrôler le nombre de fichiers ; évitez un unique fichier énorme.
  • Choisissez mode avec soin (voir le tableau ci-dessous).

Écriture de Parquet avec partitions

(
    df_parquet
    .write
    .mode("append")
    .partitionBy("country", "ingest_date")
    .parquet("s3://bucket/warehouse/users_parquet")
)
  • partitionBy crée des répertoires pour les filtres fréquents afin d’accélérer les requêtes.
  • Gardez des clés de partition à faible cardinalité pour éviter la prolifération de petits fichiers.

Comportement des modes de sauvegarde

modeComportement
error / errorifexistsPar défaut ; échec si le chemin existe
overwriteRemplace les données existantes au chemin indiqué
appendAjoute de nouveaux fichiers au chemin existant
ignoreIgnore l’écriture si le chemin existe

Conseils pour l’évolution de schéma

  • Préférez Parquet lorsque l’évolution de schéma est attendue ; exploitez mergeSchema pour des mises à jour compatibles.
  • Pour CSV, gérez explicitement des versions de schémas et validez les colonnes avant les écritures.
  • Lors de l’ajout de colonnes, écrivez en Parquet avec le nouveau schéma et assurez-vous que les lecteurs utilisent la dernière définition de table.

Pièges fréquents et correctifs

  • Délimiteurs non conformes : définissez delimiter explicitement ; ne comptez pas sur les valeurs par défaut.
  • Types incorrects : fournissez un schema plutôt que inferSchema=True en production.
  • Trop de petits fichiers : utilisez coalesce ou repartition avant l’écriture ; envisagez maxRecordsPerFile.
  • Écrasement accidentel : écrivez d’abord vers un chemin temporaire, puis déplacez de façon atomique si votre couche de stockage le permet.

Recette minimale de bout en bout

schema = "user_id BIGINT, event STRING, ts TIMESTAMP, country STRING"
 
df = (
    spark.read
    .option("header", True)
    .schema(schema)
    .csv("s3://bucket/raw/events/*.csv")
)
 
cleaned = df.dropna(subset=["user_id", "event"]).withColumnRenamed("ts", "event_ts")
 
(
    cleaned
    .write
    .mode("append")
    .partitionBy("country")
    .parquet("s3://bucket/warehouse/events_parquet")
)