PySpark 读取与写入 CSV 和 Parquet:可靠 IO 指南
Updated on
当文件加载错误、schema 漂移或写入覆盖了有效数据时,数据项目就会停滞不前。PySpark 通过在 CSV 和 Parquet 间提供一致的读写选项来解决这些问题,但对于处理多种数据集的团队来说,默认行为有时会带来意外。
PySpark 的 DataFrameReader / DataFrameWriter API 为数据的导入与导出提供了可预期的行为。通过选择合适的格式与保存模式、增加 schema 控制、并合理分区,团队能够让数据管道保持稳定且高效。
想在不写代码的情况下,从 Python Pandas DataFrame 快速创建数据可视化?
PyGWalker 是一个用于可视化探索性数据分析的 Python 库。PyGWalker (opens in a new tab) 可以简化你在 Jupyter Notebook 中的数据分析和可视化工作流,把 pandas dataframe(以及 polars dataframe)转换成类似 tableau 的可视化交互界面。
CSV 与 Parquet 对比速览
| Topic | CSV | Parquet |
|---|---|---|
| Schema | 默认推断;类型漂移常见 | Schema 存在文件中;类型稳定 |
| Compression | 默认无压缩;文件更大 | 列式存储 + 压缩;更小更快 |
| Read speed | 行式读取;宽表扫描较慢 | 列式;按列读取更快 |
| Best for | 与外部工具交换数据 | 分析型场景、重复读取、分区数据湖 |
快速开始环境
from pyspark.sql import SparkSession
spark = SparkSession.builder.appName("io-guide").getOrCreate()受控地读取 CSV
df_csv = (
spark.read
.option("header", True)
.option("inferSchema", False)
.schema("id INT, name STRING, ts TIMESTAMP")
.option("delimiter", ",")
.option("mode", "DROPMALFORMED")
.csv("s3://bucket/raw/users/*.csv")
)header表示从首行读取列名。- 显式提供 schema,可以避免类型意外并加快读取。
mode可选值:PERMISSIVE(默认)、DROPMALFORMED、FAILFAST。
读取 Parquet
df_parquet = spark.read.parquet("s3://bucket/curated/users/")- Parquet 内部存储 schema 和统计信息;无需
inferSchema。 - 列裁剪(column pruning)与谓词下推(predicate pushdown)可以显著提升性能。
安全地写入 CSV
(
df_csv
.repartition(1) # 在确有需要时减少输出分片数
.write
.option("header", True)
.option("delimiter", ",")
.mode("overwrite")
.csv("s3://bucket/exports/users_csv")
)- 使用
repartition控制输出文件数量;避免单个超大文件。 - 谨慎选择
mode(见下表)。
带分区写入 Parquet
(
df_parquet
.write
.mode("append")
.partitionBy("country", "ingest_date")
.parquet("s3://bucket/warehouse/users_parquet")
)partitionBy会根据常用过滤字段创建目录,从而加速查询。- 分区键应保持低基数,避免产生过多的小文件。
Save mode 行为对照
| mode | Behavior |
|---|---|
error / errorifexists | 默认;若路径已存在则失败 |
overwrite | 替换该路径下已有数据 |
append | 在已有路径下追加新文件 |
ignore | 若路径已存在则跳过写入 |
Schema 演进实践建议
- 当预期会有 schema 演进时,更倾向使用 Parquet;可利用
mergeSchema来支持兼容性更新。 - 对于 CSV,应显式管理 schema 版本,并在写入前校验列。
- 当新增列时,使用新 schema 写入 Parquet,并确保下游读取使用最新的表定义。
常见坑与解决方法
- 分隔符不一致:显式设置
delimiter,不要依赖默认值。 - 类型错误:生产环境中应提供
schema,而不是使用inferSchema=True。 - 小文件过多:在写入前使用
coalesce或repartition;必要时设置maxRecordsPerFile。 - 误删/误覆盖数据:先写入临时路径,再在存储层支持的情况下以原子方式移动到正式路径。
简洁端到端示例
schema = "user_id BIGINT, event STRING, ts TIMESTAMP, country STRING"
df = (
spark.read
.option("header", True)
.schema(schema)
.csv("s3://bucket/raw/events/*.csv")
)
cleaned = df.dropna(subset=["user_id", "event"]).withColumnRenamed("ts", "event_ts")
(
cleaned
.write
.mode("append")
.partitionBy("country")
.parquet("s3://bucket/warehouse/events_parquet")
)