Skip to content
トピック
Python
Python Multiprocessing:高速化のための並列処理ガイド

Python Multiprocessing:高速化のための並列処理ガイド

Updated on

Pythonのシングルスレッド実行モデルは、大規模なデータセットの処理やCPU集約的な計算を行う際に限界に達します。10分かかるデータ処理スクリプトは、理論的には5コアマシンで2分で実行できるはずですが、PythonのGlobal Interpreter Lock(GIL)により、標準スレッドが真の並列処理を実現できません。その結果、CPUコアが無駄になり、マルチコアプロセッサが遊休状態のまま、Pythonがタスクを1つずつ処理するのを開発者は苛立ちながら見守ることになります。

このボトルネックは実際の時間とコストを消費します。データサイエンティストは数分で終わるはずのモデルトレーニングに何時間も待ちます。Webスクレイパーは潜在的な速度の一部でクロールします。すべての利用可能なコアを活用すべき画像処理パイプラインが、代わりに1つのコアだけを使って遅々として進みます。

multiprocessingモジュールは、それぞれ独自のインタープリタとメモリ空間を持つ別々のPythonプロセスを作成することでこれを解決します。スレッドとは異なり、プロセスはGILを完全にバイパスし、CPUコア全体で真の並列実行を可能にします。このガイドでは、基本的な並列実行から、プロセスプールや共有メモリなどの高度なパターンまで、劇的なパフォーマンス向上のためにmultiprocessingを活用する方法を示します。

📚

GIL問題を理解する

Global Interpreter Lock(GIL)は、Pythonオブジェクトへのアクセスを保護するmutexで、複数のスレッドが同時にPythonバイトコードを実行することを防ぎます。16コアマシンでも、CPU束縛タスクの場合、Pythonスレッドは一度に1つずつ実行されます。

import threading
import time
 
def cpu_bound_task(n):
    count = 0
    for i in range(n):
        count += i * i
    return count
 
# ThreadingはCPU束縛作業を並列化しません
start = time.time()
threads = [threading.Thread(target=cpu_bound_task, args=(10_000_000,)) for _ in range(4)]
for t in threads: t.start()
for t in threads: t.join()
print(f"Threading: {time.time() - start:.2f}s")  # シングルスレッドとほぼ同じ時間

GILはI/O操作(ファイル読み取り、ネットワークリクエスト)中にのみ解放されるため、threadingはI/O束縛タスクには有用ですが、CPU束縛作業には効果がありません。Multiprocessingは、別々のPythonインタープリタを並列実行することでGILをバイパスします。

Processを使った基本的なMultiprocessing

Processクラスは、独立して実行される新しいPythonプロセスを作成します。各プロセスには独自のメモリ空間とPythonインタープリタがあります。

from multiprocessing import Process
import os
 
def worker(name):
    print(f"Worker {name} がプロセス {os.getpid()} で実行中")
    result = sum(i*i for i in range(5_000_000))
    print(f"Worker {name} 完了: {result}")
 
if __name__ == '__main__':
    processes = []
 
    # 4つのプロセスを作成
    for i in range(4):
        p = Process(target=worker, args=(f"#{i}",))
        processes.append(p)
        p.start()
 
    # すべての完了を待つ
    for p in processes:
        p.join()
 
    print("すべてのプロセスが完了しました")

重要な要件:WindowsとmacOSでは常にif __name__ == '__main__'ガードを使用してください。これがないと、子プロセスが再帰的にさらにプロセスを生成し、フォーク爆弾を引き起こします。

Process Pool:簡素化された並列実行

Poolはワーカープロセスのプールを管理し、タスクを自動的に分散します。これは最も一般的なmultiprocessingパターンです。

from multiprocessing import Pool
import time
 
def process_item(x):
    """CPU集約的な作業をシミュレート"""
    time.sleep(0.1)
    return x * x
 
if __name__ == '__main__':
    data = range(100)
 
    # 逐次処理
    start = time.time()
    results_seq = [process_item(x) for x in data]
    seq_time = time.time() - start
 
    # 4ワーカーでの並列処理
    start = time.time()
    with Pool(processes=4) as pool:
        results_par = pool.map(process_item, data)
    par_time = time.time() - start
 
    print(f"逐次: {seq_time:.2f}s")
    print(f"並列 (4コア): {par_time:.2f}s")
    print(f"高速化: {seq_time/par_time:.2f}x")

