在文章中介绍了multiprocessing模块用法,下面将详细介绍这个模块的实现。其内容包括:
- Manager
- conenction.Pipe
Manager
Manager对象会使用一个Server进程来维护需要共享的对象,而其他进程需要通过Proxy来访问这些共享对象。
SyncManager和BaseManager
register
当我们调用multiprocessing.Manager()时,实际上创建了一个SyncManager,并调用它的start。在start中,会创建一个子进程来维护共享对象。
1 | # __init__.py |
我们可以通过SyncManager.dict()等方法创建共享对象,并返回其代理。我们首先来查看它的实现。
1 | class SyncManager(BaseManager): |
从源码上看,SyncManager继承了一个BaseManager,通过register方法在这之中添加了对不同类型共享对象的支持。如果我们需要增加自定义的类,那么就需要放在这里
1 | # 'dict', dict, DictProxy |
这个函数主要是围绕传入的几个参数的,我们结合整个函数的源码看这些参数的作用。
cls类似于成员函数里的self,用来在类里面表示自己,在这里也就是我们的SyncManagertypeid是一个字符串,对应于”Queue”/“dict”等callable用来创建typeid对应的对象,对应于上面的Queue.Queue和dict等。如果一个manager实例通过from_address()创建,或者create_method参数时False的,那么callable就可以被设置为Noneproxytype是BaseProxy的子类,用来创建一个用来访问这个共享对象的代理,如果设置为None,就会默认使用Autoproxy1
2if proxytype is None:
proxytype = AutoProxyexposed用来限定proxy上的这个方法名字是不是public的。
如果请求访问一个没有exposed的接口,那么就会AttributeError并且调用fallback_func。1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22# 来自serve_client方法
try:
...
if methodname not in exposed:
raise AttributeError(
'method %r of %r object is not in exposed=%r' %
(methodname, type(obj), exposed)
)
...
except AttributeError:
if methodname is None:
msg = ('#TRACEBACK', format_exc())
else:
try:
fallback_func = self.fallback_mapping[methodname]
result = fallback_func(
self, conn, ident, obj, *args, **kwds
)
msg = ('#RETURN', result)
except Exception:
msg = ('#TRACEBACK', format_exc())
...如果指定
exposed为None,那么就会使用proxytype._exposed_,如果proxytype._exposed_也是None,那么就使用这个共享对象的所有的public方法。通过public_methods得到所有不以_开头的方法。1
2
3
4
5
6
7
8
9
10
11exposed = exposed or getattr(proxytype, '_exposed_', None)
def all_methods(obj):
temp = []
for name in dir(obj):
func = getattr(obj, name)
if hasattr(func, '__call__'):
temp.append(name)
return temp
def public_methods(obj):
return [name for name in all_methods(obj) if name[0] != '_']method_to_typeid
这个是一个dict,列出了所有返回值是一个proxy的方法。如果没有指定,那么就使用proxytype._method_to_typeid_,如果仍然为空,那么所有的返回值都按值复制。我们可以参考下面的方法1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16# 在BaseManager.register中
method_to_typeid = method_to_typeid or \
getattr(proxytype, '_method_to_typeid_', None)
if method_to_typeid:
for key, value in method_to_typeid.items():
assert type(key) is str, '%r is not a string' % key
assert type(value) is str, '%r is not a string' % value
# 在全局空间
PoolProxy._method_to_typeid_ = {
'apply_async': 'AsyncResult',
'map_async': 'AsyncResult',
'imap': 'Iterator',
'imap_unordered': 'Iterator'
}create_method
是一个布尔量,默认True。要不要创建一个和typeid同名的方法,并且用这个方法来通知Server进程创建一个共享对象并返回一个proxy。这个只有对Iterator类是False的,也就是说Manager不想提供一个接口让你显式创建一个Iterator。1
2
3
4
5if create_method:
def temp(self, *args, **kwds):
...
temp.__name__ = typeid
setattr(cls, typeid, temp)这个
if分支中实际上是创建了一个temp函数,并且通过setattr将这个temp函数注册为cls中的typeid的方法。这样当我们在调用manager.XXX()时,就会执行这个temp里面的内容。我们将在后面深入探讨这个代码。
register函数除了和几个参数有关的代码之外,剩下就这几行代码了,它们的作用就是将描述这个typeid的信息打包,存储到_registry中。
1 | if '_registry' not in cls.__dict__: |
我们打印一下register完所有类的cls._registry为JoinableQueue,Namespace,BoundedSemaphore,Iterator,Lock,list,RLock,Queue,Array,dict,Pool,Semaphore,Value,Event,Condition
创建共享对象(Client)
这一章节的开始,我们来看register函数中是如何创建共享对象的,主要过程就是通读一下register后面的代码和_create函数的实现。在创建完共享对象之后,这个共享对象就可以看做是远程对象(稍后将讲解的Server端)的一个代理。if create_method中创建的temp入口方法的主要功能是:
- 先通过
_create通知Server创建一个共享变量,并返回其唯一标识token - 创建一个连接到这个
token的proxy,这个proxy还作为这个函数的返回值 decref,有关引用计数的问题将在Server章节中讨论
1 | # managers.py |
_create
首先查看_create这个方法,首先不得不提的是它接受一堆*args和**kwds的参数,这两个鬼东西它自己不用,而是传给dispatch,而dispatch也不用,而是直接send出去。
下面来看流程,首先创建了一个_Client类型的变量conn。By the way,那么Server在哪里呢,稍后解释。
1 | # managers.py |
查看__init__方法得知,这个实际上就是一个套接字。特别地,如果机器支持AF_UNIX的话,会使用UNIX Domain Socket来进行同一主机上的IPC,省去了网络协议栈的层层封装等流程。但从中也能看出,实际上BaseManager在设计上是能够跨机器进行通信的
1 | # managers.py |
在创建好conn之后,就会通过dispatch调用conn.send发送一个对methodname为”create”的调用,并且在参数表头部附上当前的typeid。当这个调用被传到Server类时,会调用Server.create方法,在Server子进程空间中创建一个共享对象,这个将在下一节讲解有关Server部分中提到。
1 | # 继续是_create函数 |
dispatch函数
下面介绍这个大boss,dispatch函数。
1 | # dispatch函数 |
接着dispatch函数就调用conn.recv()获取返回值。返回值的kind一共有#RETURN、#ERROR、#TRACEBACK、#UNSERIALIZABLE四种。如果当前的dispatch调用返回正常的话,他应该会拿到一个exposed,包含了共享对象中所能够调用的所有方法名。
在调用dispatch函数之后,_create就返回了一个Token类型和从dispatch拿到的exposed。Token的定义如下,被用来唯一确定一个共享对象。
1 | class Token(object): |
proxy
在调用_create通知Server创建完共享对象之后,temp函数就创建一个proxy来代理这个远程的共享对象,这里的proxytype是由register函数中传入的,一般是AutoProxy,我们将在后面详细介绍。
1 | proxy = proxytype( |
创建共享对象(Server)
上面的内容是Client部分的初始化过程,下面我们查看Server部分。需要注意的是,虽然是Server/Client结构,但是因为Manager的Server和Client都在一台机器上,并且由同样的代码进行管理,所以在形式上和传统的Socket是不一样的。由于Server是作为一个子进程存在,并且这个子进程和Server本身都是由BaseManager来管理的,并且也需要通过BaseManager提供的方法来操控。所以我们可以看到很多Server部分的工作是在BaseManager而不是Server内部处理的。
根据文档,一旦BaseManager被创建,就需要调用start或者get_server().serve_forever()来确保这个manager是和一个进程关联的。查看代码可以发现,这个start是在__init__.py上面被调用的
1 | # __init__.py |
因此要了解Server如何被创建,就要先看BaseManager是如何被创建的。
构造函数
首先来全面查看一下BaseManager的构造函数
1 | # managers.py |
我们查看一下authkey的相关实现,由于其中涉及了不少Process相关的实现,在这里暂时不提
1 | # process.py |
start方法
我们查看BaseManager.start方法,首先创建了一个Process,并通过connection.Pipe和这个子进程进行通信。关于这个管道的实现,在后面进行说明。
1 | # managers.py |
self._registry
这个就是之前在register方法中保存下来的_registry字典。-
self._address来自于BaseManager的__init__ -
self._authkey来自于BaseManager的__init__
下面暂停对start的阅读,而step in查看Process中执行的主函数_run_server,可以看到在这个方法中:
- 首先执行
initializer,这个initializer是start在args里面传进来的,如果是从__init__.py调用的,那么就是None - 创建一个
Server对象_Server - 向管道里面写入
server.address,这个会被在主进程的reader读取 - 调用
serve_forever
1 |
|
关于Server类,我们稍后来查看,现在我们继续看完start
1 | # 继续start方法 |
额外说明一下,这里的shutdown在__exit__中被调用。这里的__enter__和__exit__被用来实现with
1 | def __enter__(self): |
实际上,util.Finalize是调用type(self)._finalize_manager,使用type(self)是因为_finalize_manager是一个类方法。一般涉及到Process有关的函数,都不能是带有上下文的,而必须是自由函数。在_finalize_manager中主要就是先创建一个_Client,向Server进程self._process发送shutdown指令,接着尝试join这个进程。如果在0.2秒之后这个进程还在的话,就尝试去terminate它。从Process源码来看,它是始终有terminate这个方法的,不知道为啥还需要hasattr来判断下。
1 |
|
Server
先前围绕BaseManager类讨论了Server部分的初始化和析构。现在我们来看Server这个对象,以便了解它具体负责的内容。
首先,照例是构造函数,包含了和套接口API很像的Listener。后面的id_to_obj就是Server所维护的所有共享对象了。id_to_obj是一个dict,它的key是由'%x' % id(obj)来生成的,之所以需要转换成字符串类型是因为xmlrpclib这个库只支持32位的int,所以用str保险一点(当然之前提到,实际用的是pickle)。id_to_refcount则用来标记每个obj的生命周期。
1 | # managers.py |
回忆前面的内容,Client会调用dispatch(conn, None, 'create', (typeid,)+args, kwds)来通知子进程创建一个共享对象。看源码显而易见,肯定最终是调用Server.create这个方法的,但在这之前,会经过一个serve_client方法分发的过程。
1 | # managers.py |
Proxy
下面我们来看Proxy部分的实现。在SyncManager.register('dict', dict, DictProxy)中用到的DictProxy是由MakeProxyType来生成的,这个函数就是一个创建类的简易的“宏”。它创建名字为name的类,继承自BaseProxy,并且传入一个tuple作为这个子类所拥有的方法。对于每个方法,实际调用BaseProxy里面的_callmethod来执行。
1 | DictProxy = MakeProxyType('DictProxy', ( |
下面,我们来看看BaseProxy这个类是如何通过_callmethod转发方法的。可以看到,BaseProxy通过self._tls.connection.send()这个方法将需要调用的方法发送出去,看起来这个_tls就是个套接口了。
1 | def _callmethod(self, methodname, args=(), kwds={}): |
我们查看BaseProxy的初始化部分,看看_tls是何方神圣
1 | class BaseProxy(object): |
补充
Manager的Util部分
看看这个ForkAwareThreadLock实现
1 | class ForkAwareThreadLock(object): |
这里需要跟踪fork之后锁的原因是The child process is created with a single thread–the one that called fork(),因此可能拥有锁的子线程并不在子进程之中。
可以发现,在fork之后,会执行_run_after_forkers方法,这个方法会遍历出_afterfork_registry中的所含有lock,并且将它们换成新的threading.Lock()
connection.Pipe实现
可以想象,根据是否是POSIX系统分为了两种实现。根据sys.platform来判断系统类型。
POSIX
对于POSIX系统,借助socketpair实现双向管道,pipe实现单向管道。相比于popen和pclose,socketpair直接提供了一个双向的读写管道。
我们知道可以通过dup2来进行重定向,例如dup2(f, STDOUT_FILENO)可以将标准输出指向文件f。那么这里为什么要复制一份s1.fileno()并来创建一个_multiprocessing.Connection对象呢?
1 | import multiprocessing |
要想研究这个问题,需要查看_multiprocessing.Connection这个用C实现的模块。
通过观察_multiprocessing下面的目录结构可以看出,它会在c文件里面写上一堆特化的实现,然后在最后#include connection.h来做一个公共的实现。从#define CONNECTION_NAME "Connection" 可以看出,_multiprocessing.Connection实际上是socket_connection。
分析一个模块,首先要找它的PyMODINIT_FUNC函数、PyMethodDef结构、PyModuleDef结构。但_multiprocessing.Connection被定义为一个对象,所以我们分析它的PyVarObject_HEAD_INIT(NULL, 0)的定义。可以看到,它的初始化方法是connection_new。
我们尝试了下面的代码,发现在Linux下面抛出IOError: [Errno 9] Bad file descriptor的错误。因此可以猜测到s1和s2在销毁时会释放自己的fd,这里提前复制一份出来是单纯为了复用这两个fd。
WINDOWS
对于WINDOWS系统,借助于CreateNamedPipe实现。
注意事项
multiprocessing.Manager()的dict()不可以嵌套,也就是说下面的语句实际上会报错
1 | def inner(metainfo): |