PySpark Leer y Escribir CSV y Parquet: Guía de IO Confiable
Updated on
Los proyectos de datos se detienen cuando los archivos se cargan de forma incorrecta, los esquemas cambian inesperadamente o las escrituras sobrescriben datos válidos. PySpark resuelve estos problemas con opciones de lectura/escritura consistentes entre CSV y Parquet, pero los valores por defecto pueden sorprender a equipos que manejan conjuntos de datos variados.
Las APIs DataFrameReader / DataFrameWriter de PySpark proporcionan una ingesta y exportación predecibles. Al elegir el formato y el modo de guardado adecuados, añadir control de esquema y particionar de forma inteligente, los equipos mantienen las canalizaciones estables y con buen rendimiento.
¿Quieres crear visualizaciones de datos rápidamente desde un DataFrame de Python Pandas sin escribir código?
PyGWalker es una biblioteca de Python para Análisis Exploratorio de Datos con visualización. PyGWalker (opens in a new tab) puede simplificar tu flujo de trabajo de análisis y visualización de datos en Jupyter Notebook, convirtiendo tu pandas dataframe (y polars dataframe) en una interfaz de usuario alternativa a Tableau para exploración visual.
CSV vs Parquet de un vistazo
| Tema | CSV | Parquet |
|---|---|---|
| Esquema | Inferido a menos que se proporcione; deriva de tipos común | Esquema almacenado en el archivo; tipos estables |
| Compresión | Ninguna por defecto; archivos más grandes | Columnar + compresión; más pequeño y rápido |
| Velocidad de lectura | Basado en filas; más lento para lecturas amplias | Columnar; selección de columnas más rápida |
| Mejor para | Intercambio con herramientas externas | Analítica, lecturas repetidas, data lakes particionados |
Configuración rápida
from pyspark.sql import SparkSession
spark = SparkSession.builder.appName("io-guide").getOrCreate()Lectura de CSV con control
df_csv = (
spark.read
.option("header", True)
.option("inferSchema", False)
.schema("id INT, name STRING, ts TIMESTAMP")
.option("delimiter", ",")
.option("mode", "DROPMALFORMED")
.csv("s3://bucket/raw/users/*.csv")
)headerlee los nombres de columnas de la primera fila.- Proporciona un esquema para evitar sorpresas de tipos y acelerar las lecturas.
- Opciones de
mode:PERMISSIVE(por defecto),DROPMALFORMED,FAILFAST.
Lectura de Parquet
df_parquet = spark.read.parquet("s3://bucket/curated/users/")- Parquet almacena esquema y estadísticas;
inferSchemano es necesario. - El recorte de columnas (column pruning) y el predicate pushdown mejoran el rendimiento.
Escritura de CSV de forma segura
(
df_csv
.repartition(1) # reduce output shards when needed
.write
.option("header", True)
.option("delimiter", ",")
.mode("overwrite")
.csv("s3://bucket/exports/users_csv")
)- Usa
repartitionpara controlar el número de archivos; evita archivos únicos y gigantes. - Elige el
modecuidadosamente (ver tabla abajo).
Escritura de Parquet con particiones
(
df_parquet
.write
.mode("append")
.partitionBy("country", "ingest_date")
.parquet("s3://bucket/warehouse/users_parquet")
)partitionBycrea directorios para filtros comunes y acelera las consultas.- Mantén las claves de partición con baja cardinalidad para evitar muchos archivos pequeños.
Comportamientos de los modos de guardado
| mode | Comportamiento |
|---|---|
error / errorifexists | Por defecto; falla si la ruta existe |
overwrite | Reemplaza los datos existentes en la ruta |
append | Añade nuevos archivos a la ruta existente |
ignore | Omite la escritura si la ruta existe |
Consejos para evolución de esquemas
- Prefiere Parquet cuando se espera evolución de esquema; aprovecha
mergeSchemapara actualizaciones compatibles. - Para CSV, versiona los esquemas explícitamente y valida las columnas antes de escribir.
- Al añadir columnas, escribe Parquet con el nuevo esquema y asegúrate de que los lectores utilicen la definición de tabla más reciente.
Errores comunes y cómo solucionarlos
- Delimitadores que no coinciden: establece
delimiterexplícitamente; evita confiar en los valores por defecto. - Tipos incorrectos: proporciona
schemaen lugar deinferSchema=Trueen entornos de producción. - Demasiados archivos pequeños: usa
coalesceorepartitionantes de escribir; consideramaxRecordsPerFile. - Sobrescrituras accidentales: publica salidas en una ruta temporal y luego muévela de forma atómica si tu capa de almacenamiento lo permite.
Receta mínima de punta a punta
schema = "user_id BIGINT, event STRING, ts TIMESTAMP, country STRING"
df = (
spark.read
.option("header", True)
.schema(schema)
.csv("s3://bucket/raw/events/*.csv")
)
cleaned = df.dropna(subset=["user_id", "event"]).withColumnRenamed("ts", "event_ts")
(
cleaned
.write
.mode("append")
.partitionBy("country")
.parquet("s3://bucket/warehouse/events_parquet")
)