Skip to content
Themen
Python
Python Multiprocessing: Parallel Processing Guide for Speed

Python Multiprocessing: Leitfaden zur parallelen Verarbeitung für mehr Geschwindigkeit

Updated on

Das Single-Thread-Ausführungsmodell von Python stößt an seine Grenzen bei der Verarbeitung großer Datensätze oder rechenintensiven Berechnungen. Ein Skript, das 10 Minuten zur Datenverarbeitung benötigt, könnte theoretisch in 2 Minuten auf einer 5-Kern-Maschine laufen, aber der Global Interpreter Lock (GIL) von Python verhindert, dass Standard-Threads echten Parallelismus erreichen. Das Ergebnis sind ungenutzte CPU-Kerne und frustrierte Entwickler, die zusehen, wie ihre Multi-Core-Prozessoren untätig bleiben, während Python Aufgaben nacheinander abarbeitet.

Dieser Engpass kostet echte Zeit und Geld. Data Scientists warten stundenlang auf Modelltraining, das in Minuten abgeschlossen sein könnte. Web-Scraper crawlen mit einem Bruchteil ihrer potenziellen Geschwindigkeit. Bildverarbeitungs-Pipelines, die alle verfügbaren Kerne nutzen sollten, hinken stattdessen mit nur einem Kern.

Das multiprocessing-Modul löst dies, indem es separate Python-Prozesse erstellt, jeder mit seinem eigenen Interpreter und Speicherbereich. Anders als Threads umgehen Prozesse den GIL vollständig und ermöglichen echte parallele Ausführung über CPU-Kerne hinweg. Dieser Leitfaden zeigt Ihnen, wie Sie Multiprocessing für dramatische Performance-Verbesserungen nutzen, von grundlegender paralleler Ausführung bis zu fortgeschrittenen Mustern wie Process Pools und Shared Memory.

📚

Das GIL-Problem verstehen

Der Global Interpreter Lock (GIL) ist ein Mutex, der den Zugriff auf Python-Objekte schützt und verhindert, dass mehrere Threads gleichzeitig Python-Bytecode ausführen. Selbst auf einer 16-Kern-Maschine führen Python-Threads bei CPU-gebundenen Aufgaben jeweils nur einen Thread gleichzeitig aus.

import threading
import time
 
def cpu_bound_task(n):
    count = 0
    for i in range(n):
        count += i * i
    return count
 
# Threading parallelisiert CPU-gebundene Arbeit NICHT
start = time.time()
threads = [threading.Thread(target=cpu_bound_task, args=(10_000_000,)) for _ in range(4)]
for t in threads: t.start()
for t in threads: t.join()
print(f"Threading: {time.time() - start:.2f}s")  # ~gleiche Zeit wie Single-Thread

Der GIL gibt nur während I/O-Operationen (Datei-Lesevorgänge, Netzwerk-Requests) frei, wodurch Threading für I/O-gebundene Aufgaben nützlich, aber für CPU-gebundene Arbeit ineffektiv ist. Multiprocessing umgeht den GIL, indem es separate Python-Interpreter parallel ausführt.

Grundlegendes Multiprocessing mit Process

Die Process-Klasse erstellt einen neuen Python-Prozess, der unabhängig läuft. Jeder Prozess hat seinen eigenen Speicherbereich und Python-Interpreter.

from multiprocessing import Process
import os
 
def worker(name):
    print(f"Worker {name} läuft in Prozess {os.getpid()}")
    result = sum(i*i for i in range(5_000_000))
    print(f"Worker {name} abgeschlossen: {result}")
 
if __name__ == '__main__':
    processes = []
 
    # 4 Prozesse erstellen
    for i in range(4):
        p = Process(target=worker, args=(f"#{i}",))
        processes.append(p)
        p.start()
 
    # Auf Abschluss aller warten
    for p in processes:
        p.join()
 
    print("Alle Prozesse abgeschlossen")

