Implementation of the multiprocessing Module

A previous article covered how to use the multiprocessing module; this post walks through its implementation. It covers:

  1. Manager

Manager

A Manager object uses a server process to hold the objects that need to be shared; other processes access those shared objects through proxies.

SyncManager and BaseManager

register

Calling multiprocessing.Manager() actually creates a SyncManager and calls its start() method, which spawns a child process to hold the shared objects.

# __init__.py
def Manager():
    '''
    Returns a manager associated with a running server process

    The managers methods such as `Lock()`, `Condition()` and `Queue()`
    can be used to create shared objects.
    '''
    from multiprocessing.managers import SyncManager
    m = SyncManager()
    m.start()
    return m

Shared objects are created through methods like SyncManager.dict(), which return proxies to them. Let's look at the implementation first.

class SyncManager(BaseManager):
    '''
    Subclass of `BaseManager` which supports a number of shared object types.

    The types registered are those intended for the synchronization
    of threads, plus `dict`, `list` and `Namespace`.

    The `multiprocessing.Manager()` function creates started instances of
    this class.
    '''

SyncManager.register('Queue', Queue.Queue)
SyncManager.register('dict', dict, DictProxy)
SyncManager.register('Iterator', proxytype=IteratorProxy, create_method=False)

...

As the source shows, SyncManager subclasses BaseManager and adds support for the various shared object types through the register method.
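To make this concrete, here is a minimal usage sketch (Python 3 here and below; the article analyzes the Python 2 source, but the public API is the same) of sharing a dict and a list across processes through a started manager:

```python
import multiprocessing

def worker(d, l):
    # both arguments are proxies; the data lives in the manager's server process
    d['pid_seen'] = True
    l.append(42)

def main():
    m = multiprocessing.Manager()      # SyncManager() + start() under the hood
    d = m.dict()                       # returns a DictProxy
    l = m.list()                       # returns a ListProxy
    p = multiprocessing.Process(target=worker, args=(d, l))
    p.start()
    p.join()
    result = (dict(d), list(l))        # copy the shared state back by value
    m.shutdown()
    return result

if __name__ == '__main__':
    print(main())
```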

# 'dict', dict, DictProxy
@classmethod
def register(cls, typeid, callable=None, proxytype=None, exposed=None,
             method_to_typeid=None, create_method=True):

The function revolves around these parameters; let's walk through the source to see what each one does.

  1. cls is analogous to self in an instance method.
  2. typeid is a string such as "Queue" or "dict".
  3. callable is used to create the object behind typeid, e.g. Queue.Queue or dict above. It may be None if the manager instance is created via from_address(), or if create_method is False.
  4. proxytype is a subclass of BaseProxy, used to build the proxy through which the shared object is accessed. If it is None, AutoProxy is used by default.

    if proxytype is None:
        proxytype = AutoProxy
  5. exposed restricts which method names on the proxy are public.
    A request for a method that is not exposed raises AttributeError, and fallback_func is invoked:

    # from the serve_client method
    try:
        ...
        if methodname not in exposed:
            raise AttributeError(
                'method %r of %r object is not in exposed=%r' %
                (methodname, type(obj), exposed)
                )
        ...
    except AttributeError:
        if methodname is None:
            msg = ('#TRACEBACK', format_exc())
        else:
            try:
                fallback_func = self.fallback_mapping[methodname]
                result = fallback_func(
                    self, conn, ident, obj, *args, **kwds
                    )
                msg = ('#RETURN', result)
            except Exception:
                msg = ('#TRACEBACK', format_exc())
    ...

    If exposed is None, proxytype._exposed_ is used instead; if proxytype._exposed_ is also None, all public methods of the shared object are exposed:

    exposed = exposed or getattr(proxytype, '_exposed_', None)

    def all_methods(obj):
        temp = []
        for name in dir(obj):
            func = getattr(obj, name)
            if hasattr(func, '__call__'):
                temp.append(name)
        return temp

    def public_methods(obj):
        return [name for name in all_methods(obj) if name[0] != '_']
  6. method_to_typeid
    A dict listing the methods whose return values should themselves be proxied. If it is not given, proxytype._method_to_typeid_ is used; if that is also empty, all return values are copied by value. Compare:

    # in BaseManager.register
    method_to_typeid = method_to_typeid or \
                       getattr(proxytype, '_method_to_typeid_', None)

    if method_to_typeid:
        for key, value in method_to_typeid.items():
            assert type(key) is str, '%r is not a string' % key
            assert type(value) is str, '%r is not a string' % value

    # at module level
    PoolProxy._method_to_typeid_ = {
        'apply_async': 'AsyncResult',
        'map_async': 'AsyncResult',
        'imap': 'Iterator',
        'imap_unordered': 'Iterator'
        }
  7. create_method
    A boolean, True by default: whether to create a method with the same name as typeid, which asks the server process to create a shared object and returns a proxy for it. This is False only for the Iterator type; in other words, the Manager does not want to offer an interface for explicitly creating an Iterator.

    if create_method:
        def temp(self, *args, **kwds):
            ...
        temp.__name__ = typeid
        setattr(cls, typeid, temp)

    This if branch creates a temp function and registers it on cls under the name typeid via setattr, so that calling manager.XXX() executes the body of temp. We will dig into this code later.

Apart from the parameter handling above, the rest of register() is just these few lines, which pack the information describing this typeid and store it in _registry:

if '_registry' not in cls.__dict__:
    cls._registry = cls._registry.copy()

cls._registry[typeid] = (
    callable, exposed, method_to_typeid, proxytype
    )
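As a sketch of register() in action, here is a hypothetical Counter class registered on a BaseManager subclass. Registering it stores its entry in _registry and, because create_method defaults to True, attaches a Counter() factory method to the manager class (Counter and MyManager are made-up names for illustration):

```python
from multiprocessing.managers import BaseManager

class Counter(object):
    """Hypothetical shared type for illustration."""
    def __init__(self):
        self._n = 0
    def increment(self):
        self._n += 1
    def value(self):
        return self._n

class MyManager(BaseManager):
    pass

# register() packs (callable, exposed, method_to_typeid, proxytype)
# into MyManager._registry under the typeid 'Counter'
MyManager.register('Counter', Counter, exposed=['increment', 'value'])

def main():
    m = MyManager()
    m.start()                 # spawn the server process
    c = m.Counter()           # the generated temp() method: _create + proxy
    c.increment()
    c.increment()
    n = c.value()
    m.shutdown()
    return n

if __name__ == '__main__':
    print(main())
```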

Creating a Shared Object (Client Side)

The temp entry method created in the if create_method branch first asks the server, via _create, to create a shared object and return its unique identifier, a token. It then builds a proxy connected to that token and returns it:

if create_method:
    def temp(self, *args, **kwds):
        util.debug('requesting creation of a shared %r object', typeid)
        token, exp = self._create(typeid, *args, **kwds)
        proxy = proxytype(
            token, self._serializer, manager=self,
            authkey=self._authkey, exposed=exp
            )
        conn = self._Client(token.address, authkey=self._authkey)
        dispatch(conn, None, 'decref', (token.id,))
        return proxy
    temp.__name__ = typeid
    setattr(cls, typeid, temp)

_create

Let's look at _create first. It begins by creating a connection conn of type _Client. (By the way, where is the server? That will be explained shortly.)

def _create(self, typeid, *args, **kwds):
    '''
    Create a new shared object; return the token and exposed tuple
    '''
    assert self._state.value == State.STARTED, 'server not yet started'
    conn = self._Client(self._address, authkey=self._authkey)
    ...

From the __init__ method we can see that this is really just a socket. In particular, if the machine supports AF_UNIX, a Unix domain socket is used for IPC on the same host, skipping the layers of encapsulation in the network stack. This also shows that BaseManager is designed to be capable of communicating across machines:

# __init__
self._serializer = serializer   # defaults to 'pickle'
self._Listener, self._Client = listener_client[serializer]

# at module level
listener_client = {
    'pickle' : (connection.Listener, connection.Client),
    'xmlrpclib' : (connection.XmlListener, connection.XmlClient)
    }

# in the connection module
default_family = 'AF_INET'
families = ['AF_INET']

if hasattr(socket, 'AF_UNIX'):
    default_family = 'AF_UNIX'
    families += ['AF_UNIX']

if sys.platform == 'win32':
    default_family = 'AF_PIPE'
    families += ['AF_PIPE']
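The connection.Listener / connection.Client pair can also be used on its own. A minimal sketch of the same transport the manager uses, with a made-up authkey and a thread standing in for the server process:

```python
import threading
from multiprocessing.connection import Listener, Client

def serve(listener):
    # accept one connection and echo the message back upper-cased
    with listener.accept() as conn:
        conn.send(conn.recv().upper())

# port 0 lets the OS pick a free port; listener.address has the real one
listener = Listener(('127.0.0.1', 0), authkey=b'secret')
t = threading.Thread(target=serve, args=(listener,))
t.start()

with Client(listener.address, authkey=b'secret') as conn:
    conn.send('ping')
    reply = conn.recv()

t.join()
listener.close()
```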

Once conn is created, dispatch calls conn.send to send a request whose methodname is "create", with the current typeid prepended to the argument list. When this request reaches the Server class, Server.create is called, and the shared object is created in the address space of the server child process; this is covered in the next section on the Server side.

# continuing the _create function
...
try:
    id, exposed = dispatch(conn, None, 'create', (typeid,)+args, kwds)
finally:
    conn.close()
return Token(typeid, self._address, id), exposed

# the dispatch function
def dispatch(c, id, methodname, args=(), kwds={}):
    '''
    Send a message to manager using connection `c` and return response
    '''
    c.send((id, methodname, args, kwds))
    kind, result = c.recv()
    if kind == '#RETURN':
        return result
    raise convert_to_error(kind, result)

def convert_to_error(kind, result):
    if kind == '#ERROR':
        return result
    elif kind == '#TRACEBACK':
        assert type(result) is str
        return RemoteError(result)
    elif kind == '#UNSERIALIZABLE':
        assert type(result) is str
        return RemoteError('Unserializable message: %s\n' % result)
    else:
        return ValueError('Unrecognized message type')

class RemoteError(Exception):
    def __str__(self):
        return ('\n' + '-'*75 + '\n' + str(self.args[0]) + '-'*75)

dispatch then calls conn.recv() to fetch the response. The kind of a response is one of #RETURN, #ERROR, #TRACEBACK and #UNSERIALIZABLE. If the current dispatch call returns normally, it receives exposed, the list of all method names that can be called on the shared object.
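The wire format dispatch() uses is just a 4-tuple request answered by a (kind, result) pair. A toy sketch of one round trip over a Pipe; the server side here is simulated by a thread, not the real Server class, and the id and method names are made up:

```python
import threading
from multiprocessing import Pipe

def fake_server(conn):
    # mimic Server.serve_client: unpack the request, answer with (kind, result)
    ident, methodname, args, kwds = conn.recv()
    if methodname == 'create':
        conn.send(('#RETURN', ('0xdeadbeef', ('get', 'set'))))
    else:
        conn.send(('#TRACEBACK', 'unknown method %r' % methodname))

client_end, server_end = Pipe()
t = threading.Thread(target=fake_server, args=(server_end,))
t.start()

# what dispatch(conn, None, 'create', ('dict',)) puts on the wire:
client_end.send((None, 'create', ('dict',), {}))
kind, result = client_end.recv()
t.join()
```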

After the dispatch call, _create returns a Token along with the exposed tuple obtained from dispatch. Token, defined as follows, is used to uniquely identify a shared object:

class Token(object):
    '''
    Type to uniquely indentify a shared object
    '''
    __slots__ = ('typeid', 'address', 'id')

    def __init__(self, typeid, address, id):
        (self.typeid, self.address, self.id) = (typeid, address, id)

    def __getstate__(self):
        return (self.typeid, self.address, self.id)

    def __setstate__(self, state):
        (self.typeid, self.address, self.id) = state

    def __repr__(self):
        return 'Token(typeid=%r, address=%r, id=%r)' % \
               (self.typeid, self.address, self.id)
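Because Token defines __getstate__ and __setstate__ over its __slots__, it pickles cleanly, which is what lets it travel between processes. A quick check (the address and id values here are made up):

```python
import pickle
from multiprocessing.managers import Token

t = Token('dict', ('127.0.0.1', 50000), '7f3a2c')
# round-trip through pickle, exercising __getstate__/__setstate__
t2 = pickle.loads(pickle.dumps(t))

assert (t2.typeid, t2.address, t2.id) == ('dict', ('127.0.0.1', 50000), '7f3a2c')
```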

proxy

After _create has told the server to create the shared object, temp builds a proxy for the remote object. The proxytype here is the one passed to register(); usually it is AutoProxy, which we will describe in detail later.

proxy = proxytype(
    token, self._serializer, manager=self,
    authkey=self._authkey, exposed=exp
    )

Immediately after that comes a decref; reference counting is discussed in the Server section.

conn = self._Client(token.address, authkey=self._authkey)
dispatch(conn, None, 'decref', (token.id,))

Creating a Shared Object (Server Side)

The sections above covered initialization on the client side; now we turn to the server. Note that although this is a server/client architecture, the Manager's server and client live on the same machine and are managed by the same code, so it does not look like a traditional socket setup. The server exists as a child process, and both that child process and the Server object itself are managed by BaseManager and operated on through methods BaseManager provides. That is why much of the server-side work happens in BaseManager rather than inside Server.
According to the documentation, once a BaseManager has been created, either start() or get_server().serve_forever() must be called to ensure the manager is associated with a process.

Constructor

First, let's look at the BaseManager constructor as a whole:

def __init__(self, address=None, authkey=None, serializer='pickle'):
    if authkey is None:
        authkey = current_process().authkey
    self._address = address     # XXX not final address if eg ('', 0)
    self._authkey = AuthenticationString(authkey)
    self._state = State()
    self._state.value = State.INITIAL
    self._serializer = serializer
    self._Listener, self._Client = listener_client[serializer]

Here is the authkey-related implementation; it touches a fair amount of Process machinery, which we won't go into here:

# process.py
def current_process():
    return _current_process

class _MainProcess(Process):
    def __init__(self):
        self._identity = ()
        self._daemonic = False
        self._name = 'MainProcess'
        self._parent_pid = None
        self._popen = None
        self._counter = itertools.count(1)
        self._children = set()
        self._authkey = AuthenticationString(os.urandom(32))
        self._tempdir = None

_current_process = _MainProcess()
del _MainProcess

The start method

Looking at the start method: it first creates a Process and communicates with that child process via connection.Pipe.

def start(self, initializer=None, initargs=()):
    '''
    Spawn a server process for this manager object
    '''
    assert self._state.value == State.INITIAL

    if initializer is not None and not hasattr(initializer, '__call__'):
        raise TypeError('initializer must be a callable')

    # pipe over which we will retrieve address of server
    reader, writer = connection.Pipe(duplex=False)

    # spawn process which runs a server
    self._process = Process(
        target=type(self)._run_server,
        args=(self._registry, self._address, self._authkey,
              self._serializer, writer, initializer, initargs),
        )
    ident = ':'.join(str(i) for i in self._process._identity)
    self._process.name = type(self).__name__ + '-' + ident
    self._process.start()
    ...

  1. self._registry
    This is the _registry dict saved earlier in the register method.
  2. self._address comes from BaseManager's __init__.
  3. self._authkey comes from BaseManager's __init__.

Pausing our reading of start for a moment, let's step into the _run_server method. It first runs initializer and then creates a Server object via _Server. The initializer comes from start; when invoked from __init__.py it is None.

@classmethod
def _run_server(cls, registry, address, authkey, serializer, writer,
                initializer=None, initargs=()):
    '''
    Create a server, report its address and run it
    '''
    if initializer is not None:
        initializer(*initargs)

    # create server
    server = cls._Server(registry, address, authkey, serializer)

    # inform parent process of the server's address
    writer.send(server.address)
    writer.close()

    # run the manager
    util.info('manager serving at %r', server.address)
    server.serve_forever()

We will come back to the Server class shortly; for now, let's finish reading start:

# continuing the start method
...
# get address of server
writer.close()
self._address = reader.recv()
reader.close()

# register a finalizer
self._state.value = State.STARTED
self.shutdown = util.Finalize(
    self, type(self)._finalize_manager,
    args=(self._process, self._address, self._authkey,
          self._state, self._Client),
    exitpriority=0
    )

As a side note, the shutdown here is what gets called in __exit__; __enter__ and __exit__ implement support for the with statement.

def __enter__(self):
    return self

def __exit__(self, exc_type, exc_val, exc_tb):
    self.shutdown()
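So a manager can be used in a with statement, and __exit__ takes care of shutting the server process down:

```python
import multiprocessing

def main():
    with multiprocessing.Manager() as m:   # __enter__ returns the manager
        l = m.list([1, 2, 3])
        total = sum(l)                     # iterate the ListProxy remotely
    # leaving the block called m.shutdown() via __exit__
    return total

if __name__ == '__main__':
    print(main())
```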

util.Finalize actually invokes type(self)._finalize_manager; type(self) is used because _finalize_manager lives on the class. In general, functions handed to Process machinery must be free functions without captured context. _finalize_manager mainly creates a _Client, sends a shutdown command to the server process self._process, and then tries to join that process. If the process is still alive after 0.2 seconds, it tries to terminate it. From the Process source, terminate always exists as a method, so it is unclear why a hasattr check is needed here.

@staticmethod
def _finalize_manager(process, address, authkey, state, _Client):
    '''
    Shutdown the manager process; will be registered as a finalizer
    '''
    if process.is_alive():
        util.info('sending shutdown message to manager')
        try:
            conn = _Client(address, authkey=authkey)
            try:
                dispatch(conn, None, 'shutdown')
            finally:
                conn.close()
        except Exception:
            pass

        process.join(timeout=0.2)
        if process.is_alive():
            util.info('manager still alive')
            if hasattr(process, 'terminate'):
                util.info('trying to `terminate()` manager process')
                process.terminate()
                process.join(timeout=0.1)
                if process.is_alive():
                    util.info('manager still alive after terminate')

    state.value = State.SHUTDOWN
    try:
        del BaseProxy._address_to_local[address]
    except KeyError:
        pass

Server

So far we have discussed server-side initialization and teardown from the BaseManager side. Now let's look at the Server object itself to understand what it is actually responsible for.
First, as usual, the constructor, which contains a Listener that looks a lot like the socket API. id_to_obj, further down, holds all the shared objects maintained by the Server. It is a dict whose keys are generated by '%x' % id(obj); they are converted to strings because the xmlrpclib library only supports 32-bit ints, so str is the safer choice (though as noted earlier, pickle is what is actually used). id_to_refcount tracks the lifetime of each object.

class Server(object):
    '''
    Server class which runs in a process controlled by a manager object
    '''
    public = ['shutdown', 'create', 'accept_connection', 'get_methods',
              'debug_info', 'number_of_objects', 'dummy', 'incref', 'decref']

    def __init__(self, registry, address, authkey, serializer):
        assert isinstance(authkey, bytes)
        self.registry = registry
        self.authkey = AuthenticationString(authkey)
        Listener, Client = listener_client[serializer]

        # do authentication later
        self.listener = Listener(address=address, backlog=16)
        self.address = self.listener.address

        self.id_to_obj = {'0': (None, ())}
        self.id_to_refcount = {}
        self.mutex = threading.RLock()
        self.stop = 0

Recall that the client calls dispatch(conn, None, 'create', (typeid,)+args, kwds) to ask the child process to create a shared object. From the source it is clear this must ultimately land in Server.create, but first the request goes through the dispatching done by the serve_client method:

def accept_connection(self, c, name):
    '''
    Spawn a new thread to serve this connection
    '''
    threading.current_thread().name = name
    c.send(('#RETURN', None))
    self.serve_client(c)

def serve_client(self, conn):
    '''
    Handle requests from the proxies in a particular process/thread
    '''
    util.debug('starting server thread to service %r',
               threading.current_thread().name)

    recv = conn.recv
    send = conn.send
    id_to_obj = self.id_to_obj

    while not self.stop:

        try:
            methodname = obj = None
            request = recv()
            ident, methodname, args, kwds = request
            obj, exposed, gettypeid = id_to_obj[ident]

            if methodname not in exposed:
                raise AttributeError(
                    'method %r of %r object is not in exposed=%r' %
                    (methodname, type(obj), exposed)
                    )

            function = getattr(obj, methodname)

            try:
                res = function(*args, **kwds)
            except Exception, e:
                msg = ('#ERROR', e)
            else:
                typeid = gettypeid and gettypeid.get(methodname, None)
                if typeid:
                    rident, rexposed = self.create(conn, typeid, res)
                    token = Token(typeid, self.address, rident)
                    msg = ('#PROXY', (rexposed, token))
                else:
                    msg = ('#RETURN', res)

        except AttributeError:
            ...

Proxy

Now for the Proxy side of the implementation. The DictProxy used in SyncManager.register('dict', dict, DictProxy) is generated by MakeProxyType, a simple class-building "macro". It creates a class named name, derived from BaseProxy, taking a tuple of the method names the subclass should have. Each such method just delegates to BaseProxy._callmethod:

DictProxy = MakeProxyType('DictProxy', (
    '__contains__', '__delitem__', '__getitem__', '__len__',
    '__setitem__', 'clear', 'copy', 'get', 'has_key', 'items',
    'keys', 'pop', 'popitem', 'setdefault', 'update', 'values'
    ))

def MakeProxyType(name, exposed, _cache={}):
    '''
    Return an proxy type whose methods are given by `exposed`
    '''
    exposed = tuple(exposed)
    try:
        return _cache[(name, exposed)]
    except KeyError:
        pass

    dic = {}

    for meth in exposed:
        # create each method by exec-ing a `def` statement (the `exec ... in` form)
        exec '''def %s(self, *args, **kwds):
        return self._callmethod(%r, args, kwds)''' % (meth, meth) in dic

    # type(class name, base classes, namespace dict) returns a new type
    ProxyType = type(name, (BaseProxy,), dic)
    ProxyType._exposed_ = exposed
    _cache[(name, exposed)] = ProxyType
    return ProxyType
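MakeProxyType is importable, so the generated class can be inspected directly (shown here in Python 3, where exec is a function rather than a statement; 'SetProxy' and its method names are made up for illustration):

```python
from multiprocessing.managers import BaseProxy, MakeProxyType

# build a proxy class exposing three methods of a hypothetical set-like object
SetProxy = MakeProxyType('SetProxy', ('add', 'discard', '__len__'))

assert issubclass(SetProxy, BaseProxy)
assert SetProxy.__name__ == 'SetProxy'
assert SetProxy._exposed_ == ('add', 'discard', '__len__')
assert callable(SetProxy.add)     # a generated wrapper around _callmethod

# the _cache returns the very same class for identical arguments
assert MakeProxyType('SetProxy', ('add', 'discard', '__len__')) is SetProxy
```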

Next, let's see how the BaseProxy class forwards method calls via _callmethod. BaseProxy sends the method to be called through self._tls.connection.send(), so _tls looks like it wraps a socket:

def _callmethod(self, methodname, args=(), kwds={}):
    '''
    Try to call a method of the referrent and return a copy of the result
    '''
    try:
        conn = self._tls.connection
    except AttributeError:
        util.debug('thread %r does not own a connection',
                   threading.current_thread().name)
        self._connect()
        conn = self._tls.connection

    conn.send((self._id, methodname, args, kwds))
    kind, result = conn.recv()

    if kind == '#RETURN':
        return result
    elif kind == '#PROXY':
        exposed, token = result
        proxytype = self._manager._registry[token.typeid][-1]
        token.address = self._token.address
        proxy = proxytype(
            token, self._serializer, manager=self._manager,
            authkey=self._authkey, exposed=exposed
            )
        conn = self._Client(token.address, authkey=self._authkey)
        dispatch(conn, None, 'decref', (token.id,))
        return proxy
    raise convert_to_error(kind, result)
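You can see the delegation directly: calling _callmethod by hand on a DictProxy behaves exactly like the generated methods:

```python
import multiprocessing

def main():
    m = multiprocessing.Manager()
    d = m.dict()
    # d['k'] = 1 is sugar for exactly this call
    d._callmethod('__setitem__', ('k', 1))
    v = d._callmethod('__getitem__', ('k',))
    m.shutdown()
    return v

if __name__ == '__main__':
    print(main())
```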

Let's look at BaseProxy's initializer to find out what _tls really is:

class BaseProxy(object):
    '''
    A base for proxies of shared objects
    '''
    _address_to_local = {}
    # a lock that keeps working correctly after a fork
    _mutex = util.ForkAwareThreadLock()

    def __init__(self, token, serializer, manager=None,
                 authkey=None, exposed=None, incref=True):
        BaseProxy._mutex.acquire()
        try:
            tls_idset = BaseProxy._address_to_local.get(token.address, None)
            if tls_idset is None:
                tls_idset = util.ForkAwareLocal(), ProcessLocalSet()
                BaseProxy._address_to_local[token.address] = tls_idset
        finally:
            BaseProxy._mutex.release()

        # self._tls is used to record the connection used by this
        # thread to communicate with the manager at token.address
        self._tls = tls_idset[0]

        # self._idset is used to record the identities of all shared
        # objects for which the current process owns references and
        # which are in the manager at token.address
        self._idset = tls_idset[1]

        self._token = token
        self._id = self._token.id
        self._manager = manager
        self._serializer = serializer
        self._Client = listener_client[serializer][1]

        if authkey is not None:
            self._authkey = AuthenticationString(authkey)
        elif self._manager is not None:
            self._authkey = self._manager._authkey
        else:
            self._authkey = current_process().authkey

        if incref:
            self._incref()

        util.register_after_fork(self, BaseProxy._after_fork)

Additional Notes

The util side of Manager

Let's look at the ForkAwareThreadLock implementation:

class ForkAwareThreadLock(object):
    def __init__(self):
        self._reset()
        # reset after every fork
        register_after_fork(self, ForkAwareThreadLock._reset)

    def _reset(self):
        self._lock = threading.Lock()
        self.acquire = self._lock.acquire
        self.release = self._lock.release

def register_after_fork(obj, func):
    _afterfork_registry[(_afterfork_counter.next(), id(obj), func)] = obj

# a mapping with weakly referenced values: an entry is discarded
# once no strong reference to its value remains
_afterfork_registry = weakref.WeakValueDictionary()

def _run_after_forkers():
    # runs after a fork
    items = list(_afterfork_registry.items())
    items.sort()
    for (index, ident, func), obj in items:
        try:
            func(obj)
        except Exception, e:
            info('after forker raised exception %s', e)

The reason locks need to be tracked across fork is that "the child process is created with a single thread" (the one that called fork()), so the thread holding a lock may not exist in the child process at all.
As you can see, after a fork, the _run_after_forkers method runs; it iterates over the locks held in _afterfork_registry and replaces each one with a fresh threading.Lock().
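Here is a small sketch of register_after_fork in action, using the multiprocessing.util module and the 'fork' start method; the registered callback runs in the child during Process bootstrap, not in the parent (Resettable is a made-up class for illustration):

```python
import multiprocessing as mp
from multiprocessing import util

class Resettable(object):
    """Hypothetical object that must be reset in every forked child."""
    def __init__(self):
        self.reset_in_child = False
        util.register_after_fork(self, Resettable._reset)
    def _reset(self):
        self.reset_in_child = True

def child(obj, q):
    q.put(obj.reset_in_child)

def main():
    ctx = mp.get_context('fork')      # the callback only fires on fork
    obj = Resettable()
    q = ctx.Queue()
    p = ctx.Process(target=child, args=(obj, q))
    p.start()
    seen_in_child = q.get()
    p.join()
    return obj.reset_in_child, seen_in_child

if __name__ == '__main__':
    print(main())   # (False, True)
```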
