Skip to content

Python asyncio: Kompletter Leitfaden zur asynchronen Programmierung

Updated on

Ihre Python-Anwendung führt 100 API-Aufrufe aus, jeder dauert 2 Sekunden. Mit traditionellem sequenziellem Code sind das 200 Sekunden Wartezeit. Ihre Nutzer starren auf Ladebildschirme. Ihre Server sitzen untätig herum, verbrauchen Ressourcen während sie auf Antworten warten. Dieses blockierende Verhalten ist das Problem, das die Anwendungsleistung und Benutzererfahrung zerstört.

Der Schmerz verstärkt sich, wenn Sie skalieren. Datenbankabfragen stapeln sich. Dateioperationen reihen sich hintereinander an. Web-Scraper kriechen im Schneckentempo. Jede I/O-Operation wird zu einem Engpass, der sich durch Ihr gesamtes System fortsetzt und aus dem, was eine schnelle, responsive Anwendung sein sollte, eine träge, ressourcenverschwendende Monsteranwendung macht.

Python asyncio löst dies, indem es die nebenläufige Ausführung von I/O-bound Tasks ermöglicht. Anstatt auf den Abschluss jeder Operation zu warten, bevor die nächste beginnt, erlaubt asyncio Ihrem Code, mehrere Operationen zu starten und zwischen ihnen zu wechseln, während sie warten. Diese 100 API-Aufrufe? Mit asyncio werden sie in etwa 2 Sekunden statt 200 abgeschlossen. Dieser Leitfaden zeigt Ihnen genau, wie Sie asynchrone Programmierung in Python implementieren, mit praktischen Beispielen, die langsamen, blockierenden Code in schnelle, nebenläufige Anwendungen transformieren.

📚

Was ist asynchrone Programmierung und warum ist sie wichtig

Asynchrone Programmierung ermöglicht es einem Programm, potenziell langlaufende Tasks zu starten und zu anderen Arbeiten überzugehen, bevor diese Tasks abgeschlossen sind, anstatt auf den Abschluss jedes Tasks zu warten, bevor der nächste beginnt.

Im traditionellen synchronen Code stoppt Ihr Programm und wartet auf die Antwort, wenn Sie einen API-Request machen. Während dieser Wartezeit sitzt Ihre CPU untätig herum und macht nichts Produktives. Das ist für einzelne Operationen akzeptabel, aber katastrophal für Anwendungen, die mehrere I/O-Operationen verarbeiten müssen.

Asyncio bietet eine Möglichkeit, nebenläufigen Code mit der async/await-Syntax zu schreiben. Es ist besonders effektiv für I/O-bound Operationen wie:

  • API-HTTP-Requests machen
  • Dateien lesen und schreiben
  • Datenbankabfragen
  • Netzwerkkommunikation
  • WebSocket-Verbindungen
  • Message-Queue-Verarbeitung

Die Leistungsverbesserung ist dramatisch. Betrachten Sie das Abrufen von Daten von 50 verschiedenen URLs:

Synchroner Ansatz: 50 Requests × 2 Sekunden each = 100 Sekunden total Asynchroner Ansatz: Alle 50 Requests laufen nebenläufig ≈ 2 Sekunden total

Diese 50-fache Leistungsverbesserung resultiert aus einer besseren Ressourcennutzung. Anstatt bei I/O-Operationen zu blockieren, erlaubt asyncio Ihrem Programm, andere Tasks weiter auszuführen, während auf I/O gewartet wird.

Nebenläufigkeit vs Parallelität vs Async

Das Verständnis der Unterscheidung zwischen diesen Konzepten ist essenziell für die effektive Nutzung von asyncio.

Nebenläufigkeit bedeutet, mehrere Tasks gleichzeitig zu verwalten. Die Tasks machen abwechselnd Fortschritt, aber nur einer wird zu einem gegebenen Zeitpunkt ausgeführt. Stellen Sie sich einen Koch vor, der mehrere Gerichte zubereitet und zwischen den Aufgaben wechselt, während jedes auf das Garen wartet.

Parallelität bedeutet, mehrere Tasks simultan auf verschiedenen CPU-Kernen auszuführen. Dies erfordert echte parallele Verarbeitungshardware und ist ideal für CPU-bound Tasks wie mathematische Berechnungen oder Bildverarbeitung.

Asynchrone Programmierung ist eine spezifische Form der Nebenläufigkeit, die für I/O-bound Tasks entwickelt wurde. Sie verwendet einen einzelnen Thread und wechselt zwischen Tasks, wenn diese auf I/O-Operationen warten.

MerkmalasyncioThreadingMultiprocessing
AusführungsmodellEinzelner Thread, kooperatives MultitaskingMehrere Threads, präemptives MultitaskingMehrere Prozesse
Am besten fürI/O-bound TasksI/O-bound Tasks mit blockierenden BibliothekenCPU-bound Tasks
Speicher-OverheadMinimalModeratHoch
Kosten für KontextwechselSehr niedrigNiedrig bis moderatHoch
KomplexitätModerat (async/await-Syntax)Hoch (Race Conditions, Locks)Hoch (IPC, Serialisierung)
GIL-LimitierungNicht betroffen (einzelner Thread)Durch GIL limitiertNicht limitiert (separate Prozesse)
Typische Beschleunigung für I/O10-100x5-10xN/A

Die Python Global Interpreter Lock (GIL) verhindert die echte parallele Ausführung von Python-Bytecode in Threads, was Threading weniger effektiv für CPU-bound Tasks macht. Asyncio umgeht diese Limitierung durch die Verwendung eines einzelnen Threads mit kooperativem Multitasking, während Multiprocessing sie durch separate Prozesse vollständig umgeht.

Die Schlüsselwörter async def und await

Das Fundament von asyncio basiert auf zwei Schlüsselwörtern: async und await.

Das Schlüsselwort async def definiert eine Coroutine-Funktion. Wenn Sie eine Coroutine-Funktion aufrufen, wird sie nicht sofort ausgeführt. Stattdessen gibt sie ein Coroutine-Objekt zurück, das awaited werden kann.

import asyncio
 
async def fetch_data():
    print("Fetching data...")
    await asyncio.sleep(2)  # Simulate I/O operation
    print("Data fetched!")
    return {"status": "success"}
 
# Calling the function returns a coroutine object
coroutine = fetch_data()
print(type(coroutine))  # <class 'coroutine'>
 
# To actually run it, you need to await it or use asyncio.run()
# asyncio.run(coroutine)  # This would execute the coroutine

