Python中join不能响应信号的问题

文章subprocess模块用法中介绍了Python中的threading.Thread.join()时不能响应信号的问题。这个问题被Python官方标记为Bug
Python官方的Issue指出这个Bug与Python的signal、基础线程库thread(C实现)和高级线程库threading(Python封装)都有关,下面首先概览这三个模块的实现,接着通过编译调试的方式来观赏这个Bug的具体过程。

一个简单的测试程序

这里提供一个可以在Ubuntu 16.04上的Python2.7上重现的代码片段。主程序调用threading_timeout_test时能够正常响应SIGINT等信号,调用threading_test时不能正常响应信号。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
import sys, os, signal, subprocess, multiprocessing, time
import threading, thread

def on_shutdown(i, j):
print "Bye"
os._exit(0)

signal.signal(signal.SIGINT, on_shutdown)
signal.signal(signal.SIGTERM, on_shutdown)
signal.signal(signal.SIGHUP, on_shutdown)

def inner():
while 1:
time.sleep(1)

def threading_test():
ths = threading.Thread(target=inner)
ths.daemon = True
ths.start()
ths.join()

def threading_timeout_test():
ths = threading.Thread(target=inner)
ths.daemon = True
ths.start()
ths.join(100)

threading_test()

ps一下发现此时进程处于Sl状态。

Python中的signal机制

在signalmodule的开头注释中我们看到Python的信号只能为主线程所设置和捕获,这和POSIX原生signal不同了。在POSIX中,信号是传递给整个进程中的随机线程的。我们偏向于通过设置信号屏蔽字的方式,或者借助sigwait,让一个线程专门等待信号,这样将一个单线程完全异步的逻辑变为了同步的逻辑。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
// signalmodule.c
// 翻译版
/*
Python线程语义:

- 只有主线程可以设置signal handler
- 任何线程可以获得signal handler
- 信号只传送给主线程

我们不支持像SIGFPE之类的同步信号,也不支持使用信号做线程间通讯。
这是因为并不是所有的线程库都支持。

在一些实现中由键盘产生的信号如SIGINT会被分发到所有线程(SGI),
或者按照随机概率(POSIX有中等概率会被发送到主线程)任意线程(Solaris)。
目前的机制需要兼容这三种特性,因此signal handler会忽略掉所有getpid()不等于主线程中的结果的信号。

// 注:我认为CPython以pthread为底层的线程实现不会出现不同线程getpid()不等的情况,
// 可能作者的意思是在其他系统中可能会出现这种情况
*/

我个人理解上面这一段话的意思是Python为了简化在不同OS上的处理,选择为信号处理增加了限制。
Python在signal_signal中负责注册信号。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
// thread_pthread.h
// 这个函数用来获得当前线程的threadid
long
PyThread_get_thread_ident(void)
{
// pthread_t的大小不定,为4或者8
// 注意这个函数对线程切换是敏感的,宜加上volatile
volatile pthread_t threadid;
if (!initialized)
// 我们将在稍后介绍这个函数的作用
PyThread_init_thread();
/* Jump through some hoops for Alpha OSF/1 */
threadid = pthread_self();
#if SIZEOF_PTHREAD_T <= SIZEOF_LONG
return (long) threadid;
#else
return (long) *(long *) &threadid;
#endif
}

// signalmodule.c
static PyObject *
signal_signal(PyObject *self, PyObject *args)
{
PyObject *obj;
int sig_num;
PyObject *old_handler;
void (*func)(int);
// 传进来的tuple有两项,第一项是sig_num,第二项是一个ob_type->tp_name是function,也就是处理函数
if (!PyArg_ParseTuple(args, "iO:signal", &sig_num, &obj))
return NULL;
#ifdef MS_WINDOWS
/* Validate that sig_num is one of the allowable signals */
switch (sig_num) {
case SIGABRT: break;
#ifdef SIGBREAK
/* Issue #10003: SIGBREAK is not documented as permitted, but works and corresponds to CTRL_BREAK_EVENT. */
case SIGBREAK: break;
#endif
case SIGFPE: break;
case SIGILL: break;
case SIGINT: break;
case SIGSEGV: break;
case SIGTERM: break;
default:
PyErr_SetString(PyExc_ValueError, "invalid signal value");
return NULL;
}
#endif
#ifdef WITH_THREAD
// 不能子线程中设置信号
if (PyThread_get_thread_ident() != main_thread) {
PyErr_SetString(PyExc_ValueError, "signal only works in main thread");
return NULL;
}
#endif
if (sig_num < 1 || sig_num >= NSIG) {
PyErr_SetString(PyExc_ValueError, "signal number out of range");
return NULL;
}
if (obj == IgnoreHandler)
func = SIG_IGN;
else if (obj == DefaultHandler)
func = SIG_DFL;
else if (!PyCallable_Check(obj)) {
PyErr_SetString(PyExc_TypeError,
"signal handler must be signal.SIG_IGN, signal.SIG_DFL, or a callable object");
return NULL;
}
else
func = signal_handler;
if (PyOS_setsig(sig_num, func) == SIG_ERR) {
PyErr_SetFromErrno(PyExc_RuntimeError);
return NULL;
}
old_handler = Handlers[sig_num].func;
// 清空触发位
// 当信号发生时,在`trip_signal`中会将tripped设为1
Handlers[sig_num].tripped = 0;
Py_INCREF(obj);
// 设置信号处理函数
Handlers[sig_num].func = obj;
if (old_handler != NULL)
return old_handler;
else
Py_RETURN_NONE;
}

