Python asyncio: Complete Guide to Asynchronous Programming

Your Python application makes 100 API calls, each taking 2 seconds. With traditional sequential code, that's 200 seconds of waiting. Your users stare at loading screens. Your servers sit idle, burning resources while waiting for responses. This blocking behavior is the problem that destroys application performance and user experience.

The pain intensifies when you scale. Database queries pile up. File operations queue behind each other. Web scrapers crawl at a snail's pace. Every I/O operation becomes a bottleneck that cascades through your entire system, turning what should be a fast, responsive application into a sluggish, resource-wasting monster.

Python asyncio solves this by enabling concurrent execution of I/O-bound tasks. Instead of waiting for each operation to complete before starting the next, asyncio lets your code initiate multiple operations and switch between them while waiting. Those 100 API calls? With asyncio, they can complete in roughly 2 seconds instead of 200, provided the remote service handles the requests concurrently. This guide shows you how to implement asynchronous programming in Python, with practical examples that turn slow, blocking code into fast, concurrent applications.

What is Asynchronous Programming and Why It Matters

Asynchronous programming allows a program to initiate potentially long-running tasks and move on to other work before those tasks complete, rather than waiting for each task to finish before starting the next one.

In traditional synchronous code, when you make an API request, your program stops and waits for the response. During this waiting period, your CPU sits idle, doing nothing productive. This is acceptable for single operations, but catastrophic for applications that need to handle multiple I/O operations.

Asyncio provides a way to write concurrent code using the async/await syntax. It's particularly effective for I/O-bound operations like:

  • Making HTTP requests to APIs
  • Reading and writing files
  • Database queries
  • Network communication
  • WebSocket connections
  • Message queue processing

The performance improvement is dramatic. Consider fetching data from 50 different URLs:

  • Synchronous approach: 50 requests × 2 seconds each = 100 seconds total
  • Asynchronous approach: All 50 requests run concurrently ≈ 2 seconds total

This 50x performance improvement comes from better resource utilization. Instead of blocking on I/O operations, asyncio allows your program to continue executing other tasks while waiting for I/O to complete.
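The arithmetic above can be checked with a scaled-down simulation. This is a sketch rather than a benchmark: fake_request is a hypothetical coroutine that uses asyncio.sleep() in place of a real network call, and the delay is shortened to 0.02 seconds so the script finishes quickly.

```python
import asyncio
import time

async def fake_request(delay):
    """Hypothetical stand-in for an HTTP call: just waits `delay` seconds."""
    await asyncio.sleep(delay)
    return delay

async def run_sequential(count, delay):
    """Await each request before starting the next one."""
    start = time.perf_counter()
    for _ in range(count):
        await fake_request(delay)
    return time.perf_counter() - start

async def run_concurrent(count, delay):
    """Start every request, then wait for all of them together."""
    start = time.perf_counter()
    await asyncio.gather(*(fake_request(delay) for _ in range(count)))
    return time.perf_counter() - start

async def compare(count=50, delay=0.02):
    sequential = await run_sequential(count, delay)
    concurrent = await run_concurrent(count, delay)
    return sequential, concurrent

sequential, concurrent = asyncio.run(compare())
print(f"Sequential: {sequential:.2f}s, concurrent: {concurrent:.2f}s")
```

The sequential time grows with the number of requests, while the concurrent time stays close to the duration of a single request. Real services add their own limits (connection pools, rate limits), so actual speedups are usually smaller.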

Concurrency vs Parallelism vs Async

Understanding the distinction between these concepts is essential for using asyncio effectively.

Concurrency means managing multiple tasks at once. The tasks take turns progressing, but only one executes at any given moment. Think of a chef preparing multiple dishes, switching between tasks as each waits for something to cook.

Parallelism means executing multiple tasks simultaneously on different CPU cores. This requires actual parallel processing hardware and is ideal for CPU-bound tasks like mathematical computations or image processing.

Asynchronous programming is a specific form of concurrency designed for I/O-bound tasks. It uses a single thread and switches between tasks when they're waiting for I/O operations.

| Feature | asyncio | Threading | Multiprocessing |
| --- | --- | --- | --- |
| Execution model | Single thread, cooperative multitasking | Multiple threads, preemptive multitasking | Multiple processes |
| Best for | I/O-bound tasks | I/O-bound tasks with blocking libraries | CPU-bound tasks |
| Memory overhead | Minimal | Moderate | High |
| Context switching cost | Very low | Low to moderate | High |
| Complexity | Moderate (async/await syntax) | High (race conditions, locks) | High (IPC, serialization) |
| GIL limitation | Not affected (single thread) | Limited by GIL | Not limited (separate processes) |
| Typical speedup for I/O | 10-100x | 5-10x | N/A |

The Python Global Interpreter Lock (GIL) prevents true parallel execution of Python bytecode in threads, making threading less effective for CPU-bound tasks. Asyncio sidesteps this limitation by using a single thread with cooperative multitasking, while multiprocessing bypasses it entirely with separate processes.
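To make "single thread with cooperative multitasking" concrete, the following sketch records which OS thread runs each step of two coroutines. The worker function and its log format are illustrative assumptions; asyncio.sleep(0) is used only as an explicit point where a coroutine hands control back to the event loop.

```python
import asyncio
import threading

async def worker(name, log):
    """Record which OS thread runs each step of this coroutine."""
    for step in range(2):
        log.append((name, step, threading.get_ident()))
        await asyncio.sleep(0)  # Yield control back to the event loop

async def main():
    log = []
    await asyncio.gather(worker("a", log), worker("b", log))
    return log

log = asyncio.run(main())
thread_ids = {thread_id for _, _, thread_id in log}
order = [(name, step) for name, step, _ in log]
print(f"Threads used: {len(thread_ids)}")
print(f"Interleaving: {order}")
```

Both workers make progress in alternating steps, yet every step runs on the same thread. Switching happens only at await points, which is why a coroutine that never awaits can starve the others.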

async def and await Keywords

The foundation of asyncio is built on two keywords: async and await.

The async def keyword defines a coroutine function. When you call a coroutine function, it doesn't execute immediately. Instead, it returns a coroutine object that can be awaited.

import asyncio
 
async def fetch_data():
    print("Fetching data...")
    await asyncio.sleep(2)  # Simulate I/O operation
    print("Data fetched!")
    return {"status": "success"}
 
# Calling the function returns a coroutine object
coroutine = fetch_data()
print(type(coroutine))  # <class 'coroutine'>
 
# To actually run it, you need to await it or use asyncio.run()
# asyncio.run(coroutine)  # This would execute the coroutine

The await keyword pauses the execution of a coroutine until the awaited operation completes. During this pause, the event loop can run other coroutines. You can only use await inside an async def function.

async def process_user(user_id):
    # Await an I/O operation
    user_data = await fetch_user_from_database(user_id)
 
    # Await another I/O operation
    user_profile = await fetch_user_profile(user_id)
 
    # Regular synchronous code runs normally
    processed_data = transform_data(user_data, user_profile)
 
    return processed_data

Key rules for async/await:

  1. You can only await awaitable objects: coroutines, tasks, futures, or objects that define __await__()
  2. You can only use await inside an async def function
  3. Regular functions cannot use await
  4. Calling an async function without awaiting it creates a coroutine object but doesn't execute the code
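Rule 4 is easy to verify. In this small sketch, record_call is a hypothetical coroutine function that appends to a list when its body runs, so the list shows exactly when execution happens.

```python
import asyncio

calls = []

async def record_call():
    calls.append("body ran")
    return "done"

coro = record_call()          # Creates a coroutine object; the body has not run yet
ran_before_await = list(calls)

result = asyncio.run(coro)    # Running (or awaiting) the coroutine executes the body
ran_after_await = list(calls)

print(ran_before_await, ran_after_await, result)
```

The list stays empty until the coroutine is actually driven by the event loop.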

Common mistake:

async def wrong_example():
    # This creates a coroutine but doesn't execute it!
    fetch_data()  # Missing await
 
async def correct_example():
    # This actually executes the coroutine
    result = await fetch_data()

asyncio.run() Entry Point

The asyncio.run() function is the standard way to start the asyncio event loop and run your main coroutine. It was introduced in Python 3.7 and simplifies running async code from synchronous contexts.

import asyncio
 
async def main():
    print("Starting async operations")
    await asyncio.sleep(1)
    print("Finished")
 
# Run the main coroutine
asyncio.run(main())

What asyncio.run() does behind the scenes:

  1. Creates a new event loop
  2. Runs the provided coroutine to completion
  3. Closes the event loop
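  4. Returns the result of the coroutine

import asyncio
 
async def compute_value():
    # Illustrative stand-in for real async work
    await asyncio.sleep(0.1)
    return 42
 
async def main():
    result = await compute_value()
    return result
 
# The return value is accessible
final_result = asyncio.run(main())
print(final_result)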

Important characteristics of asyncio.run():

  • Cannot be called from within a running event loop: If you're already in an async function, use await instead
  • Creates a fresh event loop each time: Don't call asyncio.run() multiple times in the same program unless you want separate event loop instances
  • Always closes the loop: The event loop is cleaned up after execution
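The first point can be observed directly. The sketch below deliberately calls asyncio.run() from inside a running coroutine to show the resulting RuntimeError, then does the same work with await. The function names inner and outer are illustrative.

```python
import asyncio

async def inner():
    return "inner result"

async def outer():
    error = None
    coro = inner()
    try:
        asyncio.run(coro)  # Not allowed: an event loop is already running here
    except RuntimeError as exc:
        coro.close()       # Discard the unused coroutine to avoid a "never awaited" warning
        error = str(exc)
    value = await inner()  # Inside a coroutine, await is the right tool
    return error, value

error, value = asyncio.run(outer())
print(error)
print(value)
```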

For Jupyter notebooks or environments where an event loop is already running, use await directly or asyncio.create_task(). Tools like RunCell provide enhanced async support in Jupyter environments, making it easier to experiment with asyncio patterns interactively without event loop conflicts.

Before Python 3.7, you had to manually manage the event loop:

# Old way (pre-Python 3.7)
loop = asyncio.get_event_loop()
try:
    loop.run_until_complete(main())
finally:
    loop.close()
 
# Modern way (Python 3.7+)
asyncio.run(main())

Coroutines, Tasks, and Futures

Understanding these three core concepts is essential for mastering asyncio.

Coroutines

A coroutine is a function defined with async def. It's a special function that can be paused and resumed, allowing other code to run during the pause. Coroutines evolved from Python generators, which introduced the concept of pausable functions with yield.

import asyncio
 
async def my_coroutine():
    await asyncio.sleep(1)
    return "completed"
 
# This creates a coroutine object
coro = my_coroutine()
 
# To run it
result = asyncio.run(coro)

Tasks

A task is a wrapper around a coroutine that schedules it for execution on the event loop. Tasks allow coroutines to run concurrently.

import asyncio
 
async def say_after(delay, message):
    await asyncio.sleep(delay)
    print(message)
    return message
 
async def main():
    # Create tasks to run concurrently
    task1 = asyncio.create_task(say_after(1, "First"))
    task2 = asyncio.create_task(say_after(2, "Second"))
 
    # Wait for both tasks to complete
    result1 = await task1
    result2 = await task2
 
    print(f"Results: {result1}, {result2}")
 
asyncio.run(main())

Creating a task immediately schedules the coroutine for execution. The event loop starts running it as soon as possible, even before you await the task.

async def background_work():
    print("Starting background work")
    await asyncio.sleep(2)
    print("Background work completed")
 
async def main():
    # The coroutine is scheduled now and starts running at main()'s next await
    task = asyncio.create_task(background_work())
 
    print("Task created, doing other work")
    await asyncio.sleep(1)
    print("Other work done")
 
    # Wait for the background task to finish
    await task
 
asyncio.run(main())

Output:

Task created, doing other work
Starting background work
Other work done
Background work completed

Futures

A future is a low-level awaitable object that represents an eventual result of an asynchronous operation. You rarely create futures directly; they're typically created by asyncio internals or libraries.

import asyncio
 
async def set_future_result(future):
    await asyncio.sleep(1)
    future.set_result("Future completed")
 
async def main():
    # get_running_loop() is the preferred call inside a coroutine
    loop = asyncio.get_running_loop()
    future = loop.create_future()
 
    # Schedule a coroutine to set the future's result; keep a reference
    # so the task cannot be garbage-collected before it finishes
    setter = asyncio.create_task(set_future_result(future))
 
    # Wait for the future to get a result
    result = await future
    print(result)
    await setter
 
asyncio.run(main())

Relationship between coroutines, tasks, and futures:

  • Coroutines are the functions you write
  • Tasks wrap coroutines and schedule them for execution
  • Futures represent results that will be available in the future
  • Tasks are a subclass of Future
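These relationships can be inspected at runtime. The sketch below uses a hypothetical compute coroutine and checks that the task wrapping it is a Future, and that its result is available both through await and through task.result() once it is done.

```python
import asyncio

async def compute():
    await asyncio.sleep(0.01)
    return 42

async def main():
    task = asyncio.create_task(compute())
    facts = {
        "task_is_future": isinstance(task, asyncio.Future),
        "done_before_await": task.done(),
    }
    facts["awaited_value"] = await task
    facts["done_after_await"] = task.done()
    facts["result_method"] = task.result()  # Safe to call once the task is done
    return facts

facts = asyncio.run(main())
print(facts)
```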

asyncio.create_task() for Concurrent Execution

The asyncio.create_task() function is your primary tool for achieving true concurrency with asyncio. It schedules a coroutine to run on the event loop without blocking the current coroutine.

import asyncio
import time
 
async def download_file(file_id):
    print(f"Starting download {file_id}")
    await asyncio.sleep(2)  # Simulate download time
    print(f"Completed download {file_id}")
    return f"file_{file_id}.dat"
 
async def sequential_downloads():
    """Downloads files one at a time"""
    start = time.time()
 
    file1 = await download_file(1)
    file2 = await download_file(2)
    file3 = await download_file(3)
 
    elapsed = time.time() - start
    print(f"Sequential: {elapsed:.2f} seconds")
    # Output: ~6 seconds (2 + 2 + 2)
 
async def concurrent_downloads():
    """Downloads files concurrently"""
    start = time.time()
 
    # Create tasks for concurrent execution
    task1 = asyncio.create_task(download_file(1))
    task2 = asyncio.create_task(download_file(2))
    task3 = asyncio.create_task(download_file(3))
 
    # Wait for all tasks to complete
    file1 = await task1
    file2 = await task2
    file3 = await task3
 
    elapsed = time.time() - start
    print(f"Concurrent: {elapsed:.2f} seconds")
    # Output: ~2 seconds (all run at the same time)
 
asyncio.run(concurrent_downloads())

The task is scheduled immediately when create_task() is called. You don't have to await it right away.

async def process_data():
    # Start background tasks
    task1 = asyncio.create_task(fetch_from_api_1())
    task2 = asyncio.create_task(fetch_from_api_2())
 
    # Do some work while tasks run in background
    local_data = prepare_local_data()
 
    # Now wait for the background tasks
    api_data_1 = await task1
    api_data_2 = await task2
 
    # Combine all data
    return combine_data(local_data, api_data_1, api_data_2)
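One caveat when you don't await a task right away: the event loop keeps only a weak reference to it, so the Python documentation recommends holding your own reference until the task finishes. A minimal sketch of that pattern follows; spawn, log_event, and background_tasks are illustrative names, not part of asyncio.

```python
import asyncio

background_tasks = set()

async def log_event(events, name):
    await asyncio.sleep(0.01)
    events.append(name)

def spawn(coro):
    """Schedule a coroutine and keep a strong reference until it finishes."""
    task = asyncio.create_task(coro)
    background_tasks.add(task)
    # Drop the reference automatically once the task is done
    task.add_done_callback(background_tasks.discard)
    return task

async def main():
    events = []
    for name in ("a", "b", "c"):
        spawn(log_event(events, name))
    # Wait for whatever is still running before leaving main()
    await asyncio.gather(*background_tasks)
    return events

events = asyncio.run(main())
print(events)
```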

You can also name tasks for better debugging:

async def main():
    task = asyncio.create_task(
        long_running_operation(),
        name="long-operation-task"
    )
 
    # Task name is accessible
    print(f"Task name: {task.get_name()}")
 
    await task

asyncio.gather() for Running Multiple Coroutines

The asyncio.gather() function runs multiple coroutines concurrently and waits for all of them to complete. It's cleaner than creating individual tasks when you need to run many coroutines.

import asyncio
 
async def fetch_user(user_id):
    await asyncio.sleep(1)
    return {"id": user_id, "name": f"User {user_id}"}
 
async def main():
    # Run multiple coroutines concurrently
    results = await asyncio.gather(
        fetch_user(1),
        fetch_user(2),
        fetch_user(3),
        fetch_user(4),
        fetch_user(5)
    )
 
    print(results)
    # Output: [{'id': 1, 'name': 'User 1'}, {'id': 2, 'name': 'User 2'}, ...]
 
asyncio.run(main())

gather() returns results in the same order as the input coroutines, regardless of which completes first.

async def task_with_delay(delay, value):
    await asyncio.sleep(delay)
    return value
 
async def main():
    results = await asyncio.gather(
        task_with_delay(3, "slow"),
        task_with_delay(1, "fast"),
        task_with_delay(2, "medium")
    )
 
    print(results)
    # Output: ['slow', 'fast', 'medium'] (order preserved)
 
asyncio.run(main())

Error Handling with gather()

By default (return_exceptions=False), the first exception raised by any coroutine is propagated immediately to the code awaiting gather(). The other awaitables are not cancelled by gather(); they keep running in the background.

async def failing_task():
    await asyncio.sleep(1)
    raise ValueError("Task failed!")
 
async def successful_task():
    await asyncio.sleep(2)
    return "success"
 
async def main():
    try:
        results = await asyncio.gather(
            failing_task(),
            successful_task()
        )
    except ValueError as e:
        print(f"Error caught: {e}")
        # gather() does not cancel successful_task; here it is only
        # cancelled later, when asyncio.run() shuts down the event loop
 
asyncio.run(main())
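What happens to the other awaitables after a failure is worth checking for yourself. In this sketch (function names are illustrative, delays shortened), one coroutine fails quickly while a second task is still running. After gather() raises, the second task is inspected and then awaited.

```python
import asyncio

async def fails_fast():
    await asyncio.sleep(0.01)
    raise ValueError("boom")

async def finishes_later():
    await asyncio.sleep(0.05)
    return "ok"

async def main():
    later = asyncio.create_task(finishes_later())
    try:
        await asyncio.gather(fails_fast(), later)
    except ValueError:
        pass
    # The surviving task was not cancelled by gather()
    cancelled_right_after = later.cancelled()
    result = await later
    return cancelled_right_after, result

cancelled_right_after, result = asyncio.run(main())
print(cancelled_right_after, result)
```

If you want the remaining work to stop on the first failure, cancel the tasks yourself in the except block.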

Use return_exceptions=True to collect exceptions along with successful results:

async def main():
    results = await asyncio.gather(
        failing_task(),
        successful_task(),
        return_exceptions=True
    )
 
    for i, result in enumerate(results):
        if isinstance(result, Exception):
            print(f"Task {i} failed: {result}")
        else:
            print(f"Task {i} succeeded: {result}")
 
asyncio.run(main())

Dynamic gather with list comprehension

gather() works beautifully with dynamically generated coroutines:

async def process_item(item):
    await asyncio.sleep(1)
    return item * 2
 
async def main():
    items = [1, 2, 3, 4, 5]
 
    # Process all items concurrently
    results = await asyncio.gather(
        *[process_item(item) for item in items]
    )
 
    print(results)  # [2, 4, 6, 8, 10]
 
asyncio.run(main())

asyncio.wait() and asyncio.as_completed()

While gather() waits for all coroutines to complete, wait() and as_completed() provide more granular control over how you handle completed tasks.

asyncio.wait()

wait() allows you to wait for tasks with different completion conditions: all tasks, first task, or first exception.

import asyncio
 
async def task(delay, name):
    await asyncio.sleep(delay)
    return f"{name} completed"
 
async def main():
    tasks = [
        asyncio.create_task(task(1, "Task 1")),
        asyncio.create_task(task(2, "Task 2")),
        asyncio.create_task(task(3, "Task 3"))
    ]
 
    # Wait for all tasks to complete
    done, pending = await asyncio.wait(
        tasks,
        return_when=asyncio.ALL_COMPLETED
    )
 
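    # Use a different loop variable: reusing the name "task" would shadow
    # the task() coroutine function and raise UnboundLocalError above
    for finished in done:
        print(finished.result())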
 
asyncio.run(main())

Different completion conditions:

async def main():
    tasks = [
        asyncio.create_task(task(1, "Fast")),
        asyncio.create_task(task(3, "Slow")),
        asyncio.create_task(task(2, "Medium"))
    ]
 
    # Return when the first task completes
    done, pending = await asyncio.wait(
        tasks,
        return_when=asyncio.FIRST_COMPLETED
    )
 
    print(f"First completed: {done.pop().result()}")
    print(f"Still pending: {len(pending)} tasks")
 
    # Cancel remaining tasks
    # Named "unfinished" so the task() coroutine function is not shadowed
    for unfinished in pending:
        unfinished.cancel()
 
asyncio.run(main())
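wait() also accepts a timeout argument. Unlike asyncio.wait_for(), it does not raise when the timeout expires; it simply returns whatever is done and whatever is still pending. A small sketch, with a hypothetical job coroutine and shortened delays:

```python
import asyncio

async def job(delay, name):
    await asyncio.sleep(delay)
    return name

async def main():
    tasks = [
        asyncio.create_task(job(0.01, "quick")),
        asyncio.create_task(job(5, "slow")),
    ]
    # wait() does not raise on timeout; it reports what finished and what did not
    done, pending = await asyncio.wait(tasks, timeout=0.2)
    finished = sorted(t.result() for t in done)

    for t in pending:
        t.cancel()
    # Let the cancellations complete before returning
    await asyncio.gather(*pending, return_exceptions=True)
    return finished, len(pending)

finished, unfinished_count = asyncio.run(main())
print(finished, unfinished_count)
```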

asyncio.as_completed()

as_completed() returns an iterator of awaitables. Awaiting each one returns the result of whichever remaining task finishes next, allowing you to process results as soon as they're available.

import asyncio
 
async def fetch_data(url_id, delay):
    await asyncio.sleep(delay)
    return f"Data from URL {url_id}"
 
async def main():
    tasks = [
        fetch_data(1, 3),
        fetch_data(2, 1),
        fetch_data(3, 2)
    ]
 
    # Process results as they complete
    for coro in asyncio.as_completed(tasks):
        result = await coro
        print(f"Received: {result}")
 
asyncio.run(main())

Output shows results in completion order, not submission order:

Received: Data from URL 2
Received: Data from URL 3
Received: Data from URL 1

This is particularly useful when you want to display results to users as quickly as possible:

async def search_engine(query, engine_name, delay):
    await asyncio.sleep(delay)
    return f"{engine_name}: Results for '{query}'"
 
async def main():
    query = "python asyncio"
 
    searches = [
        search_engine(query, "Google", 1.5),
        search_engine(query, "Bing", 2.0),
        search_engine(query, "DuckDuckGo", 1.0)
    ]
 
    print("Searching...")
    for search in asyncio.as_completed(searches):
        result = await search
        print(result)  # Display each result as soon as it arrives
 
asyncio.run(main())

asyncio.sleep() vs time.sleep()

This distinction is critical for async code correctness.

time.sleep() is a blocking operation that pauses the entire thread, including the event loop. This prevents all async tasks from running.

asyncio.sleep() is a non-blocking coroutine that only pauses the current task, allowing other tasks to run.

import asyncio
import time
 
async def blocking_example():
    """Bad: Uses time.sleep() - blocks the entire event loop"""
    print("Starting blocking sleep")
    time.sleep(2)  # WRONG: Blocks everything!
    print("Finished blocking sleep")
 
async def non_blocking_example():
    """Good: Uses asyncio.sleep() - allows other tasks to run"""
    print("Starting non-blocking sleep")
    await asyncio.sleep(2)  # CORRECT: Only pauses this task
    print("Finished non-blocking sleep")
 
async def concurrent_task():
    for i in range(3):
        print(f"Concurrent task running: {i}")
        await asyncio.sleep(0.5)
 
async def demo_blocking():
    print("\n=== Blocking example (BAD) ===")
    await asyncio.gather(
        blocking_example(),
        concurrent_task()
    )
 
async def demo_non_blocking():
    print("\n=== Non-blocking example (GOOD) ===")
    await asyncio.gather(
        non_blocking_example(),
        concurrent_task()
    )
 
asyncio.run(demo_blocking())
asyncio.run(demo_non_blocking())

In the blocking example, the concurrent task doesn't run until time.sleep() completes. In the non-blocking example, both tasks run concurrently.

Rule of thumb: Never use time.sleep() in async code. Always use await asyncio.sleep().

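For blocking calls that you can't avoid, use loop.run_in_executor() to run them outside the event loop thread. The default executor (None) is a thread pool, which suits blocking I/O; for genuinely CPU-bound work, pass a concurrent.futures.ProcessPoolExecutor instead, because threads still contend for the GIL. You can also use asyncio.create_subprocess_exec() to run external commands asynchronously: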

import asyncio
import time
 
def cpu_intensive_task():
    """Some blocking CPU work"""
    time.sleep(2)  # Simulate heavy computation
    return "CPU task completed"
 
async def main():
    loop = asyncio.get_running_loop()  # Preferred inside a coroutine
 
    # Run blocking task in a thread pool
    result = await loop.run_in_executor(None, cpu_intensive_task)
    print(result)
 
asyncio.run(main())
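On Python 3.9 and later, asyncio.to_thread() is a shorter way to push a blocking call into the default thread pool. The sketch below assumes a hypothetical blocking_io function and runs a heartbeat coroutine alongside it to show that the event loop stays responsive while the blocking call is in progress.

```python
import asyncio
import time

def blocking_io(duration):
    """Ordinary blocking function, e.g. a call into a synchronous library."""
    time.sleep(duration)
    return "blocking call finished"

async def heartbeat(ticks, interval, count):
    """Keeps ticking only if the event loop is not blocked."""
    for i in range(count):
        ticks.append(i)
        await asyncio.sleep(interval)

async def main():
    ticks = []
    result, _ = await asyncio.gather(
        asyncio.to_thread(blocking_io, 0.1),  # Runs in a worker thread
        heartbeat(ticks, 0.02, 3),
    )
    return result, ticks

result, ticks = asyncio.run(main())
print(result, ticks)
```

As with run_in_executor(None, ...), this helps with blocking I/O; it does not speed up CPU-bound Python code.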

async for and async with

Python provides async versions of for loops and context managers for working with asynchronous iterables and resources.

Async Iterators (async for)

An async iterator is an object that implements __aiter__() and __anext__() methods, allowing you to iterate over items that require async operations to fetch.

import asyncio
 
class AsyncRange:
    """An async iterator that yields numbers with delays"""
    def __init__(self, start, end):
        self.current = start
        self.end = end
 
    def __aiter__(self):
        return self
 
    async def __anext__(self):
        if self.current >= self.end:
            raise StopAsyncIteration
 
        # Simulate async operation to get next value
        await asyncio.sleep(0.5)
        value = self.current
        self.current += 1
        return value
 
async def main():
    async for number in AsyncRange(1, 5):
        print(f"Got number: {number}")
 
asyncio.run(main())
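For simple cases, an async generator gives the same behavior with less code than a class with __aiter__() and __anext__(). The sketch below is an assumed equivalent of AsyncRange (async_range is an illustrative name, and the delay is shortened), and it also shows an async comprehension.

```python
import asyncio

async def async_range(start, end, delay=0.01):
    """Async generator: yields numbers, pausing between them."""
    for value in range(start, end):
        await asyncio.sleep(delay)  # Stand-in for an async fetch
        yield value

async def main():
    collected = []
    async for number in async_range(1, 5):
        collected.append(number)
    # Async comprehensions work with any async iterable
    doubled = [n * 2 async for n in async_range(1, 5)]
    return collected, doubled

collected, doubled = asyncio.run(main())
print(collected, doubled)
```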

Real-world example with async database cursor:

class AsyncDatabaseCursor:
    """Simulated async database cursor"""
    def __init__(self, query):
        self.query = query
        self.results = []
        self.index = 0
 
    async def execute(self):
        # Simulate database query
        await asyncio.sleep(1)
        self.results = [
            {"id": 1, "name": "Alice"},
            {"id": 2, "name": "Bob"},
            {"id": 3, "name": "Charlie"}
        ]
 
    def __aiter__(self):
        return self
 
    async def __anext__(self):
        if self.index >= len(self.results):
            raise StopAsyncIteration
 
        # Simulate fetching next row
        await asyncio.sleep(0.1)
        row = self.results[self.index]
        self.index += 1
        return row
 
async def fetch_users():
    cursor = AsyncDatabaseCursor("SELECT * FROM users")
    await cursor.execute()
 
    async for row in cursor:
        print(f"User: {row['name']}")
 
asyncio.run(fetch_users())

Async Context Managers (async with)

An async context manager implements __aenter__() and __aexit__() methods for managing resources that require async setup and cleanup.

import asyncio
 
class AsyncDatabaseConnection:
    """An async context manager for database connections"""
 
    async def __aenter__(self):
        print("Opening database connection...")
        await asyncio.sleep(1)  # Simulate connection time
        print("Database connection opened")
        return self
 
    async def __aexit__(self, exc_type, exc_val, exc_tb):
        print("Closing database connection...")
        await asyncio.sleep(0.5)  # Simulate cleanup
        print("Database connection closed")
 
    async def query(self, sql):
        await asyncio.sleep(0.5)
        return f"Results for: {sql}"
 
async def main():
    async with AsyncDatabaseConnection() as db:
        result = await db.query("SELECT * FROM users")
        print(result)
    # Connection automatically closed after the with block
 
asyncio.run(main())
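The standard library's contextlib.asynccontextmanager decorator builds an async context manager from an async generator, which is often shorter than writing __aenter__() and __aexit__() by hand. A minimal sketch, where database_connection and the events list are illustrative:

```python
import asyncio
from contextlib import asynccontextmanager

@asynccontextmanager
async def database_connection(events):
    events.append("open")
    await asyncio.sleep(0.01)      # Simulated connection setup
    try:
        yield "connection"         # Value bound by "as"
    finally:
        await asyncio.sleep(0.01)  # Simulated cleanup, runs even on error
        events.append("close")

async def main():
    events = []
    async with database_connection(events) as db:
        events.append(f"using {db}")
    return events

events = asyncio.run(main())
print(events)
```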

Combining async context managers with async iterators:

class AsyncFileReader:
    """Async context manager and iterator for reading files"""
 
    def __init__(self, filename):
        self.filename = filename
        self.lines = []
        self.index = 0
 
    async def __aenter__(self):
        # Simulate async file opening
        await asyncio.sleep(0.5)
        self.lines = ["Line 1", "Line 2", "Line 3"]
        return self
 
    async def __aexit__(self, exc_type, exc_val, exc_tb):
        await asyncio.sleep(0.2)
        self.lines = []
 
    def __aiter__(self):
        return self
 
    async def __anext__(self):
        if self.index >= len(self.lines):
            raise StopAsyncIteration
 
        await asyncio.sleep(0.1)
        line = self.lines[self.index]
        self.index += 1
        return line
 
async def read_file():
    async with AsyncFileReader("data.txt") as reader:
        async for line in reader:
            print(line)
 
asyncio.run(read_file())

asyncio.Queue for Producer-Consumer Patterns

asyncio.Queue is an async-aware queue that's well suited to coordinating work between producer and consumer coroutines. Unlike queue.Queue, it is not thread-safe: use it only from coroutines running on the same event loop.

import asyncio
import random
 
async def producer(queue, producer_id):
    """Produces items and puts them in the queue"""
    for i in range(5):
        await asyncio.sleep(random.uniform(0.1, 0.5))
        item = f"Item-{producer_id}-{i}"
        await queue.put(item)
        print(f"Producer {producer_id} produced: {item}")
 
async def consumer(queue, consumer_id):
    """Consumes items from the queue"""
    while True:
        item = await queue.get()
        print(f"Consumer {consumer_id} consuming: {item}")
 
        # Simulate processing time
        await asyncio.sleep(random.uniform(0.2, 0.8))
 
        queue.task_done()
 
async def main():
    queue = asyncio.Queue(maxsize=10)
 
    # Create producers and consumers
    producers = [asyncio.create_task(producer(queue, i)) for i in range(2)]
    consumers = [asyncio.create_task(consumer(queue, i)) for i in range(3)]
 
    # Wait for all producers to finish
    await asyncio.gather(*producers)
 
    # Wait for the queue to be fully processed
    await queue.join()
 
    # Cancel consumers (they run forever) and wait for the cancellations to finish
    for consumer_task in consumers:
        consumer_task.cancel()
    await asyncio.gather(*consumers, return_exceptions=True)
 
asyncio.run(main())
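Cancelling consumers is one way to shut down; another common option is a sentinel value that tells each consumer to exit on its own. The sketch below assumes None is never a real work item and uses shortened delays.

```python
import asyncio

STOP = None  # Sentinel value; assumes None is never a real work item

async def producer(queue, items):
    for item in items:
        await queue.put(item)  # Waits here whenever the queue is full

async def consumer(queue, processed):
    while True:
        item = await queue.get()
        if item is STOP:
            break
        await asyncio.sleep(0.01)  # Simulated processing
        processed.append(item)

async def main(consumer_count=2):
    queue = asyncio.Queue(maxsize=3)
    processed = []
    consumers = [
        asyncio.create_task(consumer(queue, processed))
        for _ in range(consumer_count)
    ]
    await producer(queue, range(6))
    # One sentinel per consumer so every loop exits on its own
    for _ in range(consumer_count):
        await queue.put(STOP)
    await asyncio.gather(*consumers)
    return processed

processed = asyncio.run(main())
print(sorted(processed))
```

Because the sentinels are queued after all real items, every item is taken by some consumer before the consumers stop.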

Real-world example: Web scraper with URL queue:

import asyncio
from typing import Set
 
async def fetch_url(session, url):
    """Simulate fetching a URL"""
    await asyncio.sleep(1)
    return f"Content from {url}"
 
async def producer(queue: asyncio.Queue, start_urls: list):
    """Add URLs to the queue"""
    for url in start_urls:
        await queue.put(url)
 
async def consumer(queue: asyncio.Queue, visited: Set[str]):
    """Fetch URLs from the queue"""
    while True:
        url = await queue.get()
 
        if url in visited:
            queue.task_done()
            continue
 
        visited.add(url)
        print(f"Scraping: {url}")
 
        # Simulate fetching
        content = await fetch_url(None, url)
 
        # Simulate finding new URLs in the content
        # In real scraper, you'd parse HTML and extract links
        new_urls = []  # Would extract from content
 
        for new_url in new_urls:
            if new_url not in visited:
                await queue.put(new_url)
 
        queue.task_done()
 
async def main():
    queue = asyncio.Queue()
    visited = set()
 
    start_urls = [
        "https://example.com/page1",
        "https://example.com/page2",
        "https://example.com/page3"
    ]
 
    # Add initial URLs
    await producer(queue, start_urls)
 
    # Create consumers
    consumers = [
        asyncio.create_task(consumer(queue, visited))
        for _ in range(3)
    ]
 
    # Wait for all URLs to be processed
    await queue.join()
 
    # Cancel consumers
    for consumer_task in consumers:
        consumer_task.cancel()
 
    print(f"Scraped {len(visited)} unique URLs")
 
asyncio.run(main())

Semaphores for Rate Limiting

asyncio.Semaphore limits how many coroutines can access a resource at the same time. This is useful for capping concurrent API calls or database connections. Note that a semaphore bounds concurrency (calls in flight), not throughput (calls per second).

import asyncio
import time
 
async def call_api(semaphore, api_id):
    """Make an API call with rate limiting"""
    async with semaphore:
        print(f"API call {api_id} started")
        await asyncio.sleep(1)  # Simulate API call
        print(f"API call {api_id} completed")
        return f"Result {api_id}"
 
async def main():
    # Allow only 3 concurrent API calls
    semaphore = asyncio.Semaphore(3)
 
    start = time.time()
 
    # Create 10 API calls
    tasks = [call_api(semaphore, i) for i in range(10)]
    results = await asyncio.gather(*tasks)
 
    elapsed = time.time() - start
    print(f"\nCompleted {len(results)} API calls in {elapsed:.2f} seconds")
    # With semaphore(3): ~4 seconds (10 calls in batches of 3)
    # Without semaphore: ~1 second (all concurrent)
 
asyncio.run(main())
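To confirm that the semaphore really caps concurrency, you can count how many calls are in flight at once. In this sketch, ConcurrencyProbe is a hypothetical helper that tracks the peak, and the delay is shortened.

```python
import asyncio

class ConcurrencyProbe:
    """Tracks how many calls are in flight and the highest count seen."""
    def __init__(self):
        self.in_flight = 0
        self.peak = 0

async def limited_call(semaphore, probe):
    async with semaphore:
        probe.in_flight += 1
        probe.peak = max(probe.peak, probe.in_flight)
        await asyncio.sleep(0.01)  # Simulated API call
        probe.in_flight -= 1

async def main(limit=3, calls=10):
    semaphore = asyncio.Semaphore(limit)
    probe = ConcurrencyProbe()
    await asyncio.gather(*(limited_call(semaphore, probe) for _ in range(calls)))
    return probe.peak

peak = asyncio.run(main())
print(f"Peak concurrent calls: {peak}")
```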

Rate limiting for API quota compliance:

import asyncio
from datetime import datetime
 
class RateLimiter:
    """Rate limiter that allows N requests per time period"""
 
    def __init__(self, max_requests, time_period):
        self.max_requests = max_requests
        self.time_period = time_period
        self.semaphore = asyncio.Semaphore(max_requests)
        self.request_times = []
 
    async def __aenter__(self):
        await self.semaphore.acquire()
 
        # Wait if we've hit the rate limit
        while len(self.request_times) >= self.max_requests:
            oldest = self.request_times[0]
            elapsed = datetime.now().timestamp() - oldest
 
            if elapsed < self.time_period:
                sleep_time = self.time_period - elapsed
                await asyncio.sleep(sleep_time)
 
            self.request_times.pop(0)
 
        self.request_times.append(datetime.now().timestamp())
        return self
 
    async def __aexit__(self, exc_type, exc_val, exc_tb):
        self.semaphore.release()
 
async def fetch_with_rate_limit(rate_limiter, item_id):
    async with rate_limiter:
        print(f"Fetching item {item_id} at {datetime.now()}")
        await asyncio.sleep(0.5)
        return f"Item {item_id}"
 
async def main():
    # Allow 5 requests per 2 seconds
    rate_limiter = RateLimiter(max_requests=5, time_period=2)
 
    # Make 15 requests
    tasks = [fetch_with_rate_limit(rate_limiter, i) for i in range(15)]
    results = await asyncio.gather(*tasks)
 
    print(f"Completed {len(results)} requests")
 
asyncio.run(main())

aiohttp for Async HTTP Requests

The aiohttp library provides async HTTP client and server functionality. It is a third-party package (pip install aiohttp) and a widely used choice for making HTTP requests in async code.

import asyncio
import aiohttp
 
async def fetch_url(session, url):
    """Fetch a single URL"""
    async with session.get(url) as response:
        return await response.text()
 
async def main():
    async with aiohttp.ClientSession() as session:
        html = await fetch_url(session, "https://api.github.com")
        print(f"Fetched {len(html)} characters")
 
asyncio.run(main())

Fetching multiple URLs concurrently:

import asyncio
import aiohttp
import time
 
async def fetch_url(session, url):
    """Fetch URL and return status code and content length"""
    async with session.get(url) as response:
        content = await response.text()
        return {
            "url": url,
            "status": response.status,
            "length": len(content)
        }
 
async def fetch_all_urls(urls):
    """Fetch multiple URLs concurrently"""
    async with aiohttp.ClientSession() as session:
        tasks = [fetch_url(session, url) for url in urls]
        results = await asyncio.gather(*tasks, return_exceptions=True)
        return results
 
async def main():
    urls = [
        "https://api.github.com",
        "https://httpbin.org/delay/1",
        "https://httpbin.org/delay/2",
        "https://httpbin.org/status/200",
        "https://httpbin.org/status/404"
    ]
 
    start = time.time()
    results = await fetch_all_urls(urls)
    elapsed = time.time() - start
 
    for result in results:
        if isinstance(result, Exception):
            print(f"Error: {result}")
        else:
            # len() of response.text() counts characters, not bytes
            print(f"{result['url']}: {result['status']} ({result['length']} characters)")
 
    print(f"\nFetched {len(urls)} URLs in {elapsed:.2f} seconds")
 
asyncio.run(main())

Practical example with error handling and retries:

import asyncio
import aiohttp
from typing import Optional
 
async def fetch_with_retry(
    session: aiohttp.ClientSession,
    url: str,
    max_retries: int = 3
) -> Optional[dict]:
    """Fetch URL with retry logic"""
 
    for attempt in range(max_retries):
        try:
            async with session.get(url, timeout=aiohttp.ClientTimeout(total=10)) as response:
                if response.status == 200:
                    data = await response.json()
                    return data
                elif response.status == 404:
                    print(f"URL not found: {url}")
                    return None
                else:
                    print(f"Unexpected status {response.status} for {url}")
 
        except asyncio.TimeoutError:
            print(f"Timeout on attempt {attempt + 1} for {url}")
 
        except aiohttp.ClientError as e:
            print(f"Client error on attempt {attempt + 1} for {url}: {e}")
 
        if attempt < max_retries - 1:
            await asyncio.sleep(2 ** attempt)  # Exponential backoff
 
    print(f"Failed to fetch {url} after {max_retries} attempts")
    return None
 
async def main():
    urls = [
        "https://api.github.com/users/github",
        "https://api.github.com/users/nonexistent",
        "https://httpbin.org/delay/5"
    ]
 
    async with aiohttp.ClientSession() as session:
        tasks = [fetch_with_retry(session, url) for url in urls]
        results = await asyncio.gather(*tasks)
 
        successful = [r for r in results if r is not None]
        print(f"\nSuccessfully fetched {len(successful)}/{len(urls)} URLs")
 
asyncio.run(main())

aiofiles for Async File I/O

The aiofiles library provides async file operations, preventing blocking during file reads and writes.

import asyncio
import aiofiles
 
async def read_file(filename):
    """Read file asynchronously"""
    async with aiofiles.open(filename, mode='r') as f:
        contents = await f.read()
        return contents
 
async def write_file(filename, content):
    """Write file asynchronously"""
    async with aiofiles.open(filename, mode='w') as f:
        await f.write(content)
 
async def main():
    # Write data
    await write_file('test.txt', 'Hello, async file I/O!')
 
    # Read data
    content = await read_file('test.txt')
    print(f"Read: {content}")
 
asyncio.run(main())

Processing multiple files concurrently:

import asyncio
import aiofiles
 
async def process_file(input_file, output_file):
    """Read, process, and write a file"""
    async with aiofiles.open(input_file, mode='r') as f:
        content = await f.read()
 
    # Process content (convert to uppercase)
    processed = content.upper()
 
    async with aiofiles.open(output_file, mode='w') as f:
        await f.write(processed)
 
    return f"Processed {input_file} -> {output_file}"
 
async def main():
    files = [
        ('input1.txt', 'output1.txt'),
        ('input2.txt', 'output2.txt'),
        ('input3.txt', 'output3.txt')
    ]
 
    tasks = [process_file(inp, out) for inp, out in files]
    results = await asyncio.gather(*tasks)
 
    for result in results:
        print(result)
 
asyncio.run(main())

Reading large files line by line:

import asyncio
import aiofiles
 
async def process_large_file(filename):
    """Process a large file line by line without loading it all into memory"""
    line_count = 0
 
    async with aiofiles.open(filename, mode='r') as f:
        async for line in f:
            # Process each line
            line_count += 1
            if line.strip():
                # Do something with the line
                pass
 
    return line_count
 
async def main():
    count = await process_large_file('large_data.txt')
    print(f"Processed {count} lines")
 
asyncio.run(main())

Error Handling in Async Code

Error handling in async code requires special attention to ensure exceptions are properly caught and resources are cleaned up.

import asyncio
 
async def risky_operation(item_id):
    """An operation that might fail"""
    await asyncio.sleep(1)
 
    if item_id % 2 == 0:
        raise ValueError(f"Item {item_id} caused an error")
 
    return f"Item {item_id} processed"
 
async def handle_with_try_except():
    """Handle errors with try/except"""
    try:
        result = await risky_operation(2)
        print(result)
    except ValueError as e:
        print(f"Error caught: {e}")
 
asyncio.run(handle_with_try_except())

Handling errors in concurrent tasks:

async def safe_operation(item_id):
    """Wrapper that catches errors from risky_operation"""
    try:
        result = await risky_operation(item_id)
        return {"success": True, "result": result}
    except Exception as e:
        return {"success": False, "error": str(e), "item_id": item_id}
 
async def main():
    tasks = [safe_operation(i) for i in range(5)]
    results = await asyncio.gather(*tasks)
 
    for result in results:
        if result["success"]:
            print(f"Success: {result['result']}")
        else:
            print(f"Failed: Item {result['item_id']} - {result['error']}")
 
asyncio.run(main())

Using gather() with return_exceptions=True:

async def main():
    tasks = [risky_operation(i) for i in range(5)]
    results = await asyncio.gather(*tasks, return_exceptions=True)
 
    for i, result in enumerate(results):
        if isinstance(result, Exception):
            print(f"Task {i} failed: {result}")
        else:
            print(f"Task {i} succeeded: {result}")
 
asyncio.run(main())

Cleanup with async context managers:

class AsyncResource:
    """A resource that needs cleanup even if errors occur"""
 
    async def __aenter__(self):
        print("Acquiring resource")
        await asyncio.sleep(0.5)
        return self
 
    async def __aexit__(self, exc_type, exc_val, exc_tb):
        print("Cleaning up resource")
        await asyncio.sleep(0.5)
 
        if exc_type is not None:
            print(f"Exception during resource use: {exc_val}")
 
        # Return False to propagate exception, True to suppress
        return False
 
    async def do_work(self):
        await asyncio.sleep(1)
        raise ValueError("Something went wrong")
 
async def main():
    try:
        async with AsyncResource() as resource:
            await resource.do_work()
    except ValueError as e:
        print(f"Caught error: {e}")
 
asyncio.run(main())

Performance Benchmarks: Sync vs Async for I/O-Bound Tasks

Let's compare synchronous and asynchronous approaches for I/O-bound operations with real benchmarks.

import asyncio
import time
import requests
import aiohttp
 
# Synchronous approach
def fetch_sync(url):
    response = requests.get(url)
    return len(response.text)
 
def benchmark_sync(urls):
    start = time.time()
    results = [fetch_sync(url) for url in urls]
    elapsed = time.time() - start
    return elapsed, results
 
# Asynchronous approach
async def fetch_async(session, url):
    async with session.get(url) as response:
        text = await response.text()
        return len(text)
 
async def benchmark_async(urls):
    start = time.time()
    async with aiohttp.ClientSession() as session:
        tasks = [fetch_async(session, url) for url in urls]
        results = await asyncio.gather(*tasks)
    elapsed = time.time() - start
    return elapsed, results
 
# Run benchmarks
urls = [
    "https://httpbin.org/delay/1",
    "https://httpbin.org/delay/1",
    "https://httpbin.org/delay/1",
    "https://httpbin.org/delay/1",
    "https://httpbin.org/delay/1"
]
 
print("Benchmarking synchronous approach...")
sync_time, sync_results = benchmark_sync(urls)
print(f"Synchronous: {sync_time:.2f} seconds")
 
print("\nBenchmarking asynchronous approach...")
async_time, async_results = asyncio.run(benchmark_async(urls))
print(f"Asynchronous: {async_time:.2f} seconds")
 
print(f"\nSpeedup: {sync_time / async_time:.2f}x")

Typical results for 5 URLs with 1-second delay each:

  • Synchronous: ~5 seconds (sequential execution)
  • Asynchronous: ~1 second (concurrent execution)
  • Speedup: ~5x
| Number of URLs | Sync Time | Async Time | Speedup |
|---|---|---|---|
| 5 | 5.2s | 1.1s | 4.7x |
| 10 | 10.4s | 1.2s | 8.7x |
| 20 | 20.8s | 1.4s | 14.9x |
| 50 | 52.1s | 2.1s | 24.8x |
| 100 | 104.5s | 3.8s | 27.5x |

The speedup grows with the number of concurrent operations until bandwidth, connection limits, or server-side rate limiting become the bottleneck.
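
The scaling pattern can be reproduced without network access by simulating each request with asyncio.sleep(). This is a minimal sketch, not the benchmark used for the table: the 0.05-second delay and the function names (fake_request, run_sequential, run_concurrent) are illustrative assumptions, and real HTTP requests add connection and bandwidth overhead that a sleep does not model.

```python
import asyncio
import time

DELAY = 0.05  # Simulated latency per "request" (an assumed value)

async def fake_request(i):
    """Stand-in for an HTTP call: waits, then returns its index."""
    await asyncio.sleep(DELAY)
    return i

async def run_sequential(n):
    """Await each request before starting the next one."""
    start = time.perf_counter()
    results = [await fake_request(i) for i in range(n)]
    return time.perf_counter() - start, results

async def run_concurrent(n):
    """Start all requests, then wait for them together."""
    start = time.perf_counter()
    results = await asyncio.gather(*(fake_request(i) for i in range(n)))
    return time.perf_counter() - start, results

async def main():
    for n in (5, 10, 20):
        seq_time, _ = await run_sequential(n)
        con_time, _ = await run_concurrent(n)
        print(f"{n:>3} requests: sequential {seq_time:.2f}s, "
              f"concurrent {con_time:.2f}s, speedup {seq_time / con_time:.1f}x")

asyncio.run(main())
```

Sequential time grows roughly linearly with the number of requests, while concurrent time stays close to a single delay, which is why the speedup column keeps climbing.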

Common Pitfalls and How to Avoid Them

Forgetting to await

# WRONG: Coroutine is created but not executed
async def wrong():
    result = fetch_data()  # Missing await!
    print(result)  # Prints coroutine object, not the result
 
# CORRECT: Await the coroutine
async def correct():
    result = await fetch_data()
    print(result)  # Prints actual result
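
A runnable version of the same mistake makes the difference visible. In this sketch, fetch_data is a hypothetical coroutine that stands in for a real I/O call.

```python
import asyncio

async def fetch_data():
    """Hypothetical coroutine standing in for a real I/O call."""
    await asyncio.sleep(0.01)
    return "data"

async def compare():
    pending = fetch_data()                        # No await: only creates a coroutine object
    was_coroutine = asyncio.iscoroutine(pending)
    pending.close()                               # Close it to avoid a "never awaited" warning

    value = await fetch_data()                    # Await: runs the coroutine to completion
    return was_coroutine, value

print(asyncio.run(compare()))  # (True, 'data')
```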

Blocking the event loop

# WRONG: Blocks the entire event loop
async def blocking_code():
    time.sleep(5)  # Blocks everything!
    result = compute_something()  # Blocks if CPU-intensive
    return result
 
# CORRECT: Use asyncio.sleep and run_in_executor
async def non_blocking_code():
    await asyncio.sleep(5)  # Only pauses this task
 
    loop = asyncio.get_running_loop()  # Preferred inside a coroutine
    result = await loop.run_in_executor(None, compute_something)
    return result
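
One way to see the effect is to run a small "heartbeat" task next to the work and measure how long the loop goes without servicing it. The sketch below is an illustration under assumed names (heartbeat, max_gap, blocking_work, offloaded_work) and uses time.sleep(0.3) as a stand-in for any blocking call.

```python
import asyncio
import time

async def heartbeat(gaps, interval=0.01):
    """Record the time between wake-ups; a large gap means the loop was stalled."""
    last = time.perf_counter()
    while True:
        await asyncio.sleep(interval)
        now = time.perf_counter()
        gaps.append(now - last)
        last = now

async def max_gap(work):
    """Run `work` alongside a heartbeat and return the longest observed gap."""
    gaps = []
    beat = asyncio.create_task(heartbeat(gaps))
    await asyncio.sleep(0.03)          # Let the heartbeat tick a few times first
    await work()
    await asyncio.sleep(0.03)
    beat.cancel()
    try:
        await beat
    except asyncio.CancelledError:
        pass
    return max(gaps)

async def blocking_work():
    time.sleep(0.3)                    # Blocks the whole event loop

async def offloaded_work():
    loop = asyncio.get_running_loop()
    await loop.run_in_executor(None, time.sleep, 0.3)  # Runs in a worker thread

async def main():
    print(f"Blocking call, longest heartbeat gap:  {await max_gap(blocking_work):.2f}s")
    print(f"Offloaded call, longest heartbeat gap: {await max_gap(offloaded_work):.2f}s")

asyncio.run(main())
```

With the blocking call, the heartbeat misses its schedule for the full 0.3 seconds. With the offloaded call, it keeps ticking at close to its normal interval.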

Not handling task cancellation

# WRONG: Doesn't handle cancellation
async def incomplete_cleanup():
    resource = await acquire_resource()
    await long_operation()
    await release_resource(resource)  # May not execute if cancelled
 
# CORRECT: Use try/finally or async with
async def proper_cleanup():
    resource = await acquire_resource()
    try:
        await long_operation()
    finally:
        await release_resource(resource)  # Always executes
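
The following self-contained sketch shows the finally block running when a task is cancelled. The worker function and the events list are illustrative stand-ins for real resource acquisition and release.

```python
import asyncio

async def worker(events):
    events.append("acquired")
    try:
        await asyncio.sleep(10)        # Long operation that will be cancelled
        events.append("finished")
    finally:
        events.append("released")      # Runs even when the task is cancelled

async def main():
    events = []                        # Records what happened, in order
    task = asyncio.create_task(worker(events))
    await asyncio.sleep(0.01)          # Give the worker time to start
    task.cancel()                      # Request cancellation
    try:
        await task
    except asyncio.CancelledError:
        events.append("cancelled")
    return events

print(asyncio.run(main()))  # ['acquired', 'released', 'cancelled']
```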

Creating event loop conflicts

# WRONG: Calling asyncio.run() in an async function
async def wrong_nesting():
    result = asyncio.run(some_coroutine())  # Error!
    return result
 
# CORRECT: Just await the coroutine
async def correct_nesting():
    result = await some_coroutine()
    return result
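
The error can be reproduced in a few lines. In this sketch, some_coroutine is a hypothetical placeholder, and the failed call is caught only so the script can continue and show the working alternative.

```python
import asyncio

async def some_coroutine():
    await asyncio.sleep(0.01)
    return "done"

async def nested_attempt():
    coro = some_coroutine()
    try:
        return asyncio.run(coro)       # Raises: an event loop is already running
    except RuntimeError as exc:
        coro.close()                   # Avoid a "never awaited" warning
        return f"RuntimeError: {exc}"

async def correct_nesting():
    return await some_coroutine()      # Inside a coroutine, just await

print(asyncio.run(nested_attempt()))
print(asyncio.run(correct_nesting()))
```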

Not limiting concurrency

# WRONG: Launches too many tasks simultaneously
async def unlimited_concurrency(items):
    tasks = [process_item(item) for item in items]  # 10,000 tasks!
    return await asyncio.gather(*tasks)
 
# CORRECT: Use semaphore to limit concurrency
async def limited_concurrency(items):
    semaphore = asyncio.Semaphore(10)  # Max 10 concurrent
 
    async def process_with_limit(item):
        async with semaphore:
            return await process_item(item)
 
    tasks = [process_with_limit(item) for item in items]
    return await asyncio.gather(*tasks)
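
To confirm that the semaphore really caps concurrency, the sketch below counts how many items are in flight at once. The run_limited helper, the counters, and the simulated 0.01-second I/O delay are assumptions for illustration.

```python
import asyncio

async def run_limited(n_items, limit):
    """Process n_items with at most `limit` running at once; return the peak."""
    semaphore = asyncio.Semaphore(limit)
    active = 0
    peak = 0

    async def process_item(item):
        nonlocal active, peak
        async with semaphore:
            active += 1
            peak = max(peak, active)
            await asyncio.sleep(0.01)  # Simulated I/O
            active -= 1
            return item * 2

    results = await asyncio.gather(*(process_item(i) for i in range(n_items)))
    return peak, results

peak, results = asyncio.run(run_limited(n_items=50, limit=10))
print(f"Peak concurrency: {peak}, items processed: {len(results)}")
# Peak concurrency: 10, items processed: 50
```

All 50 coroutines are still created up front, but only 10 are ever inside the semaphore-guarded section at the same time.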

Real-World Examples

Web Scraping with Rate Limiting

import asyncio
import aiohttp
 
class AsyncWebScraper:
    def __init__(self, max_concurrent=5):
        self.semaphore = asyncio.Semaphore(max_concurrent)
        self.session = None
 
    async def __aenter__(self):
        self.session = aiohttp.ClientSession()
        return self
 
    async def __aexit__(self, exc_type, exc_val, exc_tb):
        await self.session.close()
 
    async def fetch_page(self, url):
        async with self.semaphore:
            try:
                async with self.session.get(url) as response:
                    return await response.text()
            except Exception as e:
                print(f"Error fetching {url}: {e}")
                return None
 
    async def scrape_urls(self, urls):
        tasks = [self.fetch_page(url) for url in urls]
        return await asyncio.gather(*tasks)
 
async def main():
    urls = [
        "https://example.com/page1",
        "https://example.com/page2",
        "https://example.com/page3"
    ]
 
    async with AsyncWebScraper(max_concurrent=3) as scraper:
        pages = await scraper.scrape_urls(urls)
 
        for url, html in zip(urls, pages):
            if html:
                print(f"Scraped {url}: {len(html)} bytes")
 
asyncio.run(main())

Async API Data Pipeline

import asyncio
import aiohttp
 
async def fetch_user_ids(session):
    """Fetch list of user IDs from API"""
    async with session.get("https://api.example.com/users") as response:
        data = await response.json()
        return [user["id"] for user in data["users"]]
 
async def fetch_user_details(session, user_id):
    """Fetch detailed info for a user"""
    async with session.get(f"https://api.example.com/users/{user_id}") as response:
        return await response.json()
 
async def fetch_user_posts(session, user_id):
    """Fetch posts for a user"""
    async with session.get(f"https://api.example.com/users/{user_id}/posts") as response:
        return await response.json()
 
async def process_user(session, user_id):
    """Fetch all data for a user concurrently"""
    details, posts = await asyncio.gather(
        fetch_user_details(session, user_id),
        fetch_user_posts(session, user_id)
    )
 
    return {
        "user": details,
        "posts": posts,
        "post_count": len(posts)
    }
 
async def main():
    async with aiohttp.ClientSession() as session:
        # Fetch user IDs
        user_ids = await fetch_user_ids(session)
 
        # Process all users concurrently
        user_data = await asyncio.gather(
            *[process_user(session, uid) for uid in user_ids[:10]]
        )
 
        for data in user_data:
            print(f"User {data['user']['name']}: {data['post_count']} posts")
 
asyncio.run(main())

Async Chat Server

import asyncio
 
class ChatServer:
    def __init__(self):
        self.clients = set()
 
    async def handle_client(self, reader, writer):
        """Handle a single client connection"""
        addr = writer.get_extra_info('peername')
        print(f"Client connected: {addr}")
 
        self.clients.add(writer)
 
        try:
            while True:
                data = await reader.read(100)
                if not data:
                    break
 
                message = data.decode()
                print(f"Received from {addr}: {message}")
 
                # Broadcast to all clients
                await self.broadcast(f"{addr}: {message}", writer)
 
        except asyncio.CancelledError:
            raise  # Let cancellation propagate; cleanup still runs in finally
 
        finally:
            print(f"Client disconnected: {addr}")
            self.clients.discard(writer)  # discard() is safe if already removed
            writer.close()
            await writer.wait_closed()
 
    async def broadcast(self, message, sender):
        """Send message to all clients except sender"""
        # Iterate over a copy: clients may connect or disconnect while we await drain()
        for client in list(self.clients):
            if client != sender:
                try:
                    client.write(message.encode())
                    await client.drain()
                except Exception as e:
                    print(f"Error broadcasting: {e}")
 
    async def start(self, host='127.0.0.1', port=8888):
        """Start the chat server"""
        server = await asyncio.start_server(
            self.handle_client,
            host,
            port
        )
 
        addr = server.sockets[0].getsockname()
        print(f"Chat server running on {addr}")
 
        async with server:
            await server.serve_forever()
 
async def main():
    chat_server = ChatServer()
    await chat_server.start()
 
asyncio.run(main())

Experimenting with Asyncio in Jupyter

When working with asyncio in Jupyter notebooks, you may encounter event loop conflicts. Jupyter already runs an event loop, which can interfere with asyncio.run().
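
A quick way to check whether code is already running inside an event loop is asyncio.get_running_loop(), which raises RuntimeError when no loop is active. The helper name loop_is_running below is an assumption for illustration.

```python
import asyncio

def loop_is_running():
    """Return True when called from inside a running event loop."""
    try:
        asyncio.get_running_loop()
        return True
    except RuntimeError:
        return False

async def check_inside():
    return loop_is_running()

print(f"Called at top level: {loop_is_running()}")
print(f"Called inside a coroutine: {asyncio.run(check_inside())}")
```

In a plain script the top-level check returns False, so asyncio.run() is safe to call. In a notebook cell, where a loop is already running, the top-level check returns True and asyncio.run() fails.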

For seamless async experimentation in Jupyter environments, consider using RunCell, an AI agent designed specifically for Jupyter notebooks. RunCell handles event loop management automatically and provides enhanced async debugging capabilities, allowing you to test asyncio patterns interactively without conflicts.

In standard Jupyter, you can use top-level await:

import asyncio

# In Jupyter, this works directly
async def fetch_data():
    await asyncio.sleep(1)
    return "data"
 
# Just await directly, no asyncio.run() needed
result = await fetch_data()
print(result)

Or use the nest_asyncio package to allow nested event loops:

import asyncio
import nest_asyncio

nest_asyncio.apply()

# Now asyncio.run() works in Jupyter (main() is any coroutine function defined earlier)
asyncio.run(main())

FAQ

What is the difference between asyncio and threading in Python?

Asyncio uses cooperative multitasking on a single thread, where tasks voluntarily yield control using await. Threading uses preemptive multitasking with multiple OS threads, where the OS decides when to switch between threads. Asyncio is more efficient for I/O-bound tasks, with lower memory overhead and fewer race-condition risks, because a task can only be interrupted at an await point (shared state can still be corrupted if it is modified across awaits). Threading can handle blocking libraries that don't support async. Both are limited by Python's GIL for CPU-bound work, but asyncio avoids the overhead of thread context switching.
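
Cooperative switching is easy to observe: two tasks interleave only where they await. The worker and run functions below are an illustrative sketch, and asyncio.sleep(0) is used purely as an explicit yield point.

```python
import asyncio

async def worker(name, log, yield_control):
    for step in range(3):
        log.append(f"{name}{step}")
        if yield_control:
            await asyncio.sleep(0)     # Explicit switch point: lets other tasks run

async def run(yield_control):
    log = []
    await asyncio.gather(
        worker("A", log, yield_control),
        worker("B", log, yield_control),
    )
    return log

print(asyncio.run(run(yield_control=True)))   # ['A0', 'B0', 'A1', 'B1', 'A2', 'B2']
print(asyncio.run(run(yield_control=False)))  # ['A0', 'A1', 'A2', 'B0', 'B1', 'B2']
```

Without an await inside the loop, task A runs to completion before task B gets a turn. With OS threads, the switch could happen at any point, with no await required.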

When should I use asyncio instead of multiprocessing?

Use asyncio for I/O-bound tasks like API calls, database queries, file operations, and network communication. These tasks spend most of their time waiting for external resources. Use multiprocessing for CPU-bound tasks like data processing, mathematical computations, image manipulation, and machine learning model training. Multiprocessing creates separate processes that bypass Python's GIL, enabling true parallel execution on multiple CPU cores. Asyncio excels at handling thousands of concurrent I/O operations with minimal resource overhead, while multiprocessing is limited by CPU core count but provides actual parallel computation.

Can I mix async and sync code in the same application?

Yes, but with careful planning. You can call async functions from sync code using asyncio.run(), though you cannot call it from within an already-running event loop. To call sync blocking functions from async code, use loop.run_in_executor() (or asyncio.to_thread() on Python 3.9+) to run them in a thread pool, preventing them from blocking the event loop. Never use blocking operations like time.sleep(), requests.get(), or synchronous file I/O directly in async functions, as they block the entire event loop. Instead, use async equivalents like asyncio.sleep(), aiohttp, and aiofiles.
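
Both directions can be sketched in a few lines. Here blocking_lookup is a hypothetical synchronous function (for example, a legacy library call), and asyncio.to_thread() requires Python 3.9 or later.

```python
import asyncio
import time

def blocking_lookup(key):
    """Hypothetical synchronous function, e.g. a legacy library call."""
    time.sleep(0.1)
    return f"value-for-{key}"

async def async_lookup_all(keys):
    """Sync from async: run each blocking call in a worker thread."""
    return await asyncio.gather(
        *(asyncio.to_thread(blocking_lookup, key) for key in keys)
    )

def sync_entry_point():
    """Async from sync: asyncio.run() starts a loop and waits for the result."""
    return asyncio.run(async_lookup_all(["a", "b", "c"]))

print(sync_entry_point())  # ['value-for-a', 'value-for-b', 'value-for-c']
```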

How do I debug asyncio code effectively?

Enable asyncio debug mode with asyncio.run(main(), debug=True) or set the environment variable PYTHONASYNCIODEBUG=1. Debug mode flags common mistakes such as coroutines that were never awaited and callbacks that hold the event loop for more than 100 ms. Use logging extensively to trace execution flow, as traditional debuggers can be confusing with async code. Add task names with asyncio.create_task(coro(), name="task-name") for clearer error messages. Use asyncio.gather(..., return_exceptions=True) to prevent one failing task from hiding errors in others. Monitor your event loop with asyncio.all_tasks() to check for tasks that aren't completing.
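
A short sketch of these tools working together, using hypothetical task names and a placeholder slow_job coroutine:

```python
import asyncio

async def slow_job(seconds):
    await asyncio.sleep(seconds)

async def main():
    # Named tasks are easier to identify in logs and tracebacks
    tasks = [
        asyncio.create_task(slow_job(0.05), name="fetch-users"),
        asyncio.create_task(slow_job(0.05), name="fetch-posts"),
    ]
    current = asyncio.current_task()
    # all_tasks() returns the unfinished tasks of the running loop
    pending = sorted(t.get_name() for t in asyncio.all_tasks() if t is not current)
    await asyncio.gather(*tasks)
    return pending

# debug=True enables asyncio debug mode for this run
print(asyncio.run(main(), debug=True))  # ['fetch-posts', 'fetch-users']
```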

What are the performance limits of asyncio?

Asyncio can handle tens of thousands of concurrent I/O operations on a single thread, far exceeding threading limits. The main constraint is that asyncio provides no benefit for CPU-bound operations since it uses a single thread. Performance degrades if you block the event loop with synchronous I/O or heavy computation. Network bandwidth and API rate limits become the bottleneck before asyncio's concurrency limits. Memory usage scales with the number of concurrent tasks, but each task has minimal overhead compared to threads. For maximum performance, combine asyncio for I/O concurrency with multiprocessing for CPU-bound work, and always use semaphores to limit concurrent operations to reasonable levels.

Conclusion

Python asyncio transforms I/O-bound applications from slow, blocking operations into fast, concurrent systems. By mastering async/await syntax, understanding the event loop, and leveraging tools like gather(), create_task(), and semaphores, you can build applications that handle thousands of concurrent operations efficiently.

The key to success with asyncio is recognizing when it's the right tool. Use it for network requests, database queries, file operations, and any task that spends time waiting for external resources. Avoid blocking the event loop with synchronous operations, always use await with coroutines, and limit concurrency with semaphores when necessary.

Start by converting small sections of your codebase to async, measure the performance improvement, and gradually expand. The dramatic speedups in I/O-heavy applications make the learning investment worthwhile.