Kritische Anforderung: Verwenden Sie immer den if __name__ == '__main__'-Guard unter Windows und macOS. Ohne diesen spawnen Child-Prozesse rekursiv weitere Prozesse und verursachen eine Fork-Bombe.

Process Pool: Vereinfachte parallele Ausführung

Pool verwaltet einen Pool von Worker-Prozessen und verteilt Aufgaben automatisch. Dies ist das gängigste Multiprocessing-Muster.

from multiprocessing import Pool
import time
 
def process_item(x):
    """Simuliert CPU-intensive Arbeit"""
    time.sleep(0.1)
    return x * x
 
if __name__ == '__main__':
    data = range(100)
 
    # Sequentielle Verarbeitung
    start = time.time()
    results_seq = [process_item(x) for x in data]
    seq_time = time.time() - start
 
    # Parallele Verarbeitung mit 4 Workern
    start = time.time()
    with Pool(processes=4) as pool:
        results_par = pool.map(process_item, data)
    par_time = time.time() - start
 
    print(f"Sequentiell: {seq_time:.2f}s")
    print(f"Parallel (4 Kerne): {par_time:.2f}s")
    print(f"Beschleunigung: {seq_time/par_time:.2f}x")

Vergleich der Pool-Methoden

Verschiedene Pool-Methoden eignen sich für verschiedene Anwendungsfälle:

MethodeAnwendungsfallBlockiertRückgabeMehrere Argumente
map()Einfache ParallelisierungJaGeordnete ListeNein (einzelnes Argument)
map_async()Nicht-blockierendes mapNeinAsyncResultNein
starmap()Mehrere ArgumenteJaGeordnete ListeJa (Tuple-Entpackung)
starmap_async()Nicht-blockierendes starmapNeinAsyncResultJa
apply()Einzelner FunktionsaufrufJaEinzelnes ErgebnisJa
apply_async()Nicht-blockierendes applyNeinAsyncResultJa
imap()Lazy IteratorJaIteratorNein
imap_unordered()Lazy, ungeordnetJaIteratorNein
from multiprocessing import Pool
 
def add(x, y):
    return x + y
 
def power(x, exp):
    return x ** exp
 
if __name__ == '__main__':
    with Pool(4) as pool:
        # map: einzelnes Argument
        squares = pool.map(lambda x: x**2, [1, 2, 3, 4])
 
        # starmap: mehrere Argumente (entpackt Tuples)
        results = pool.starmap(add, [(1, 2), (3, 4), (5, 6)])
 
        # apply_async: nicht-blockierender Einzelaufruf
        async_result = pool.apply_async(power, (2, 10))
        result = async_result.get()  # blockiert bis bereit
 
        # imap: Lazy Evaluation für große Datensätze
        for result in pool.imap(lambda x: x**2, range(1000)):
            pass  # verarbeitet nacheinander wenn Ergebnisse eintreffen

Inter-Prozess-Kommunikation

Prozesse teilen standardmäßig keinen Speicher. Verwenden Sie Queue oder Pipe zur Kommunikation.

Queue: Thread-sichere Nachrichten-Weitergabe

from multiprocessing import Process, Queue
 
def producer(queue, items):
    for item in items:
        queue.put(item)
        print(f"Produziert: {item}")
    queue.put(None)  # Sentinel-Wert
 
def consumer(queue):
    while True:
        item = queue.get()
        if item is None:
            break
        print(f"Konsumiert: {item}")
        # Item verarbeiten...
 
if __name__ == '__main__':
    q = Queue()
    items = [1, 2, 3, 4, 5]
 
    prod = Process(target=producer, args=(q, items))
    cons = Process(target=consumer, args=(q,))
 
    prod.start()
    cons.start()
 
    prod.join()
    cons.join()

Pipe: Bidirektionale Kommunikation

from multiprocessing import Process, Pipe
 
def worker(conn):
    conn.send("Hallo vom Worker")
    msg = conn.recv()
    print(f"Worker empfangen: {msg}")
    conn.close()
 
