Skip to content

Python asyncio: Guía Completa de Programación Asincrónica

Updated on

Tu aplicación Python realiza 100 llamadas a API, cada una tomando 2 segundos. Con código secuencial tradicional, eso son 200 segundos de espera. Tus usuarios miran pantallas de carga. Tus servidores permanecen inactivos, consumiendo recursos mientras esperan respuestas. Este comportamiento bloqueante es el problema que destruye el rendimiento de la aplicación y la experiencia del usuario.

El dolor se intensifica cuando escalas. Las consultas a la base de datos se acumulan. Las operaciones de archivos se encolan una tras otra. Los web scrapers avanzan a paso de tortuga. Cada operación de E/S se convierte en un cuello de botella que se propaga por todo tu sistema, convirtiendo lo que debería ser una aplicación rápida y responsiva en un monstruo lento y desperdiciador de recursos.

Python asyncio resuelve esto permitiendo la ejecución concurrente de tareas limitadas por E/S. En lugar de esperar a que cada operación se complete antes de iniciar la siguiente, asyncio permite que tu código inicie múltiples operaciones y cambie entre ellas mientras espera. ¿Esas 100 llamadas a API? Con asyncio, se completan en aproximadamente 2 segundos en lugar de 200. Esta guía te muestra exactamente cómo implementar la programación asíncrona en Python, con ejemplos prácticos que transforman código lento y bloqueante en aplicaciones rápidas y concurrentes.

📚

Qué es la Programación Asincrónica y Por Qué Importa

La programación asincrónica permite que un programa inicie tareas potencialmente de larga duración y pase a otro trabajo antes de que esas tareas se completen, en lugar de esperar a que cada tarea termine antes de iniciar la siguiente.

En el código síncrono tradicional, cuando haces una solicitud a una API, tu programa se detiene y espera la respuesta. Durante este período de espera, tu CPU permanece inactiva, sin hacer nada productivo. Esto es aceptable para operaciones únicas, pero catastrófico para aplicaciones que necesitan manejar múltiples operaciones de E/S.

Asyncio proporciona una forma de escribir código concurrente utilizando la sintaxis async/await. Es particularmente efectivo para operaciones limitadas por E/S como:

  • Realizar solicitudes HTTP a APIs
  • Leer y escribir archivos
  • Consultas a bases de datos
  • Comunicación de red
  • Conexiones WebSocket
  • Procesamiento de colas de mensajes

La mejora de rendimiento es dramática. Considera obtener datos de 50 URLs diferentes:

Enfoque síncrono: 50 solicitudes × 2 segundos cada una = 100 segundos totales Enfoque asíncrono: Las 50 solicitudes se ejecutan concurrentemente ≈ 2 segundos totales

Esta mejora de rendimiento de 50x proviene de una mejor utilización de recursos. En lugar de bloquearse en operaciones de E/S, asyncio permite que tu programa continúe ejecutando otras tareas mientras espera que la E/S se complete.

Concurrencia vs Paralelismo vs Asíncrono

Entender la distinción entre estos conceptos es esencial para usar asyncio efectivamente.

Concurrencia significa gestionar múltiples tareas a la vez. Las tareas se turnan para progresar, pero solo una se ejecuta en cualquier momento dado. Piensa en un chef preparando múltiples platos, cambiando entre tareas mientras cada uno espera a que se cocine.

Paralelismo significa ejecutar múltiples tareas simultáneamente en diferentes núcleos de CPU. Esto requiere hardware de procesamiento paralelo real y es ideal para tareas limitadas por CPU como cálculos matemáticos o procesamiento de imágenes.

Programación asíncrona es una forma específica de concurrencia diseñada para tareas limitadas por E/S. Utiliza un solo hilo y cambia entre tareas cuando están esperando operaciones de E/S.

CaracterísticaasyncioThreadingMultiprocessing
Modelo de ejecuciónHilo único, multitarea cooperativaMúltiples hilos, multitarea preventivaMúltiples procesos
Mejor paraTareas limitadas por E/STareas limitadas por E/S con librerías bloqueantesTareas limitadas por CPU
Sobrecarga de memoriaMínimaModeradaAlta
Costo de cambio de contextoMuy bajoBajo a moderadoAlto
ComplejidadModerada (sintaxis async/await)Alta (condiciones de carrera, locks)Alta (IPC, serialización)
Limitación del GILNo afectada (hilo único)Limitada por GILNo limitada (procesos separados)
Aceleración típica para E/S10-100x5-10xN/A

El Global Interpreter Lock (GIL) de Python previene la ejecución paralela real del bytecode de Python en hilos, haciendo que el threading sea menos efectivo para tareas limitadas por CPU. Asyncio evita esta limitación utilizando un solo hilo con multitarea cooperativa, mientras que multiprocessing la evita completamente con procesos separados.

async def y await Palabras Clave

La base de asyncio se construye sobre dos palabras clave: async y await.

La palabra clave async def define una función de corrutina. Cuando llamas a una función de corrutina, no se ejecuta inmediatamente. En su lugar, devuelve un objeto de corrutina que puede ser esperado.

import asyncio
 
async def fetch_data():
    print("Fetching data...")
    await asyncio.sleep(2)  # Simulate I/O operation
    print("Data fetched!")
    return {"status": "success"}
 
# Calling the function returns a coroutine object
coroutine = fetch_data()
print(type(coroutine))  # <class 'coroutine'>
 
# To actually run it, you need to await it or use asyncio.run()
# asyncio.run(coroutine)  # This would execute the coroutine

La palabra clave await pausa la ejecución de una corrutina hasta que la operación esperada se complete. Durante esta pausa, el bucle de eventos puede ejecutar otras corrutinas. Solo puedes usar await dentro de una función async def.

async def process_user(user_id):
    # Await an I/O operation
    user_data = await fetch_user_from_database(user_id)
 
    # Await another I/O operation
    user_profile = await fetch_user_profile(user_id)
 
    # Regular synchronous code runs normally
    processed_data = transform_data(user_data, user_profile)
 
    return processed_data

Reglas clave para async/await:

  1. Solo puedes usar await en corrutinas, tareas o futuros
  2. Solo puedes usar await dentro de una función async def
  3. Las funciones regulares no pueden usar await
  4. Llamar una función async sin usar await crea un objeto de corrutina pero no ejecuta el código

Error común:

async def wrong_example():
    # This creates a coroutine but doesn't execute it!
    fetch_data()  # Missing await
 
async def correct_example():
    # This actually executes the coroutine
    result = await fetch_data()