Python在signal_handler中负责处理信号。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
// signalmodule.c
// 这个函数将在稍后详细讨论
static void
trip_signal(int sig_num)
{
Handlers[sig_num].tripped = 1;
...
Py_AddPendingCall(checksignals_witharg, NULL);
if (wakeup_fd != -1)
write(wakeup_fd, "\0", 1);
}

static void
signal_handler(int sig_num)
{
int save_errno = errno;

#if defined(WITH_THREAD) && defined(WITH_PTH)
if (PyThread_get_thread_ident() != main_thread) {
pth_raise(*(pth_t *) main_thread, sig_num);
}
else
#endif
{
#ifdef WITH_THREAD
/* See NOTES section above */
if (getpid() == main_pid)
#endif
{
printf("Received POSIX signal in thread %x\n", PyThread_get_thread_ident());
trip_signal(sig_num);
}

#ifndef HAVE_SIGACTION
#ifdef SIGCHLD
/* To avoid infinite recursion, this signal remains
reset until explicit re-instated.
Don't clear the 'func' field as it is our pointer
to the Python handler... */
if (sig_num != SIGCHLD)
#endif
/* If the handler was not set up with sigaction, reinstall it. See
* Python/pythonrun.c for the implementation of PyOS_setsig which
* makes this true. See also issue8354. */
PyOS_setsig(sig_num, signal_handler);
#endif
}

/* Issue #10311: asynchronously executing signal handlers should not
mutate errno under the feet of unsuspecting C code. */
errno = save_errno;
}

出于方便说明的考虑,我们将在后面查看Py_AddPendingCall的定义。

Python的threading模块

threading.Thread.join()方法

Python的低级线程模块thread并没有提供join的原语,threading.Thread.join()在Python层面进行了封装实现。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
// threading.py
def Condition(*args, **kwargs):
return _Condition(*args, **kwargs)

class Thread(_Verbose):
def __init__(self, group=None, target=None, name=None,
args=(), kwargs=None, verbose=None):
...
# 意料之中的CV,稍后我们来看看具体实现
# 注意threading中还提供了一个递归锁RLock,我们这里用不到
self.__block = Condition(Lock())

def join(self, timeout=None):
"""Wait until the thread terminates.
翻译版

join函数会阻塞直到被join的线程正常终结,因未处理的异常终结,或者`timeout`超时退出

可选参数`timeout`可以是一个浮点数,用来表示超时时间。因为`join()`方法一直返回None,
所以需要在join返回后调用`isALive`来判断究竟是发生了超时(此时线程还活着),还是线程已经终结

同一个线程可以被join很多次

join自己会抛出RuntimeError,否则导致死锁
在线程初始化或者启动前join会抛出RuntimeError
"""
if not self.__initialized:
raise RuntimeError("Thread.__init__() not called")
if not self.__started.is_set():
raise RuntimeError("cannot join thread before it is started")
// join自己会导致死锁
if self is current_thread():
raise RuntimeError("cannot join current thread")

if __debug__:
if not self.__stopped:
self._note("%s.join(): waiting until thread stops", self)
# 锁住CV对应的锁,后面看到实际是个Lock = _allocate_lock
self.__block.acquire()
try:
if timeout is None:
# 和POSIX编程一样,这里同样要放在while循环里面
while not self.__stopped:
# 等待条件变量
self.__block.wait()
if __debug__:
self._note("%s.join(): thread stopped", self)
else:
deadline = _time() + timeout
while not self.__stopped:
delay = deadline - _time()
if delay <= 0:
if __debug__:
self._note("%s.join(): timed out", self)
break
self.__block.wait(delay)
else:
if __debug__:
self._note("%s.join(): thread stopped", self)
finally:
self.__block.release()

我们注意if timeout is None:这个条件,两个分支的代码相差无几,但在我的Ubuntu 16.04.4上的Python2.7.12中分别使用或不使用timeout进行join,却一个不能响应SIG,一个可以。显而易见,原因在self.__block.wait()这个方法中。而self.__block.wait()实际上在一个条件变量上进行等待。