Poolメソッドの比較

異なるPoolメソッドは異なるユースケースに適しています:

メソッドユースケースブロック戻り値複数引数
map()単純な並列化はい順序付きリストなし(単一引数)
map_async()非ブロッキングmapいいえAsyncResultなし
starmap()複数引数はい順序付きリストあり(タプル展開)
starmap_async()非ブロッキングstarmapいいえAsyncResultあり
apply()単一関数呼び出しはい単一結果あり
apply_async()非ブロッキングapplyいいえAsyncResultあり
imap()遅延イテレータはいイテレータなし
imap_unordered()遅延、順序なしはいイテレータなし
from multiprocessing import Pool
 
def add(x, y):
    return x + y
 
def power(x, exp):
    return x ** exp
 
if __name__ == '__main__':
    with Pool(4) as pool:
        # map: 単一引数
        squares = pool.map(lambda x: x**2, [1, 2, 3, 4])
 
        # starmap: 複数引数(タプルを展開)
        results = pool.starmap(add, [(1, 2), (3, 4), (5, 6)])
 
        # apply_async: 非ブロッキング単一呼び出し
        async_result = pool.apply_async(power, (2, 10))
        result = async_result.get()  # 準備完了までブロック
 
        # imap: 大規模データセット用の遅延評価
        for result in pool.imap(lambda x: x**2, range(1000)):
            pass  # 結果が到着するたびに1つずつ処理

プロセス間通信

プロセスはデフォルトでメモリを共有しません。通信にはQueueまたはPipeを使用します。

Queue:スレッドセーフなメッセージ受け渡し

from multiprocessing import Process, Queue
 
def producer(queue, items):
    for item in items:
        queue.put(item)
        print(f"生成: {item}")
    queue.put(None)  # センチネル値
 
def consumer(queue):
    while True:
        item = queue.get()
        if item is None:
            break
        print(f"消費: {item}")
        # アイテムを処理...
 
if __name__ == '__main__':
    q = Queue()
    items = [1, 2, 3, 4, 5]
 
    prod = Process(target=producer, args=(q, items))
    cons = Process(target=consumer, args=(q,))
 
    prod.start()
    cons.start()
 
    prod.join()
    cons.join()

Pipe:双方向通信

from multiprocessing import Process, Pipe
 
def worker(conn):
    conn.send("ワーカーからこんにちは")
    msg = conn.recv()
    print(f"ワーカーが受信: {msg}")
    conn.close()
 
if __name__ == '__main__':
    parent_conn, child_conn = Pipe()
    p = Process(target=worker, args=(child_conn,))
    p.start()
 
    msg = parent_conn.recv()
    print(f"親が受信: {msg}")
    parent_conn.send("親からこんにちは")
 
    p.join()

共有メモリと状態

プロセスは別々のメモリを持ちますが、multiprocessingは共有メモリプリミティブを提供します。

ValueとArray:共有プリミティブ

from multiprocessing import Process, Value, Array
import time
 
def increment_counter(counter, lock):
    for _ in range(100_000):
        with lock:
            counter.value += 1
 
def fill_array(arr, start, end):
    for i in range(start, end):
        arr[i] = i * i
 
if __name__ == '__main__':
    # ロック付き共有値
    counter = Value('i', 0)
    lock = counter.get_lock()
 
    processes = [Process(target=increment_counter, args=(counter, lock)) for _ in range(4)]
    for p in processes: p.start()
    for p in processes: p.join()
 
    print(f"カウンター: {counter.value}")  # 400,000であるべき
 
    # 共有配列
    shared_arr = Array('i', 1000)
    p1 = Process(target=fill_array, args=(shared_arr, 0, 500))
    p2 = Process(target=fill_array, args=(shared_arr, 500, 1000))
    p1.start(); p2.start()
    p1.join(); p2.join()
 
    print(f"Array[100]: {shared_arr[100]}")  # 10,000

Manager:複雑な共有オブジェクト

from multiprocessing import Process, Manager
 