Das Schlüsselwort await pausiert die Ausführung einer Coroutine, bis die awaited Operation abgeschlossen ist. Während dieser Pause kann die Event Loop andere Coroutines ausführen. Sie können await nur innerhalb einer async def Funktion verwenden.

async def process_user(user_id):
    # Await an I/O operation
    user_data = await fetch_user_from_database(user_id)
 
    # Await another I/O operation
    user_profile = await fetch_user_profile(user_id)
 
    # Regular synchronous code runs normally
    processed_data = transform_data(user_data, user_profile)
 
    return processed_data

Wichtige Regeln für async/await:

  1. Sie können nur Coroutines, Tasks oder Futures awaiten
  2. Sie können await nur innerhalb einer async def Funktion verwenden
  3. Reguläre Funktionen können kein await verwenden
  4. Das Aufrufen einer async-Funktion ohne await erstellt ein Coroutine-Objekt, führt aber den Code nicht aus

Häufiger Fehler:

async def wrong_example():
    # This creates a coroutine but doesn't execute it!
    fetch_data()  # Missing await
 
async def correct_example():
    # This actually executes the coroutine
    result = await fetch_data()

Der Einstiegspunkt asyncio.run()

Die Funktion asyncio.run() ist die Standardmethode, um die asyncio Event Loop zu starten und Ihre Main-Coroutine auszuführen. Sie wurde in Python 3.7 eingeführt und vereinfacht die Ausführung von async-Code aus synchronen Kontexten.

import asyncio
 
async def main():
    print("Starting async operations")
    await asyncio.sleep(1)
    print("Finished")
 
# Run the main coroutine
asyncio.run(main())

Was asyncio.run() im Hintergrund macht:

  1. Erstellt eine neue Event Loop
  2. Führt die bereitgestellte Coroutine bis zum Abschluss aus
  3. Schließt die Event Loop
  4. Gibt das Ergebnis der Coroutine zurück
import asyncio
 
async def main():
    result = await compute_value()
    return result
 
# The return value is accessible
final_result = asyncio.run(main())
print(final_result)

Wichtige Eigenschaften von asyncio.run():

  • Kann nicht von innerhalb einer laufenden Event Loop aufgerufen werden: Wenn Sie sich bereits in einer async-Funktion befinden, verwenden Sie stattdessen await
  • Erstellt jedes Mal eine frische Event Loop: Rufen Sie asyncio.run() nicht mehrmals im selben Programm auf, es sei denn, Sie möchten separate Event-Loop-Instanzen
  • Schließt immer die Loop: Die Event Loop wird nach der Ausführung aufgeräumt

Für Jupyter Notebooks oder Umgebungen, in denen bereits eine Event Loop läuft, verwenden Sie await direkt oder asyncio.create_task(). Tools wie RunCell (opens in a new tab) bieten erweiterte Async-Unterstützung in Jupyter-Umgebungen, was es einfacher macht, interaktiv mit asyncio-Patterns zu experimentieren, ohne Event-Loop-Konflikte.

Vor Python 3.7 mussten Sie die Event Loop manuell verwalten:

# Old way (pre-Python 3.7)
loop = asyncio.get_event_loop()
try:
    loop.run_until_complete(main())
finally:
    loop.close()
 
# Modern way (Python 3.7+)
asyncio.run(main())

Coroutines, Tasks und Futures

Das Verständnis dieser drei Kernkonzepte ist essenziell für die Beherrschung von asyncio.

Coroutines

Eine Coroutine ist eine Funktion, die mit async def definiert wurde. Es ist eine spezielle Funktion, die pausiert und fortgesetzt werden kann, wodurch anderer Code während der Pause laufen kann.

import asyncio
 
async def my_coroutine():
    await asyncio.sleep(1)
    return "completed"
 
# This creates a coroutine object
coro = my_coroutine()
 
# To run it
result = asyncio.run(coro)

Tasks

Ein Task ist ein Wrapper um eine Coroutine, der sie für die Ausführung in der Event Loop einplant. Tasks ermöglichen es Coroutines, nebenläufig zu laufen.

import asyncio
 
async def say_after(delay, message):
    await asyncio.sleep(delay)
    print(message)
    return message
 
async def main():
    # Create tasks to run concurrently
    task1 = asyncio.create_task(say_after(1, "First"))
    task2 = asyncio.create_task(say_after(2, "Second"))
 
    # Wait for both tasks to complete
    result1 = await task1
    result2 = await task2
 
    print(f"Results: {result1}, {result2}")
 
asyncio.run(main())

Ein Task wird erstellt und die Coroutine sofort für die Ausführung eingeplant. Die Event Loop startet sie so bald wie möglich, noch bevor Sie den Task awaiten.

async def background_work():
    print("Starting background work")
    await asyncio.sleep(2)
    print("Background work completed")
 
async def main():
    # The coroutine starts running immediately
    task = asyncio.create_task(background_work())
 
    print("Task created, doing other work")
    await asyncio.sleep(1)
    print("Other work done")
 
    # Wait for the background task to finish
    await task
 
asyncio.run(main())

Output:

Task created, doing other work
Starting background work
Other work done
Background work completed

Futures

Ein Future ist ein awaitables Objekt auf niedriger Ebene, das ein späteres Ergebnis einer asynchronen Operation repräsentiert. Sie erstellen Futures selten direkt; sie werden typischerweise von asyncio-Interna oder Bibliotheken erstellt.

async def set_future_result(future):
    await asyncio.sleep(1)
    future.set_result("Future completed")
 
async def main():
    loop = asyncio.get_event_loop()
    future = loop.create_future()
 
    # Schedule a coroutine to set the future's result
    asyncio.create_task(set_future_result(future))
 
    # Wait for the future to get a result
    result = await future
    print(result)
 
asyncio.run(main())

Beziehung zwischen Coroutines, Tasks und Futures:

  • Coroutines sind die Funktionen, die Sie schreiben
  • Tasks wrappen Coroutines und planen sie für die Ausführung ein
  • Futures repräsentieren Ergebnisse, die in der Zukunft verfügbar sein werden
  • Tasks sind eine Subklasse von Future

asyncio.create_task() für nebenläufige Ausführung

Die Funktion asyncio.create_task() ist Ihr primäres Werkzeug, um echte Nebenläufigkeit mit asyncio zu erreichen. Sie plant eine Coroutine für die Ausführung in der Event Loop ein, ohne die aktuelle Coroutine zu blockieren.

import asyncio
import time
 
