Python Threading: Kompletter Leitfaden zum Multithreading mit Beispielen
Updated on
Ihr Python-Programm führt 50 API-Aufrufe durch, einen nach dem anderen. Jeder Aufruf wartet 200 Millisekunden. Die Rechnung ist brutal: 10 Sekunden des Programmlebens werden verschwendet mit dem Starren auf Netzwerkantworten. Ihre CPU sitzt untätig bei nahezu Null-Auslastung, während Ihr Skript durch I/O-gebundene Operationen kriecht, die gleichzeitig laufen könnten.
Dieses Problem eskaliert schnell. Web-Scraper, die tausende Seiten sequenziell abrufen. Dateiverarbeitungsskripte, die eine Datei nach der anderen lesen und schreiben. Datenbankabfragen, die die gesamte Anwendung blockieren, während sie auf Ergebnisse warten. Jede Sekunde des untätigen Wartens ist eine Sekunde, in der Ihr Programm nützliche Arbeit verrichten könnte.
Pythons threading-Modul löst dies, indem es mehrere Operationen gleichzeitig innerhalb eines einzigen Prozesses ausführt. Threads teilen sich den Speicher, starten schnell und zeichnen sich bei I/O-gebundenen Workloads aus, bei denen das Programm die meiste Zeit wartet. Dieser Leitfaden deckt alles ab von der grundlegenden Thread-Erstellung bis zu fortgeschrittenen Synchronisationsmustern, mit produktionsreifen Code-Beispielen, die Sie sofort einsetzen können.
Was ist Threading in Python?
Threading ermöglicht einem Programm, mehrere Operationen gleichzeitig innerhalb desselben Prozesses auszuführen. Jeder Thread teilt denselben Speicherbereich, was die Kommunikation zwischen Threads schnell und unkompliziert macht.
Pythons threading-Modul bietet eine High-Level-Schnittstelle zum Erstellen und Verwalten von Threads. Aber es gibt einen wichtigen Vorbehalt: der Global Interpreter Lock (GIL).
Der Global Interpreter Lock (GIL)
Der GIL ist ein Mutex in CPython, der nur einem Thread erlaubt, Python-Bytecode zur gleichen Zeit auszuführen. Das bedeutet, dass Threads keine echte Parallelität für CPU-gebundene Operationen erreichen können. Allerdings gibt der GIL während I/O-Operationen frei (Netzwerkaufrufe, Datei-Lesevorgänge, Datenbankabfragen), was anderen Threads erlaubt zu laufen, während einer auf I/O wartet.
import threading
import time
def cpu_bound(n):
"""CPU-bound: GIL prevents parallel execution"""
total = 0
for i in range(n):
total += i * i
return total
def io_bound(url):
"""I/O-bound: GIL releases during network wait"""
import urllib.request
return urllib.request.urlopen(url).read()
# CPU-bound: 4 threads run one-at-a-time (no speedup)
start = time.time()
threads = [threading.Thread(target=cpu_bound, args=(10_000_000,)) for _ in range(4)]
for t in threads: t.start()
for t in threads: t.join()
print(f"CPU-bound with threads: {time.time() - start:.2f}s")
# I/O-bound: 4 threads overlap their waiting time (big speedup)Das bedeutet, dass Threading ideal für I/O-gebundene Aufgaben ist, aber nicht für rechenintensive Berechnungen. Für CPU-gebundene Arbeit verwenden Sie stattdessen das multiprocessing-Modul.
Wann sollte man Threading vs Multiprocessing vs Asyncio verwenden
| Feature | threading | multiprocessing | asyncio |
|---|---|---|---|
| Best geeignet für | I/O-gebundene Aufgaben | CPU-gebundene Aufgaben | Hochgradig gleichzeitiges I/O |
| Parallelität | Gleichzeitig (GIL-beschränkt) | Echte Parallelität | Gleichzeitig (Single Thread) |
| Speicher | Geteilt (leichtgewichtig) | Separat pro Prozess | Geteilt (leichtgewichtig) |
| Startkosten | Gering (~1ms) | Hoch (~50-100ms) | Sehr gering |
| Kommunikation | Direkter Speicherzugriff | Pipes, Queues, Shared Memory | Awaitable Coroutines |
| Skalierbarkeit | Zehn- bis Hunderte von Threads | Begrenzt durch CPU-Kerne | Tausende von Coroutines |
| Komplexität | Mittel (Locking erforderlich) | Mittel (Serialisierung) | Hoch (async/await-Syntax) |
| Anwendungsfall | Web Scraping, Datei-I/O, API-Aufrufe | Datenverarbeitung, ML-Training | Web-Server, Chat-Apps |
Faustregel: Wenn Ihr Programm auf Netzwerk oder Festplatte wartet, verwenden Sie Threading. Wenn es Zahlen crunched, verwenden Sie Multiprocessing. Wenn Sie tausende gleichzeitige Verbindungen benötigen, verwenden Sie Asyncio.
Thread-Grundlagen: Erstellen und Ausführen von Threads
Die threading.Thread-Klasse
Der einfachste Weg, einen Thread zu erstellen, besteht darin, eine Zielfunktion an threading.Thread zu übergeben:
import threading
import time
def download_file(filename):
print(f"[{threading.current_thread().name}] Downloading {filename}...")
time.sleep(2) # Simulate download
print(f"[{threading.current_thread().name}] Finished {filename}")
# Create threads
t1 = threading.Thread(target=download_file, args=("data.csv",))
t2 = threading.Thread(target=download_file, args=("report.pdf",))
# Start threads
t1.start()
t2.start()
# Wait for both to finish
t1.join()
t2.join()
print("All downloads complete")Beide Downloads laufen gleichzeitig und sind in etwa 2 Sekunden fertig statt in 4.
start() und join()
start()beginnt die Thread-Ausführung. Ein Thread kann nur einmal gestartet werden.join(timeout=None)blockiert den aufrufenden Thread, bis der Ziel-Thread fertig ist. Übergeben Sie eintimeoutin Sekunden, um ewiges Warten zu vermeiden.
import threading
import time
def slow_task():
time.sleep(10)
t = threading.Thread(target=slow_task)
t.start()
# Wait at most 3 seconds
t.join(timeout=3)
if t.is_alive():
print("Thread still running after 3 seconds")
else:
print("Thread finished")Threads benennen
Benannte Threads erleichtern das Debugging:
import threading
def worker():
name = threading.current_thread().name
print(f"Running in thread: {name}")
t = threading.Thread(target=worker, name="DataProcessor")
t.start()
t.join()Daemon-Threads
Daemon-Threads sind Hintergrund-Threads, die automatisch beendet werden, wenn das Hauptprogramm endet. Nicht-Daemon-Threads halten das Programm am Leben, bis sie fertig sind.
import threading
import time
def background_monitor():
while True:
print("Monitoring system health...")
time.sleep(5)
# Daemon thread: dies when main program exits
monitor = threading.Thread(target=background_monitor, daemon=True)
monitor.start()
# Main program does its work
time.sleep(12)
print("Main program exiting")
# monitor thread is killed automaticallyVerwenden Sie Daemon-Threads für Hintergrund-Logging, Monitoring oder Cleanup-Aufgaben, die das Programmende nicht verhindern sollten.
Thread subklassieren
Für komplexeres Thread-Verhalten subklassieren Sie threading.Thread:
import threading
import time
class FileProcessor(threading.Thread):
def __init__(self, filepath):
super().__init__()
self.filepath = filepath
self.result = None
def run(self):
"""Override run() with thread logic"""
print(f"Processing {self.filepath}")
time.sleep(1) # Simulate work
self.result = f"Processed: {self.filepath}"
# Create and run
processor = FileProcessor("/data/report.csv")
processor.start()
processor.join()
print(processor.result)Argumente an Threads übergeben
Verwendung von args und kwargs
Übergeben Sie Positionsargumente mit args (ein Tuple) und Schlüsselwortargumente mit kwargs (ein Dict):
import threading
def fetch_data(url, timeout, retries=3, verbose=False):
print(f"Fetching {url} (timeout={timeout}s, retries={retries}, verbose={verbose})")
# Positional args as tuple
t1 = threading.Thread(target=fetch_data, args=("https://api.example.com", 30))
# Keyword args as dict
t2 = threading.Thread(
target=fetch_data,
args=("https://api.example.com",),
kwargs={"timeout": 30, "retries": 5, "verbose": True}
)
t1.start()
t2.start()
t1.join()
t2.join()Häufiger Fehler: Das Vergessen des abschließenden Kommas in einem Tuple mit einem Element. args=("hello",) ist ein Tuple; args=("hello") ist nur ein String in Klammern.
Ergebnisse aus Threads sammeln
Threads geben Werte nicht direkt zurück. Verwenden Sie geteilte Datenstrukturen oder eine Liste, um Ergebnisse zu sammeln:
import threading
results = {}
lock = threading.Lock()
def compute(task_id, value):
result = value ** 2
with lock:
results[task_id] = result
threads = []
for i in range(5):
t = threading.Thread(target=compute, args=(i, i * 10))
threads.append(t)
t.start()
for t in threads:
t.join()
print(results) # {0: 0, 1: 100, 2: 400, 3: 900, 4: 1600}Ein saubererer Ansatz verwendet ThreadPoolExecutor (als nächstes behandelt), der die Ergebnissammlung automatisch handhabt.
ThreadPoolExecutor: Der moderne Ansatz
Das concurrent.futures-Modul bietet ThreadPoolExecutor, eine High-Level-Schnittstelle, die einen Pool von Worker-Threads verwaltet. Es handhabt die Thread-Erstellung, Ergebnissammlung und Exception-Propagation automatisch.
Grundlegende Verwendung mit submit()
from concurrent.futures import ThreadPoolExecutor, as_completed
import time
def fetch_url(url):
time.sleep(1) # Simulate network request
return f"Content from {url}"
urls = [
"https://example.com/page1",
"https://example.com/page2",
"https://example.com/page3",
"https://example.com/page4",
"https://example.com/page5",
]
with ThreadPoolExecutor(max_workers=3) as executor:
# Submit tasks and get Future objects
future_to_url = {executor.submit(fetch_url, url): url for url in urls}
# Process results as they complete
for future in as_completed(future_to_url):
url = future_to_url[future]
try:
data = future.result()
print(f"{url}: {data}")
except Exception as e:
print(f"{url} generated an exception: {e}")Verwendung von map() für geordnete Ergebnisse
executor.map() gibt Ergebnisse in derselben Reihenfolge zurück wie die Eingabe, ähnlich wie das eingebaute map():
from concurrent.futures import ThreadPoolExecutor
def process_item(item):
return item.upper()
items = ["apple", "banana", "cherry", "date"]
with ThreadPoolExecutor(max_workers=4) as executor:
results = list(executor.map(process_item, items))
print(results) # ['APPLE', 'BANANA', 'CHERRY', 'DATE']submit() vs map()
submit() | map() | |
|---|---|---|
| Gibt zurück | Future-Objekte | Iterator von Ergebnissen |
| Ergebnisreihenfolge | Abschlussreihenfolge (mit as_completed) | Eingabereihenfolge |
| Fehlerbehandlung | Pro-Task via future.result() | Wirft bei erstem Fehler |
| Argumente | Einzelner Funktionsaufruf | Wendet Funktion auf jedes Item an |
| Best geeignet für | Heterogene Aufgaben, frühe Ergebnisse | Homogene Batch-Verarbeitung |
Exception-Handling mit Futures
from concurrent.futures import ThreadPoolExecutor, as_completed
def risky_task(n):
if n == 3:
raise ValueError(f"Bad input: {n}")
return n * 10
with ThreadPoolExecutor(max_workers=4) as executor:
futures = {executor.submit(risky_task, i): i for i in range(5)}
for future in as_completed(futures):
task_id = futures[future]
try:
result = future.result(timeout=5)
print(f"Task {task_id}: {result}")
except ValueError as e:
print(f"Task {task_id} failed: {e}")
except TimeoutError:
print(f"Task {task_id} timed out")Aufgaben abbrechen
from concurrent.futures import ThreadPoolExecutor
import time
def long_task(n):
time.sleep(5)
return n
with ThreadPoolExecutor(max_workers=2) as executor:
futures = [executor.submit(long_task, i) for i in range(10)]
# Cancel pending tasks (already-running tasks cannot be cancelled)
for f in futures[4:]:
cancelled = f.cancel()
print(f"Cancelled: {cancelled}")Thread-Synchronisations-Primitive
Wenn mehrere Threads auf geteilte Daten zugreifen, benötigen Sie Synchronisation, um Race Conditions zu verhindern.
Lock
Ein Lock stellt sicher, dass nur ein Thread gleichzeitig einen kritischen Abschnitt betritt:
import threading
class BankAccount:
def __init__(self, balance):
self.balance = balance
self.lock = threading.Lock()
def withdraw(self, amount):
with self.lock: # Only one thread at a time
if self.balance >= amount:
self.balance -= amount
return True
return False
def deposit(self, amount):
with self.lock:
self.balance += amount
account = BankAccount(1000)
def make_transactions():
for _ in range(100):
account.deposit(10)
account.withdraw(10)
threads = [threading.Thread(target=make_transactions) for _ in range(10)]
for t in threads: t.start()
for t in threads: t.join()
print(f"Final balance: {account.balance}") # Always 1000Ohne das Lock würden gleichzeitige Lese- und Schreiboperationen falsche Ergebnisse produzieren (eine Race Condition).
RLock (Reentrant Lock)
Ein RLock kann vom selben Thread mehrfach erworben werden. Dies verhindert Deadlocks, wenn eine Funktion, die ein Lock hält, eine andere Funktion aufruft, die dasselbe Lock benötigt:
import threading
class SafeCache:
def __init__(self):
self._data = {}
self._lock = threading.RLock()
def get(self, key):
with self._lock:
return self._data.get(key)
def set(self, key, value):
with self._lock:
self._data[key] = value
def get_or_set(self, key, default):
with self._lock:
# This calls get(), which also acquires _lock
# RLock allows this; a regular Lock would deadlock
existing = self.get(key)
if existing is None:
self.set(key, default)
return default
return existingSemaphore
Ein Semaphore erlaubt einer festen Anzahl von Threads gleichzeitig Zugriff auf eine Ressource:
import threading
import time
# Allow max 3 concurrent database connections
db_semaphore = threading.Semaphore(3)
def query_database(query_id):
with db_semaphore:
print(f"Query {query_id}: connected (active connections: {3 - db_semaphore._value})")
time.sleep(2) # Simulate query
print(f"Query {query_id}: done")
threads = [threading.Thread(target=query_database, args=(i,)) for i in range(10)]
for t in threads: t.start()
for t in threads: t.join()Event
Ein Event erlaubt einem Thread, andere wartende Threads zu signalisieren:
import threading
import time
data_ready = threading.Event()
shared_data = []
def producer():
print("Producer: preparing data...")
time.sleep(3)
shared_data.extend([1, 2, 3, 4, 5])
print("Producer: data ready, signaling consumers")
data_ready.set()
def consumer(name):
print(f"Consumer {name}: waiting for data...")
data_ready.wait() # Blocks until event is set
print(f"Consumer {name}: got data = {shared_data}")
threads = [
threading.Thread(target=producer),
threading.Thread(target=consumer, args=("A",)),
threading.Thread(target=consumer, args=("B",)),
]
for t in threads: t.start()
for t in threads: t.join()Condition
Eine Condition kombiniert ein Lock mit der Fähigkeit, auf eine Benachrichtigung zu warten. Es ist die Grundlage für Producer-Consumer-Muster:
import threading
import time
import random
buffer = []
MAX_SIZE = 5
condition = threading.Condition()
def producer():
for i in range(20):
with condition:
while len(buffer) >= MAX_SIZE:
condition.wait() # Wait until space available
item = random.randint(1, 100)
buffer.append(item)
print(f"Produced: {item} (buffer size: {len(buffer)})")
condition.notify_all()
time.sleep(0.1)
def consumer(name):
for _ in range(10):
with condition:
while len(buffer) == 0:
condition.wait() # Wait until item available
item = buffer.pop(0)
print(f"Consumer {name} consumed: {item} (buffer size: {len(buffer)})")
condition.notify_all()
time.sleep(0.15)
t1 = threading.Thread(target=producer)
t2 = threading.Thread(target=consumer, args=("X",))
t3 = threading.Thread(target=consumer, args=("Y",))
t1.start(); t2.start(); t3.start()
t1.join(); t2.join(); t3.join()Zusammenfassung der Synchronisations-Primitive
| Primitive | Zweck | Wann verwenden |
|---|---|---|
Lock | Gegenseitiger Ausschluss | Geschützte, veränderliche Zustände |
RLock | Reentrant Mutex | Verschachteltes Locking im selben Thread |
Semaphore | Konkurrenz begrenzen | Rate Limiting, Connection Pools |
Event | Einmaliges Signal | Initialisierung abgeschlossen, Shutdown-Signal |
Condition | Wait/notify-Muster | Producer-Consumer, Zustandsänderungen |
Barrier | N Threads synchronisieren | Alle Threads müssen einen Punkt erreichen, bevor sie fortfahren |
Thread-sichere Datenstrukturen
queue.Queue
queue.Queue ist die bevorzugte thread-sichere Datenstruktur. Sie handhabt alle Locks intern:
import threading
import queue
import time
task_queue = queue.Queue()
results = queue.Queue()
def worker():
while True:
item = task_queue.get() # Blocks until item available
if item is None:
break
result = item ** 2
results.put(result)
task_queue.task_done()
# Start 4 workers
workers = []
for _ in range(4):
t = threading.Thread(target=worker, daemon=True)
t.start()
workers.append(t)
# Submit tasks
for i in range(20):
task_queue.put(i)
# Wait for all tasks to complete
task_queue.join()
# Stop workers
for _ in range(4):
task_queue.put(None)
for w in workers:
w.join()
# Collect results
all_results = []
while not results.empty():
all_results.append(results.get())
print(f"Results: {sorted(all_results)}")queue.Queue unterstützt auch:
Queue(maxsize=10): Blockiertput(), wenn vollPriorityQueue(): Items sortiert nach PrioritätLifoQueue(): Last-in, first-out (Stack-Verhalten)
collections.deque
collections.deque ist thread-sicher für append() und popleft() Operationen (atomic auf C-Ebene in CPython), was es zu einer schnellen Alternative für einfache Producer-Consumer-Muster macht:
from collections import deque
import threading
import time
buffer = deque(maxlen=1000)
def producer():
for i in range(100):
buffer.append(i)
time.sleep(0.01)
def consumer():
consumed = 0
while consumed < 100:
if buffer:
item = buffer.popleft()
consumed += 1
else:
time.sleep(0.01)
print(f"Consumed {consumed} items")
t1 = threading.Thread(target=producer)
t2 = threading.Thread(target=consumer)
t1.start(); t2.start()
t1.join(); t2.join()Hinweis: Während einzelne append- und popleft-Operationen thread-sicher sind, ist das Prüfen von len(buffer) und anschließende Pop-Operation nicht atomar. Für volle Thread-Sicherheit verwenden Sie queue.Queue.
Häufige Threading-Muster
Producer-Consumer-Muster
Das klassische Muster zum Entkoppeln der Datenerzeugung von der Datenverarbeitung:
import threading
import queue
import time
import random
def producer(q, name, num_items):
for i in range(num_items):
item = f"{name}-item-{i}"
q.put(item)
print(f"Producer {name}: created {item}")
time.sleep(random.uniform(0.05, 0.15))
print(f"Producer {name}: done")
def consumer(q, name, stop_event):
while not stop_event.is_set() or not q.empty():
try:
item = q.get(timeout=0.5)
print(f"Consumer {name}: processing {item}")
time.sleep(random.uniform(0.1, 0.2))
q.task_done()
except queue.Empty:
continue
print(f"Consumer {name}: shutting down")
task_queue = queue.Queue(maxsize=10)
stop_event = threading.Event()
producers = [
threading.Thread(target=producer, args=(task_queue, "P1", 10)),
threading.Thread(target=producer, args=(task_queue, "P2", 10)),
]
consumers = [
threading.Thread(target=consumer, args=(task_queue, "C1", stop_event)),
threading.Thread(target=consumer, args=(task_queue, "C2", stop_event)),
threading.Thread(target=consumer, args=(task_queue, "C3", stop_event)),
]
for c in consumers: c.start()
for p in producers: p.start()
for p in producers: p.join()
task_queue.join() # Wait for all items to be processed
stop_event.set() # Signal consumers to stop
for c in consumers: c.join()Worker-Thread-Pool (manuell)
Wenn Sie mehr Kontrolle brauchen als ThreadPoolExecutor bietet:
import threading
import queue
class WorkerPool:
def __init__(self, num_workers):
self.task_queue = queue.Queue()
self.result_queue = queue.Queue()
self.workers = []
for _ in range(num_workers):
t = threading.Thread(target=self._worker, daemon=True)
t.start()
self.workers.append(t)
def _worker(self):
while True:
func, args, kwargs, future_id = self.task_queue.get()
if func is None:
break
try:
result = func(*args, **kwargs)
self.result_queue.put((future_id, result, None))
except Exception as e:
self.result_queue.put((future_id, None, e))
finally:
self.task_queue.task_done()
def submit(self, func, *args, **kwargs):
future_id = id(func) # Simple ID
self.task_queue.put((func, args, kwargs, future_id))
return future_id
def shutdown(self):
for _ in self.workers:
self.task_queue.put((None, None, None, None))
for w in self.workers:
w.join()
# Usage
pool = WorkerPool(4)
for i in range(10):
pool.submit(lambda x: x * x, i)
pool.task_queue.join()
pool.shutdown()Rate-Limited Thread-Pool
Kontrollieren Sie, wie schnell Threads externe Requests machen:
import threading
import time
from concurrent.futures import ThreadPoolExecutor
class RateLimiter:
def __init__(self, max_per_second):
self.interval = 1.0 / max_per_second
self.lock = threading.Lock()
self.last_call = 0
def wait(self):
with self.lock:
elapsed = time.time() - self.last_call
wait_time = self.interval - elapsed
if wait_time > 0:
time.sleep(wait_time)
self.last_call = time.time()
limiter = RateLimiter(max_per_second=5)
def rate_limited_fetch(url):
limiter.wait()
print(f"Fetching {url} at {time.time():.2f}")
time.sleep(0.5) # Simulate request
return f"Data from {url}"
urls = [f"https://api.example.com/item/{i}" for i in range(20)]
with ThreadPoolExecutor(max_workers=10) as executor:
results = list(executor.map(rate_limited_fetch, urls))Thread-Safety-Fallen und wie man sie vermeidet
Race Conditions
Eine Race Condition tritt auf, wenn das Ergebnis von der zeitlichen Ausführung der Threads abhängt:
import threading
# BAD: Race condition
counter = 0
def increment_unsafe():
global counter
for _ in range(100_000):
counter += 1 # Read, increment, write: NOT atomic
threads = [threading.Thread(target=increment_unsafe) for _ in range(5)]
for t in threads: t.start()
for t in threads: t.join()
print(f"Expected: 500000, Got: {counter}") # Often less than 500000
# GOOD: Protected with lock
counter = 0
lock = threading.Lock()
def increment_safe():
global counter
for _ in range(100_000):
with lock:
counter += 1
threads = [threading.Thread(target=increment_safe) for _ in range(5)]
for t in threads: t.start()
for t in threads: t.join()
print(f"Expected: 500000, Got: {counter}") # Always 500000Deadlocks
Ein Deadlock passiert, wenn zwei Threads jeweils ein Lock halten, das der andere braucht:
import threading
lock_a = threading.Lock()
lock_b = threading.Lock()
def thread_1():
with lock_a:
print("Thread 1: acquired lock_a")
with lock_b: # Waits forever if thread_2 holds lock_b
print("Thread 1: acquired lock_b")
def thread_2():
with lock_b:
print("Thread 2: acquired lock_b")
with lock_a: # Waits forever if thread_1 holds lock_a
print("Thread 2: acquired lock_a")
# This WILL deadlock
# t1 = threading.Thread(target=thread_1)
# t2 = threading.Thread(target=thread_2)
# t1.start(); t2.start()Wie man Deadlocks verhindert:
- Immer Locks in derselben Reihenfolge erwerben:
def thread_1_fixed():
with lock_a: # Always lock_a first
with lock_b:
print("Thread 1: acquired both locks")
def thread_2_fixed():
with lock_a: # Always lock_a first (same order)
with lock_b:
print("Thread 2: acquired both locks")- Timeouts verwenden:
def safe_acquire():
acquired_a = lock_a.acquire(timeout=2)
if not acquired_a:
print("Could not acquire lock_a, backing off")
return
try:
acquired_b = lock_b.acquire(timeout=2)
if not acquired_b:
print("Could not acquire lock_b, releasing lock_a")
return
try:
print("Acquired both locks safely")
finally:
lock_b.release()
finally:
lock_a.release()- Lock-Scope minimieren: Halten Sie Locks so kurz wie möglich.
Thread-Safety-Checkliste
- Schützen Sie alle geteilten, veränderlichen Zustände mit Locks
- Verwenden Sie
queue.Queuestatt geteilter Listen oder Dicts, wenn möglich - Vermeiden Sie globalen, veränderlichen Zustand; übergeben Sie Daten durch Funktionsargumente
- Verwenden Sie
ThreadPoolExecutorstatt manueller Thread-Verwaltung - Nehmen Sie nie die Operationsreihenfolge zwischen Threads an
- Testen Sie mit
threading.active_count()und Logging, um Thread-Leaks zu erkennen
Real-World-Beispiele
Gleichzeitiges Web-Scraping
from concurrent.futures import ThreadPoolExecutor, as_completed
import urllib.request
import time
def fetch_page(url):
"""Fetch a web page and return its content length"""
try:
with urllib.request.urlopen(url, timeout=10) as response:
content = response.read()
return url, len(content), None
except Exception as e:
return url, 0, str(e)
urls = [
"https://python.org",
"https://docs.python.org",
"https://pypi.org",
"https://realpython.com",
"https://github.com",
"https://stackoverflow.com",
"https://news.ycombinator.com",
"https://httpbin.org",
]
# Sequential
start = time.time()
for url in urls:
fetch_page(url)
sequential_time = time.time() - start
# Concurrent with threads
start = time.time()
with ThreadPoolExecutor(max_workers=8) as executor:
futures = {executor.submit(fetch_page, url): url for url in urls}
for future in as_completed(futures):
url, size, error = future.result()
if error:
print(f" FAIL {url}: {error}")
else:
print(f" OK {url}: {size:,} bytes")
threaded_time = time.time() - start
print(f"\nSequential: {sequential_time:.2f}s")
print(f"Threaded: {threaded_time:.2f}s")
print(f"Speedup: {sequential_time / threaded_time:.1f}x")Paralleles Datei-I/O
from concurrent.futures import ThreadPoolExecutor
import os
import hashlib
def process_file(filepath):
"""Read file and compute its SHA-256 hash"""
with open(filepath, 'rb') as f:
content = f.read()
file_hash = hashlib.sha256(content).hexdigest()
size = os.path.getsize(filepath)
return filepath, file_hash, size
def hash_all_files(directory, pattern="*.py"):
"""Hash all matching files in a directory using threads"""
import glob
files = glob.glob(os.path.join(directory, "**", pattern), recursive=True)
results = {}
with ThreadPoolExecutor(max_workers=8) as executor:
futures = {executor.submit(process_file, f): f for f in files}
for future in futures:
try:
path, hash_val, size = future.result()
results[path] = {"hash": hash_val, "size": size}
except Exception as e:
print(f"Error processing {futures[future]}: {e}")
return results
# Usage
# file_hashes = hash_all_files("/path/to/project")Gleichzeitige API-Aufrufe mit Retry-Logik
from concurrent.futures import ThreadPoolExecutor, as_completed
import urllib.request
import json
import time
def fetch_api(endpoint, max_retries=3, backoff=1.0):
"""Fetch API endpoint with exponential backoff retry"""
for attempt in range(max_retries):
try:
url = f"https://jsonplaceholder.typicode.com{endpoint}"
req = urllib.request.Request(url)
with urllib.request.urlopen(req, timeout=10) as response:
data = json.loads(response.read())
return {"endpoint": endpoint, "data": data, "error": None}
except Exception as e:
if attempt < max_retries - 1:
wait = backoff * (2 ** attempt)
time.sleep(wait)
else:
return {"endpoint": endpoint, "data": None, "error": str(e)}
endpoints = [f"/posts/{i}" for i in range(1, 21)]
start = time.time()
with ThreadPoolExecutor(max_workers=10) as executor:
futures = [executor.submit(fetch_api, ep) for ep in endpoints]
results = [f.result() for f in futures]
elapsed = time.time() - start
success = sum(1 for r in results if r["error"] is None)
print(f"Fetched {success}/{len(endpoints)} endpoints in {elapsed:.2f}s")Periodische Hintergrundaufgaben
import threading
import time
class PeriodicTask:
"""Run a function at fixed intervals in a background thread"""
def __init__(self, interval, func, *args, **kwargs):
self.interval = interval
self.func = func
self.args = args
self.kwargs = kwargs
self._stop_event = threading.Event()
self._thread = None
def start(self):
self._thread = threading.Thread(target=self._run, daemon=True)
self._thread.start()
def _run(self):
while not self._stop_event.is_set():
self.func(*self.args, **self.kwargs)
self._stop_event.wait(self.interval)
def stop(self):
self._stop_event.set()
if self._thread:
self._thread.join()
# Usage
def check_health():
print(f"Health check at {time.strftime('%H:%M:%S')}")
task = PeriodicTask(2.0, check_health)
task.start()
time.sleep(7)
task.stop()
print("Stopped")Performance: Threading vs Multiprocessing vs Asyncio
Das richtige Concurrency-Werkzeug hängt von der Arbeitslast ab. Hier ist ein Vergleich der Echtzeit für gängige Aufgaben:
| Aufgabe | Sequenziell | Threading (4) | Multiprocessing (4) | Asyncio |
|---|---|---|---|---|
| 100 HTTP-Requests (je 200ms) | 20,0s | 5,1s | 5,8s | 4,9s |
| 100 Datei-Lesevorgänge (je 10ms) | 1,0s | 0,28s | 0,35s | 0,26s |
| 100 CPU-Aufgaben (je 100ms) | 10,0s | 10,2s | 2,7s | 10,0s |
| 50 DB-Abfragen (je 50ms) | 2,5s | 0,68s | 0,85s | 0,62s |
| Gemischtes I/O + CPU | 15,0s | 8,2s | 4,1s | 9,5s |
Wichtige Erkenntnisse:
- Threading liefert 3-5x Geschwindigkeitszuwachs bei I/O-gebundenen Workloads mit minimalen Code-Änderungen
- Multiprocessing ist die einzige Option für echte CPU-Parallelität, aber mit Prozess-Overhead
- Asyncio übertrifft Threading bei hochgradigem I/O-Concurrency, erfordert aber Umschreiben mit async/await
- Für gemischte Workloads, erwägen Sie die Kombination von Threading für I/O und Multiprocessing für CPU-Aufgaben
import time
import threading
from concurrent.futures import ThreadPoolExecutor, ProcessPoolExecutor
def io_task():
time.sleep(0.2)
def cpu_task(n=2_000_000):
return sum(i * i for i in range(n))
# Benchmark threading vs multiprocessing
NUM_TASKS = 20
# Threading - I/O bound
start = time.time()
with ThreadPoolExecutor(max_workers=4) as pool:
list(pool.map(lambda _: io_task(), range(NUM_TASKS)))
print(f"Threading (I/O): {time.time() - start:.2f}s")
# Threading - CPU bound
start = time.time()
with ThreadPoolExecutor(max_workers=4) as pool:
list(pool.map(lambda _: cpu_task(), range(NUM_TASKS)))
print(f"Threading (CPU): {time.time() - start:.2f}s")Experimentieren mit Threading in RunCell
Das Debuggen und Profilen von Threaded-Code kann herausfordernd sein. Wenn Sie Thread-Synchronisation testen, Timing-Überlappungen visualisieren oder Race Conditions interaktiv diagnostizieren müssen, bietet RunCell (www.runcell.dev (opens in a new tab)) eine KI-gestützte Jupyter-Umgebung, die für diesen Workflow konzipiert ist.
Der KI-Agent von RunCell kann Ihren Threading-Code analysieren, potenzielle Deadlocks erkennen, bevor sie passieren, optimale Worker-Zahlen basierend auf Ihrer Workload vorschlagen und Ihnen helfen, zu verstehen, warum sich Threads unerwartet verhalten. Wenn ein Thread-Pool intermitierend falsche Ergebnisse produziert, verfolgt RunCell die Ausführungs-Timeline, um den genauen Moment zu identifizieren, in dem der geteilte Zustand korrumpiert wird.
Wenn Sie die Performance-Charakteristiken verschiedener Threading-Konfigurationen visualisieren möchten, kann PyGWalker (github.com/Kanaries/pygwalker) Ihre Benchmark-DataFrames in interaktive Diagramme verwandeln. Führen Sie Threading-Benchmarks durch, sammeln Sie Timing-Daten in einem pandas-DataFrame und erkunden Sie die Ergebnisse mit Drag-and-drop-Visualisierungen, um die optimale Thread-Anzahl für Ihre Workload zu finden.
FAQ
Was ist der Unterschied zwischen Threading und Multiprocessing in Python?
Threading führt mehrere Threads innerhalb eines einzigen Prozesses aus, der Speicher teilt. Der Global Interpreter Lock (GIL) verhindert, dass Threads Python-Bytecode parallel ausführen, was Threading nur für I/O-gebundene Aufgaben wie Netzwerk-Requests und Dateioperationen effektiv macht. Multiprocessing erzeugt separate Prozesse, jeder mit seinem eigenen Python-Interpreter und Speicherbereich, was echte parallele Ausführung für CPU-gebundene Aufgaben ermöglicht. Threading hat geringeren Overhead (schnellerer Start, weniger Speicher), während Multiprocessing den GIL für echte Parallelität umgeht.
Ist Python Threading wirklich parallel?
Nein, Python Threading ist gleichzeitig (concurrent), aber nicht parallel für CPU-gebundenen Code aufgrund des GIL. Nur ein Thread führt Python-Bytecode zur gleichen Zeit aus. Allerdings gibt der GIL während I/O-Operationen (Netzwerk, Festplatte, Datenbank) frei, sodass mehrere Threads effektiv parallel laufen, wenn sie auf I/O warten. Für CPU-gebundene Parallelität verwenden Sie das multiprocessing-Modul oder C-Erweiterungen, die den GIL freigeben (wie NumPy).
Wie viele Threads sollte ich in Python verwenden?
Für I/O-gebundene Aufgaben beginnen Sie mit 5-20 Threads, abhängig von den Rate-Limits des externen Services und Ihrer Netzwerkbandbreite. Zu viele Threads an einen einzelnen Server können Verbindungsablehnungen oder Throttling verursachen. Für gemischte Workloads experimentieren Sie mit Thread-Zahlen zwischen der Anzahl der CPU-Kerne und dem 4-fachen der Kernanzahl. Verwenden Sie ThreadPoolExecutor und benchmarken Sie mit verschiedenen max_workers-Werten, um die optimale Anzahl für Ihre spezifische Workload zu finden. Der Standardwert für ThreadPoolExecutor ist min(32, os.cpu_count() + 4).
Wie gebe ich einen Wert aus einem Python-Thread zurück?
Threads geben Werte nicht direkt aus ihrer Zielfunktion zurück. Die drei Hauptansätze sind: (1) Verwenden Sie ThreadPoolExecutor.submit(), das ein Future-Objekt zurückgibt, bei dem Sie future.result() aufrufen, um den Rückgabewert zu erhalten. (2) Übergeben Sie einen veränderlichen Container (wie ein Dictionary oder eine Liste) als Argument und lassen Sie den Thread Ergebnisse hineinschreiben, geschützt durch ein Lock. (3) Verwenden Sie queue.Queue, bei dem der Thread Ergebnisse in die Queue legt und der Haupt-Thread daraus liest. ThreadPoolExecutor ist der sauberste Ansatz für die meisten Anwendungsfälle.
Was passiert, wenn ein Python-Thread eine Exception wirft?
In einem rohen threading.Thread beendet eine unbehandelte Exception den Thread stillschweigend und die Exception geht verloren. Der Haupt-Thread und andere Threads laufen ohne Benachrichtigung weiter. Mit ThreadPoolExecutor werden Exceptions erfasst und neu geworfen, wenn Sie future.result() aufrufen, was die Fehlerbehandlung viel zuverlässiger macht. Verwenden Sie immer try/except-Blöcke innerhalb von Thread-Zielfunktionen oder verwenden Sie ThreadPoolExecutor, um sicherzustellen, dass Exceptions ordnungsgemäß erfasst und behandelt werden.
Fazit
Python Threading ist ein mächtiges Werkzeug, um I/O-gebundene Programme zu beschleunigen. Indem Sie Netzwerk-Requests, Dateioperationen und Datenbankabfragen gleichzeitig ausführen, können Sie aus einem 20-sekündigen sequenziellen Skript eines machen, das mit minimalen Code-Änderungen in 5 Sekunden fertig ist.
Die wichtigsten Punkte zum Mitnehmen:
- Verwenden Sie Threading für I/O-gebundene Arbeit. Der GIL verhindert CPU-Parallelität, aber Threads überlappen I/O-Wartezeiten effektiv.
- Verwenden Sie
ThreadPoolExecutorfür die meisten Threading-Bedürfnisse. Es verwaltet Threads, sammelt Ergebnisse und propagiert Exceptions sauber. - Schützen Sie geteilte Zustände mit Locks. Race Conditions sind der häufigste Threading-Bug, und
queue.Queueeliminiert die meisten Locking-Sorgen. - Vermeiden Sie Deadlocks, indem Sie Locks in konsistenter Reihenfolge erwerben und Timeouts verwenden.
- Wählen Sie das richtige Werkzeug: Threading für I/O, Multiprocessing für CPU, Asyncio für tausende gleichzeitige Verbindungen.
Beginnen Sie mit ThreadPoolExecutor und einem einfachen executor.map()-Aufruf. Messen Sie den Geschwindigkeitszuwachs. Fügen Sie Synchronisation nur dort hinzu, wo geteilte, veränderliche Zustände es erfordern. Threading erfordert keine komplette Neuschreibung Ihres Codes. Ein paar Zeilen concurrent.futures können dramatische Performance-Verbesserungen für jedes Programm liefern, das Zeit mit Warten verbringt.