multiprocessing模块用法

由于一些历史原因,我们常用的CPython具有全局解释器锁(GIL)机制,这把全局大锁导致Python的多线程性能非常糟糕,为了解决这个问题,一种方案是使用协程代替多线程,由于自带了yield,Python实现协程相对于C++那种动辄setjmp或者CPS,抑或利用Linux.ucontext等基于运行时的组件的方式相比要优雅了一些。不过协程对于IO密集型的程序用处很大,但如果我们想要实现并行,那么使用多进程来取代多线程就是一个必由之路。
multiprocessingmultiprocessing.dummy是Python下两个常用的多进程和多线程模块。

multiprocessing提供的多进程与多线程机制

multiprocessingmultiprocessing.dummy分别为多进程和多线程模块,但是两者的调用方式基本相同

进程池/线程池

multiprocessing.Pool(processes = pop_size)multiprocessing.ThreadPool(processes = pop_size)可以创建不同大小的进程池和线程池。这两个函数还可以传入initializerinitargs两个参数,用于进行初始化。默认情况下Pool会创建process数量的进程/线程,但是maxtasksperchild参数可以控制一条进程/线程最多处理的任务数,当超过这个数量时会重新启动一个新的进程/线程。

调用并取回结果

这里我们实际上实现了一个future需要的功能下面我们以多进程为例来展示一下

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
task_size = len(tasks)
ans = [None] * task_size
result = []
multiprocessing.freeze_support()
pool = multiprocessing.Pool(processes = task_size)
for (i, task) in zip(count(0), tasks):
run = pool.apply_async(callee, args = (task, i))
result.append( (i, callee) )
try:
pool.close()
pool.join()
for res in result:
ans[res[0]] = res[1].get()
except Exception, e:
print e
return ans

其中对于多进程,multiprocessing.freeze_support()语句在windows系统上是必须的,这是因为windows的API不包含fork()等函数。
apply_async表示异步调用,此时各进程在运行完毕后pool.join()回到主进程,主进程通过res.get()函数获得callee返回结果。

multiprocessing使用实例

使用multiprocessing与subprocess协作

这个案例来自于我毕业论文的一个需求。为了提高计算效率,可以编写一段主程序,用它来启动若干个外部程序,并把总的计算任务拆分发送给这些外部程序并行计算。运行外部的可执行程序可以使用subprocess模块,主程序通过PIPE和该外部子程序进行通信,这样的通信会阻塞主程序,不能达到并行的效果。为了能够异步地对多个subprocess进行通信,可以使用multiprocessing的多进程,每条进程中调用subprocess,subprocess在进程结束后取回输出,并交回给主进程合并。
这样的方法对于n个任务需要启动n个外部程序,如果外部程序的初始化成本比较大,这样的设计方案成本是划不来的,Windows系统下有不能直接fork。比较好的方法是预先初始化m个外部程序作为进程池,然后进程池中的每个外部程序依次处理[n/m]个任务,外部程序和外部程序之间是并行的。为了管理这些子程序,我们可以借助于multiprocessing的多线程,由子线程负责和外部进程进行通信。这样子线程是彼此并行的,而主线程可以阻塞起来,等所有的线程计算完毕join回来即可。虽然说Python自带的GIL给多线程的带来阻碍,但是主要的计算工作主要存在于subprocess所调用的外部程序中,因此性能损失有限。并且采用多进程由于不能共享内存,因此很难将初始化好的外部程序的句柄交给相应的子进程。

初始化外部进程

对于这样的多线程方案,首先通过以下语句启动proc_size个外部进程,并且注册proc_size个线程负责和各个外部进程进行交互。主线程使用join函数等待所有子线程返回结果。

1
2
3
4
5
6
for index in xrange(proc_size):
subproc = subprocess.Popen(['XXX.exe'], stdin = subprocess.PIPE, stdout = subprocess.PIPE
, stderr = subprocess.PIPE, bufsize=1, close_fds='posix' in sys.builtin_module_names)
t = Thread(target = initial_method, args = (index, subproc, call_back))
t.daemon = True
ths.append(t)

有关subprocess模块的用法我已经另外开了一篇文章来记录。
这里的t.daemon = True表示该线程是主线程的守护线程,守护线程会在主线程退出时自动退出。对每个线程调用start方法,线程才会启动,这时候线程使用给定的args参数调用target参数传入的initial_method方法。

1
2
3
4
for i in xrange(proc_size):
ths[i].start()
for i in xrange(proc_size):
ths[i].join()

调用外部进程计算

由于n远大于m,因此对每一个外部进程需要使用互斥锁threading.Lock()维护。将线程i按照模m分组,同剩余系的线程共享一个进程。线程取得进程资源后调用lock.acquire()为进程资源上锁,这时候如果其他线程再次调用lock.acquire()则会陷入阻塞状态,直到获得锁的线程调用lock.release()释放资源。