async def download_file(file_id):
    print(f"Starting download {file_id}")
    await asyncio.sleep(2)  # Simulate download time
    print(f"Completed download {file_id}")
    return f"file_{file_id}.dat"
 
async def sequential_downloads():
    """Downloads files one at a time"""
    start = time.time()
 
    file1 = await download_file(1)
    file2 = await download_file(2)
    file3 = await download_file(3)
 
    elapsed = time.time() - start
    print(f"Sequential: {elapsed:.2f} seconds")
    # Output: ~6 seconds (2 + 2 + 2)
 
async def concurrent_downloads():
    """Downloads files concurrently"""
    start = time.time()
 
    # Create tasks for concurrent execution
    task1 = asyncio.create_task(download_file(1))
    task2 = asyncio.create_task(download_file(2))
    task3 = asyncio.create_task(download_file(3))
 
    # Wait for all tasks to complete
    file1 = await task1
    file2 = await task2
    file3 = await task3
 
    elapsed = time.time() - start
    print(f"Concurrent: {elapsed:.2f} seconds")
    # Output: ~2 seconds (all run at the same time)
 
asyncio.run(concurrent_downloads())

Der Task wird sofort eingeplant, wenn create_task() aufgerufen wird. Sie müssen ihn nicht sofort awaiten.

async def process_data():
    # Start background tasks
    task1 = asyncio.create_task(fetch_from_api_1())
    task2 = asyncio.create_task(fetch_from_api_2())
 
    # Do some work while tasks run in background
    local_data = prepare_local_data()
 
    # Now wait for the background tasks
    api_data_1 = await task1
    api_data_2 = await task2
 
    # Combine all data
    return combine_data(local_data, api_data_1, api_data_2)

Sie können Tasks auch für besseres Debugging benennen:

async def main():
    task = asyncio.create_task(
        long_running_operation(),
        name="long-operation-task"
    )
 
    # Task name is accessible
    print(f"Task name: {task.get_name()}")
 
    await task

asyncio.gather() zur Ausführung mehrerer Coroutines

Die Funktion asyncio.gather() führt mehrere Coroutines nebenläufig aus und wartet, bis alle abgeschlossen sind. Es ist sauberer als die Erstellung individueller Tasks, wenn Sie viele Coroutines ausführen müssen.

import asyncio
 
async def fetch_user(user_id):
    await asyncio.sleep(1)
    return {"id": user_id, "name": f"User {user_id}"}
 
async def main():
    # Run multiple coroutines concurrently
    results = await asyncio.gather(
        fetch_user(1),
        fetch_user(2),
        fetch_user(3),
        fetch_user(4),
        fetch_user(5)
    )
 
    print(results)
    # Output: [{'id': 1, 'name': 'User 1'}, {'id': 2, 'name': 'User 2'}, ...]
 
asyncio.run(main())

gather() gibt Ergebnisse in der gleichen Reihenfolge wie die Input-Coroutines zurück, unabhängig davon, welche zuerst abschließt.

async def task_with_delay(delay, value):
    await asyncio.sleep(delay)
    return value
 
async def main():
    results = await asyncio.gather(
        task_with_delay(3, "slow"),
        task_with_delay(1, "fast"),
        task_with_delay(2, "medium")
    )
 
    print(results)
    # Output: ['slow', 'fast', 'medium'] (order preserved)
 
asyncio.run(main())

Fehlerbehandlung mit gather()

Standardmäßig wirft gather() sofort eine Exception, wenn eine Coroutine eine Exception wirft, und bricht die verbleibenden Tasks ab.

async def failing_task():
    await asyncio.sleep(1)
    raise ValueError("Task failed!")
 
async def successful_task():
    await asyncio.sleep(2)
    return "success"
 
async def main():
    try:
        results = await asyncio.gather(
            failing_task(),
            successful_task()
        )
    except ValueError as e:
        print(f"Error caught: {e}")
        # The successful_task is cancelled
 
asyncio.run(main())

Verwenden Sie return_exceptions=True, um Exceptions zusammen mit erfolgreichen Ergebnissen zu sammeln:

async def main():
    results = await asyncio.gather(
        failing_task(),
        successful_task(),
        return_exceptions=True
    )
 
    for i, result in enumerate(results):
        if isinstance(result, Exception):
            print(f"Task {i} failed: {result}")
        else:
            print(f"Task {i} succeeded: {result}")
 
asyncio.run(main())

Dynamisches gather mit Listenkomprehension

gather() funktioniert hervorragend mit dynamisch generierten Coroutines:

async def process_item(item):
    await asyncio.sleep(1)
    return item * 2
 
async def main():
    items = [1, 2, 3, 4, 5]
 
    # Process all items concurrently
    results = await asyncio.gather(
        *[process_item(item) for item in items]
    )
 
    print(results)  # [2, 4, 6, 8, 10]
 
asyncio.run(main())

asyncio.wait() und asyncio.as_completed()

Während gather() auf alle Coroutines wartet, bieten wait() und as_completed() granulärere Kontrolle darüber, wie Sie abgeschlossene Tasks behandeln.

asyncio.wait()

wait() ermöglicht es Ihnen, auf Tasks mit verschiedenen Abschlussbedingungen zu warten: alle Tasks, erster Task oder erste Exception.

import asyncio
 
async def task(delay, name):
    await asyncio.sleep(delay)
    return f"{name} completed"
 
async def main():
    tasks = [
        asyncio.create_task(task(1, "Task 1")),
        asyncio.create_task(task(2, "Task 2")),
        asyncio.create_task(task(3, "Task 3"))
    ]
 
    # Wait for all tasks to complete
    done, pending = await asyncio.wait(
        tasks,
        return_when=asyncio.ALL_COMPLETED
    )
 
    for task in done:
        print(task.result())
 
asyncio.run(main())

Unterschiedliche Abschlussbedingungen:

async def main():
    tasks = [
        asyncio.create_task(task(1, "Fast")),
        asyncio.create_task(task(3, "Slow")),
        asyncio.create_task(task(2, "Medium"))
    ]
 
    # Return when the first task completes
    done, pending = await asyncio.wait(
        tasks,
        return_when=asyncio.FIRST_COMPLETED
    )
 
    print(f"First completed: {done.pop().result()}")
    print(f"Still pending: {len(pending)} tasks")
 
    # Cancel remaining tasks
    for task in pending:
        task.cancel()
 
asyncio.run(main())

asyncio.as_completed()

as_completed() gibt einen Iterator zurück, der Tasks ausgibt, sobald sie abgeschlossen sind, was es Ihnen ermöglicht, Ergebnisse zu verarbeiten, sobald sie verfügbar sind.

