Python asyncio:异步编程完全指南
你的 Python 应用发出 100 次 API 调用,每次耗时 2 秒。使用传统的顺序代码,这需要等待 200 秒。用户盯着加载屏幕,服务器空闲等待响应。这种阻塞行为正是拖垮应用性能和用户体验的根源。
当你扩展时,痛苦会加剧。数据库查询堆积。文件操作在队列中相互等待。网络爬虫以蜗牛般的速度爬行。每个 I/O 操作都成为瓶颈,在整个系统中连锁反应,将本应快速、响应迅速的应用变成迟缓、浪费资源的怪物。
Python asyncio 通过支持 I/O 密集型任务的并发执行来解决这个问题。代码不必等待每个操作完成后再开始下一个,asyncio 允许你启动多个操作并在等待期间在它们之间切换。这 100 次 API 调用?使用 asyncio,它们大约在 2 秒内完成,而不是 200 秒。本指南向你展示如何在 Python 中实现异步编程,通过实用示例将缓慢、阻塞的代码转换为快速、并发的应用。
什么是异步编程以及为什么它很重要
异步编程允许程序启动可能需要长时间运行的任务,并在这些任务完成之前继续执行其他工作,而不是等待每个任务完成后再开始下一个。
在传统的同步代码中,当你发出 API 请求时,程序会停止并等待响应。在这个等待期间,CPU 处于空闲状态,不做任何有效工作。这对于单个操作来说可以接受,但对于需要处理多个 I/O 操作的应用来说是灾难性的。
Asyncio 提供了一种使用 async/await 语法编写并发代码的方式。它对于以下 I/O 密集型操作特别有效:
- 向 API 发出 HTTP 请求
- 读写文件
- 数据库查询
- 网络通信
- WebSocket 连接
- 消息队列处理
性能提升是显著的。考虑从 50 个不同的 URL 获取数据:
同步方式:50 个请求 × 每个 2 秒 = 总共 100 秒
异步方式:50 个请求并发运行 ≈ 总共 2 秒
这种 50 倍的性能提升来自于更好的资源利用。asyncio 不是在 I/O 操作上阻塞,而是允许程序在等待 I/O 完成时继续执行其他任务。
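这个差距可以用标准库直接验证。下面是一个纯演示的小例子:用 asyncio.sleep 模拟固定延迟的 API 调用(不发出真实请求),并测量并发执行的总耗时:

```python
import asyncio
import time

async def fake_api_call(i):
    # Simulate a 0.1-second network round trip
    await asyncio.sleep(0.1)
    return i

async def main():
    start = time.perf_counter()
    # 20 calls run concurrently instead of one after another
    results = await asyncio.gather(*(fake_api_call(i) for i in range(20)))
    elapsed = time.perf_counter() - start
    return results, elapsed

results, elapsed = asyncio.run(main())
print(f"20 calls finished in {elapsed:.2f}s")  # ~0.1s, not ~2s
```

顺序执行需要约 2 秒(20 × 0.1),而并发执行的总耗时约等于单次调用的延迟。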
并发 vs 并行 vs 异步
理解这些概念之间的区别对于有效使用 asyncio 至关重要。
并发意味着同时管理多个任务。任务轮流进行,但在任何给定时刻只有一个在执行。想象一个厨师准备多道菜肴,在每道菜等待烹饪时在任务之间切换。
并行意味着在不同的 CPU 核心上同时执行多个任务。这需要实际的并行处理硬件,非常适合 CPU 密集型任务,如数学计算或图像处理。
异步编程是专为 I/O 密集型任务设计的并发特定形式。它使用单线程并在等待 I/O 操作时在任务之间切换。
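协作式并发可以用一个为演示而构造的最小示例直观看到:两个协程在每次 await 处交还控制权,步骤交替执行,但自始至终只有一个线程在工作:

```python
import asyncio

order = []

async def cook(dish, steps):
    for step in range(steps):
        order.append(f"{dish}-{step}")
        # await hands control back to the event loop,
        # letting the other "dish" make progress
        await asyncio.sleep(0)

async def main():
    await asyncio.gather(cook("soup", 2), cook("salad", 2))

asyncio.run(main())
print(order)  # steps interleave: soup, salad, soup, salad
```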
| 特性 | asyncio | 线程 (Threading) | 多进程 (Multiprocessing) |
|---|---|---|---|
| 执行模型 | 单线程,协作式多任务 | 多线程,抢占式多任务 | 多进程 |
| 最适合 | I/O 密集型任务 | 使用阻塞库的 I/O 密集型任务 | CPU 密集型任务 |
| 内存开销 | 最小 | 中等 | 高 |
| 上下文切换成本 | 非常低 | 低到中 | 高 |
| 复杂性 | 中等 (async/await 语法) | 高 (竞态条件、锁) | 高 (IPC、序列化) |
| GIL 限制 | 不受影响 (单线程) | 受 GIL 限制 | 不受限制 (独立进程) |
| I/O 的典型加速比 | 10-100x | 5-10x | N/A |
Python 全局解释器锁 (GIL) 阻止线程中 Python 字节码的真正并行执行,使得线程对于 CPU 密集型任务效果较差。Asyncio 通过使用具有协作式多任务的单线程来规避这一限制,而多进程则通过独立进程完全绕过它。
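"asyncio 是单线程"这一点可以当场验证。下面的演示代码记录每个任务所在的操作系统线程 ID,所有任务都落在同一个线程上,因此不存在线程间的 GIL 争用:

```python
import asyncio
import threading

async def record_thread(ids):
    # Record which OS thread this task runs on
    ids.append(threading.get_ident())
    await asyncio.sleep(0)

async def main():
    ids = []
    await asyncio.gather(*(record_thread(ids) for _ in range(5)))
    return ids

thread_ids = asyncio.run(main())
print(len(set(thread_ids)))  # 1: all five tasks share one thread
```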
async def 和 await 关键字
asyncio 的基础建立在两个关键字上:async 和 await。
async def 关键字定义一个协程函数。当你调用协程函数时,它不会立即执行。相反,它返回一个可以被等待的协程对象。
import asyncio
async def fetch_data():
print("Fetching data...")
await asyncio.sleep(2) # Simulate I/O operation
print("Data fetched!")
return {"status": "success"}
# Calling the function returns a coroutine object
coroutine = fetch_data()
print(type(coroutine)) # <class 'coroutine'>
# To actually run it, you need to await it or use asyncio.run()
# asyncio.run(coroutine) # This would execute the coroutine
await 关键字暂停协程的执行,直到被等待的操作完成。在这个暂停期间,事件循环可以运行其他协程。你只能在 async def 函数内部使用 await。
async def process_user(user_id):
# Await an I/O operation
user_data = await fetch_user_from_database(user_id)
# Await another I/O operation
user_profile = await fetch_user_profile(user_id)
# Regular synchronous code runs normally
processed_data = transform_data(user_data, user_profile)
return processed_data
async/await 的关键规则:
- 你只能 await 协程、任务或 futures
- 你只能在 async def 函数内部使用 await
- 普通函数不能使用 await
- 调用异步函数而不等待它会创建一个协程对象,但不执行代码
常见错误:
async def wrong_example():
# This creates a coroutine but doesn't execute it!
fetch_data() # Missing await
async def correct_example():
# This actually executes the coroutine
result = await fetch_data()
asyncio.run() 入口点
asyncio.run() 函数是启动 asyncio 事件循环并运行主协程的标准方式。它在 Python 3.7 中引入,简化了从同步上下文运行异步代码的过程。
import asyncio
async def main():
print("Starting async operations")
await asyncio.sleep(1)
print("Finished")
# Run the main coroutine
asyncio.run(main())
asyncio.run() 在后台执行的操作:
- 创建新的事件循环
- 运行提供的协程直到完成
- 关闭事件循环
- 返回协程的结果
import asyncio
async def main():
result = await compute_value()
return result
# The return value is accessible
final_result = asyncio.run(main())
print(final_result)
asyncio.run() 的重要特性:
- 不能从正在运行的事件循环内部调用:如果你已经在异步函数中,请改用 await
- 每次都创建一个新的事件循环:除非你想要独立的事件循环实例,否则不要在同一个程序中多次调用 asyncio.run()
- 总是关闭循环:执行后事件循环会被清理
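第一条特性可以当场验证:在协程内部调用 asyncio.run() 会抛出 RuntimeError,正确做法是直接 await(示例为演示而构造):

```python
import asyncio

async def inner():
    return "value"

async def outer():
    coro = inner()
    try:
        asyncio.run(coro)  # WRONG: a loop is already running here
    except RuntimeError:
        return await coro  # CORRECT: just await it instead

result = asyncio.run(outer())
print(result)  # value
```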
对于 Jupyter notebook 或事件循环已在运行的环境,直接使用 await 或 asyncio.create_task()。像 RunCell 这样的工具在 Jupyter 环境中提供增强的异步支持,使你无需事件循环冲突即可更轻松地交互式试验 asyncio 模式。
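如果一段代码需要同时兼容普通脚本和事件循环已在运行的环境,可以按下面的思路判断(run_coroutine 是本文为演示虚构的辅助函数,并非 asyncio 的 API):

```python
import asyncio

def run_coroutine(coro):
    """Run coro from sync code, adapting to whether a loop is running."""
    try:
        asyncio.get_running_loop()
    except RuntimeError:
        # No running loop (plain script): asyncio.run() is safe
        return asyncio.run(coro)
    # A loop is already running (e.g. Jupyter): schedule it as a task
    # and let the caller await it
    return asyncio.ensure_future(coro)

async def compute():
    await asyncio.sleep(0.01)
    return 42

result = run_coroutine(compute())
print(result)  # 42 when called from a plain script
```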
在 Python 3.7 之前,你必须手动管理事件循环:
# Old way (pre-Python 3.7)
loop = asyncio.get_event_loop()
try:
loop.run_until_complete(main())
finally:
loop.close()
# Modern way (Python 3.7+)
asyncio.run(main())
协程、任务和 Futures
理解这三个核心概念对于掌握 asyncio 至关重要。
协程
协程是用 async def 定义的函数。它是一种特殊的函数,可以被暂停和恢复,允许在暂停期间运行其他代码。
import asyncio
async def my_coroutine():
await asyncio.sleep(1)
return "completed"
# This creates a coroutine object
coro = my_coroutine()
# To run it
result = asyncio.run(coro)
任务
任务是协程的包装器,用于在事件循环上调度执行。任务允许协程并发运行。
import asyncio
async def say_after(delay, message):
await asyncio.sleep(delay)
print(message)
return message
async def main():
# Create tasks to run concurrently
task1 = asyncio.create_task(say_after(1, "First"))
task2 = asyncio.create_task(say_after(2, "Second"))
# Wait for both tasks to complete
result1 = await task1
result2 = await task2
print(f"Results: {result1}, {result2}")
asyncio.run(main())
创建任务会立即将协程调度执行。事件循环会尽快开始运行它,甚至在你 await 该任务之前。
async def background_work():
print("Starting background work")
await asyncio.sleep(2)
print("Background work completed")
async def main():
# The coroutine starts running immediately
task = asyncio.create_task(background_work())
print("Task created, doing other work")
await asyncio.sleep(1)
print("Other work done")
# Wait for the background task to finish
await task
asyncio.run(main())
输出:
Task created, doing other work
Starting background work
Other work done
Background work completed
Futures
Future 是一个低级的可等待对象,表示异步操作的最终结果。你很少直接创建 futures;它们通常由 asyncio 内部或库创建。
async def set_future_result(future):
await asyncio.sleep(1)
future.set_result("Future completed")
async def main():
loop = asyncio.get_running_loop()  # preferred over get_event_loop() inside coroutines
future = loop.create_future()
# Schedule a coroutine to set the future's result
asyncio.create_task(set_future_result(future))
# Wait for the future to get a result
result = await future
print(result)
asyncio.run(main())协程、任务和 futures 之间的关系:
- 协程是你编写的函数
- 任务包装协程并调度它们执行
- Futures 表示将在未来可用的结果
- 任务是 Future 的子类
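这些关系可以用代码直接检查:

```python
import asyncio

async def noop():
    return "done"

async def main():
    coro = noop()                        # a coroutine object; nothing runs yet
    task = asyncio.create_task(noop())   # a Task, scheduled immediately
    flags = (
        asyncio.iscoroutine(coro),
        isinstance(task, asyncio.Task),
        isinstance(task, asyncio.Future),  # Task is a subclass of Future
    )
    await coro
    await task
    return flags

flags = asyncio.run(main())
print(flags)  # (True, True, True)
```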
用于并发执行的 asyncio.create_task()
asyncio.create_task() 函数是你使用 asyncio 实现真正并发的主要工具。它调度协程在事件循环上运行,而不会阻塞当前协程。
import asyncio
import time
async def download_file(file_id):
print(f"Starting download {file_id}")
await asyncio.sleep(2) # Simulate download time
print(f"Completed download {file_id}")
return f"file_{file_id}.dat"
async def sequential_downloads():
"""Downloads files one at a time"""
start = time.time()
file1 = await download_file(1)
file2 = await download_file(2)
file3 = await download_file(3)
elapsed = time.time() - start
print(f"Sequential: {elapsed:.2f} seconds")
# Output: ~6 seconds (2 + 2 + 2)
async def concurrent_downloads():
"""Downloads files concurrently"""
start = time.time()
# Create tasks for concurrent execution
task1 = asyncio.create_task(download_file(1))
task2 = asyncio.create_task(download_file(2))
task3 = asyncio.create_task(download_file(3))
# Wait for all tasks to complete
file1 = await task1
file2 = await task2
file3 = await task3
elapsed = time.time() - start
print(f"Concurrent: {elapsed:.2f} seconds")
# Output: ~2 seconds (all run at the same time)
asyncio.run(concurrent_downloads())
调用 create_task() 时任务立即被调度。你不必立即等待它。
async def process_data():
# Start background tasks
task1 = asyncio.create_task(fetch_from_api_1())
task2 = asyncio.create_task(fetch_from_api_2())
# Do some work while tasks run in background
local_data = prepare_local_data()
# Now wait for the background tasks
api_data_1 = await task1
api_data_2 = await task2
# Combine all data
return combine_data(local_data, api_data_1, api_data_2)
你还可以为任务命名以便更好地调试:
async def main():
task = asyncio.create_task(
long_running_operation(),
name="long-operation-task"
)
# Task name is accessible
print(f"Task name: {task.get_name()}")
await task
用于运行多个协程的 asyncio.gather()
asyncio.gather() 函数并发运行多个协程并等待所有协程完成。当你需要运行多个协程时,它比创建单独的任务更简洁。
import asyncio
async def fetch_user(user_id):
await asyncio.sleep(1)
return {"id": user_id, "name": f"User {user_id}"}
async def main():
# Run multiple coroutines concurrently
results = await asyncio.gather(
fetch_user(1),
fetch_user(2),
fetch_user(3),
fetch_user(4),
fetch_user(5)
)
print(results)
# Output: [{'id': 1, 'name': 'User 1'}, {'id': 2, 'name': 'User 2'}, ...]
asyncio.run(main())
gather() 按照输入协程的顺序返回结果,无论哪个先完成。
async def task_with_delay(delay, value):
await asyncio.sleep(delay)
return value
async def main():
results = await asyncio.gather(
task_with_delay(3, "slow"),
task_with_delay(1, "fast"),
task_with_delay(2, "medium")
)
print(results)
# Output: ['slow', 'fast', 'medium'] (order preserved)
asyncio.run(main())
使用 gather() 进行错误处理
默认情况下,如果任何协程引发异常,gather() 会立即引发该异常并取消剩余的任务。
async def failing_task():
await asyncio.sleep(1)
raise ValueError("Task failed!")
async def successful_task():
await asyncio.sleep(2)
return "success"
async def main():
try:
results = await asyncio.gather(
failing_task(),
successful_task()
)
except ValueError as e:
print(f"Error caught: {e}")
# The successful_task is cancelled
asyncio.run(main())
使用 return_exceptions=True 来收集异常以及成功的结果:
async def main():
results = await asyncio.gather(
failing_task(),
successful_task(),
return_exceptions=True
)
for i, result in enumerate(results):
if isinstance(result, Exception):
print(f"Task {i} failed: {result}")
else:
print(f"Task {i} succeeded: {result}")
asyncio.run(main())
使用列表推导式进行动态 gather
gather() 与动态生成的协程配合得非常好:
async def process_item(item):
await asyncio.sleep(1)
return item * 2
async def main():
items = [1, 2, 3, 4, 5]
# Process all items concurrently
results = await asyncio.gather(
*[process_item(item) for item in items]
)
print(results) # [2, 4, 6, 8, 10]
asyncio.run(main())
asyncio.wait() 和 asyncio.as_completed()
虽然 gather() 等待所有协程完成,但 wait() 和 as_completed() 提供了更细粒度的控制,让你可以处理已完成的任务。
asyncio.wait()
wait() 允许你以不同的完成条件等待任务:所有任务、第一个任务或第一个异常。
import asyncio
async def task(delay, name):
await asyncio.sleep(delay)
return f"{name} completed"
async def main():
tasks = [
asyncio.create_task(task(1, "Task 1")),
asyncio.create_task(task(2, "Task 2")),
asyncio.create_task(task(3, "Task 3"))
]
# Wait for all tasks to complete
done, pending = await asyncio.wait(
tasks,
return_when=asyncio.ALL_COMPLETED
)
for task in done:
print(task.result())
asyncio.run(main())
不同的完成条件:
async def main():
tasks = [
asyncio.create_task(task(1, "Fast")),
asyncio.create_task(task(3, "Slow")),
asyncio.create_task(task(2, "Medium"))
]
# Return when the first task completes
done, pending = await asyncio.wait(
tasks,
return_when=asyncio.FIRST_COMPLETED
)
print(f"First completed: {done.pop().result()}")
print(f"Still pending: {len(pending)} tasks")
# Cancel remaining tasks
for task in pending:
task.cancel()
asyncio.run(main())
asyncio.as_completed()
as_completed() 返回一个迭代器,在任务完成时产出任务,允许你在结果可用时立即处理它们。
import asyncio
async def fetch_data(url_id, delay):
await asyncio.sleep(delay)
return f"Data from URL {url_id}"
async def main():
tasks = [
fetch_data(1, 3),
fetch_data(2, 1),
fetch_data(3, 2)
]
# Process results as they complete
for coro in asyncio.as_completed(tasks):
result = await coro
print(f"Received: {result}")
asyncio.run(main())
输出按完成顺序显示结果,而非提交顺序:
Received: Data from URL 2
Received: Data from URL 3
Received: Data from URL 1
当你想尽快向用户显示结果时,这特别有用:
async def search_engine(query, engine_name, delay):
await asyncio.sleep(delay)
return f"{engine_name}: Results for '{query}'"
async def main():
query = "python asyncio"
searches = [
search_engine(query, "Google", 1.5),
search_engine(query, "Bing", 2.0),
search_engine(query, "DuckDuckGo", 1.0)
]
print("Searching...")
for search in asyncio.as_completed(searches):
result = await search
print(result) # Display each result as soon as it arrives
asyncio.run(main())
asyncio.sleep() 与 time.sleep()
这种区别对于异步代码的正确性至关重要。
time.sleep() 是一个阻塞操作,会暂停整个线程,包括事件循环。这会阻止所有异步任务运行。
asyncio.sleep() 是一个非阻塞协程,只暂停当前任务,允许其他任务运行。
import asyncio
import time
async def blocking_example():
"""Bad: Uses time.sleep() - blocks the entire event loop"""
print("Starting blocking sleep")
time.sleep(2) # WRONG: Blocks everything!
print("Finished blocking sleep")
async def non_blocking_example():
"""Good: Uses asyncio.sleep() - allows other tasks to run"""
print("Starting non-blocking sleep")
await asyncio.sleep(2) # CORRECT: Only pauses this task
print("Finished non-blocking sleep")
async def concurrent_task():
for i in range(3):
print(f"Concurrent task running: {i}")
await asyncio.sleep(0.5)
async def demo_blocking():
print("\n=== Blocking example (BAD) ===")
await asyncio.gather(
blocking_example(),
concurrent_task()
)
async def demo_non_blocking():
print("\n=== Non-blocking example (GOOD) ===")
await asyncio.gather(
non_blocking_example(),
concurrent_task()
)
asyncio.run(demo_blocking())
asyncio.run(demo_non_blocking())在阻塞示例中,并发任务直到 time.sleep() 完成后才运行。在非阻塞示例中,两个任务并发运行。
经验法则:绝不要在异步代码中使用 time.sleep()。始终使用 await asyncio.sleep()。
对于无法避免的阻塞调用,使用 loop.run_in_executor() 把它们放到单独的线程中运行;注意受 GIL 限制,真正的 CPU 密集型工作应改用进程池(如 concurrent.futures.ProcessPoolExecutor):
import asyncio
import time
def cpu_intensive_task():
"""Some blocking CPU work"""
time.sleep(2) # Simulate heavy computation
return "CPU task completed"
async def main():
loop = asyncio.get_running_loop()  # preferred over get_event_loop() inside coroutines
# Run blocking task in a thread pool
result = await loop.run_in_executor(None, cpu_intensive_task)
print(result)
asyncio.run(main())
async for 和 async with
Python 提供了 for 循环和上下文管理器的异步版本,用于处理异步可迭代对象和资源。
异步迭代器 (async for)
异步迭代器是实现 __aiter__() 和 __anext__() 方法的对象,允许你迭代需要通过异步操作获取的项目。
import asyncio
class AsyncRange:
"""An async iterator that yields numbers with delays"""
def __init__(self, start, end):
self.current = start
self.end = end
def __aiter__(self):
return self
async def __anext__(self):
if self.current >= self.end:
raise StopAsyncIteration
# Simulate async operation to get next value
await asyncio.sleep(0.5)
value = self.current
self.current += 1
return value
async def main():
async for number in AsyncRange(1, 5):
print(f"Got number: {number}")
asyncio.run(main())
使用异步数据库游标的真实示例:
class AsyncDatabaseCursor:
"""Simulated async database cursor"""
def __init__(self, query):
self.query = query
self.results = []
self.index = 0
async def execute(self):
# Simulate database query
await asyncio.sleep(1)
self.results = [
{"id": 1, "name": "Alice"},
{"id": 2, "name": "Bob"},
{"id": 3, "name": "Charlie"}
]
def __aiter__(self):
return self
async def __anext__(self):
if self.index >= len(self.results):
raise StopAsyncIteration
# Simulate fetching next row
await asyncio.sleep(0.1)
row = self.results[self.index]
self.index += 1
return row
async def fetch_users():
cursor = AsyncDatabaseCursor("SELECT * FROM users")
await cursor.execute()
async for row in cursor:
print(f"User: {row['name']}")
asyncio.run(fetch_users())
异步上下文管理器 (async with)
异步上下文管理器实现 __aenter__() 和 __aexit__() 方法,用于管理需要异步设置和清理的资源。
import asyncio
class AsyncDatabaseConnection:
"""An async context manager for database connections"""
async def __aenter__(self):
print("Opening database connection...")
await asyncio.sleep(1) # Simulate connection time
print("Database connection opened")
return self
async def __aexit__(self, exc_type, exc_val, exc_tb):
print("Closing database connection...")
await asyncio.sleep(0.5) # Simulate cleanup
print("Database connection closed")
async def query(self, sql):
await asyncio.sleep(0.5)
return f"Results for: {sql}"
async def main():
async with AsyncDatabaseConnection() as db:
result = await db.query("SELECT * FROM users")
print(result)
# Connection automatically closed after the with block
asyncio.run(main())
将异步上下文管理器与异步迭代器结合:
class AsyncFileReader:
"""Async context manager and iterator for reading files"""
def __init__(self, filename):
self.filename = filename
self.lines = []
self.index = 0
async def __aenter__(self):
# Simulate async file opening
await asyncio.sleep(0.5)
self.lines = ["Line 1", "Line 2", "Line 3"]
return self
async def __aexit__(self, exc_type, exc_val, exc_tb):
await asyncio.sleep(0.2)
self.lines = []
def __aiter__(self):
return self
async def __anext__(self):
if self.index >= len(self.lines):
raise StopAsyncIteration
await asyncio.sleep(0.1)
line = self.lines[self.index]
self.index += 1
return line
async def read_file():
async with AsyncFileReader("data.txt") as reader:
async for line in reader:
print(line)
asyncio.run(read_file())
用于生产者-消费者模式的 asyncio.Queue
asyncio.Queue 是一个支持异步的队列(注意:它并不是线程安全的,只应在同一事件循环的协程之间使用),非常适合在生产者和消费者协程之间协调工作。
import asyncio
import random
async def producer(queue, producer_id):
"""Produces items and puts them in the queue"""
for i in range(5):
await asyncio.sleep(random.uniform(0.1, 0.5))
item = f"Item-{producer_id}-{i}"
await queue.put(item)
print(f"Producer {producer_id} produced: {item}")
async def consumer(queue, consumer_id):
"""Consumes items from the queue"""
while True:
item = await queue.get()
print(f"Consumer {consumer_id} consuming: {item}")
# Simulate processing time
await asyncio.sleep(random.uniform(0.2, 0.8))
queue.task_done()
async def main():
queue = asyncio.Queue(maxsize=10)
# Create producers and consumers
producers = [asyncio.create_task(producer(queue, i)) for i in range(2)]
consumers = [asyncio.create_task(consumer(queue, i)) for i in range(3)]
# Wait for all producers to finish
await asyncio.gather(*producers)
# Wait for the queue to be fully processed
await queue.join()
# Cancel consumers (they run forever)
for consumer_task in consumers:
consumer_task.cancel()
asyncio.run(main())
真实示例:使用 URL 队列的网络爬虫:
import asyncio
from typing import Set
async def fetch_url(session, url):
"""Simulate fetching a URL"""
await asyncio.sleep(1)
return f"Content from {url}"
async def producer(queue: asyncio.Queue, start_urls: list):
"""Add URLs to the queue"""
for url in start_urls:
await queue.put(url)
async def consumer(queue: asyncio.Queue, visited: Set[str]):
"""Fetch URLs from the queue"""
while True:
url = await queue.get()
if url in visited:
queue.task_done()
continue
visited.add(url)
print(f"Scraping: {url}")
# Simulate fetching
content = await fetch_url(None, url)
# Simulate finding new URLs in the content
# In real scraper, you'd parse HTML and extract links
new_urls = [] # Would extract from content
for new_url in new_urls:
if new_url not in visited:
await queue.put(new_url)
queue.task_done()
async def main():
queue = asyncio.Queue()
visited = set()
start_urls = [
"https://example.com/page1",
"https://example.com/page2",
"https://example.com/page3"
]
# Add initial URLs
await producer(queue, start_urls)
# Create consumers
consumers = [
asyncio.create_task(consumer(queue, visited))
for _ in range(3)
]
# Wait for all URLs to be processed
await queue.join()
# Cancel consumers
for consumer_task in consumers:
consumer_task.cancel()
print(f"Scraped {len(visited)} unique URLs")
asyncio.run(main())
用于速率限制的 Semaphores
asyncio.Semaphore 控制可以同时访问资源的协程数量。这对于限制 API 调用速率或限制并发数据库连接至关重要。
import asyncio
import time
async def call_api(semaphore, api_id):
"""Make an API call with rate limiting"""
async with semaphore:
print(f"API call {api_id} started")
await asyncio.sleep(1) # Simulate API call
print(f"API call {api_id} completed")
return f"Result {api_id}"
async def main():
# Allow only 3 concurrent API calls
semaphore = asyncio.Semaphore(3)
start = time.time()
# Create 10 API calls
tasks = [call_api(semaphore, i) for i in range(10)]
results = await asyncio.gather(*tasks)
elapsed = time.time() - start
print(f"\nCompleted {len(results)} API calls in {elapsed:.2f} seconds")
# With semaphore(3): ~4 seconds (10 calls in batches of 3)
# Without semaphore: ~1 second (all concurrent)
asyncio.run(main())
用于 API 配额合规的速率限制:
import asyncio
from datetime import datetime
class RateLimiter:
"""Rate limiter that allows N requests per time period"""
def __init__(self, max_requests, time_period):
self.max_requests = max_requests
self.time_period = time_period
self.semaphore = asyncio.Semaphore(max_requests)
self.request_times = []
async def __aenter__(self):
await self.semaphore.acquire()
# Wait if we've hit the rate limit
while len(self.request_times) >= self.max_requests:
oldest = self.request_times[0]
elapsed = datetime.now().timestamp() - oldest
if elapsed < self.time_period:
sleep_time = self.time_period - elapsed
await asyncio.sleep(sleep_time)
self.request_times.pop(0)
self.request_times.append(datetime.now().timestamp())
return self
async def __aexit__(self, exc_type, exc_val, exc_tb):
self.semaphore.release()
async def fetch_with_rate_limit(rate_limiter, item_id):
async with rate_limiter:
print(f"Fetching item {item_id} at {datetime.now()}")
await asyncio.sleep(0.5)
return f"Item {item_id}"
async def main():
# Allow 5 requests per 2 seconds
rate_limiter = RateLimiter(max_requests=5, time_period=2)
# Make 15 requests
tasks = [fetch_with_rate_limit(rate_limiter, i) for i in range(15)]
results = await asyncio.gather(*tasks)
print(f"Completed {len(results)} requests")
asyncio.run(main())
用于异步 HTTP 请求的 aiohttp
aiohttp 库提供异步 HTTP 客户端和服务器功能。它是在异步代码中发出 HTTP 请求的标准选择。
import asyncio
import aiohttp
async def fetch_url(session, url):
"""Fetch a single URL"""
async with session.get(url) as response:
return await response.text()
async def main():
async with aiohttp.ClientSession() as session:
html = await fetch_url(session, "https://api.github.com")
print(f"Fetched {len(html)} characters")
asyncio.run(main())
并发获取多个 URL:
import asyncio
import aiohttp
import time
async def fetch_url(session, url):
"""Fetch URL and return status code and content length"""
async with session.get(url) as response:
content = await response.text()
return {
"url": url,
"status": response.status,
"length": len(content)
}
async def fetch_all_urls(urls):
"""Fetch multiple URLs concurrently"""
async with aiohttp.ClientSession() as session:
tasks = [fetch_url(session, url) for url in urls]
results = await asyncio.gather(*tasks, return_exceptions=True)
return results
async def main():
urls = [
"https://api.github.com",
"https://httpbin.org/delay/1",
"https://httpbin.org/delay/2",
"https://httpbin.org/status/200",
"https://httpbin.org/status/404"
]
start = time.time()
results = await fetch_all_urls(urls)
elapsed = time.time() - start
for result in results:
if isinstance(result, Exception):
print(f"Error: {result}")
else:
print(f"{result['url']}: {result['status']} ({result['length']} bytes)")
print(f"\nFetched {len(urls)} URLs in {elapsed:.2f} seconds")
asyncio.run(main())
带有错误处理和重试的实用示例:
import asyncio
import aiohttp
from typing import Optional
async def fetch_with_retry(
session: aiohttp.ClientSession,
url: str,
max_retries: int = 3
) -> Optional[dict]:
"""Fetch URL with retry logic"""
for attempt in range(max_retries):
try:
async with session.get(url, timeout=aiohttp.ClientTimeout(total=10)) as response:
if response.status == 200:
data = await response.json()
return data
elif response.status == 404:
print(f"URL not found: {url}")
return None
else:
print(f"Unexpected status {response.status} for {url}")
except asyncio.TimeoutError:
print(f"Timeout on attempt {attempt + 1} for {url}")
except aiohttp.ClientError as e:
print(f"Client error on attempt {attempt + 1} for {url}: {e}")
if attempt < max_retries - 1:
await asyncio.sleep(2 ** attempt) # Exponential backoff
print(f"Failed to fetch {url} after {max_retries} attempts")
return None
async def main():
urls = [
"https://api.github.com/users/github",
"https://api.github.com/users/nonexistent",
"https://httpbin.org/delay/5"
]
async with aiohttp.ClientSession() as session:
tasks = [fetch_with_retry(session, url) for url in urls]
results = await asyncio.gather(*tasks)
successful = [r for r in results if r is not None]
print(f"\nSuccessfully fetched {len(successful)}/{len(urls)} URLs")
asyncio.run(main())
用于异步文件 I/O 的 aiofiles
aiofiles 库提供异步文件操作,防止在文件读写期间发生阻塞。
import asyncio
import aiofiles
async def read_file(filename):
"""Read file asynchronously"""
async with aiofiles.open(filename, mode='r') as f:
contents = await f.read()
return contents
async def write_file(filename, content):
"""Write file asynchronously"""
async with aiofiles.open(filename, mode='w') as f:
await f.write(content)
async def main():
# Write data
await write_file('test.txt', 'Hello, async file I/O!')
# Read data
content = await read_file('test.txt')
print(f"Read: {content}")
asyncio.run(main())
并发处理多个文件:
import asyncio
import aiofiles
async def process_file(input_file, output_file):
"""Read, process, and write a file"""
async with aiofiles.open(input_file, mode='r') as f:
content = await f.read()
# Process content (convert to uppercase)
processed = content.upper()
async with aiofiles.open(output_file, mode='w') as f:
await f.write(processed)
return f"Processed {input_file} -> {output_file}"
async def main():
files = [
('input1.txt', 'output1.txt'),
('input2.txt', 'output2.txt'),
('input3.txt', 'output3.txt')
]
tasks = [process_file(inp, out) for inp, out in files]
results = await asyncio.gather(*tasks)
for result in results:
print(result)
asyncio.run(main())
逐行读取大文件:
import asyncio
import aiofiles
async def process_large_file(filename):
"""Process a large file line by line without loading it all into memory"""
line_count = 0
async with aiofiles.open(filename, mode='r') as f:
async for line in f:
# Process each line
line_count += 1
if line.strip():
# Do something with the line
pass
return line_count
async def main():
count = await process_large_file('large_data.txt')
print(f"Processed {count} lines")
asyncio.run(main())
异步代码中的错误处理
异步代码中的错误处理需要特别注意,以确保异常被正确捕获并且资源被清理。
import asyncio
async def risky_operation(item_id):
"""An operation that might fail"""
await asyncio.sleep(1)
if item_id % 2 == 0:
raise ValueError(f"Item {item_id} caused an error")
return f"Item {item_id} processed"
async def handle_with_try_except():
"""Handle errors with try/except"""
try:
result = await risky_operation(2)
print(result)
except ValueError as e:
print(f"Error caught: {e}")
asyncio.run(handle_with_try_except())
处理并发任务中的错误:
async def safe_operation(item_id):
"""Wrapper that catches errors from risky_operation"""
try:
result = await risky_operation(item_id)
return {"success": True, "result": result}
except Exception as e:
return {"success": False, "error": str(e), "item_id": item_id}
async def main():
tasks = [safe_operation(i) for i in range(5)]
results = await asyncio.gather(*tasks)
for result in results:
if result["success"]:
print(f"Success: {result['result']}")
else:
print(f"Failed: Item {result['item_id']} - {result['error']}")
asyncio.run(main())
使用带有 return_exceptions=True 的 gather():
async def main():
tasks = [risky_operation(i) for i in range(5)]
results = await asyncio.gather(*tasks, return_exceptions=True)
for i, result in enumerate(results):
if isinstance(result, Exception):
print(f"Task {i} failed: {result}")
else:
print(f"Task {i} succeeded: {result}")
asyncio.run(main())
使用异步上下文管理器进行清理:
class AsyncResource:
"""A resource that needs cleanup even if errors occur"""
async def __aenter__(self):
print("Acquiring resource")
await asyncio.sleep(0.5)
return self
async def __aexit__(self, exc_type, exc_val, exc_tb):
print("Cleaning up resource")
await asyncio.sleep(0.5)
if exc_type is not None:
print(f"Exception during resource use: {exc_val}")
# Return False to propagate exception, True to suppress
return False
async def do_work(self):
await asyncio.sleep(1)
raise ValueError("Something went wrong")
async def main():
try:
async with AsyncResource() as resource:
await resource.do_work()
except ValueError as e:
print(f"Caught error: {e}")
asyncio.run(main())
性能基准测试:同步 vs 异步 I/O 密集型任务
让我们通过真实基准测试比较 I/O 密集型操作的同步和异步方法。
import asyncio
import time
import requests
import aiohttp
# Synchronous approach
def fetch_sync(url):
response = requests.get(url)
return len(response.text)
def benchmark_sync(urls):
start = time.time()
results = [fetch_sync(url) for url in urls]
elapsed = time.time() - start
return elapsed, results
# Asynchronous approach
async def fetch_async(session, url):
async with session.get(url) as response:
text = await response.text()
return len(text)
async def benchmark_async(urls):
start = time.time()
async with aiohttp.ClientSession() as session:
tasks = [fetch_async(session, url) for url in urls]
results = await asyncio.gather(*tasks)
elapsed = time.time() - start
return elapsed, results
# Run benchmarks
urls = [
"https://httpbin.org/delay/1",
"https://httpbin.org/delay/1",
"https://httpbin.org/delay/1",
"https://httpbin.org/delay/1",
"https://httpbin.org/delay/1"
]
print("Benchmarking synchronous approach...")
sync_time, sync_results = benchmark_sync(urls)
print(f"Synchronous: {sync_time:.2f} seconds")
print("\nBenchmarking asynchronous approach...")
async_time, async_results = asyncio.run(benchmark_async(urls))
print(f"Asynchronous: {async_time:.2f} seconds")
print(f"\nSpeedup: {sync_time / async_time:.2f}x")
对于每个有 1 秒延迟的 5 个 URL,典型结果如下:
- 同步:~5 秒(顺序执行)
- 异步:~1 秒(并发执行)
- 加速比:~5 倍
| URL 数量 | 同步时间 | 异步时间 | 加速比 |
|---|---|---|---|
| 5 | 5.2s | 1.1s | 4.7x |
| 10 | 10.4s | 1.2s | 8.7x |
| 20 | 20.8s | 1.4s | 14.9x |
| 50 | 52.1s | 2.1s | 24.8x |
| 100 | 104.5s | 3.8s | 27.5x |
加速比随着并发操作数量的增加而增加,直到达到带宽或速率限制约束。
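加速比再高,真实网络调用也可能无限期挂起,因此生产代码通常还要配合超时保护。下面是一个补充性的最小示意,用 asyncio.wait_for() 为任意可等待对象设置时限:

```python
import asyncio

async def slow_operation():
    await asyncio.sleep(10)  # pretend this is a hung network call
    return "done"

async def main():
    try:
        # Abandon the operation if it exceeds 0.1 seconds
        return await asyncio.wait_for(slow_operation(), timeout=0.1)
    except asyncio.TimeoutError:
        return "timed out"

result = asyncio.run(main())
print(result)  # timed out
```

超时触发时,wait_for() 会取消内部任务再抛出 TimeoutError,因此不会泄漏还在运行的协程。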
常见陷阱以及如何避免它们
忘记 await
# WRONG: Coroutine is created but not executed
async def wrong():
result = fetch_data() # Missing await!
print(result) # Prints coroutine object, not the result
# CORRECT: Await the coroutine
async def correct():
result = await fetch_data()
print(result) # Prints actual result
阻塞事件循环
# WRONG: Blocks the entire event loop
async def blocking_code():
time.sleep(5) # Blocks everything!
result = compute_something() # Blocks if CPU-intensive
return result
# CORRECT: Use asyncio.sleep and run_in_executor
async def non_blocking_code():
await asyncio.sleep(5) # Only pauses this task
loop = asyncio.get_running_loop()  # preferred over get_event_loop() inside coroutines
result = await loop.run_in_executor(None, compute_something)
return result
不处理任务取消
# WRONG: Doesn't handle cancellation
async def incomplete_cleanup():
resource = await acquire_resource()
await long_operation()
await release_resource(resource) # May not execute if cancelled
# CORRECT: Use try/finally or async with
async def proper_cleanup():
resource = await acquire_resource()
try:
await long_operation()
finally:
await release_resource(resource) # Always executes
创建事件循环冲突
# WRONG: Calling asyncio.run() in an async function
async def wrong_nesting():
result = asyncio.run(some_coroutine()) # Error!
return result
# CORRECT: Just await the coroutine
async def correct_nesting():
result = await some_coroutine()
return result
不限制并发
# WRONG: Launches too many tasks simultaneously
async def unlimited_concurrency(items):
tasks = [process_item(item) for item in items] # 10,000 tasks!
return await asyncio.gather(*tasks)
# CORRECT: Use semaphore to limit concurrency
async def limited_concurrency(items):
semaphore = asyncio.Semaphore(10) # Max 10 concurrent
async def process_with_limit(item):
async with semaphore:
return await process_item(item)
tasks = [process_with_limit(item) for item in items]
return await asyncio.gather(*tasks)
真实示例
带速率限制的网络爬虫
import asyncio
import aiohttp
class AsyncWebScraper:
def __init__(self, max_concurrent=5):
self.semaphore = asyncio.Semaphore(max_concurrent)
self.session = None
async def __aenter__(self):
self.session = aiohttp.ClientSession()
return self
async def __aexit__(self, exc_type, exc_val, exc_tb):
await self.session.close()
async def fetch_page(self, url):
async with self.semaphore:
try:
async with self.session.get(url) as response:
return await response.text()
except Exception as e:
print(f"Error fetching {url}: {e}")
return None
async def scrape_urls(self, urls):
tasks = [self.fetch_page(url) for url in urls]
return await asyncio.gather(*tasks)
async def main():
urls = [
"https://example.com/page1",
"https://example.com/page2",
"https://example.com/page3"
]
async with AsyncWebScraper(max_concurrent=3) as scraper:
pages = await scraper.scrape_urls(urls)
for url, html in zip(urls, pages):
if html:
print(f"Scraped {url}: {len(html)} bytes")
asyncio.run(main())
异步 API 数据管道
import asyncio
import aiohttp

async def fetch_user_ids(session):
    """Fetch list of user IDs from API"""
    async with session.get("https://api.example.com/users") as response:
        data = await response.json()
        return [user["id"] for user in data["users"]]

async def fetch_user_details(session, user_id):
    """Fetch detailed info for a user"""
    async with session.get(f"https://api.example.com/users/{user_id}") as response:
        return await response.json()

async def fetch_user_posts(session, user_id):
    """Fetch posts for a user"""
    async with session.get(f"https://api.example.com/users/{user_id}/posts") as response:
        return await response.json()

async def process_user(session, user_id):
    """Fetch all data for a user concurrently"""
    details, posts = await asyncio.gather(
        fetch_user_details(session, user_id),
        fetch_user_posts(session, user_id)
    )
    return {
        "user": details,
        "posts": posts,
        "post_count": len(posts)
    }

async def main():
    async with aiohttp.ClientSession() as session:
        # Fetch user IDs
        user_ids = await fetch_user_ids(session)
        # Process all users concurrently
        user_data = await asyncio.gather(
            *[process_user(session, uid) for uid in user_ids[:10]]
        )
        for data in user_data:
            print(f"User {data['user']['name']}: {data['post_count']} posts")
asyncio.run(main())

Async Chat Server
import asyncio

class ChatServer:
    def __init__(self):
        self.clients = set()

    async def handle_client(self, reader, writer):
        """Handle a single client connection"""
        addr = writer.get_extra_info('peername')
        print(f"Client connected: {addr}")
        self.clients.add(writer)
        try:
            while True:
                data = await reader.read(100)
                if not data:
                    break
                message = data.decode()
                print(f"Received from {addr}: {message}")
                # Broadcast to all clients
                await self.broadcast(f"{addr}: {message}", writer)
        except asyncio.CancelledError:
            pass
        finally:
            print(f"Client disconnected: {addr}")
            self.clients.remove(writer)
            writer.close()
            await writer.wait_closed()

    async def broadcast(self, message, sender):
        """Send message to all clients except sender"""
        for client in self.clients:
            if client != sender:
                try:
                    client.write(message.encode())
                    await client.drain()
                except Exception as e:
                    print(f"Error broadcasting: {e}")

    async def start(self, host='127.0.0.1', port=8888):
        """Start the chat server"""
        server = await asyncio.start_server(
            self.handle_client,
            host,
            port
        )
        addr = server.sockets[0].getsockname()
        print(f"Chat server running on {addr}")
        async with server:
            await server.serve_forever()

async def main():
    chat_server = ChatServer()
    await chat_server.start()
asyncio.run(main())

Experimenting with Asyncio in Jupyter
When using asyncio inside a Jupyter notebook, you may run into event loop conflicts. Jupyter already runs its own event loop, which can interfere with asyncio.run().
For seamless async experimentation in Jupyter environments, consider RunCell, an AI agent designed for Jupyter notebooks. RunCell handles event loop management automatically and provides enhanced async debugging, letting you test asyncio patterns interactively without conflicts.
In standard Jupyter, you can use top-level await:
# In Jupyter, this works directly
async def fetch_data():
    await asyncio.sleep(1)
    return "data"

# Just await directly, no asyncio.run() needed
result = await fetch_data()
print(result)

Or use the nest_asyncio package to allow nested event loops:
import nest_asyncio
nest_asyncio.apply()

# Now asyncio.run() works in Jupyter
asyncio.run(main())

FAQ
What is the difference between asyncio and threading in Python?
Asyncio uses cooperative multitasking on a single thread, where tasks voluntarily yield control with await. Threading uses preemptive multitasking across multiple OS threads, with the operating system deciding when to switch between them. Asyncio is more efficient for I/O-bound tasks, with lower memory overhead and no risk of race conditions, while threading can handle blocking libraries that don't support async. Both are limited by Python's GIL for CPU-bound work, but asyncio avoids the overhead of thread context switching.
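For blocking libraries that don't support async, the two models can be bridged. A minimal sketch using asyncio.to_thread (Python 3.9+), with time.sleep standing in for a hypothetical blocking library call:

```python
import asyncio
import time

def blocking_fetch(n):
    # Stand-in for a blocking library call (e.g. a sync HTTP client)
    time.sleep(0.2)
    return n

async def main():
    start = time.perf_counter()
    # Each blocking call runs in the default thread pool, so the
    # three 0.2 s sleeps overlap instead of adding up to 0.6 s
    results = await asyncio.gather(
        *(asyncio.to_thread(blocking_fetch, n) for n in range(3))
    )
    elapsed = time.perf_counter() - start
    print(results, f"{elapsed:.2f}s")
    return results, elapsed

results, elapsed = asyncio.run(main())
```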
When should I use asyncio instead of multiprocessing?
Use asyncio for I/O-bound tasks such as API calls, database queries, file operations, and network communication. Use multiprocessing for CPU-bound tasks such as data processing, mathematical computation, image processing, and training machine learning models. Multiprocessing creates separate processes that bypass Python's GIL, enabling true parallel execution across multiple CPU cores. Asyncio excels at handling thousands of concurrent I/O operations with minimal resource overhead, while multiprocessing is limited by the number of CPU cores but delivers real parallel computation.
Can I mix async and sync code in the same application?
Yes, but it requires careful planning. You can call async functions from sync code with asyncio.run(), but not from inside an already running event loop. To call sync blocking functions from async code, use loop.run_in_executor() to run them in a thread pool, which keeps them from blocking the event loop. Never use blocking operations such as time.sleep(), requests.get(), or synchronous file I/O directly inside an async function, because they block the entire event loop. Use async equivalents like asyncio.sleep(), aiohttp, and aiofiles instead.
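A minimal sketch of both directions, assuming a hypothetical legacy_blocking_io function standing in for a sync library call:

```python
import asyncio
import time

def legacy_blocking_io():
    # Stand-in for a sync library call (e.g. requests.get or file I/O)
    time.sleep(0.1)
    return "payload"

async def handler():
    loop = asyncio.get_running_loop()
    # Run the blocking call in the default thread pool (executor=None),
    # so the event loop keeps servicing other tasks meanwhile
    data = await loop.run_in_executor(None, legacy_blocking_io)
    return data.upper()

# Sync entry point calling into async code
result = asyncio.run(handler())
print(result)  # PAYLOAD
```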
How do I debug asyncio code effectively?
Enable asyncio's debug mode with asyncio.run(main(), debug=True) or by setting the environment variable PYTHONASYNCIODEBUG=1. This catches common mistakes such as forgotten awaits, callbacks that take too long, and coroutines that were never awaited. Log liberally to trace execution flow, since traditional debuggers can get confused by async code. Give tasks names with asyncio.create_task(coro(), name="task-name") for clearer error messages. Use asyncio.gather(..., return_exceptions=True) so one failing task doesn't hide errors in the others. Monitor your event loop with asyncio.all_tasks() to check for unfinished tasks.
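A small sketch pulling several of these techniques together (debug mode, named tasks, and return_exceptions), with hypothetical ok/boom coroutines:

```python
import asyncio

async def ok():
    await asyncio.sleep(0.01)
    return "ok"

async def boom():
    raise ValueError("boom")

async def main():
    # Named tasks produce clearer tracebacks and repr() output
    t1 = asyncio.create_task(ok(), name="ok-task")
    t2 = asyncio.create_task(boom(), name="boom-task")
    # return_exceptions=True: the failure is returned as a value,
    # so it cannot hide the result of the other task
    return await asyncio.gather(t1, t2, return_exceptions=True)

# debug=True flags un-awaited coroutines and slow callbacks
results = asyncio.run(main(), debug=True)
print(results)
```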
What are asyncio's performance limits?
Asyncio can handle tens of thousands of concurrent I/O operations on a single thread, far beyond what threading allows. The main limitation is that asyncio provides no benefit for CPU-bound operations, since it runs on a single thread. Performance degrades if you block the event loop with synchronous I/O or heavy computation. In practice, network bandwidth and API rate limits usually become the bottleneck before asyncio's concurrency limits do. Memory usage scales with the number of concurrent tasks, but each task carries minimal overhead compared to a thread. For maximum performance, use asyncio for I/O concurrency, multiprocessing for CPU-heavy work, and always cap concurrent operations at a sensible level with semaphores.
Conclusion
Python asyncio transforms I/O-bound applications from slow, blocking operations into fast, concurrent systems. By mastering the async/await syntax, understanding the event loop, and leveraging tools like gather(), create_task(), and semaphores, you can build applications that efficiently handle thousands of concurrent operations.
The key to success with asyncio is recognizing when it is the right tool. Use it for network requests, database queries, file operations, and any task that spends time waiting on external resources. Avoid blocking the event loop with synchronous operations, always await your coroutines, and limit concurrency with semaphores where necessary.
Start by converting a small part of your codebase to async, measure the performance improvement, and expand gradually. The dramatic speedups in I/O-bound applications make the learning investment well worth it.