def update_dict(shared_dict, key, value):
    shared_dict[key] = value
 
if __name__ == '__main__':
    with Manager() as manager:
        # 共有dict、list、namespace
        shared_dict = manager.dict()
        shared_list = manager.list()
 
        processes = [
            Process(target=update_dict, args=(shared_dict, f"key{i}", i*10))
            for i in range(5)
        ]
 
        for p in processes: p.start()
        for p in processes: p.join()
 
        print(dict(shared_dict))  # {'key0': 0, 'key1': 10, ...}

比較:Multiprocessing vs Threading vs Asyncio

機能MultiprocessingThreadingAsyncioconcurrent.futures
GILバイパスはいいいえいいえエグゼキュータ次第
CPU束縛タスク優秀劣る劣る優秀(ProcessPoolExecutor)
I/O束縛タスク良い優秀優秀優秀(ThreadPoolExecutor)
メモリオーバーヘッド高い(別プロセス)低い(共有メモリ)低い変動
起動コスト高い低い非常に低い変動
通信Queue、Pipe、共有メモリ直接(共有状態)ネイティブasync/awaitFutures
最適な用途CPU集約的並列タスクI/O束縛タスク、単純な並行性非同期I/O、多数の並行タスク両方の統一API
# CPU束縛にmultiprocessingを使用
from multiprocessing import Pool
 
def cpu_bound(n):
    return sum(i*i for i in range(n))
 
with Pool(4) as pool:
    results = pool.map(cpu_bound, [10_000_000] * 4)
 
# I/O束縛にthreadingを使用
import threading
import requests
 
def fetch_url(url):
    return requests.get(url).text
 
threads = [threading.Thread(target=fetch_url, args=(url,)) for url in urls]
for t in threads: t.start()
for t in threads: t.join()
 
# 非同期I/Oにasyncioを使用
import asyncio
import aiohttp
 
async def fetch_async(url):
    async with aiohttp.ClientSession() as session:
        async with session.get(url) as response:
            return await response.text()
 
asyncio.run(asyncio.gather(*[fetch_async(url) for url in urls]))

高度:ProcessPoolExecutor

concurrent.futures.ProcessPoolExecutorは、ThreadPoolExecutorと同じAPIを持つ高レベルインターフェースを提供します。

from concurrent.futures import ProcessPoolExecutor, as_completed
import time
 
def process_task(x):
    time.sleep(0.1)
    return x * x
 
if __name__ == '__main__':
    # コンテキストマネージャがクリーンアップを保証
    with ProcessPoolExecutor(max_workers=4) as executor:
        # 個別のタスクを送信
        futures = [executor.submit(process_task, i) for i in range(20)]
 
        # 完了次第処理
        for future in as_completed(futures):
            result = future.result()
            print(f"結果: {result}")
 
        # またはmapを使用(Pool.mapのように)
        results = executor.map(process_task, range(20))
        print(list(results))

Poolに対する利点

  • ThreadPoolExecutorProcessPoolExecutorで同じAPI
  • より細かい制御のためのFuturesインターフェース
  • より良いエラーハンドリング
  • 同期と非同期コードの混在が容易

一般的なパターン

恥ずかしいほど並列なタスク

依存関係のないタスクはmultiprocessingに最適です:

from multiprocessing import Pool
import pandas as pd
 
def process_chunk(chunk):
    """データチャンクを独立して処理"""
    chunk['new_col'] = chunk['value'] * 2
    return chunk.groupby('category').sum()
 
if __name__ == '__main__':
    df = pd.DataFrame({'category': ['A', 'B'] * 5000, 'value': range(10000)})
 
    # チャンクに分割
    chunks = [df.iloc[i:i+2500] for i in range(0, len(df), 2500)]
 
    with Pool(4) as pool:
        results = pool.map(process_chunk, chunks)
 
    # 結果を結合
    final = pd.concat(results).groupby('category').sum()

Map-Reduceパターン

from multiprocessing import Pool
from functools import reduce
 
def mapper(text):
    """Map: 単語を抽出してカウント"""
    words = text.lower().split()
    return {word: 1 for word in words}
 
