Python asyncio: 非同期プログラミング完全ガイド
Updated on
あなたのPythonアプリケーションが100個のAPIコールを行い、それぞれ2秒かかるとします。従来の逐次処理コードでは、200秒もの待ち時間が発生します。ユーザーはローディング画面を見つめ、サーバーはレスポンスを待ちながらリソースを浪費し、アイドル状態になります。このブロッキング動作は、アプリケーションのパフォーマンスとユーザー体験を破壊する問題です。
スケールすると痛みは増します。データベースクエリが積み重なり、ファイル操作が互いの後ろにキューイングされ、Webスクレイパーはカタツムリのように動きます。すべてのI/O操作がボトルネックとなり、高速で応答性の高いはずのアプリケーションを、もたついたリソースを浪費する怪物に変えてしまいます。
Python asyncioは、I/Oバウンドなタスクの並行実行を可能にすることでこれを解決します。各操作が完了するのを待ってから次を開始するのではなく、asyncioを使用すると、複数の操作を開始し、待機中にそれらの間で切り替えることができます。あの100個のAPIコールはどうでしょうか?asyncioを使用すれば、200秒ではなくおおよそ2秒で完了します。このガイドでは、Pythonでの非同期プログラミングを実装する方法を、実用的な例とともに正確に示し、遅いブロッキングコードを高速で並行したアプリケーションに変換します。
非同期プログラミングとは何か、そしてなぜ重要なのか
非同期プログラミングは、プログラムが長時間実行される可能性のあるタスクを開始し、それらのタスクが完了する前に他の作業に進むことを可能にします。各タスクを開始する前に完了を待つのではなく、並行して実行できるのです。
従来の同期コードでは、APIリクエストを行うと、プログラムはレスポンスを待つために停止します。この待機期間中、CPUはアイドル状態になり、生産的な作業は行われません。これは単一の操作では許容可能ですが、複数のI/O操作を処理する必要があるアプリケーションには壊滅的です。
Asyncioはasync/await構文を使用して並行コードを書く方法を提供します。これは以下のようなI/Oバウンドな操作に特に有効です:
- APIへのHTTPリクエスト
- ファイルの読み書き
- データベースクエリ
- ネットワーク通信
- WebSocket接続
- メッセージキューの処理
パフォーマンスの向上は劇的です。50個の異なるURLからデータを取得することを考えてみましょう:
同期アプローチ: 50リクエスト × 各2秒 = 合計100秒 非同期アプローチ: 50リクエストを並行実行 ≈ 合計2秒
この50倍のパフォーマンス向上は、より良いリソース活用から来ています。I/O操作でブロッキングするのではなく、asyncioはI/Oが完了するのを待っている間にプログラムが他のタスクを継続実行できるようにします。
並行性 vs 並列性 vs 非同期
これらの概念の違いを理解することは、asyncioを効果的に使用するために不可欠です。
**並行性(Concurrency)**は、一度に複数のタスクを管理することを意味します。タスクは順番に進行しますが、任意の時点で実行されるのは1つだけです。複数の料理を準備するシェフを想像してください。各料理が調理を待っている間にタスクを切り替えます。
**並列性(Parallelism)**は、異なるCPUコア上で複数のタスクを同時に実行することを意味します。これには実際の並列処理ハードウェアが必要であり、数学的計算や画像処理などのCPUバウンドなタスクに最適です。
非同期プログラミングは、I/Oバウンドなタスクのために設計された並行性の特定の形態です。単一のスレッドを使用し、I/O操作を待っている間にタスクを切り替えます。
| 機能 | asyncio | スレッド | マルチプロセス |
|---|---|---|---|
| 実行モデル | 単一スレッド、協調的マルチタスク | 複数スレッド、プリエンプティブマルチタスク | 複数プロセス |
| 最適な用途 | I/Oバウンドタスク | ブロッキングライブラリを使用するI/Oバウンドタスク | CPUバウンドタスク |
| メモリオーバーヘッド | 最小限 | 中程度 | 高い |
| コンテキストスイッチングコスト | 非常に低い | 低〜中程度 | 高い |
| 複雑さ | 中程度(async/await構文) | 高い(競合状態、ロック) | 高い(IPC、シリアライゼーション) |
| GILの制限 | 影響なし(単一スレッド) | GILにより制限 | 制限なし(別プロセス) |
| I/Oに対する典型的な高速化 | 10-100倍 | 5-10倍 | 該当なし |
Pythonのグローバルインタプリタロック(GIL)は、スレッド内でのPythonバイトコードの真の並列実行を防ぎ、スレッドを使用したCPUバウンドなタスクの効果を低下させます。Asyncioは協調的マルチタスクを使用した単一スレッドを使用することでこの制限を回避し、マルチプロセスは別プロセスを使用することで完全にこれをバイパスします。
async def と await キーワード
Asyncioの基盤は2つのキーワードで構築されています:asyncとawaitです。
async defキーワードはコルーチン関数を定義します。コルーチン関数を呼び出しても、すぐには実行されません。代わりに、待機可能なコルーチンオブジェクトを返します。
import asyncio
async def fetch_data():
print("Fetching data...")
await asyncio.sleep(2) # Simulate I/O operation
print("Data fetched!")
return {"status": "success"}
# Calling the function returns a coroutine object
coroutine = fetch_data()
print(type(coroutine)) # <class 'coroutine'>
# To actually run it, you need to await it or use asyncio.run()
# asyncio.run(coroutine) # This would execute the coroutineawaitキーワードは、待機されている操作が完了するまでコルーチンの実行を一時停止します。この一時停止中、イベントループは他のコルーチンを実行できます。awaitはasync def関数内でのみ使用できます。
async def process_user(user_id):
# Await an I/O operation
user_data = await fetch_user_from_database(user_id)
# Await another I/O operation
user_profile = await fetch_user_profile(user_id)
# Regular synchronous code runs normally
processed_data = transform_data(user_data, user_profile)
return processed_dataasync/awaitの重要なルール:
- コルーチン、タスク、またはFutureのみを
awaitできます awaitはasync def関数内でのみ使用できます- 通常の関数は
awaitを使用できません awaitなしで非同期関数を呼び出すと、コルーチンオブジェクトは作成されますが、コードは実行されません
一般的なミス:
async def wrong_example():
# This creates a coroutine but doesn't execute it!
fetch_data() # Missing await
async def correct_example():
# This actually executes the coroutine
result = await fetch_data()asyncio.run() エントリーポイント
asyncio.run()関数は、asyncioイベントループを開始し、メインのコルーチンを実行する標準的な方法です。Python 3.7で導入され、同期コンテキストから非同期コードを実行するのを簡素化します。
import asyncio
async def main():
print("Starting async operations")
await asyncio.sleep(1)
print("Finished")
# Run the main coroutine
asyncio.run(main())asyncio.run()が裏で行うこと:
- 新しいイベントループを作成
- 提供されたコルーチンを完了まで実行
- イベントループを閉じる
- コルーチンの結果を返す
import asyncio
async def main():
result = await compute_value()
return result
# The return value is accessible
final_result = asyncio.run(main())
print(final_result)asyncio.run()の重要な特性:
- 実行中のイベントループ内から呼び出せない: 既に非同期関数内にいる場合は、代わりに
awaitを使用してください - 毎回新しいイベントループを作成: 同じプログラムで複数回
asyncio.run()を呼び出さないでください。別々のイベントループインスタンスが必要な場合を除きます - 常にループを閉じる: 実行後、イベントループはクリーンアップされます
Jupyterノートブックやイベントループが既に実行されている環境では、awaitを直接使用するかasyncio.create_task()を使用してください。RunCell (opens in a new tab)のようなツールは、Jupyter環境での非同期サポートを強化し、イベントループの競合なしに非同期パターンを対話的に実験しやすくします。
Python 3.7より前は、イベントループを手動で管理する必要がありました:
# Old way (pre-Python 3.7)
loop = asyncio.get_event_loop()
try:
loop.run_until_complete(main())
finally:
loop.close()
# Modern way (Python 3.7+)
asyncio.run(main())コルーチン、タスク、およびFuture
これら3つのコア概念を理解することは、asyncioをマスターするために不可欠です。
コルーチン
コルーチンはasync defで定義される関数です。一時停止と再開が可能な特殊な関数で、一時停止中に他のコードを実行できるようにします。
import asyncio
async def my_coroutine():
await asyncio.sleep(1)
return "completed"
# This creates a coroutine object
coro = my_coroutine()
# To run it
result = asyncio.run(coro)タスク
タスクは、イベントループ上で実行されるようにスケジュールされたコルーチンのラッパーです。タスクはコルーチンを並行して実行できるようにします。
import asyncio
async def say_after(delay, message):
await asyncio.sleep(delay)
print(message)
return message
async def main():
# Create tasks to run concurrently
task1 = asyncio.create_task(say_after(1, "First"))
task2 = asyncio.create_task(say_after(2, "Second"))
# Wait for both tasks to complete
result1 = await task1
result2 = await task2
print(f"Results: {result1}, {result2}")
asyncio.run(main())タスクを作成すると、コルーチンはすぐに実行を開始するようにスケジュールされます。タスクをawaitする前でさえ、イベントループはできるだけ早くそれを実行し始めます。
async def background_work():
print("Starting background work")
await asyncio.sleep(2)
print("Background work completed")
async def main():
# The coroutine starts running immediately
task = asyncio.create_task(background_work())
print("Task created, doing other work")
await asyncio.sleep(1)
print("Other work done")
# Wait for the background task to finish
await task
asyncio.run(main())出力:
Task created, doing other work
Starting background work
Other work done
Background work completedFuture
Futureは、非同期操作の最終結果を表す低レベルの待機可能オブジェクトです。Futureを直接作成することはめったにありません。通常、asyncioの内部やライブラリによって作成されます。
async def set_future_result(future):
await asyncio.sleep(1)
future.set_result("Future completed")
async def main():
loop = asyncio.get_event_loop()
future = loop.create_future()
# Schedule a coroutine to set the future's result
asyncio.create_task(set_future_result(future))
# Wait for the future to get a result
result = await future
print(result)
asyncio.run(main())コルーチン、タスク、Futureの関係:
- コルーチンはあなたが書く関数です
- タスクはコルーチンをラップし、実行をスケジュールします
- Futureは将来利用可能になる結果を表します
- タスクはFutureのサブクラスです
並行実行のための asyncio.create_task()
asyncio.create_task()関数は、asyncioで真の並行性を実現するための主要なツールです。現在のコルーチンをブロックすることなく、コルーチンをイベントループで実行するようにスケジュールします。
import asyncio
import time
async def download_file(file_id):
print(f"Starting download {file_id}")
await asyncio.sleep(2) # Simulate download time
print(f"Completed download {file_id}")
return f"file_{file_id}.dat"
async def sequential_downloads():
"""Downloads files one at a time"""
start = time.time()
file1 = await download_file(1)
file2 = await download_file(2)
file3 = await download_file(3)
elapsed = time.time() - start
print(f"Sequential: {elapsed:.2f} seconds")
# Output: ~6 seconds (2 + 2 + 2)
async def concurrent_downloads():
"""Downloads files concurrently"""
start = time.time()
# Create tasks for concurrent execution
task1 = asyncio.create_task(download_file(1))
task2 = asyncio.create_task(download_file(2))
task3 = asyncio.create_task(download_file(3))
# Wait for all tasks to complete
file1 = await task1
file2 = await task2
file3 = await task3
elapsed = time.time() - start
print(f"Concurrent: {elapsed:.2f} seconds")
# Output: ~2 seconds (all run at the same time)
asyncio.run(concurrent_downloads())タスクはcreate_task()が呼び出されたときにすぐにスケジュールされます。すぐにawaitする必要はありません。
async def process_data():
# Start background tasks
task1 = asyncio.create_task(fetch_from_api_1())
task2 = asyncio.create_task(fetch_from_api_2())
# Do some work while tasks run in background
local_data = prepare_local_data()
# Now wait for the background tasks
api_data_1 = await task1
api_data_2 = await task2
# Combine all data
return combine_data(local_data, api_data_1, api_data_2)タスクに名前を付けてデバッグを容易にすることもできます:
async def main():
task = asyncio.create_task(
long_running_operation(),
name="long-operation-task"
)
# Task name is accessible
print(f"Task name: {task.get_name()}")
await task複数のコルーチンを実行するための asyncio.gather()
asyncio.gather()関数は、複数のコルーチンを並行して実行し、すべてが完了するのを待ちます。多くのコルーチンを実行する必要がある場合、個別のタスクを作成するよりもクリーンです。
import asyncio
async def fetch_user(user_id):
await asyncio.sleep(1)
return {"id": user_id, "name": f"User {user_id}"}
async def main():
# Run multiple coroutines concurrently
results = await asyncio.gather(
fetch_user(1),
fetch_user(2),
fetch_user(3),
fetch_user(4),
fetch_user(5)
)
print(results)
# Output: [{'id': 1, 'name': 'User 1'}, {'id': 2, 'name': 'User 2'}, ...]
asyncio.run(main())gather()は、どれが先に完了したかに関係なく、入力コルーチンと同じ順序で結果を返します。
async def task_with_delay(delay, value):
await asyncio.sleep(delay)
return value
async def main():
results = await asyncio.gather(
task_with_delay(3, "slow"),
task_with_delay(1, "fast"),
task_with_delay(2, "medium")
)
print(results)
# Output: ['slow', 'fast', 'medium'] (order preserved)
asyncio.run(main())gather()でのエラーハンドリング
デフォルトでは、いずれかのコルーチンが例外を発生させると、gather()はすぐにその例外を発生させ、残りのタスクをキャンセルします。
async def failing_task():
await asyncio.sleep(1)
raise ValueError("Task failed!")
async def successful_task():
await asyncio.sleep(2)
return "success"
async def main():
try:
results = await asyncio.gather(
failing_task(),
successful_task()
)
except ValueError as e:
print(f"Error caught: {e}")
# The successful_task is cancelled
asyncio.run(main())成功した結果とともに例外を収集するには、return_exceptions=Trueを使用します:
async def main():
results = await asyncio.gather(
failing_task(),
successful_task(),
return_exceptions=True
)
for i, result in enumerate(results):
if isinstance(result, Exception):
print(f"Task {i} failed: {result}")
else:
print(f"Task {i} succeeded: {result}")
asyncio.run(main())リスト内包表記を使用した動的なgather
gather()は動的に生成されたコルーチンと完璧に動作します:
async def process_item(item):
await asyncio.sleep(1)
return item * 2
async def main():
items = [1, 2, 3, 4, 5]
# Process all items concurrently
results = await asyncio.gather(
*[process_item(item) for item in items]
)
print(results) # [2, 4, 6, 8, 10]
asyncio.run(main())asyncio.wait() と asyncio.as_completed()
gather()がすべてのコルーチンの完了を待つ間、wait()とas_completed()は完了したタスクを処理する方法についてより細かな制御を提供します。
asyncio.wait()
wait()は、タスクがすべて完了する、最初のタスクが完了する、または最初の例外が発生するなど、異なる完了条件でタスクを待つことを可能にします。
import asyncio
async def task(delay, name):
await asyncio.sleep(delay)
return f"{name} completed"
async def main():
tasks = [
asyncio.create_task(task(1, "Task 1")),
asyncio.create_task(task(2, "Task 2")),
asyncio.create_task(task(3, "Task 3"))
]
# Wait for all tasks to complete
done, pending = await asyncio.wait(
tasks,
return_when=asyncio.ALL_COMPLETED
)
for task in done:
print(task.result())
asyncio.run(main())異なる完了条件:
async def main():
tasks = [
asyncio.create_task(task(1, "Fast")),
asyncio.create_task(task(3, "Slow")),
asyncio.create_task(task(2, "Medium"))
]
# Return when the first task completes
done, pending = await asyncio.wait(
tasks,
return_when=asyncio.FIRST_COMPLETED
)
print(f"First completed: {done.pop().result()}")
print(f"Still pending: {len(pending)} tasks")
# Cancel remaining tasks
for task in pending:
task.cancel()
asyncio.run(main())asyncio.as_completed()
as_completed()は、タスクが完了する順にyieldするイテレータを返し、結果が利用可能になったらすぐに処理できるようにします。
import asyncio
async def fetch_data(url_id, delay):
await asyncio.sleep(delay)
return f"Data from URL {url_id}"
async def main():
tasks = [
fetch_data(1, 3),
fetch_data(2, 1),
fetch_data(3, 2)
]
# Process results as they complete
for coro in asyncio.as_completed(tasks):
result = await coro
print(f"Received: {result}")
asyncio.run(main())出力は送信順ではなく完了順に結果を表示します:
Received: Data from URL 2
Received: Data from URL 3
Received: Data from URL 1これは、できるだけ早くユーザーに結果を表示したい場合に特に有用です:
async def search_engine(query, engine_name, delay):
await asyncio.sleep(delay)
return f"{engine_name}: Results for '{query}'"
async def main():
query = "python asyncio"
searches = [
search_engine(query, "Google", 1.5),
search_engine(query, "Bing", 2.0),
search_engine(query, "DuckDuckGo", 1.0)
]
print("Searching...")
for search in asyncio.as_completed(searches):
result = await search
print(result) # Display each result as soon as it arrives
asyncio.run(main())asyncio.sleep() vs time.sleep()
この区別は非同期コードの正確性にとって重要です。
time.sleep()は、イベントループを含むスレッド全体を一時停止するブロッキング操作です。これによりすべての非同期タスクの実行が防止されます。
asyncio.sleep()は、現在のタスクのみを一時停止し、他のタスクを実行できるようにする非ブロッキングなコルーチンです。
import asyncio
import time
async def blocking_example():
"""Bad: Uses time.sleep() - blocks the entire event loop"""
print("Starting blocking sleep")
time.sleep(2) # WRONG: Blocks everything!
print("Finished blocking sleep")
async def non_blocking_example():
"""Good: Uses asyncio.sleep() - allows other tasks to run"""
print("Starting non-blocking sleep")
await asyncio.sleep(2) # CORRECT: Only pauses this task
print("Finished non-blocking sleep")
async def concurrent_task():
for i in range(3):
print(f"Concurrent task running: {i}")
await asyncio.sleep(0.5)
async def demo_blocking():
print("\n=== Blocking example (BAD) ===")
await asyncio.gather(
blocking_example(),
concurrent_task()
)
async def demo_non_blocking():
print("\n=== Non-blocking example (GOOD) ===")
await asyncio.gather(
non_blocking_example(),
concurrent_task()
)
asyncio.run(demo_blocking())
asyncio.run(demo_non_blocking())ブロッキングの例では、time.sleep()が完了するまで並行タスクは実行されません。非ブロッキングの例では、両方のタスクが並行して実行されます。
経験則: 非同期コードではtime.sleep()を使用しないでください。常にawait asyncio.sleep()を使用してください。
回避できないCPUバウンドな操作の場合は、別スレッドまたはプロセスで実行するためにloop.run_in_executor()を使用してください:
import asyncio
import time
def cpu_intensive_task():
"""Some blocking CPU work"""
time.sleep(2) # Simulate heavy computation
return "CPU task completed"
async def main():
loop = asyncio.get_event_loop()
# Run blocking task in a thread pool
result = await loop.run_in_executor(None, cpu_intensive_task)
print(result)
asyncio.run(main())async for と async with
Pythonは、非同期イテレータおよびリソースを扱うためのforループとコンテキストマネージャの非同期バージョンを提供しています。
非同期イテレータ(async for)
非同期イテレータは、__aiter__()および__anext__()メソッドを実装し、非同期操作を使用してアイテムをフェッチする必要があるものをイテレートできるようにするオブジェクトです。
import asyncio
class AsyncRange:
"""An async iterator that yields numbers with delays"""
def __init__(self, start, end):
self.current = start
self.end = end
def __aiter__(self):
return self
async def __anext__(self):
if self.current >= self.end:
raise StopAsyncIteration
# Simulate async operation to get next value
await asyncio.sleep(0.5)
value = self.current
self.current += 1
return value
async def main():
async for number in AsyncRange(1, 5):
print(f"Got number: {number}")
asyncio.run(main())非同期データベースカーソルを使用した実世界の例:
class AsyncDatabaseCursor:
"""Simulated async database cursor"""
def __init__(self, query):
self.query = query
self.results = []
self.index = 0
async def execute(self):
# Simulate database query
await asyncio.sleep(1)
self.results = [
{"id": 1, "name": "Alice"},
{"id": 2, "name": "Bob"},
{"id": 3, "name": "Charlie"}
]
def __aiter__(self):
return self
async def __anext__(self):
if self.index >= len(self.results):
raise StopAsyncIteration
# Simulate fetching next row
await asyncio.sleep(0.1)
row = self.results[self.index]
self.index += 1
return row
async def fetch_users():
cursor = AsyncDatabaseCursor("SELECT * FROM users")
await cursor.execute()
async for row in cursor:
print(f"User: {row['name']}")
asyncio.run(fetch_users())非同期コンテキストマネージャ(async with)
非同期コンテキストマネージャは、非同期セットアップとクリーンアップを必要とするリソースを管理するための__aenter__()および__aexit__()メソッドを実装します。
import asyncio
class AsyncDatabaseConnection:
"""An async context manager for database connections"""
async def __aenter__(self):
print("Opening database connection...")
await asyncio.sleep(1) # Simulate connection time
print("Database connection opened")
return self
async def __aexit__(self, exc_type, exc_val, exc_tb):
print("Closing database connection...")
await asyncio.sleep(0.5) # Simulate cleanup
print("Database connection closed")
async def query(self, sql):
await asyncio.sleep(0.5)
return f"Results for: {sql}"
async def main():
async with AsyncDatabaseConnection() as db:
result = await db.query("SELECT * FROM users")
print(result)
# Connection automatically closed after the with block
asyncio.run(main())非同期イテレータと非同期コンテキストマネージャの組み合わせ:
class AsyncFileReader:
"""Async context manager and iterator for reading files"""
def __init__(self, filename):
self.filename = filename
self.lines = []
self.index = 0
async def __aenter__(self):
# Simulate async file opening
await asyncio.sleep(0.5)
self.lines = ["Line 1", "Line 2", "Line 3"]
return self
async def __aexit__(self, exc_type, exc_val, exc_tb):
await asyncio.sleep(0.2)
self.lines = []
def __aiter__(self):
return self
async def __anext__(self):
if self.index >= len(self.lines):
raise StopAsyncIteration
await asyncio.sleep(0.1)
line = self.lines[self.index]
self.index += 1
return line
async def read_file():
async with AsyncFileReader("data.txt") as reader:
async for line in reader:
print(line)
asyncio.run(read_file())プロデューサー・コンシューマーパターンのための asyncio.Queue
asyncio.Queueは、プロデューサーとコンシューマーのコルーチン間の作業を調整するのに最適な、スレッドセーフで非同期対応のキューです。
import asyncio
import random
async def producer(queue, producer_id):
"""Produces items and puts them in the queue"""
for i in range(5):
await asyncio.sleep(random.uniform(0.1, 0.5))
item = f"Item-{producer_id}-{i}"
await queue.put(item)
print(f"Producer {producer_id} produced: {item}")
async def consumer(queue, consumer_id):
"""Consumes items from the queue"""
while True:
item = await queue.get()
print(f"Consumer {consumer_id} consuming: {item}")
# Simulate processing time
await asyncio.sleep(random.uniform(0.2, 0.8))
queue.task_done()
async def main():
queue = asyncio.Queue(maxsize=10)
# Create producers and consumers
producers = [asyncio.create_task(producer(queue, i)) for i in range(2)]
consumers = [asyncio.create_task(consumer(queue, i)) for i in range(3)]
# Wait for all producers to finish
await asyncio.gather(*producers)
# Wait for the queue to be fully processed
await queue.join()
# Cancel consumers (they run forever)
for consumer_task in consumers:
consumer_task.cancel()
asyncio.run(main())実世界の例:URLキューを使用したWebスクレイパー:
import asyncio
from typing import Set
async def fetch_url(session, url):
"""Simulate fetching a URL"""
await asyncio.sleep(1)
return f"Content from {url}"
async def producer(queue: asyncio.Queue, start_urls: list):
"""Add URLs to the queue"""
for url in start_urls:
await queue.put(url)
async def consumer(queue: asyncio.Queue, visited: Set[str]):
"""Fetch URLs from the queue"""
while True:
url = await queue.get()
if url in visited:
queue.task_done()
continue
visited.add(url)
print(f"Scraping: {url}")
# Simulate fetching
content = await fetch_url(None, url)
# Simulate finding new URLs in the content
# In real scraper, you'd parse HTML and extract links
new_urls = [] # Would extract from content
for new_url in new_urls:
if new_url not in visited:
await queue.put(new_url)
queue.task_done()
async def main():
queue = asyncio.Queue()
visited = set()
start_urls = [
"https://example.com/page1",
"https://example.com/page2",
"https://example.com/page3"
]
# Add initial URLs
await producer(queue, start_urls)
# Create consumers
consumers = [
asyncio.create_task(consumer(queue, visited))
for _ in range(3)
]
# Wait for all URLs to be processed
await queue.join()
# Cancel consumers
for consumer_task in consumers:
consumer_task.cancel()
print(f"Scraped {len(visited)} unique URLs")
asyncio.run(main())レート制限のためのセマフォ
asyncio.Semaphoreは、同時にリソースにアクセスできるコルーチンの数を制御します。これはAPIコールのレート制限や、並行データベース接続の制限に不可欠です。
import asyncio
import time
async def call_api(semaphore, api_id):
"""Make an API call with rate limiting"""
async with semaphore:
print(f"API call {api_id} started")
await asyncio.sleep(1) # Simulate API call
print(f"API call {api_id} completed")
return f"Result {api_id}"
async def main():
# Allow only 3 concurrent API calls
semaphore = asyncio.Semaphore(3)
start = time.time()
# Create 10 API calls
tasks = [call_api(semaphore, i) for i in range(10)]
results = await asyncio.gather(*tasks)
elapsed = time.time() - start
print(f"\nCompleted {len(results)} API calls in {elapsed:.2f} seconds")
# With semaphore(3): ~4 seconds (10 calls in batches of 3)
# Without semaphore: ~1 second (all concurrent)
asyncio.run(main())APIクォータ遵守のためのレート制限:
import asyncio
from datetime import datetime
class RateLimiter:
"""Rate limiter that allows N requests per time period"""
def __init__(self, max_requests, time_period):
self.max_requests = max_requests
self.time_period = time_period
self.semaphore = asyncio.Semaphore(max_requests)
self.request_times = []
async def __aenter__(self):
await self.semaphore.acquire()
# Wait if we've hit the rate limit
while len(self.request_times) >= self.max_requests:
oldest = self.request_times[0]
elapsed = datetime.now().timestamp() - oldest
if elapsed < self.time_period:
sleep_time = self.time_period - elapsed
await asyncio.sleep(sleep_time)
self.request_times.pop(0)
self.request_times.append(datetime.now().timestamp())
return self
async def __aexit__(self, exc_type, exc_val, exc_tb):
self.semaphore.release()
async def fetch_with_rate_limit(rate_limiter, item_id):
async with rate_limiter:
print(f"Fetching item {item_id} at {datetime.now()}")
await asyncio.sleep(0.5)
return f"Item {item_id}"
async def main():
# Allow 5 requests per 2 seconds
rate_limiter = RateLimiter(max_requests=5, time_period=2)
# Make 15 requests
tasks = [fetch_with_rate_limit(rate_limiter, i) for i in range(15)]
results = await asyncio.gather(*tasks)
print(f"Completed {len(results)} requests")
asyncio.run(main())非同期HTTPリクエストのための aiohttp
aiohttpライブラリは非同期HTTPクライアントおよびサーバー機能を提供します。これは非同期コードでHTTPリクエストを行うための標準的な選択肢です。
import asyncio
import aiohttp
async def fetch_url(session, url):
"""Fetch a single URL"""
async with session.get(url) as response:
return await response.text()
async def main():
async with aiohttp.ClientSession() as session:
html = await fetch_url(session, "https://api.github.com")
print(f"Fetched {len(html)} characters")
asyncio.run(main())複数のURLを並行して取得:
import asyncio
import aiohttp
import time
async def fetch_url(session, url):
"""Fetch URL and return status code and content length"""
async with session.get(url) as response:
content = await response.text()
return {
"url": url,
"status": response.status,
"length": len(content)
}
async def fetch_all_urls(urls):
"""Fetch multiple URLs concurrently"""
async with aiohttp.ClientSession() as session:
tasks = [fetch_url(session, url) for url in urls]
results = await asyncio.gather(*tasks, return_exceptions=True)
return results
async def main():
urls = [
"https://api.github.com",
"https://httpbin.org/delay/1",
"https://httpbin.org/delay/2",
"https://httpbin.org/status/200",
"https://httpbin.org/status/404"
]
start = time.time()
results = await fetch_all_urls(urls)
elapsed = time.time() - start
for result in results:
if isinstance(result, Exception):
print(f"Error: {result}")
else:
print(f"{result['url']}: {result['status']} ({result['length']} bytes)")
print(f"\nFetched {len(urls)} URLs in {elapsed:.2f} seconds")
asyncio.run(main())エラーハンドリングとリトライを含む実用的な例:
import asyncio
import aiohttp
from typing import Optional
async def fetch_with_retry(
session: aiohttp.ClientSession,
url: str,
max_retries: int = 3
) -> Optional[dict]:
"""Fetch URL with retry logic"""
for attempt in range(max_retries):
try:
async with session.get(url, timeout=aiohttp.ClientTimeout(total=10)) as response:
if response.status == 200:
data = await response.json()
return data
elif response.status == 404:
print(f"URL not found: {url}")
return None
else:
print(f"Unexpected status {response.status} for {url}")
except asyncio.TimeoutError:
print(f"Timeout on attempt {attempt + 1} for {url}")
except aiohttp.ClientError as e:
print(f"Client error on attempt {attempt + 1} for {url}: {e}")
if attempt < max_retries - 1:
await asyncio.sleep(2 ** attempt) # Exponential backoff
print(f"Failed to fetch {url} after {max_retries} attempts")
return None
async def main():
urls = [
"https://api.github.com/users/github",
"https://api.github.com/users/nonexistent",
"https://httpbin.org/delay/5"
]
async with aiohttp.ClientSession() as session:
tasks = [fetch_with_retry(session, url) for url in urls]
results = await asyncio.gather(*tasks)
successful = [r for r in results if r is not None]
print(f"\nSuccessfully fetched {len(successful)}/{len(urls)} URLs")
asyncio.run(main())非同期ファイルI/Oのための aiofiles
aiofilesライブラリは、ファイルの読み書き中のブロックを防ぐ非同期ファイル操作を提供します。
import asyncio
import aiofiles
async def read_file(filename):
"""Read file asynchronously"""
async with aiofiles.open(filename, mode='r') as f:
contents = await f.read()
return contents
async def write_file(filename, content):
"""Write file asynchronously"""
async with aiofiles.open(filename, mode='w') as f:
await f.write(content)
async def main():
# Write data
await write_file('test.txt', 'Hello, async file I/O!')
# Read data
content = await read_file('test.txt')
print(f"Read: {content}")
asyncio.run(main())複数のファイルを並行して処理:
import asyncio
import aiofiles
async def process_file(input_file, output_file):
"""Read, process, and write a file"""
async with aiofiles.open(input_file, mode='r') as f:
content = await f.read()
# Process content (convert to uppercase)
processed = content.upper()
async with aiofiles.open(output_file, mode='w') as f:
await f.write(processed)
return f"Processed {input_file} -> {output_file}"
async def main():
files = [
('input1.txt', 'output1.txt'),
('input2.txt', 'output2.txt'),
('input3.txt', 'output3.txt')
]
tasks = [process_file(inp, out) for inp, out in files]
results = await asyncio.gather(*tasks)
for result in results:
print(result)
asyncio.run(main())大きなファイルを行ごとに読み込む:
import asyncio
import aiofiles
async def process_large_file(filename):
"""Process a large file line by line without loading it all into memory"""
line_count = 0
async with aiofiles.open(filename, mode='r') as f:
async for line in f:
# Process each line
line_count += 1
if line.strip():
# Do something with the line
pass
return line_count
async def main():
count = await process_large_file('large_data.txt')
print(f"Processed {count} lines")
asyncio.run(main())非同期コードでのエラーハンドリング
非同期コードでのエラーハンドリングは、例外が適切にキャッチされ、リソースがクリーンアップされるように特別な注意を必要とします。
import asyncio
async def risky_operation(item_id):
"""An operation that might fail"""
await asyncio.sleep(1)
if item_id % 2 == 0:
raise ValueError(f"Item {item_id} caused an error")
return f"Item {item_id} processed"
async def handle_with_try_except():
"""Handle errors with try/except"""
try:
result = await risky_operation(2)
print(result)
except ValueError as e:
print(f"Error caught: {e}")
asyncio.run(handle_with_try_except())並行タスクでのエラーハンドリング:
async def safe_operation(item_id):
"""Wrapper that catches errors from risky_operation"""
try:
result = await risky_operation(item_id)
return {"success": True, "result": result}
except Exception as e:
return {"success": False, "error": str(e), "item_id": item_id}
async def main():
tasks = [safe_operation(i) for i in range(5)]
results = await asyncio.gather(*tasks)
for result in results:
if result["success"]:
print(f"Success: {result['result']}")
else:
print(f"Failed: Item {result['item_id']} - {result['error']}")
asyncio.run(main())return_exceptions=Trueを使用したgather()の使用:
async def main():
tasks = [risky_operation(i) for i in range(5)]
results = await asyncio.gather(*tasks, return_exceptions=True)
for i, result in enumerate(results):
if isinstance(result, Exception):
print(f"Task {i} failed: {result}")
else:
print(f"Task {i} succeeded: {result}")
asyncio.run(main())非同期コンテキストマネージャでのクリーンアップ:
class AsyncResource:
"""A resource that needs cleanup even if errors occur"""
async def __aenter__(self):
print("Acquiring resource")
await asyncio.sleep(0.5)
return self
async def __aexit__(self, exc_type, exc_val, exc_tb):
print("Cleaning up resource")
await asyncio.sleep(0.5)
if exc_type is not None:
print(f"Exception during resource use: {exc_val}")
# Return False to propagate exception, True to suppress
return False
async def do_work(self):
await asyncio.sleep(1)
raise ValueError("Something went wrong")
async def main():
try:
async with AsyncResource() as resource:
await resource.do_work()
except ValueError as e:
print(f"Caught error: {e}")
asyncio.run(main())パフォーマンスベンチマーク: I/Oバウンドタスクの同期 vs 非同期
実際のベンチマークを使用して、I/Oバウンド操作における同期アプローチと非同期アプローチを比較しましょう。
import asyncio
import time
import requests
import aiohttp
# Synchronous approach
def fetch_sync(url):
response = requests.get(url)
return len(response.text)
def benchmark_sync(urls):
start = time.time()
results = [fetch_sync(url) for url in urls]
elapsed = time.time() - start
return elapsed, results
# Asynchronous approach
async def fetch_async(session, url):
async with session.get(url) as response:
text = await response.text()
return len(text)
async def benchmark_async(urls):
start = time.time()
async with aiohttp.ClientSession() as session:
tasks = [fetch_async(session, url) for url in urls]
results = await asyncio.gather(*tasks)
elapsed = time.time() - start
return elapsed, results
# Run benchmarks
urls = [
"https://httpbin.org/delay/1",
"https://httpbin.org/delay/1",
"https://httpbin.org/delay/1",
"https://httpbin.org/delay/1",
"https://httpbin.org/delay/1"
]
print("Benchmarking synchronous approach...")
sync_time, sync_results = benchmark_sync(urls)
print(f"Synchronous: {sync_time:.2f} seconds")
print("\nBenchmarking asynchronous approach...")
async_time, async_results = asyncio.run(benchmark_async(urls))
print(f"Asynchronous: {async_time:.2f} seconds")
print(f"\nSpeedup: {sync_time / async_time:.2f}x")それぞれ2秒の遅延がある5個のURLに対する典型的な結果:
- 同期処理:約5秒(逐次実行)
- 非同期処理:約1秒(並行実行)
- 高速化:約5倍
| URL数 | 同期時間 | 非同期時間 | 高速化 |
|---|---|---|---|
| 5 | 5.2s | 1.1s | 4.7倍 |
| 10 | 10.4s | 1.2s | 8.7倍 |
| 20 | 20.8s | 1.4s | 14.9倍 |
| 50 | 52.1s | 2.1s | 24.8倍 |
| 100 | 104.5s | 3.8s | 27.5倍 |
高速化は、帯域幅またはレート制限の制約に達するまで、並行操作の数とともに増加します。
一般的な落とし穴とその回避方法
awaitし忘れ
# WRONG: Coroutine is created but not executed
async def wrong():
result = fetch_data() # Missing await!
print(result) # Prints coroutine object, not the result
# CORRECT: Await the coroutine
async def correct():
result = await fetch_data()
print(result) # Prints actual resultイベントループのブロック
# WRONG: Blocks the entire event loop
async def blocking_code():
time.sleep(5) # Blocks everything!
result = compute_something() # Blocks if CPU-intensive
return result
# CORRECT: Use asyncio.sleep and run_in_executor
async def non_blocking_code():
await asyncio.sleep(5) # Only pauses this task
loop = asyncio.get_event_loop()
result = await loop.run_in_executor(None, compute_something)
return resultタスクキャンセレーションの処理不足
# WRONG: Doesn't handle cancellation
async def incomplete_cleanup():
resource = await acquire_resource()
await long_operation()
await release_resource(resource) # May not execute if cancelled
# CORRECT: Use try/finally or async with
async def proper_cleanup():
resource = await acquire_resource()
try:
await long_operation()
finally:
await release_resource(resource) # Always executesイベントループの競合の作成
# WRONG: Calling asyncio.run() in an async function
async def wrong_nesting():
result = asyncio.run(some_coroutine()) # Error!
return result
# CORRECT: Just await the coroutine
async def correct_nesting():
result = await some_coroutine()
return result並行性の制限不足
# WRONG: Launches too many tasks simultaneously
async def unlimited_concurrency(items):
tasks = [process_item(item) for item in items] # 10,000 tasks!
return await asyncio.gather(*tasks)
# CORRECT: Use semaphore to limit concurrency
async def limited_concurrency(items):
semaphore = asyncio.Semaphore(10) # Max 10 concurrent
async def process_with_limit(item):
async with semaphore:
return await process_item(item)
tasks = [process_with_limit(item) for item in items]
return await asyncio.gather(*tasks)実世界の例
レート制限付きWebスクレイピング
import asyncio
import aiohttp
from bs4 import BeautifulSoup
class AsyncWebScraper:
def __init__(self, max_concurrent=5):
self.semaphore = asyncio.Semaphore(max_concurrent)
self.session = None
async def __aenter__(self):
self.session = aiohttp.ClientSession()
return self
async def __aexit__(self, exc_type, exc_val, exc_tb):
await self.session.close()
async def fetch_page(self, url):
async with self.semaphore:
try:
async with self.session.get(url) as response:
return await response.text()
except Exception as e:
print(f"Error fetching {url}: {e}")
return None
async def scrape_urls(self, urls):
tasks = [self.fetch_page(url) for url in urls]
return await asyncio.gather(*tasks)
async def main():
urls = [
"https://example.com/page1",
"https://example.com/page2",
"https://example.com/page3"
]
async with AsyncWebScraper(max_concurrent=3) as scraper:
pages = await scraper.scrape_urls(urls)
for url, html in zip(urls, pages):
if html:
print(f"Scraped {url}: {len(html)} bytes")
asyncio.run(main())非同期APIデータパイプライン
import asyncio
import aiohttp
async def fetch_user_ids(session):
"""Fetch list of user IDs from API"""
async with session.get("https://api.example.com/users") as response:
data = await response.json()
return [user["id"] for user in data["users"]]
async def fetch_user_details(session, user_id):
"""Fetch detailed info for a user"""
async with session.get(f"https://api.example.com/users/{user_id}") as response:
return await response.json()
async def fetch_user_posts(session, user_id):
"""Fetch posts for a user"""
async with session.get(f"https://api.example.com/users/{user_id}/posts") as response:
return await response.json()
async def process_user(session, user_id):
"""Fetch all data for a user concurrently"""
details, posts = await asyncio.gather(
fetch_user_details(session, user_id),
fetch_user_posts(session, user_id)
)
return {
"user": details,
"posts": posts,
"post_count": len(posts)
}
async def main():
async with aiohttp.ClientSession() as session:
# Fetch user IDs
user_ids = await fetch_user_ids(session)
# Process all users concurrently
user_data = await asyncio.gather(
*[process_user(session, uid) for uid in user_ids[:10]]
)
for data in user_data:
print(f"User {data['user']['name']}: {data['post_count']} posts")
asyncio.run(main())非同期チャットサーバー
import asyncio
class ChatServer:
def __init__(self):
self.clients = set()
async def handle_client(self, reader, writer):
"""Handle a single client connection"""
addr = writer.get_extra_info('peername')
print(f"Client connected: {addr}")
self.clients.add(writer)
try:
while True:
data = await reader.read(100)
if not data:
break
message = data.decode()
print(f"Received from {addr}: {message}")
# Broadcast to all clients
await self.broadcast(f"{addr}: {message}", writer)
except asyncio.CancelledError:
pass
finally:
print(f"Client disconnected: {addr}")
self.clients.remove(writer)
writer.close()
await writer.wait_closed()
async def broadcast(self, message, sender):
"""Send message to all clients except sender"""
for client in self.clients:
if client != sender:
try:
client.write(message.encode())
await client.drain()
except Exception as e:
print(f"Error broadcasting: {e}")
async def start(self, host='127.0.0.1', port=8888):
"""Start the chat server"""
server = await asyncio.start_server(
self.handle_client,
host,
port
)
addr = server.sockets[0].getsockname()
print(f"Chat server running on {addr}")
async with server:
await server.serve_forever()
async def main():
chat_server = ChatServer()
await chat_server.start()
asyncio.run(main())JupyterでのAsyncioの実験
Jupyterノートブックでasyncioを使用する際、イベントループの競合が発生する可能性があります。Jupyterは既にイベントループを実行しており、これがasyncio.run()と干渉することがあります。
Jupyter環境でのシームレスな非同期実験のために、RunCell (opens in a new tab)の使用を検討してください。RunCellは、Jupyterノートブック専用に設計されたAIエージェントで、イベントループ管理を自動的に処理し、強化された非同期デバッグ機能を提供し、競合なしに非同期パターンを対話的にテストできるようにします。
標準のJupyterでは、トップレベルのawaitを使用できます:
# In Jupyter, this works directly
async def fetch_data():
await asyncio.sleep(1)
return "data"
# Just await directly, no asyncio.run() needed
result = await fetch_data()
print(result)または、ネストされたイベントループを許可するためにnest_asyncioパッケージを使用します:
import nest_asyncio
nest_asyncio.apply()
# Now asyncio.run() works in Jupyter
asyncio.run(main())よくある質問
Pythonでのasyncioとスレッドの違いは何ですか?
Asyncioは単一スレッド上で協調的マルチタスクを使用し、タスクはawaitを使用して自発的に制御を譲ります。スレッドは、OSがスレッド間の切り替えを決定するプリエンプティブマルティタスクを複数のOSスレッドで使用します。Asyncioはメモリオーバーヘッドが少なく競合状態のリスクがないため、I/Oバウンドなタスクに効率的で、スレッドは非同期をサポートしないブロッキングライブラリを処理できます。両方ともCPUバウンドな作業についてはPythonのGILによって制限されますが、asyncioはスレッドコンテキストスイッチングのオーバーヘッドを回避します。
いつasyncioの代わりにマルチプロセスを使用すべきですか?
APIコール、データベースクエリ、ファイル操作、ネットワーク通信などのI/Oバウンドなタスクにはasyncioを使用してください。データ処理、数学的計算、画像操作、機械学習モデルのトレーニングなどのCPUバウンドなタスクにはマルチプロセスを使用してください。Asyncioは最小限のリソースオーバーヘッドで数千の並行I/O操作を処理することで優れており、マルチプロセスは複数のCPUコアで実際の並列計算を提供します。
同じアプリケーションで非同期コードと同期コードを混在させることはできますか?
はい、ただし慎重な計画が必要です。asyncio.run()を使用して同期コードから非同期関数を呼び出すことができますが、既に実行中のイベントループ内からは呼び出せません。同期ブロッキング関数を非同期コードから呼び出すには、スレッドプールで実行するためにloop.run_in_executor()を使用し、イベントループをブロックしないようにします。time.sleep()や同期的なファイルI/Oなどのブロッキング操作を非同期関数内で直接使用しないでください。これらはイベントループ全体をブロックします。代わりにasyncio.sleep()、aiohttp、aiofilesなどの非同期等価物を使用してください。
非同期コードを効果的にデバッグするにはどうすればよいですか?
asyncio.run(main(), debug=True)または環境変数PYTHONASYNCIODEBUG=1を設定してasyncioデバッグモードを有効にします。これにより、awaitし忘れやawaitされなかったコルーチンなどの一般的なミスが検出されます。従来のデバッガーは非同期コードでは混乱する可能性があるため、実行フローをトレースするために広範なロギングを使用してください。より明確なエラーメッセージのためにタスク名を追加し、他のタスクのエラーを隠さないためにasyncio.gather(..., return_exceptions=True)を使用します。タスクが完了しないかどうかを確認するためにasyncio.all_tasks()でイベントループを監視します。
asyncioのパフォーマンス制限は何ですか?
Asyncioは単一スレッドで数万の並行I/O操作を処理できます。主な制約は、単一スレッドを使用するため、asyncioはCPUバウンドな操作にはメリットを提供しないことです。同期I/Oや重い計算でイベントループをブロックすると、パフォーマンスが低下します。ネットワーク帯域幅とAPIレート制限が、asyncioの並行性制限の前にボトルネックになります。メモリ使用量は並行タスクの数に比例しますが、各タスクのオーバーヘッドはスレッドに比べて最小限です。最大パフォーマンスを得るには、I/O並行性にはasyncioを、CPUバウンドな作業にはマルチプロセスを組み合わせ、常にセマフォを使用して並行操作を適切なレベルに制限してください。
結論
Python asyncioは、I/Oバウンドなアプリケーションを遅いブロッキング操作から、高速で並行したシステムに変換します。async/await構文をマスターし、イベントループを理解し、gather()、create_task()、セマフォなどのツールを活用することで、数千の並行操作を効率的に処理できるアプリケーションを構築できます。
Asyncioでの成功の鍵は、それが適切なツールであることを認識することです。ネットワークリクエスト、データベースクエリ、ファイル操作、外部リソースを待つ時間がかかるタスクに使用してください。同期操作でイベントループをブロックしないようにし、常にコルーチンでawaitを使用し、必要に応じてセマフォで並行性を制限してください。
コードベースの小さなセクションを非同期に変換することから始め、パフォーマンスの改善を測定し、徐々に拡張してください。I/O負荷の高いアプリケーションでの劇的な速度向上は、学習への投資に値するものです。