
Python asyncio: The Complete Guide to Asynchronous Programming


Your Python application makes 100 API calls, each taking 2 seconds. With traditional sequential code, that means 200 seconds of waiting. Users stare at a loading screen. The server sits idle waiting for responses. This blocking behavior is what wrecks application performance and user experience.

The pain compounds as you scale. Database queries pile up. File operations wait on one another in a queue. Web scrapers crawl at a snail's pace. Every I/O operation becomes a bottleneck that ripples through the whole system, turning what should be a fast, responsive application into a sluggish, resource-hungry monster.

Python's asyncio solves this by enabling concurrent execution of I/O-bound tasks. Instead of waiting for each operation to finish before starting the next, asyncio lets you launch multiple operations and switch between them while they wait. Those 100 API calls? With asyncio they finish in roughly 2 seconds, not 200. This guide shows you how to implement asynchronous programming in Python, with practical examples that turn slow, blocking code into fast, concurrent applications.


What Asynchronous Programming Is and Why It Matters

Asynchronous programming lets a program start potentially long-running tasks and continue doing other work while they are in flight, instead of waiting for each task to finish before starting the next.

In traditional synchronous code, when you make an API request the program stops and waits for the response. During that wait the CPU sits idle, doing no useful work. That is acceptable for a single operation but disastrous for applications that must handle many I/O operations.

Asyncio provides a way to write concurrent code using async/await syntax. It is especially effective for I/O-bound operations such as:

  • Making HTTP requests to APIs
  • Reading and writing files
  • Database queries
  • Network communication
  • WebSocket connections
  • Message queue processing

The performance gains are dramatic. Consider fetching data from 50 different URLs:

Synchronous: 50 requests × 2 seconds each = 100 seconds total
Asynchronous: 50 requests running concurrently ≈ 2 seconds total

This 50x speedup comes from better resource utilization. Instead of blocking on I/O, asyncio lets the program keep executing other tasks while it waits for I/O to complete.

Concurrency vs. Parallelism vs. Asynchrony

Understanding the differences between these concepts is essential for using asyncio effectively.

Concurrency means managing multiple tasks at once. Tasks take turns making progress, but only one executes at any given moment. Picture a chef preparing several dishes, switching between them while each dish waits to cook.

Parallelism means executing multiple tasks simultaneously on different CPU cores. It requires actual parallel hardware and suits CPU-bound work such as mathematical computation or image processing.

Asynchronous programming is a specific form of concurrency designed for I/O-bound tasks. It uses a single thread and switches between tasks while they wait on I/O operations.

| Feature | asyncio | Threading | Multiprocessing |
|---|---|---|---|
| Execution model | Single thread, cooperative multitasking | Multiple threads, preemptive multitasking | Multiple processes |
| Best for | I/O-bound tasks | I/O-bound tasks using blocking libraries | CPU-bound tasks |
| Memory overhead | Minimal | Moderate | High |
| Context-switch cost | Very low | Low to moderate | High |
| Complexity | Moderate (async/await syntax) | High (race conditions, locks) | High (IPC, serialization) |
| GIL impact | Unaffected (single thread) | Constrained by the GIL | Unaffected (separate processes) |
| Typical I/O speedup | 10-100x | 5-10x | N/A |

Python's Global Interpreter Lock (GIL) prevents true parallel execution of Python bytecode across threads, making threading less effective for CPU-bound tasks. Asyncio sidesteps this constraint by using a single thread with cooperative multitasking, while multiprocessing bypasses it entirely with separate processes.
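Cooperative multitasking is easy to observe. In this sketch (the names and the fib helper are illustrative, not from the article), a coroutine with no await point holds the single thread until it finishes, so the other task cannot interleave with it:

```python
import asyncio

def fib(n):
    # A small recursive CPU burner to simulate CPU-bound work
    return n if n < 2 else fib(n - 1) + fib(n - 2)

async def cpu_bound():
    # No await inside: the event loop cannot switch away from this task
    fib(25)
    print("cpu_bound finished")

async def ticker():
    print("tick")
    await asyncio.sleep(0)  # Yield control back to the event loop
    print("tock")

async def main():
    await asyncio.gather(cpu_bound(), ticker())

asyncio.run(main())
```

The ticker cannot print anything until `cpu_bound` completes, because the loop only switches tasks at await points.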

The async def and await Keywords

The foundation of asyncio rests on two keywords: async and await.

The async def keyword defines a coroutine function. Calling a coroutine function does not execute it immediately; instead, it returns a coroutine object that can be awaited.

import asyncio
 
async def fetch_data():
    print("Fetching data...")
    await asyncio.sleep(2)  # Simulate I/O operation
    print("Data fetched!")
    return {"status": "success"}
 
# Calling the function returns a coroutine object
coroutine = fetch_data()
print(type(coroutine))  # <class 'coroutine'>
 
# To actually run it, you need to await it or use asyncio.run()
# asyncio.run(coroutine)  # This would execute the coroutine

The await keyword suspends the coroutine's execution until the awaited operation completes. During this pause, the event loop can run other coroutines. You can only use await inside an async def function.

async def process_user(user_id):
    # Await an I/O operation
    user_data = await fetch_user_from_database(user_id)
 
    # Await another I/O operation
    user_profile = await fetch_user_profile(user_id)
 
    # Regular synchronous code runs normally
    processed_data = transform_data(user_data, user_profile)
 
    return processed_data

Key Rules for async/await

  1. You can only await coroutines, tasks, or futures
  2. You can only use await inside an async def function
  3. Regular functions cannot use await
  4. Calling an async function without awaiting it creates a coroutine object but does not execute its code

A common mistake:

async def wrong_example():
    # This creates a coroutine but doesn't execute it!
    fetch_data()  # Missing await
 
async def correct_example():
    # This actually executes the coroutine
    result = await fetch_data()

The asyncio.run() Entry Point

The asyncio.run() function is the standard way to start the asyncio event loop and run a main coroutine. Introduced in Python 3.7, it simplified running async code from a synchronous context.

import asyncio
 
async def main():
    print("Starting async operations")
    await asyncio.sleep(1)
    print("Finished")
 
# Run the main coroutine
asyncio.run(main())

What asyncio.run() does behind the scenes:

  1. Creates a new event loop
  2. Runs the provided coroutine until it completes
  3. Closes the event loop
  4. Returns the coroutine's result
import asyncio
 
async def main():
    result = await compute_value()
    return result
 
# The return value is accessible
final_result = asyncio.run(main())
print(final_result)

Important characteristics of asyncio.run():

  • Cannot be called from inside a running event loop: if you are already in an async function, use await instead
  • Creates a new event loop every time: do not call asyncio.run() multiple times in the same program unless you want independent event loop instances
  • Always closes the loop: the event loop is cleaned up after execution
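The first restriction is easy to demonstrate. In this sketch (inner and outer are hypothetical names), a nested asyncio.run() call raises RuntimeError, while a plain await works:

```python
import asyncio

async def inner():
    return "value"

async def outer():
    coro = inner()
    try:
        asyncio.run(coro)  # Not allowed: a loop is already running
    except RuntimeError as e:
        coro.close()  # Avoid a "never awaited" warning
        print(f"RuntimeError: {e}")
    # Inside async code, just await instead
    return await inner()

print(asyncio.run(outer()))
```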

In Jupyter notebooks or other environments where an event loop is already running, use await or asyncio.create_task() directly. Tools like RunCell offer enhanced async support in Jupyter, making it easier to experiment with asyncio patterns interactively without event loop conflicts.

Before Python 3.7, you had to manage the event loop manually:

# Old way (pre-Python 3.7)
loop = asyncio.get_event_loop()
try:
    loop.run_until_complete(main())
finally:
    loop.close()
 
# Modern way (Python 3.7+)
asyncio.run(main())

Coroutines, Tasks, and Futures

Understanding these three core concepts is essential for mastering asyncio.

Coroutines

A coroutine is a function defined with async def. It is a special kind of function that can be paused and resumed, allowing other code to run while it is suspended.

import asyncio
 
async def my_coroutine():
    await asyncio.sleep(1)
    return "completed"
 
# This creates a coroutine object
coro = my_coroutine()
 
# To run it
result = asyncio.run(coro)

Tasks

A task is a wrapper around a coroutine that schedules it for execution on the event loop. Tasks are what let coroutines run concurrently.

import asyncio
 
async def say_after(delay, message):
    await asyncio.sleep(delay)
    print(message)
    return message
 
async def main():
    # Create tasks to run concurrently
    task1 = asyncio.create_task(say_after(1, "First"))
    task2 = asyncio.create_task(say_after(2, "Second"))
 
    # Wait for both tasks to complete
    result1 = await task1
    result2 = await task2
 
    print(f"Results: {result1}, {result2}")
 
asyncio.run(main())

Creating a task schedules the coroutine immediately. The event loop starts running it as soon as it can, even before you await the task.

async def background_work():
    print("Starting background work")
    await asyncio.sleep(2)
    print("Background work completed")
 
async def main():
    # The coroutine starts running immediately
    task = asyncio.create_task(background_work())
 
    print("Task created, doing other work")
    await asyncio.sleep(1)
    print("Other work done")
 
    # Wait for the background task to finish
    await task
 
asyncio.run(main())

Output:

Task created, doing other work
Starting background work
Other work done
Background work completed

Futures

A future is a low-level awaitable representing the eventual result of an asynchronous operation. You rarely create futures directly; they are usually created internally by asyncio or by libraries.

import asyncio
 
async def set_future_result(future):
    await asyncio.sleep(1)
    future.set_result("Future completed")
 
async def main():
    loop = asyncio.get_running_loop()
    future = loop.create_future()
 
    # Schedule a coroutine to set the future's result
    asyncio.create_task(set_future_result(future))
 
    # Wait for the future to get a result
    result = await future
    print(result)
 
asyncio.run(main())

How coroutines, tasks, and futures relate

  • Coroutines are the functions you write
  • Tasks wrap coroutines and schedule their execution
  • Futures represent results that will become available later
  • Task is a subclass of Future
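That last relationship can be checked directly; a minimal sketch:

```python
import asyncio

async def work():
    await asyncio.sleep(0)
    return 42

async def main():
    task = asyncio.create_task(work())
    # Task subclasses Future, so the Future API is available on tasks
    print(isinstance(task, asyncio.Future))
    print(await task)

asyncio.run(main())
```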

asyncio.create_task() for Concurrent Execution

The asyncio.create_task() function is your main tool for achieving real concurrency with asyncio. It schedules a coroutine to run on the event loop without blocking the current coroutine.

import asyncio
import time
 
async def download_file(file_id):
    print(f"Starting download {file_id}")
    await asyncio.sleep(2)  # Simulate download time
    print(f"Completed download {file_id}")
    return f"file_{file_id}.dat"
 
async def sequential_downloads():
    """Downloads files one at a time"""
    start = time.time()
 
    file1 = await download_file(1)
    file2 = await download_file(2)
    file3 = await download_file(3)
 
    elapsed = time.time() - start
    print(f"Sequential: {elapsed:.2f} seconds")
    # Output: ~6 seconds (2 + 2 + 2)
 
async def concurrent_downloads():
    """Downloads files concurrently"""
    start = time.time()
 
    # Create tasks for concurrent execution
    task1 = asyncio.create_task(download_file(1))
    task2 = asyncio.create_task(download_file(2))
    task3 = asyncio.create_task(download_file(3))
 
    # Wait for all tasks to complete
    file1 = await task1
    file2 = await task2
    file3 = await task3
 
    elapsed = time.time() - start
    print(f"Concurrent: {elapsed:.2f} seconds")
    # Output: ~2 seconds (all run at the same time)
 
asyncio.run(concurrent_downloads())

The task is scheduled the moment create_task() is called; you do not have to await it immediately.

async def process_data():
    # Start background tasks
    task1 = asyncio.create_task(fetch_from_api_1())
    task2 = asyncio.create_task(fetch_from_api_2())
 
    # Do some work while tasks run in background
    local_data = prepare_local_data()
 
    # Now wait for the background tasks
    api_data_1 = await task1
    api_data_2 = await task2
 
    # Combine all data
    return combine_data(local_data, api_data_1, api_data_2)

You can also name tasks to make debugging easier:

async def main():
    task = asyncio.create_task(
        long_running_operation(),
        name="long-operation-task"
    )
 
    # Task name is accessible
    print(f"Task name: {task.get_name()}")
 
    await task

asyncio.gather() for Running Multiple Coroutines

The asyncio.gather() function runs multiple coroutines concurrently and waits for all of them to finish. When you need to run several coroutines, it is more concise than creating individual tasks.

import asyncio
 
async def fetch_user(user_id):
    await asyncio.sleep(1)
    return {"id": user_id, "name": f"User {user_id}"}
 
async def main():
    # Run multiple coroutines concurrently
    results = await asyncio.gather(
        fetch_user(1),
        fetch_user(2),
        fetch_user(3),
        fetch_user(4),
        fetch_user(5)
    )
 
    print(results)
    # Output: [{'id': 1, 'name': 'User 1'}, {'id': 2, 'name': 'User 2'}, ...]
 
asyncio.run(main())

gather() returns results in the order the coroutines were passed in, regardless of which finishes first.

async def task_with_delay(delay, value):
    await asyncio.sleep(delay)
    return value
 
async def main():
    results = await asyncio.gather(
        task_with_delay(3, "slow"),
        task_with_delay(1, "fast"),
        task_with_delay(2, "medium")
    )
 
    print(results)
    # Output: ['slow', 'fast', 'medium'] (order preserved)
 
asyncio.run(main())

Error Handling with gather()

By default, if any coroutine raises an exception, gather() immediately propagates that exception to the caller. The other awaitables are not cancelled; they keep running in the background.

async def failing_task():
    await asyncio.sleep(1)
    raise ValueError("Task failed!")
 
async def successful_task():
    await asyncio.sleep(2)
    return "success"
 
async def main():
    try:
        results = await asyncio.gather(
            failing_task(),
            successful_task()
        )
    except ValueError as e:
        print(f"Error caught: {e}")
        # successful_task is NOT cancelled; it keeps running in the background
 
asyncio.run(main())

Use return_exceptions=True to collect exceptions alongside successful results:

async def main():
    results = await asyncio.gather(
        failing_task(),
        successful_task(),
        return_exceptions=True
    )
 
    for i, result in enumerate(results):
        if isinstance(result, Exception):
            print(f"Task {i} failed: {result}")
        else:
            print(f"Task {i} succeeded: {result}")
 
asyncio.run(main())

Dynamic gather() with List Comprehensions

gather() works well with dynamically generated coroutines:

async def process_item(item):
    await asyncio.sleep(1)
    return item * 2
 
async def main():
    items = [1, 2, 3, 4, 5]
 
    # Process all items concurrently
    results = await asyncio.gather(
        *[process_item(item) for item in items]
    )
 
    print(results)  # [2, 4, 6, 8, 10]
 
asyncio.run(main())

asyncio.wait() and asyncio.as_completed()

Whereas gather() waits for every coroutine to finish, wait() and as_completed() offer finer-grained control, letting you act on tasks as they complete.

asyncio.wait()

wait() lets you wait on tasks with different completion conditions: all tasks, the first task, or the first exception.

import asyncio
 
async def task(delay, name):
    await asyncio.sleep(delay)
    return f"{name} completed"
 
async def main():
    tasks = [
        asyncio.create_task(task(1, "Task 1")),
        asyncio.create_task(task(2, "Task 2")),
        asyncio.create_task(task(3, "Task 3"))
    ]
 
    # Wait for all tasks to complete
    done, pending = await asyncio.wait(
        tasks,
        return_when=asyncio.ALL_COMPLETED
    )
 
    for task in done:
        print(task.result())
 
asyncio.run(main())

Different completion conditions:

async def main():
    tasks = [
        asyncio.create_task(task(1, "Fast")),
        asyncio.create_task(task(3, "Slow")),
        asyncio.create_task(task(2, "Medium"))
    ]
 
    # Return when the first task completes
    done, pending = await asyncio.wait(
        tasks,
        return_when=asyncio.FIRST_COMPLETED
    )
 
    print(f"First completed: {done.pop().result()}")
    print(f"Still pending: {len(pending)} tasks")
 
    # Cancel remaining tasks
    for task in pending:
        task.cancel()
 
asyncio.run(main())

asyncio.as_completed()

as_completed() returns an iterator that yields awaitables as tasks finish, so you can process each result the moment it becomes available.

import asyncio
 
async def fetch_data(url_id, delay):
    await asyncio.sleep(delay)
    return f"Data from URL {url_id}"
 
async def main():
    tasks = [
        fetch_data(1, 3),
        fetch_data(2, 1),
        fetch_data(3, 2)
    ]
 
    # Process results as they complete
    for coro in asyncio.as_completed(tasks):
        result = await coro
        print(f"Received: {result}")
 
asyncio.run(main())

The output shows results in completion order, not submission order:

Received: Data from URL 2
Received: Data from URL 3
Received: Data from URL 1

This is particularly useful when you want to show results to users as soon as possible:

async def search_engine(query, engine_name, delay):
    await asyncio.sleep(delay)
    return f"{engine_name}: Results for '{query}'"
 
async def main():
    query = "python asyncio"
 
    searches = [
        search_engine(query, "Google", 1.5),
        search_engine(query, "Bing", 2.0),
        search_engine(query, "DuckDuckGo", 1.0)
    ]
 
    print("Searching...")
    for search in asyncio.as_completed(searches):
        result = await search
        print(result)  # Display each result as soon as it arrives
 
asyncio.run(main())
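A closely related need is giving up on a slow task. asyncio.wait_for() cancels the awaitable if it exceeds a timeout; a minimal sketch (slow_search is a stand-in for a real lookup):

```python
import asyncio

async def slow_search():
    await asyncio.sleep(5)  # Simulate a search that takes too long
    return "results"

async def main():
    try:
        # Give up after one second; the coroutine is cancelled on timeout
        return await asyncio.wait_for(slow_search(), timeout=1.0)
    except asyncio.TimeoutError:
        return "timed out"

print(asyncio.run(main()))
```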

asyncio.sleep() vs. time.sleep()

This distinction is critical for the correctness of async code.

time.sleep() is a blocking call that suspends the entire thread, event loop included. While it runs, no async task can make progress.

asyncio.sleep() is a non-blocking coroutine that pauses only the current task, letting other tasks run.

import asyncio
import time
 
async def blocking_example():
    """Bad: Uses time.sleep() - blocks the entire event loop"""
    print("Starting blocking sleep")
    time.sleep(2)  # WRONG: Blocks everything!
    print("Finished blocking sleep")
 
async def non_blocking_example():
    """Good: Uses asyncio.sleep() - allows other tasks to run"""
    print("Starting non-blocking sleep")
    await asyncio.sleep(2)  # CORRECT: Only pauses this task
    print("Finished non-blocking sleep")
 
async def concurrent_task():
    for i in range(3):
        print(f"Concurrent task running: {i}")
        await asyncio.sleep(0.5)
 
async def demo_blocking():
    print("\n=== Blocking example (BAD) ===")
    await asyncio.gather(
        blocking_example(),
        concurrent_task()
    )
 
async def demo_non_blocking():
    print("\n=== Non-blocking example (GOOD) ===")
    await asyncio.gather(
        non_blocking_example(),
        concurrent_task()
    )
 
asyncio.run(demo_blocking())
asyncio.run(demo_non_blocking())

In the blocking example, the concurrent task cannot run until time.sleep() finishes. In the non-blocking example, both tasks run concurrently.

Rule of thumb: never use time.sleep() in async code. Always use await asyncio.sleep().

For blocking or CPU-bound operations you cannot avoid, use loop.run_in_executor() to run them in a separate thread or process:

import asyncio
import time
 
def cpu_intensive_task():
    """Some blocking CPU work"""
    time.sleep(2)  # Simulate heavy computation
    return "CPU task completed"
 
async def main():
    loop = asyncio.get_running_loop()
 
    # Run blocking task in a thread pool
    result = await loop.run_in_executor(None, cpu_intensive_task)
    print(result)
 
asyncio.run(main())
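If you can assume Python 3.9 or newer, asyncio.to_thread() wraps the same executor machinery in a simpler API; a sketch:

```python
import asyncio
import time

def blocking_io():
    time.sleep(0.5)  # Stands in for any blocking call
    return "blocking result"

async def main():
    # Runs blocking_io in a worker thread; the event loop stays free
    result = await asyncio.to_thread(blocking_io)
    print(result)

asyncio.run(main())
```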

async for and async with

Python provides asynchronous counterparts to for loops and context managers for working with async iterables and resources.

Async Iterators (async for)

An async iterator is an object implementing the __aiter__() and __anext__() methods, which lets you iterate over items that must be fetched through asynchronous operations.

import asyncio
 
class AsyncRange:
    """An async iterator that yields numbers with delays"""
    def __init__(self, start, end):
        self.current = start
        self.end = end
 
    def __aiter__(self):
        return self
 
    async def __anext__(self):
        if self.current >= self.end:
            raise StopAsyncIteration
 
        # Simulate async operation to get next value
        await asyncio.sleep(0.5)
        value = self.current
        self.current += 1
        return value
 
async def main():
    async for number in AsyncRange(1, 5):
        print(f"Got number: {number}")
 
asyncio.run(main())
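For most cases you don't need the full protocol: an async generator (an async def containing yield) implements __aiter__() and __anext__() for you. A sketch equivalent to AsyncRange:

```python
import asyncio

async def async_range(start, end):
    # An async generator replaces the __aiter__/__anext__ boilerplate
    for i in range(start, end):
        await asyncio.sleep(0.1)  # Simulate async work per item
        yield i

async def main():
    async for number in async_range(1, 5):
        print(f"Got number: {number}")

asyncio.run(main())
```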

A realistic example using an async database cursor:

class AsyncDatabaseCursor:
    """Simulated async database cursor"""
    def __init__(self, query):
        self.query = query
        self.results = []
        self.index = 0
 
    async def execute(self):
        # Simulate database query
        await asyncio.sleep(1)
        self.results = [
            {"id": 1, "name": "Alice"},
            {"id": 2, "name": "Bob"},
            {"id": 3, "name": "Charlie"}
        ]
 
    def __aiter__(self):
        return self
 
    async def __anext__(self):
        if self.index >= len(self.results):
            raise StopAsyncIteration
 
        # Simulate fetching next row
        await asyncio.sleep(0.1)
        row = self.results[self.index]
        self.index += 1
        return row
 
async def fetch_users():
    cursor = AsyncDatabaseCursor("SELECT * FROM users")
    await cursor.execute()
 
    async for row in cursor:
        print(f"User: {row['name']}")
 
asyncio.run(fetch_users())

Async Context Managers (async with)

Async context managers implement the __aenter__() and __aexit__() methods and manage resources that need asynchronous setup and cleanup.

import asyncio
 
class AsyncDatabaseConnection:
    """An async context manager for database connections"""
 
    async def __aenter__(self):
        print("Opening database connection...")
        await asyncio.sleep(1)  # Simulate connection time
        print("Database connection opened")
        return self
 
    async def __aexit__(self, exc_type, exc_val, exc_tb):
        print("Closing database connection...")
        await asyncio.sleep(0.5)  # Simulate cleanup
        print("Database connection closed")
 
    async def query(self, sql):
        await asyncio.sleep(0.5)
        return f"Results for: {sql}"
 
async def main():
    async with AsyncDatabaseConnection() as db:
        result = await db.query("SELECT * FROM users")
        print(result)
    # Connection automatically closed after the with block
 
asyncio.run(main())
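For simple cases, the standard library's contextlib.asynccontextmanager decorator removes the class boilerplate; a sketch (database_connection is a hypothetical example, and the yielded string stands in for a real handle):

```python
import asyncio
from contextlib import asynccontextmanager

@asynccontextmanager
async def database_connection():
    print("Opening connection")
    await asyncio.sleep(0.1)  # Simulate connection setup
    try:
        yield "connection-handle"
    finally:
        print("Closing connection")  # Runs even if the body raises

async def main():
    async with database_connection() as conn:
        print(f"Using {conn}")

asyncio.run(main())
```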

Combining an async context manager with an async iterator:

class AsyncFileReader:
    """Async context manager and iterator for reading files"""
 
    def __init__(self, filename):
        self.filename = filename
        self.lines = []
        self.index = 0
 
    async def __aenter__(self):
        # Simulate async file opening
        await asyncio.sleep(0.5)
        self.lines = ["Line 1", "Line 2", "Line 3"]
        return self
 
    async def __aexit__(self, exc_type, exc_val, exc_tb):
        await asyncio.sleep(0.2)
        self.lines = []
 
    def __aiter__(self):
        return self
 
    async def __anext__(self):
        if self.index >= len(self.lines):
            raise StopAsyncIteration
 
        await asyncio.sleep(0.1)
        line = self.lines[self.index]
        self.index += 1
        return line
 
async def read_file():
    async with AsyncFileReader("data.txt") as reader:
        async for line in reader:
            print(line)
 
asyncio.run(read_file())

asyncio.Queue for Producer-Consumer Patterns

asyncio.Queue is an async-aware queue that is ideal for coordinating work between producer and consumer coroutines. Note that, unlike queue.Queue, it is not thread-safe: it is designed for use within a single event loop.

import asyncio
import random
 
async def producer(queue, producer_id):
    """Produces items and puts them in the queue"""
    for i in range(5):
        await asyncio.sleep(random.uniform(0.1, 0.5))
        item = f"Item-{producer_id}-{i}"
        await queue.put(item)
        print(f"Producer {producer_id} produced: {item}")
 
async def consumer(queue, consumer_id):
    """Consumes items from the queue"""
    while True:
        item = await queue.get()
        print(f"Consumer {consumer_id} consuming: {item}")
 
        # Simulate processing time
        await asyncio.sleep(random.uniform(0.2, 0.8))
 
        queue.task_done()
 
async def main():
    queue = asyncio.Queue(maxsize=10)
 
    # Create producers and consumers
    producers = [asyncio.create_task(producer(queue, i)) for i in range(2)]
    consumers = [asyncio.create_task(consumer(queue, i)) for i in range(3)]
 
    # Wait for all producers to finish
    await asyncio.gather(*producers)
 
    # Wait for the queue to be fully processed
    await queue.join()
 
    # Cancel consumers (they run forever)
    for consumer_task in consumers:
        consumer_task.cancel()
 
asyncio.run(main())
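Cancelling consumers works, but a gentler shutdown is to enqueue one sentinel per consumer so each exits its loop on its own. A sketch (STOP and the doubling step are illustrative):

```python
import asyncio

STOP = object()  # Sentinel telling a consumer to exit

async def consumer(queue, results):
    while True:
        item = await queue.get()
        if item is STOP:
            queue.task_done()
            break
        results.append(item * 2)  # Stand-in for real processing
        queue.task_done()

async def main():
    queue = asyncio.Queue()
    results = []
    consumers = [asyncio.create_task(consumer(queue, results)) for _ in range(2)]
    for i in range(4):
        await queue.put(i)
    for _ in consumers:
        await queue.put(STOP)  # One sentinel per consumer
    await asyncio.gather(*consumers)  # Consumers exit after draining the queue
    print(sorted(results))

asyncio.run(main())
```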

A realistic example: a web scraper driven by a URL queue:

import asyncio
from typing import Set
 
async def fetch_url(session, url):
    """Simulate fetching a URL"""
    await asyncio.sleep(1)
    return f"Content from {url}"
 
async def producer(queue: asyncio.Queue, start_urls: list):
    """Add URLs to the queue"""
    for url in start_urls:
        await queue.put(url)
 
async def consumer(queue: asyncio.Queue, visited: Set[str]):
    """Fetch URLs from the queue"""
    while True:
        url = await queue.get()
 
        if url in visited:
            queue.task_done()
            continue
 
        visited.add(url)
        print(f"Scraping: {url}")
 
        # Simulate fetching
        content = await fetch_url(None, url)
 
        # Simulate finding new URLs in the content
        # In real scraper, you'd parse HTML and extract links
        new_urls = []  # Would extract from content
 
        for new_url in new_urls:
            if new_url not in visited:
                await queue.put(new_url)
 
        queue.task_done()
 
async def main():
    queue = asyncio.Queue()
    visited = set()
 
    start_urls = [
        "https://example.com/page1",
        "https://example.com/page2",
        "https://example.com/page3"
    ]
 
    # Add initial URLs
    await producer(queue, start_urls)
 
    # Create consumers
    consumers = [
        asyncio.create_task(consumer(queue, visited))
        for _ in range(3)
    ]
 
    # Wait for all URLs to be processed
    await queue.join()
 
    # Cancel consumers
    for consumer_task in consumers:
        consumer_task.cancel()
 
    print(f"Scraped {len(visited)} unique URLs")
 
asyncio.run(main())

Semaphores for Rate Limiting

asyncio.Semaphore limits how many coroutines can access a resource at once. This is essential for rate-limiting API calls or capping concurrent database connections.

import asyncio
import time
 
async def call_api(semaphore, api_id):
    """Make an API call with rate limiting"""
    async with semaphore:
        print(f"API call {api_id} started")
        await asyncio.sleep(1)  # Simulate API call
        print(f"API call {api_id} completed")
        return f"Result {api_id}"
 
async def main():
    # Allow only 3 concurrent API calls
    semaphore = asyncio.Semaphore(3)
 
    start = time.time()
 
    # Create 10 API calls
    tasks = [call_api(semaphore, i) for i in range(10)]
    results = await asyncio.gather(*tasks)
 
    elapsed = time.time() - start
    print(f"\nCompleted {len(results)} API calls in {elapsed:.2f} seconds")
    # With semaphore(3): ~4 seconds (10 calls in batches of 3)
    # Without semaphore: ~1 second (all concurrent)
 
asyncio.run(main())

Rate limiting to stay within API quotas:

import asyncio
from datetime import datetime
 
class RateLimiter:
    """Rate limiter that allows N requests per time period"""
 
    def __init__(self, max_requests, time_period):
        self.max_requests = max_requests
        self.time_period = time_period
        self.semaphore = asyncio.Semaphore(max_requests)
        self.request_times = []
 
    async def __aenter__(self):
        await self.semaphore.acquire()
 
        # Wait if we've hit the rate limit
        while len(self.request_times) >= self.max_requests:
            oldest = self.request_times[0]
            elapsed = datetime.now().timestamp() - oldest
 
            if elapsed < self.time_period:
                sleep_time = self.time_period - elapsed
                await asyncio.sleep(sleep_time)
 
            self.request_times.pop(0)
 
        self.request_times.append(datetime.now().timestamp())
        return self
 
    async def __aexit__(self, exc_type, exc_val, exc_tb):
        self.semaphore.release()
 
async def fetch_with_rate_limit(rate_limiter, item_id):
    async with rate_limiter:
        print(f"Fetching item {item_id} at {datetime.now()}")
        await asyncio.sleep(0.5)
        return f"Item {item_id}"
 
async def main():
    # Allow 5 requests per 2 seconds
    rate_limiter = RateLimiter(max_requests=5, time_period=2)
 
    # Make 15 requests
    tasks = [fetch_with_rate_limit(rate_limiter, i) for i in range(15)]
    results = await asyncio.gather(*tasks)
 
    print(f"Completed {len(results)} requests")
 
asyncio.run(main())

aiohttp for Asynchronous HTTP Requests

The aiohttp library provides an asynchronous HTTP client and server. It is the standard choice for making HTTP requests from async code.

import asyncio
import aiohttp
 
async def fetch_url(session, url):
    """Fetch a single URL"""
    async with session.get(url) as response:
        return await response.text()
 
async def main():
    async with aiohttp.ClientSession() as session:
        html = await fetch_url(session, "https://api.github.com")
        print(f"Fetched {len(html)} characters")
 
asyncio.run(main())

Fetching multiple URLs concurrently:

import asyncio
import aiohttp
import time
 
async def fetch_url(session, url):
    """Fetch URL and return status code and content length"""
    async with session.get(url) as response:
        content = await response.text()
        return {
            "url": url,
            "status": response.status,
            "length": len(content)
        }
 
async def fetch_all_urls(urls):
    """Fetch multiple URLs concurrently"""
    async with aiohttp.ClientSession() as session:
        tasks = [fetch_url(session, url) for url in urls]
        results = await asyncio.gather(*tasks, return_exceptions=True)
        return results
 
async def main():
    urls = [
        "https://api.github.com",
        "https://httpbin.org/delay/1",
        "https://httpbin.org/delay/2",
        "https://httpbin.org/status/200",
        "https://httpbin.org/status/404"
    ]
 
    start = time.time()
    results = await fetch_all_urls(urls)
    elapsed = time.time() - start
 
    for result in results:
        if isinstance(result, Exception):
            print(f"Error: {result}")
        else:
            print(f"{result['url']}: {result['status']} ({result['length']} bytes)")
 
    print(f"\nFetched {len(urls)} URLs in {elapsed:.2f} seconds")
 
asyncio.run(main())

A practical example with error handling and retries:

import asyncio
import aiohttp
from typing import Optional
 
async def fetch_with_retry(
    session: aiohttp.ClientSession,
    url: str,
    max_retries: int = 3
) -> Optional[dict]:
    """Fetch URL with retry logic"""
 
    for attempt in range(max_retries):
        try:
            async with session.get(url, timeout=aiohttp.ClientTimeout(total=10)) as response:
                if response.status == 200:
                    data = await response.json()
                    return data
                elif response.status == 404:
                    print(f"URL not found: {url}")
                    return None
                else:
                    print(f"Unexpected status {response.status} for {url}")
 
        except asyncio.TimeoutError:
            print(f"Timeout on attempt {attempt + 1} for {url}")
 
        except aiohttp.ClientError as e:
            print(f"Client error on attempt {attempt + 1} for {url}: {e}")
 
        if attempt < max_retries - 1:
            await asyncio.sleep(2 ** attempt)  # Exponential backoff
 
    print(f"Failed to fetch {url} after {max_retries} attempts")
    return None
 
async def main():
    urls = [
        "https://api.github.com/users/github",
        "https://api.github.com/users/nonexistent",
        "https://httpbin.org/delay/5"
    ]
 
    async with aiohttp.ClientSession() as session:
        tasks = [fetch_with_retry(session, url) for url in urls]
        results = await asyncio.gather(*tasks)
 
        successful = [r for r in results if r is not None]
        print(f"\nSuccessfully fetched {len(successful)}/{len(urls)} URLs")
 
asyncio.run(main())

aiofiles for Asynchronous File I/O

The aiofiles library provides asynchronous file operations, preventing blocking during reads and writes.

import asyncio
import aiofiles
 
async def read_file(filename):
    """Read file asynchronously"""
    async with aiofiles.open(filename, mode='r') as f:
        contents = await f.read()
        return contents
 
async def write_file(filename, content):
    """Write file asynchronously"""
    async with aiofiles.open(filename, mode='w') as f:
        await f.write(content)
 
async def main():
    # Write data
    await write_file('test.txt', 'Hello, async file I/O!')
 
    # Read data
    content = await read_file('test.txt')
    print(f"Read: {content}")
 
asyncio.run(main())

Processing multiple files concurrently:

import asyncio
import aiofiles
 
async def process_file(input_file, output_file):
    """Read, process, and write a file"""
    async with aiofiles.open(input_file, mode='r') as f:
        content = await f.read()
 
    # Process content (convert to uppercase)
    processed = content.upper()
 
    async with aiofiles.open(output_file, mode='w') as f:
        await f.write(processed)
 
    return f"Processed {input_file} -> {output_file}"
 
async def main():
    files = [
        ('input1.txt', 'output1.txt'),
        ('input2.txt', 'output2.txt'),
        ('input3.txt', 'output3.txt')
    ]
 
    tasks = [process_file(inp, out) for inp, out in files]
    results = await asyncio.gather(*tasks)
 
    for result in results:
        print(result)
 
asyncio.run(main())

Reading a large file line by line:

import asyncio
import aiofiles
 
async def process_large_file(filename):
    """Process a large file line by line without loading it all into memory"""
    line_count = 0
 
    async with aiofiles.open(filename, mode='r') as f:
        async for line in f:
            # Process each line
            line_count += 1
            if line.strip():
                # Do something with the line
                pass
 
    return line_count
 
async def main():
    count = await process_large_file('large_data.txt')
    print(f"Processed {count} lines")
 
asyncio.run(main())

Error Handling in Async Code

Error handling in async code needs special care to ensure exceptions are caught correctly and resources are cleaned up.

import asyncio
 
async def risky_operation(item_id):
    """An operation that might fail"""
    await asyncio.sleep(1)
 
    if item_id % 2 == 0:
        raise ValueError(f"Item {item_id} caused an error")
 
    return f"Item {item_id} processed"
 
async def handle_with_try_except():
    """Handle errors with try/except"""
    try:
        result = await risky_operation(2)
        print(result)
    except ValueError as e:
        print(f"Error caught: {e}")
 
asyncio.run(handle_with_try_except())

Handling errors in concurrent tasks:

async def safe_operation(item_id):
    """Wrapper that catches errors from risky_operation"""
    try:
        result = await risky_operation(item_id)
        return {"success": True, "result": result}
    except Exception as e:
        return {"success": False, "error": str(e), "item_id": item_id}
 
async def main():
    tasks = [safe_operation(i) for i in range(5)]
    results = await asyncio.gather(*tasks)
 
    for result in results:
        if result["success"]:
            print(f"Success: {result['result']}")
        else:
            print(f"Failed: Item {result['item_id']} - {result['error']}")
 
asyncio.run(main())

Using gather() with return_exceptions=True:

async def main():
    tasks = [risky_operation(i) for i in range(5)]
    results = await asyncio.gather(*tasks, return_exceptions=True)
 
    for i, result in enumerate(results):
        if isinstance(result, Exception):
            print(f"Task {i} failed: {result}")
        else:
            print(f"Task {i} succeeded: {result}")
 
asyncio.run(main())

Using an async context manager for cleanup:

class AsyncResource:
    """A resource that needs cleanup even if errors occur"""
 
    async def __aenter__(self):
        print("Acquiring resource")
        await asyncio.sleep(0.5)
        return self
 
    async def __aexit__(self, exc_type, exc_val, exc_tb):
        print("Cleaning up resource")
        await asyncio.sleep(0.5)
 
        if exc_type is not None:
            print(f"Exception during resource use: {exc_val}")
 
        # Return False to propagate exception, True to suppress
        return False
 
    async def do_work(self):
        await asyncio.sleep(1)
        raise ValueError("Something went wrong")
 
async def main():
    try:
        async with AsyncResource() as resource:
            await resource.do_work()
    except ValueError as e:
        print(f"Caught error: {e}")
 
asyncio.run(main())

Performance Benchmark: Sync vs. Async for I/O-Bound Tasks

Let's compare the synchronous and asynchronous approaches to I/O-bound operations with a realistic benchmark.

import asyncio
import time
import requests
import aiohttp
 
# Synchronous approach
def fetch_sync(url):
    response = requests.get(url)
    return len(response.text)
 
def benchmark_sync(urls):
    start = time.time()
    results = [fetch_sync(url) for url in urls]
    elapsed = time.time() - start
    return elapsed, results
 
# Asynchronous approach
async def fetch_async(session, url):
    async with session.get(url) as response:
        text = await response.text()
        return len(text)
 
async def benchmark_async(urls):
    start = time.time()
    async with aiohttp.ClientSession() as session:
        tasks = [fetch_async(session, url) for url in urls]
        results = await asyncio.gather(*tasks)
    elapsed = time.time() - start
    return elapsed, results
 
# Run benchmarks
urls = [
    "https://httpbin.org/delay/1",
    "https://httpbin.org/delay/1",
    "https://httpbin.org/delay/1",
    "https://httpbin.org/delay/1",
    "https://httpbin.org/delay/1"
]
 
print("Benchmarking synchronous approach...")
sync_time, sync_results = benchmark_sync(urls)
print(f"Synchronous: {sync_time:.2f} seconds")
 
print("\nBenchmarking asynchronous approach...")
async_time, async_results = asyncio.run(benchmark_async(urls))
print(f"Asynchronous: {async_time:.2f} seconds")
 
print(f"\nSpeedup: {sync_time / async_time:.2f}x")

For 5 URLs with a 1-second delay each, typical results are:

  • Synchronous: ~5 seconds (sequential execution)
  • Asynchronous: ~1 second (concurrent execution)
  • Speedup: ~5x

| URL count | Sync time | Async time | Speedup |
| --- | --- | --- | --- |
| 5 | 5.2s | 1.1s | 4.7x |
| 10 | 10.4s | 1.2s | 8.7x |
| 20 | 20.8s | 1.4s | 14.9x |
| 50 | 52.1s | 2.1s | 24.8x |
| 100 | 104.5s | 3.8s | 27.5x |

The speedup grows with the number of concurrent operations, until network bandwidth or rate limits become the constraint.

Common Pitfalls and How to Avoid Them

Forgetting await

# WRONG: Coroutine is created but not executed
async def wrong():
    result = fetch_data()  # Missing await!
    print(result)  # Prints coroutine object, not the result
 
# CORRECT: Await the coroutine
async def correct():
    result = await fetch_data()
    print(result)  # Prints actual result

Blocking the Event Loop

# WRONG: Blocks the entire event loop
async def blocking_code():
    time.sleep(5)  # Blocks everything!
    result = compute_something()  # Blocks if CPU-intensive
    return result
 
# CORRECT: Use asyncio.sleep and run_in_executor
async def non_blocking_code():
    await asyncio.sleep(5)  # Only pauses this task
 
    loop = asyncio.get_running_loop()
    result = await loop.run_in_executor(None, compute_something)
    return result

Not Handling Task Cancellation

# WRONG: Doesn't handle cancellation
async def incomplete_cleanup():
    resource = await acquire_resource()
    await long_operation()
    await release_resource(resource)  # May not execute if cancelled
 
# CORRECT: Use try/finally or async with
async def proper_cleanup():
    resource = await acquire_resource()
    try:
        await long_operation()
    finally:
        await release_resource(resource)  # Always executes

Creating Event Loop Conflicts

# WRONG: Calling asyncio.run() in an async function
async def wrong_nesting():
    result = asyncio.run(some_coroutine())  # Error!
    return result
 
# CORRECT: Just await the coroutine
async def correct_nesting():
    result = await some_coroutine()
    return result

Not Limiting Concurrency

# WRONG: Launches too many tasks simultaneously
async def unlimited_concurrency(items):
    tasks = [process_item(item) for item in items]  # 10,000 tasks!
    return await asyncio.gather(*tasks)
 
# CORRECT: Use semaphore to limit concurrency
async def limited_concurrency(items):
    semaphore = asyncio.Semaphore(10)  # Max 10 concurrent
 
    async def process_with_limit(item):
        async with semaphore:
            return await process_item(item)
 
    tasks = [process_with_limit(item) for item in items]
    return await asyncio.gather(*tasks)

Real-World Examples

Web Scraper with Concurrency Limiting

import asyncio
import aiohttp
 
class AsyncWebScraper:
    def __init__(self, max_concurrent=5):
        self.semaphore = asyncio.Semaphore(max_concurrent)
        self.session = None
 
    async def __aenter__(self):
        self.session = aiohttp.ClientSession()
        return self
 
    async def __aexit__(self, exc_type, exc_val, exc_tb):
        await self.session.close()
 
    async def fetch_page(self, url):
        async with self.semaphore:
            try:
                async with self.session.get(url) as response:
                    return await response.text()
            except Exception as e:
                print(f"Error fetching {url}: {e}")
                return None
 
    async def scrape_urls(self, urls):
        tasks = [self.fetch_page(url) for url in urls]
        return await asyncio.gather(*tasks)
 
async def main():
    urls = [
        "https://example.com/page1",
        "https://example.com/page2",
        "https://example.com/page3"
    ]
 
    async with AsyncWebScraper(max_concurrent=3) as scraper:
        pages = await scraper.scrape_urls(urls)
 
        for url, html in zip(urls, pages):
            if html:
                print(f"Scraped {url}: {len(html)} bytes")
 
asyncio.run(main())

Async API Data Pipeline

import asyncio
import aiohttp
 
async def fetch_user_ids(session):
    """Fetch list of user IDs from API"""
    async with session.get("https://api.example.com/users") as response:
        data = await response.json()
        return [user["id"] for user in data["users"]]
 
async def fetch_user_details(session, user_id):
    """Fetch detailed info for a user"""
    async with session.get(f"https://api.example.com/users/{user_id}") as response:
        return await response.json()
 
async def fetch_user_posts(session, user_id):
    """Fetch posts for a user"""
    async with session.get(f"https://api.example.com/users/{user_id}/posts") as response:
        return await response.json()
 
async def process_user(session, user_id):
    """Fetch all data for a user concurrently"""
    details, posts = await asyncio.gather(
        fetch_user_details(session, user_id),
        fetch_user_posts(session, user_id)
    )
 
    return {
        "user": details,
        "posts": posts,
        "post_count": len(posts)
    }
 
async def main():
    async with aiohttp.ClientSession() as session:
        # Fetch user IDs
        user_ids = await fetch_user_ids(session)
 
        # Process all users concurrently
        user_data = await asyncio.gather(
            *[process_user(session, uid) for uid in user_ids[:10]]
        )
 
        for data in user_data:
            print(f"User {data['user']['name']}: {data['post_count']} posts")
 
asyncio.run(main())

Async Chat Server

import asyncio
 
class ChatServer:
    def __init__(self):
        self.clients = set()
 
    async def handle_client(self, reader, writer):
        """Handle a single client connection"""
        addr = writer.get_extra_info('peername')
        print(f"Client connected: {addr}")
 
        self.clients.add(writer)
 
        try:
            while True:
                data = await reader.read(100)
                if not data:
                    break
 
                message = data.decode()
                print(f"Received from {addr}: {message}")
 
                # Broadcast to all clients
                await self.broadcast(f"{addr}: {message}", writer)
 
        except asyncio.CancelledError:
            pass
 
        finally:
            print(f"Client disconnected: {addr}")
            self.clients.remove(writer)
            writer.close()
            await writer.wait_closed()
 
    async def broadcast(self, message, sender):
        """Send message to all clients except sender"""
        for client in self.clients:
            if client != sender:
                try:
                    client.write(message.encode())
                    await client.drain()
                except Exception as e:
                    print(f"Error broadcasting: {e}")
 
    async def start(self, host='127.0.0.1', port=8888):
        """Start the chat server"""
        server = await asyncio.start_server(
            self.handle_client,
            host,
            port
        )
 
        addr = server.sockets[0].getsockname()
        print(f"Chat server running on {addr}")
 
        async with server:
            await server.serve_forever()
 
async def main():
    chat_server = ChatServer()
    await chat_server.start()
 
asyncio.run(main())

Experimenting with Asyncio in Jupyter

When using asyncio in a Jupyter notebook, you may run into event loop conflicts. Jupyter already runs its own event loop, which can interfere with asyncio.run().

For seamless async experimentation in Jupyter environments, consider RunCell, an AI agent designed for Jupyter notebooks. RunCell handles event loop management automatically and provides enhanced async debugging, letting you test asyncio patterns interactively without conflicts.

In standard Jupyter, you can use top-level await:

# In Jupyter, this works directly
async def fetch_data():
    await asyncio.sleep(1)
    return "data"
 
# Just await directly, no asyncio.run() needed
result = await fetch_data()
print(result)

Or use the nest_asyncio package to allow nested event loops:

import nest_asyncio
nest_asyncio.apply()
 
# Now asyncio.run() works in Jupyter
asyncio.run(main())

FAQ

What's the difference between asyncio and threading in Python?

Asyncio uses cooperative multitasking on a single thread: tasks voluntarily yield control with await. Threading uses preemptive multitasking across multiple OS threads, with the operating system deciding when to switch. For I/O-bound tasks, asyncio is more efficient, with lower memory overhead and far fewer race-condition risks, since switches only happen at explicit await points; threading, on the other hand, can handle blocking libraries that don't support async. Both are constrained by Python's GIL for CPU-bound work, but asyncio avoids the overhead of thread context switches.
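The two models also combine well. As a minimal sketch, `asyncio.to_thread` (Python 3.9+) hands a blocking call to a worker thread so the single-threaded event loop stays responsive; `blocking_download` below is a hypothetical stand-in for a synchronous library call:

```python
import asyncio
import time

def blocking_download(name):
    # Stand-in for a synchronous library call (e.g. requests)
    time.sleep(0.2)
    return f"{name}: done"

async def main():
    start = time.perf_counter()
    # Each blocking call runs in its own worker thread;
    # the single-threaded event loop just awaits the results.
    results = await asyncio.gather(
        asyncio.to_thread(blocking_download, "a"),
        asyncio.to_thread(blocking_download, "b"),
        asyncio.to_thread(blocking_download, "c"),
    )
    elapsed = time.perf_counter() - start
    return results, elapsed

results, elapsed = asyncio.run(main())
print(results)  # ['a: done', 'b: done', 'c: done']
```

Because the three 0.2-second calls overlap in separate threads, the total elapsed time stays close to 0.2 seconds rather than 0.6.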

When should I use asyncio instead of multiprocessing?

Use asyncio for I/O-bound tasks such as API calls, database queries, file operations, and network communication. Use multiprocessing for CPU-bound tasks such as data processing, mathematical computation, image processing, and training machine learning models. Multiprocessing creates separate processes that bypass Python's GIL, enabling true parallel execution across CPU cores. Asyncio excels at handling thousands of concurrent I/O operations with minimal resource overhead, while multiprocessing is limited by the number of CPU cores but delivers real parallel computation.
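The two can also be bridged. A minimal sketch, assuming a deliberately naive CPU-bound `fib` helper: `run_in_executor` with a `ProcessPoolExecutor` lets async code delegate CPU-heavy work to separate processes that bypass the GIL:

```python
import asyncio
from concurrent.futures import ProcessPoolExecutor

def fib(n):
    """Deliberately naive, CPU-bound recursive Fibonacci."""
    return n if n < 2 else fib(n - 1) + fib(n - 2)

async def main():
    loop = asyncio.get_running_loop()
    with ProcessPoolExecutor() as pool:
        # Each call runs in its own process, so the work is truly
        # parallel across CPU cores and unaffected by the GIL
        tasks = [loop.run_in_executor(pool, fib, 25) for _ in range(4)]
        return await asyncio.gather(*tasks)

if __name__ == "__main__":
    print(asyncio.run(main()))  # [75025, 75025, 75025, 75025]
```

The `if __name__ == "__main__"` guard matters here: on platforms that spawn worker processes by re-importing the module, omitting it causes an infinite process-creation loop.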

Can I mix async and sync code in the same application?

Yes, but it requires careful planning. You can call async functions from synchronous code with asyncio.run(), but never from inside an already-running event loop. To call blocking synchronous functions from async code, use loop.run_in_executor() to run them in a thread pool so they don't stall the event loop. Never use blocking operations such as time.sleep(), requests.get(), or synchronous file I/O directly inside an async function, because they block the entire event loop. Use async equivalents such as asyncio.sleep(), aiohttp, and aiofiles instead.
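A minimal sketch of both directions of the boundary, with `time.sleep` standing in for any blocking call:

```python
import asyncio
import time

def blocking_io():
    # Stand-in for a blocking call such as requests.get or sync file I/O
    time.sleep(0.2)
    return "blocking result"

async def async_side():
    loop = asyncio.get_running_loop()
    # Async -> sync: push the blocking call into the default thread
    # pool so the event loop keeps serving other tasks meanwhile
    return await loop.run_in_executor(None, blocking_io)

def sync_side():
    # Sync -> async: asyncio.run() is fine here because no event
    # loop is running yet at this point
    return asyncio.run(async_side())

print(sync_side())  # blocking result
```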

How do I debug asyncio code effectively?

Enable asyncio debug mode with asyncio.run(main(), debug=True) or by setting the PYTHONASYNCIODEBUG=1 environment variable. This catches common mistakes such as forgotten awaits, callbacks that take too long, and coroutines that were never awaited. Log liberally to trace execution flow, since traditional debuggers can get confused by async code. Name your tasks with asyncio.create_task(coro(), name="task-name") for clearer error messages. Use asyncio.gather(..., return_exceptions=True) to keep one failing task from hiding errors in the others. Monitor your event loop with asyncio.all_tasks() to check for unfinished tasks.
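A small sketch combining three of these techniques: debug mode, named tasks, and return_exceptions=True:

```python
import asyncio

async def ok():
    await asyncio.sleep(0.1)
    return "ok"

async def broken():
    await asyncio.sleep(0.1)
    raise RuntimeError("boom")

async def main():
    # Named tasks produce clearer reprs and error messages
    t1 = asyncio.create_task(ok(), name="ok-task")
    t2 = asyncio.create_task(broken(), name="broken-task")

    # return_exceptions=True keeps the failure from hiding t1's result
    results = await asyncio.gather(t1, t2, return_exceptions=True)
    for task, result in zip((t1, t2), results):
        print(f"{task.get_name()}: {result!r}")
    return results

# debug=True flags un-awaited coroutines and slow callbacks
results = asyncio.run(main(), debug=True)
```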

What are asyncio's performance limits?

Asyncio can handle tens of thousands of concurrent I/O operations on a single thread, far beyond what threading allows. The main limitation is that asyncio offers no benefit for CPU-bound operations, since it runs on a single thread. Performance degrades if you block the event loop with synchronous I/O or heavy computation. Network bandwidth and API rate limits usually become bottlenecks before asyncio's concurrency limits do. Memory usage scales with the number of concurrent tasks, though each task carries minimal overhead compared to a thread. For maximum performance, use asyncio for I/O concurrency, multiprocessing for CPU-bound work, and always cap concurrent operations at a reasonable level with semaphores.

Conclusion

Python asyncio transforms I/O-bound applications from slow, blocking operations into fast, concurrent systems. By mastering the async/await syntax, understanding the event loop, and using tools like gather(), create_task(), and semaphores, you can build applications that handle thousands of concurrent operations efficiently.

The key to success with asyncio is recognizing when it's the right tool. Use it for network requests, database queries, file operations, and any task that spends time waiting on external resources. Avoid blocking the event loop with synchronous operations, always await your coroutines, and cap concurrency with semaphores where needed.

Start by converting a small part of your codebase to async, measure the performance improvement, then expand gradually. The dramatic speedups in I/O-bound applications make the learning investment worthwhile.