if __name__ == '__main__':
    parent_conn, child_conn = Pipe()
    p = Process(target=worker, args=(child_conn,))
    p.start()
 
    msg = parent_conn.recv()
    print(f"Parent empfangen: {msg}")
    parent_conn.send("Hallo vom Parent")
 
    p.join()

Shared Memory und State

Während Prozesse separaten Speicher haben, bietet multiprocessing Shared-Memory-Primitive.

Value und Array: Geteilte Primitive

from multiprocessing import Process, Value, Array
import time
 
def increment_counter(counter, lock):
    for _ in range(100_000):
        with lock:
            counter.value += 1
 
def fill_array(arr, start, end):
    for i in range(start, end):
        arr[i] = i * i
 
if __name__ == '__main__':
    # Geteilter Wert mit Lock
    counter = Value('i', 0)
    lock = counter.get_lock()
 
    processes = [Process(target=increment_counter, args=(counter, lock)) for _ in range(4)]
    for p in processes: p.start()
    for p in processes: p.join()
 
    print(f"Counter: {counter.value}")  # Sollte 400.000 sein
 
    # Geteiltes Array
    shared_arr = Array('i', 1000)
    p1 = Process(target=fill_array, args=(shared_arr, 0, 500))
    p2 = Process(target=fill_array, args=(shared_arr, 500, 1000))
    p1.start(); p2.start()
    p1.join(); p2.join()
 
    print(f"Array[100]: {shared_arr[100]}")  # 10.000

Manager: Komplexe geteilte Objekte

from multiprocessing import Process, Manager
 
def update_dict(shared_dict, key, value):
    shared_dict[key] = value
 
if __name__ == '__main__':
    with Manager() as manager:
        # Geteiltes Dict, List, Namespace
        shared_dict = manager.dict()
        shared_list = manager.list()
 
        processes = [
            Process(target=update_dict, args=(shared_dict, f"key{i}", i*10))
            for i in range(5)
        ]
 
        for p in processes: p.start()
        for p in processes: p.join()
 
        print(dict(shared_dict))  # {'key0': 0, 'key1': 10, ...}

Vergleich: Multiprocessing vs Threading vs Asyncio

FeatureMultiprocessingThreadingAsyncioconcurrent.futures
GIL-UmgehungJaNeinNeinAbhängig vom Executor
CPU-gebundene AufgabenExzellentSchwachSchwachExzellent (ProcessPoolExecutor)
I/O-gebundene AufgabenGutExzellentExzellentExzellent (ThreadPoolExecutor)
Speicher-OverheadHoch (separate Prozesse)Niedrig (geteilter Speicher)NiedrigVariiert
StartkostenHochNiedrigSehr niedrigVariiert
KommunikationQueue, Pipe, Shared MemoryDirekt (geteilter State)Native async/awaitFutures
Am besten fürCPU-intensive parallele AufgabenI/O-gebundene Aufgaben, einfache NebenläufigkeitAsync I/O, viele parallele AufgabenEinheitliche API für beides
# Multiprocessing für CPU-gebunden verwenden
from multiprocessing import Pool
 
def cpu_bound(n):
    return sum(i*i for i in range(n))
 
with Pool(4) as pool:
    results = pool.map(cpu_bound, [10_000_000] * 4)
 
# Threading für I/O-gebunden verwenden
import threading
import requests
 
def fetch_url(url):
    return requests.get(url).text
 
threads = [threading.Thread(target=fetch_url, args=(url,)) for url in urls]
for t in threads: t.start()
for t in threads: t.join()
 
# Asyncio für Async I/O verwenden
import asyncio
import aiohttp
 
async def fetch_async(url):
    async with aiohttp.ClientSession() as session:
        async with session.get(url) as response:
            return await response.text()
 
asyncio.run(asyncio.gather(*[fetch_async(url) for url in urls]))

Fortgeschritten: ProcessPoolExecutor

concurrent.futures.ProcessPoolExecutor bietet eine höhere Schnittstelle mit der gleichen API wie ThreadPoolExecutor.