import asyncio
 
async def fetch_data(url_id, delay):
    await asyncio.sleep(delay)
    return f"Data from URL {url_id}"
 
async def main():
    tasks = [
        fetch_data(1, 3),
        fetch_data(2, 1),
        fetch_data(3, 2)
    ]
 
    # Process results as they complete
    for coro in asyncio.as_completed(tasks):
        result = await coro
        print(f"Received: {result}")
 
asyncio.run(main())

Die Ausgabe zeigt Ergebnisse in Abschlussreihenfolge, nicht in Einreichungsreihenfolge:

Received: Data from URL 2
Received: Data from URL 3
Received: Data from URL 1

Dies ist besonders nützlich, wenn Sie Ergebnisse so schnell wie möglich an Nutzer anzeigen möchten:

async def search_engine(query, engine_name, delay):
    await asyncio.sleep(delay)
    return f"{engine_name}: Results for '{query}'"
 
async def main():
    query = "python asyncio"
 
    searches = [
        search_engine(query, "Google", 1.5),
        search_engine(query, "Bing", 2.0),
        search_engine(query, "DuckDuckGo", 1.0)
    ]
 
    print("Searching...")
    for search in asyncio.as_completed(searches):
        result = await search
        print(result)  # Display each result as soon as it arrives
 
asyncio.run(main())

asyncio.sleep() vs time.sleep()

Diese Unterscheidung ist kritisch für die Korrektheit von async-Code.

time.sleep() ist eine blockierende Operation, die den gesamten Thread pausiert, einschließlich der Event Loop. Dies verhindert, dass alle async-Tasks laufen.

asyncio.sleep() ist eine nicht-blockierende Coroutine, die nur den aktuellen Task pausiert und anderen Tasks erlaubt zu laufen.

import asyncio
import time
 
async def blocking_example():
    """Bad: Uses time.sleep() - blocks the entire event loop"""
    print("Starting blocking sleep")
    time.sleep(2)  # WRONG: Blocks everything!
    print("Finished blocking sleep")
 
async def non_blocking_example():
    """Good: Uses asyncio.sleep() - allows other tasks to run"""
    print("Starting non-blocking sleep")
    await asyncio.sleep(2)  # CORRECT: Only pauses this task
    print("Finished non-blocking sleep")
 
async def concurrent_task():
    for i in range(3):
        print(f"Concurrent task running: {i}")
        await asyncio.sleep(0.5)
 
async def demo_blocking():
    print("\n=== Blocking example (BAD) ===")
    await asyncio.gather(
        blocking_example(),
        concurrent_task()
    )
 
async def demo_non_blocking():
    print("\n=== Non-blocking example (GOOD) ===")
    await asyncio.gather(
        non_blocking_example(),
        concurrent_task()
    )
 
asyncio.run(demo_blocking())
asyncio.run(demo_non_blocking())

Im blockierenden Beispiel läuft der nebenläufige Task nicht, bis time.sleep() abgeschlossen ist. Im nicht-blockierenden Beispiel laufen beide Tasks nebenläufig.

Faustregel: Verwenden Sie niemals time.sleep() in async-Code. Verwenden Sie immer await asyncio.sleep().

Für CPU-bound Operationen, die Sie nicht vermeiden können, verwenden Sie loop.run_in_executor(), um sie in einem separaten Thread oder Prozess laufen zu lassen:

import asyncio
import time
 
def cpu_intensive_task():
    """Some blocking CPU work"""
    time.sleep(2)  # Simulate heavy computation
    return "CPU task completed"
 
async def main():
    loop = asyncio.get_event_loop()
 
    # Run blocking task in a thread pool
    result = await loop.run_in_executor(None, cpu_intensive_task)
    print(result)
 
asyncio.run(main())

async for und async with

Python bietet asynchrone Versionen von for-Schleifen und Kontextmanagern für die Arbeit mit asynchronen Iterables und Ressourcen.

Asynchrone Iteratoren (async for)

Ein asynchroner Iterator ist ein Objekt, das __aiter__() und __anext__() Methoden implementiert, was es Ihnen ermöglicht, über Items zu iterieren, deren Abruf async-Operationen erfordert.

import asyncio
 
class AsyncRange:
    """An async iterator that yields numbers with delays"""
    def __init__(self, start, end):
        self.current = start
        self.end = end
 
    def __aiter__(self):
        return self
 
    async def __anext__(self):
        if self.current >= self.end:
            raise StopAsyncIteration
 
        # Simulate async operation to get next value
        await asyncio.sleep(0.5)
        value = self.current
        self.current += 1
        return value
 
async def main():
    async for number in AsyncRange(1, 5):
        print(f"Got number: {number}")
 
asyncio.run(main())

Praxisbeispiel mit asynchronem Datenbank-Cursor:

class AsyncDatabaseCursor:
    """Simulated async database cursor"""
    def __init__(self, query):
        self.query = query
        self.results = []
        self.index = 0
 
    async def execute(self):
        # Simulate database query
        await asyncio.sleep(1)
        self.results = [
            {"id": 1, "name": "Alice"},
            {"id": 2, "name": "Bob"},
            {"id": 3, "name": "Charlie"}
        ]
 
    def __aiter__(self):
        return self
 
    async def __anext__(self):
        if self.index >= len(self.results):
            raise StopAsyncIteration
 
        # Simulate fetching next row
        await asyncio.sleep(0.1)
        row = self.results[self.index]
        self.index += 1
        return row
 
async def fetch_users():
    cursor = AsyncDatabaseCursor("SELECT * FROM users")
    await cursor.execute()
 
    async for row in cursor:
        print(f"User: {row['name']}")
 
asyncio.run(fetch_users())

Asynchrone Kontextmanager (async with)

Ein asynchroner Kontextmanager implementiert __aenter__() und __aexit__() Methoden für das Management von Ressourcen, die async-Setup und -Cleanup erfordern.

import asyncio
 
class AsyncDatabaseConnection:
    """An async context manager for database connections"""
 
    async def __aenter__(self):
        print("Opening database connection...")
        await asyncio.sleep(1)  # Simulate connection time
        print("Database connection opened")
        return self
 
    async def __aexit__(self, exc_type, exc_val, exc_tb):
        print("Closing database connection...")
        await asyncio.sleep(0.5)  # Simulate cleanup
        print("Database connection closed")
 
    async def query(self, sql):
        await asyncio.sleep(0.5)
        return f"Results for: {sql}"
 