def reducer(dict1, dict2):
    """Reduce: 単語カウントをマージ"""
    for word, count in dict2.items():
        dict1[word] = dict1.get(word, 0) + count
    return dict1
 
if __name__ == '__main__':
    documents = ["こんにちは 世界", "pythonの世界", "こんにちは python"] * 1000
 
    with Pool(4) as pool:
        # Mapフェーズ: 並列
        word_dicts = pool.map(mapper, documents)
 
    # Reduceフェーズ: 逐次(またはツリー削減を使用)
    word_counts = reduce(reducer, word_dicts)
    print(word_counts)

複数プロデューサーによるProducer-Consumer

from multiprocessing import Process, Queue, cpu_count
 
def producer(queue, producer_id, items):
    for item in items:
        queue.put((producer_id, item))
    print(f"プロデューサー {producer_id} 完了")
 
def consumer(queue, num_producers):
    finished_producers = 0
    while finished_producers < num_producers:
        if not queue.empty():
            item = queue.get()
            if item is None:
                finished_producers += 1
            else:
                producer_id, data = item
                print(f"プロデューサー {producer_id} から消費: {data}")
 
if __name__ == '__main__':
    q = Queue()
    num_producers = 3
 
    # プロデューサーを開始
    producers = [
        Process(target=producer, args=(q, i, range(i*10, (i+1)*10)))
        for i in range(num_producers)
    ]
    for p in producers: p.start()
 
    # コンシューマーを開始
    cons = Process(target=consumer, args=(q, num_producers))
    cons.start()
 
    # クリーンアップ
    for p in producers: p.join()
    for _ in range(num_producers):
        q.put(None)  # コンシューマーに通知
    cons.join()

パフォーマンスに関する考慮事項

Multiprocessingが役立つ場合

  • CPU束縛タスク:データ処理、数学計算、画像処理
  • 大規模データセット:アイテムごとの処理時間がプロセスオーバーヘッドを正当化する場合
  • 独立したタスク:共有状態がないか、最小限の通信

Multiprocessingが不利な場合

プロセス作成のオーバーヘッドが利益を上回る場合:

from multiprocessing import Pool
import time
 
def tiny_task(x):
    return x + 1
 
if __name__ == '__main__':
    data = range(100)
 
    # 小さなタスクには逐次の方が速い
    start = time.time()
    results = [tiny_task(x) for x in data]
    print(f"逐次: {time.time() - start:.4f}s")  # ~0.0001s
 
    start = time.time()
    with Pool(4) as pool:
        results = pool.map(tiny_task, data)
    print(f"並列: {time.time() - start:.4f}s")  # ~0.05s(500倍遅い!)

経験則

  • 最小タスク期間:アイテムごとに約0.1秒
  • データサイズ:データのpickle化が処理よりも時間がかかる場合は共有メモリを使用
  • ワーカー数:cpu_count()から始め、タスク特性に基づいて調整

Pickle化の要件

プロセス間で渡せるのはpickle可能なオブジェクトのみです:

from multiprocessing import Pool
 
# ❌ ラムダ関数はpickle不可
# pool.map(lambda x: x*2, range(10))  # 失敗
 
# ✅ 名前付き関数を使用
def double(x):
    return x * 2
 
with Pool(4) as pool:
    pool.map(double, range(10))
 
# ❌ ノートブック内のローカル関数は失敗
# def process():
#     def inner(x): return x*2
#     pool.map(inner, range(10))  # 失敗
 
# ✅ モジュールレベルで定義するか、functools.partialを使用
from functools import partial
 
def multiply(x, factor):
    return x * factor
 
with Pool(4) as pool:
    pool.map(partial(multiply, factor=3), range(10))

RunCellで並列コードをデバッグ

multiprocessingコードのデバッグは非常に困難です。print文は消え、ブレークポイントは機能せず、スタックトレースは暗号的です。プロセスが静かにクラッシュしたり、誤った結果を生成したりすると、従来のデバッグツールは失敗します。

RunCellwww.runcell.dev)は、並列コードのデバッグに優れたJupyter用AIエージェントです。プロセス全体の実行を追跡できない標準デバッガとは異なり、RunCellはmultiprocessingパターンを分析し、競合状態を特定し、実行時前にpickle化エラーを検出し、プロセスがデッドロックする理由を説明します。 (opens in a new tab)

