Python asyncio: Guía Completa de Programación Asincrónica
Updated on
Tu aplicación Python realiza 100 llamadas a API, cada una tomando 2 segundos. Con código secuencial tradicional, eso son 200 segundos de espera. Tus usuarios miran pantallas de carga. Tus servidores permanecen inactivos, consumiendo recursos mientras esperan respuestas. Este comportamiento bloqueante es el problema que destruye el rendimiento de la aplicación y la experiencia del usuario.
El dolor se intensifica cuando escalas. Las consultas a la base de datos se acumulan. Las operaciones de archivos se encolan una tras otra. Los web scrapers avanzan a paso de tortuga. Cada operación de E/S se convierte en un cuello de botella que se propaga por todo tu sistema, convirtiendo lo que debería ser una aplicación rápida y responsiva en un monstruo lento y desperdiciador de recursos.
Python asyncio resuelve esto permitiendo la ejecución concurrente de tareas limitadas por E/S. En lugar de esperar a que cada operación se complete antes de iniciar la siguiente, asyncio permite que tu código inicie múltiples operaciones y cambie entre ellas mientras espera. ¿Esas 100 llamadas a API? Con asyncio, se completan en aproximadamente 2 segundos en lugar de 200. Esta guía te muestra exactamente cómo implementar la programación asíncrona en Python, con ejemplos prácticos que transforman código lento y bloqueante en aplicaciones rápidas y concurrentes.
Qué es la Programación Asincrónica y Por Qué Importa
La programación asincrónica permite que un programa inicie tareas potencialmente de larga duración y pase a otro trabajo antes de que esas tareas se completen, en lugar de esperar a que cada tarea termine antes de iniciar la siguiente.
En el código síncrono tradicional, cuando haces una solicitud a una API, tu programa se detiene y espera la respuesta. Durante este período de espera, tu CPU permanece inactiva, sin hacer nada productivo. Esto es aceptable para operaciones únicas, pero catastrófico para aplicaciones que necesitan manejar múltiples operaciones de E/S.
Asyncio proporciona una forma de escribir código concurrente utilizando la sintaxis async/await. Es particularmente efectivo para operaciones limitadas por E/S como:
- Realizar solicitudes HTTP a APIs
- Leer y escribir archivos
- Consultas a bases de datos
- Comunicación de red
- Conexiones WebSocket
- Procesamiento de colas de mensajes
La mejora de rendimiento es dramática. Considera obtener datos de 50 URLs diferentes:
Enfoque síncrono: 50 solicitudes × 2 segundos cada una = 100 segundos totales Enfoque asíncrono: Las 50 solicitudes se ejecutan concurrentemente ≈ 2 segundos totales
Esta mejora de rendimiento de 50x proviene de una mejor utilización de recursos. En lugar de bloquearse en operaciones de E/S, asyncio permite que tu programa continúe ejecutando otras tareas mientras espera que la E/S se complete.
Concurrencia vs Paralelismo vs Asíncrono
Entender la distinción entre estos conceptos es esencial para usar asyncio efectivamente.
Concurrencia significa gestionar múltiples tareas a la vez. Las tareas se turnan para progresar, pero solo una se ejecuta en cualquier momento dado. Piensa en un chef preparando múltiples platos, cambiando entre tareas mientras cada uno espera a que se cocine.
Paralelismo significa ejecutar múltiples tareas simultáneamente en diferentes núcleos de CPU. Esto requiere hardware de procesamiento paralelo real y es ideal para tareas limitadas por CPU como cálculos matemáticos o procesamiento de imágenes.
Programación asíncrona es una forma específica de concurrencia diseñada para tareas limitadas por E/S. Utiliza un solo hilo y cambia entre tareas cuando están esperando operaciones de E/S.
| Característica | asyncio | Threading | Multiprocessing |
|---|---|---|---|
| Modelo de ejecución | Hilo único, multitarea cooperativa | Múltiples hilos, multitarea preventiva | Múltiples procesos |
| Mejor para | Tareas limitadas por E/S | Tareas limitadas por E/S con librerías bloqueantes | Tareas limitadas por CPU |
| Sobrecarga de memoria | Mínima | Moderada | Alta |
| Costo de cambio de contexto | Muy bajo | Bajo a moderado | Alto |
| Complejidad | Moderada (sintaxis async/await) | Alta (condiciones de carrera, locks) | Alta (IPC, serialización) |
| Limitación del GIL | No afectada (hilo único) | Limitada por GIL | No limitada (procesos separados) |
| Aceleración típica para E/S | 10-100x | 5-10x | N/A |
El Global Interpreter Lock (GIL) de Python previene la ejecución paralela real del bytecode de Python en hilos, haciendo que el threading sea menos efectivo para tareas limitadas por CPU. Asyncio evita esta limitación utilizando un solo hilo con multitarea cooperativa, mientras que multiprocessing la evita completamente con procesos separados.
async def y await Palabras Clave
La base de asyncio se construye sobre dos palabras clave: async y await.
La palabra clave async def define una función de corrutina. Cuando llamas a una función de corrutina, no se ejecuta inmediatamente. En su lugar, devuelve un objeto de corrutina que puede ser esperado.
import asyncio
async def fetch_data():
print("Fetching data...")
await asyncio.sleep(2) # Simulate I/O operation
print("Data fetched!")
return {"status": "success"}
# Calling the function returns a coroutine object
coroutine = fetch_data()
print(type(coroutine)) # <class 'coroutine'>
# To actually run it, you need to await it or use asyncio.run()
# asyncio.run(coroutine) # This would execute the coroutineLa palabra clave await pausa la ejecución de una corrutina hasta que la operación esperada se complete. Durante esta pausa, el bucle de eventos puede ejecutar otras corrutinas. Solo puedes usar await dentro de una función async def.
async def process_user(user_id):
# Await an I/O operation
user_data = await fetch_user_from_database(user_id)
# Await another I/O operation
user_profile = await fetch_user_profile(user_id)
# Regular synchronous code runs normally
processed_data = transform_data(user_data, user_profile)
return processed_dataReglas clave para async/await:
- Solo puedes usar
awaiten corrutinas, tareas o futuros - Solo puedes usar
awaitdentro de una funciónasync def - Las funciones regulares no pueden usar
await - Llamar una función async sin usar await crea un objeto de corrutina pero no ejecuta el código
Error común:
async def wrong_example():
# This creates a coroutine but doesn't execute it!
fetch_data() # Missing await
async def correct_example():
# This actually executes the coroutine
result = await fetch_data()asyncio.run() Punto de Entrada
La función asyncio.run() es la forma estándar de iniciar el bucle de eventos de asyncio y ejecutar tu corrutina principal. Fue introducida en Python 3.7 y simplifica la ejecución de código async desde contextos síncronos.
import asyncio
async def main():
print("Starting async operations")
await asyncio.sleep(1)
print("Finished")
# Run the main coroutine
asyncio.run(main())Lo que asyncio.run() hace detrás de escenas:
- Crea un nuevo bucle de eventos
- Ejecuta la corrutina proporcionada hasta su finalización
- Cierra el bucle de eventos
- Devuelve el resultado de la corrutina
import asyncio
async def main():
result = await compute_value()
return result
# The return value is accessible
final_result = asyncio.run(main())
print(final_result)Características importantes de asyncio.run():
- No puede llamarse desde dentro de un bucle de eventos en ejecución: Si ya estás en una función async, usa
awaiten su lugar - Crea un bucle de eventos fresco cada vez: No llames a
asyncio.run()múltiples veces en el mismo programa a menos que quieras instancias separadas del bucle de eventos - Siempre cierra el bucle: El bucle de eventos se limpia después de la ejecución
Para Jupyter notebooks o entornos donde ya hay un bucle de eventos en ejecución, usa await directamente o asyncio.create_task(). Herramientas como RunCell (opens in a new tab) proporcionan soporte async mejorado en entornos Jupyter, facilitando experimentar con patrones asyncio interactivamente sin conflictos del bucle de eventos.
Antes de Python 3.7, tenías que gestionar manualmente el bucle de eventos:
# Old way (pre-Python 3.7)
loop = asyncio.get_event_loop()
try:
loop.run_until_complete(main())
finally:
loop.close()
# Modern way (Python 3.7+)
asyncio.run(main())Corrutinas, Tareas y Futuros
Entender estos tres conceptos fundamentales es esencial para dominar asyncio.
Corrutinas
Una corrutina es una función definida con async def. Es una función especial que puede ser pausada y reanudada, permitiendo que otro código se ejecute durante la pausa.
import asyncio
async def my_coroutine():
await asyncio.sleep(1)
return "completed"
# This creates a coroutine object
coro = my_coroutine()
# To run it
result = asyncio.run(coro)Tareas
Una tarea es un envoltorio alrededor de una corrutina que la programa para ejecución en el bucle de eventos. Las tareas permiten que las corrutinas se ejecuten concurrentemente.
import asyncio
async def say_after(delay, message):
await asyncio.sleep(delay)
print(message)
return message
async def main():
# Create tasks to run concurrently
task1 = asyncio.create_task(say_after(1, "First"))
task2 = asyncio.create_task(say_after(2, "Second"))
# Wait for both tasks to complete
result1 = await task1
result2 = await task2
print(f"Results: {result1}, {result2}")
asyncio.run(main())Crear una tarea programa inmediatamente la corrutina para ejecución. El bucle de eventos comienza a ejecutarla tan pronto como sea posible, incluso antes de que hagas await de la tarea.
async def background_work():
print("Starting background work")
await asyncio.sleep(2)
print("Background work completed")
async def main():
# The coroutine starts running immediately
task = asyncio.create_task(background_work())
print("Task created, doing other work")
await asyncio.sleep(1)
print("Other work done")
# Wait for the background task to finish
await task
asyncio.run(main())Output:
Task created, doing other work
Starting background work
Other work done
Background work completedFuturos
Un futuro es un objeto esperable de bajo nivel que representa un resultado eventual de una operación asíncrona. Raramente creas futuros directamente; típicamente son creados por asyncio internamente o por librerías.
async def set_future_result(future):
await asyncio.sleep(1)
future.set_result("Future completed")
async def main():
loop = asyncio.get_event_loop()
future = loop.create_future()
# Schedule a coroutine to set the future's result
asyncio.create_task(set_future_result(future))
# Wait for the future to get a result
result = await future
print(result)
asyncio.run(main())Relación entre corrutinas, tareas y futuros:
- Las corrutinas son las funciones que escribes
- Las tareas envuelven corrutinas y las programan para ejecución
- Los futuros representan resultados que estarán disponibles en el futuro
- Las tareas son una subclase de Futuro
asyncio.create_task() para Ejecución Concurrente
La función asyncio.create_task() es tu herramienta principal para lograr concurrencia real con asyncio. Programa una corrutina para ejecutarse en el bucle de eventos sin bloquear la corrutina actual.
import asyncio
import time
async def download_file(file_id):
print(f"Starting download {file_id}")
await asyncio.sleep(2) # Simulate download time
print(f"Completed download {file_id}")
return f"file_{file_id}.dat"
async def sequential_downloads():
"""Downloads files one at a time"""
start = time.time()
file1 = await download_file(1)
file2 = await download_file(2)
file3 = await download_file(3)
elapsed = time.time() - start
print(f"Sequential: {elapsed:.2f} seconds")
# Output: ~6 seconds (2 + 2 + 2)
async def concurrent_downloads():
"""Downloads files concurrently"""
start = time.time()
# Create tasks for concurrent execution
task1 = asyncio.create_task(download_file(1))
task2 = asyncio.create_task(download_file(2))
task3 = asyncio.create_task(download_file(3))
# Wait for all tasks to complete
file1 = await task1
file2 = await task2
file3 = await task3
elapsed = time.time() - start
print(f"Concurrent: {elapsed:.2f} seconds")
# Output: ~2 seconds (all run at the same time)
asyncio.run(concurrent_downloads())La tarea se programa inmediatamente cuando se llama a create_task(). No tienes que hacerle await de inmediato.
async def process_data():
# Start background tasks
task1 = asyncio.create_task(fetch_from_api_1())
task2 = asyncio.create_task(fetch_from_api_2())
# Do some work while tasks run in background
local_data = prepare_local_data()
# Now wait for the background tasks
api_data_1 = await task1
api_data_2 = await task2
# Combine all data
return combine_data(local_data, api_data_1, api_data_2)También puedes nombrar tareas para mejor depuración:
async def main():
task = asyncio.create_task(
long_running_operation(),
name="long-operation-task"
)
# Task name is accessible
print(f"Task name: {task.get_name()}")
await taskasyncio.gather() para Ejecutar Múltiples Corrutinas
La función asyncio.gather() ejecuta múltiples corrutinas concurrentemente y espera a que todas se completen. Es más limpia que crear tareas individuales cuando necesitas ejecutar muchas corrutinas.
import asyncio
async def fetch_user(user_id):
await asyncio.sleep(1)
return {"id": user_id, "name": f"User {user_id}"}
async def main():
# Run multiple coroutines concurrently
results = await asyncio.gather(
fetch_user(1),
fetch_user(2),
fetch_user(3),
fetch_user(4),
fetch_user(5)
)
print(results)
# Output: [{'id': 1, 'name': 'User 1'}, {'id': 2, 'name': 'User 2'}, ...]
asyncio.run(main())gather() devuelve resultados en el mismo orden que las corrutinas de entrada, independientemente de cuál se complete primero.
async def task_with_delay(delay, value):
await asyncio.sleep(delay)
return value
async def main():
results = await asyncio.gather(
task_with_delay(3, "slow"),
task_with_delay(1, "fast"),
task_with_delay(2, "medium")
)
print(results)
# Output: ['slow', 'fast', 'medium'] (order preserved)
asyncio.run(main())Manejo de Errores con gather()
Por defecto, si alguna corrutina lanza una excepción, gather() lanza inmediatamente esa excepción y cancela las tareas restantes.
async def failing_task():
await asyncio.sleep(1)
raise ValueError("Task failed!")
async def successful_task():
await asyncio.sleep(2)
return "success"
async def main():
try:
results = await asyncio.gather(
failing_task(),
successful_task()
)
except ValueError as e:
print(f"Error caught: {e}")
# The successful_task is cancelled
asyncio.run(main())Usa return_exceptions=True para recoger excepciones junto con resultados exitosos:
async def main():
results = await asyncio.gather(
failing_task(),
successful_task(),
return_exceptions=True
)
for i, result in enumerate(results):
if isinstance(result, Exception):
print(f"Task {i} failed: {result}")
else:
print(f"Task {i} succeeded: {result}")
asyncio.run(main())gather() dinámico con comprensión de listas
gather() funciona hermosamente con corrutinas generadas dinámicamente:
async def process_item(item):
await asyncio.sleep(1)
return item * 2
async def main():
items = [1, 2, 3, 4, 5]
# Process all items concurrently
results = await asyncio.gather(
*[process_item(item) for item in items]
)
print(results) # [2, 4, 6, 8, 10]
asyncio.run(main())asyncio.wait() y asyncio.as_completed()
Mientras que gather() espera a que todas las corrutinas se completen, wait() y as_completed() proporcionan un control más granular sobre cómo manejas las tareas completadas.
asyncio.wait()
wait() te permite esperar tareas con diferentes condiciones de finalización: todas las tareas, primera tarea, o primera excepción.
import asyncio
async def task(delay, name):
await asyncio.sleep(delay)
return f"{name} completed"
async def main():
tasks = [
asyncio.create_task(task(1, "Task 1")),
asyncio.create_task(task(2, "Task 2")),
asyncio.create_task(task(3, "Task 3"))
]
# Wait for all tasks to complete
done, pending = await asyncio.wait(
tasks,
return_when=asyncio.ALL_COMPLETED
)
for task in done:
print(task.result())
asyncio.run(main())Diferentes condiciones de finalización:
async def main():
tasks = [
asyncio.create_task(task(1, "Fast")),
asyncio.create_task(task(3, "Slow")),
asyncio.create_task(task(2, "Medium"))
]
# Return when the first task completes
done, pending = await asyncio.wait(
tasks,
return_when=asyncio.FIRST_COMPLETED
)
print(f"First completed: {done.pop().result()}")
print(f"Still pending: {len(pending)} tasks")
# Cancel remaining tasks
for task in pending:
task.cancel()
asyncio.run(main())asyncio.as_completed()
as_completed() devuelve un iterador que produce tareas a medida que se completan, permitiéndote procesar resultados tan pronto como estén disponibles.
import asyncio
async def fetch_data(url_id, delay):
await asyncio.sleep(delay)
return f"Data from URL {url_id}"
async def main():
tasks = [
fetch_data(1, 3),
fetch_data(2, 1),
fetch_data(3, 2)
]
# Process results as they complete
for coro in asyncio.as_completed(tasks):
result = await coro
print(f"Received: {result}")
asyncio.run(main())La salida muestra resultados en orden de finalización, no en orden de envío:
Received: Data from URL 2
Received: Data from URL 3
Received: Data from URL 1Esto es particularmente útil cuando quieres mostrar resultados a los usuarios tan rápido como sea posible:
async def search_engine(query, engine_name, delay):
await asyncio.sleep(delay)
return f"{engine_name}: Results for '{query}'"
async def main():
query = "python asyncio"
searches = [
search_engine(query, "Google", 1.5),
search_engine(query, "Bing", 2.0),
search_engine(query, "DuckDuckGo", 1.0)
]
print("Searching...")
for search in asyncio.as_completed(searches):
result = await search
print(result) # Display each result as soon as it arrives
asyncio.run(main())asyncio.sleep() vs time.sleep()
Esta distinción es crítica para la corrección del código async.
time.sleep() es una operación bloqueante que pausa todo el hilo, incluyendo el bucle de eventos. Esto previene que todas las tareas async se ejecuten.
asyncio.sleep() es una corrutina no bloqueante que solo pausa la tarea actual, permitiendo que otras tareas se ejecuten.
import asyncio
import time
async def blocking_example():
"""Bad: Uses time.sleep() - blocks the entire event loop"""
print("Starting blocking sleep")
time.sleep(2) # WRONG: Blocks everything!
print("Finished blocking sleep")
async def non_blocking_example():
"""Good: Uses asyncio.sleep() - allows other tasks to run"""
print("Starting non-blocking sleep")
await asyncio.sleep(2) # CORRECT: Only pauses this task
print("Finished non-blocking sleep")
async def concurrent_task():
for i in range(3):
print(f"Concurrent task running: {i}")
await asyncio.sleep(0.5)
async def demo_blocking():
print("\n=== Blocking example (BAD) ===")
await asyncio.gather(
blocking_example(),
concurrent_task()
)
async def demo_non_blocking():
print("\n=== Non-blocking example (GOOD) ===")
await asyncio.gather(
non_blocking_example(),
concurrent_task()
)
asyncio.run(demo_blocking())
asyncio.run(demo_non_blocking())En el ejemplo bloqueante, la tarea concurrente no se ejecuta hasta que time.sleep() se completa. En el ejemplo no bloqueante, ambas tareas se ejecutan concurrentemente.
Regla de oro: Nunca uses time.sleep() en código async. Siempre usa await asyncio.sleep().
Para operaciones limitadas por CPU que no puedes evitar, usa loop.run_in_executor() para ejecutarlas en un hilo o proceso separado:
import asyncio
import time
def cpu_intensive_task():
"""Some blocking CPU work"""
time.sleep(2) # Simulate heavy computation
return "CPU task completed"
async def main():
loop = asyncio.get_event_loop()
# Run blocking task in a thread pool
result = await loop.run_in_executor(None, cpu_intensive_task)
print(result)
asyncio.run(main())async for y async with
Python proporciona versiones asíncronas de los bucles for y gestores de contexto para trabajar con iterables asíncronos y recursos.
Iteradores Asíncronos (async for)
Un iterador asíncrono es un objeto que implementa los métodos __aiter__() y __anext__(), permitiéndote iterar sobre elementos que requieren operaciones async para obtener.
import asyncio
class AsyncRange:
"""An async iterator that yields numbers with delays"""
def __init__(self, start, end):
self.current = start
self.end = end
def __aiter__(self):
return self
async def __anext__(self):
if self.current >= self.end:
raise StopAsyncIteration
# Simulate async operation to get next value
await asyncio.sleep(0.5)
value = self.current
self.current += 1
return value
async def main():
async for number in AsyncRange(1, 5):
print(f"Got number: {number}")
asyncio.run(main())Ejemplo del mundo real con cursor de base de datos async:
class AsyncDatabaseCursor:
"""Simulated async database cursor"""
def __init__(self, query):
self.query = query
self.results = []
self.index = 0
async def execute(self):
# Simulate database query
await asyncio.sleep(1)
self.results = [
{"id": 1, "name": "Alice"},
{"id": 2, "name": "Bob"},
{"id": 3, "name": "Charlie"}
]
def __aiter__(self):
return self
async def __anext__(self):
if self.index >= len(self.results):
raise StopAsyncIteration
# Simulate fetching next row
await asyncio.sleep(0.1)
row = self.results[self.index]
self.index += 1
return row
async def fetch_users():
cursor = AsyncDatabaseCursor("SELECT * FROM users")
await cursor.execute()
async for row in cursor:
print(f"User: {row['name']}")
asyncio.run(fetch_users())Gestores de Contexto Asíncronos (async with)
Un gestor de contexto async implementa los métodos __aenter__() y __aexit__() para gestionar recursos que requieren configuración y limpieza async.
import asyncio
class AsyncDatabaseConnection:
"""An async context manager for database connections"""
async def __aenter__(self):
print("Opening database connection...")
await asyncio.sleep(1) # Simulate connection time
print("Database connection opened")
return self
async def __aexit__(self, exc_type, exc_val, exc_tb):
print("Closing database connection...")
await asyncio.sleep(0.5) # Simulate cleanup
print("Database connection closed")
async def query(self, sql):
await asyncio.sleep(0.5)
return f"Results for: {sql}"
async def main():
async with AsyncDatabaseConnection() as db:
result = await db.query("SELECT * FROM users")
print(result)
# Connection automatically closed after the with block
asyncio.run(main())Combinando gestores de contexto async con iteradores async:
class AsyncFileReader:
"""Async context manager and iterator for reading files"""
def __init__(self, filename):
self.filename = filename
self.lines = []
self.index = 0
async def __aenter__(self):
# Simulate async file opening
await asyncio.sleep(0.5)
self.lines = ["Line 1", "Line 2", "Line 3"]
return self
async def __aexit__(self, exc_type, exc_val, exc_tb):
await asyncio.sleep(0.2)
self.lines = []
def __aiter__(self):
return self
async def __anext__(self):
if self.index >= len(self.lines):
raise StopAsyncIteration
await asyncio.sleep(0.1)
line = self.lines[self.index]
self.index += 1
return line
async def read_file():
async with AsyncFileReader("data.txt") as reader:
async for line in reader:
print(line)
asyncio.run(read_file())asyncio.Queue para Patrones Productor-Consumidor
asyncio.Queue es una cola segura para hilos, consciente de async, que es perfecta para coordinar trabajo entre corrutinas productor y consumidor.
import asyncio
import random
async def producer(queue, producer_id):
"""Produces items and puts them in the queue"""
for i in range(5):
await asyncio.sleep(random.uniform(0.1, 0.5))
item = f"Item-{producer_id}-{i}"
await queue.put(item)
print(f"Producer {producer_id} produced: {item}")
async def consumer(queue, consumer_id):
"""Consumes items from the queue"""
while True:
item = await queue.get()
print(f"Consumer {consumer_id} consuming: {item}")
# Simulate processing time
await asyncio.sleep(random.uniform(0.2, 0.8))
queue.task_done()
async def main():
queue = asyncio.Queue(maxsize=10)
# Create producers and consumers
producers = [asyncio.create_task(producer(queue, i)) for i in range(2)]
consumers = [asyncio.create_task(consumer(queue, i)) for i in range(3)]
# Wait for all producers to finish
await asyncio.gather(*producers)
# Wait for the queue to be fully processed
await queue.join()
# Cancel consumers (they run forever)
for consumer_task in consumers:
consumer_task.cancel()
asyncio.run(main())Ejemplo del mundo real: Web scraper con cola de URLs:
import asyncio
from typing import Set
async def fetch_url(session, url):
"""Simulate fetching a URL"""
await asyncio.sleep(1)
return f"Content from {url}"
async def producer(queue: asyncio.Queue, start_urls: list):
"""Add URLs to the queue"""
for url in start_urls:
await queue.put(url)
async def consumer(queue: asyncio.Queue, visited: Set[str]):
"""Fetch URLs from the queue"""
while True:
url = await queue.get()
if url in visited:
queue.task_done()
continue
visited.add(url)
print(f"Scraping: {url}")
# Simulate fetching
content = await fetch_url(None, url)
# Simulate finding new URLs in the content
# In real scraper, you'd parse HTML and extract links
new_urls = [] # Would extract from content
for new_url in new_urls:
if new_url not in visited:
await queue.put(new_url)
queue.task_done()
async def main():
queue = asyncio.Queue()
visited = set()
start_urls = [
"https://example.com/page1",
"https://example.com/page2",
"https://example.com/page3"
]
# Add initial URLs
await producer(queue, start_urls)
# Create consumers
consumers = [
asyncio.create_task(consumer(queue, visited))
for _ in range(3)
]
# Wait for all URLs to be processed
await queue.join()
# Cancel consumers
for consumer_task in consumers:
consumer_task.cancel()
print(f"Scraped {len(visited)} unique URLs")
asyncio.run(main())Semáforos para Limitación de Tasa
asyncio.Semaphore controla el número de corrutinas que pueden acceder a un recurso concurrentemente. Esto es esencial para limitar la tasa de llamadas a API o limitar conexiones concurrentes a bases de datos.
import asyncio
import time
async def call_api(semaphore, api_id):
"""Make an API call with rate limiting"""
async with semaphore:
print(f"API call {api_id} started")
await asyncio.sleep(1) # Simulate API call
print(f"API call {api_id} completed")
return f"Result {api_id}"
async def main():
# Allow only 3 concurrent API calls
semaphore = asyncio.Semaphore(3)
start = time.time()
# Create 10 API calls
tasks = [call_api(semaphore, i) for i in range(10)]
results = await asyncio.gather(*tasks)
elapsed = time.time() - start
print(f"\nCompleted {len(results)} API calls in {elapsed:.2f} seconds")
# With semaphore(3): ~4 seconds (10 calls in batches of 3)
# Without semaphore: ~1 second (all concurrent)
asyncio.run(main())Limitación de tasa para cumplimiento de cuota de API:
import asyncio
from datetime import datetime
class RateLimiter:
"""Rate limiter that allows N requests per time period"""
def __init__(self, max_requests, time_period):
self.max_requests = max_requests
self.time_period = time_period
self.semaphore = asyncio.Semaphore(max_requests)
self.request_times = []
async def __aenter__(self):
await self.semaphore.acquire()
# Wait if we've hit the rate limit
while len(self.request_times) >= self.max_requests:
oldest = self.request_times[0]
elapsed = datetime.now().timestamp() - oldest
if elapsed < self.time_period:
sleep_time = self.time_period - elapsed
await asyncio.sleep(sleep_time)
self.request_times.pop(0)
self.request_times.append(datetime.now().timestamp())
return self
async def __aexit__(self, exc_type, exc_val, exc_tb):
self.semaphore.release()
async def fetch_with_rate_limit(rate_limiter, item_id):
async with rate_limiter:
print(f"Fetching item {item_id} at {datetime.now()}")
await asyncio.sleep(0.5)
return f"Item {item_id}"
async def main():
# Allow 5 requests per 2 seconds
rate_limiter = RateLimiter(max_requests=5, time_period=2)
# Make 15 requests
tasks = [fetch_with_rate_limit(rate_limiter, i) for i in range(15)]
results = await asyncio.gather(*tasks)
print(f"Completed {len(results)} requests")
asyncio.run(main())aiohttp para Solicitudes HTTP Asíncronas
La librería aiohttp proporciona funcionalidad cliente y servidor HTTP async. Es la opción estándar para hacer solicitudes HTTP en código async.
import asyncio
import aiohttp
async def fetch_url(session, url):
"""Fetch a single URL"""
async with session.get(url) as response:
return await response.text()
async def main():
async with aiohttp.ClientSession() as session:
html = await fetch_url(session, "https://api.github.com")
print(f"Fetched {len(html)} characters")
asyncio.run(main())Obteniendo múltiples URLs concurrentemente:
import asyncio
import aiohttp
import time
async def fetch_url(session, url):
"""Fetch URL and return status code and content length"""
async with session.get(url) as response:
content = await response.text()
return {
"url": url,
"status": response.status,
"length": len(content)
}
async def fetch_all_urls(urls):
"""Fetch multiple URLs concurrently"""
async with aiohttp.ClientSession() as session:
tasks = [fetch_url(session, url) for url in urls]
results = await asyncio.gather(*tasks, return_exceptions=True)
return results
async def main():
urls = [
"https://api.github.com",
"https://httpbin.org/delay/1",
"https://httpbin.org/delay/2",
"https://httpbin.org/status/200",
"https://httpbin.org/status/404"
]
start = time.time()
results = await fetch_all_urls(urls)
elapsed = time.time() - start
for result in results:
if isinstance(result, Exception):
print(f"Error: {result}")
else:
print(f"{result['url']}: {result['status']} ({result['length']} bytes)")
print(f"\nFetched {len(urls)} URLs in {elapsed:.2f} seconds")
asyncio.run(main())Ejemplo práctico con manejo de errores y reintentos:
import asyncio
import aiohttp
from typing import Optional
async def fetch_with_retry(
session: aiohttp.ClientSession,
url: str,
max_retries: int = 3
) -> Optional[dict]:
"""Fetch URL with retry logic"""
for attempt in range(max_retries):
try:
async with session.get(url, timeout=aiohttp.ClientTimeout(total=10)) as response:
if response.status == 200:
data = await response.json()
return data
elif response.status == 404:
print(f"URL not found: {url}")
return None
else:
print(f"Unexpected status {response.status} for {url}")
except asyncio.TimeoutError:
print(f"Timeout on attempt {attempt + 1} for {url}")
except aiohttp.ClientError as e:
print(f"Client error on attempt {attempt + 1} for {url}: {e}")
if attempt < max_retries - 1:
await asyncio.sleep(2 ** attempt) # Exponential backoff
print(f"Failed to fetch {url} after {max_retries} attempts")
return None
async def main():
urls = [
"https://api.github.com/users/github",
"https://api.github.com/users/nonexistent",
"https://httpbin.org/delay/5"
]
async with aiohttp.ClientSession() as session:
tasks = [fetch_with_retry(session, url) for url in urls]
results = await asyncio.gather(*tasks)
successful = [r for r in results if r is not None]
print(f"\nSuccessfully fetched {len(successful)}/{len(urls)} URLs")
asyncio.run(main())aiofiles para E/S de Archivos Asíncrona
La librería aiofiles proporciona operaciones de archivos async, previniendo bloqueos durante lecturas y escrituras de archivos.
import asyncio
import aiofiles
async def read_file(filename):
"""Read file asynchronously"""
async with aiofiles.open(filename, mode='r') as f:
contents = await f.read()
return contents
async def write_file(filename, content):
"""Write file asynchronously"""
async with aiofiles.open(filename, mode='w') as f:
await f.write(content)
async def main():
# Write data
await write_file('test.txt', 'Hello, async file I/O!')
# Read data
content = await read_file('test.txt')
print(f"Read: {content}")
asyncio.run(main())Procesando múltiples archivos concurrentemente:
import asyncio
import aiofiles
async def process_file(input_file, output_file):
"""Read, process, and write a file"""
async with aiofiles.open(input_file, mode='r') as f:
content = await f.read()
# Process content (convert to uppercase)
processed = content.upper()
async with aiofiles.open(output_file, mode='w') as f:
await f.write(processed)
return f"Processed {input_file} -> {output_file}"
async def main():
files = [
('input1.txt', 'output1.txt'),
('input2.txt', 'output2.txt'),
('input3.txt', 'output3.txt')
]
tasks = [process_file(inp, out) for inp, out in files]
results = await asyncio.gather(*tasks)
for result in results:
print(result)
asyncio.run(main())Leyendo archivos grandes línea por línea:
import asyncio
import aiofiles
async def process_large_file(filename):
"""Process a large file line by line without loading it all into memory"""
line_count = 0
async with aiofiles.open(filename, mode='r') as f:
async for line in f:
# Process each line
line_count += 1
if line.strip():
# Do something with the line
pass
return line_count
async def main():
count = await process_large_file('large_data.txt')
print(f"Processed {count} lines")
asyncio.run(main())Manejo de Errores en Código Async
El manejo de errores en código async requiere atención especial para asegurar que las excepciones sean capturadas adecuadamente y los recursos se limpien.
import asyncio
async def risky_operation(item_id):
"""An operation that might fail"""
await asyncio.sleep(1)
if item_id % 2 == 0:
raise ValueError(f"Item {item_id} caused an error")
return f"Item {item_id} processed"
async def handle_with_try_except():
"""Handle errors with try/except"""
try:
result = await risky_operation(2)
print(result)
except ValueError as e:
print(f"Error caught: {e}")
asyncio.run(handle_with_try_except())Manejando errores en tareas concurrentes:
async def safe_operation(item_id):
"""Wrapper that catches errors from risky_operation"""
try:
result = await risky_operation(item_id)
return {"success": True, "result": result}
except Exception as e:
return {"success": False, "error": str(e), "item_id": item_id}
async def main():
tasks = [safe_operation(i) for i in range(5)]
results = await asyncio.gather(*tasks)
for result in results:
if result["success"]:
print(f"Success: {result['result']}")
else:
print(f"Failed: Item {result['item_id']} - {result['error']}")
asyncio.run(main())Usando gather() con return_exceptions=True:
async def main():
tasks = [risky_operation(i) for i in range(5)]
results = await asyncio.gather(*tasks, return_exceptions=True)
for i, result in enumerate(results):
if isinstance(result, Exception):
print(f"Task {i} failed: {result}")
else:
print(f"Task {i} succeeded: {result}")
asyncio.run(main())Limpieza con gestores de contexto async:
class AsyncResource:
"""A resource that needs cleanup even if errors occur"""
async def __aenter__(self):
print("Acquiring resource")
await asyncio.sleep(0.5)
return self
async def __aexit__(self, exc_type, exc_val, exc_tb):
print("Cleaning up resource")
await asyncio.sleep(0.5)
if exc_type is not None:
print(f"Exception during resource use: {exc_val}")
# Return False to propagate exception, True to suppress
return False
async def do_work(self):
await asyncio.sleep(1)
raise ValueError("Something went wrong")
async def main():
try:
async with AsyncResource() as resource:
await resource.do_work()
except ValueError as e:
print(f"Caught error: {e}")
asyncio.run(main())Benchmarks de Rendimiento: Sync vs Async para Tareas Limitadas por E/S
Comparemos enfoques síncronos y asíncronos para operaciones limitadas por E/S con benchmarks reales.
import asyncio
import time
import requests
import aiohttp
# Synchronous approach
def fetch_sync(url):
response = requests.get(url)
return len(response.text)
def benchmark_sync(urls):
start = time.time()
results = [fetch_sync(url) for url in urls]
elapsed = time.time() - start
return elapsed, results
# Asynchronous approach
async def fetch_async(session, url):
async with session.get(url) as response:
text = await response.text()
return len(text)
async def benchmark_async(urls):
start = time.time()
async with aiohttp.ClientSession() as session:
tasks = [fetch_async(session, url) for url in urls]
results = await asyncio.gather(*tasks)
elapsed = time.time() - start
return elapsed, results
# Run benchmarks
urls = [
"https://httpbin.org/delay/1",
"https://httpbin.org/delay/1",
"https://httpbin.org/delay/1",
"https://httpbin.org/delay/1",
"https://httpbin.org/delay/1"
]
print("Benchmarking synchronous approach...")
sync_time, sync_results = benchmark_sync(urls)
print(f"Synchronous: {sync_time:.2f} seconds")
print("\nBenchmarking asynchronous approach...")
async_time, async_results = asyncio.run(benchmark_async(urls))
print(f"Asynchronous: {async_time:.2f} seconds")
print(f"\nSpeedup: {sync_time / async_time:.2f}x")Resultados típicos para 5 URLs con retraso de 1 segundo cada una:
- Síncrono: ~5 segundos (ejecución secuencial)
- Asíncrono: ~1 segundo (ejecución concurrente)
- Aceleración: ~5x
| Número de URLs | Tiempo Sync | Tiempo Async | Aceleración |
|---|---|---|---|
| 5 | 5.2s | 1.1s | 4.7x |
| 10 | 10.4s | 1.2s | 8.7x |
| 20 | 20.8s | 1.4s | 14.9x |
| 50 | 52.1s | 2.1s | 24.8x |
| 100 | 104.5s | 3.8s | 27.5x |
La aceleración aumenta con el número de operaciones concurrentes hasta que alcanzas restricciones de ancho de banda o limitación de tasa.
Errores Comunes y Cómo Evitarlos
Olvidar await
# WRONG: Coroutine is created but not executed
async def wrong():
result = fetch_data() # Missing await!
print(result) # Prints coroutine object, not the result
# CORRECT: Await the coroutine
async def correct():
result = await fetch_data()
print(result) # Prints actual resultBloquear el bucle de eventos
# WRONG: Blocks the entire event loop
async def blocking_code():
time.sleep(5) # Blocks everything!
result = compute_something() # Blocks if CPU-intensive
return result
# CORRECT: Use asyncio.sleep and run_in_executor
async def non_blocking_code():
await asyncio.sleep(5) # Only pauses this task
loop = asyncio.get_event_loop()
result = await loop.run_in_executor(None, compute_something)
return resultNo manejar cancelación de tareas
# WRONG: Doesn't handle cancellation
async def incomplete_cleanup():
resource = await acquire_resource()
await long_operation()
await release_resource(resource) # May not execute if cancelled
# CORRECT: Use try/finally or async with
async def proper_cleanup():
resource = await acquire_resource()
try:
await long_operation()
finally:
await release_resource(resource) # Always executesCrear conflictos de bucle de eventos
# WRONG: Calling asyncio.run() in an async function
async def wrong_nesting():
result = asyncio.run(some_coroutine()) # Error!
return result
# CORRECT: Just await the coroutine
async def correct_nesting():
result = await some_coroutine()
return resultNo limitar concurrencia
# WRONG: Launches too many tasks simultaneously
async def unlimited_concurrency(items):
tasks = [process_item(item) for item in items] # 10,000 tasks!
return await asyncio.gather(*tasks)
# CORRECT: Use semaphore to limit concurrency
async def limited_concurrency(items):
semaphore = asyncio.Semaphore(10) # Max 10 concurrent
async def process_with_limit(item):
async with semaphore:
return await process_item(item)
tasks = [process_with_limit(item) for item in items]
return await asyncio.gather(*tasks)Ejemplos del Mundo Real
Web Scraping con Limitación de Tasa
import asyncio
import aiohttp
from bs4 import BeautifulSoup
class AsyncWebScraper:
def __init__(self, max_concurrent=5):
self.semaphore = asyncio.Semaphore(max_concurrent)
self.session = None
async def __aenter__(self):
self.session = aiohttp.ClientSession()
return self
async def __aexit__(self, exc_type, exc_val, exc_tb):
await self.session.close()
async def fetch_page(self, url):
async with self.semaphore:
try:
async with self.session.get(url) as response:
return await response.text()
except Exception as e:
print(f"Error fetching {url}: {e}")
return None
async def scrape_urls(self, urls):
tasks = [self.fetch_page(url) for url in urls]
return await asyncio.gather(*tasks)
async def main():
urls = [
"https://example.com/page1",
"https://example.com/page2",
"https://example.com/page3"
]
async with AsyncWebScraper(max_concurrent=3) as scraper:
pages = await scraper.scrape_urls(urls)
for url, html in zip(urls, pages):
if html:
print(f"Scraped {url}: {len(html)} bytes")
asyncio.run(main())Pipeline de Datos de API Asíncrona
import asyncio
import aiohttp
async def fetch_user_ids(session):
"""Fetch list of user IDs from API"""
async with session.get("https://api.example.com/users") as response:
data = await response.json()
return [user["id"] for user in data["users"]]
async def fetch_user_details(session, user_id):
"""Fetch detailed info for a user"""
async with session.get(f"https://api.example.com/users/{user_id}") as response:
return await response.json()
async def fetch_user_posts(session, user_id):
"""Fetch posts for a user"""
async with session.get(f"https://api.example.com/users/{user_id}/posts") as response:
return await response.json()
async def process_user(session, user_id):
"""Fetch all data for a user concurrently"""
details, posts = await asyncio.gather(
fetch_user_details(session, user_id),
fetch_user_posts(session, user_id)
)
return {
"user": details,
"posts": posts,
"post_count": len(posts)
}
async def main():
async with aiohttp.ClientSession() as session:
# Fetch user IDs
user_ids = await fetch_user_ids(session)
# Process all users concurrently
user_data = await asyncio.gather(
*[process_user(session, uid) for uid in user_ids[:10]]
)
for data in user_data:
print(f"User {data['user']['name']}: {data['post_count']} posts")
asyncio.run(main())Servidor de Chat Asíncrono
import asyncio
class ChatServer:
def __init__(self):
self.clients = set()
async def handle_client(self, reader, writer):
"""Handle a single client connection"""
addr = writer.get_extra_info('peername')
print(f"Client connected: {addr}")
self.clients.add(writer)
try:
while True:
data = await reader.read(100)
if not data:
break
message = data.decode()
print(f"Received from {addr}: {message}")
# Broadcast to all clients
await self.broadcast(f"{addr}: {message}", writer)
except asyncio.CancelledError:
pass
finally:
print(f"Client disconnected: {addr}")
self.clients.remove(writer)
writer.close()
await writer.wait_closed()
async def broadcast(self, message, sender):
"""Send message to all clients except sender"""
for client in self.clients:
if client != sender:
try:
client.write(message.encode())
await client.drain()
except Exception as e:
print(f"Error broadcasting: {e}")
async def start(self, host='127.0.0.1', port=8888):
"""Start the chat server"""
server = await asyncio.start_server(
self.handle_client,
host,
port
)
addr = server.sockets[0].getsockname()
print(f"Chat server running on {addr}")
async with server:
await server.serve_forever()
async def main():
chat_server = ChatServer()
await chat_server.start()
asyncio.run(main())Experimentando con Asyncio en Jupyter
Cuando trabajas con asyncio en Jupyter notebooks, puedes encontrar conflictos de bucle de eventos. Jupyter ya ejecuta un bucle de eventos, lo cual puede interferir con asyncio.run().
Para experimentación async fluida en entornos Jupyter, considera usar RunCell (opens in a new tab), un agente de IA diseñado específicamente para Jupyter notebooks. RunCell maneja la gestión del bucle de eventos automáticamente y proporciona capacidades mejoradas de depuración async, permitiéndote probar patrones asyncio interactivamente sin conflictos.
En Jupyter estándar, puedes usar await de nivel superior:
# In Jupyter, this works directly
async def fetch_data():
await asyncio.sleep(1)
return "data"
# Just await directly, no asyncio.run() needed
result = await fetch_data()
print(result)O usa el paquete nest_asyncio para permitir bucles de eventos anidados:
import nest_asyncio
nest_asyncio.apply()
# Now asyncio.run() works in Jupyter
asyncio.run(main())FAQ
¿Cuál es la diferencia entre asyncio y threading en Python?
Asyncio usa multitarea cooperativa en un solo hilo, donde las tareas ceden control voluntariamente usando await. Threading usa multitarea preventiva con múltiples hilos del sistema operativo, donde el OS decide cuándo cambiar entre hilos. Asyncio es más eficiente para tareas limitadas por E/S con menor sobrecarga de memoria y sin riesgos de condiciones de carrera, mientras que threading puede manejar librerías bloqueantes que no soportan async. Ambos están limitados por el GIL de Python para trabajo limitado por CPU, pero asyncio evita la sobrecarga del cambio de contexto de hilos.
¿Cuándo debería usar asyncio en lugar de multiprocessing?
Usa asyncio para tareas limitadas por E/S como llamadas a APIs, consultas a bases de datos, operaciones de archivos y comunicación de red. Usa multiprocessing para tareas limitadas por CPU como procesamiento de datos, cálculos matemáticos, manipulación de imágenes y entrenamiento de modelos de machine learning. Asyncio sobresale manejando miles de operaciones concurrentes de E/S con mínima sobrecarga de recursos, mientras que multiprocessing proporciona computación paralela real en múltiples núcleos de CPU.
¿Puedo mezclar código async y sync en la misma aplicación?
Sí, pero con planificación cuidadosa. Puedes llamar funciones async desde código sync usando asyncio.run(), aunque no desde dentro de un bucle de eventos ya en ejecución. Para llamar funciones sync bloqueantes desde código async, usa loop.run_in_executor() para ejecutarlas en un pool de hilos, previniendo que bloqueen el bucle de eventos. Nunca uses operaciones bloqueantes como time.sleep(), requests.get() o E/S de archivos síncrona directamente en funciones async, ya que bloquean todo el bucle de eventos.
¿Cómo depuro código asyncio efectivamente?
Habilita el modo debug de asyncio con asyncio.run(main(), debug=True) o establece la variable de entorno PYTHONASYNCIODEBUG=1. Esto detecta errores comunes como olvidar await, callbacks que toman demasiado tiempo y corrutinas que nunca fueron esperadas. Usa logging extensivamente para trazar el flujo de ejecución, ya que los debuggers tradicionales pueden ser confusos con código async. Añade nombres de tareas con asyncio.create_task(coro(), name="task-name") para mensajes de error más claros. Usa asyncio.gather(..., return_exceptions=True) para prevenir que una tarea fallida oculte errores en otras. Monitorea tu bucle de eventos con asyncio.all_tasks() para verificar tareas que no se completan.
¿Cuáles son los límites de rendimiento de asyncio?
Asyncio puede manejar decenas de miles de operaciones concurrentes de E/S en un solo hilo, superando ampliamente los límites de threading. La restricción principal es que asyncio no proporciona beneficio para operaciones limitadas por CPU ya que usa un solo hilo. El rendimiento se degrada si bloqueas el bucle de eventos con E/S síncrona o computación pesada. El ancho de banda de red y los límites de tasa de API se convierten en el cuello de botella antes que los límites de concurrencia de asyncio. El uso de memoria escala con el número de tareas concurrentes, pero cada tarea tiene sobrecarga mínima comparada con hilos. Para máximo rendimiento, combina asyncio para concurrencia de E/S con multiprocessing para trabajo limitado por CPU, y siempre usa semáforos para limitar operaciones concurrentes a niveles razonables.
Conclusión
Python asyncio transforma aplicaciones limitadas por E/S de operaciones lentas y bloqueantes en sistemas rápidos y concurrentes. Al dominar la sintaxis async/await, entender el bucle de eventos y aprovechar herramientas como gather(), create_task() y semáforos, puedes construir aplicaciones que manejen miles de operaciones concurrentes eficientemente.
La clave del éxito con asyncio es reconocer cuándo es la herramienta correcta. Úsalo para solicitudes de red, consultas a bases de datos, operaciones de archivos y cualquier tarea que pase tiempo esperando recursos externos. Evita bloquear el bucle de eventos con operaciones síncronas, siempre usa await con corrutinas y limita la concurrencia con semáforos cuando sea necesario.
Comienza convirtiendo pequeñas secciones de tu codebase a async, mide la mejora de rendimiento y expande gradualmente. Las mejoras dramáticas de velocidad en aplicaciones con mucha E/S hacen que la inversión de aprendizaje valga la pena.