async def main():
    async with AsyncDatabaseConnection() as db:
        result = await db.query("SELECT * FROM users")
        print(result)
    # Connection automatically closed after the with block
 
asyncio.run(main())

Kombination asynchroner Kontextmanager mit asynchronen Iteratoren:

class AsyncFileReader:
    """Async context manager and iterator for reading files"""
 
    def __init__(self, filename):
        self.filename = filename
        self.lines = []
        self.index = 0
 
    async def __aenter__(self):
        # Simulate async file opening
        await asyncio.sleep(0.5)
        self.lines = ["Line 1", "Line 2", "Line 3"]
        return self
 
    async def __aexit__(self, exc_type, exc_val, exc_tb):
        await asyncio.sleep(0.2)
        self.lines = []
 
    def __aiter__(self):
        return self
 
    async def __anext__(self):
        if self.index >= len(self.lines):
            raise StopAsyncIteration
 
        await asyncio.sleep(0.1)
        line = self.lines[self.index]
        self.index += 1
        return line
 
async def read_file():
    async with AsyncFileReader("data.txt") as reader:
        async for line in reader:
            print(line)
 
asyncio.run(read_file())

asyncio.Queue für Producer-Consumer-Patterns

asyncio.Queue ist eine thread-sichere, async-fähige Queue, die perfekt für die Koordinierung von Arbeit zwischen Producer- und Consumer-Coroutines ist.

import asyncio
import random
 
async def producer(queue, producer_id):
    """Produces items and puts them in the queue"""
    for i in range(5):
        await asyncio.sleep(random.uniform(0.1, 0.5))
        item = f"Item-{producer_id}-{i}"
        await queue.put(item)
        print(f"Producer {producer_id} produced: {item}")
 
async def consumer(queue, consumer_id):
    """Consumes items from the queue"""
    while True:
        item = await queue.get()
        print(f"Consumer {consumer_id} consuming: {item}")
 
        # Simulate processing time
        await asyncio.sleep(random.uniform(0.2, 0.8))
 
        queue.task_done()
 
async def main():
    queue = asyncio.Queue(maxsize=10)
 
    # Create producers and consumers
    producers = [asyncio.create_task(producer(queue, i)) for i in range(2)]
    consumers = [asyncio.create_task(consumer(queue, i)) for i in range(3)]
 
    # Wait for all producers to finish
    await asyncio.gather(*producers)
 
    # Wait for the queue to be fully processed
    await queue.join()
 
    # Cancel consumers (they run forever)
    for consumer_task in consumers:
        consumer_task.cancel()
 
asyncio.run(main())

Praxisbeispiel: Web-Scraper mit URL-Queue:

import asyncio
from typing import Set
 
async def fetch_url(session, url):
    """Simulate fetching a URL"""
    await asyncio.sleep(1)
    return f"Content from {url}"
 
async def producer(queue: asyncio.Queue, start_urls: list):
    """Add URLs to the queue"""
    for url in start_urls:
        await queue.put(url)
 
async def consumer(queue: asyncio.Queue, visited: Set[str]):
    """Fetch URLs from the queue"""
    while True:
        url = await queue.get()
 
        if url in visited:
            queue.task_done()
            continue
 
        visited.add(url)
        print(f"Scraping: {url}")
 
        # Simulate fetching
        content = await fetch_url(None, url)
 
        # Simulate finding new URLs in the content
        # In real scraper, you'd parse HTML and extract links
        new_urls = []  # Would extract from content
 
        for new_url in new_urls:
            if new_url not in visited:
                await queue.put(new_url)
 
        queue.task_done()
 
async def main():
    queue = asyncio.Queue()
    visited = set()
 
    start_urls = [
        "https://example.com/page1",
        "https://example.com/page2",
        "https://example.com/page3"
    ]
 
    # Add initial URLs
    await producer(queue, start_urls)
 
    # Create consumers
    consumers = [
        asyncio.create_task(consumer(queue, visited))
        for _ in range(3)
    ]
 
    # Wait for all URLs to be processed
    await queue.join()
 
    # Cancel consumers
    for consumer_task in consumers:
        consumer_task.cancel()
 
    print(f"Scraped {len(visited)} unique URLs")
 
asyncio.run(main())

Semaphores für Rate Limiting

asyncio.Semaphore steuert die Anzahl der Coroutines, die gleichzeitig auf eine Ressource zugreifen können. Dies ist essenziell für das Rate Limiting von API-Aufrufen oder das Limitieren gleichzeitiger Datenbankverbindungen.

import asyncio
import time
 
async def call_api(semaphore, api_id):
    """Make an API call with rate limiting"""
    async with semaphore:
        print(f"API call {api_id} started")
        await asyncio.sleep(1)  # Simulate API call
        print(f"API call {api_id} completed")
        return f"Result {api_id}"
 
async def main():
    # Allow only 3 concurrent API calls
    semaphore = asyncio.Semaphore(3)
 
    start = time.time()
 
    # Create 10 API calls
    tasks = [call_api(semaphore, i) for i in range(10)]
    results = await asyncio.gather(*tasks)
 
    elapsed = time.time() - start
    print(f"\nCompleted {len(results)} API calls in {elapsed:.2f} seconds")
    # With semaphore(3): ~4 seconds (10 calls in batches of 3)
    # Without semaphore: ~1 second (all concurrent)
 
asyncio.run(main())

Rate Limiting zur Einhaltung von API-Kontingenten:

import asyncio
from datetime import datetime
 
class RateLimiter:
    """Rate limiter that allows N requests per time period"""
 
    def __init__(self, max_requests, time_period):
        self.max_requests = max_requests
        self.time_period = time_period
        self.semaphore = asyncio.Semaphore(max_requests)
        self.request_times = []
 
    async def __aenter__(self):
        await self.semaphore.acquire()
 
        # Wait if we've hit the rate limit
        while len(self.request_times) >= self.max_requests:
            oldest = self.request_times[0]
            elapsed = datetime.now().timestamp() - oldest
 
            if elapsed < self.time_period:
                sleep_time = self.time_period - elapsed
                await asyncio.sleep(sleep_time)
 
            self.request_times.pop(0)
 
        self.request_times.append(datetime.now().timestamp())
        return self
 
    async def __aexit__(self, exc_type, exc_val, exc_tb):
        self.semaphore.release()
 
async def fetch_with_rate_limit(rate_limiter, item_id):
    async with rate_limiter:
        print(f"Fetching item {item_id} at {datetime.now()}")
        await asyncio.sleep(0.5)
        return f"Item {item_id}"
 