Poolワーカーがトレースバックなしでクラッシュした場合、RunCellはエラーキューを検査し、どの関数呼び出しが失敗したのか、その理由を正確に示すことができます。共有状態が誤った結果を生成する場合、RunCellはメモリアクセスパターンをトレースして欠落しているロックを見つけます。複雑な並列データパイプラインをデバッグするデータサイエンティストにとって、RunCellは何時間ものprint文デバッグを、AIガイドによる修正の数分に変えます。

ベストプラクティス

1. 常にif __name__ガードを使用

# ✅ 正しい
if __name__ == '__main__':
    with Pool(4) as pool:
        pool.map(func, data)
 
# ❌ 間違い - Windowsでフォーク爆弾を引き起こす
with Pool(4) as pool:
    pool.map(func, data)

2. Poolを明示的に閉じる

# ✅ コンテキストマネージャ(推奨)
with Pool(4) as pool:
    results = pool.map(func, data)
 
# ✅ 明示的なクローズとjoin
pool = Pool(4)
results = pool.map(func, data)
pool.close()
pool.join()
 
# ❌ リソースリーク
pool = Pool(4)
results = pool.map(func, data)

3. 例外を処理

from multiprocessing import Pool
 
def risky_task(x):
    if x == 5:
        raise ValueError("不正な値")
    return x * 2
 
if __name__ == '__main__':
    with Pool(4) as pool:
        try:
            results = pool.map(risky_task, range(10))
        except ValueError as e:
            print(f"タスク失敗: {e}")
 
        # またはapply_asyncで個別に処理
        async_results = [pool.apply_async(risky_task, (i,)) for i in range(10)]
 
        for i, ar in enumerate(async_results):
            try:
                result = ar.get()
                print(f"結果 {i}: {result}")
            except ValueError:
                print(f"タスク {i} 失敗")

4. 可能な限り共有状態を避ける

# ❌ 共有状態には同期が必要
from multiprocessing import Process, Value
 
counter = Value('i', 0)
 
def increment():
    for _ in range(100000):
        counter.value += 1  # 競合状態!
 
# ✅ ロックを使用するか共有を避ける
from multiprocessing import Lock
 
lock = Lock()
 
def increment_safe():
    for _ in range(100000):
        with lock:
            counter.value += 1
 
# ✅ さらに良い:共有状態を避ける
def count_locally(n):
    return n  # 代わりに結果を返す
 
with Pool(4) as pool:
    results = pool.map(count_locally, [100000] * 4)
    total = sum(results)

5. 適切なワーカー数を選択

from multiprocessing import cpu_count, Pool
 
# CPU束縛:すべてのコアを使用
num_workers = cpu_count()
 
# I/O束縛:より多くのワーカーを使用可能
num_workers = cpu_count() * 2
 
# 混合ワークロード:経験的に調整
with Pool(processes=num_workers) as pool:
    results = pool.map(func, data)

よくある間違い

1. if __name__ガードを忘れる

Windows/macOSで無限プロセス生成につながります。

2. pickle化できないオブジェクトをpickle化しようとする

# ❌ クラスメソッド、ラムダ、ローカル関数は失敗
class DataProcessor:
    def process(self, x):
        return x * 2
 
dp = DataProcessor()
# pool.map(dp.process, data)  # 失敗
 
# ✅ トップレベル関数を使用
def process(x):
    return x * 2
 
with Pool(4) as pool:
    pool.map(process, data)

3. プロセス終了を処理しない

# ❌ 適切にクリーンアップしない
pool = Pool(4)
results = pool.map(func, data)
# プールがまだ実行中
 
# ✅ 常にクローズとjoin
pool = Pool(4)
try:
    results = pool.map(func, data)
finally:
    pool.close()
    pool.join()

4. 過度なデータ転送

# ❌ 巨大なオブジェクトのpickle化は遅い
large_data = [np.random.rand(1000, 1000) for _ in range(100)]
with Pool(4) as pool:
    pool.map(process_array, large_data)  # 遅いシリアライゼーション
 
