Python asyncio: Kompletter Leitfaden zur asynchronen Programmierung
Updated on
Ihre Python-Anwendung führt 100 API-Aufrufe aus, jeder dauert 2 Sekunden. Mit traditionellem sequenziellem Code sind das 200 Sekunden Wartezeit. Ihre Nutzer starren auf Ladebildschirme. Ihre Server sitzen untätig herum, verbrauchen Ressourcen während sie auf Antworten warten. Dieses blockierende Verhalten ist das Problem, das die Anwendungsleistung und Benutzererfahrung zerstört.
Der Schmerz verstärkt sich, wenn Sie skalieren. Datenbankabfragen stapeln sich. Dateioperationen reihen sich hintereinander an. Web-Scraper kriechen im Schneckentempo. Jede I/O-Operation wird zu einem Engpass, der sich durch Ihr gesamtes System fortsetzt und aus dem, was eine schnelle, responsive Anwendung sein sollte, eine träge, ressourcenverschwendende Monsteranwendung macht.
Python asyncio löst dies, indem es die nebenläufige Ausführung von I/O-bound Tasks ermöglicht. Anstatt auf den Abschluss jeder Operation zu warten, bevor die nächste beginnt, erlaubt asyncio Ihrem Code, mehrere Operationen zu starten und zwischen ihnen zu wechseln, während sie warten. Diese 100 API-Aufrufe? Mit asyncio werden sie in etwa 2 Sekunden statt 200 abgeschlossen. Dieser Leitfaden zeigt Ihnen genau, wie Sie asynchrone Programmierung in Python implementieren, mit praktischen Beispielen, die langsamen, blockierenden Code in schnelle, nebenläufige Anwendungen transformieren.
Was ist asynchrone Programmierung und warum ist sie wichtig
Asynchrone Programmierung ermöglicht es einem Programm, potenziell langlaufende Tasks zu starten und zu anderen Arbeiten überzugehen, bevor diese Tasks abgeschlossen sind, anstatt auf den Abschluss jedes Tasks zu warten, bevor der nächste beginnt.
Im traditionellen synchronen Code stoppt Ihr Programm und wartet auf die Antwort, wenn Sie einen API-Request machen. Während dieser Wartezeit sitzt Ihre CPU untätig herum und macht nichts Produktives. Das ist für einzelne Operationen akzeptabel, aber katastrophal für Anwendungen, die mehrere I/O-Operationen verarbeiten müssen.
Asyncio bietet eine Möglichkeit, nebenläufigen Code mit der async/await-Syntax zu schreiben. Es ist besonders effektiv für I/O-bound Operationen wie:
- API-HTTP-Requests machen
- Dateien lesen und schreiben
- Datenbankabfragen
- Netzwerkkommunikation
- WebSocket-Verbindungen
- Message-Queue-Verarbeitung
Die Leistungsverbesserung ist dramatisch. Betrachten Sie das Abrufen von Daten von 50 verschiedenen URLs:
Synchroner Ansatz: 50 Requests × 2 Sekunden each = 100 Sekunden total Asynchroner Ansatz: Alle 50 Requests laufen nebenläufig ≈ 2 Sekunden total
Diese 50-fache Leistungsverbesserung resultiert aus einer besseren Ressourcennutzung. Anstatt bei I/O-Operationen zu blockieren, erlaubt asyncio Ihrem Programm, andere Tasks weiter auszuführen, während auf I/O gewartet wird.
Nebenläufigkeit vs Parallelität vs Async
Das Verständnis der Unterscheidung zwischen diesen Konzepten ist essenziell für die effektive Nutzung von asyncio.
Nebenläufigkeit bedeutet, mehrere Tasks gleichzeitig zu verwalten. Die Tasks machen abwechselnd Fortschritt, aber nur einer wird zu einem gegebenen Zeitpunkt ausgeführt. Stellen Sie sich einen Koch vor, der mehrere Gerichte zubereitet und zwischen den Aufgaben wechselt, während jedes auf das Garen wartet.
Parallelität bedeutet, mehrere Tasks simultan auf verschiedenen CPU-Kernen auszuführen. Dies erfordert echte parallele Verarbeitungshardware und ist ideal für CPU-bound Tasks wie mathematische Berechnungen oder Bildverarbeitung.
Asynchrone Programmierung ist eine spezifische Form der Nebenläufigkeit, die für I/O-bound Tasks entwickelt wurde. Sie verwendet einen einzelnen Thread und wechselt zwischen Tasks, wenn diese auf I/O-Operationen warten.
| Merkmal | asyncio | Threading | Multiprocessing |
|---|---|---|---|
| Ausführungsmodell | Einzelner Thread, kooperatives Multitasking | Mehrere Threads, präemptives Multitasking | Mehrere Prozesse |
| Am besten für | I/O-bound Tasks | I/O-bound Tasks mit blockierenden Bibliotheken | CPU-bound Tasks |
| Speicher-Overhead | Minimal | Moderat | Hoch |
| Kosten für Kontextwechsel | Sehr niedrig | Niedrig bis moderat | Hoch |
| Komplexität | Moderat (async/await-Syntax) | Hoch (Race Conditions, Locks) | Hoch (IPC, Serialisierung) |
| GIL-Limitierung | Nicht betroffen (einzelner Thread) | Durch GIL limitiert | Nicht limitiert (separate Prozesse) |
| Typische Beschleunigung für I/O | 10-100x | 5-10x | N/A |
Die Python Global Interpreter Lock (GIL) verhindert die echte parallele Ausführung von Python-Bytecode in Threads, was Threading weniger effektiv für CPU-bound Tasks macht. Asyncio umgeht diese Limitierung durch die Verwendung eines einzelnen Threads mit kooperativem Multitasking, während Multiprocessing sie durch separate Prozesse vollständig umgeht.
Die Schlüsselwörter async def und await
Das Fundament von asyncio basiert auf zwei Schlüsselwörtern: async und await.
Das Schlüsselwort async def definiert eine Coroutine-Funktion. Wenn Sie eine Coroutine-Funktion aufrufen, wird sie nicht sofort ausgeführt. Stattdessen gibt sie ein Coroutine-Objekt zurück, das awaited werden kann.
import asyncio
async def fetch_data():
print("Fetching data...")
await asyncio.sleep(2) # Simulate I/O operation
print("Data fetched!")
return {"status": "success"}
# Calling the function returns a coroutine object
coroutine = fetch_data()
print(type(coroutine)) # <class 'coroutine'>
# To actually run it, you need to await it or use asyncio.run()
# asyncio.run(coroutine) # This would execute the coroutineDas Schlüsselwort await pausiert die Ausführung einer Coroutine, bis die awaited Operation abgeschlossen ist. Während dieser Pause kann die Event Loop andere Coroutines ausführen. Sie können await nur innerhalb einer async def Funktion verwenden.
async def process_user(user_id):
# Await an I/O operation
user_data = await fetch_user_from_database(user_id)
# Await another I/O operation
user_profile = await fetch_user_profile(user_id)
# Regular synchronous code runs normally
processed_data = transform_data(user_data, user_profile)
return processed_dataWichtige Regeln für async/await:
- Sie können nur Coroutines, Tasks oder Futures
awaiten - Sie können
awaitnur innerhalb einerasync defFunktion verwenden - Reguläre Funktionen können kein
awaitverwenden - Das Aufrufen einer async-Funktion ohne await erstellt ein Coroutine-Objekt, führt aber den Code nicht aus
Häufiger Fehler:
async def wrong_example():
# This creates a coroutine but doesn't execute it!
fetch_data() # Missing await
async def correct_example():
# This actually executes the coroutine
result = await fetch_data()Der Einstiegspunkt asyncio.run()
Die Funktion asyncio.run() ist die Standardmethode, um die asyncio Event Loop zu starten und Ihre Main-Coroutine auszuführen. Sie wurde in Python 3.7 eingeführt und vereinfacht die Ausführung von async-Code aus synchronen Kontexten.
import asyncio
async def main():
print("Starting async operations")
await asyncio.sleep(1)
print("Finished")
# Run the main coroutine
asyncio.run(main())Was asyncio.run() im Hintergrund macht:
- Erstellt eine neue Event Loop
- Führt die bereitgestellte Coroutine bis zum Abschluss aus
- Schließt die Event Loop
- Gibt das Ergebnis der Coroutine zurück
import asyncio
async def main():
result = await compute_value()
return result
# The return value is accessible
final_result = asyncio.run(main())
print(final_result)Wichtige Eigenschaften von asyncio.run():
- Kann nicht von innerhalb einer laufenden Event Loop aufgerufen werden: Wenn Sie sich bereits in einer async-Funktion befinden, verwenden Sie stattdessen
await - Erstellt jedes Mal eine frische Event Loop: Rufen Sie
asyncio.run()nicht mehrmals im selben Programm auf, es sei denn, Sie möchten separate Event-Loop-Instanzen - Schließt immer die Loop: Die Event Loop wird nach der Ausführung aufgeräumt
Für Jupyter Notebooks oder Umgebungen, in denen bereits eine Event Loop läuft, verwenden Sie await direkt oder asyncio.create_task(). Tools wie RunCell (opens in a new tab) bieten erweiterte Async-Unterstützung in Jupyter-Umgebungen, was es einfacher macht, interaktiv mit asyncio-Patterns zu experimentieren, ohne Event-Loop-Konflikte.
Vor Python 3.7 mussten Sie die Event Loop manuell verwalten:
# Old way (pre-Python 3.7)
loop = asyncio.get_event_loop()
try:
loop.run_until_complete(main())
finally:
loop.close()
# Modern way (Python 3.7+)
asyncio.run(main())Coroutines, Tasks und Futures
Das Verständnis dieser drei Kernkonzepte ist essenziell für die Beherrschung von asyncio.
Coroutines
Eine Coroutine ist eine Funktion, die mit async def definiert wurde. Es ist eine spezielle Funktion, die pausiert und fortgesetzt werden kann, wodurch anderer Code während der Pause laufen kann.
import asyncio
async def my_coroutine():
await asyncio.sleep(1)
return "completed"
# This creates a coroutine object
coro = my_coroutine()
# To run it
result = asyncio.run(coro)Tasks
Ein Task ist ein Wrapper um eine Coroutine, der sie für die Ausführung in der Event Loop einplant. Tasks ermöglichen es Coroutines, nebenläufig zu laufen.
import asyncio
async def say_after(delay, message):
await asyncio.sleep(delay)
print(message)
return message
async def main():
# Create tasks to run concurrently
task1 = asyncio.create_task(say_after(1, "First"))
task2 = asyncio.create_task(say_after(2, "Second"))
# Wait for both tasks to complete
result1 = await task1
result2 = await task2
print(f"Results: {result1}, {result2}")
asyncio.run(main())Ein Task wird erstellt und die Coroutine sofort für die Ausführung eingeplant. Die Event Loop startet sie so bald wie möglich, noch bevor Sie den Task awaiten.
async def background_work():
print("Starting background work")
await asyncio.sleep(2)
print("Background work completed")
async def main():
# The coroutine starts running immediately
task = asyncio.create_task(background_work())
print("Task created, doing other work")
await asyncio.sleep(1)
print("Other work done")
# Wait for the background task to finish
await task
asyncio.run(main())Output:
Task created, doing other work
Starting background work
Other work done
Background work completedFutures
Ein Future ist ein awaitables Objekt auf niedriger Ebene, das ein späteres Ergebnis einer asynchronen Operation repräsentiert. Sie erstellen Futures selten direkt; sie werden typischerweise von asyncio-Interna oder Bibliotheken erstellt.
async def set_future_result(future):
await asyncio.sleep(1)
future.set_result("Future completed")
async def main():
loop = asyncio.get_event_loop()
future = loop.create_future()
# Schedule a coroutine to set the future's result
asyncio.create_task(set_future_result(future))
# Wait for the future to get a result
result = await future
print(result)
asyncio.run(main())Beziehung zwischen Coroutines, Tasks und Futures:
- Coroutines sind die Funktionen, die Sie schreiben
- Tasks wrappen Coroutines und planen sie für die Ausführung ein
- Futures repräsentieren Ergebnisse, die in der Zukunft verfügbar sein werden
- Tasks sind eine Subklasse von Future
asyncio.create_task() für nebenläufige Ausführung
Die Funktion asyncio.create_task() ist Ihr primäres Werkzeug, um echte Nebenläufigkeit mit asyncio zu erreichen. Sie plant eine Coroutine für die Ausführung in der Event Loop ein, ohne die aktuelle Coroutine zu blockieren.
import asyncio
import time
async def download_file(file_id):
print(f"Starting download {file_id}")
await asyncio.sleep(2) # Simulate download time
print(f"Completed download {file_id}")
return f"file_{file_id}.dat"
async def sequential_downloads():
"""Downloads files one at a time"""
start = time.time()
file1 = await download_file(1)
file2 = await download_file(2)
file3 = await download_file(3)
elapsed = time.time() - start
print(f"Sequential: {elapsed:.2f} seconds")
# Output: ~6 seconds (2 + 2 + 2)
async def concurrent_downloads():
"""Downloads files concurrently"""
start = time.time()
# Create tasks for concurrent execution
task1 = asyncio.create_task(download_file(1))
task2 = asyncio.create_task(download_file(2))
task3 = asyncio.create_task(download_file(3))
# Wait for all tasks to complete
file1 = await task1
file2 = await task2
file3 = await task3
elapsed = time.time() - start
print(f"Concurrent: {elapsed:.2f} seconds")
# Output: ~2 seconds (all run at the same time)
asyncio.run(concurrent_downloads())Der Task wird sofort eingeplant, wenn create_task() aufgerufen wird. Sie müssen ihn nicht sofort awaiten.
async def process_data():
# Start background tasks
task1 = asyncio.create_task(fetch_from_api_1())
task2 = asyncio.create_task(fetch_from_api_2())
# Do some work while tasks run in background
local_data = prepare_local_data()
# Now wait for the background tasks
api_data_1 = await task1
api_data_2 = await task2
# Combine all data
return combine_data(local_data, api_data_1, api_data_2)Sie können Tasks auch für besseres Debugging benennen:
async def main():
task = asyncio.create_task(
long_running_operation(),
name="long-operation-task"
)
# Task name is accessible
print(f"Task name: {task.get_name()}")
await taskasyncio.gather() zur Ausführung mehrerer Coroutines
Die Funktion asyncio.gather() führt mehrere Coroutines nebenläufig aus und wartet, bis alle abgeschlossen sind. Es ist sauberer als die Erstellung individueller Tasks, wenn Sie viele Coroutines ausführen müssen.
import asyncio
async def fetch_user(user_id):
await asyncio.sleep(1)
return {"id": user_id, "name": f"User {user_id}"}
async def main():
# Run multiple coroutines concurrently
results = await asyncio.gather(
fetch_user(1),
fetch_user(2),
fetch_user(3),
fetch_user(4),
fetch_user(5)
)
print(results)
# Output: [{'id': 1, 'name': 'User 1'}, {'id': 2, 'name': 'User 2'}, ...]
asyncio.run(main())gather() gibt Ergebnisse in der gleichen Reihenfolge wie die Input-Coroutines zurück, unabhängig davon, welche zuerst abschließt.
async def task_with_delay(delay, value):
await asyncio.sleep(delay)
return value
async def main():
results = await asyncio.gather(
task_with_delay(3, "slow"),
task_with_delay(1, "fast"),
task_with_delay(2, "medium")
)
print(results)
# Output: ['slow', 'fast', 'medium'] (order preserved)
asyncio.run(main())Fehlerbehandlung mit gather()
Standardmäßig wirft gather() sofort eine Exception, wenn eine Coroutine eine Exception wirft, und bricht die verbleibenden Tasks ab.
async def failing_task():
await asyncio.sleep(1)
raise ValueError("Task failed!")
async def successful_task():
await asyncio.sleep(2)
return "success"
async def main():
try:
results = await asyncio.gather(
failing_task(),
successful_task()
)
except ValueError as e:
print(f"Error caught: {e}")
# The successful_task is cancelled
asyncio.run(main())Verwenden Sie return_exceptions=True, um Exceptions zusammen mit erfolgreichen Ergebnissen zu sammeln:
async def main():
results = await asyncio.gather(
failing_task(),
successful_task(),
return_exceptions=True
)
for i, result in enumerate(results):
if isinstance(result, Exception):
print(f"Task {i} failed: {result}")
else:
print(f"Task {i} succeeded: {result}")
asyncio.run(main())Dynamisches gather mit Listenkomprehension
gather() funktioniert hervorragend mit dynamisch generierten Coroutines:
async def process_item(item):
await asyncio.sleep(1)
return item * 2
async def main():
items = [1, 2, 3, 4, 5]
# Process all items concurrently
results = await asyncio.gather(
*[process_item(item) for item in items]
)
print(results) # [2, 4, 6, 8, 10]
asyncio.run(main())asyncio.wait() und asyncio.as_completed()
Während gather() auf alle Coroutines wartet, bieten wait() und as_completed() granulärere Kontrolle darüber, wie Sie abgeschlossene Tasks behandeln.
asyncio.wait()
wait() ermöglicht es Ihnen, auf Tasks mit verschiedenen Abschlussbedingungen zu warten: alle Tasks, erster Task oder erste Exception.
import asyncio
async def task(delay, name):
await asyncio.sleep(delay)
return f"{name} completed"
async def main():
tasks = [
asyncio.create_task(task(1, "Task 1")),
asyncio.create_task(task(2, "Task 2")),
asyncio.create_task(task(3, "Task 3"))
]
# Wait for all tasks to complete
done, pending = await asyncio.wait(
tasks,
return_when=asyncio.ALL_COMPLETED
)
for task in done:
print(task.result())
asyncio.run(main())Unterschiedliche Abschlussbedingungen:
async def main():
tasks = [
asyncio.create_task(task(1, "Fast")),
asyncio.create_task(task(3, "Slow")),
asyncio.create_task(task(2, "Medium"))
]
# Return when the first task completes
done, pending = await asyncio.wait(
tasks,
return_when=asyncio.FIRST_COMPLETED
)
print(f"First completed: {done.pop().result()}")
print(f"Still pending: {len(pending)} tasks")
# Cancel remaining tasks
for task in pending:
task.cancel()
asyncio.run(main())asyncio.as_completed()
as_completed() gibt einen Iterator zurück, der Tasks ausgibt, sobald sie abgeschlossen sind, was es Ihnen ermöglicht, Ergebnisse zu verarbeiten, sobald sie verfügbar sind.
import asyncio
async def fetch_data(url_id, delay):
await asyncio.sleep(delay)
return f"Data from URL {url_id}"
async def main():
tasks = [
fetch_data(1, 3),
fetch_data(2, 1),
fetch_data(3, 2)
]
# Process results as they complete
for coro in asyncio.as_completed(tasks):
result = await coro
print(f"Received: {result}")
asyncio.run(main())Die Ausgabe zeigt Ergebnisse in Abschlussreihenfolge, nicht in Einreichungsreihenfolge:
Received: Data from URL 2
Received: Data from URL 3
Received: Data from URL 1Dies ist besonders nützlich, wenn Sie Ergebnisse so schnell wie möglich an Nutzer anzeigen möchten:
async def search_engine(query, engine_name, delay):
await asyncio.sleep(delay)
return f"{engine_name}: Results for '{query}'"
async def main():
query = "python asyncio"
searches = [
search_engine(query, "Google", 1.5),
search_engine(query, "Bing", 2.0),
search_engine(query, "DuckDuckGo", 1.0)
]
print("Searching...")
for search in asyncio.as_completed(searches):
result = await search
print(result) # Display each result as soon as it arrives
asyncio.run(main())asyncio.sleep() vs time.sleep()
Diese Unterscheidung ist kritisch für die Korrektheit von async-Code.
time.sleep() ist eine blockierende Operation, die den gesamten Thread pausiert, einschließlich der Event Loop. Dies verhindert, dass alle async-Tasks laufen.
asyncio.sleep() ist eine nicht-blockierende Coroutine, die nur den aktuellen Task pausiert und anderen Tasks erlaubt zu laufen.
import asyncio
import time
async def blocking_example():
"""Bad: Uses time.sleep() - blocks the entire event loop"""
print("Starting blocking sleep")
time.sleep(2) # WRONG: Blocks everything!
print("Finished blocking sleep")
async def non_blocking_example():
"""Good: Uses asyncio.sleep() - allows other tasks to run"""
print("Starting non-blocking sleep")
await asyncio.sleep(2) # CORRECT: Only pauses this task
print("Finished non-blocking sleep")
async def concurrent_task():
for i in range(3):
print(f"Concurrent task running: {i}")
await asyncio.sleep(0.5)
async def demo_blocking():
print("\n=== Blocking example (BAD) ===")
await asyncio.gather(
blocking_example(),
concurrent_task()
)
async def demo_non_blocking():
print("\n=== Non-blocking example (GOOD) ===")
await asyncio.gather(
non_blocking_example(),
concurrent_task()
)
asyncio.run(demo_blocking())
asyncio.run(demo_non_blocking())Im blockierenden Beispiel läuft der nebenläufige Task nicht, bis time.sleep() abgeschlossen ist. Im nicht-blockierenden Beispiel laufen beide Tasks nebenläufig.
Faustregel: Verwenden Sie niemals time.sleep() in async-Code. Verwenden Sie immer await asyncio.sleep().
Für CPU-bound Operationen, die Sie nicht vermeiden können, verwenden Sie loop.run_in_executor(), um sie in einem separaten Thread oder Prozess laufen zu lassen:
import asyncio
import time
def cpu_intensive_task():
"""Some blocking CPU work"""
time.sleep(2) # Simulate heavy computation
return "CPU task completed"
async def main():
loop = asyncio.get_event_loop()
# Run blocking task in a thread pool
result = await loop.run_in_executor(None, cpu_intensive_task)
print(result)
asyncio.run(main())async for und async with
Python bietet asynchrone Versionen von for-Schleifen und Kontextmanagern für die Arbeit mit asynchronen Iterables und Ressourcen.
Asynchrone Iteratoren (async for)
Ein asynchroner Iterator ist ein Objekt, das __aiter__() und __anext__() Methoden implementiert, was es Ihnen ermöglicht, über Items zu iterieren, deren Abruf async-Operationen erfordert.
import asyncio
class AsyncRange:
"""An async iterator that yields numbers with delays"""
def __init__(self, start, end):
self.current = start
self.end = end
def __aiter__(self):
return self
async def __anext__(self):
if self.current >= self.end:
raise StopAsyncIteration
# Simulate async operation to get next value
await asyncio.sleep(0.5)
value = self.current
self.current += 1
return value
async def main():
async for number in AsyncRange(1, 5):
print(f"Got number: {number}")
asyncio.run(main())Praxisbeispiel mit asynchronem Datenbank-Cursor:
class AsyncDatabaseCursor:
"""Simulated async database cursor"""
def __init__(self, query):
self.query = query
self.results = []
self.index = 0
async def execute(self):
# Simulate database query
await asyncio.sleep(1)
self.results = [
{"id": 1, "name": "Alice"},
{"id": 2, "name": "Bob"},
{"id": 3, "name": "Charlie"}
]
def __aiter__(self):
return self
async def __anext__(self):
if self.index >= len(self.results):
raise StopAsyncIteration
# Simulate fetching next row
await asyncio.sleep(0.1)
row = self.results[self.index]
self.index += 1
return row
async def fetch_users():
cursor = AsyncDatabaseCursor("SELECT * FROM users")
await cursor.execute()
async for row in cursor:
print(f"User: {row['name']}")
asyncio.run(fetch_users())Asynchrone Kontextmanager (async with)
Ein asynchroner Kontextmanager implementiert __aenter__() und __aexit__() Methoden für das Management von Ressourcen, die async-Setup und -Cleanup erfordern.
import asyncio
class AsyncDatabaseConnection:
"""An async context manager for database connections"""
async def __aenter__(self):
print("Opening database connection...")
await asyncio.sleep(1) # Simulate connection time
print("Database connection opened")
return self
async def __aexit__(self, exc_type, exc_val, exc_tb):
print("Closing database connection...")
await asyncio.sleep(0.5) # Simulate cleanup
print("Database connection closed")
async def query(self, sql):
await asyncio.sleep(0.5)
return f"Results for: {sql}"
async def main():
async with AsyncDatabaseConnection() as db:
result = await db.query("SELECT * FROM users")
print(result)
# Connection automatically closed after the with block
asyncio.run(main())Kombination asynchroner Kontextmanager mit asynchronen Iteratoren:
class AsyncFileReader:
"""Async context manager and iterator for reading files"""
def __init__(self, filename):
self.filename = filename
self.lines = []
self.index = 0
async def __aenter__(self):
# Simulate async file opening
await asyncio.sleep(0.5)
self.lines = ["Line 1", "Line 2", "Line 3"]
return self
async def __aexit__(self, exc_type, exc_val, exc_tb):
await asyncio.sleep(0.2)
self.lines = []
def __aiter__(self):
return self
async def __anext__(self):
if self.index >= len(self.lines):
raise StopAsyncIteration
await asyncio.sleep(0.1)
line = self.lines[self.index]
self.index += 1
return line
async def read_file():
async with AsyncFileReader("data.txt") as reader:
async for line in reader:
print(line)
asyncio.run(read_file())asyncio.Queue für Producer-Consumer-Patterns
asyncio.Queue ist eine thread-sichere, async-fähige Queue, die perfekt für die Koordinierung von Arbeit zwischen Producer- und Consumer-Coroutines ist.
import asyncio
import random
async def producer(queue, producer_id):
"""Produces items and puts them in the queue"""
for i in range(5):
await asyncio.sleep(random.uniform(0.1, 0.5))
item = f"Item-{producer_id}-{i}"
await queue.put(item)
print(f"Producer {producer_id} produced: {item}")
async def consumer(queue, consumer_id):
"""Consumes items from the queue"""
while True:
item = await queue.get()
print(f"Consumer {consumer_id} consuming: {item}")
# Simulate processing time
await asyncio.sleep(random.uniform(0.2, 0.8))
queue.task_done()
async def main():
queue = asyncio.Queue(maxsize=10)
# Create producers and consumers
producers = [asyncio.create_task(producer(queue, i)) for i in range(2)]
consumers = [asyncio.create_task(consumer(queue, i)) for i in range(3)]
# Wait for all producers to finish
await asyncio.gather(*producers)
# Wait for the queue to be fully processed
await queue.join()
# Cancel consumers (they run forever)
for consumer_task in consumers:
consumer_task.cancel()
asyncio.run(main())Praxisbeispiel: Web-Scraper mit URL-Queue:
import asyncio
from typing import Set
async def fetch_url(session, url):
"""Simulate fetching a URL"""
await asyncio.sleep(1)
return f"Content from {url}"
async def producer(queue: asyncio.Queue, start_urls: list):
"""Add URLs to the queue"""
for url in start_urls:
await queue.put(url)
async def consumer(queue: asyncio.Queue, visited: Set[str]):
"""Fetch URLs from the queue"""
while True:
url = await queue.get()
if url in visited:
queue.task_done()
continue
visited.add(url)
print(f"Scraping: {url}")
# Simulate fetching
content = await fetch_url(None, url)
# Simulate finding new URLs in the content
# In real scraper, you'd parse HTML and extract links
new_urls = [] # Would extract from content
for new_url in new_urls:
if new_url not in visited:
await queue.put(new_url)
queue.task_done()
async def main():
queue = asyncio.Queue()
visited = set()
start_urls = [
"https://example.com/page1",
"https://example.com/page2",
"https://example.com/page3"
]
# Add initial URLs
await producer(queue, start_urls)
# Create consumers
consumers = [
asyncio.create_task(consumer(queue, visited))
for _ in range(3)
]
# Wait for all URLs to be processed
await queue.join()
# Cancel consumers
for consumer_task in consumers:
consumer_task.cancel()
print(f"Scraped {len(visited)} unique URLs")
asyncio.run(main())Semaphores für Rate Limiting
asyncio.Semaphore steuert die Anzahl der Coroutines, die gleichzeitig auf eine Ressource zugreifen können. Dies ist essenziell für das Rate Limiting von API-Aufrufen oder das Limitieren gleichzeitiger Datenbankverbindungen.
import asyncio
import time
async def call_api(semaphore, api_id):
"""Make an API call with rate limiting"""
async with semaphore:
print(f"API call {api_id} started")
await asyncio.sleep(1) # Simulate API call
print(f"API call {api_id} completed")
return f"Result {api_id}"
async def main():
# Allow only 3 concurrent API calls
semaphore = asyncio.Semaphore(3)
start = time.time()
# Create 10 API calls
tasks = [call_api(semaphore, i) for i in range(10)]
results = await asyncio.gather(*tasks)
elapsed = time.time() - start
print(f"\nCompleted {len(results)} API calls in {elapsed:.2f} seconds")
# With semaphore(3): ~4 seconds (10 calls in batches of 3)
# Without semaphore: ~1 second (all concurrent)
asyncio.run(main())Rate Limiting zur Einhaltung von API-Kontingenten:
import asyncio
from datetime import datetime
class RateLimiter:
"""Rate limiter that allows N requests per time period"""
def __init__(self, max_requests, time_period):
self.max_requests = max_requests
self.time_period = time_period
self.semaphore = asyncio.Semaphore(max_requests)
self.request_times = []
async def __aenter__(self):
await self.semaphore.acquire()
# Wait if we've hit the rate limit
while len(self.request_times) >= self.max_requests:
oldest = self.request_times[0]
elapsed = datetime.now().timestamp() - oldest
if elapsed < self.time_period:
sleep_time = self.time_period - elapsed
await asyncio.sleep(sleep_time)
self.request_times.pop(0)
self.request_times.append(datetime.now().timestamp())
return self
async def __aexit__(self, exc_type, exc_val, exc_tb):
self.semaphore.release()
async def fetch_with_rate_limit(rate_limiter, item_id):
async with rate_limiter:
print(f"Fetching item {item_id} at {datetime.now()}")
await asyncio.sleep(0.5)
return f"Item {item_id}"
async def main():
# Allow 5 requests per 2 seconds
rate_limiter = RateLimiter(max_requests=5, time_period=2)
# Make 15 requests
tasks = [fetch_with_rate_limit(rate_limiter, i) for i in range(15)]
results = await asyncio.gather(*tasks)
print(f"Completed {len(results)} requests")
asyncio.run(main())aiohttp für asynchrone HTTP-Anfragen
Die Bibliothek aiohttp bietet async HTTP Client- und Server-Funktionalität. Es ist die Standardwahl für HTTP-Requests in async-Code.
import asyncio
import aiohttp
async def fetch_url(session, url):
"""Fetch a single URL"""
async with session.get(url) as response:
return await response.text()
async def main():
async with aiohttp.ClientSession() as session:
html = await fetch_url(session, "https://api.github.com")
print(f"Fetched {len(html)} characters")
asyncio.run(main())Gleichzeitiges Abrufen mehrerer URLs:
import asyncio
import aiohttp
import time
async def fetch_url(session, url):
"""Fetch URL and return status code and content length"""
async with session.get(url) as response:
content = await response.text()
return {
"url": url,
"status": response.status,
"length": len(content)
}
async def fetch_all_urls(urls):
"""Fetch multiple URLs concurrently"""
async with aiohttp.ClientSession() as session:
tasks = [fetch_url(session, url) for url in urls]
results = await asyncio.gather(*tasks, return_exceptions=True)
return results
async def main():
urls = [
"https://api.github.com",
"https://httpbin.org/delay/1",
"https://httpbin.org/delay/2",
"https://httpbin.org/status/200",
"https://httpbin.org/status/404"
]
start = time.time()
results = await fetch_all_urls(urls)
elapsed = time.time() - start
for result in results:
if isinstance(result, Exception):
print(f"Error: {result}")
else:
print(f"{result['url']}: {result['status']} ({result['length']} bytes)")
print(f"\nFetched {len(urls)} URLs in {elapsed:.2f} seconds")
asyncio.run(main())Praktisches Beispiel mit Fehlerbehandlung und Retries:
import asyncio
import aiohttp
from typing import Optional
async def fetch_with_retry(
session: aiohttp.ClientSession,
url: str,
max_retries: int = 3
) -> Optional[dict]:
"""Fetch URL with retry logic"""
for attempt in range(max_retries):
try:
async with session.get(url, timeout=aiohttp.ClientTimeout(total=10)) as response:
if response.status == 200:
data = await response.json()
return data
elif response.status == 404:
print(f"URL not found: {url}")
return None
else:
print(f"Unexpected status {response.status} for {url}")
except asyncio.TimeoutError:
print(f"Timeout on attempt {attempt + 1} for {url}")
except aiohttp.ClientError as e:
print(f"Client error on attempt {attempt + 1} for {url}: {e}")
if attempt < max_retries - 1:
await asyncio.sleep(2 ** attempt) # Exponential backoff
print(f"Failed to fetch {url} after {max_retries} attempts")
return None
async def main():
urls = [
"https://api.github.com/users/github",
"https://api.github.com/users/nonexistent",
"https://httpbin.org/delay/5"
]
async with aiohttp.ClientSession() as session:
tasks = [fetch_with_retry(session, url) for url in urls]
results = await asyncio.gather(*tasks)
successful = [r for r in results if r is not None]
print(f"\nSuccessfully fetched {len(successful)}/{len(urls)} URLs")
asyncio.run(main())aiofiles für asynchrone Datei-I/O
Die Bibliothek aiofiles bietet asynchrone Dateioperationen, die Blockieren während Datei-Lese- und -Schreiboperationen verhindert.
import asyncio
import aiofiles
async def read_file(filename):
"""Read file asynchronously"""
async with aiofiles.open(filename, mode='r') as f:
contents = await f.read()
return contents
async def write_file(filename, content):
"""Write file asynchronously"""
async with aiofiles.open(filename, mode='w') as f:
await f.write(content)
async def main():
# Write data
await write_file('test.txt', 'Hello, async file I/O!')
# Read data
content = await read_file('test.txt')
print(f"Read: {content}")
asyncio.run(main())Gleichzeitige Verarbeitung mehrerer Dateien:
import asyncio
import aiofiles
async def process_file(input_file, output_file):
"""Read, process, and write a file"""
async with aiofiles.open(input_file, mode='r') as f:
content = await f.read()
# Process content (convert to uppercase)
processed = content.upper()
async with aiofiles.open(output_file, mode='w') as f:
await f.write(processed)
return f"Processed {input_file} -> {output_file}"
async def main():
files = [
('input1.txt', 'output1.txt'),
('input2.txt', 'output2.txt'),
('input3.txt', 'output3.txt')
]
tasks = [process_file(inp, out) for inp, out in files]
results = await asyncio.gather(*tasks)
for result in results:
print(result)
asyncio.run(main())Zeilenweises Lesen großer Dateien:
import asyncio
import aiofiles
async def process_large_file(filename):
"""Process a large file line by line without loading it all into memory"""
line_count = 0
async with aiofiles.open(filename, mode='r') as f:
async for line in f:
# Process each line
line_count += 1
if line.strip():
# Do something with the line
pass
return line_count
async def main():
count = await process_large_file('large_data.txt')
print(f"Processed {count} lines")
asyncio.run(main())Fehlerbehandlung in asynchronem Code
Die Fehlerbehandlung in async-Code erfordert besondere Aufmerksamkeit, um sicherzustellen, dass Exceptions ordnungsgemäß abgefangen und Ressourcen bereinigt werden.
import asyncio
async def risky_operation(item_id):
"""An operation that might fail"""
await asyncio.sleep(1)
if item_id % 2 == 0:
raise ValueError(f"Item {item_id} caused an error")
return f"Item {item_id} processed"
async def handle_with_try_except():
"""Handle errors with try/except"""
try:
result = await risky_operation(2)
print(result)
except ValueError as e:
print(f"Error caught: {e}")
asyncio.run(handle_with_try_except())Fehlerbehandlung in nebenläufigen Tasks:
async def safe_operation(item_id):
"""Wrapper that catches errors from risky_operation"""
try:
result = await risky_operation(item_id)
return {"success": True, "result": result}
except Exception as e:
return {"success": False, "error": str(e), "item_id": item_id}
async def main():
tasks = [safe_operation(i) for i in range(5)]
results = await asyncio.gather(*tasks)
for result in results:
if result["success"]:
print(f"Success: {result['result']}")
else:
print(f"Failed: Item {result['item_id']} - {result['error']}")
asyncio.run(main())Verwendung von gather() mit return_exceptions=True:
async def main():
tasks = [risky_operation(i) for i in range(5)]
results = await asyncio.gather(*tasks, return_exceptions=True)
for i, result in enumerate(results):
if isinstance(result, Exception):
print(f"Task {i} failed: {result}")
else:
print(f"Task {i} succeeded: {result}")
asyncio.run(main())Bereinigung mit asynchronen Kontextmanagern:
class AsyncResource:
"""A resource that needs cleanup even if errors occur"""
async def __aenter__(self):
print("Acquiring resource")
await asyncio.sleep(0.5)
return self
async def __aexit__(self, exc_type, exc_val, exc_tb):
print("Cleaning up resource")
await asyncio.sleep(0.5)
if exc_type is not None:
print(f"Exception during resource use: {exc_val}")
# Return False to propagate exception, True to suppress
return False
async def do_work(self):
await asyncio.sleep(1)
raise ValueError("Something went wrong")
async def main():
try:
async with AsyncResource() as resource:
await resource.do_work()
except ValueError as e:
print(f"Caught error: {e}")
asyncio.run(main())Leistungsbenchmarks: Sync vs Async für I/O-bound Tasks
Vergleichen wir synchrone und asynchrone Ansätze für I/O-bound Operationen mit echten Benchmarks.
import asyncio
import time
import requests
import aiohttp
# Synchronous approach
def fetch_sync(url):
response = requests.get(url)
return len(response.text)
def benchmark_sync(urls):
start = time.time()
results = [fetch_sync(url) for url in urls]
elapsed = time.time() - start
return elapsed, results
# Asynchronous approach
async def fetch_async(session, url):
async with session.get(url) as response:
text = await response.text()
return len(text)
async def benchmark_async(urls):
start = time.time()
async with aiohttp.ClientSession() as session:
tasks = [fetch_async(session, url) for url in urls]
results = await asyncio.gather(*tasks)
elapsed = time.time() - start
return elapsed, results
# Run benchmarks
urls = [
"https://httpbin.org/delay/1",
"https://httpbin.org/delay/1",
"https://httpbin.org/delay/1",
"https://httpbin.org/delay/1",
"https://httpbin.org/delay/1"
]
print("Benchmarking synchronous approach...")
sync_time, sync_results = benchmark_sync(urls)
print(f"Synchronous: {sync_time:.2f} seconds")
print("\nBenchmarking asynchronous approach...")
async_time, async_results = asyncio.run(benchmark_async(urls))
print(f"Asynchronous: {async_time:.2f} seconds")
print(f"\nSpeedup: {sync_time / async_time:.2f}x")Typische Ergebnisse für 5 URLs mit jeweils 1 Sekunde Verzögerung:
- Synchron: ~5 Sekunden (sequentielle Ausführung)
- Asynchron: ~1 Sekunde (nebenläufige Ausführung)
- Beschleunigung: ~5x
| Anzahl URLs | Sync-Zeit | Async-Zeit | Beschleunigung |
|---|---|---|---|
| 5 | 5.2s | 1.1s | 4.7x |
| 10 | 10.4s | 1.2s | 8.7x |
| 20 | 20.8s | 1.4s | 14.9x |
| 50 | 52.1s | 2.1s | 24.8x |
| 100 | 104.5s | 3.8s | 27.5x |
Die Beschleunigung nimmt mit der Anzahl der nebenläufigen Operationen zu, bis Sie Bandbreiten- oder Rate-Limiting-Beschränkungen erreichen.
Häufige Fallstricke und wie man sie vermeidet
Vergessen von await
# WRONG: Coroutine is created but not executed
async def wrong():
result = fetch_data() # Missing await!
print(result) # Prints coroutine object, not the result
# CORRECT: Await the coroutine
async def correct():
result = await fetch_data()
print(result) # Prints actual resultBlockieren der Event Loop
# WRONG: Blocks the entire event loop
async def blocking_code():
time.sleep(5) # Blocks everything!
result = compute_something() # Blocks if CPU-intensive
return result
# CORRECT: Use asyncio.sleep and run_in_executor
async def non_blocking_code():
await asyncio.sleep(5) # Only pauses this task
loop = asyncio.get_event_loop()
result = await loop.run_in_executor(None, compute_something)
return resultNichtbehandlung der Task-Abbruchanforderung (Cancellation)
# WRONG: Doesn't handle cancellation
async def incomplete_cleanup():
resource = await acquire_resource()
await long_operation()
await release_resource(resource) # May not execute if cancelled
# CORRECT: Use try/finally or async with
async def proper_cleanup():
resource = await acquire_resource()
try:
await long_operation()
finally:
await release_resource(resource) # Always executesErzeugen von Event-Loop-Konflikten
# WRONG: Calling asyncio.run() in an async function
async def wrong_nesting():
result = asyncio.run(some_coroutine()) # Error!
return result
# CORRECT: Just await the coroutine
async def correct_nesting():
result = await some_coroutine()
return resultNichtbegrenzung der Nebenläufigkeit
# WRONG: Launches too many tasks simultaneously
async def unlimited_concurrency(items):
tasks = [process_item(item) for item in items] # 10,000 tasks!
return await asyncio.gather(*tasks)
# CORRECT: Use semaphore to limit concurrency
async def limited_concurrency(items):
semaphore = asyncio.Semaphore(10) # Max 10 concurrent
async def process_with_limit(item):
async with semaphore:
return await process_item(item)
tasks = [process_with_limit(item) for item in items]
return await asyncio.gather(*tasks)Praxisbeispiele
Web-Scraping mit Rate Limiting
import asyncio
import aiohttp
from bs4 import BeautifulSoup
class AsyncWebScraper:
def __init__(self, max_concurrent=5):
self.semaphore = asyncio.Semaphore(max_concurrent)
self.session = None
async def __aenter__(self):
self.session = aiohttp.ClientSession()
return self
async def __aexit__(self, exc_type, exc_val, exc_tb):
await self.session.close()
async def fetch_page(self, url):
async with self.semaphore:
try:
async with self.session.get(url) as response:
return await response.text()
except Exception as e:
print(f"Error fetching {url}: {e}")
return None
async def scrape_urls(self, urls):
tasks = [self.fetch_page(url) for url in urls]
return await asyncio.gather(*tasks)
async def main():
urls = [
"https://example.com/page1",
"https://example.com/page2",
"https://example.com/page3"
]
async with AsyncWebScraper(max_concurrent=3) as scraper:
pages = await scraper.scrape_urls(urls)
for url, html in zip(urls, pages):
if html:
print(f"Scraped {url}: {len(html)} bytes")
asyncio.run(main())Asynchrone API-Datenpipeline
import asyncio
import aiohttp
async def fetch_user_ids(session):
"""Fetch list of user IDs from API"""
async with session.get("https://api.example.com/users") as response:
data = await response.json()
return [user["id"] for user in data["users"]]
async def fetch_user_details(session, user_id):
"""Fetch detailed info for a user"""
async with session.get(f"https://api.example.com/users/{user_id}") as response:
return await response.json()
async def fetch_user_posts(session, user_id):
"""Fetch posts for a user"""
async with session.get(f"https://api.example.com/users/{user_id}/posts") as response:
return await response.json()
async def process_user(session, user_id):
"""Fetch all data for a user concurrently"""
details, posts = await asyncio.gather(
fetch_user_details(session, user_id),
fetch_user_posts(session, user_id)
)
return {
"user": details,
"posts": posts,
"post_count": len(posts)
}
async def main():
async with aiohttp.ClientSession() as session:
# Fetch user IDs
user_ids = await fetch_user_ids(session)
# Process all users concurrently
user_data = await asyncio.gather(
*[process_user(session, uid) for uid in user_ids[:10]]
)
for data in user_data:
print(f"User {data['user']['name']}: {data['post_count']} posts")
asyncio.run(main())Asynchroner Chat-Server
import asyncio
class ChatServer:
def __init__(self):
self.clients = set()
async def handle_client(self, reader, writer):
"""Handle a single client connection"""
addr = writer.get_extra_info('peername')
print(f"Client connected: {addr}")
self.clients.add(writer)
try:
while True:
data = await reader.read(100)
if not data:
break
message = data.decode()
print(f"Received from {addr}: {message}")
# Broadcast to all clients
await self.broadcast(f"{addr}: {message}", writer)
except asyncio.CancelledError:
pass
finally:
print(f"Client disconnected: {addr}")
self.clients.remove(writer)
writer.close()
await writer.wait_closed()
async def broadcast(self, message, sender):
"""Send message to all clients except sender"""
for client in self.clients:
if client != sender:
try:
client.write(message.encode())
await client.drain()
except Exception as e:
print(f"Error broadcasting: {e}")
async def start(self, host='127.0.0.1', port=8888):
"""Start the chat server"""
server = await asyncio.start_server(
self.handle_client,
host,
port
)
addr = server.sockets[0].getsockname()
print(f"Chat server running on {addr}")
async with server:
await server.serve_forever()
async def main():
chat_server = ChatServer()
await chat_server.start()
asyncio.run(main())Experimentieren mit Asyncio in Jupyter
Bei der Arbeit mit asyncio in Jupyter Notebooks können Sie auf Event-Loop-Konflikte stoßen. Jupyter führt bereits eine Event Loop aus, was mit asyncio.run() interferieren kann.
Für nahtloses asynchrones Experimentieren in Jupyter-Umgebungen erwägen Sie die Verwendung von RunCell (opens in a new tab), einem speziell für Jupyter Notebooks entwickelten AI-Agenten. RunCell verwaltet das Event-Loop-Management automatisch und bietet erweiterte Async-Debugging-Fähigkeiten, was es Ihnen ermöglicht, interaktiv asyncio-Patterns zu testen, ohne Konflikte.
Im Standard-Jupyter können Sie Top-Level await verwenden:
# In Jupyter, this works directly
async def fetch_data():
await asyncio.sleep(1)
return "data"
# Just await directly, no asyncio.run() needed
result = await fetch_data()
print(result)Oder verwenden Sie das Paket nest_asyncio, um verschachtelte Event Loops zu erlauben:
import nest_asyncio
nest_asyncio.apply()
# Now asyncio.run() works in Jupyter
asyncio.run(main())FAQ
Fazit
Python asyncio transformiert I/O-bound Anwendungen von langsamen, blockierenden Operationen in schnelle, nebenläufige Systeme. Indem Sie die async/await-Syntax beherrschen, die Event Loop verstehen und Tools wie gather(), create_task() und Semaphores nutzen, können Sie Anwendungen bauen, die Tausende von nebenläufigen Operationen effizient verarbeiten.
Der Schlüssel zum Erfolg mit asyncio ist die Erkenntnis, wann es das richtige Werkzeug ist. Verwenden Sie es für Netzwerk-Requests, Datenbankabfragen, Dateioperationen und jede Aufgabe, die Zeit damit verbringt, auf externe Ressourcen zu warten. Vermeiden Sie es, die Event Loop mit synchronen Operationen zu blockieren, verwenden Sie immer await mit Coroutines, und begrenzen Sie bei Bedarf die Nebenläufigkeit mit Semaphores.
Beginnen Sie damit, kleine Abschnitte Ihrer Codebasis auf async umzustellen, messen Sie die Leistungsverbesserung, und erweitern Sie schrittlich. Die dramatischen Geschwindigkeitssteigerungen in I/O-intensiven Anwendungen machen die Lerninvestition wertvoll.