async def main():
    # Allow 5 requests per 2 seconds
    rate_limiter = RateLimiter(max_requests=5, time_period=2)
 
    # Make 15 requests
    tasks = [fetch_with_rate_limit(rate_limiter, i) for i in range(15)]
    results = await asyncio.gather(*tasks)
 
    print(f"Completed {len(results)} requests")
 
asyncio.run(main())

aiohttp für asynchrone HTTP-Anfragen

Die Bibliothek aiohttp bietet async HTTP Client- und Server-Funktionalität. Es ist die Standardwahl für HTTP-Requests in async-Code.

import asyncio
import aiohttp
 
async def fetch_url(session, url):
    """Fetch a single URL"""
    async with session.get(url) as response:
        return await response.text()
 
async def main():
    async with aiohttp.ClientSession() as session:
        html = await fetch_url(session, "https://api.github.com")
        print(f"Fetched {len(html)} characters")
 
asyncio.run(main())

Gleichzeitiges Abrufen mehrerer URLs:

import asyncio
import aiohttp
import time
 
async def fetch_url(session, url):
    """Fetch URL and return status code and content length"""
    async with session.get(url) as response:
        content = await response.text()
        return {
            "url": url,
            "status": response.status,
            "length": len(content)
        }
 
async def fetch_all_urls(urls):
    """Fetch multiple URLs concurrently"""
    async with aiohttp.ClientSession() as session:
        tasks = [fetch_url(session, url) for url in urls]
        results = await asyncio.gather(*tasks, return_exceptions=True)
        return results
 
async def main():
    urls = [
        "https://api.github.com",
        "https://httpbin.org/delay/1",
        "https://httpbin.org/delay/2",
        "https://httpbin.org/status/200",
        "https://httpbin.org/status/404"
    ]
 
    start = time.time()
    results = await fetch_all_urls(urls)
    elapsed = time.time() - start
 
    for result in results:
        if isinstance(result, Exception):
            print(f"Error: {result}")
        else:
            print(f"{result['url']}: {result['status']} ({result['length']} bytes)")
 
    print(f"\nFetched {len(urls)} URLs in {elapsed:.2f} seconds")
 
asyncio.run(main())

Praktisches Beispiel mit Fehlerbehandlung und Retries:

import asyncio
import aiohttp
from typing import Optional
 
async def fetch_with_retry(
    session: aiohttp.ClientSession,
    url: str,
    max_retries: int = 3
) -> Optional[dict]:
    """Fetch URL with retry logic"""
 
    for attempt in range(max_retries):
        try:
            async with session.get(url, timeout=aiohttp.ClientTimeout(total=10)) as response:
                if response.status == 200:
                    data = await response.json()
                    return data
                elif response.status == 404:
                    print(f"URL not found: {url}")
                    return None
                else:
                    print(f"Unexpected status {response.status} for {url}")
 
        except asyncio.TimeoutError:
            print(f"Timeout on attempt {attempt + 1} for {url}")
 
        except aiohttp.ClientError as e:
            print(f"Client error on attempt {attempt + 1} for {url}: {e}")
 
        if attempt < max_retries - 1:
            await asyncio.sleep(2 ** attempt)  # Exponential backoff
 
    print(f"Failed to fetch {url} after {max_retries} attempts")
    return None
 
async def main():
    urls = [
        "https://api.github.com/users/github",
        "https://api.github.com/users/nonexistent",
        "https://httpbin.org/delay/5"
    ]
 
    async with aiohttp.ClientSession() as session:
        tasks = [fetch_with_retry(session, url) for url in urls]
        results = await asyncio.gather(*tasks)
 
        successful = [r for r in results if r is not None]
        print(f"\nSuccessfully fetched {len(successful)}/{len(urls)} URLs")
 
asyncio.run(main())

aiofiles für asynchrone Datei-I/O

Die Bibliothek aiofiles bietet asynchrone Dateioperationen, die Blockieren während Datei-Lese- und -Schreiboperationen verhindert.

import asyncio
import aiofiles
 
async def read_file(filename):
    """Read file asynchronously"""
    async with aiofiles.open(filename, mode='r') as f:
        contents = await f.read()
        return contents
 
async def write_file(filename, content):
    """Write file asynchronously"""
    async with aiofiles.open(filename, mode='w') as f:
        await f.write(content)
 
async def main():
    # Write data
    await write_file('test.txt', 'Hello, async file I/O!')
 
    # Read data
    content = await read_file('test.txt')
    print(f"Read: {content}")
 
asyncio.run(main())

Gleichzeitige Verarbeitung mehrerer Dateien:

import asyncio
import aiofiles
 
async def process_file(input_file, output_file):
    """Read, process, and write a file"""
    async with aiofiles.open(input_file, mode='r') as f:
        content = await f.read()
 
    # Process content (convert to uppercase)
    processed = content.upper()
 
    async with aiofiles.open(output_file, mode='w') as f:
        await f.write(processed)
 
    return f"Processed {input_file} -> {output_file}"
 
async def main():
    files = [
        ('input1.txt', 'output1.txt'),
        ('input2.txt', 'output2.txt'),
        ('input3.txt', 'output3.txt')
    ]
 
    tasks = [process_file(inp, out) for inp, out in files]
    results = await asyncio.gather(*tasks)
 
    for result in results:
        print(result)
 
asyncio.run(main())

Zeilenweises Lesen großer Dateien:

import asyncio
import aiofiles
 
async def process_large_file(filename):
    """Process a large file line by line without loading it all into memory"""
    line_count = 0
 
    async with aiofiles.open(filename, mode='r') as f:
        async for line in f:
            # Process each line
            line_count += 1
            if line.strip():
                # Do something with the line
                pass
 
    return line_count
 
async def main():
    count = await process_large_file('large_data.txt')
    print(f"Processed {count} lines")
 
asyncio.run(main())

Fehlerbehandlung in asynchronem Code

Die Fehlerbehandlung in async-Code erfordert besondere Aufmerksamkeit, um sicherzustellen, dass Exceptions ordnungsgemäß abgefangen und Ressourcen bereinigt werden.

import asyncio
 
async def risky_operation(item_id):
    """An operation that might fail"""
    await asyncio.sleep(1)
 
    if item_id % 2 == 0:
        raise ValueError(f"Item {item_id} caused an error")
 
    return f"Item {item_id} processed"
 
