multiprocessing模块用法

由于一些历史原因,我们常用的CPython具有全局解释器锁(GIL)机制,这把全局大锁导致Python的多线程性能非常糟糕,为了解决这个问题,一种方案是使用多进程来取代多线程。
multiprocessingmultiprocessing.dummy是python下两个常用的多进程和多线程模块

常用函数

multiprocessingmultiprocessing.dummy分别为多进程和多线程模块,但是两者的调用方式基本相同

进程池/线程池

multiprocessing.Pool(processes = pop_size)multiprocessing.ThreadPool(processes = pop_size)可以创建不同大小的进程池和线程池。这两个函数还可以传入initializerinitargs两个参数,用于进行初始化。默认情况下Pool会创建process数量的进程/线程,但是maxtasksperchild参数可以控制一条进程/线程最多处理的任务数,当超过这个数量时会重新启动一个新的进程/线程。

调用并取回结果

以多进程为例

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
task_size = len(tasks)
ans = [None] * task_size
result = []
multiprocessing.freeze_support()
pool = multiprocessing.Pool(processes = task_size)
for (i, task) in zip(count(0), tasks):
run = pool.apply_async(callee, args = (task, i))
result.append( (i, callee) )
try:
pool.close()
pool.join()
for res in result:
ans[res[0]] = res[1].get()
except Exception, e:
print e
return ans

其中对于多进程,multiprocessing.freeze_support()语句在windows系统上是必须的,这是因为windows的API不包含fork()等函数。
apply_async表示异步调用,此时各进程在运行完毕后pool.join()回到主进程,主进程通过res.get()函数获得callee返回结果。

multiprocessing和subprocess

为了提高计算效率,可以编写一段主程序,用它来启动若干个外部程序,并把总的计算任务拆分发送给这些外部程序并行计算。
运行外部的可执行程序可以使用subprocess模块,主程序通过PIPE和该外部子程序进行通信,这样的通信会阻塞主程序,不能达到并行的效果。
为了能够异步地对多个subprocess进行通信,可以使用multiprocessing的多进程,每条进程中调用subprocess,subprocess在进程结束后取回输出,并交回给主进程合并。
这样的方法对于n个任务需要启动n个外部程序,如果外部程序的初始化成本比较大,这样的设计方案成本是划不来的,Windows系统下有不能直接fork。比较好的方法是预先初始化m个外部程序作为进程池,然后进程池中的每个外部程序依次处理[n/m]个任务,外部程序和外部程序之间是并行的。
实现这种方案最好使用multiprocessing的多线程。多线程的方案和多进程的方案是类似的,由子线程负责和外部进程进行通信。这样子线程是彼此并行的,而主线程可以阻塞起来,等所有的线程计算完毕join回来即可。虽然说Python自带的GIL给多线程的带来阻碍,但是主要的计算工作主要存在于subprocess所调用的外部程序中,因此性能损失有限。并且采用多进程由于不能共享内存,因此很难将初始化好的外部程序的句柄交给相应的子进程。

初始化外部进程

对于这样的多线程方案,首先通过以下语句启动proc_size个外部进程,并且注册proc_size个线程负责和各个外部进程进行交互。主线程使用join函数等待所有子线程返回结果,

1
2
3
4
5
6
for index in xrange(proc_size):
subproc = subprocess.Popen(['XXX.exe'], stdin = subprocess.PIPE, stdout = subprocess.PIPE
, stderr = subprocess.PIPE, bufsize=1, close_fds='posix' in sys.builtin_module_names)
t = Thread(target = initial_method, args = (index, subproc, call_back))
t.daemon = True
ths.append(t)

如果Popen时需要传参数,不能将参数直接和程序名写到一个字符串里面,而是把每个参数放到单独的字符串中append到程序名所在的数组里面。
Popenstdin等参数用来重定向子程序的三个标准流,常见选项是subprocess.PIPENone。使用None继承父进程的标准流,例如当shell = False时则所有父程序的输入会被转发给子程序,但当shell = True时会先启动一个shell再运行程序,这时候实际上是接受的shell的标准输入。使用subprocess.PIPE则和子程序之间建立管道。可以调用Popen.communicate(input)来通过管道向子进程传递信息,之后程序会阻塞在communicate上,直到从子程序传回信息。
communicate方法对应的是Popen.stdin.write()方法,这两个有一些区别。此外communicate会默认调用stdin.close(),这相当于向对方发送一个EOF。所以当需要多次向子程序写数据时,并且子程序侦测来自主程序的EOF作为结束提示时,应当使用stdin.write
在写ATP时我还遇到程序子程序无法获得stdin.write()写入的数据的情况,这是需要设置shell=True
这里的t.daemon = True表示该线程是主线程的守护线程,守护线程会在主线程退出时自动退出。对每个线程调用start方法,线程才会启动,这时候线程使用给定的args参数调用target参数传入的initial_method方法。

1
2
3
4
for i in xrange(proc_size):
ths[i].start()
for i in xrange(proc_size):
ths[i].join()

调用外部进程计算

由于n远大于m,因此对每一个外部进程需要使用互斥锁threading.Lock()维护。将线程i按照模m分组,同剩余系的线程共享一个进程。线程取得进程资源后调用lock.acquire()为进程资源上锁,这时候如果其他线程再次调用lock.acquire()则会陷入阻塞状态,直到获得锁的线程调用lock.release()释放资源。