from concurrent.futures import ProcessPoolExecutor, as_completed
import time
 
def process_task(x):
    time.sleep(0.1)
    return x * x
 
if __name__ == '__main__':
    # Context Manager stellt Aufräumen sicher
    with ProcessPoolExecutor(max_workers=4) as executor:
        # Einzelne Aufgaben einreichen
        futures = [executor.submit(process_task, i) for i in range(20)]
 
        # Verarbeiten sobald sie fertig sind
        for future in as_completed(futures):
            result = future.result()
            print(f"Ergebnis: {result}")
 
        # Oder map verwenden (wie Pool.map)
        results = executor.map(process_task, range(20))
        print(list(results))

Vorteile gegenüber Pool:

  • Gleiche API für ThreadPoolExecutor und ProcessPoolExecutor
  • Futures-Interface für mehr Kontrolle
  • Bessere Fehlerbehandlung
  • Einfacheres Mischen von Sync- und Async-Code

Gängige Muster

Embarrassingly Parallel Tasks

Aufgaben ohne Abhängigkeiten sind ideal für Multiprocessing:

from multiprocessing import Pool
import pandas as pd
 
def process_chunk(chunk):
    """Verarbeitet einen Daten-Chunk unabhängig"""
    chunk['new_col'] = chunk['value'] * 2
    return chunk.groupby('category').sum()
 
if __name__ == '__main__':
    df = pd.DataFrame({'category': ['A', 'B'] * 5000, 'value': range(10000)})
 
    # In Chunks aufteilen
    chunks = [df.iloc[i:i+2500] for i in range(0, len(df), 2500)]
 
    with Pool(4) as pool:
        results = pool.map(process_chunk, chunks)
 
    # Ergebnisse kombinieren
    final = pd.concat(results).groupby('category').sum()

Map-Reduce-Muster

from multiprocessing import Pool
from functools import reduce
 
def mapper(text):
    """Map: Wörter extrahieren und zählen"""
    words = text.lower().split()
    return {word: 1 for word in words}
 
def reducer(dict1, dict2):
    """Reduce: Wortzählungen zusammenführen"""
    for word, count in dict2.items():
        dict1[word] = dict1.get(word, 0) + count
    return dict1
 
if __name__ == '__main__':
    documents = ["hallo welt", "welt von python", "hallo python"] * 1000
 
    with Pool(4) as pool:
        # Map-Phase: parallel
        word_dicts = pool.map(mapper, documents)
 
    # Reduce-Phase: sequentiell (oder Tree-Reduktion verwenden)
    word_counts = reduce(reducer, word_dicts)
    print(word_counts)

Producer-Consumer mit mehreren Producern

from multiprocessing import Process, Queue, cpu_count
 
def producer(queue, producer_id, items):
    for item in items:
        queue.put((producer_id, item))
    print(f"Producer {producer_id} abgeschlossen")
 
def consumer(queue, num_producers):
    finished_producers = 0
    while finished_producers < num_producers:
        if not queue.empty():
            item = queue.get()
            if item is None:
                finished_producers += 1
            else:
                producer_id, data = item
                print(f"Von Producer {producer_id} konsumiert: {data}")
 
if __name__ == '__main__':
    q = Queue()
    num_producers = 3
 
    # Producer starten
    producers = [
        Process(target=producer, args=(q, i, range(i*10, (i+1)*10)))
        for i in range(num_producers)
    ]
    for p in producers: p.start()
 
    # Consumer starten
    cons = Process(target=consumer, args=(q, num_producers))
    cons.start()
 
    # Aufräumen
    for p in producers: p.join()
    for _ in range(num_producers):
        q.put(None)  # Consumer signalisieren
    cons.join()

Performance-Überlegungen

Wann Multiprocessing hilft

  • CPU-gebundene Aufgaben: Datenverarbeitung, mathematische Berechnungen, Bildverarbeitung
  • Große Datensätze: Wenn die Verarbeitungszeit pro Item den Prozess-Overhead rechtfertigt
  • Unabhängige Aufgaben: Kein geteilter State oder minimale Kommunikation