async def handle_with_try_except():
    """Handle errors with try/except"""
    try:
        result = await risky_operation(2)
        print(result)
    except ValueError as e:
        print(f"Error caught: {e}")
 
asyncio.run(handle_with_try_except())

Fehlerbehandlung in nebenläufigen Tasks:

async def safe_operation(item_id):
    """Wrapper that catches errors from risky_operation"""
    try:
        result = await risky_operation(item_id)
        return {"success": True, "result": result}
    except Exception as e:
        return {"success": False, "error": str(e), "item_id": item_id}
 
async def main():
    tasks = [safe_operation(i) for i in range(5)]
    results = await asyncio.gather(*tasks)
 
    for result in results:
        if result["success"]:
            print(f"Success: {result['result']}")
        else:
            print(f"Failed: Item {result['item_id']} - {result['error']}")
 
asyncio.run(main())

Verwendung von gather() mit return_exceptions=True:

async def main():
    tasks = [risky_operation(i) for i in range(5)]
    results = await asyncio.gather(*tasks, return_exceptions=True)
 
    for i, result in enumerate(results):
        if isinstance(result, Exception):
            print(f"Task {i} failed: {result}")
        else:
            print(f"Task {i} succeeded: {result}")
 
asyncio.run(main())

Bereinigung mit asynchronen Kontextmanagern:

class AsyncResource:
    """A resource that needs cleanup even if errors occur"""
 
    async def __aenter__(self):
        print("Acquiring resource")
        await asyncio.sleep(0.5)
        return self
 
    async def __aexit__(self, exc_type, exc_val, exc_tb):
        print("Cleaning up resource")
        await asyncio.sleep(0.5)
 
        if exc_type is not None:
            print(f"Exception during resource use: {exc_val}")
 
        # Return False to propagate exception, True to suppress
        return False
 
    async def do_work(self):
        await asyncio.sleep(1)
        raise ValueError("Something went wrong")
 
async def main():
    try:
        async with AsyncResource() as resource:
            await resource.do_work()
    except ValueError as e:
        print(f"Caught error: {e}")
 
asyncio.run(main())

Leistungsbenchmarks: Sync vs Async für I/O-bound Tasks

Vergleichen wir synchrone und asynchrone Ansätze für I/O-bound Operationen mit echten Benchmarks.

import asyncio
import time
import requests
import aiohttp
 
# Synchronous approach
def fetch_sync(url):
    response = requests.get(url)
    return len(response.text)
 
def benchmark_sync(urls):
    start = time.time()
    results = [fetch_sync(url) for url in urls]
    elapsed = time.time() - start
    return elapsed, results
 
# Asynchronous approach
async def fetch_async(session, url):
    async with session.get(url) as response:
        text = await response.text()
        return len(text)
 
async def benchmark_async(urls):
    start = time.time()
    async with aiohttp.ClientSession() as session:
        tasks = [fetch_async(session, url) for url in urls]
        results = await asyncio.gather(*tasks)
    elapsed = time.time() - start
    return elapsed, results
 
# Run benchmarks
urls = [
    "https://httpbin.org/delay/1",
    "https://httpbin.org/delay/1",
    "https://httpbin.org/delay/1",
    "https://httpbin.org/delay/1",
    "https://httpbin.org/delay/1"
]
 
print("Benchmarking synchronous approach...")
sync_time, sync_results = benchmark_sync(urls)
print(f"Synchronous: {sync_time:.2f} seconds")
 
print("\nBenchmarking asynchronous approach...")
async_time, async_results = asyncio.run(benchmark_async(urls))
print(f"Asynchronous: {async_time:.2f} seconds")
 
print(f"\nSpeedup: {sync_time / async_time:.2f}x")

Typische Ergebnisse für 5 URLs mit jeweils 1 Sekunde Verzögerung:

  • Synchron: ~5 Sekunden (sequentielle Ausführung)
  • Asynchron: ~1 Sekunde (nebenläufige Ausführung)
  • Beschleunigung: ~5x
Anzahl URLsSync-ZeitAsync-ZeitBeschleunigung
55.2s1.1s4.7x
1010.4s1.2s8.7x
2020.8s1.4s14.9x
5052.1s2.1s24.8x
100104.5s3.8s27.5x

Die Beschleunigung nimmt mit der Anzahl der nebenläufigen Operationen zu, bis Sie Bandbreiten- oder Rate-Limiting-Beschränkungen erreichen.

Häufige Fallstricke und wie man sie vermeidet

Vergessen von await

# WRONG: Coroutine is created but not executed
async def wrong():
    result = fetch_data()  # Missing await!
    print(result)  # Prints coroutine object, not the result
 
# CORRECT: Await the coroutine
async def correct():
    result = await fetch_data()
    print(result)  # Prints actual result

Blockieren der Event Loop

# WRONG: Blocks the entire event loop
async def blocking_code():
    time.sleep(5)  # Blocks everything!
    result = compute_something()  # Blocks if CPU-intensive
    return result
 
# CORRECT: Use asyncio.sleep and run_in_executor
async def non_blocking_code():
    await asyncio.sleep(5)  # Only pauses this task
 
    loop = asyncio.get_event_loop()
    result = await loop.run_in_executor(None, compute_something)
    return result

Nichtbehandlung der Task-Abbruchanforderung (Cancellation)

# WRONG: Doesn't handle cancellation
async def incomplete_cleanup():
    resource = await acquire_resource()
    await long_operation()
    await release_resource(resource)  # May not execute if cancelled
 
# CORRECT: Use try/finally or async with
async def proper_cleanup():
    resource = await acquire_resource()
    try:
        await long_operation()
    finally:
        await release_resource(resource)  # Always executes

Erzeugen von Event-Loop-Konflikten

# WRONG: Calling asyncio.run() in an async function
async def wrong_nesting():
    result = asyncio.run(some_coroutine())  # Error!
    return result
 
# CORRECT: Just await the coroutine
async def correct_nesting():
    result = await some_coroutine()
    return result

Nichtbegrenzung der Nebenläufigkeit

# WRONG: Launches too many tasks simultaneously
async def unlimited_concurrency(items):
    tasks = [process_item(item) for item in items]  # 10,000 tasks!
    return await asyncio.gather(*tasks)
 
# CORRECT: Use semaphore to limit concurrency
async def limited_concurrency(items):
    semaphore = asyncio.Semaphore(10)  # Max 10 concurrent
 
    async def process_with_limit(item):
        async with semaphore:
            return await process_item(item)
 
    tasks = [process_with_limit(item) for item in items]
    return await asyncio.gather(*tasks)