asyncio.run() Punto de Entrada

La función asyncio.run() es la forma estándar de iniciar el bucle de eventos de asyncio y ejecutar tu corrutina principal. Fue introducida en Python 3.7 y simplifica la ejecución de código async desde contextos síncronos.

import asyncio
 
async def main():
    print("Starting async operations")
    await asyncio.sleep(1)
    print("Finished")
 
# Run the main coroutine
asyncio.run(main())

Lo que asyncio.run() hace detrás de escenas:

  1. Crea un nuevo bucle de eventos
  2. Ejecuta la corrutina proporcionada hasta su finalización
  3. Cierra el bucle de eventos
  4. Devuelve el resultado de la corrutina
import asyncio
 
async def main():
    result = await compute_value()
    return result
 
# The return value is accessible
final_result = asyncio.run(main())
print(final_result)

Características importantes de asyncio.run():

  • No puede llamarse desde dentro de un bucle de eventos en ejecución: Si ya estás en una función async, usa await en su lugar
  • Crea un bucle de eventos fresco cada vez: No llames a asyncio.run() múltiples veces en el mismo programa a menos que quieras instancias separadas del bucle de eventos
  • Siempre cierra el bucle: El bucle de eventos se limpia después de la ejecución

Para Jupyter notebooks o entornos donde ya hay un bucle de eventos en ejecución, usa await directamente o asyncio.create_task(). Herramientas como RunCell (opens in a new tab) proporcionan soporte async mejorado en entornos Jupyter, facilitando experimentar con patrones asyncio interactivamente sin conflictos del bucle de eventos.

Antes de Python 3.7, tenías que gestionar manualmente el bucle de eventos:

# Old way (pre-Python 3.7)
loop = asyncio.get_event_loop()
try:
    loop.run_until_complete(main())
finally:
    loop.close()
 
# Modern way (Python 3.7+)
asyncio.run(main())

Corrutinas, Tareas y Futuros

Entender estos tres conceptos fundamentales es esencial para dominar asyncio.

Corrutinas

Una corrutina es una función definida con async def. Es una función especial que puede ser pausada y reanudada, permitiendo que otro código se ejecute durante la pausa.

import asyncio
 
async def my_coroutine():
    await asyncio.sleep(1)
    return "completed"
 
# This creates a coroutine object
coro = my_coroutine()
 
# To run it
result = asyncio.run(coro)

Tareas

Una tarea es un envoltorio alrededor de una corrutina que la programa para ejecución en el bucle de eventos. Las tareas permiten que las corrutinas se ejecuten concurrentemente.

import asyncio
 
async def say_after(delay, message):
    await asyncio.sleep(delay)
    print(message)
    return message
 
async def main():
    # Create tasks to run concurrently
    task1 = asyncio.create_task(say_after(1, "First"))
    task2 = asyncio.create_task(say_after(2, "Second"))
 
    # Wait for both tasks to complete
    result1 = await task1
    result2 = await task2
 
    print(f"Results: {result1}, {result2}")
 
asyncio.run(main())

Crear una tarea programa inmediatamente la corrutina para ejecución. El bucle de eventos comienza a ejecutarla tan pronto como sea posible, incluso antes de que hagas await de la tarea.

async def background_work():
    print("Starting background work")
    await asyncio.sleep(2)
    print("Background work completed")
 
async def main():
    # The coroutine starts running immediately
    task = asyncio.create_task(background_work())
 
    print("Task created, doing other work")
    await asyncio.sleep(1)
    print("Other work done")
 
    # Wait for the background task to finish
    await task
 
asyncio.run(main())

Output:

Task created, doing other work
Starting background work
Other work done
Background work completed

Futuros

Un futuro es un objeto esperable de bajo nivel que representa un resultado eventual de una operación asíncrona. Raramente creas futuros directamente; típicamente son creados por asyncio internamente o por librerías.

async def set_future_result(future):
    await asyncio.sleep(1)
    future.set_result("Future completed")
 
async def main():
    loop = asyncio.get_event_loop()
    future = loop.create_future()
 
    # Schedule a coroutine to set the future's result
    asyncio.create_task(set_future_result(future))
 
    # Wait for the future to get a result
    result = await future
    print(result)
 
asyncio.run(main())

Relación entre corrutinas, tareas y futuros:

  • Las corrutinas son las funciones que escribes
  • Las tareas envuelven corrutinas y las programan para ejecución
  • Los futuros representan resultados que estarán disponibles en el futuro
  • Las tareas son una subclase de Futuro

asyncio.create_task() para Ejecución Concurrente

La función asyncio.create_task() es tu herramienta principal para lograr concurrencia real con asyncio. Programa una corrutina para ejecutarse en el bucle de eventos sin bloquear la corrutina actual.

import asyncio
import time
 
async def download_file(file_id):
    print(f"Starting download {file_id}")
    await asyncio.sleep(2)  # Simulate download time
    print(f"Completed download {file_id}")
    return f"file_{file_id}.dat"
 
async def sequential_downloads():
    """Downloads files one at a time"""
    start = time.time()
 
    file1 = await download_file(1)
    file2 = await download_file(2)
    file3 = await download_file(3)
 
    elapsed = time.time() - start
    print(f"Sequential: {elapsed:.2f} seconds")
    # Output: ~6 seconds (2 + 2 + 2)
 
async def concurrent_downloads():
    """Downloads files concurrently"""
    start = time.time()
 
    # Create tasks for concurrent execution
    task1 = asyncio.create_task(download_file(1))
    task2 = asyncio.create_task(download_file(2))
    task3 = asyncio.create_task(download_file(3))
 
    # Wait for all tasks to complete
    file1 = await task1
    file2 = await task2
    file3 = await task3
 
    elapsed = time.time() - start
    print(f"Concurrent: {elapsed:.2f} seconds")
    # Output: ~2 seconds (all run at the same time)
 
asyncio.run(concurrent_downloads())

La tarea se programa inmediatamente cuando se llama a create_task(). No tienes que hacerle await de inmediato.

async def process_data():
    # Start background tasks
    task1 = asyncio.create_task(fetch_from_api_1())
    task2 = asyncio.create_task(fetch_from_api_2())
 
    # Do some work while tasks run in background
    local_data = prepare_local_data()
 
    # Now wait for the background tasks
    api_data_1 = await task1
    api_data_2 = await task2
 
    # Combine all data
    return combine_data(local_data, api_data_1, api_data_2)

También puedes nombrar tareas para mejor depuración:

