Skip to content

PySpark 读取与写入 CSV 和 Parquet:可靠 IO 指南

Updated on

当文件加载错误、schema 漂移或写入覆盖了有效数据时,数据项目就会停滞不前。PySpark 通过在 CSV 和 Parquet 间提供一致的读写选项来解决这些问题,但对于处理多种数据集的团队来说,默认行为有时会带来意外。

PySpark 的 DataFrameReader / DataFrameWriter API 为数据的导入与导出提供了可预期的行为。通过选择合适的格式与保存模式、增加 schema 控制、并合理分区,团队能够让数据管道保持稳定且高效。

想在不写代码的情况下,从 Python Pandas DataFrame 快速创建数据可视化?

PyGWalker 是一个用于可视化探索性数据分析的 Python 库。PyGWalker (opens in a new tab) 可以简化你在 Jupyter Notebook 中的数据分析和可视化工作流,把 pandas dataframe(以及 polars dataframe)转换成类似 tableau 的可视化交互界面。

PyGWalker for Data visualization (opens in a new tab)

CSV 与 Parquet 对比速览

TopicCSVParquet
Schema默认推断;类型漂移常见Schema 存在文件中;类型稳定
Compression默认无压缩;文件更大列式存储 + 压缩;更小更快
Read speed行式读取;宽表扫描较慢列式;按列读取更快
Best for与外部工具交换数据分析型场景、重复读取、分区数据湖

快速开始环境

from pyspark.sql import SparkSession
 
spark = SparkSession.builder.appName("io-guide").getOrCreate()

受控地读取 CSV

df_csv = (
    spark.read
    .option("header", True)
    .option("inferSchema", False)
    .schema("id INT, name STRING, ts TIMESTAMP")
    .option("delimiter", ",")
    .option("mode", "DROPMALFORMED")
    .csv("s3://bucket/raw/users/*.csv")
)
  • header 表示从首行读取列名。
  • 显式提供 schema,可以避免类型意外并加快读取。
  • mode 可选值:PERMISSIVE(默认)、DROPMALFORMEDFAILFAST

读取 Parquet

df_parquet = spark.read.parquet("s3://bucket/curated/users/")
  • Parquet 内部存储 schema 和统计信息;无需 inferSchema
  • 列裁剪(column pruning)与谓词下推(predicate pushdown)可以显著提升性能。

安全地写入 CSV

(
    df_csv
    .repartition(1)  # 在确有需要时减少输出分片数
    .write
    .option("header", True)
    .option("delimiter", ",")
    .mode("overwrite")
    .csv("s3://bucket/exports/users_csv")
)
  • 使用 repartition 控制输出文件数量;避免单个超大文件。
  • 谨慎选择 mode(见下表)。

带分区写入 Parquet

(
    df_parquet
    .write
    .mode("append")
    .partitionBy("country", "ingest_date")
    .parquet("s3://bucket/warehouse/users_parquet")
)
  • partitionBy 会根据常用过滤字段创建目录,从而加速查询。
  • 分区键应保持低基数,避免产生过多的小文件。

Save mode 行为对照

modeBehavior
error / errorifexists默认;若路径已存在则失败
overwrite替换该路径下已有数据
append在已有路径下追加新文件
ignore若路径已存在则跳过写入

Schema 演进实践建议

  • 当预期会有 schema 演进时,更倾向使用 Parquet;可利用 mergeSchema 来支持兼容性更新。
  • 对于 CSV,应显式管理 schema 版本,并在写入前校验列。
  • 当新增列时,使用新 schema 写入 Parquet,并确保下游读取使用最新的表定义。

常见坑与解决方法

  • 分隔符不一致:显式设置 delimiter,不要依赖默认值。
  • 类型错误:生产环境中应提供 schema,而不是使用 inferSchema=True
  • 小文件过多:在写入前使用 coalescerepartition;必要时设置 maxRecordsPerFile
  • 误删/误覆盖数据:先写入临时路径,再在存储层支持的情况下以原子方式移动到正式路径。

简洁端到端示例

schema = "user_id BIGINT, event STRING, ts TIMESTAMP, country STRING"
 
df = (
    spark.read
    .option("header", True)
    .schema(schema)
    .csv("s3://bucket/raw/events/*.csv")
)
 
cleaned = df.dropna(subset=["user_id", "event"]).withColumnRenamed("ts", "event_ts")
 
(
    cleaned
    .write
    .mode("append")
    .partitionBy("country")
    .parquet("s3://bucket/warehouse/events_parquet")
)