Praxisbeispiele

Web-Scraping mit Rate Limiting

import asyncio
import aiohttp
from bs4 import BeautifulSoup
 
class AsyncWebScraper:
    def __init__(self, max_concurrent=5):
        self.semaphore = asyncio.Semaphore(max_concurrent)
        self.session = None
 
    async def __aenter__(self):
        self.session = aiohttp.ClientSession()
        return self
 
    async def __aexit__(self, exc_type, exc_val, exc_tb):
        await self.session.close()
 
    async def fetch_page(self, url):
        async with self.semaphore:
            try:
                async with self.session.get(url) as response:
                    return await response.text()
            except Exception as e:
                print(f"Error fetching {url}: {e}")
                return None
 
    async def scrape_urls(self, urls):
        tasks = [self.fetch_page(url) for url in urls]
        return await asyncio.gather(*tasks)
 
async def main():
    urls = [
        "https://example.com/page1",
        "https://example.com/page2",
        "https://example.com/page3"
    ]
 
    async with AsyncWebScraper(max_concurrent=3) as scraper:
        pages = await scraper.scrape_urls(urls)
 
        for url, html in zip(urls, pages):
            if html:
                print(f"Scraped {url}: {len(html)} bytes")
 
asyncio.run(main())

Asynchrone API-Datenpipeline

import asyncio
import aiohttp
 
async def fetch_user_ids(session):
    """Fetch list of user IDs from API"""
    async with session.get("https://api.example.com/users") as response:
        data = await response.json()
        return [user["id"] for user in data["users"]]
 
async def fetch_user_details(session, user_id):
    """Fetch detailed info for a user"""
    async with session.get(f"https://api.example.com/users/{user_id}") as response:
        return await response.json()
 
async def fetch_user_posts(session, user_id):
    """Fetch posts for a user"""
    async with session.get(f"https://api.example.com/users/{user_id}/posts") as response:
        return await response.json()
 
async def process_user(session, user_id):
    """Fetch all data for a user concurrently"""
    details, posts = await asyncio.gather(
        fetch_user_details(session, user_id),
        fetch_user_posts(session, user_id)
    )
 
    return {
        "user": details,
        "posts": posts,
        "post_count": len(posts)
    }
 
async def main():
    async with aiohttp.ClientSession() as session:
        # Fetch user IDs
        user_ids = await fetch_user_ids(session)
 
        # Process all users concurrently
        user_data = await asyncio.gather(
            *[process_user(session, uid) for uid in user_ids[:10]]
        )
 
        for data in user_data:
            print(f"User {data['user']['name']}: {data['post_count']} posts")
 
asyncio.run(main())

Asynchroner Chat-Server

import asyncio
 
class ChatServer:
    def __init__(self):
        self.clients = set()
 
    async def handle_client(self, reader, writer):
        """Handle a single client connection"""
        addr = writer.get_extra_info('peername')
        print(f"Client connected: {addr}")
 
        self.clients.add(writer)
 
        try:
            while True:
                data = await reader.read(100)
                if not data:
                    break
 
                message = data.decode()
                print(f"Received from {addr}: {message}")
 
                # Broadcast to all clients
                await self.broadcast(f"{addr}: {message}", writer)
 
        except asyncio.CancelledError:
            pass
 
        finally:
            print(f"Client disconnected: {addr}")
            self.clients.remove(writer)
            writer.close()
            await writer.wait_closed()
 
    async def broadcast(self, message, sender):
        """Send message to all clients except sender"""
        for client in self.clients:
            if client != sender:
                try:
                    client.write(message.encode())
                    await client.drain()
                except Exception as e:
                    print(f"Error broadcasting: {e}")
 
    async def start(self, host='127.0.0.1', port=8888):
        """Start the chat server"""
        server = await asyncio.start_server(
            self.handle_client,
            host,
            port
        )
 
        addr = server.sockets[0].getsockname()
        print(f"Chat server running on {addr}")
 
        async with server:
            await server.serve_forever()
 
async def main():
    chat_server = ChatServer()
    await chat_server.start()
 
asyncio.run(main())

Experimentieren mit Asyncio in Jupyter

Bei der Arbeit mit asyncio in Jupyter Notebooks können Sie auf Event-Loop-Konflikte stoßen. Jupyter führt bereits eine Event Loop aus, was mit asyncio.run() interferieren kann.

Für nahtloses asynchrones Experimentieren in Jupyter-Umgebungen erwägen Sie die Verwendung von RunCell (opens in a new tab), einem speziell für Jupyter Notebooks entwickelten AI-Agenten. RunCell verwaltet das Event-Loop-Management automatisch und bietet erweiterte Async-Debugging-Fähigkeiten, was es Ihnen ermöglicht, interaktiv asyncio-Patterns zu testen, ohne Konflikte.

Im Standard-Jupyter können Sie Top-Level await verwenden:

# In Jupyter, this works directly
async def fetch_data():
    await asyncio.sleep(1)
    return "data"
 
# Just await directly, no asyncio.run() needed
result = await fetch_data()
print(result)

Oder verwenden Sie das Paket nest_asyncio, um verschachtelte Event Loops zu erlauben:

import nest_asyncio
nest_asyncio.apply()
 
# Now asyncio.run() works in Jupyter
asyncio.run(main())

FAQ

Fazit

Python asyncio transformiert I/O-bound Anwendungen von langsamen, blockierenden Operationen in schnelle, nebenläufige Systeme. Indem Sie die async/await-Syntax beherrschen, die Event Loop verstehen und Tools wie gather(), create_task() und Semaphores nutzen, können Sie Anwendungen bauen, die Tausende von nebenläufigen Operationen effizient verarbeiten.

Der Schlüssel zum Erfolg mit asyncio ist die Erkenntnis, wann es das richtige Werkzeug ist. Verwenden Sie es für Netzwerk-Requests, Datenbankabfragen, Dateioperationen und jede Aufgabe, die Zeit damit verbringt, auf externe Ressourcen zu warten. Vermeiden Sie es, die Event Loop mit synchronen Operationen zu blockieren, verwenden Sie immer await mit Coroutines, und begrenzen Sie bei Bedarf die Nebenläufigkeit mit Semaphores.

Beginnen Sie damit, kleine Abschnitte Ihrer Codebasis auf async umzustellen, messen Sie die Leistungsverbesserung, und erweitern Sie schrittlich. Die dramatischen Geschwindigkeitssteigerungen in I/O-intensiven Anwendungen machen die Lerninvestition wertvoll.

📚