async def main():
    task = asyncio.create_task(
        long_running_operation(),
        name="long-operation-task"
    )
 
    # Task name is accessible
    print(f"Task name: {task.get_name()}")
 
    await task

asyncio.gather() para Ejecutar Múltiples Corrutinas

La función asyncio.gather() ejecuta múltiples corrutinas concurrentemente y espera a que todas se completen. Es más limpia que crear tareas individuales cuando necesitas ejecutar muchas corrutinas.

import asyncio
 
async def fetch_user(user_id):
    await asyncio.sleep(1)
    return {"id": user_id, "name": f"User {user_id}"}
 
async def main():
    # Run multiple coroutines concurrently
    results = await asyncio.gather(
        fetch_user(1),
        fetch_user(2),
        fetch_user(3),
        fetch_user(4),
        fetch_user(5)
    )
 
    print(results)
    # Output: [{'id': 1, 'name': 'User 1'}, {'id': 2, 'name': 'User 2'}, ...]
 
asyncio.run(main())

gather() devuelve resultados en el mismo orden que las corrutinas de entrada, independientemente de cuál se complete primero.

async def task_with_delay(delay, value):
    await asyncio.sleep(delay)
    return value
 
async def main():
    results = await asyncio.gather(
        task_with_delay(3, "slow"),
        task_with_delay(1, "fast"),
        task_with_delay(2, "medium")
    )
 
    print(results)
    # Output: ['slow', 'fast', 'medium'] (order preserved)
 
asyncio.run(main())

Manejo de Errores con gather()

Por defecto, si alguna corrutina lanza una excepción, gather() lanza inmediatamente esa excepción y cancela las tareas restantes.

async def failing_task():
    await asyncio.sleep(1)
    raise ValueError("Task failed!")
 
async def successful_task():
    await asyncio.sleep(2)
    return "success"
 
async def main():
    try:
        results = await asyncio.gather(
            failing_task(),
            successful_task()
        )
    except ValueError as e:
        print(f"Error caught: {e}")
        # The successful_task is cancelled
 
asyncio.run(main())

Usa return_exceptions=True para recoger excepciones junto con resultados exitosos:

async def main():
    results = await asyncio.gather(
        failing_task(),
        successful_task(),
        return_exceptions=True
    )
 
    for i, result in enumerate(results):
        if isinstance(result, Exception):
            print(f"Task {i} failed: {result}")
        else:
            print(f"Task {i} succeeded: {result}")
 
asyncio.run(main())

gather() dinámico con comprensión de listas

gather() funciona hermosamente con corrutinas generadas dinámicamente:

async def process_item(item):
    await asyncio.sleep(1)
    return item * 2
 
async def main():
    items = [1, 2, 3, 4, 5]
 
    # Process all items concurrently
    results = await asyncio.gather(
        *[process_item(item) for item in items]
    )
 
    print(results)  # [2, 4, 6, 8, 10]
 
asyncio.run(main())

asyncio.wait() y asyncio.as_completed()

Mientras que gather() espera a que todas las corrutinas se completen, wait() y as_completed() proporcionan un control más granular sobre cómo manejas las tareas completadas.

asyncio.wait()

wait() te permite esperar tareas con diferentes condiciones de finalización: todas las tareas, primera tarea, o primera excepción.

import asyncio
 
async def task(delay, name):
    await asyncio.sleep(delay)
    return f"{name} completed"
 
async def main():
    tasks = [
        asyncio.create_task(task(1, "Task 1")),
        asyncio.create_task(task(2, "Task 2")),
        asyncio.create_task(task(3, "Task 3"))
    ]
 
    # Wait for all tasks to complete
    done, pending = await asyncio.wait(
        tasks,
        return_when=asyncio.ALL_COMPLETED
    )
 
    for task in done:
        print(task.result())
 
asyncio.run(main())

Diferentes condiciones de finalización:

async def main():
    tasks = [
        asyncio.create_task(task(1, "Fast")),
        asyncio.create_task(task(3, "Slow")),
        asyncio.create_task(task(2, "Medium"))
    ]
 
    # Return when the first task completes
    done, pending = await asyncio.wait(
        tasks,
        return_when=asyncio.FIRST_COMPLETED
    )
 
    print(f"First completed: {done.pop().result()}")
    print(f"Still pending: {len(pending)} tasks")
 
    # Cancel remaining tasks
    for task in pending:
        task.cancel()
 
asyncio.run(main())

asyncio.as_completed()

as_completed() devuelve un iterador que produce tareas a medida que se completan, permitiéndote procesar resultados tan pronto como estén disponibles.

import asyncio
 
async def fetch_data(url_id, delay):
    await asyncio.sleep(delay)
    return f"Data from URL {url_id}"
 
async def main():
    tasks = [
        fetch_data(1, 3),
        fetch_data(2, 1),
        fetch_data(3, 2)
    ]
 
    # Process results as they complete
    for coro in asyncio.as_completed(tasks):
        result = await coro
        print(f"Received: {result}")
 
asyncio.run(main())

La salida muestra resultados en orden de finalización, no en orden de envío:

Received: Data from URL 2
Received: Data from URL 3
Received: Data from URL 1

Esto es particularmente útil cuando quieres mostrar resultados a los usuarios tan rápido como sea posible:

async def search_engine(query, engine_name, delay):
    await asyncio.sleep(delay)
    return f"{engine_name}: Results for '{query}'"
 
async def main():
    query = "python asyncio"
 
    searches = [
        search_engine(query, "Google", 1.5),
        search_engine(query, "Bing", 2.0),
        search_engine(query, "DuckDuckGo", 1.0)
    ]
 
    print("Searching...")
    for search in asyncio.as_completed(searches):
        result = await search
        print(result)  # Display each result as soon as it arrives
 
asyncio.run(main())

asyncio.sleep() vs time.sleep()

Esta distinción es crítica para la corrección del código async.

time.sleep() es una operación bloqueante que pausa todo el hilo, incluyendo el bucle de eventos. Esto previene que todas las tareas async se ejecuten.

asyncio.sleep() es una corrutina no bloqueante que solo pausa la tarea actual, permitiendo que otras tareas se ejecuten.

import asyncio
import time
 
async def blocking_example():
    """Bad: Uses time.sleep() - blocks the entire event loop"""
    print("Starting blocking sleep")
    time.sleep(2)  # WRONG: Blocks everything!
    print("Finished blocking sleep")
 
async def non_blocking_example():
    """Good: Uses asyncio.sleep() - allows other tasks to run"""
    print("Starting non-blocking sleep")
    await asyncio.sleep(2)  # CORRECT: Only pauses this task
    print("Finished non-blocking sleep")
 