Condition条件变量

从上面的代码我们能看到self.__block实际上是个条件变量_Condition,这个_Condition实现地比较简陋,它并不会像C++中的std::condition_variable一样隔段时间解锁看看条件是否满足,事实上它根本就不接受一个condition参数,共享同一个_Condition的线程通过wait这个_Condition上的事件,或者notify系列向这个_Condition通告这个事件。此外,_Condition将互斥锁也整合了进去,我们不要在外面挂一个mutex之类的东西了。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
# threading.py
_allocate_lock = thread.allocate_lock
# 互斥锁
Lock = _allocate_lock
# 递归锁(略)
def RLock(*args, **kwargs):
"""Factory function that returns a new reentrant lock.

A reentrant lock must be released by the thread that acquired it. Once a
thread has acquired a reentrant lock, the same thread may acquire it again
without blocking; the thread must release it once for each time it has
acquired it.

"""
return _RLock(*args, **kwargs)

class _Condition(_Verbose):
def __init__(self, lock=None, verbose=None):
_Verbose.__init__(self, verbose)
if lock is None:
lock = RLock()
self.__lock = lock
# Export the lock's acquire() and release() methods
self.acquire = lock.acquire
self.release = lock.release
# If the lock defines _release_save() and/or _acquire_restore(),
# these override the default implementations (which just call
# release() and acquire() on the lock). Ditto for _is_owned().
try:
self._release_save = lock._release_save
except AttributeError:
pass
try:
self._acquire_restore = lock._acquire_restore
except AttributeError:
pass
try:
self._is_owned = lock._is_owned
except AttributeError:
pass
self.__waiters = []

def _release_save(self):
self.__lock.release() # No state to save

def _acquire_restore(self, x):
self.__lock.acquire() # Ignore saved state

def _is_owned(self):
# Return True if lock is owned by current_thread.
# This method is called only if __lock doesn't have _is_owned().
if self.__lock.acquire(0):
self.__lock.release()
return False
else:
return True

def wait(self, timeout=None):
"""Wait until notified or until a timeout occurs.

如果调用的线程没有取得锁将抛出`RuntimeError`错误。

这个方法释放锁`__lock`,然后阻塞直到同一个CV被另一个线程中的`notify()`或`notifyAll()`通知,
或者发生超时。一旦发生以上情况,它会重新获得锁并返回。

`timeout`参数是一个表示秒数的浮点。

当锁是递归锁`RLock`,release方法并不会直接解锁它,相应地一个内部的接口被使用,
从而保证它需要调用和acquire相同的重数才能被解锁。
"""
# 必须持有锁才能操作共享变量
if not self._is_owned():
raise RuntimeError("cannot wait on un-acquired lock")
# 实际调用thread模块对应实现中的`thread_PyThread_allocate_lock`函数,将在稍后看到
waiter = _allocate_lock()
# 实际调用thread模块对应实现中的`lock_PyThread_acquire_lock`函数
# 先acquire一下waiter
waiter.acquire()
# 将代表自己的锁`waiter`加入`self.__waiters`队列中,
self.__waiters.append(waiter)
# 释放掉持有的lock
saved_state = self._release_save() # 暂时不明白为啥要有个返回值
try: # restore state no matter what (e.g., KeyboardInterrupt)
if timeout is None:
# 再acquire一下waiter,这下阻塞了,原来把它当信号量用
# 等signal释放掉一个锁
waiter.acquire()
if __debug__:
self._note("%s.wait(): got it", self)
else:
# 翻译
# 这里使用一个busy loop轮询是划不来的,所以我们得sleep,
# 不过如果睡一个较长的时间,程序的响应性会变差。
# 这个机制sleep interval的区间在[0.0005, 0.05]之间,每次按照2的幂增长。
endtime = _time() + timeout
delay = 0.0005 # 500 us -> initial delay of 1 ms
while True:
# 这里的0是一个waitflag,表示我们需不需要等待直到获得锁,
# 因为我们现在有timeout,所以不能直接阻塞,而是不停地睡觉直到条件达成或者超时
gotit = waiter.acquire(0)
if gotit:
break
remaining = endtime - _time()
if remaining <= 0:
break
delay = min(delay * 2, remaining, .05)
_sleep(delay)
if not gotit:
if __debug__:
self._note("%s.wait(%s): timed out", self, timeout)
try:
self.__waiters.remove(waiter)
except ValueError:
pass
else:
if __debug__:
self._note("%s.wait(%s): got it", self, timeout)
finally:
self._acquire_restore(saved_state)

