Skip to content
话题
Python
Python Multiprocessing:提升速度的并行处理指南

Python Multiprocessing:提升速度的并行处理指南

Updated on

Python的单线程执行模型在处理大型数据集或执行CPU密集型计算时会遇到瓶颈。一个需要10分钟处理数据的脚本理论上可以在5核机器上2分钟内完成,但Python的全局解释器锁(GIL)阻止了标准线程实现真正的并行性。结果是CPU核心被浪费,开发者只能沮丧地看着他们的多核处理器闲置,而Python一次处理一个任务。

这个瓶颈耗费真实的时间和金钱。数据科学家等待几小时完成本可以在几分钟内完成的模型训练。网页爬虫以潜在速度的一小部分爬取。应该利用所有可用核心的图像处理管道只用一个核心缓慢前进。

multiprocessing模块通过创建独立的Python进程来解决这个问题,每个进程都有自己的解释器和内存空间。与线程不同,进程完全绕过GIL,允许在CPU核心上真正并行执行。本指南向您展示如何利用multiprocessing实现显著的性能提升,从基本的并行执行到进程池和共享内存等高级模式。

📚

理解GIL问题

全局解释器锁(GIL)是一个保护Python对象访问的互斥锁,防止多个线程同时执行Python字节码。即使在16核机器上,对于CPU密集型任务,Python线程也是一次执行一个。

import threading
import time
 
def cpu_bound_task(n):
    count = 0
    for i in range(n):
        count += i * i
    return count
 
# Threading不会并行化CPU密集型工作
start = time.time()
threads = [threading.Thread(target=cpu_bound_task, args=(10_000_000,)) for _ in range(4)]
for t in threads: t.start()
for t in threads: t.join()
print(f"Threading: {time.time() - start:.2f}s")  # 与单线程时间相同

GIL只在I/O操作(文件读取、网络请求)期间释放,使得threading对I/O密集型任务有用,但对CPU密集型工作无效。Multiprocessing通过并行运行独立的Python解释器来绕过GIL。

使用Process进行基本Multiprocessing

Process类创建一个独立运行的新Python进程。每个进程都有自己的内存空间和Python解释器。

from multiprocessing import Process
import os
 
def worker(name):
    print(f"Worker {name} 在进程 {os.getpid()} 中运行")
    result = sum(i*i for i in range(5_000_000))
    print(f"Worker {name} 完成: {result}")
 
if __name__ == '__main__':
    processes = []
 
    # 创建4个进程
    for i in range(4):
        p = Process(target=worker, args=(f"#{i}",))
        processes.append(p)
        p.start()
 
    # 等待所有进程完成
    for p in processes:
        p.join()
 
    print("所有进程已完成")

关键要求:在Windows和macOS上始终使用if __name__ == '__main__'保护。没有它,子进程会递归生成更多进程,导致fork炸弹。

Process Pool:简化的并行执行

Pool管理一个工作进程池,自动分配任务。这是最常见的multiprocessing模式。

from multiprocessing import Pool
import time
 
def process_item(x):
    """模拟CPU密集型工作"""
    time.sleep(0.1)
    return x * x
 
if __name__ == '__main__':
    data = range(100)
 
    # 顺序处理
    start = time.time()
    results_seq = [process_item(x) for x in data]
    seq_time = time.time() - start
 
    # 使用4个工作进程并行处理
    start = time.time()
    with Pool(processes=4) as pool:
        results_par = pool.map(process_item, data)
    par_time = time.time() - start
 
    print(f"顺序: {seq_time:.2f}s")
    print(f"并行 (4核): {par_time:.2f}s")
    print(f"加速: {seq_time/par_time:.2f}x")

Pool方法比较

不同的Pool方法适用于不同的使用场景:

方法使用场景阻塞返回多参数
map()简单并行化有序列表否(单参数)
map_async()非阻塞mapAsyncResult
starmap()多参数有序列表是(元组解包)
starmap_async()非阻塞starmapAsyncResult
apply()单个函数调用单个结果
apply_async()非阻塞applyAsyncResult
imap()惰性迭代器迭代器
imap_unordered()惰性、无序迭代器
from multiprocessing import Pool
 
def add(x, y):
    return x + y
 
def power(x, exp):
    return x ** exp
 