async def concurrent_task():
    for i in range(3):
        print(f"Concurrent task running: {i}")
        await asyncio.sleep(0.5)
 
async def demo_blocking():
    print("\n=== Blocking example (BAD) ===")
    await asyncio.gather(
        blocking_example(),
        concurrent_task()
    )
 
async def demo_non_blocking():
    print("\n=== Non-blocking example (GOOD) ===")
    await asyncio.gather(
        non_blocking_example(),
        concurrent_task()
    )
 
asyncio.run(demo_blocking())
asyncio.run(demo_non_blocking())

En el ejemplo bloqueante, la tarea concurrente no se ejecuta hasta que time.sleep() se completa. En el ejemplo no bloqueante, ambas tareas se ejecutan concurrentemente.

Regla de oro: Nunca uses time.sleep() en código async. Siempre usa await asyncio.sleep().

Para operaciones limitadas por CPU que no puedes evitar, usa loop.run_in_executor() para ejecutarlas en un hilo o proceso separado:

import asyncio
import time
 
def cpu_intensive_task():
    """Some blocking CPU work"""
    time.sleep(2)  # Simulate heavy computation
    return "CPU task completed"
 
async def main():
    loop = asyncio.get_event_loop()
 
    # Run blocking task in a thread pool
    result = await loop.run_in_executor(None, cpu_intensive_task)
    print(result)
 
asyncio.run(main())

async for y async with

Python proporciona versiones asíncronas de los bucles for y gestores de contexto para trabajar con iterables asíncronos y recursos.

Iteradores Asíncronos (async for)

Un iterador asíncrono es un objeto que implementa los métodos __aiter__() y __anext__(), permitiéndote iterar sobre elementos que requieren operaciones async para obtener.

import asyncio
 
class AsyncRange:
    """An async iterator that yields numbers with delays"""
    def __init__(self, start, end):
        self.current = start
        self.end = end
 
    def __aiter__(self):
        return self
 
    async def __anext__(self):
        if self.current >= self.end:
            raise StopAsyncIteration
 
        # Simulate async operation to get next value
        await asyncio.sleep(0.5)
        value = self.current
        self.current += 1
        return value
 
async def main():
    async for number in AsyncRange(1, 5):
        print(f"Got number: {number}")
 
asyncio.run(main())

Ejemplo del mundo real con cursor de base de datos async:

class AsyncDatabaseCursor:
    """Simulated async database cursor"""
    def __init__(self, query):
        self.query = query
        self.results = []
        self.index = 0
 
    async def execute(self):
        # Simulate database query
        await asyncio.sleep(1)
        self.results = [
            {"id": 1, "name": "Alice"},
            {"id": 2, "name": "Bob"},
            {"id": 3, "name": "Charlie"}
        ]
 
    def __aiter__(self):
        return self
 
    async def __anext__(self):
        if self.index >= len(self.results):
            raise StopAsyncIteration
 
        # Simulate fetching next row
        await asyncio.sleep(0.1)
        row = self.results[self.index]
        self.index += 1
        return row
 
async def fetch_users():
    cursor = AsyncDatabaseCursor("SELECT * FROM users")
    await cursor.execute()
 
    async for row in cursor:
        print(f"User: {row['name']}")
 
asyncio.run(fetch_users())

Gestores de Contexto Asíncronos (async with)

Un gestor de contexto async implementa los métodos __aenter__() y __aexit__() para gestionar recursos que requieren configuración y limpieza async.

import asyncio
 
class AsyncDatabaseConnection:
    """An async context manager for database connections"""
 
    async def __aenter__(self):
        print("Opening database connection...")
        await asyncio.sleep(1)  # Simulate connection time
        print("Database connection opened")
        return self
 
    async def __aexit__(self, exc_type, exc_val, exc_tb):
        print("Closing database connection...")
        await asyncio.sleep(0.5)  # Simulate cleanup
        print("Database connection closed")
 
    async def query(self, sql):
        await asyncio.sleep(0.5)
        return f"Results for: {sql}"
 
async def main():
    async with AsyncDatabaseConnection() as db:
        result = await db.query("SELECT * FROM users")
        print(result)
    # Connection automatically closed after the with block
 
asyncio.run(main())

Combinando gestores de contexto async con iteradores async:

class AsyncFileReader:
    """Async context manager and iterator for reading files"""
 
    def __init__(self, filename):
        self.filename = filename
        self.lines = []
        self.index = 0
 
    async def __aenter__(self):
        # Simulate async file opening
        await asyncio.sleep(0.5)
        self.lines = ["Line 1", "Line 2", "Line 3"]
        return self
 
    async def __aexit__(self, exc_type, exc_val, exc_tb):
        await asyncio.sleep(0.2)
        self.lines = []
 
    def __aiter__(self):
        return self
 
    async def __anext__(self):
        if self.index >= len(self.lines):
            raise StopAsyncIteration
 
        await asyncio.sleep(0.1)
        line = self.lines[self.index]
        self.index += 1
        return line
 
async def read_file():
    async with AsyncFileReader("data.txt") as reader:
        async for line in reader:
            print(line)
 
asyncio.run(read_file())

asyncio.Queue para Patrones Productor-Consumidor

asyncio.Queue es una cola segura para hilos, consciente de async, que es perfecta para coordinar trabajo entre corrutinas productor y consumidor.

import asyncio
import random
 
async def producer(queue, producer_id):
    """Produces items and puts them in the queue"""
    for i in range(5):
        await asyncio.sleep(random.uniform(0.1, 0.5))
        item = f"Item-{producer_id}-{i}"
        await queue.put(item)
        print(f"Producer {producer_id} produced: {item}")
 
async def consumer(queue, consumer_id):
    """Consumes items from the queue"""
    while True:
        item = await queue.get()
        print(f"Consumer {consumer_id} consuming: {item}")
 
        # Simulate processing time
        await asyncio.sleep(random.uniform(0.2, 0.8))
 
        queue.task_done()
 
async def main():
    queue = asyncio.Queue(maxsize=10)
 
    # Create producers and consumers
    producers = [asyncio.create_task(producer(queue, i)) for i in range(2)]
    consumers = [asyncio.create_task(consumer(queue, i)) for i in range(3)]
 
    # Wait for all producers to finish
    await asyncio.gather(*producers)
 
    # Wait for the queue to be fully processed
    await queue.join()
 
    # Cancel consumers (they run forever)
    for consumer_task in consumers:
        consumer_task.cancel()
 