Wann Multiprocessing schadet

Prozesserstellungs-Overhead kann Vorteile zunichtemachen für:

from multiprocessing import Pool
import time
 
def tiny_task(x):
    return x + 1
 
if __name__ == '__main__':
    data = range(100)
 
    # Sequentiell ist schneller für winzige Aufgaben
    start = time.time()
    results = [tiny_task(x) for x in data]
    print(f"Sequentiell: {time.time() - start:.4f}s")  # ~0.0001s
 
    start = time.time()
    with Pool(4) as pool:
        results = pool.map(tiny_task, data)
    print(f"Parallel: {time.time() - start:.4f}s")  # ~0.05s (500x langsamer!)

Faustregeln:

  • Minimale Aufgabendauer: ~0,1 Sekunden pro Item
  • Datengröße: Wenn das Picklen von Daten länger dauert als die Verarbeitung, Shared Memory verwenden
  • Anzahl der Worker: Mit cpu_count() beginnen, basierend auf Aufgabenmerkmalen optimieren

Pickling-Anforderungen

Nur picklbare Objekte können zwischen Prozessen übergeben werden:

from multiprocessing import Pool
 
# ❌ Lambda-Funktionen sind nicht picklbar
# pool.map(lambda x: x*2, range(10))  # Schlägt fehl
 
# ✅ Benannte Funktionen verwenden
def double(x):
    return x * 2
 
with Pool(4) as pool:
    pool.map(double, range(10))
 
# ❌ Lokale Funktionen in Notebooks schlagen fehl
# def process():
#     def inner(x): return x*2
#     pool.map(inner, range(10))  # Schlägt fehl
 
# ✅ Auf Modulebene definieren oder functools.partial verwenden
from functools import partial
 
def multiply(x, factor):
    return x * factor
 
with Pool(4) as pool:
    pool.map(partial(multiply, factor=3), range(10))

Parallelen Code mit RunCell debuggen

Das Debuggen von Multiprocessing-Code ist notorisch schwierig. Print-Anweisungen verschwinden, Breakpoints funktionieren nicht, und Stack Traces sind kryptisch. Wenn Prozesse lautlos abstürzen oder falsche Ergebnisse produzieren, versagen traditionelle Debugging-Tools.

RunCell (www.runcell.dev (opens in a new tab)) ist ein AI Agent für Jupyter, der sich im Debuggen von parallelem Code auszeichnet. Anders als Standard-Debugger, die die Ausführung über Prozesse hinweg nicht verfolgen können, analysiert RunCell Ihre Multiprocessing-Muster, identifiziert Race Conditions, erkennt Pickling-Fehler vor der Laufzeit und erklärt, warum Prozesse blockieren.

Wenn ein Pool-Worker ohne Traceback abstürzt, kann RunCell die Error-Queue inspizieren und Ihnen genau zeigen, welcher Funktionsaufruf fehlgeschlagen ist und warum. Wenn geteilter State falsche Ergebnisse produziert, verfolgt RunCell Speicherzugriffsmuster, um das fehlende Lock zu finden. Für Data Scientists, die komplexe parallele Daten-Pipelines debuggen, verwandelt RunCell stundenlange Print-Statement-Debugging-Sessions in Minuten AI-geführter Fixes.

Best Practices

1. Verwenden Sie immer den if name Guard

# ✅ Korrekt
if __name__ == '__main__':
    with Pool(4) as pool:
        pool.map(func, data)
 
# ❌ Falsch - verursacht Fork-Bombe unter Windows
with Pool(4) as pool:
    pool.map(func, data)

2. Schließen Sie Pools explizit

# ✅ Context Manager (empfohlen)
with Pool(4) as pool:
    results = pool.map(func, data)
 
# ✅ Explizites Schließen und Join
pool = Pool(4)
results = pool.map(func, data)
pool.close()
pool.join()
 