if __name__ == '__main__':
    with Pool(4) as pool:
        # map: 单参数
        squares = pool.map(lambda x: x**2, [1, 2, 3, 4])
 
        # starmap: 多参数(解包元组)
        results = pool.starmap(add, [(1, 2), (3, 4), (5, 6)])
 
        # apply_async: 非阻塞单次调用
        async_result = pool.apply_async(power, (2, 10))
        result = async_result.get()  # 阻塞直到就绪
 
        # imap: 大数据集的惰性求值
        for result in pool.imap(lambda x: x**2, range(1000)):
            pass  # 结果到达时逐个处理

进程间通信

进程默认不共享内存。使用QueuePipe进行通信。

Queue:线程安全的消息传递

from multiprocessing import Process, Queue
 
def producer(queue, items):
    for item in items:
        queue.put(item)
        print(f"生产: {item}")
    queue.put(None)  # 哨兵值
 
def consumer(queue):
    while True:
        item = queue.get()
        if item is None:
            break
        print(f"消费: {item}")
        # 处理项目...
 
if __name__ == '__main__':
    q = Queue()
    items = [1, 2, 3, 4, 5]
 
    prod = Process(target=producer, args=(q, items))
    cons = Process(target=consumer, args=(q,))
 
    prod.start()
    cons.start()
 
    prod.join()
    cons.join()

Pipe:双向通信

from multiprocessing import Process, Pipe
 
def worker(conn):
    conn.send("来自工作进程的问候")
    msg = conn.recv()
    print(f"工作进程接收: {msg}")
    conn.close()
 
if __name__ == '__main__':
    parent_conn, child_conn = Pipe()
    p = Process(target=worker, args=(child_conn,))
    p.start()
 
    msg = parent_conn.recv()
    print(f"父进程接收: {msg}")
    parent_conn.send("来自父进程的问候")
 
    p.join()

共享内存和状态

虽然进程有独立的内存,但multiprocessing提供了共享内存原语。

Value和Array:共享原语

from multiprocessing import Process, Value, Array
import time
 
def increment_counter(counter, lock):
    for _ in range(100_000):
        with lock:
            counter.value += 1
 
def fill_array(arr, start, end):
    for i in range(start, end):
        arr[i] = i * i
 
if __name__ == '__main__':
    # 带锁的共享值
    counter = Value('i', 0)
    lock = counter.get_lock()
 
    processes = [Process(target=increment_counter, args=(counter, lock)) for _ in range(4)]
    for p in processes: p.start()
    for p in processes: p.join()
 
    print(f"计数器: {counter.value}")  # 应该是400,000
 
    # 共享数组
    shared_arr = Array('i', 1000)
    p1 = Process(target=fill_array, args=(shared_arr, 0, 500))
    p2 = Process(target=fill_array, args=(shared_arr, 500, 1000))
    p1.start(); p2.start()
    p1.join(); p2.join()
 
    print(f"Array[100]: {shared_arr[100]}")  # 10,000

Manager:复杂的共享对象

from multiprocessing import Process, Manager
 
def update_dict(shared_dict, key, value):
    shared_dict[key] = value
 
if __name__ == '__main__':
    with Manager() as manager:
        # 共享dict、list、namespace
        shared_dict = manager.dict()
        shared_list = manager.list()
 
        processes = [
            Process(target=update_dict, args=(shared_dict, f"key{i}", i*10))
            for i in range(5)
        ]
 
        for p in processes: p.start()
        for p in processes: p.join()
 
        print(dict(shared_dict))  # {'key0': 0, 'key1': 10, ...}

比较:Multiprocessing vs Threading vs Asyncio

特性MultiprocessingThreadingAsyncioconcurrent.futures
绕过GIL取决于执行器
CPU密集型任务优秀优秀(ProcessPoolExecutor)
I/O密集型任务良好优秀优秀优秀(ThreadPoolExecutor)
内存开销高(独立进程)低(共享内存)变化
启动成本很低变化
通信Queue、Pipe、共享内存直接(共享状态)原生async/awaitFutures
最适合CPU密集型并行任务I/O密集型任务、简单并发异步I/O、多并发任务两者的统一API
# 对CPU密集型使用multiprocessing
from multiprocessing import Pool
 
def cpu_bound(n):
    return sum(i*i for i in range(n))
 
with Pool(4) as pool:
    results = pool.map(cpu_bound, [10_000_000] * 4)
 
# 对I/O密集型使用threading
import threading
import requests
 