asyncio.run(main())

Ejemplo del mundo real: Web scraper con cola de URLs:

import asyncio
from typing import Set
 
async def fetch_url(session, url):
    """Simulate fetching a URL"""
    await asyncio.sleep(1)
    return f"Content from {url}"
 
async def producer(queue: asyncio.Queue, start_urls: list):
    """Add URLs to the queue"""
    for url in start_urls:
        await queue.put(url)
 
async def consumer(queue: asyncio.Queue, visited: Set[str]):
    """Fetch URLs from the queue"""
    while True:
        url = await queue.get()
 
        if url in visited:
            queue.task_done()
            continue
 
        visited.add(url)
        print(f"Scraping: {url}")
 
        # Simulate fetching
        content = await fetch_url(None, url)
 
        # Simulate finding new URLs in the content
        # In real scraper, you'd parse HTML and extract links
        new_urls = []  # Would extract from content
 
        for new_url in new_urls:
            if new_url not in visited:
                await queue.put(new_url)
 
        queue.task_done()
 
async def main():
    queue = asyncio.Queue()
    visited = set()
 
    start_urls = [
        "https://example.com/page1",
        "https://example.com/page2",
        "https://example.com/page3"
    ]
 
    # Add initial URLs
    await producer(queue, start_urls)
 
    # Create consumers
    consumers = [
        asyncio.create_task(consumer(queue, visited))
        for _ in range(3)
    ]
 
    # Wait for all URLs to be processed
    await queue.join()
 
    # Cancel consumers
    for consumer_task in consumers:
        consumer_task.cancel()
 
    print(f"Scraped {len(visited)} unique URLs")
 
asyncio.run(main())

Semáforos para Limitación de Tasa

asyncio.Semaphore controla el número de corrutinas que pueden acceder a un recurso concurrentemente. Esto es esencial para limitar la tasa de llamadas a API o limitar conexiones concurrentes a bases de datos.

import asyncio
import time
 
async def call_api(semaphore, api_id):
    """Make an API call with rate limiting"""
    async with semaphore:
        print(f"API call {api_id} started")
        await asyncio.sleep(1)  # Simulate API call
        print(f"API call {api_id} completed")
        return f"Result {api_id}"
 
async def main():
    # Allow only 3 concurrent API calls
    semaphore = asyncio.Semaphore(3)
 
    start = time.time()
 
    # Create 10 API calls
    tasks = [call_api(semaphore, i) for i in range(10)]
    results = await asyncio.gather(*tasks)
 
    elapsed = time.time() - start
    print(f"\nCompleted {len(results)} API calls in {elapsed:.2f} seconds")
    # With semaphore(3): ~4 seconds (10 calls in batches of 3)
    # Without semaphore: ~1 second (all concurrent)
 
asyncio.run(main())

Limitación de tasa para cumplimiento de cuota de API:

import asyncio
from datetime import datetime
 
class RateLimiter:
    """Rate limiter that allows N requests per time period"""
 
    def __init__(self, max_requests, time_period):
        self.max_requests = max_requests
        self.time_period = time_period
        self.semaphore = asyncio.Semaphore(max_requests)
        self.request_times = []
 
    async def __aenter__(self):
        await self.semaphore.acquire()
 
        # Wait if we've hit the rate limit
        while len(self.request_times) >= self.max_requests:
            oldest = self.request_times[0]
            elapsed = datetime.now().timestamp() - oldest
 
            if elapsed < self.time_period:
                sleep_time = self.time_period - elapsed
                await asyncio.sleep(sleep_time)
 
            self.request_times.pop(0)
 
        self.request_times.append(datetime.now().timestamp())
        return self
 
    async def __aexit__(self, exc_type, exc_val, exc_tb):
        self.semaphore.release()
 
async def fetch_with_rate_limit(rate_limiter, item_id):
    async with rate_limiter:
        print(f"Fetching item {item_id} at {datetime.now()}")
        await asyncio.sleep(0.5)
        return f"Item {item_id}"
 
async def main():
    # Allow 5 requests per 2 seconds
    rate_limiter = RateLimiter(max_requests=5, time_period=2)
 
    # Make 15 requests
    tasks = [fetch_with_rate_limit(rate_limiter, i) for i in range(15)]
    results = await asyncio.gather(*tasks)
 
    print(f"Completed {len(results)} requests")
 
asyncio.run(main())

aiohttp para Solicitudes HTTP Asíncronas

La librería aiohttp proporciona funcionalidad cliente y servidor HTTP async. Es la opción estándar para hacer solicitudes HTTP en código async.

import asyncio
import aiohttp
 
async def fetch_url(session, url):
    """Fetch a single URL"""
    async with session.get(url) as response:
        return await response.text()
 
async def main():
    async with aiohttp.ClientSession() as session:
        html = await fetch_url(session, "https://api.github.com")
        print(f"Fetched {len(html)} characters")
 
asyncio.run(main())

Obteniendo múltiples URLs concurrentemente:

import asyncio
import aiohttp
import time
 
async def fetch_url(session, url):
    """Fetch URL and return status code and content length"""
    async with session.get(url) as response:
        content = await response.text()
        return {
            "url": url,
            "status": response.status,
            "length": len(content)
        }
 
async def fetch_all_urls(urls):
    """Fetch multiple URLs concurrently"""
    async with aiohttp.ClientSession() as session:
        tasks = [fetch_url(session, url) for url in urls]
        results = await asyncio.gather(*tasks, return_exceptions=True)
        return results
 
async def main():
    urls = [
        "https://api.github.com",
        "https://httpbin.org/delay/1",
        "https://httpbin.org/delay/2",
        "https://httpbin.org/status/200",
        "https://httpbin.org/status/404"
    ]
 
    start = time.time()
    results = await fetch_all_urls(urls)
    elapsed = time.time() - start
 
    for result in results:
        if isinstance(result, Exception):
            print(f"Error: {result}")
        else:
            print(f"{result['url']}: {result['status']} ({result['length']} bytes)")
 
    print(f"\nFetched {len(urls)} URLs in {elapsed:.2f} seconds")
 
asyncio.run(main())

Ejemplo práctico con manejo de errores y reintentos:

import asyncio
import aiohttp
from typing import Optional
 