def notify(self, n=1):
"""Wake up one or more threads waiting on this condition, if any.

如果调用的线程没有取得锁将抛出`RuntimeError`错误。

这个方法唤醒CV上的至多n个线程,当没有线程在等待时它相当于nop函数
"""
# 必须持有锁才能操作共享变量
if not self._is_owned():
raise RuntimeError("cannot notify on un-acquired lock")
__waiters = self.__waiters
# 我们取出前n个进行通知
waiters = __waiters[:n]
if not waiters:
if __debug__:
self._note("%s.notify(): no waiters", self)
return
self._note("%s.notify(): notifying %d waiter%s", self, n,
n!=1 and "s" or "")
for waiter in waiters:
waiter.release()
try:
__waiters.remove(waiter)
except ValueError:
pass

从上面的代码我们可以看到一旦调用waiter.acquire(),程序就不能响应信号了,我们接下来到thread模块中看这个函数的实现。

thread模块

Pythread锁

承接上文,我们看到thread._allocate_lock实际调用了thread_PyThread_allocate_lock()创建了一个lockobject对象。lockobject对象中包含了一个lock_lock,这个实际上是一个PyThread_type_lock对象,由PyThread_allocate_lock()创建,也就是锁基于操作系统API的具体实现,我们将在稍后看到。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
// pythread.h
// 注意PyThread_type_lock中的实际对象根据具体实现而不同
typedef void *PyThread_type_lock;
typedef void *PyThread_type_sema;

PyAPI_FUNC(PyThread_type_lock) PyThread_allocate_lock(void);
PyAPI_FUNC(void) PyThread_free_lock(PyThread_type_lock);
PyAPI_FUNC(int) PyThread_acquire_lock(PyThread_type_lock, int);
#define WAIT_LOCK 1
#define NOWAIT_LOCK 0
PyAPI_FUNC(void) PyThread_release_lock(PyThread_type_lock);

// threadmodule.c
static PyMethodDef thread_methods[] = {
{"start_new_thread", (PyCFunction)thread_PyThread_start_new_thread,
METH_VARARGS,
start_new_doc},
{"start_new", (PyCFunction)thread_PyThread_start_new_thread,
METH_VARARGS,
start_new_doc},
{"allocate_lock", (PyCFunction)thread_PyThread_allocate_lock,
METH_NOARGS, allocate_doc},
...
{NULL, NULL} /* sentinel */
};


static PyObject *
thread_PyThread_allocate_lock(PyObject *self)
{
return (PyObject *) newlockobject();
}

typedef struct {
// Python对象的通用头,包含了引用计数等信息
PyObject_HEAD
PyThread_type_lock lock_lock;
PyObject *in_weakreflist;
} lockobject;

static lockobject *
newlockobject(void)
{
lockobject *self;
self = PyObject_New(lockobject, &Locktype);
if (self == NULL)
return NULL;
self->lock_lock = PyThread_allocate_lock();
self->in_weakreflist = NULL;
if (self->lock_lock == NULL) {
Py_DECREF(self);
PyErr_SetString(ThreadError, "can't allocate lock");
return NULL;
}
return self;
}

PyThread_allocate_lock系列函数的实现因系统而异。以Linux为例有两种方式,分别基于sem_t和基于pthread_mutex/pthread_cond的。
我们首先查看基于信号量的机制

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
// thread_pthread.h
PyThread_type_lock
PyThread_allocate_lock(void)
{
sem_t *lock;
int status, error = 0;

dprintf(("PyThread_allocate_lock called\n"));
if (!initialized)
PyThread_init_thread();

lock = (sem_t *)malloc(sizeof(sem_t));

if (lock) {
// 信号量只在线程间共享
status = sem_init(lock, 0, 1);
CHECK_STATUS("sem_init");

if (error) {
free((void *)lock);
lock = NULL;
}
}

dprintf(("PyThread_allocate_lock() -> %p\n", lock));
return (PyThread_type_lock)lock;
}

void
PyThread_free_lock(PyThread_type_lock lock)
{
sem_t *thelock = (sem_t *)lock;
int status, error = 0;

// 这种用法第一是禁止WARNING,第二是用来做跳转,如`(void *)0;`
(void) error;
dprintf(("PyThread_free_lock(%p) called\n", lock));

if (!thelock)
return;

status = sem_destroy(thelock);
CHECK_STATUS("sem_destroy");

free((void *)thelock);
}

int
PyThread_acquire_lock(PyThread_type_lock lock, int waitflag)
{
int success;
sem_t *thelock = (sem_t *)lock;
int status, error = 0;

(void) error; /* silence unused-but-set-variable warning */
dprintf(("PyThread_acquire_lock(%p, %d) called\n", lock, waitflag));

do {
if (waitflag)
status = fix_status(sem_wait(thelock));
else
status = fix_status(sem_trywait(thelock));
} while (status == EINTR); /* Retry if interrupted by a signal */

if (waitflag) {
CHECK_STATUS("sem_wait");
} else if (status != EAGAIN) {
CHECK_STATUS("sem_trywait");
}

success = (status == 0) ? 1 : 0;

dprintf(("PyThread_acquire_lock(%p, %d) -> %d\n", lock, waitflag, success));
return success;
}

