PySpark での CSV および Parquet の読み書き: 信頼できる IO ガイド
Updated on
ファイルの読み込みが正しく行われなかったり、スキーマが変化したり、書き込みが既存の良質なデータを上書きしてしまうと、データプロジェクトは簡単に行き詰まります。PySpark は CSV と Parquet に共通の読み書きオプションを提供することで、こうした問題を解決しますが、デフォルト設定は多様なデータセットを扱うチームをしばしば驚かせます。
PySpark の DataFrameReader / DataFrameWriter API は予測可能な取り込みとエクスポートを提供します。適切なフォーマットと保存モードを選び、スキーマ制御を加え、賢くパーティション分割することで、パイプラインを安定かつ高パフォーマンスに保てます。
Python Pandas DataFrame からノーコードで素早くデータ可視化を行いたいですか?
PyGWalker は可視化を伴う Exploratory Data Analysis のための Python ライブラリです。PyGWalker (opens in a new tab) は、pandas dataframe(および polars dataframe)を、可視的に探索できる tableau 代替 UI に変換することで、Jupyter Notebook におけるデータ分析とデータ可視化のワークフローをシンプルにします。
CSV と Parquet の比較(概要)
| Topic | CSV | Parquet |
|---|---|---|
| Schema | 指定しない限り推論される。型のブレが起きやすい | スキーマはファイルに保存される。型が安定 |
| Compression | デフォルトではなし。ファイルサイズが大きくなりがち | カラム型 + 圧縮。より小さく、高速 |
| Read speed | 行ベース。広いスキャンには遅め | カラム型。列選択に強く高速 |
| Best for | 外部ツールとのデータ受け渡し | 分析、繰り返し読み込み、パーティション化されたデータレイク |
クイックスタートセットアップ
from pyspark.sql import SparkSession
spark = SparkSession.builder.appName("io-guide").getOrCreate()CSV 読み込みを制御する
df_csv = (
spark.read
.option("header", True)
.option("inferSchema", False)
.schema("id INT, name STRING, ts TIMESTAMP")
.option("delimiter", ",")
.option("mode", "DROPMALFORMED")
.csv("s3://bucket/raw/users/*.csv")
)headerは 1 行目からカラム名を読み込みます。- スキーマを明示すると、想定外の型を避けつつ読み込みを高速化できます。
modeの選択肢:PERMISSIVE(デフォルト)、DROPMALFORMED,FAILFAST。
Parquet の読み込み
df_parquet = spark.read.parquet("s3://bucket/curated/users/")- Parquet はスキーマと統計情報を保持するため、
inferSchemaは不要です。 - カラムプルーニングと述語プッシュダウンによりパフォーマンスが向上します。
CSV を安全に書き込む
(
df_csv
.repartition(1) # 必要に応じて出力ファイル数を減らす
.write
.option("header", True)
.option("delimiter", ",")
.mode("overwrite")
.csv("s3://bucket/exports/users_csv")
)repartitionでファイル数を制御します。巨大な 1 ファイルにまとめるのは避けましょう。modeは注意して選びます(下表を参照)。
Parquet をパーティション付きで書き込む
(
df_parquet
.write
.mode("append")
.partitionBy("country", "ingest_date")
.parquet("s3://bucket/warehouse/users_parquet")
)partitionByは、よくフィルターに使うカラムごとにディレクトリを作成し、クエリを高速化します。- パーティションキーは、カーディナリティが低いものを使い、細かすぎるファイルが大量にできるのを避けます。
Save mode の挙動
| mode | Behavior |
|---|---|
error / errorifexists | デフォルト。パスが既に存在する場合は失敗 |
overwrite | そのパスにある既存データを置き換える |
append | 既存パスにファイルを追加 |
ignore | パスが存在する場合は書き込みをスキップ |
スキーマ進化のためのヒント
- スキーマの進化が見込まれる場合は Parquet を優先し、互換な更新には
mergeSchemaを活用します。 - CSV の場合はスキーマを明示的にバージョン管理し、書き込み前にカラムを検証します。
- カラムを追加する際は、新しいスキーマで Parquet に書き込み、リーダー側が最新のテーブル定義を使うことを保証します。
よくある落とし穴と対策
- 区切り文字の不一致:
delimiterを明示的に設定し、デフォルトに依存しないようにします。 - 型の不整合: 本番では
inferSchema=Trueではなく、schemaを指定します。 - 多数の小さいファイル: 書き込み前に
coalesceやrepartitionを行い、必要に応じてmaxRecordsPerFileを検討します。 - 誤った overwrite: 一時パスに出力してから、ストレージレイヤーが対応していればアトミックな move で本番パスに切り替えます。
最小限のエンドツーエンドレシピ
schema = "user_id BIGINT, event STRING, ts TIMESTAMP, country STRING"
df = (
spark.read
.option("header", True)
.schema(schema)
.csv("s3://bucket/raw/events/*.csv")
)
cleaned = df.dropna(subset=["user_id", "event"]).withColumnRenamed("ts", "event_ts")
(
cleaned
.write
.mode("append")
.partitionBy("country")
.parquet("s3://bucket/warehouse/events_parquet")
)