Python asyncio: Guia Completo para Programação Assíncrona
Updated on
Sua aplicação Python faz 100 chamadas de API, cada uma levando 2 segundos. Com código sequencial tradicional, são 200 segundos de espera. Seus usuários encaram telas de carregamento. Seus servidores ficam ociosos, consumindo recursos enquanto aguardam respostas. Este comportamento bloqueante é o problema que destrói a performance da aplicação e a experiência do usuário.
A dor se intensifica quando você escala. Consultas ao banco de dados se acumulam. Operações de arquivo enfileiram uma atrás da outra. Web scrapers engatinham em velocidade de lesma. Cada operação de I/O se torna um gargalo que cascateia por todo o seu sistema, transformando o que deveria ser uma aplicação rápida e responsiva em um monstro lento e desperdiçador de recursos.
O Python asyncio resolve isso habilitando a execução concorrente de tarefas limitadas por I/O. Em vez de esperar cada operação completar antes de iniciar a próxima, o asyncio permite que seu código inicie múltiplas operações e alterne entre elas enquanto aguarda. Aquelas 100 chamadas de API? Com asyncio, elas completam em aproximadamente 2 segundos ao invés de 200. Este guia mostra exatamente como implementar programação assíncrona em Python, com exemplos práticos que transformam código lento e bloqueante em aplicações rápidas e concorrentes.
O que é Programação Assíncrona e Por Que Importa
Programação assíncrona permite que um programa inicie tarefas potencialmente demoradas e prossiga para outro trabalho antes que essas tarefas sejam concluídas, ao invés de esperar que cada tarefa termine antes de iniciar a próxima.
Em código síncrono tradicional, quando você faz uma requisição de API, seu programa para e aguarda a resposta. Durante este período de espera, sua CPU fica ociosa, não fazendo nada produtivo. Isso é aceitável para operações únicas, mas catastrófico para aplicações que precisam lidar com múltiplas operações de I/O.
O asyncio fornece uma maneira de escrever código concorrente usando a sintaxe async/await. É particularmente eficaz para operações limitadas por I/O como:
- Fazer requisições HTTP para APIs
- Ler e escrever arquivos
- Consultas a banco de dados
- Comunicação de rede
- Conexões WebSocket
- Processamento de filas de mensagens
A melhoria de performance é dramática. Considere buscar dados de 50 URLs diferentes:
Abordagem síncrona: 50 requisições × 2 segundos cada = 100 segundos totais Abordagem assíncrona: Todas as 50 requisições executando concorrentemente ≈ 2 segundos totais
Esta melhoria de 50x na performance vem de melhor utilização de recursos. Ao invés de bloquear em operações de I/O, o asyncio permite que seu programa continue executando outras tarefas enquanto aguarda o I/O completar.
Concorrência vs Paralelismo vs Async
Entender a distinção entre estes conceitos é essencial para usar o asyncio efetivamente.
Concorrência significa gerenciar múltiplas tarefas de uma vez. As tarefas se alternam no progresso, mas apenas uma executa em qualquer momento dado. Pense em um chef preparando múltiplos pratos, alternando entre tarefas enquanto cada um espera algo cozinhar.
Paralelismo significa executar múltiplas tarefas simultaneamente em diferentes núcleos de CPU. Isso requer hardware de processamento paralelo real e é ideal para tarefas limitadas por CPU como computações matemáticas ou processamento de imagens.
Programação assíncrona é uma forma específica de concorrência projetada para tarefas limitadas por I/O. Ela usa uma única thread e alterna entre tarefas quando estão aguardando operações de I/O.
| Característica | asyncio | Threading | Multiprocessing |
|---|---|---|---|
| Modelo de execução | Single thread, multitarefa cooperativa | Múltiplas threads, multitarefa preemptiva | Múltiplos processos |
| Melhor para | Tarefas limitadas por I/O | Tarefas limitadas por I/O com bibliotecas bloqueantes | Tarefas limitadas por CPU |
| Overhead de memória | Mínimo | Moderado | Alto |
| Custo de troca de contexto | Muito baixo | Baixo a moderado | Alto |
| Complexidade | Moderada (sintaxe async/await) | Alta (condições de corrida, locks) | Alta (IPC, serialização) |
| Limitação do GIL | Não afetada (single thread) | Limitada pelo GIL | Não limitada (processos separados) |
| Speedup típico para I/O | 10-100x | 5-10x | N/A |
O Global Interpreter Lock (GIL) do Python impede a execução paralela verdadeira de bytecode Python em threads, tornando o threading menos efetivo para tarefas limitadas por CPU. O asyncio contorna esta limitação usando uma única thread com multitarefa cooperativa, enquanto o multiprocessing a contorna completamente com processos separados.
async def e await Keywords
A fundação do asyncio é construída sobre duas keywords: async e await.
A keyword async def define uma função corrotina. Quando você chama uma função corrotina, ela não executa imediatamente. Em vez disso, retorna um objeto corrotina que pode ser aguardado.
import asyncio
async def fetch_data():
print("Fetching data...")
await asyncio.sleep(2) # Simulate I/O operation
print("Data fetched!")
return {"status": "success"}
# Calling the function returns a coroutine object
coroutine = fetch_data()
print(type(coroutine)) # <class 'coroutine'>
# To actually run it, you need to await it or use asyncio.run()
# asyncio.run(coroutine) # This would execute the coroutineA keyword await pausa a execução de uma corrotina até que a operação aguardada seja concluída. Durante esta pausa, o event loop pode executar outras corrotinas. Você só pode usar await dentro de uma função async def.
async def process_user(user_id):
# Await an I/O operation
user_data = await fetch_user_from_database(user_id)
# Await another I/O operation
user_profile = await fetch_user_profile(user_id)
# Regular synchronous code runs normally
processed_data = transform_data(user_data, user_profile)
return processed_dataRegras chave para async/await:
- Você só pode usar
awaitem corrotinas, tasks ou futures - Você só pode usar
awaitdentro de uma funçãoasync def - Funções regulares não podem usar
await - Chamar uma função async sem await cria um objeto corrotina mas não executa o código
Erro comum:
async def wrong_example():
# This creates a coroutine but doesn't execute it!
fetch_data() # Missing await
async def correct_example():
# This actually executes the coroutine
result = await fetch_data()asyncio.run() Entry Point
A função asyncio.run() é a maneira padrão de iniciar o event loop do asyncio e executar sua corrotina principal. Ela foi introduzida no Python 3.7 e simplifica a execução de código async a partir de contextos síncronos.
import asyncio
async def main():
print("Starting async operations")
await asyncio.sleep(1)
print("Finished")
# Run the main coroutine
asyncio.run(main())O que asyncio.run() faz nos bastidores:
- Cria um novo event loop
- Executa a corrotina fornecida até a conclusão
- Fecha o event loop
- Retorna o resultado da corrotina
import asyncio
async def main():
result = await compute_value()
return result
# The return value is accessible
final_result = asyncio.run(main())
print(final_result)Características importantes de asyncio.run():
- Não pode ser chamado de dentro de um event loop em execução: Se você já está em uma função async, use
awaitem vez disso - Cria um event loop fresco cada vez: Não chame
asyncio.run()múltiplas vezes no mesmo programa a menos que você queira instâncias separadas de event loop - Sempre fecha o loop: O event loop é limpo após a execução
Para Jupyter notebooks ou ambientes onde um event loop já está em execução, use await diretamente ou asyncio.create_task(). Ferramentas como RunCell (opens in a new tab) fornecem suporte async aprimorado em ambientes Jupyter, tornando mais fácil experimentar com padrões asyncio interativamente sem conflitos de event loop.
Antes do Python 3.7, você tinha que gerenciar o event loop manualmente:
# Old way (pre-Python 3.7)
loop = asyncio.get_event_loop()
try:
loop.run_until_complete(main())
finally:
loop.close()
# Modern way (Python 3.7+)
asyncio.run(main())Coroutines, Tasks, and Futures
Entender estes três conceitos core é essencial para dominar o asyncio.
Coroutines
Uma coroutine é uma função definida com async def. É uma função especial que pode ser pausada e retomada, permitindo que outro código execute durante a pausa.
import asyncio
async def my_coroutine():
await asyncio.sleep(1)
return "completed"
# This creates a coroutine object
coro = my_coroutine()
# To run it
result = asyncio.run(coro)Tasks
Uma task é um wrapper em torno de uma corrotina que a agenda para execução no event loop. Tasks permitem que corrotinas executem concorrentemente.
import asyncio
async def say_after(delay, message):
await asyncio.sleep(delay)
print(message)
return message
async def main():
# Create tasks to run concurrently
task1 = asyncio.create_task(say_after(1, "First"))
task2 = asyncio.create_task(say_after(2, "Second"))
# Wait for both tasks to complete
result1 = await task1
result2 = await task2
print(f"Results: {result1}, {result2}")
asyncio.run(main())Criar uma task agenda imediatamente a corrotina para execução. O event loop começa a executá-la assim que possível, mesmo antes de você dar await na task.
async def background_work():
print("Starting background work")
await asyncio.sleep(2)
print("Background work completed")
async def main():
# The coroutine starts running immediately
task = asyncio.create_task(background_work())
print("Task created, doing other work")
await asyncio.sleep(1)
print("Other work done")
# Wait for the background task to finish
await task
asyncio.run(main())Output:
Task created, doing other work
Starting background work
Other work done
Background work completedFutures
Um future é um objeto awaitable de baixo nível que representa um resultado eventual de uma operação assíncrona. Você raramente cria futures diretamente; eles são tipicamente criados por internais do asyncio ou bibliotecas.
async def set_future_result(future):
await asyncio.sleep(1)
future.set_result("Future completed")
async def main():
loop = asyncio.get_event_loop()
future = loop.create_future()
# Schedule a coroutine to set the future's result
asyncio.create_task(set_future_result(future))
# Wait for the future to get a result
result = await future
print(result)
asyncio.run(main())Relação entre coroutines, tasks e futures:
- Coroutines são as funções que você escreve
- Tasks encapsulam coroutines e as agendam para execução
- Futures representam resultados que estarão disponíveis no futuro
- Tasks são uma subclasse de Future
asyncio.create_task() for Concurrent Execution
A função asyncio.create_task() é sua ferramenta primária para alcançar concorrência verdadeira com asyncio. Ela agenda uma corrotina para executar no event loop sem bloquear a corrotina atual.
import asyncio
import time
async def download_file(file_id):
print(f"Starting download {file_id}")
await asyncio.sleep(2) # Simulate download time
print(f"Completed download {file_id}")
return f"file_{file_id}.dat"
async def sequential_downloads():
"""Downloads files one at a time"""
start = time.time()
file1 = await download_file(1)
file2 = await download_file(2)
file3 = await download_file(3)
elapsed = time.time() - start
print(f"Sequential: {elapsed:.2f} seconds")
# Output: ~6 seconds (2 + 2 + 2)
async def concurrent_downloads():
"""Downloads files concurrently"""
start = time.time()
# Create tasks for concurrent execution
task1 = asyncio.create_task(download_file(1))
task2 = asyncio.create_task(download_file(2))
task3 = asyncio.create_task(download_file(3))
# Wait for all tasks to complete
file1 = await task1
file2 = await task2
file3 = await task3
elapsed = time.time() - start
print(f"Concurrent: {elapsed:.2f} seconds")
# Output: ~2 seconds (all run at the same time)
asyncio.run(concurrent_downloads())A task é agendada imediatamente quando create_task() é chamada. Você não precisa dar await nela imediatamente.
async def process_data():
# Start background tasks
task1 = asyncio.create_task(fetch_from_api_1())
task2 = asyncio.create_task(fetch_from_api_2())
# Do some work while tasks run in background
local_data = prepare_local_data()
# Now wait for the background tasks
api_data_1 = await task1
api_data_2 = await task2
# Combine all data
return combine_data(local_data, api_data_1, api_data_2)Você também pode nomear tasks para melhor debugging:
async def main():
task = asyncio.create_task(
long_running_operation(),
name="long-operation-task"
)
# Task name is accessible
print(f"Task name: {task.get_name()}")
await taskasyncio.gather() for Running Multiple Coroutines
A função asyncio.gather() executa múltiplas corrotinas concorrentemente e aguarda que todas sejam concluídas. É mais limpa que criar tasks individuais quando você precisa executar muitas corrotinas.
import asyncio
async def fetch_user(user_id):
await asyncio.sleep(1)
return {"id": user_id, "name": f"User {user_id}"}
async def main():
# Run multiple coroutines concurrently
results = await asyncio.gather(
fetch_user(1),
fetch_user(2),
fetch_user(3),
fetch_user(4),
fetch_user(5)
)
print(results)
# Output: [{'id': 1, 'name': 'User 1'}, {'id': 2, 'name': 'User 2'}, ...]
asyncio.run(main())gather() retorna resultados na mesma ordem das corrotinas de entrada, independentemente de qual completar primeiro.
async def task_with_delay(delay, value):
await asyncio.sleep(delay)
return value
async def main():
results = await asyncio.gather(
task_with_delay(3, "slow"),
task_with_delay(1, "fast"),
task_with_delay(2, "medium")
)
print(results)
# Output: ['slow', 'fast', 'medium'] (order preserved)
asyncio.run(main())Error Handling with gather()
Por padrão, se qualquer corrotina lançar uma exceção, gather() imediatamente levanta essa exceção e cancela as tasks restantes.
async def failing_task():
await asyncio.sleep(1)
raise ValueError("Task failed!")
async def successful_task():
await asyncio.sleep(2)
return "success"
async def main():
try:
results = await asyncio.gather(
failing_task(),
successful_task()
)
except ValueError as e:
print(f"Error caught: {e}")
# The successful_task is cancelled
asyncio.run(main())Use return_exceptions=True para coletar exceções junto com resultados bem-sucedidos:
async def main():
results = await asyncio.gather(
failing_task(),
successful_task(),
return_exceptions=True
)
for i, result in enumerate(results):
if isinstance(result, Exception):
print(f"Task {i} failed: {result}")
else:
print(f"Task {i} succeeded: {result}")
asyncio.run(main())Dynamic gather with list comprehension
gather() funciona lindamente com corrotinas geradas dinamicamente:
async def process_item(item):
await asyncio.sleep(1)
return item * 2
async def main():
items = [1, 2, 3, 4, 5]
# Process all items concurrently
results = await asyncio.gather(
*[process_item(item) for item in items]
)
print(results) # [2, 4, 6, 8, 10]
asyncio.run(main())asyncio.wait() and asyncio.as_completed()
Enquanto gather() aguarda que todas as corrotinas sejam concluídas, wait() e as_completed() fornecem controle mais granular sobre como você lida com tasks completadas.
asyncio.wait()
wait() permite que você aguarde tasks com diferentes condições de conclusão: todas as tasks, primeira task, ou primeira exceção.
import asyncio
async def task(delay, name):
await asyncio.sleep(delay)
return f"{name} completed"
async def main():
tasks = [
asyncio.create_task(task(1, "Task 1")),
asyncio.create_task(task(2, "Task 2")),
asyncio.create_task(task(3, "Task 3"))
]
# Wait for all tasks to complete
done, pending = await asyncio.wait(
tasks,
return_when=asyncio.ALL_COMPLETED
)
for task in done:
print(task.result())
asyncio.run(main())Diferentes condições de conclusão:
async def main():
tasks = [
asyncio.create_task(task(1, "Fast")),
asyncio.create_task(task(3, "Slow")),
asyncio.create_task(task(2, "Medium"))
]
# Return when the first task completes
done, pending = await asyncio.wait(
tasks,
return_when=asyncio.FIRST_COMPLETED
)
print(f"First completed: {done.pop().result()}")
print(f"Still pending: {len(pending)} tasks")
# Cancel remaining tasks
for task in pending:
task.cancel()
asyncio.run(main())asyncio.as_completed()
as_completed() retorna um iterator que produz tasks à medida que elas completam, permitindo que você processe resultados assim que estiverem disponíveis.
import asyncio
async def fetch_data(url_id, delay):
await asyncio.sleep(delay)
return f"Data from URL {url_id}"
async def main():
tasks = [
fetch_data(1, 3),
fetch_data(2, 1),
fetch_data(3, 2)
]
# Process results as they complete
for coro in asyncio.as_completed(tasks):
result = await coro
print(f"Received: {result}")
asyncio.run(main())O output mostra resultados na ordem de conclusão, não na ordem de submissão:
Received: Data from URL 2
Received: Data from URL 3
Received: Data from URL 1Isso é particularmente útil quando você quer exibir resultados para usuários o mais rápido possível:
async def search_engine(query, engine_name, delay):
await asyncio.sleep(delay)
return f"{engine_name}: Results for '{query}'"
async def main():
query = "python asyncio"
searches = [
search_engine(query, "Google", 1.5),
search_engine(query, "Bing", 2.0),
search_engine(query, "DuckDuckGo", 1.0)
]
print("Searching...")
for search in asyncio.as_completed(searches):
result = await search
print(result) # Display each result as soon as it arrives
asyncio.run(main())asyncio.sleep() vs time.sleep()
Esta distinção é crítica para a correção de código async.
time.sleep() é uma operação bloqueante que pausa a thread inteira, incluindo o event loop. Isso impede que todas as tasks async executem.
asyncio.sleep() é uma coroutine não-bloqueante que apenas pausa a task atual, permitindo que outras tasks executem.
import asyncio
import time
async def blocking_example():
"""Bad: Uses time.sleep() - blocks the entire event loop"""
print("Starting blocking sleep")
time.sleep(2) # WRONG: Blocks everything!
print("Finished blocking sleep")
async def non_blocking_example():
"""Good: Uses asyncio.sleep() - allows other tasks to run"""
print("Starting non-blocking sleep")
await asyncio.sleep(2) # CORRECT: Only pauses this task
print("Finished non-blocking sleep")
async def concurrent_task():
for i in range(3):
print(f"Concurrent task running: {i}")
await asyncio.sleep(0.5)
async def demo_blocking():
print("\n=== Blocking example (BAD) ===")
await asyncio.gather(
blocking_example(),
concurrent_task()
)
async def demo_non_blocking():
print("\n=== Non-blocking example (GOOD) ===")
await asyncio.gather(
non_blocking_example(),
concurrent_task()
)
asyncio.run(demo_blocking())
asyncio.run(demo_non_blocking())No exemplo bloqueante, a task concorrente não executa até time.sleep() completar. No exemplo não-bloqueante, ambas as tasks executam concorrentemente.
Regra de ouro: Nunca use time.sleep() em código async. Sempre use await asyncio.sleep().
Para operações limitadas por CPU que você não pode evitar, use loop.run_in_executor() para executá-las em uma thread ou processo separado:
import asyncio
import time
def cpu_intensive_task():
"""Some blocking CPU work"""
time.sleep(2) # Simulate heavy computation
return "CPU task completed"
async def main():
loop = asyncio.get_event_loop()
# Run blocking task in a thread pool
result = await loop.run_in_executor(None, cpu_intensive_task)
print(result)
asyncio.run(main())async for and async with
Python fornece versões async de loops for e context managers para trabalhar com iterables assíncronos e recursos.
Async Iterators (async for)
Um async iterator é um objeto que implementa métodos __aiter__() e __anext__(), permitindo que você itere sobre itens que requerem operações async para buscar.
import asyncio
class AsyncRange:
"""An async iterator that yields numbers with delays"""
def __init__(self, start, end):
self.current = start
self.end = end
def __aiter__(self):
return self
async def __anext__(self):
if self.current >= self.end:
raise StopAsyncIteration
# Simulate async operation to get next value
await asyncio.sleep(0.5)
value = self.current
self.current += 1
return value
async def main():
async for number in AsyncRange(1, 5):
print(f"Got number: {number}")
asyncio.run(main())Exemplo do mundo real com cursor de banco de dados async:
class AsyncDatabaseCursor:
"""Simulated async database cursor"""
def __init__(self, query):
self.query = query
self.results = []
self.index = 0
async def execute(self):
# Simulate database query
await asyncio.sleep(1)
self.results = [
{"id": 1, "name": "Alice"},
{"id": 2, "name": "Bob"},
{"id": 3, "name": "Charlie"}
]
def __aiter__(self):
return self
async def __anext__(self):
if self.index >= len(self.results):
raise StopAsyncIteration
# Simulate fetching next row
await asyncio.sleep(0.1)
row = self.results[self.index]
self.index += 1
return row
async def fetch_users():
cursor = AsyncDatabaseCursor("SELECT * FROM users")
await cursor.execute()
async for row in cursor:
print(f"User: {row['name']}")
asyncio.run(fetch_users())Async Context Managers (async with)
Um async context manager implementa métodos __aenter__() e __aexit__() para gerenciar recursos que requerem setup e cleanup assíncronos.
import asyncio
class AsyncDatabaseConnection:
"""An async context manager for database connections"""
async def __aenter__(self):
print("Opening database connection...")
await asyncio.sleep(1) # Simulate connection time
print("Database connection opened")
return self
async def __aexit__(self, exc_type, exc_val, exc_tb):
print("Closing database connection...")
await asyncio.sleep(0.5) # Simulate cleanup
print("Database connection closed")
async def query(self, sql):
await asyncio.sleep(0.5)
return f"Results for: {sql}"
async def main():
async with AsyncDatabaseConnection() as db:
result = await db.query("SELECT * FROM users")
print(result)
# Connection automatically closed after the with block
asyncio.run(main())Combinando async context managers com async iterators:
class AsyncFileReader:
"""Async context manager and iterator for reading files"""
def __init__(self, filename):
self.filename = filename
self.lines = []
self.index = 0
async def __aenter__(self):
# Simulate async file opening
await asyncio.sleep(0.5)
self.lines = ["Line 1", "Line 2", "Line 3"]
return self
async def __aexit__(self, exc_type, exc_val, exc_tb):
await asyncio.sleep(0.2)
self.lines = []
def __aiter__(self):
return self
async def __anext__(self):
if self.index >= len(self.lines):
raise StopAsyncIteration
await asyncio.sleep(0.1)
line = self.lines[self.index]
self.index += 1
return line
async def read_file():
async with AsyncFileReader("data.txt") as reader:
async for line in reader:
print(line)
asyncio.run(read_file())asyncio.Queue for Producer-Consumer Patterns
asyncio.Queue é uma queue thread-safe e async-aware que é perfeita para coordenar trabalho entre corrotinas produtoras e consumidoras.
import asyncio
import random
async def producer(queue, producer_id):
"""Produces items and puts them in the queue"""
for i in range(5):
await asyncio.sleep(random.uniform(0.1, 0.5))
item = f"Item-{producer_id}-{i}"
await queue.put(item)
print(f"Producer {producer_id} produced: {item}")
async def consumer(queue, consumer_id):
"""Consumes items from the queue"""
while True:
item = await queue.get()
print(f"Consumer {consumer_id} consuming: {item}")
# Simulate processing time
await asyncio.sleep(random.uniform(0.2, 0.8))
queue.task_done()
async def main():
queue = asyncio.Queue(maxsize=10)
# Create producers and consumers
producers = [asyncio.create_task(producer(queue, i)) for i in range(2)]
consumers = [asyncio.create_task(consumer(queue, i)) for i in range(3)]
# Wait for all producers to finish
await asyncio.gather(*producers)
# Wait for the queue to be fully processed
await queue.join()
# Cancel consumers (they run forever)
for consumer_task in consumers:
consumer_task.cancel()
asyncio.run(main())Exemplo do mundo real: Web scraper com fila de URLs:
import asyncio
from typing import Set
async def fetch_url(session, url):
"""Simulate fetching a URL"""
await asyncio.sleep(1)
return f"Content from {url}"
async def producer(queue: asyncio.Queue, start_urls: list):
"""Add URLs to the queue"""
for url in start_urls:
await queue.put(url)
async def consumer(queue: asyncio.Queue, visited: Set[str]):
"""Fetch URLs from the queue"""
while True:
url = await queue.get()
if url in visited:
queue.task_done()
continue
visited.add(url)
print(f"Scraping: {url}")
# Simulate fetching
content = await fetch_url(None, url)
# Simulate finding new URLs in the content
# In real scraper, you'd parse HTML and extract links
new_urls = [] # Would extract from content
for new_url in new_urls:
if new_url not in visited:
await queue.put(new_url)
queue.task_done()
async def main():
queue = asyncio.Queue()
visited = set()
start_urls = [
"https://example.com/page1",
"https://example.com/page2",
"https://example.com/page3"
]
# Add initial URLs
await producer(queue, start_urls)
# Create consumers
consumers = [
asyncio.create_task(consumer(queue, visited))
for _ in range(3)
]
# Wait for all URLs to be processed
await queue.join()
# Cancel consumers
for consumer_task in consumers:
consumer_task.cancel()
print(f"Scraped {len(visited)} unique URLs")
asyncio.run(main())Semaphores for Rate Limiting
asyncio.Semaphore controla o número de corrotinas que podem acessar um recurso concorrentemente. Isso é essencial para rate limiting de chamadas de API ou limitar conexões concorrentes de banco de dados.
import asyncio
import time
async def call_api(semaphore, api_id):
"""Make an API call with rate limiting"""
async with semaphore:
print(f"API call {api_id} started")
await asyncio.sleep(1) # Simulate API call
print(f"API call {api_id} completed")
return f"Result {api_id}"
async def main():
# Allow only 3 concurrent API calls
semaphore = asyncio.Semaphore(3)
start = time.time()
# Create 10 API calls
tasks = [call_api(semaphore, i) for i in range(10)]
results = await asyncio.gather(*tasks)
elapsed = time.time() - start
print(f"\nCompleted {len(results)} API calls in {elapsed:.2f} seconds")
# With semaphore(3): ~4 seconds (10 calls in batches of 3)
# Without semaphore: ~1 second (all concurrent)
asyncio.run(main())Rate limiting para conformidade com quota de API:
import asyncio
from datetime import datetime
class RateLimiter:
"""Rate limiter that allows N requests per time period"""
def __init__(self, max_requests, time_period):
self.max_requests = max_requests
self.time_period = time_period
self.semaphore = asyncio.Semaphore(max_requests)
self.request_times = []
async def __aenter__(self):
await self.semaphore.acquire()
# Wait if we've hit the rate limit
while len(self.request_times) >= self.max_requests:
oldest = self.request_times[0]
elapsed = datetime.now().timestamp() - oldest
if elapsed < self.time_period:
sleep_time = self.time_period - elapsed
await asyncio.sleep(sleep_time)
self.request_times.pop(0)
self.request_times.append(datetime.now().timestamp())
return self
async def __aexit__(self, exc_type, exc_val, exc_tb):
self.semaphore.release()
async def fetch_with_rate_limit(rate_limiter, item_id):
async with rate_limiter:
print(f"Fetching item {item_id} at {datetime.now()}")
await asyncio.sleep(0.5)
return f"Item {item_id}"
async def main():
# Allow 5 requests per 2 seconds
rate_limiter = RateLimiter(max_requests=5, time_period=2)
# Make 15 requests
tasks = [fetch_with_rate_limit(rate_limiter, i) for i in range(15)]
results = await asyncio.gather(*tasks)
print(f"Completed {len(results)} requests")
asyncio.run(main())aiohttp for Async HTTP Requests
A biblioteca aiohttp fornece funcionalidade de cliente e servidor HTTP async. É a escolha padrão para fazer requisições HTTP em código async.
import asyncio
import aiohttp
async def fetch_url(session, url):
"""Fetch a single URL"""
async with session.get(url) as response:
return await response.text()
async def main():
async with aiohttp.ClientSession() as session:
html = await fetch_url(session, "https://api.github.com")
print(f"Fetched {len(html)} characters")
asyncio.run(main())Buscando múltiplas URLs concorrentemente:
import asyncio
import aiohttp
import time
async def fetch_url(session, url):
"""Fetch URL and return status code and content length"""
async with session.get(url) as response:
content = await response.text()
return {
"url": url,
"status": response.status,
"length": len(content)
}
async def fetch_all_urls(urls):
"""Fetch multiple URLs concurrently"""
async with aiohttp.ClientSession() as session:
tasks = [fetch_url(session, url) for url in urls]
results = await asyncio.gather(*tasks, return_exceptions=True)
return results
async def main():
urls = [
"https://api.github.com",
"https://httpbin.org/delay/1",
"https://httpbin.org/delay/2",
"https://httpbin.org/status/200",
"https://httpbin.org/status/404"
]
start = time.time()
results = await fetch_all_urls(urls)
elapsed = time.time() - start
for result in results:
if isinstance(result, Exception):
print(f"Error: {result}")
else:
print(f"{result['url']}: {result['status']} ({result['length']} bytes)")
print(f"\nFetched {len(urls)} URLs in {elapsed:.2f} seconds")
asyncio.run(main())Exemplo prático com tratamento de erros e retries:
import asyncio
import aiohttp
from typing import Optional
async def fetch_with_retry(
session: aiohttp.ClientSession,
url: str,
max_retries: int = 3
) -> Optional[dict]:
"""Fetch URL with retry logic"""
for attempt in range(max_retries):
try:
async with session.get(url, timeout=aiohttp.ClientTimeout(total=10)) as response:
if response.status == 200:
data = await response.json()
return data
elif response.status == 404:
print(f"URL not found: {url}")
return None
else:
print(f"Unexpected status {response.status} for {url}")
except asyncio.TimeoutError:
print(f"Timeout on attempt {attempt + 1} for {url}")
except aiohttp.ClientError as e:
print(f"Client error on attempt {attempt + 1} for {url}: {e}")
if attempt < max_retries - 1:
await asyncio.sleep(2 ** attempt) # Exponential backoff
print(f"Failed to fetch {url} after {max_retries} attempts")
return None
async def main():
urls = [
"https://api.github.com/users/github",
"https://api.github.com/users/nonexistent",
"https://httpbin.org/delay/5"
]
async with aiohttp.ClientSession() as session:
tasks = [fetch_with_retry(session, url) for url in urls]
results = await asyncio.gather(*tasks)
successful = [r for r in results if r is not None]
print(f"\nSuccessfully fetched {len(successful)}/{len(urls)} URLs")
asyncio.run(main())aiofiles for Async File I/O
A biblioteca aiofiles fornece operações de arquivo assíncronas, prevenindo bloqueios durante leituras e escritas de arquivos.
import asyncio
import aiofiles
async def read_file(filename):
"""Read file asynchronously"""
async with aiofiles.open(filename, mode='r') as f:
contents = await f.read()
return contents
async def write_file(filename, content):
"""Write file asynchronously"""
async with aiofiles.open(filename, mode='w') as f:
await f.write(content)
async def main():
# Write data
await write_file('test.txt', 'Hello, async file I/O!')
# Read data
content = await read_file('test.txt')
print(f"Read: {content}")
asyncio.run(main())Processando múltiplos arquivos concorrentemente:
import asyncio
import aiofiles
async def process_file(input_file, output_file):
"""Read, process, and write a file"""
async with aiofiles.open(input_file, mode='r') as f:
content = await f.read()
# Process content (convert to uppercase)
processed = content.upper()
async with aiofiles.open(output_file, mode='w') as f:
await f.write(processed)
return f"Processed {input_file} -> {output_file}"
async def main():
files = [
('input1.txt', 'output1.txt'),
('input2.txt', 'output2.txt'),
('input3.txt', 'output3.txt')
]
tasks = [process_file(inp, out) for inp, out in files]
results = await asyncio.gather(*tasks)
for result in results:
print(result)
asyncio.run(main())Lendo arquivos grandes linha por linha:
import asyncio
import aiofiles
async def process_large_file(filename):
"""Process a large file line by line without loading it all into memory"""
line_count = 0
async with aiofiles.open(filename, mode='r') as f:
async for line in f:
# Process each line
line_count += 1
if line.strip():
# Do something with the line
pass
return line_count
async def main():
count = await process_large_file('large_data.txt')
print(f"Processed {count} lines")
asyncio.run(main())Error Handling in Async Code
Tratamento de erros em código async requer atenção especial para garantir que exceções sejam propriamente capturadas e recursos sejam limpos.
import asyncio
async def risky_operation(item_id):
"""An operation that might fail"""
await asyncio.sleep(1)
if item_id % 2 == 0:
raise ValueError(f"Item {item_id} caused an error")
return f"Item {item_id} processed"
async def handle_with_try_except():
"""Handle errors with try/except"""
try:
result = await risky_operation(2)
print(result)
except ValueError as e:
print(f"Error caught: {e}")
asyncio.run(handle_with_try_except())Lidando com erros em tasks concorrentes:
async def safe_operation(item_id):
"""Wrapper that catches errors from risky_operation"""
try:
result = await risky_operation(item_id)
return {"success": True, "result": result}
except Exception as e:
return {"success": False, "error": str(e), "item_id": item_id}
async def main():
tasks = [safe_operation(i) for i in range(5)]
results = await asyncio.gather(*tasks)
for result in results:
if result["success"]:
print(f"Success: {result['result']}")
else:
print(f"Failed: Item {result['item_id']} - {result['error']}")
asyncio.run(main())Usando gather() com return_exceptions=True:
async def main():
tasks = [risky_operation(i) for i in range(5)]
results = await asyncio.gather(*tasks, return_exceptions=True)
for i, result in enumerate(results):
if isinstance(result, Exception):
print(f"Task {i} failed: {result}")
else:
print(f"Task {i} succeeded: {result}")
asyncio.run(main())Cleanup com async context managers:
class AsyncResource:
"""A resource that needs cleanup even if errors occur"""
async def __aenter__(self):
print("Acquiring resource")
await asyncio.sleep(0.5)
return self
async def __aexit__(self, exc_type, exc_val, exc_tb):
print("Cleaning up resource")
await asyncio.sleep(0.5)
if exc_type is not None:
print(f"Exception during resource use: {exc_val}")
# Return False to propagate exception, True to suppress
return False
async def do_work(self):
await asyncio.sleep(1)
raise ValueError("Something went wrong")
async def main():
try:
async with AsyncResource() as resource:
await resource.do_work()
except ValueError as e:
print(f"Caught error: {e}")
asyncio.run(main())Performance Benchmarks: Sync vs Async for I/O-Bound Tasks
Vamos comparar abordagens síncronas e assíncronas para operações limitadas por I/O com benchmarks reais.
import asyncio
import time
import requests
import aiohttp
# Synchronous approach
def fetch_sync(url):
response = requests.get(url)
return len(response.text)
def benchmark_sync(urls):
start = time.time()
results = [fetch_sync(url) for url in urls]
elapsed = time.time() - start
return elapsed, results
# Asynchronous approach
async def fetch_async(session, url):
async with session.get(url) as response:
text = await response.text()
return len(text)
async def benchmark_async(urls):
start = time.time()
async with aiohttp.ClientSession() as session:
tasks = [fetch_async(session, url) for url in urls]
results = await asyncio.gather(*tasks)
elapsed = time.time() - start
return elapsed, results
# Run benchmarks
urls = [
"https://httpbin.org/delay/1",
"https://httpbin.org/delay/1",
"https://httpbin.org/delay/1",
"https://httpbin.org/delay/1",
"https://httpbin.org/delay/1"
]
print("Benchmarking synchronous approach...")
sync_time, sync_results = benchmark_sync(urls)
print(f"Synchronous: {sync_time:.2f} seconds")
print("\nBenchmarking asynchronous approach...")
async_time, async_results = asyncio.run(benchmark_async(urls))
print(f"Asynchronous: {async_time:.2f} seconds")
print(f"\nSpeedup: {sync_time / async_time:.2f}x")Resultados típicos para 5 URLs com delay de 1 segundo cada:
- Síncrono: ~5 segundos (execução sequencial)
- Assíncrono: ~1 segundo (execução concorrente)
- Speedup: ~5x
| Número de URLs | Tempo Sync | Tempo Async | Speedup |
|---|---|---|---|
| 5 | 5.2s | 1.1s | 4.7x |
| 10 | 10.4s | 1.2s | 8.7x |
| 20 | 20.8s | 1.4s | 14.9x |
| 50 | 52.1s | 2.1s | 24.8x |
| 100 | 104.5s | 3.8s | 27.5x |
O speedup aumenta com o número de operações concorrentes até que você atinja restrições de banda ou rate limiting.
Common Pitfalls and How to Avoid Them
Esquecendo o await
# WRONG: Coroutine is created but not executed
async def wrong():
result = fetch_data() # Missing await!
print(result) # Prints coroutine object, not the result
# CORRECT: Await the coroutine
async def correct():
result = await fetch_data()
print(result) # Prints actual resultBloqueando o event loop
# WRONG: Blocks the entire event loop
async def blocking_code():
time.sleep(5) # Blocks everything!
result = compute_something() # Blocks if CPU-intensive
return result
# CORRECT: Use asyncio.sleep and run_in_executor
async def non_blocking_code():
await asyncio.sleep(5) # Only pauses this task
loop = asyncio.get_event_loop()
result = await loop.run_in_executor(None, compute_something)
return resultNão lidando com cancelamento de task
# WRONG: Doesn't handle cancellation
async def incomplete_cleanup():
resource = await acquire_resource()
await long_operation()
await release_resource(resource) # May not execute if cancelled
# CORRECT: Use try/finally or async with
async def proper_cleanup():
resource = await acquire_resource()
try:
await long_operation()
finally:
await release_resource(resource) # Always executesCriando conflitos de event loop
# WRONG: Calling asyncio.run() in an async function
async def wrong_nesting():
result = asyncio.run(some_coroutine()) # Error!
return result
# CORRECT: Just await the coroutine
async def correct_nesting():
result = await some_coroutine()
return resultNão limitando concorrência
# WRONG: Launches too many tasks simultaneously
async def unlimited_concurrency(items):
tasks = [process_item(item) for item in items] # 10,000 tasks!
return await asyncio.gather(*tasks)
# CORRECT: Use semaphore to limit concurrency
async def limited_concurrency(items):
semaphore = asyncio.Semaphore(10) # Max 10 concurrent
async def process_with_limit(item):
async with semaphore:
return await process_item(item)
tasks = [process_with_limit(item) for item in items]
return await asyncio.gather(*tasks)Real-World Examples
Web Scraping with Rate Limiting
import asyncio
import aiohttp
from bs4 import BeautifulSoup
class AsyncWebScraper:
def __init__(self, max_concurrent=5):
self.semaphore = asyncio.Semaphore(max_concurrent)
self.session = None
async def __aenter__(self):
self.session = aiohttp.ClientSession()
return self
async def __aexit__(self, exc_type, exc_val, exc_tb):
await self.session.close()
async def fetch_page(self, url):
async with self.semaphore:
try:
async with self.session.get(url) as response:
return await response.text()
except Exception as e:
print(f"Error fetching {url}: {e}")
return None
async def scrape_urls(self, urls):
tasks = [self.fetch_page(url) for url in urls]
return await asyncio.gather(*tasks)
async def main():
urls = [
"https://example.com/page1",
"https://example.com/page2",
"https://example.com/page3"
]
async with AsyncWebScraper(max_concurrent=3) as scraper:
pages = await scraper.scrape_urls(urls)
for url, html in zip(urls, pages):
if html:
print(f"Scraped {url}: {len(html)} bytes")
asyncio.run(main())Async API Data Pipeline
import asyncio
import aiohttp
async def fetch_user_ids(session):
"""Fetch list of user IDs from API"""
async with session.get("https://api.example.com/users") as response:
data = await response.json()
return [user["id"] for user in data["users"]]
async def fetch_user_details(session, user_id):
"""Fetch detailed info for a user"""
async with session.get(f"https://api.example.com/users/{user_id}") as response:
return await response.json()
async def fetch_user_posts(session, user_id):
"""Fetch posts for a user"""
async with session.get(f"https://api.example.com/users/{user_id}/posts") as response:
return await response.json()
async def process_user(session, user_id):
"""Fetch all data for a user concurrently"""
details, posts = await asyncio.gather(
fetch_user_details(session, user_id),
fetch_user_posts(session, user_id)
)
return {
"user": details,
"posts": posts,
"post_count": len(posts)
}
async def main():
async with aiohttp.ClientSession() as session:
# Fetch user IDs
user_ids = await fetch_user_ids(session)
# Process all users concurrently
user_data = await asyncio.gather(
*[process_user(session, uid) for uid in user_ids[:10]]
)
for data in user_data:
print(f"User {data['user']['name']}: {data['post_count']} posts")
asyncio.run(main())Async Chat Server
import asyncio
class ChatServer:
def __init__(self):
self.clients = set()
async def handle_client(self, reader, writer):
"""Handle a single client connection"""
addr = writer.get_extra_info('peername')
print(f"Client connected: {addr}")
self.clients.add(writer)
try:
while True:
data = await reader.read(100)
if not data:
break
message = data.decode()
print(f"Received from {addr}: {message}")
# Broadcast to all clients
await self.broadcast(f"{addr}: {message}", writer)
except asyncio.CancelledError:
pass
finally:
print(f"Client disconnected: {addr}")
self.clients.remove(writer)
writer.close()
await writer.wait_closed()
async def broadcast(self, message, sender):
"""Send message to all clients except sender"""
for client in self.clients:
if client != sender:
try:
client.write(message.encode())
await client.drain()
except Exception as e:
print(f"Error broadcasting: {e}")
async def start(self, host='127.0.0.1', port=8888):
"""Start the chat server"""
server = await asyncio.start_server(
self.handle_client,
host,
port
)
addr = server.sockets[0].getsockname()
print(f"Chat server running on {addr}")
async with server:
await server.serve_forever()
async def main():
chat_server = ChatServer()
await chat_server.start()
asyncio.run(main())Experimenting with Asyncio in Jupyter
Ao trabalhar com asyncio em Jupyter notebooks, você pode encontrar conflitos de event loop. O Jupyter já executa um event loop, o que pode interferir com asyncio.run().
Para experimentação async seamless em ambientes Jupyter, considere usar RunCell (opens in a new tab), um agente de IA projetado especificamente para Jupyter notebooks. RunCell gerencia o event loop automaticamente e fornece capacidades aprimoradas de debugging async, permitindo que você teste padrões asyncio interativamente sem conflitos.
Em Jupyter padrão, você pode usar await no nível superior:
# In Jupyter, this works directly
async def fetch_data():
await asyncio.sleep(1)
return "data"
# Just await directly, no asyncio.run() needed
result = await fetch_data()
print(result)Ou use o pacote nest_asyncio para permitir loops aninhados:
import nest_asyncio
nest_asyncio.apply()
# Now asyncio.run() works in Jupyter
asyncio.run(main())FAQ
Qual é a diferença entre asyncio e threading em Python?
Asyncio usa multitarefa cooperativa em uma única thread, onde as tarefas voluntariamente cedem controle usando await. Threading usa multitarefa preemptiva com múltiplas threads do SO, onde o SO decide quando alternar entre threads. Asyncio é mais eficiente para tarefas limitadas por I/O com menor overhead de memória e sem riscos de condições de corrida, enquanto threading pode lidar com bibliotecas bloqueantes que não suportam async. Ambas são limitadas pelo GIL do Python para trabalho limitado por CPU, mas asyncio evita o overhead de troca de contexto de threads.
Quando devo usar asyncio ao invés de multiprocessing?
Use asyncio para tarefas limitadas por I/O como chamadas de API, consultas a banco de dados, operações de arquivo e comunicação de rede. Use multiprocessing para tarefas limitadas por CPU como processamento de dados, computações matemáticas, manipulação de imagens e treinamento de modelos de machine learning. O asyncio cria processos separados que contornam completamente o GIL do Python, habilitando execução paralela verdadeira em múltiplos núcleos de CPU. O asyncio excele em lidar com milhares de operações concorrentes de I/O com overhead mínimo de recursos, enquanto o multiprocessing é limitado pela contagem de núcleos de CPU mas fornece computação paralela real.
Posso misturar código async e sync na mesma aplicação?
Sim, mas com planejamento cuidadoso. Você pode chamar funções async de código sync usando asyncio.run(), embora não possa chamá-lo de dentro de um event loop já em execução. Para chamar funções bloqueantes sync de código async, use loop.run_in_executor() para executá-las em um pool de threads, prevenindo que bloqueiem o event loop. Nunca use operações bloqueantes como time.sleep(), requests.get() ou I/O de arquivo síncrono diretamente em funções async, pois elas bloqueiam o event loop inteiro. Em vez disso, use equivalentes async como asyncio.sleep(), aiohttp e aiofiles.
Como faço para debugar código asyncio efetivamente?
Habilite o modo debug do asyncio com asyncio.run(main(), debug=True) ou defina a variável de ambiente PYTHONASYNCIODEBUG=1. Isso detecta erros comuns como esquecer await, callbacks tomando muito tempo e corrotinas que nunca foram aguardadas. Use logging extensivamente para rastrear o fluxo de execução, já que debuggers tradicionais podem ser confusos com código async. Adicione nomes de tasks com asyncio.create_task(coro(), name="task-name") para mensagens de erro mais claras. Use asyncio.gather(..., return_exceptions=True) para prevenir que uma task falhando esconda erros em outras. Monitore seu event loop com asyncio.all_tasks() para verificar tasks que não estão completando.
Quais são os limites de performance do asyncio?
Asyncio pode lidar com dezenas de milhares de operações concorrentes de I/O em uma única thread, excedendo limites de threading. A principal restrição é que asyncio não fornece benefício para operações limitadas por CPU já que usa uma única thread. A performance degrada se você bloquear o event loop com I/O síncrono ou computação pesada. Largura de banda de rede e limites de rate de API se tornam o gargalo antes dos limites de concorrência do asyncio. Uso de memória escala com o número de tasks concorrentes, mas cada task tem overhead mínimo comparado a threads. Para performance máxima, combine asyncio para concorrência de I/O com multiprocessing para trabalho limitado por CPU, e sempre use semáforos para limitar operações concorrentes a níveis razoáveis.
Conclusão
O Python asyncio transforma aplicações limitadas por I/O de operações lentas e bloqueantes em sistemas rápidos e concorrentes. Ao dominar a sintaxe async/await, entender o event loop e aproveitar ferramentas como gather(), create_task() e semáforos, você pode construir aplicações que lidam eficientemente com milhares de operações concorrentes.
A chave para o sucesso com asyncio é reconhecer quando é a ferramenta certa. Use-o para requisições de rede, consultas a banco de dados, operações de arquivo e qualquer tarefa que passe tempo aguardando recursos externos. Evite bloquear o event loop com operações síncronas, sempre use await com corrotinas e limite a concorrência com semáforos quando necessário.
Comece convertendo pequenas seções de sua base de código para async, meça a melhoria de performance e expanda gradualmente. Os speedups dramáticos em aplicações pesadas em I/O fazem o investimento de aprendizado valer a pena.