void
PyThread_release_lock(PyThread_type_lock lock)
{
sem_t *thelock = (sem_t *)lock;
int status, error = 0;

(void) error; /* silence unused-but-set-variable warning */
dprintf(("PyThread_release_lock(%p) called\n", lock));

status = sem_post(thelock);
CHECK_STATUS("sem_post");
}

另一种方式是使用mutex和CV的经典实现,由于实际上没有用到,所以单独讨论。

观赏Bug形成过程

在先前的讨论中我们已经确定了问题的所在是waiter.acquire()这个方法,对应到CPython内部就是PyThread_allocate_lock这个函数。PyThread_allocate_lock函数根据宏的不同选项有两种实现方式,在我的Ubuntu上提供了sem_t,所以默认使用sem_t的实现。我们跟踪这个PyThread_allocate_lock,发现这个函数能够正常加解锁,但是发送SIGINT信号却不能打断程序。

那么究竟是Python直接屏蔽了Native POSIX signal,还是出于其他的原因?为此重新编译了Python 2.7.6并进行按照下图打了Log进行调试。

得到结果如下图。

注意其中的^CSIG 2,这是我在signalmodule中的signal_handler函数的开头设置打印语句,此时Ctrl+C能够输出SIG 2,并且通过了if (getpid() == main_pid)的检测,到达了trip_signal。我们在这个函数中输出了Add pending 2 callback,和我们之前注册的时候相同。接着trip_signal调用了Py_AddPendingCall

我们接下来查看Py_AddPendingCall的代码,他在ceval.c里面,这个文件也是Python的main loop的所在地。
经过调试,Py_AddPendingCall中记录了这个信号已经被成功加到了pendingcalls[0]。截止目前已发现Python在POSIX层面是收到了SIG 2,并且挂载下半部程序。因此可以初步断定异常原因是信号处理下半部并没有能够被运行。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
// ceval.c
// 这就是CPython臭名昭著的全局解释器锁GIL,
// GIL保证了同时只有一个线程在解释器中运行
static PyThread_type_lock interpreter_lock = 0; /* This is the GIL */
static PyThread_type_lock pending_lock = 0; /* for pending calls */

#define NPENDINGCALLS 32
static struct {
int (*func)(void *);
void *arg;
} pendingcalls[NPENDINGCALLS];
static int pendingfirst = 0;
static int pendinglast = 0;
static volatile int pendingcalls_to_do = 1; /* trigger initialization of lock */
static char pendingbusy = 0;

int
Py_AddPendingCall(int (*func)(void *), void *arg)
{
int i, j, result=0;
PyThread_type_lock lock = pending_lock;

/* try a few times for the lock. Since this mechanism is used
* for signal handling (on the main thread), there is a (slim)
* chance that a signal is delivered on the same thread while we
* hold the lock during the Py_MakePendingCalls() function.
* This avoids a deadlock in that case.
* Note that signals can be delivered on any thread. In particular,
* on Windows, a SIGINT is delivered on a system-created worker
* thread.
* We also check for lock being NULL, in the unlikely case that
* this function is called before any bytecode evaluation takes place.
*/
/* 翻译
* 自旋100次尝试请求锁。因为这个机制被用来做信号处理(限于主线程),
* 因此有较小的几率一个信号在同一个线程在`Py_MakePendingCalls()`函数中持有锁时被通告。
* 这种情况下使用多次尝试能够在以极大概率获取锁时避免死锁。
* 注意信号(这里应该是原生的信号)可以被通告到任意线程。
* 特别地,Windows上的SIGINT会被通告到system-created worker thread上。
* 我们同时检查`lock`是否是NULL,因为可能有较小的概率这个函数在(初始化)之前被调用
*/
if (lock != NULL) {
for (i = 0; i<100; i++) {
if (PyThread_acquire_lock(lock, NOWAIT_LOCK))
break;
}
if (i == 100)
return -1;
}

// 将当前请求入队
i = pendinglast;
j = (i + 1) % NPENDINGCALLS;
printf("Insert into pendingcalls[] at %d\n", i);
if (j == pendingfirst) {
result = -1; /* Queue full */
} else {
pendingcalls[i].func = func;
pendingcalls[i].arg = arg;
pendinglast = j;
}
/* signal main loop */
_Py_Ticker = 0;
pendingcalls_to_do = 1;
if (lock != NULL)
PyThread_release_lock(lock);
return result;
}