# ❌ Leckt Ressourcen
pool = Pool(4)
results = pool.map(func, data)

3. Behandeln Sie Exceptions

from multiprocessing import Pool
 
def risky_task(x):
    if x == 5:
        raise ValueError("Schlechter Wert")
    return x * 2
 
if __name__ == '__main__':
    with Pool(4) as pool:
        try:
            results = pool.map(risky_task, range(10))
        except ValueError as e:
            print(f"Aufgabe fehlgeschlagen: {e}")
 
        # Oder individuell mit apply_async behandeln
        async_results = [pool.apply_async(risky_task, (i,)) for i in range(10)]
 
        for i, ar in enumerate(async_results):
            try:
                result = ar.get()
                print(f"Ergebnis {i}: {result}")
            except ValueError:
                print(f"Aufgabe {i} fehlgeschlagen")

4. Vermeiden Sie geteilten State wenn möglich

# ❌ Geteilter State erfordert Synchronisation
from multiprocessing import Process, Value
 
counter = Value('i', 0)
 
def increment():
    for _ in range(100000):
        counter.value += 1  # Race Condition!
 
# ✅ Locks verwenden oder Teilen vermeiden
from multiprocessing import Lock
 
lock = Lock()
 
def increment_safe():
    for _ in range(100000):
        with lock:
            counter.value += 1
 
# ✅ Noch besser: Geteilten State vermeiden
def count_locally(n):
    return n  # Ergebnis stattdessen zurückgeben
 
with Pool(4) as pool:
    results = pool.map(count_locally, [100000] * 4)
    total = sum(results)

5. Wählen Sie die richtige Anzahl von Workern

from multiprocessing import cpu_count, Pool
 
# CPU-gebunden: alle Kerne verwenden
num_workers = cpu_count()
 
# I/O-gebunden: mehr Worker verwenden
num_workers = cpu_count() * 2
 
# Gemischte Workload: empirisch optimieren
with Pool(processes=num_workers) as pool:
    results = pool.map(func, data)

Häufige Fehler

1. Vergessen des if name Guards

Führt zu unendlichem Prozess-Spawning unter Windows/macOS.

2. Versuch, nicht-picklbare Objekte zu picklen

# ❌ Klassenmethoden, Lambdas, lokale Funktionen schlagen fehl
class DataProcessor:
    def process(self, x):
        return x * 2
 
dp = DataProcessor()
# pool.map(dp.process, data)  # Schlägt fehl
 
# ✅ Top-Level-Funktionen verwenden
def process(x):
    return x * 2
 
with Pool(4) as pool:
    pool.map(process, data)

3. Keine Prozessbeendigung behandeln

# ❌ Räumt nicht ordnungsgemäß auf
pool = Pool(4)
results = pool.map(func, data)
# pool läuft noch
 
# ✅ Immer schließen und joinen
pool = Pool(4)
try:
    results = pool.map(func, data)
finally:
    pool.close()
    pool.join()

4. Exzessive Datenübertragung

# ❌ Picklen riesiger Objekte ist langsam
large_data = [np.random.rand(1000, 1000) for _ in range(100)]
with Pool(4) as pool:
    pool.map(process_array, large_data)  # Langsame Serialisierung
 
# ✅ Shared Memory oder Memory-Mapped Files verwenden
import numpy as np
from multiprocessing import shared_memory
 
# Shared Memory erstellen
shm = shared_memory.SharedMemory(create=True, size=1000*1000*8)
arr = np.ndarray((1000, 1000), dtype=np.float64, buffer=shm.buf)
 
# Nur Name und Shape übergeben
def process_shared(name, shape):
    existing_shm = shared_memory.SharedMemory(name=name)
    arr = np.ndarray(shape, dtype=np.float64, buffer=existing_shm.buf)
    # arr verarbeiten...
    existing_shm.close()
 
with Pool(4) as pool:
    pool.starmap(process_shared, [(shm.name, (1000, 1000))] * 4)
 
shm.close()
shm.unlink()

FAQ