async def fetch_with_retry(
    session: aiohttp.ClientSession,
    url: str,
    max_retries: int = 3
) -> Optional[dict]:
    """Fetch URL with retry logic"""
 
    for attempt in range(max_retries):
        try:
            async with session.get(url, timeout=aiohttp.ClientTimeout(total=10)) as response:
                if response.status == 200:
                    data = await response.json()
                    return data
                elif response.status == 404:
                    print(f"URL not found: {url}")
                    return None
                else:
                    print(f"Unexpected status {response.status} for {url}")
 
        except asyncio.TimeoutError:
            print(f"Timeout on attempt {attempt + 1} for {url}")
 
        except aiohttp.ClientError as e:
            print(f"Client error on attempt {attempt + 1} for {url}: {e}")
 
        if attempt < max_retries - 1:
            await asyncio.sleep(2 ** attempt)  # Exponential backoff
 
    print(f"Failed to fetch {url} after {max_retries} attempts")
    return None
 
async def main():
    urls = [
        "https://api.github.com/users/github",
        "https://api.github.com/users/nonexistent",
        "https://httpbin.org/delay/5"
    ]
 
    async with aiohttp.ClientSession() as session:
        tasks = [fetch_with_retry(session, url) for url in urls]
        results = await asyncio.gather(*tasks)
 
        successful = [r for r in results if r is not None]
        print(f"\nSuccessfully fetched {len(successful)}/{len(urls)} URLs")
 
asyncio.run(main())

aiofiles para E/S de Archivos Asíncrona

La librería aiofiles proporciona operaciones de archivos async, previniendo bloqueos durante lecturas y escrituras de archivos.

import asyncio
import aiofiles
 
async def read_file(filename):
    """Read file asynchronously"""
    async with aiofiles.open(filename, mode='r') as f:
        contents = await f.read()
        return contents
 
async def write_file(filename, content):
    """Write file asynchronously"""
    async with aiofiles.open(filename, mode='w') as f:
        await f.write(content)
 
async def main():
    # Write data
    await write_file('test.txt', 'Hello, async file I/O!')
 
    # Read data
    content = await read_file('test.txt')
    print(f"Read: {content}")
 
asyncio.run(main())

Procesando múltiples archivos concurrentemente:

import asyncio
import aiofiles
 
async def process_file(input_file, output_file):
    """Read, process, and write a file"""
    async with aiofiles.open(input_file, mode='r') as f:
        content = await f.read()
 
    # Process content (convert to uppercase)
    processed = content.upper()
 
    async with aiofiles.open(output_file, mode='w') as f:
        await f.write(processed)
 
    return f"Processed {input_file} -> {output_file}"
 
async def main():
    files = [
        ('input1.txt', 'output1.txt'),
        ('input2.txt', 'output2.txt'),
        ('input3.txt', 'output3.txt')
    ]
 
    tasks = [process_file(inp, out) for inp, out in files]
    results = await asyncio.gather(*tasks)
 
    for result in results:
        print(result)
 
asyncio.run(main())

Leyendo archivos grandes línea por línea:

import asyncio
import aiofiles
 
async def process_large_file(filename):
    """Process a large file line by line without loading it all into memory"""
    line_count = 0
 
    async with aiofiles.open(filename, mode='r') as f:
        async for line in f:
            # Process each line
            line_count += 1
            if line.strip():
                # Do something with the line
                pass
 
    return line_count
 
async def main():
    count = await process_large_file('large_data.txt')
    print(f"Processed {count} lines")
 
asyncio.run(main())

Manejo de Errores en Código Async

El manejo de errores en código async requiere atención especial para asegurar que las excepciones sean capturadas adecuadamente y los recursos se limpien.

import asyncio
 
async def risky_operation(item_id):
    """An operation that might fail"""
    await asyncio.sleep(1)
 
    if item_id % 2 == 0:
        raise ValueError(f"Item {item_id} caused an error")
 
    return f"Item {item_id} processed"
 
async def handle_with_try_except():
    """Handle errors with try/except"""
    try:
        result = await risky_operation(2)
        print(result)
    except ValueError as e:
        print(f"Error caught: {e}")
 
asyncio.run(handle_with_try_except())

Manejando errores en tareas concurrentes:

async def safe_operation(item_id):
    """Wrapper that catches errors from risky_operation"""
    try:
        result = await risky_operation(item_id)
        return {"success": True, "result": result}
    except Exception as e:
        return {"success": False, "error": str(e), "item_id": item_id}
 
async def main():
    tasks = [safe_operation(i) for i in range(5)]
    results = await asyncio.gather(*tasks)
 
    for result in results:
        if result["success"]:
            print(f"Success: {result['result']}")
        else:
            print(f"Failed: Item {result['item_id']} - {result['error']}")
 
asyncio.run(main())

Usando gather() con return_exceptions=True:

async def main():
    tasks = [risky_operation(i) for i in range(5)]
    results = await asyncio.gather(*tasks, return_exceptions=True)
 
    for i, result in enumerate(results):
        if isinstance(result, Exception):
            print(f"Task {i} failed: {result}")
        else:
            print(f"Task {i} succeeded: {result}")
 
asyncio.run(main())

Limpieza con gestores de contexto async:

class AsyncResource:
    """A resource that needs cleanup even if errors occur"""
 
    async def __aenter__(self):
        print("Acquiring resource")
        await asyncio.sleep(0.5)
        return self
 
    async def __aexit__(self, exc_type, exc_val, exc_tb):
        print("Cleaning up resource")
        await asyncio.sleep(0.5)
 
        if exc_type is not None:
            print(f"Exception during resource use: {exc_val}")
 
        # Return False to propagate exception, True to suppress
        return False
 
    async def do_work(self):
        await asyncio.sleep(1)
        raise ValueError("Something went wrong")
 
async def main():
    try:
        async with AsyncResource() as resource:
            await resource.do_work()
    except ValueError as e:
        print(f"Caught error: {e}")
 
asyncio.run(main())

Benchmarks de Rendimiento: Sync vs Async para Tareas Limitadas por E/S

Comparemos enfoques síncronos y asíncronos para operaciones limitadas por E/S con benchmarks reales.

import asyncio
import time
import requests
import aiohttp
 
# Synchronous approach
def fetch_sync(url):
    response = requests.get(url)
    return len(response.text)
 
def benchmark_sync(urls):
    start = time.time()
    results = [fetch_sync(url) for url in urls]
    elapsed = time.time() - start
    return elapsed, results
 
# Asynchronous approach
async def fetch_async(session, url):
    async with session.get(url) as response:
        text = await response.text()
        return len(text)
 