int
Py_MakePendingCalls(void)
{
int i;
int r = 0;

if (!pending_lock) {
/* initial allocation of the lock */
pending_lock = PyThread_allocate_lock();
if (pending_lock == NULL)
return -1;
}

printf("Py_MakePendingCalls: before checking main thread\n");
/* only service pending calls on main thread */
if (main_thread && PyThread_get_thread_ident() != main_thread)
return 0;
/* don't perform recursive pending calls */
if (pendingbusy)
return 0;
printf("Py_MakePendingCalls: before loop pendingcalls[]\n");
pendingbusy = 1;
/* perform a bounded number of calls, in case of recursion */
for (i=0; i<NPENDINGCALLS; i++) {
int j;
int (*func)(void *);
void *arg = NULL;

/* pop one item off the queue while holding the lock */
PyThread_acquire_lock(pending_lock, WAIT_LOCK);
j = pendingfirst;
if (j == pendinglast) {
func = NULL; /* Queue empty */
} else {
func = pendingcalls[j].func;
arg = pendingcalls[j].arg;
pendingfirst = (j + 1) % NPENDINGCALLS;
}
pendingcalls_to_do = pendingfirst != pendinglast;
PyThread_release_lock(pending_lock);
/* having released the lock, perform the callback */
if (func == NULL)
break;
// 这里的func实际上就是`checksignals_witharg`这个函数
printf("Py_MakePendingCalls: before call pendingcalls[%d] = %d\n", j, (int)pendingcalls[j].func);
r = func(arg);
printf("Py_MakePendingCalls: after call pendingcalls[%d] = %d\n", j, (int)pendingcalls[j].func);
if (r)
break;
}
pendingbusy = 0;
return r;
}

使用Py_AddPendingCall加入队列pendingcalls中的信号将会在Py_MakePendingCalls中被真正处理,这有点类似Linux中断下半部的机制。这里的func实际上是trip_signal调用Py_AddPendingCall的第一个参数checksignals_witharg,也是一个函数。checksignals_witharg这个函数很短,只调用了PyErr_CheckSignals这个函数。我们下面查看具体代码,需要注意Handlers[i].funcpendingcalls[j].func不一样。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
// signalmodule.c
static void
trip_signal(int sig_num)
{
Handlers[sig_num].tripped = 1;
if (is_tripped)
return;
/* Set is_tripped after setting .tripped, as it gets
cleared in PyErr_CheckSignals() before .tripped. */
is_tripped = 1;
Py_AddPendingCall(checksignals_witharg, NULL);
if (wakeup_fd != -1)
write(wakeup_fd, "\0", 1);
}

int
PyErr_CheckSignals(void)
{
int i;
PyObject *f;

if (!is_tripped)
return 0;

#ifdef WITH_THREAD
if (PyThread_get_thread_ident() != main_thread)
return 0;
#endif

/*
* The is_tripped variable is meant to speed up the calls to
* PyErr_CheckSignals (both directly or via pending calls) when no
* signal has arrived. This variable is set to 1 when a signal arrives
* and it is set to 0 here, when we know some signals arrived. This way
* we can run the registered handlers with no signals blocked.
*
* NOTE: with this approach we can have a situation where is_tripped is
* 1 but we have no more signals to handle (Handlers[i].tripped
* is 0 for every signal i). This won't do us any harm (except
* we're gonna spent some cycles for nothing). This happens when
* we receive a signal i after we zero is_tripped and before we
* check Handlers[i].tripped.
*/
/* 翻译
* `is_tripped`被在没有信号到达时用来加速`PyErr_CheckSignals`的处理。
* 当信号到达时`is_tripped`会被设为1,然后在这里被清零。
* 注意,这个策略有一个特殊情况,当`is_tripped`时1但我们实际没有信号可以处理,
* 也就是所有的`Handlers[i].tripped`都是0。这个最多只会浪费几次check而已。
* 这种特殊情况发生在我们清零完`is_tripped`之后,检查`Handlers[i].tripped`之前。
*/
is_tripped = 0;

if (!(f = (PyObject *)PyEval_GetFrame()))
f = Py_None;

for (i = 1; i < NSIG; i++) {
if (Handlers[i].tripped) {
PyObject *result = NULL;
PyObject *arglist = Py_BuildValue("(iO)", i, f);
Handlers[i].tripped = 0;

if (arglist) {
// 可以发现实际调用了`PyEval_CallObject`来执行`Handlers[i].func`。
printf("PyErr_CheckSignals called %d\n", (int)Handlers[i].func);
result = PyEval_CallObject(Handlers[i].func, arglist);
Py_DECREF(arglist);
}
if (!result)
return -1;

Py_DECREF(result);
}
}

return 0;
}