def fetch_url(url):
    return requests.get(url).text
 
threads = [threading.Thread(target=fetch_url, args=(url,)) for url in urls]
for t in threads: t.start()
for t in threads: t.join()
 
# 对异步I/O使用asyncio
import asyncio
import aiohttp
 
async def fetch_async(url):
    async with aiohttp.ClientSession() as session:
        async with session.get(url) as response:
            return await response.text()
 
asyncio.run(asyncio.gather(*[fetch_async(url) for url in urls]))

高级:ProcessPoolExecutor

concurrent.futures.ProcessPoolExecutor提供了与ThreadPoolExecutor相同API的高级接口。

from concurrent.futures import ProcessPoolExecutor, as_completed
import time
 
def process_task(x):
    time.sleep(0.1)
    return x * x
 
if __name__ == '__main__':
    # 上下文管理器确保清理
    with ProcessPoolExecutor(max_workers=4) as executor:
        # 提交单个任务
        futures = [executor.submit(process_task, i) for i in range(20)]
 
        # 完成时处理
        for future in as_completed(futures):
            result = future.result()
            print(f"结果: {result}")
 
        # 或使用map(类似Pool.map)
        results = executor.map(process_task, range(20))
        print(list(results))

相比Pool的优势

  • ThreadPoolExecutorProcessPoolExecutor的API相同
  • Futures接口提供更多控制
  • 更好的错误处理
  • 更容易混合同步和异步代码

常见模式

尴尬并行任务

没有依赖关系的任务最适合multiprocessing:

from multiprocessing import Pool
import pandas as pd
 
def process_chunk(chunk):
    """独立处理数据块"""
    chunk['new_col'] = chunk['value'] * 2
    return chunk.groupby('category').sum()
 
if __name__ == '__main__':
    df = pd.DataFrame({'category': ['A', 'B'] * 5000, 'value': range(10000)})
 
    # 分割成块
    chunks = [df.iloc[i:i+2500] for i in range(0, len(df), 2500)]
 
    with Pool(4) as pool:
        results = pool.map(process_chunk, chunks)
 
    # 合并结果
    final = pd.concat(results).groupby('category').sum()

Map-Reduce模式

from multiprocessing import Pool
from functools import reduce
 
def mapper(text):
    """Map: 提取词语并计数"""
    words = text.lower().split()
    return {word: 1 for word in words}
 
def reducer(dict1, dict2):
    """Reduce: 合并词频"""
    for word, count in dict2.items():
        dict1[word] = dict1.get(word, 0) + count
    return dict1
 
if __name__ == '__main__':
    documents = ["你好 世界", "python的世界", "你好 python"] * 1000
 
    with Pool(4) as pool:
        # Map阶段:并行
        word_dicts = pool.map(mapper, documents)
 
    # Reduce阶段:顺序(或使用树状归约)
    word_counts = reduce(reducer, word_dicts)
    print(word_counts)

多生产者的生产者-消费者

from multiprocessing import Process, Queue, cpu_count
 
def producer(queue, producer_id, items):
    for item in items:
        queue.put((producer_id, item))
    print(f"生产者 {producer_id} 完成")
 
def consumer(queue, num_producers):
    finished_producers = 0
    while finished_producers < num_producers:
        if not queue.empty():
            item = queue.get()
            if item is None:
                finished_producers += 1
            else:
                producer_id, data = item
                print(f"从生产者 {producer_id} 消费: {data}")
 
if __name__ == '__main__':
    q = Queue()
    num_producers = 3
 
    # 启动生产者
    producers = [
        Process(target=producer, args=(q, i, range(i*10, (i+1)*10)))
        for i in range(num_producers)
    ]
    for p in producers: p.start()
 
    # 启动消费者
    cons = Process(target=consumer, args=(q, num_producers))
    cons.start()
 
    # 清理
    for p in producers: p.join()
    for _ in range(num_producers):
        q.put(None)  # 向消费者发信号
    cons.join()

性能考虑

何时Multiprocessing有帮助

  • CPU密集型任务:数据处理、数学计算、图像处理
  • 大数据集:当每项处理时间证明进程开销合理时
  • 独立任务:无共享状态或最小通信

何时Multiprocessing有害

进程创建开销可能超过收益:

from multiprocessing import Pool
import time
 
def tiny_task(x):
    return x + 1
 
