本文只是对 multiprocessing 模块的简要介绍,更多详情,请移步官方文档。
https://docs.python.org/3/library/multiprocessing.html
multiprocessing 是一个支持使用与 threading 模块类似的 API 生成进程的包。使用子进程替代线程可以避免 GIL,因此通过使用 multiprocessing 模块可以充分利用多核。
multiprocessing 模块也引入了 threading 模块没有的 API,比如 pool 对象。下面是使用 pool 进行并行编程的例子:
xxxxxxxxxx
from multiprocessing import Pool
def f(x):
return x*x
if __name__ == '__main__':
with Pool(5) as p:
print(p.map(f, [1, 2, 3]))
执行上面的程序,将打印:
xxxxxxxxxx
[1, 4, 9]
在 multiprocessing 中,通过 Process 对象生成进程,调用它的 start() 方法可以启动进程。Process 的 API 与 threading.Thread 相同。下面是一个多进程的例子:
xxxxxxxxxx
from multiprocessing import Process
def f(name):
print('hello', name)
if __name__ == '__main__':
p = Process(target=f, args=('bob',))
p.start()
p.join()
if __name__ == '__main__'
是必须的,原因请查看 Programming guidelines。
multiprocessing 支持两种用于进程间通信的数据类型:
Queue
Queue 类是 queue.Queue 的克隆。比如:
xxxxxxxxxx
from multiprocessing import Process, Queue
def f(q):
q.put([42, None, 'hello'])
if __name__ == '__main__':
q = Queue()
p = Process(target=f, args=(q,))
p.start()
print(q.get()) # prints "[42, None, 'hello']"
p.join()
Queue 是进程和线程安全的。
Pipe
pipe() 函数返回通过管道(默认是双向的)连接的一对连接对象。比如:
xxxxxxxxxx
from multiprocessing import Process, Pipe
def f(conn):
conn.send([42, None, 'hello'])
conn.close()
if __name__ == '__main__':
parent_conn, child_conn = Pipe()
p = Process(target=f, args=(child_conn,))
p.start()
print(parent_conn.recv()) # prints "[42, None, 'hello']"
p.join()
Pipe() 函数返回的两个连接对象代表一个管道的两端。每个连接对象都有 send() 和 recv() 方法。注意:如果两个进程(或线程)同时在管道的同一端读取或写入,数据可能损坏。当不同进程使用管道的不同端时,没有损坏风险。
multiprocessing 包含 threading 模块的所有同步原语的等价物。比如可以使用锁保证在同一时刻只有一个进程能向标准输出打印:
xxxxxxxxxx
from multiprocessing import Process, Lock
def f(l, i):
l.acquire()
try:
print('hello world', i)
finally:
l.release()
if __name__ == '__main__':
lock = Lock()
for num in range(10):
Process(target=f, args=(lock, num)).start()
如果不使用锁,那么不同进程的输出很容易混合在一起。
multiprocessing 提供两种用于在进程间共享状态的方式:
共享内存
使用 Value 或 Array,将数据存储到共享内存映射中。比如:
xxxxxxxxxx
from multiprocessing import Process, Value, Array
def f(n, a):
n.value = 3.1415927
for i in range(len(a)):
a[i] = -a[i]
if __name__ == '__main__':
num = Value('d', 0.0)
arr = Array('i', range(10))
p = Process(target=f, args=(num, arr))
p.start()
p.join()
print(num.value)
print(arr[:])
会打印
xxxxxxxxxx
3.1415927
[0, -1, -2, -3, -4, -5, -6, -7, -8, -9]
创建 num 和 arr 时使用的 'd' 和 'i' 参数是 array 模块使用的类型代码的种类:'d' 表示双精度浮点数,'i' 表示有符号整型。这些共享对象是进程和线程安全的。
multiprocessing.sharedctypes 模块支持创建任意 ctypes 对象。
关于共享内存,也可参考:https://docs.python.org/3/library/mmap.html
服务进程
Manager() 返回的管理对象控制一个服务进程,服务进程持有 Pyhton 对象,允许其它进程使用代理操作这些对象。
Manager() 返回的管理器支持 list
, dict
, Namespace
,Lock
,RLock
,Semaphore
,BoundedSemaphore
,Condition
,Event
,Barrier
,Queue
,Value
和 Array
。比如:
xxxxxxxxxx
from multiprocessing import Process, Manager
def f(d, l):
d[1] = '1'
d['2'] = 2
d[0.25] = None
l.reverse()
if __name__ == '__main__':
with Manager() as manager:
d = manager.dict()
l = manager.list(range(10))
p = Process(target=f, args=(d, l))
p.start()
p.join()
print(d)
print(l)
将打印
xxxxxxxxxx
{0.25: None, 1: '1', '2': 2}
[9, 8, 7, 6, 5, 4, 3, 2, 1, 0]
服务进程管理器比共享内存更灵活,因为它们支持任意对象类型。不同计算机上的进程可以通过网络共享单个管理器。然而这种方式比共享内存慢。
Pool 类代表进程池。可以通过若干种方式向进程池提交任务。比如:
xxxxxxxxxx
from multiprocessing import Pool, TimeoutError
import time
import os
def f(x):
return x*x
if __name__ == '__main__':
# start 4 worker processes
with Pool(processes=4) as pool:
# print "[0, 1, 4,..., 81]"
print(pool.map(f, range(10)))
# print same numbers in arbitrary order
for i in pool.imap_unordered(f, range(10)):
print(i)
# evaluate "f(20)" asynchronously
res = pool.apply_async(f, (20,)) # runs in *only* one process
print(res.get(timeout=1)) # prints "400"
# evaluate "os.getpid()" asynchronously
res = pool.apply_async(os.getpid, ()) # runs in *only* one process
print(res.get(timeout=1)) # prints the PID of that process
# launching multiple evaluations asynchronously *may* use more processes
multiple_results = [pool.apply_async(os.getpid, ()) for i in range(4)]
print([res.get(timeout=1) for res in multiple_results])
# make a single worker sleep for 10 secs
res = pool.apply_async(time.sleep, (10,))
try:
print(res.get(timeout=1))
except TimeoutError:
print("We lacked patience and got a multiprocessing.TimeoutError")
print("For the moment, the pool remains available for more work")
# exiting the 'with'-block has stopped the pool
print("Now the pool is closed and no longer available")
注意:进程池的方法仅能被创建它的进程使用。
class multiprocessing.Process
(group=None, target=None, name=None, args=(), kwargs={}, *, daemon=None)
Process 类用于创建子进程,该类拥有 threading.Thread 的所有方法的等价物。
应该使用关键字参数调用其构造器。group 参数必须是 None;它的存在只是为了兼容 threading.Thread。target 是被 run() 方法调用的可调用对象。默认是 None,表示什么都不做。name 是进程的名字。args 是用于 target 调用的元组参数。kwargs 是用于 target 调用的字典参数。daemon 参数用于将进程的 daemon 标记设置为 True 或 False。如果未设置,那么将从父进程继承该标记。
如果子进程重写构造器,那么它必须确保在做任何事情之前,先调用基类的构造器(Process.__init__())。
下面是一个示例:
xxxxxxxxxx
>>> import multiprocessing, time, signal
>>> p = multiprocessing.Process(target=time.sleep, args=(1000,))
>>> print(p, p.is_alive())
<Process(Process-1, initial)> False
>>> p.start()
>>> print(p, p.is_alive())
<Process(Process-1, started)> True
>>> p.terminate()
>>> time.sleep(0.1)
>>> print(p, p.is_alive())
<Process(Process-1, stopped[SIGTERM])> False
>>> p.exitcode == -signal.SIGTERM
True
multiprocessing.dummy 只是将 threading 模块的 API 封装成与 multiprocessing 一样。其带来的好处是只需要很小的改动就可以把多进程程序改成多线程程序,反之亦然。比如:
xxxxxxxxxx
import multiprocessing as mp
import time
def f(x):
print("start")
time.sleep(x)
print("end")
if __name__ == "__main__":
# p 是进程
p = mp.Process(target=f, args=(1, ))
p.start()
p.join()
只需改动第一行,即可将该多进程程序改成多线程程序:
xxxxxxxxxx
import multiprocessing.dummy as mp
import time
def f(x):
print("start")
time.sleep(x)
print("end")
if __name__ == "__main__":
# p 是线程
p = mp.Process(target=f, args=(1, ))
p.start()
p.join()
multiprocessing.dummy 提供的 Pool 函数返回的是 ThreadPool 实例,它是 Pool 的子类,支持所有相同的方法调用,但是使用线程池而不是进程池。