async def benchmark_async(urls):
    start = time.time()
    async with aiohttp.ClientSession() as session:
        tasks = [fetch_async(session, url) for url in urls]
        results = await asyncio.gather(*tasks)
    elapsed = time.time() - start
    return elapsed, results
 
# Run benchmarks
urls = [
    "https://httpbin.org/delay/1",
    "https://httpbin.org/delay/1",
    "https://httpbin.org/delay/1",
    "https://httpbin.org/delay/1",
    "https://httpbin.org/delay/1"
]
 
print("Benchmarking synchronous approach...")
sync_time, sync_results = benchmark_sync(urls)
print(f"Synchronous: {sync_time:.2f} seconds")
 
print("\nBenchmarking asynchronous approach...")
async_time, async_results = asyncio.run(benchmark_async(urls))
print(f"Asynchronous: {async_time:.2f} seconds")
 
print(f"\nSpeedup: {sync_time / async_time:.2f}x")

Resultados típicos para 5 URLs con retraso de 1 segundo cada una:

  • Síncrono: ~5 segundos (ejecución secuencial)
  • Asíncrono: ~1 segundo (ejecución concurrente)
  • Aceleración: ~5x
Número de URLsTiempo SyncTiempo AsyncAceleración
55.2s1.1s4.7x
1010.4s1.2s8.7x
2020.8s1.4s14.9x
5052.1s2.1s24.8x
100104.5s3.8s27.5x

La aceleración aumenta con el número de operaciones concurrentes hasta que alcanzas restricciones de ancho de banda o limitación de tasa.

Errores Comunes y Cómo Evitarlos

Olvidar await

# WRONG: Coroutine is created but not executed
async def wrong():
    result = fetch_data()  # Missing await!
    print(result)  # Prints coroutine object, not the result
 
# CORRECT: Await the coroutine
async def correct():
    result = await fetch_data()
    print(result)  # Prints actual result

Bloquear el bucle de eventos

# WRONG: Blocks the entire event loop
async def blocking_code():
    time.sleep(5)  # Blocks everything!
    result = compute_something()  # Blocks if CPU-intensive
    return result
 
# CORRECT: Use asyncio.sleep and run_in_executor
async def non_blocking_code():
    await asyncio.sleep(5)  # Only pauses this task
 
    loop = asyncio.get_event_loop()
    result = await loop.run_in_executor(None, compute_something)
    return result

No manejar cancelación de tareas

# WRONG: Doesn't handle cancellation
async def incomplete_cleanup():
    resource = await acquire_resource()
    await long_operation()
    await release_resource(resource)  # May not execute if cancelled
 
# CORRECT: Use try/finally or async with
async def proper_cleanup():
    resource = await acquire_resource()
    try:
        await long_operation()
    finally:
        await release_resource(resource)  # Always executes

Crear conflictos de bucle de eventos

# WRONG: Calling asyncio.run() in an async function
async def wrong_nesting():
    result = asyncio.run(some_coroutine())  # Error!
    return result
 
# CORRECT: Just await the coroutine
async def correct_nesting():
    result = await some_coroutine()
    return result

No limitar concurrencia

# WRONG: Launches too many tasks simultaneously
async def unlimited_concurrency(items):
    tasks = [process_item(item) for item in items]  # 10,000 tasks!
    return await asyncio.gather(*tasks)
 
# CORRECT: Use semaphore to limit concurrency
async def limited_concurrency(items):
    semaphore = asyncio.Semaphore(10)  # Max 10 concurrent
 
    async def process_with_limit(item):
        async with semaphore:
            return await process_item(item)
 
    tasks = [process_with_limit(item) for item in items]
    return await asyncio.gather(*tasks)

Ejemplos del Mundo Real

Web Scraping con Limitación de Tasa

import asyncio
import aiohttp
from bs4 import BeautifulSoup
 
class AsyncWebScraper:
    def __init__(self, max_concurrent=5):
        self.semaphore = asyncio.Semaphore(max_concurrent)
        self.session = None
 
    async def __aenter__(self):
        self.session = aiohttp.ClientSession()
        return self
 
    async def __aexit__(self, exc_type, exc_val, exc_tb):
        await self.session.close()
 
    async def fetch_page(self, url):
        async with self.semaphore:
            try:
                async with self.session.get(url) as response:
                    return await response.text()
            except Exception as e:
                print(f"Error fetching {url}: {e}")
                return None
 
    async def scrape_urls(self, urls):
        tasks = [self.fetch_page(url) for url in urls]
        return await asyncio.gather(*tasks)
 
async def main():
    urls = [
        "https://example.com/page1",
        "https://example.com/page2",
        "https://example.com/page3"
    ]
 
    async with AsyncWebScraper(max_concurrent=3) as scraper:
        pages = await scraper.scrape_urls(urls)
 
        for url, html in zip(urls, pages):
            if html:
                print(f"Scraped {url}: {len(html)} bytes")
 
asyncio.run(main())

Pipeline de Datos de API Asíncrona

import asyncio
import aiohttp
 
async def fetch_user_ids(session):
    """Fetch list of user IDs from API"""
    async with session.get("https://api.example.com/users") as response:
        data = await response.json()
        return [user["id"] for user in data["users"]]
 
async def fetch_user_details(session, user_id):
    """Fetch detailed info for a user"""
    async with session.get(f"https://api.example.com/users/{user_id}") as response:
        return await response.json()
 
async def fetch_user_posts(session, user_id):
    """Fetch posts for a user"""
    async with session.get(f"https://api.example.com/users/{user_id}/posts") as response:
        return await response.json()
 
async def process_user(session, user_id):
    """Fetch all data for a user concurrently"""
    details, posts = await asyncio.gather(
        fetch_user_details(session, user_id),
        fetch_user_posts(session, user_id)
    )
 
    return {
        "user": details,
        "posts": posts,
        "post_count": len(posts)
    }
 
async def main():
    async with aiohttp.ClientSession() as session:
        # Fetch user IDs
        user_ids = await fetch_user_ids(session)
 
        # Process all users concurrently
        user_data = await asyncio.gather(
            *[process_user(session, uid) for uid in user_ids[:10]]
        )
 
        for data in user_data:
            print(f"User {data['user']['name']}: {data['post_count']} posts")
 
asyncio.run(main())

Servidor de Chat Asíncrono

import asyncio
 