# ✅ 共有メモリまたはメモリマップファイルを使用
import numpy as np
from multiprocessing import shared_memory
 
# 共有メモリを作成
shm = shared_memory.SharedMemory(create=True, size=1000*1000*8)
arr = np.ndarray((1000, 1000), dtype=np.float64, buffer=shm.buf)
 
# 名前と形状のみを渡す
def process_shared(name, shape):
    existing_shm = shared_memory.SharedMemory(name=name)
    arr = np.ndarray(shape, dtype=np.float64, buffer=existing_shm.buf)
    # arrを処理...
    existing_shm.close()
 
with Pool(4) as pool:
    pool.starmap(process_shared, [(shm.name, (1000, 1000))] * 4)
 
shm.close()
shm.unlink()

FAQ

multiprocessingはどのようにしてGILをバイパスしますか?

GIL(Global Interpreter Lock)は各Pythonインタープリタ内のmutexで、複数のスレッドが同時にPythonバイトコードを実行することを防ぎます。Multiprocessingは、それぞれ独自のインタープリタとGILを持つ別々のPythonプロセスを作成することでこれをバイパスします。プロセスはメモリを共有しないため、GILの競合なしにCPUコア全体で真に並列に実行されます。

multiprocessingとthreadingはいつ使い分けるべきですか?

GILがパフォーマンスを制限するCPU束縛タスク(データ処理、計算、画像操作)にはmultiprocessingを使用します。I/O中にGILが解放されるI/O束縛タスク(ネットワークリクエスト、ファイル操作)にはthreadingを使用し、スレッドが並行して動作できるようにします。Threadingはオーバーヘッドが低いですが、GILのためCPU作業を並列化できません。

なぜif name == 'main'ガードが必要ですか?

WindowsとmacOSでは、子プロセスが関数にアクセスするためにメインモジュールをインポートします。ガードがないと、モジュールのインポートがPool作成コードを再度実行し、無限プロセス(フォーク爆弾)を生成します。Linuxはインポートを必要としないfork()を使用しますが、クロスプラットフォームコードのベストプラクティスとしてガードは必要です。

何個のワーカープロセスを使用すべきですか?

CPU束縛タスクの場合、cpu_count()(CPUコア数)から始めます。コア数より多いワーカーはコンテキストスイッチングオーバーヘッドを引き起こします。I/O束縛タスクの場合、プロセスがI/Oを待機するため、より多くのワーカー(2-4倍のコア)を使用できます。メモリとデータ転送オーバーヘッドが最適なワーカー数を制限する可能性があるため、常に特定のワークロードでベンチマークしてください。

multiprocessing関数に渡せるオブジェクトは何ですか?

オブジェクトはpickle可能(pickleでシリアライズ可能)である必要があります。これには組み込み型(int、str、list、dict)、NumPy配列、pandas DataFrame、ほとんどのユーザー定義クラスが含まれます。ラムダ、ローカル関数、クラスメソッド、ファイルハンドル、データベース接続、スレッドロックはpickle化できません。モジュールレベルで関数を定義するか、部分適用にはfunctools.partialを使用してください。

結論

Python multiprocessingは、CPU束縛のボトルネックを、利用可能なコアに応じてスケールする並列操作に変換します。別々のプロセスを通じてGILをバイパスすることで、threadingでは不可能な真の並列処理を実現します。Poolインターフェースは一般的なパターンを簡素化し、Queue、Pipe、共有メモリは複雑なプロセス間ワークフローを可能にします。

恥ずかしいほど並列なタスクにはPool.map()から始め、高速化を測定し、そこから最適化してください。if __name__ == '__main__'ガードを忘れず、プロセスオーバーヘッドを償却するためにタスクを粗粒度に保ち、プロセス間のデータ転送を最小限に抑えてください。デバッグが複雑になった場合、RunCellのようなツールがプロセス境界を越えた実行のトレースに役立ちます。

Multiprocessingが常に答えというわけではありません。I/O束縛作業の場合、threadingやasyncioの方がシンプルで高速かもしれません。しかし、大規模なデータセットを処理したり、モデルをトレーニングしたり、重い計算を実行したりする場合、multiprocessingはマルチコアマシンが構築された目的のパフォーマンスを提供します。

📚