if __name__ == '__main__':
    data = range(100)
 
    # 对小任务顺序处理更快
    start = time.time()
    results = [tiny_task(x) for x in data]
    print(f"顺序: {time.time() - start:.4f}s")  # ~0.0001s
 
    start = time.time()
    with Pool(4) as pool:
        results = pool.map(tiny_task, data)
    print(f"并行: {time.time() - start:.4f}s")  # ~0.05s(慢500倍!)

经验法则

  • 最小任务持续时间:每项约0.1秒
  • 数据大小:如果pickle数据比处理时间长,使用共享内存
  • 工作进程数:从cpu_count()开始,根据任务特征调整

Pickling要求

只有可pickle的对象才能在进程间传递:

from multiprocessing import Pool
 
# ❌ Lambda函数不可pickle
# pool.map(lambda x: x*2, range(10))  # 失败
 
# ✅ 使用命名函数
def double(x):
    return x * 2
 
with Pool(4) as pool:
    pool.map(double, range(10))
 
# ❌ 笔记本中的局部函数会失败
# def process():
#     def inner(x): return x*2
#     pool.map(inner, range(10))  # 失败
 
# ✅ 在模块级别定义或使用functools.partial
from functools import partial
 
def multiply(x, factor):
    return x * factor
 
with Pool(4) as pool:
    pool.map(partial(multiply, factor=3), range(10))

使用RunCell调试并行代码

调试multiprocessing代码是出了名的困难。print语句消失、断点不工作、堆栈跟踪难以理解。当进程静默崩溃或产生错误结果时,传统调试工具会失败。

RunCellwww.runcell.dev)是为Jupyter构建的AI (opens in a new tab) Agent,擅长调试并行代码。与无法跟踪跨进程执行的标准调试器不同,RunCell分析您的multiprocessing模式、识别竞态条件、在运行时之前捕获pickling错误,并解释为什么进程死锁。

当Pool工作进程在没有traceback的情况下崩溃时,RunCell可以检查错误队列并准确显示哪个函数调用失败以及原因。当共享状态产生错误结果时,RunCell追踪内存访问模式以找到缺失的锁。对于调试复杂并行数据管道的数据科学家,RunCell将数小时的print语句调试转变为几分钟的AI引导修复。

最佳实践

1. 始终使用if __name__保护

# ✅ 正确
if __name__ == '__main__':
    with Pool(4) as pool:
        pool.map(func, data)
 
# ❌ 错误 - 在Windows上导致fork炸弹
with Pool(4) as pool:
    pool.map(func, data)

2. 显式关闭Pool

# ✅ 上下文管理器(推荐)
with Pool(4) as pool:
    results = pool.map(func, data)
 
# ✅ 显式close和join
pool = Pool(4)
results = pool.map(func, data)
pool.close()
pool.join()
 
# ❌ 泄漏资源
pool = Pool(4)
results = pool.map(func, data)

3. 处理异常

from multiprocessing import Pool
 
def risky_task(x):
    if x == 5:
        raise ValueError("错误的值")
    return x * 2
 
if __name__ == '__main__':
    with Pool(4) as pool:
        try:
            results = pool.map(risky_task, range(10))
        except ValueError as e:
            print(f"任务失败: {e}")
 
        # 或使用apply_async单独处理
        async_results = [pool.apply_async(risky_task, (i,)) for i in range(10)]
 
        for i, ar in enumerate(async_results):
            try:
                result = ar.get()
                print(f"结果 {i}: {result}")
            except ValueError:
                print(f"任务 {i} 失败")

4. 尽可能避免共享状态

# ❌ 共享状态需要同步
from multiprocessing import Process, Value
 
counter = Value('i', 0)
 
def increment():
    for _ in range(100000):
        counter.value += 1  # 竞态条件!
 
# ✅ 使用锁或避免共享
from multiprocessing import Lock
 
lock = Lock()
 
def increment_safe():
    for _ in range(100000):
        with lock:
            counter.value += 1
 
# ✅ 更好:避免共享状态
def count_locally(n):
    return n  # 返回结果
 
with Pool(4) as pool:
    results = pool.map(count_locally, [100000] * 4)
    total = sum(results)

5. 选择正确的工作进程数

from multiprocessing import cpu_count, Pool
 
# CPU密集型:使用所有核心
num_workers = cpu_count()
 
# I/O密集型:可以使用更多工作进程
num_workers = cpu_count() * 2
 