class ChatServer:
    def __init__(self):
        self.clients = set()
 
    async def handle_client(self, reader, writer):
        """Handle a single client connection"""
        addr = writer.get_extra_info('peername')
        print(f"Client connected: {addr}")
 
        self.clients.add(writer)
 
        try:
            while True:
                data = await reader.read(100)
                if not data:
                    break
 
                message = data.decode()
                print(f"Received from {addr}: {message}")
 
                # Broadcast to all clients
                await self.broadcast(f"{addr}: {message}", writer)
 
        except asyncio.CancelledError:
            pass
 
        finally:
            print(f"Client disconnected: {addr}")
            self.clients.remove(writer)
            writer.close()
            await writer.wait_closed()
 
    async def broadcast(self, message, sender):
        """Send message to all clients except sender"""
        for client in self.clients:
            if client != sender:
                try:
                    client.write(message.encode())
                    await client.drain()
                except Exception as e:
                    print(f"Error broadcasting: {e}")
 
    async def start(self, host='127.0.0.1', port=8888):
        """Start the chat server"""
        server = await asyncio.start_server(
            self.handle_client,
            host,
            port
        )
 
        addr = server.sockets[0].getsockname()
        print(f"Chat server running on {addr}")
 
        async with server:
            await server.serve_forever()
 
async def main():
    chat_server = ChatServer()
    await chat_server.start()
 
asyncio.run(main())

Experimentando con Asyncio en Jupyter

Cuando trabajas con asyncio en Jupyter notebooks, puedes encontrar conflictos de bucle de eventos. Jupyter ya ejecuta un bucle de eventos, lo cual puede interferir con asyncio.run().

Para experimentación async fluida en entornos Jupyter, considera usar RunCell (opens in a new tab), un agente de IA diseñado específicamente para Jupyter notebooks. RunCell maneja la gestión del bucle de eventos automáticamente y proporciona capacidades mejoradas de depuración async, permitiéndote probar patrones asyncio interactivamente sin conflictos.

En Jupyter estándar, puedes usar await de nivel superior:

# In Jupyter, this works directly
async def fetch_data():
    await asyncio.sleep(1)
    return "data"
 
# Just await directly, no asyncio.run() needed
result = await fetch_data()
print(result)

O usa el paquete nest_asyncio para permitir bucles de eventos anidados:

import nest_asyncio
nest_asyncio.apply()
 
# Now asyncio.run() works in Jupyter
asyncio.run(main())

FAQ

¿Cuál es la diferencia entre asyncio y threading en Python?

Asyncio usa multitarea cooperativa en un solo hilo, donde las tareas ceden control voluntariamente usando await. Threading usa multitarea preventiva con múltiples hilos del sistema operativo, donde el OS decide cuándo cambiar entre hilos. Asyncio es más eficiente para tareas limitadas por E/S con menor sobrecarga de memoria y sin riesgos de condiciones de carrera, mientras que threading puede manejar librerías bloqueantes que no soportan async. Ambos están limitados por el GIL de Python para trabajo limitado por CPU, pero asyncio evita la sobrecarga del cambio de contexto de hilos.

¿Cuándo debería usar asyncio en lugar de multiprocessing?

Usa asyncio para tareas limitadas por E/S como llamadas a APIs, consultas a bases de datos, operaciones de archivos y comunicación de red. Usa multiprocessing para tareas limitadas por CPU como procesamiento de datos, cálculos matemáticos, manipulación de imágenes y entrenamiento de modelos de machine learning. Asyncio sobresale manejando miles de operaciones concurrentes de E/S con mínima sobrecarga de recursos, mientras que multiprocessing proporciona computación paralela real en múltiples núcleos de CPU.

¿Puedo mezclar código async y sync en la misma aplicación?

Sí, pero con planificación cuidadosa. Puedes llamar funciones async desde código sync usando asyncio.run(), aunque no desde dentro de un bucle de eventos ya en ejecución. Para llamar funciones sync bloqueantes desde código async, usa loop.run_in_executor() para ejecutarlas en un pool de hilos, previniendo que bloqueen el bucle de eventos. Nunca uses operaciones bloqueantes como time.sleep(), requests.get() o E/S de archivos síncrona directamente en funciones async, ya que bloquean todo el bucle de eventos.

¿Cómo depuro código asyncio efectivamente?

Habilita el modo debug de asyncio con asyncio.run(main(), debug=True) o establece la variable de entorno PYTHONASYNCIODEBUG=1. Esto detecta errores comunes como olvidar await, callbacks que toman demasiado tiempo y corrutinas que nunca fueron esperadas. Usa logging extensivamente para trazar el flujo de ejecución, ya que los debuggers tradicionales pueden ser confusos con código async. Añade nombres de tareas con asyncio.create_task(coro(), name="task-name") para mensajes de error más claros. Usa asyncio.gather(..., return_exceptions=True) para prevenir que una tarea fallida oculte errores en otras. Monitorea tu bucle de eventos con asyncio.all_tasks() para verificar tareas que no se completan.

¿Cuáles son los límites de rendimiento de asyncio?

Asyncio puede manejar decenas de miles de operaciones concurrentes de E/S en un solo hilo, superando ampliamente los límites de threading. La restricción principal es que asyncio no proporciona beneficio para operaciones limitadas por CPU ya que usa un solo hilo. El rendimiento se degrada si bloqueas el bucle de eventos con E/S síncrona o computación pesada. El ancho de banda de red y los límites de tasa de API se convierten en el cuello de botella antes que los límites de concurrencia de asyncio. El uso de memoria escala con el número de tareas concurrentes, pero cada tarea tiene sobrecarga mínima comparada con hilos. Para máximo rendimiento, combina asyncio para concurrencia de E/S con multiprocessing para trabajo limitado por CPU, y siempre usa semáforos para limitar operaciones concurrentes a niveles razonables.

Conclusión

Python asyncio transforma aplicaciones limitadas por E/S de operaciones lentas y bloqueantes en sistemas rápidos y concurrentes. Al dominar la sintaxis async/await, entender el bucle de eventos y aprovechar herramientas como gather(), create_task() y semáforos, puedes construir aplicaciones que manejen miles de operaciones concurrentes eficientemente.

La clave del éxito con asyncio es reconocer cuándo es la herramienta correcta. Úsalo para solicitudes de red, consultas a bases de datos, operaciones de archivos y cualquier tarea que pase tiempo esperando recursos externos. Evita bloquear el bucle de eventos con operaciones síncronas, siempre usa await con corrutinas y limita la concurrencia con semáforos cuando sea necesario.

Comienza convirtiendo pequeñas secciones de tu codebase a async, mide la mejora de rendimiento y expande gradualmente. Las mejoras dramáticas de velocidad en aplicaciones con mucha E/S hacen que la inversión de aprendizaje valga la pena.

📚