PySpark: Ler e Escrever CSV e Parquet – Guia Confiável de IO
Updated on
Projetos de dados travam quando arquivos são carregados de forma incorreta, schemas mudam silenciosamente ou gravações sobrescrevem dados válidos. PySpark resolve esses problemas com opções consistentes de leitura/escrita para CSV e Parquet, mas os padrões podem surpreender times que lidam com conjuntos de dados variados.
As APIs DataFrameReader/DataFrameWriter do PySpark oferecem ingestão e exportação previsíveis. Ao escolher o formato e o modo de gravação corretos, adicionar controle de schema e particionar de forma inteligente, as equipes mantêm pipelines estáveis e performáticos.
Quer criar rapidamente visualizações de dados a partir de um Pandas DataFrame em Python sem escrever código?
PyGWalker é uma biblioteca Python para Análise Exploratória de Dados com Visualização. O PyGWalker (opens in a new tab) pode simplificar seu fluxo de análise e visualização de dados no Jupyter Notebook, transformando seu pandas dataframe (e polars dataframe) em uma interface de usuário ao estilo tableau para exploração visual.
CSV vs Parquet em resumo
| Tópico | CSV | Parquet |
|---|---|---|
| Schema | Inferido, a menos que fornecido; derivações de tipo são comuns | Schema armazenado no arquivo; tipos estáveis |
| Compressão | Nenhuma por padrão; arquivos maiores | Colunar + compressão; menor tamanho, mais rápido |
| Velocidade de leitura | Baseado em linhas; mais lento para leituras amplas | Colunar; seleção de colunas mais rápida |
| Melhor uso | Intercâmbio com ferramentas externas | Analytics, leituras repetidas, data lakes particionados |
Configuração rápida
from pyspark.sql import SparkSession
spark = SparkSession.builder.appName("io-guide").getOrCreate()Lendo CSV com controle
df_csv = (
spark.read
.option("header", True)
.option("inferSchema", False)
.schema("id INT, name STRING, ts TIMESTAMP")
.option("delimiter", ",")
.option("mode", "DROPMALFORMED")
.csv("s3://bucket/raw/users/*.csv")
)headerlê os nomes das colunas da primeira linha.- Forneça um schema para evitar surpresas de tipos e acelerar leituras.
- Opções de
mode:PERMISSIVE(padrão),DROPMALFORMED,FAILFAST.
Lendo Parquet
df_parquet = spark.read.parquet("s3://bucket/curated/users/")- Parquet armazena schema e estatísticas;
inferSchemaé desnecessário. - Column pruning e predicate pushdown melhoram a performance.
Escrevendo CSV com segurança
(
df_csv
.repartition(1) # reduce output shards when needed
.write
.option("header", True)
.option("delimiter", ",")
.mode("overwrite")
.csv("s3://bucket/exports/users_csv")
)- Use
repartitionpara controlar a quantidade de arquivos; evite um único arquivo gigantesco. - Escolha o
modecom cuidado (veja tabela abaixo).
Escrevendo Parquet com particionamento
(
df_parquet
.write
.mode("append")
.partitionBy("country", "ingest_date")
.parquet("s3://bucket/warehouse/users_parquet")
)partitionBycria diretórios para filtros comuns, acelerando consultas.- Mantenha chaves de partição com baixa cardinalidade para evitar muitos arquivos pequenos.
Comportamento dos modos de gravação
| mode | Comportamento |
|---|---|
error / errorifexists | Padrão; falha se o caminho já existir |
overwrite | Substitui os dados existentes no caminho |
append | Adiciona novos arquivos ao caminho existente |
ignore | Ignora a gravação se o caminho já existir |
Dicas para evolução de schema
- Prefira Parquet quando a evolução de schema é esperada; use
mergeSchemapara atualizações compatíveis. - Para CSV, versione schemas explicitamente e valide colunas antes de gravar.
- Ao adicionar colunas, escreva Parquet com o novo schema e garanta que os consumidores usem a definição de tabela mais recente.
Armadilhas comuns e correções
- Delimitadores incorretos: defina
delimiterexplicitamente; não dependa dos padrões. - Tipos errados: forneça
schemaem vez deinferSchema=Trueem produção. - Muitos arquivos pequenos: use
coalesceourepartitionantes da escrita; consideremaxRecordsPerFile. - Acidentes com overwrite: grave primeiro em um caminho temporário e depois mova de forma atômica, se sua camada de armazenamento suportar.
Receita mínima de ponta a ponta
schema = "user_id BIGINT, event STRING, ts TIMESTAMP, country STRING"
df = (
spark.read
.option("header", True)
.schema(schema)
.csv("s3://bucket/raw/events/*.csv")
)
cleaned = df.dropna(subset=["user_id", "event"]).withColumnRenamed("ts", "event_ts")
(
cleaned
.write
.mode("append")
.partitionBy("country")
.parquet("s3://bucket/warehouse/events_parquet")
)