# 混合工作负载:经验调整
with Pool(processes=num_workers) as pool:
    results = pool.map(func, data)

常见错误

1. 忘记if __name__保护

导致Windows/macOS上无限进程生成。

2. 尝试Pickle不可Pickle的对象

# ❌ 类方法、lambda、局部函数会失败
class DataProcessor:
    def process(self, x):
        return x * 2
 
dp = DataProcessor()
# pool.map(dp.process, data)  # 失败
 
# ✅ 使用顶层函数
def process(x):
    return x * 2
 
with Pool(4) as pool:
    pool.map(process, data)

3. 不处理进程终止

# ❌ 不适当清理
pool = Pool(4)
results = pool.map(func, data)
# pool仍在运行
 
# ✅ 始终close和join
pool = Pool(4)
try:
    results = pool.map(func, data)
finally:
    pool.close()
    pool.join()

4. 过度数据传输

# ❌ Pickle巨大对象很慢
large_data = [np.random.rand(1000, 1000) for _ in range(100)]
with Pool(4) as pool:
    pool.map(process_array, large_data)  # 慢速序列化
 
# ✅ 使用共享内存或内存映射文件
import numpy as np
from multiprocessing import shared_memory
 
# 创建共享内存
shm = shared_memory.SharedMemory(create=True, size=1000*1000*8)
arr = np.ndarray((1000, 1000), dtype=np.float64, buffer=shm.buf)
 
# 只传递名称和形状
def process_shared(name, shape):
    existing_shm = shared_memory.SharedMemory(name=name)
    arr = np.ndarray(shape, dtype=np.float64, buffer=existing_shm.buf)
    # 处理arr...
    existing_shm.close()
 
with Pool(4) as pool:
    pool.starmap(process_shared, [(shm.name, (1000, 1000))] * 4)
 
shm.close()
shm.unlink()

常见问题

multiprocessing如何绕过GIL?

GIL(全局解释器锁)是每个Python解释器中的互斥锁,防止多个线程同时执行Python字节码。Multiprocessing通过创建独立的Python进程来绕过这一点,每个进程都有自己的解释器和GIL。由于进程不共享内存,它们在CPU核心上真正并行运行,没有GIL竞争。

何时应该使用multiprocessing vs threading?

对GIL限制性能的CPU密集型任务(数据处理、计算、图像操作)使用multiprocessing。对I/O密集型任务(网络请求、文件操作)使用threading,其中GIL在I/O期间释放,允许线程并发工作。Threading开销较低,但由于GIL无法并行化CPU工作。

为什么需要if name == 'main'保护?

在Windows和macOS上,子进程导入主模块以访问函数。没有保护,导入模块会再次运行Pool创建代码,生成无限进程(fork炸弹)。Linux使用不需要导入的fork(),但保护仍是跨平台代码的最佳实践。

应该使用多少个工作进程?

对于CPU密集型任务,从cpu_count()(CPU核心数)开始。比核心多的工作进程会导致上下文切换开销。对于I/O密集型任务,可以使用更多工作进程(2-4倍核心),因为进程在I/O上等待。始终用您的特定工作负载进行基准测试,因为内存和数据传输开销可能限制最佳工作进程数。

可以向multiprocessing函数传递哪些对象?

对象必须是可pickle的(可用pickle序列化)。这包括内置类型(int、str、list、dict)、NumPy数组、pandas DataFrame和大多数用户定义的类。Lambda、局部函数、类方法、文件句柄、数据库连接和线程锁不能被pickle。在模块级别定义函数或使用functools.partial进行部分应用。

结论

Python multiprocessing将CPU密集型瓶颈转变为随可用核心扩展的并行操作。通过独立进程绕过GIL,您可以实现threading无法实现的真正并行性。Pool接口简化了常见模式,而Queue、Pipe和共享内存支持复杂的进程间工作流。

从尴尬并行任务的Pool.map()开始,测量加速比,然后从那里优化。记住if __name__ == '__main__'保护,保持任务粗粒度以摊销进程开销,并最小化进程间的数据传输。当调试变得复杂时,像RunCell这样的工具可以帮助跟踪跨进程边界的执行。

Multiprocessing并非总是答案。对于I/O密集型工作,threading或asyncio可能更简单更快。但当您处理大型数据集、训练模型或执行繁重计算时,multiprocessing提供了多核机器构建的性能目标。

📚