Wie umgeht Multiprocessing den GIL?

Der GIL (Global Interpreter Lock) ist ein Mutex in jedem Python-Interpreter, der verhindert, dass mehrere Threads gleichzeitig Python-Bytecode ausführen. Multiprocessing umgeht dies, indem es separate Python-Prozesse erstellt, jeder mit seinem eigenen Interpreter und GIL. Da Prozesse keinen Speicher teilen, laufen sie wirklich parallel über CPU-Kerne hinweg ohne GIL-Konkurrenz.

Wann sollte ich Multiprocessing vs Threading verwenden?

Verwenden Sie Multiprocessing für CPU-gebundene Aufgaben (Datenverarbeitung, Berechnungen, Bildmanipulation), bei denen der GIL die Performance limitiert. Verwenden Sie Threading für I/O-gebundene Aufgaben (Netzwerk-Requests, Dateioperationen), bei denen der GIL während I/O freigibt und Threads nebenläufig arbeiten können. Threading hat niedrigeren Overhead, kann aber CPU-Arbeit aufgrund des GIL nicht parallelisieren.

Warum brauche ich den if name == 'main' Guard?

Unter Windows und macOS importieren Child-Prozesse das Hauptmodul, um auf Funktionen zuzugreifen. Ohne den Guard führt das Importieren des Moduls den Pool-Erstellungscode erneut aus und spawnt unendlich viele Prozesse (Fork-Bombe). Linux verwendet fork(), was keine Imports erfordert, aber der Guard ist dennoch Best Practice für plattformübergreifenden Code.

Wie viele Worker-Prozesse sollte ich verwenden?

Für CPU-gebundene Aufgaben beginnen Sie mit cpu_count() (Anzahl der CPU-Kerne). Mehr Worker als Kerne verursachen Context-Switching-Overhead. Für I/O-gebundene Aufgaben können Sie mehr Worker verwenden (2-4x Kerne), da Prozesse auf I/O warten. Benchmarken Sie immer mit Ihrer spezifischen Workload, da Speicher- und Datenübertragungs-Overhead die optimale Worker-Anzahl begrenzen können.

Welche Objekte kann ich an Multiprocessing-Funktionen übergeben?

Objekte müssen picklbar sein (serialisierbar mit pickle). Dies umfasst Built-in-Typen (int, str, list, dict), NumPy-Arrays, Pandas-DataFrames und die meisten benutzerdefinierten Klassen. Lambdas, lokale Funktionen, Klassenmethoden, File Handles, Datenbankverbindungen und Thread-Locks können nicht gepickelt werden. Definieren Sie Funktionen auf Modulebene oder verwenden Sie functools.partial für partielle Anwendung.

Fazit

Python Multiprocessing verwandelt CPU-gebundene Engpässe in parallele Operationen, die mit verfügbaren Kernen skalieren. Durch Umgehung des GIL über separate Prozesse erreichen Sie echten Parallelismus, der mit Threading unmöglich ist. Das Pool-Interface vereinfacht gängige Muster, während Queue, Pipe und Shared Memory komplexe Inter-Prozess-Workflows ermöglichen.

Beginnen Sie mit Pool.map() für embarrassingly parallel Tasks, messen Sie die Beschleunigung und optimieren Sie von dort aus. Denken Sie an den if __name__ == '__main__'-Guard, halten Sie Aufgaben grobkörnig, um Prozess-Overhead zu amortisieren, und minimieren Sie Datenübertragung zwischen Prozessen. Wenn Debugging komplex wird, können Tools wie RunCell helfen, die Ausführung über Prozessgrenzen hinweg zu verfolgen.

Multiprocessing ist nicht immer die Antwort. Für I/O-gebundene Arbeit können Threading oder Asyncio einfacher und schneller sein. Aber wenn Sie große Datensätze verarbeiten, Modelle trainieren oder schwere Berechnungen durchführen, liefert Multiprocessing die Performance, für die Ihre Multi-Core-Maschine gebaut wurde.

📚