刚才我们已经知道Python使用Py_MakePendingCalls往下的调用链Py_MakePendingCalls -> checksignals_witharg(即pendingcalls[j].func) -> PyErr_CheckSignals -> Handlers[i].func,而Py_MakePendingCalls这个函数在当前栈帧的主循环PyEval_EvalFrameEx中被以一定的Tick的时间间隔被调用。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
/* for manipulating the thread switch and periodic "stuff" - used to be
per thread, now just a pair o' globals */
int _Py_CheckInterval = 100;
volatile int _Py_Ticker = 0;

PyObject *
PyEval_EvalFrameEx(PyFrameObject *f, int throwflag)
{
...
for (;;) {
// 这个循环里面不停地根据取指而执行
...
// 这里`_Py_Ticker`实际上就是一个倒计时器,从初始值`_Py_CheckInterval`往下递减,
// 当归零时进入下面的分支,进行一次check
if (--_Py_Ticker < 0) {
if (*next_instr == SETUP_FINALLY) {
/* Make the last opcode before a try: finally: block uninterruptible. */
goto fast_next_opcode;
}
// 还原计数器
_Py_Ticker = _Py_CheckInterval;
tstate->tick_counter++;
#ifdef WITH_TSC
ticked = 1;
#endif
// 如果有待处理的signal
if (pendingcalls_to_do) {
// 调用`Py_MakePendingCalls`执行下半部
if (Py_MakePendingCalls() < 0) {
why = WHY_EXCEPTION;
goto on_error;
}
if (pendingcalls_to_do)
// 翻译
// 这个说明`Py_MakePendingCalls`执行没有成功,
// 将_Py_Ticker设为0,这样可以进行线程切换,
// 从而让其他线程可以重新尝试一次,
// 然后就可能成功
printf("Py_MakePendingCalls Failed, current thread %x main thread %x \n",
PyThread_get_thread_ident(), main_thread);
_Py_Ticker = 0;
}
#ifdef WITH_THREAD
// 下面的过程实现了线程切换。
if (interpreter_lock) {
if (PyThreadState_Swap(NULL) != tstate)
Py_FatalError("ceval: tstate mix-up");
printf("Thread %x yield \n", PyThread_get_thread_ident());
// 假如当前线程持有GIL,就释放
PyThread_release_lock(interpreter_lock);

// 此时其他线程可以竞争这个GIL

// 本线程重新获取GIL
PyThread_acquire_lock(interpreter_lock, 1);
printf("Thread %x resume \n", PyThread_get_thread_ident());
if (PyThreadState_Swap(tstate) != NULL)
Py_FatalError("ceval: orphan tstate");

/* Check for thread interrupts */

if (tstate->async_exc != NULL) {
x = tstate->async_exc;
tstate->async_exc = NULL;
PyErr_SetNone(x);
Py_DECREF(x);
why = WHY_EXCEPTION;
goto on_error;
}
}
#endif
}
fast_next_opcode:
...

下面的图来自dabeaz,形象地展示了上面源码所描述的Python线程切换的过程。

现在我们发现一个问题,在上面的代码中Py_MakePendingCalls Failed输出了,这意味着我们的信号处理函数没有成功。为什么没有成功呢?我们回看Py_MakePendingCalls的Log输出在Py_MakePendingCalls: before checking main thread戛然而止,说明此时的线程并不是主线程!我们进一步查看线程调度情况,发现在主线程调用join()之后就一直睡眠了,其中唯一一次唤醒就是收到了SIGINT,主线程将这个信号放到pendingcalls之后又回去睡觉了,之后虽然子线程屡次调用Py_MakePendingCalls检查到了有待处理的信号,但由于自己不是主线程所以也是爱莫能助。
下面我们着手解决这个问题,一个简单的方法就是让子线程也可以处理由主线程添加到pendingcalls中的信号,于是我们对代码中进行两处修改:

  1. 注释掉PyErr_CheckSignals中的if (PyThread_get_thread_ident() != main_thread)
  2. 注释掉Py_MakePendingCalls中的if (main_thread && PyThread_get_thread_ident() != main_thread)

再次编译调试,发现可以正常退出了

杂项函数的实现

线程创建

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
// thread.c
void
PyThread_init_thread(void)
{
#ifdef Py_DEBUG
char *p = Py_GETENV("PYTHONTHREADDEBUG");
if (p) {
if (*p)
thread_debug = atoi(p);
else
thread_debug = 1;
}
#endif /* Py_DEBUG */
if (initialized)
return;
initialized = 1;
dprintf(("PyThread_init_thread called\n"));
PyThread__init_thread();
}

static void
PyThread__init_thread(void)
{
/* DO AN INIT BY STARTING THE THREAD */
static int dummy = 0;
pthread_t thread1;
pthread_create(&thread1, NULL, (void *) _noop, &dummy);
pthread_join(thread1, NULL);
}

// 这是创建线程的主入口
long
PyThread_start_new_thread(void (*func)(void *), void *arg)
{
pthread_t th;
int status;
#if defined(THREAD_STACK_SIZE) || defined(PTHREAD_SYSTEM_SCHED_SUPPORTED)
pthread_attr_t attrs;
#endif
#if defined(THREAD_STACK_SIZE)
size_t tss;
#endif

dprintf(("PyThread_start_new_thread called\n"));
if (!initialized)
PyThread_init_thread();

#if defined(THREAD_STACK_SIZE) || defined(PTHREAD_SYSTEM_SCHED_SUPPORTED)
if (pthread_attr_init(&attrs) != 0)
return -1;
#endif
#if defined(THREAD_STACK_SIZE)
tss = (_pythread_stacksize != 0) ? _pythread_stacksize
: THREAD_STACK_SIZE;
if (tss != 0) {
if (pthread_attr_setstacksize(&attrs, tss) != 0) {
pthread_attr_destroy(&attrs);
return -1;
}
}
#endif
#if defined(PTHREAD_SYSTEM_SCHED_SUPPORTED)
pthread_attr_setscope(&attrs, PTHREAD_SCOPE_SYSTEM);
#endif

status = pthread_create(&th,
#if defined(THREAD_STACK_SIZE) || defined(PTHREAD_SYSTEM_SCHED_SUPPORTED)
&attrs,
#else
(pthread_attr_t*)NULL,
#endif
(void* (*)(void *))func,
(void *)arg
);

#if defined(THREAD_STACK_SIZE) || defined(PTHREAD_SYSTEM_SCHED_SUPPORTED)
pthread_attr_destroy(&attrs);
#endif
if (status != 0)
return -1;

// 这里直接detach了,join是Python自己实现的wait
pthread_detach(th);

#if SIZEOF_PTHREAD_T <= SIZEOF_LONG
return (long) th;
#else
return (long) *(long *) &th;
#endif
}

线程状态转移

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
// pystate.c
PyThreadState *
PyThreadState_Swap(PyThreadState *newts)
{
PyThreadState *oldts = _PyThreadState_Current;

_PyThreadState_Current = newts;
/* It should not be possible for more than one thread state
to be used for a thread. Check this the best we can in debug
builds.
*/
#if defined(Py_DEBUG) && defined(WITH_THREAD)
if (newts) {
/* This can be called from PyEval_RestoreThread(). Similar
to it, we need to ensure errno doesn't change.
*/
int err = errno;
PyThreadState *check = PyGILState_GetThisThreadState();
if (check && check->interp == newts->interp && check != newts)
Py_FatalError("Invalid thread state for this thread");
errno = err;
}
#endif
return oldts;
}

CV+Mutex实现锁

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
// thread_pthread.h
typedef struct {
char locked; /* 0=unlocked, 1=locked */
/* a <cond, mutex> pair to handle an acquire of a locked lock */
pthread_cond_t lock_released;
pthread_mutex_t mut;
} pthread_lock;

int
PyThread_acquire_lock(PyThread_type_lock lock, int waitflag)
{
int success;
pthread_lock *thelock = (pthread_lock *)lock;
int status, error = 0;

dprintf(("PyThread_acquire_lock(%p, %d) called\n", lock, waitflag));

status = pthread_mutex_lock( &thelock->mut );
CHECK_STATUS("pthread_mutex_lock[1]");
success = thelock->locked == 0;

if ( !success && waitflag ) {
/* continue trying until we get the lock */

/* mut must be locked by me -- part of the condition protocol */
while ( thelock->locked ) {
status = pthread_cond_wait(&thelock->lock_released,
&thelock->mut);
CHECK_STATUS("pthread_cond_wait");
}
success = 1;
}
if (success) thelock->locked = 1;
status = pthread_mutex_unlock( &thelock->mut );
CHECK_STATUS("pthread_mutex_unlock[1]");

if (error) success = 0;
dprintf(("PyThread_acquire_lock(%p, %d) -> %d\n", lock, waitflag, success));
return success;
}

void
PyThread_release_lock(PyThread_type_lock lock)
{
pthread_lock *thelock = (pthread_lock *)lock;
int status, error = 0;

(void) error; /* silence unused-but-set-variable warning */
dprintf(("PyThread_release_lock(%p) called\n", lock));

status = pthread_mutex_lock( &thelock->mut );
CHECK_STATUS("pthread_mutex_lock[3]");

thelock->locked = 0;

status = pthread_mutex_unlock( &thelock->mut );
CHECK_STATUS("pthread_mutex_unlock[3]");

/* wake up someone (anyone, if any) waiting on the lock */
// 在退出临界区时signal
status = pthread_cond_signal( &thelock->lock_released );
CHECK_STATUS("pthread_cond_signal");
}