并发编程重要概念及比较

在本篇中比较了各种并发、并行技术。
并发(concurrency)强调的是逻辑上的同时发生,是语义上的模型(semantic model),在实际上并发程序可能是由一个处理器同时(simultaneously)处理多个任务,因此并发过程中常出现一个线程等待其他资源的情况。此时常伴随着线程的阻塞和调度。并发过程通常可划分为多个级别:Blocking、Obstruction-Free、Lock-Free、Wait-Free,其中后三种统称为Non-blocking的。
并行(parallelism)属于并发,是运行期的行为(runtime behavior),并行强调这两个并发事件实际上也是同时发生的,例如在多个处理器上运行的多个任务。但我们不能讲这两个概念绝对化,例如在处理器层面,流水线绝对是并发的,但在操作系统之上提供的机制来说,却体现出顺序的特性。
【未完待续】

处理器层面的并发

这里简单地论述处理器层面的并发相关,例如SMP架构、流水线、分支预测、多级缓存等技术。

流水线(pipeline)

流水线和吞吐量

我们通常使用GOPS(每秒千兆次操作)来定义吞吐量,表示每秒内执行操作的次数。CPU流水线的目的是为了提高吞吐量(throughput),但会增加每条指令的延迟(latency),这是因为执行一条指令需要经过更多的流水线寄存器。CSAPP使用一个称为SEQ的架构来描述流水线的通用模型,一条指令在CPU中被执行会经过取指(fetch)、译码(decode)、执行(execute)、访存(memory)、写回(write back)、更新PC(在SEQ+中和取指进行了合并)等阶段。容易想到在某一时钟周期内,每一个阶段都可以独立运行。例如当指令PC在执行阶段时,我们可以对指令PC+1进行译码,这样译码器就不会闲置,这就是流水线化的一个简单思路。这种流水线设计很常见,以486为例,其拥有两条5级流水线取指F->译码D1->转址D2->执行EX->写回WB
出于两点的考虑,流水线技术会制约了吞吐量能提高的上限:

  1. 流水线不同阶段的延迟是不同
    如下图所示,流水线的吞吐量会受到其中最长操作的局限。
  2. 一个较深的流水线可能带来很大的流水线寄存器延迟
    这是来自于增加的流水线寄存器所带来的额外开销。

流水线和冒险

控制相关和顺序(数据)相关是使用流水线并发执行一些指令需要面临的问题。其中顺序相关指后一条指令的执行依赖于前一条指令的值,控制相关指的是指令流的路径依赖于前一条指令(通常是条件测试)的结果。为了解决这样的问题,CSAPP首先升级了原先的SEQ到SEQ+,现在我们将更新PC移到最前面,并且和取指进行了合并,这样做的目的是使得我们在程序开始时通过上面一条指令结束时的状态来确定地址。CSAPP基于SEQ+引入了PIPE-这个流水线,并希望PIPE-能够实现每个时钟周期发射(issue)一条指令,因此我们需要在取出一条指令后马上确定下一条指令的位置。这对于ret和条件转移来说是较为麻烦的,因为我们要进行分支预测。分支预测有一些策略,包括总是选择(分支),从不选择(NT)。一种正向不选择(BTFNT)策略只接受跳往地址更低的分支(也就是往前跳,后向分支),因为它可能标志着一个循环的右大括号。现在的分支预测器通常分为静态规则和动态规则的,动态规则会基于一些运行期的历史进行类似“强化学习”,例如如果一段分支历史上不进行跳转的成功率大,那么就这一次预测的时候就不进行跳转,知乎专栏上给出了一个详细的例子来验证这个特性,这个实验中验证了对一个有序数组遍历时其耗时约为无序数组的1/3。分支预测失败,也就是所谓的控制冒险(hazard),它的代价是就会导致流水线刷新(flush)。此外当一条指令后面指令会到的那些程序状态时就会带来数据冒险,特别地我们可以发现控制冒险可以看做是对程序计数器PC而言的数据冒险。我们考虑整个流水线中可能出现的部件,其中通用寄存器和PC已经被证明是存在冒险的,以通用寄存器为例,其写和读发生在流水线上不同的阶段(写回和解码)。剩下来的是EFLAGS以及存储器则是不存在冒险的,以存储器为例,其读写发生在流水线的访存阶段,那么前一条指令对存储器的对后一条指令对存储器的总是可见的。
CPU流水线中通过暂停(stalling)和转发(forward)/旁路(bypassing)的机制解决数据冒险的问题。此外在汇编层面,我们可以使用条件传送而不是条件控制转移来避免分支预测出错的问题。

暂停

暂停的思路很简单,就是阻塞流水线中的一些指令直到冒险条件不满足。对通用寄存器而言,在解码阶段时流水线会检查前方执行、访存和写回阶段中是否有指令会更新该通用寄存器。如果存在那么就会阻塞处于解码阶段的指令(包括后面即将执行的下一条指令的取指阶段,也就是保持PC的值)。在阻塞时,流水线的一部分会进入空转状态,此时我们成为插入一个气泡(bubble)。如下图所示,在时刻5,原本应该执行的0x00d处的D被阻塞了,因为0x006处的W尚未完成。因此在等待其完成的时间中插入了三个气泡(分别对应EMW阶段)。

转发

上面的暂停机制非常直白和简单,但考虑到前后连续两条指令对同一个寄存器先写后读是很通常的情况,此时三个气泡是很划不来的。此时可以借助转发来避免暂停。转发机制指将结果从流水线的较晚阶段发送到流水线的较早阶段的过程,通常是从写回阶段W转发到译码D阶段作为操作数之一。我们查看下面的图,在第6周期上,0x00e的解码逻辑发现0x006上所在的写回阶段有对%eax的写,而自己恰恰要访问这个寄存器,所以与其这样不如直接拿过来用了。这样相当于只插入了两个气泡。

更厉害的是我们甚至可以在访存阶段就转发到解码阶段。我们查看下面的图,在第5周期上位于解码阶段的0x00d发现0x0060x000分别处于访存和写回阶段,这里写回阶段如上所示,而访存阶段我们发现我们正在读入数据到%eax。因此我们可以直接将这个对%eax的写也转发过去。现在我们相当于只插入了一个气泡。

能不能一个都不插入呢?也是可以的,我们从执行阶段就转发到解码阶段。我们查看下面的图,在第4周期上位于解码阶段的0x00c发现0x0060x000分别处于执行和访存阶段,这里访存阶段如上所示,而在执行阶段我们发现我们正在计算一个立即数,这个立即数将在稍后写入%eax

加载互锁

转发能够解决相当多的数据冒险,但有一类加载/使用冒险难以被解决,这是因为加载(从存储器读)存在于访存阶段,而如果下一条指令就要使用的话,那么就会“赶不上趟”。

现代处理器

在现代的处理器中,流水线被拆分地更加细,出现了所谓的12级、31级乃至更深的流水线。但是这么深的流水线阻塞的代价是非常巨大的。为此Intel使用了乱序执行组件(Out-of-Order core)。

流水线机制对程序优化的启示

流水线机制对我们程序性能优化的启示主要有下面几点:

  1. 减少连续指令的相关性
    这一点是针对数据冒险而言的。
  2. 进行循环展开
    这一点是针对控制冒险而言的。循环处必然出现分支,这就带来了可能的分支预测失败的成本。因此我们展开循环能够减少这样的跳转次数。

CPU的缓存和缓存一致性

x86往往都有高速缓存Cache,而且有多级。高速缓存基于静态RAM(SRAM)技术,区别于主存的动态RAM(DRAM)技术。现代CPU中寄存器与内存之间没有直接的渠道,而必须通过多级的高速缓存才能到内存。高速缓存的作用依然是为了弥补CPU和内存在速度上的差异,高速缓存提高效率的原理是基于时间局部性和空间局部性,我们稍后将详细讨论这个概念。虽然高速缓存对用户来说是透明的,我们的代码要不直接操作寄存器,要不直接操作内存,但它并不是不存在,如果深究下去会发现如何保障内存和对应CPU缓存的同步是个问题。但是我们不要担心诸如此类的“一个核心写另一个核心脏读”的情况,缓存一致性协议能够解决内存和多核CPU缓存之间以及缓存与缓存之间的同步性问题。但我们仍然要关注缓存这个概念以写出高质量的程序。

局部性原理

时间局部性和空间局部性原理表示被引用过一次的存储器位置很可能在不远的将来再次被引用,而存储器中的某一个地址被引用过,那么它附近的地址很可能也会被使用。我们考虑对一个向量求和

1
2
3
4
5
int s = 0;
for(int i = 0; i < N; i++){
s += v[i];
}
return s;

首先对于标量s,它并没有空间局部性一说,但是它在每次循环中都被访问,因此具有较好的时间局部性。而向量v是被逐一访问的,即具有步长为1的引用模式,因此空间局部性很好,不过用一次就不用了,所以时间局部性不好。一般来说步长越小,空间局部性越好。这样的空间局部性在对二维数组的访问上会更加明显,例如对于行存储的模型,如果以列为单位遍历,那么就会破坏空间局部性,这个在我CFortranTranslator工程的设计上曾有所考虑。

多级缓存和缓存不命中

往下深究局部性原理,我们能够想到缓存不命中的问题。我们始终希望能够在当前缓存找到我们需要的对象,这样最快,但缓存的容量总是有限的。当缓存不能命中时,我们就需要从下一级缓存中找。当系统开始运行时缓存是空的,这时候的命中称为冷不命中或者强制不命中,没有什么道理可讲。下面我们考虑如何快速地根据地址m来定位和映射Cache,由于一般高层k缓存的容量要小,所以缓存在设计的时候会分成几个桶(高速缓存组),这样可以Hash到桶里面。事实上高速缓存一般将地址分为三段,即组索引位s、行标记位t和偏移位b。这样的Hash可能存在问题,例如我们mod 4一下到四个桶里面。那么假如说我们从k+1层交替访问0号和8号块,那么我们发现其实缓存会一直不命中,即使其他的块还是空的,这被称为冲突不命中(conflict miss)。当然有的时候缓存就是太小了,这个就是容量不命中(capacity miss)。

在组织时,我们按照桶的数量和桶中元素数量的区别将高速缓存分为了直接映射高速缓存、组相联高速缓存和全相联高速缓存。直接映射(direct-mapped)高速缓存每一个桶里面只有一行,缓存根据是否命中会选择直接返回或者向下层请求对应块。整个匹配过程分为三步,第一步是组选择,我们在地址的中间部分抽出s个位作为组索引位ss的大小取决于组数S,我们有关系$S = log_2^S$。我们选择中间部分是出于对空间局部性的利用,如果我们使用高位做索引,那么我们空间局部性好的连续的块就会被映射到一个桶里面,这样的话就会导致缓存的频繁刷新。然后第二步是行匹配,我们根据标记位t去寻找一个有效位被设置了的缓存,这是因为缓存可能会失效,这个对于只有一行的直接映射缓存来说的trivial的。t是由sb决定的,假设地址长度为m,块偏移位数为b,那么我们的t = m - s - b,这样做的话,一旦我们找到匹配的t,我们就可以认为这个地址在缓存行中。由于s占了中间位,所以t实际上就占了高位。于是第三步是字选择,因为缓存行是一个包含了$2^b$字节的块,我们整体读出来是没用的,所以我们要读到指定的字需要一个offset,这时候就可以借助于缓存行中的偏移位b了。对于下面的代码,如果x和y都被映射到一个高速缓存组中,就会造成thrash。
CSAPP中指出,即使一个局部空间性看上去很好的程序也可能造成缓存抖动(thrash)。

1
2
3
for(int i = 0; i < 8; i++){
s += x[i] * y[i];
}

此外,对于异常控制流,例如中断和上下文切换,缓存工作会受影响。此时主程序的缓存和中断处理的程序缓存会相互影响导致互相变cold。这种情况下当中断返回或者上下文切换回来时缓存需要较长时间来warm up,这种情况称为缓存污染(cache pollution)。

常见的缓存替换算法

在组相联高速缓存中,每个组中存在多个缓存行。如果我们当前的缓存满了的话,就需要去确定一个最合适的牺牲块(victim block)进行替换,我们有常见的LRU和LFU算法来达成这个目的。LFU替换过去引用次数最少的行,LRU替换最后一次访问时间最久远的一行。但是当出现偶发性的批量操作时LRU容易将Hot块(访问频率高的块)移出缓存,造成缓存污染。这时候我们引入了LFU算法,LFU需要额外的一些计算,但是在性能上好于LRU,但当数据访问模式改变后LFU需要较长的时间重新进行适应(统计频率)。因此我们引入了LRU-K算法,这个算法比较距当前第K次的访问时间和当前时间的距离来移出缓存。

缓存一致性协议

MESI是实现缓存一致性的一个基础协议,MESI分别表示缓存行可能处于的四个状态Modified、Exclusive、Shared、Invalid。其中Intel使用了扩展的MESIF,增加了状态Forward,而AMD使用了MOESI,增加了状态Owned。MESI协议需要正确处理local read(LR)、local write(LW)、remote read(RR)、remote write(RW)四个场景,其中local表示本core对本地缓存读写,而remote则表示非本地core对非本地缓存的读写。一个缓存行Cache0初始状态为I,因为它并不缓存任何数据。当本地core向该缓存行请求时,CPU会向其他缓存行询问,如果存在缓存行Cache1拥有该缓存,则将对应缓存行设为S状态,表示目前有多个缓存行缓存有该数据,并且缓存行中数据和内存中一样;否则从内存中加载到Cache0,并设为E状态,表示目前只有一个缓存行缓存有该数据。当本地处理器写入时,缓存行状态变为M,此时缓存与主存之间的数据不一致,CPU通知所有对应的其他的缓存行失效。这时候相当于该core独有这个缓存行,而其他core缓存行的状态变为I。
下面的图描述了四种读写下的状态变化。容易理解的是所有的RW都会导致I,所有的RR都会导致S,所有的LW都会导致M,下面考虑LR。对于状态E,由于是独占的,所以怎么读都是E,因此是自环。同理状态M也是独占的。对于S状态,虽然不是独占,但缓存行中数据和主存一致,因而是有效的,所以也是自环。下面对于I状态,缓存行失效意味着有其他core的写导致了缓存行的刷新,所以进行LR之后实际上就和这些core进行了数据的同步,也就是回到了S状态。

伪共享

伪共享(False Sharing)是在MESI模型下多个线程对同一缓存行竞争写所导致的性能降低。我们考虑这篇博文中的一个场景一个数组int32_t arr[]被同时加载到CPU0和CPU1的L1缓存Cache0和Cache1上。现在两个线程A和B试图分别修改arr[0]arr[1]。这种情况下是不存在race condition的,但是可能导致伪共享。我们考虑初始情况下Cache0和Cache1都处于S状态,现在CPU0收到线程A的请求,写arr[0],Cache0状态变为M,而Cache1收到通知后也作废了自己的缓存行。接下来CPU1发起了写操作,根据MESI模型,CPU0会先将arr[0]写回内存,此时Cache0变为I,之后CPU1才能从内存重新读取,Cache1变成E,然后CPU1才能修改arr[1]
为了解决伪共享存在的问题,我们通常的做法是尽量避免将需要访问的数据放到同一个缓存行中,而这就是在一些指令中需要内存对齐的原因。

测试缓存大小

缓存一致性和volatile

既然我们CPU总是能读到内存里面最新的值,那为啥还需要volatile呢?原因有二

  1. CPU不一定会读缓存,可能直接读寄存器
    这时候要注意区分CPU缓存一致性和volatile之间的关系。例如出于优化的角度,编译器可能把一个在内存的值放到到寄存器里面,以避免访问内存,既然不访问内存那和缓存一致也没啥关系了。但这样就会出现问题,如果某个线程修改了这个值对应的内存,那么寄存器是不知道的,所以这时候volatile强制说不要在寄存器里面读啦,直接从内存里面读,这时候缓存就能发挥应该有的作用了。所以CPU缓存一致性解决的是CPU的Lx缓存和内存之间之间的问题,而不是CPU寄存器和内存之间的问题
  2. 有的缓存一致性也不保证是任意时刻的
    经过简单的了解,CPU缓存行的写操作也分为直写(write though)和回写(write back)两种策略。直写就是同时写缓存和内存,因此对于直写来说,确实可以做到任意时刻各缓存中的内容等于内存中内容;但回写就不一定是任意时刻了,因为它并不是立即更新缓存,而是值修改本级缓存,而将对应缓存标记为脏段,只有当所有脏段被会邂逅我们才能达到一致性。具体还可以参看缓存一致性

内核层面的同步与并发

中断

在现在的多核CPU(特别是SMP架构)背景下,中断的机制和单核有所区别。在x86中使用的是高级可编程中断控制器(APIC),APIC由本地APIC和IO APIC组成,其中本地APIC与每个处理器核心对应,IO APIC负责采集和转发来自IO设备的中断信号。
根据Intel的规定,广义上的中断可分为同步中断和异步中断

  1. 同步中断又称异常,实际上是由CPU产生的,因此显然不能被屏蔽,异常分为故障(fault)、陷阱(trap)和终止(abort),对应到中断号的0-15。异步中断又称中断,分为外部非屏蔽中断(NMI)和外部可屏蔽中断(INTR),分别对应中断号的16-31和32-47。Intel将非屏蔽中断也归入异常,所以异常一般为来自外设或CPU中的非法或故障状态,例如常见的除零错误、缺页(故障)、单步调试(陷阱)等情况。
  2. 在中断的语境中,我们主要讨论异步中断中的可屏蔽中断,它通常是来自外部设备的中断。这是因为除了一些硬件故障,来自外部IO设备的中断常是可以等待的,所以属于可屏蔽中断,当IF标志为1时,CPU可以不响应可屏蔽中断,而是将它缓存起来,在开中断后会传给CPU。当CPU在响应一个异常时,所有的可屏蔽中断都将被屏蔽,而如果此时再出现一个异常,即产生了double fault故障,一般来说系统就会宕机。
    Linux上每一个CPU都拥有一个中断栈,这个始于Linux2.6的版本,在此之前,中断共享其所在的内核栈,后来Linux认为对每个进程提供两页不可换出的内核栈太奢侈的,所以就将其缩小为1页,并且将中断栈独立了出来。
    中断处理的原则是快,否则很容易被覆盖。因此,中断上下文又被称为原子上下文(《Linux内核设计与实现》),该上下文中的代码不允许阻塞,这是因为中断上下文完全不具备进程的结构,因此在睡眠之后无法被重新调度。
    在操作系统如Linux中,中断还可以被分为软件中断(内部中断)和硬中断,硬中断是来自硬件的中断,它可以是可屏蔽的,也可以是不可屏蔽的。软件中断一般是由int指令产生的,由操作系统提供,在Linux中软中断对应于中断号48-255。软件中断是不可屏蔽的(不然干嘛调用int呢),但在操作系统中软件中断也可能是由一个硬中断产生的,例如一个来自打印机的硬中断可能产生一个软件中断给内核中的相关处理程序。我们需要区分软件中断软中断,前者指的是INT指令,后者特指Linux的一种中断推后处理机制(在Windows和Linux的分别被称为中断延时处理和中断下半部)。

中断程序的注册和处理

需要使用中断的驱动程序会调用request_irq()向内核注册一个中断号irq和中断处理函数irq_handler_t handler,并激活对应的中断线。request_irq()会接受一个flag,flag中的一个选项IRQF_DISABLED表示禁用所有中断,如果不设置这个值,那么该中断处理程序可以被非同种中断打断,这也就是说Linux中硬中断是可以嵌套的。不过Linux禁止来自同种类中断的打断,它会挂起后来的中断,这主要是为了防止重入现象的发生,从而简化系统实现。Linux通过巧妙的机制来防止同种类中断重入。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
// 当中断来临时,IRQ_PENDING被设置,表示正在排队
// action为当前的中断处理函数指针
action = NULL;
// 如果IRQ_INPROGRESS不为0,说明有正在处理的同种中断,所以拜拜
if (likely(!(status & (IRQ_DISABLED | IRQ_INPROGRESS)))) {
action = desc->action;
// 如果当前没有中断等待处理,我们清除IRQ_PENDING,设置IRQ_INPROGRESS
status &= ~IRQ_PENDING; /* we commit to handling */
status |= IRQ_INPROGRESS; /* we are handling it */
}
desc->status = status;

/*
* If there is no IRQ handler or it was disabled, exit early.
* Since we set PENDING, if another processor is handling
* a different instance of this same irq, the other processor
* will take care of it.
*/
if (unlikely(!action))
goto out;

需要特别注意的是“只要非同种中断发生就可以抢占内核”这句话也是不准确的,因为开中断发生在中断处理函数handle_IRQ_event中,此时如果没有设置IRQF_DISABLED,Linux就会立马开中断。

1
2
3
4
5
irqreturn_t handle_IRQ_event(unsigned int irq, struct irqaction *action)
{
...
if (!(action->flags & IRQF_DISABLED))
local_irq_enable_in_hardirq();

但是在这个函数前的一条调用链中都是关中断的,这是CPU的一个特性,即中断发生时触发中断门会自动关中断,也就是置IF为0,可参考这篇文章。关中断的好处在于可以防止中断嵌套,防止内核抢占,但不能禁止来自SMP架构下其他处理器的并发访问,为了解决这种并发访问,通常的做法是借助于例如自旋锁的机制。此外在关中断时要注意关中断会导致异步IO、调度等一系列依赖中断的功能失效,所以屏蔽中断后一定要尽快执行完临界区内代码。
此外,flag中还有一些其他的选项,例如IRQF_SAMPLE_RANDOM,这个被Linux用来实现随机数,会将每次的中断间隔填入内存熵池。与之对应的是IRQF_TIMER,这个表示系统时钟中断,显然系统时钟具有固定间隔,是不适合用来实现随机数的。

中断的开启与关闭

我们使用local_irq_disablelocal_irq_enable来控制当前处理器的中断,他们对应到x86架构上就是常见的clisti命令。不过Linux更推荐使用local_irq_save来禁止中断、local_irq_restore来恢复到原来的状态(因为可能一直就是禁止中断的)。

1
2
3
4
5
6
7
8
9
static inline void native_irq_disable(void)
{
asm volatile("cli": : :"memory");
}

static inline void native_irq_enable(void)
{
asm volatile("sti": : :"memory");
}

《Linux内核设计与实现》指出在2.5版本前存在一个禁止所有处理器中断的内核函数cli(),这个函数非常暴力,乃至它实际上提供了对其他所有中断处理程序的互斥访问机制,在这一把全局大锁下我们实际上就可以保证共享数据的互斥了,也就不要像后面版本中那样关闭本地中断后还需要用自旋锁来维护。取消这个全局大锁的好处是使用细粒度的锁能提高性能。
相对于local_系更轻量级的方案是只禁用一种中断,也就是disable_irqdisable_irq_nosync系列,他们禁止中断控制器上的指定中断线irq,其中_nosync系列立即返回,而前面的需要等待已有的中断处理完毕,其实就是在调用_nosync之后调用synchronize_irq去等待,借助于raw_spin_lock_irqsave。《Linux内核设计与实现》指出disable_irqenable_irq不是幂等的,因此要成对调用才能确保正确回到原始状态。此外还指出这个是保证不会sleep的,但是我在4.17的内核中看到现在的实现是wait_event一个事件,里面涉及一个might_sleep宏的调用。

共享中断

Linux的中断是可以共享的,也就是说同一个中断可以有多个处理程序响应。一个共享中断,例如定时器中断,必须通过request_irq设置IRQF_SHARED标志。

中断下半部

Linux中,中断下半部可以被硬中断打断,所以可以认为硬中断具有更高的“优先级”(不过Linux中并没有中断优先级的概念,虽然也有个中断线程化的东西)。这道理是显然的,因为我们又要快速地响应设备,又不能在中断上下文停留过久,所以才发明了中断下半部这个东西。
Linux中的中断下半部的实现有三种机制:Orignial Bottom Half机制、Task Queue机制、软中断Softirq机制、tasklet和工作队列,其中前两种已被替代。

Orignial Bottom Half(BH)

使用32个链表串联,全局中只有一个BH能够运行。

Task Queue

软中断

从2.3开始,软中断和tasklet被引入,其中tasklet是基于软中断实现的。软中断和软中断之间是不会抢占的,事实上它们被抢占也就是如之前提到的只能被硬中断抢占,但包括相同类型的软中断都可以在SMP的不同CPU上并行执行。注意软中断处理函数属于中断上下文,从而在处理软中断时也不允许休眠,所以其实软中断的优先级还是很高的。
Linux中使用softirq_action结构表示软中断,Linux中设置了一个长度为32的softirq_action数组来存放这些中断。

1
2
3
struct softirq_action {  
void (*action) (struct softirq_action *); /* 软中断的处理函数 */
};

可以看出,软中断实际上很像一个回调函数。那么软中断和回调函数之间有什么区别呢:

  1. 软中断能够实现不同优先级代码的跳转
  2. 软中断的重入性和回调函数不同

软中断的检查与执行一般出现在以下几种情况:

  1. 从硬中断返回时。这是一个很常见的情况,因为从硬中断返回之后往往会立即开启软中断,但是在软中断时可以自由地抢占了。
  2. ksoftirqd内核线程。
    这个内核线程是为了处理可能存在的大量的软中断。对于如网络子系统之类的使用软中断的系统,他们可能会频繁的触发软中断,并且在软中断处理函数中可能还会重复触发软中断。这样就会存在大量的软中断抢占其他任务执行时间的问题。对于这个问题,单纯地选择执行完所有当前的软中断或者所有重复触发的软中断都放到下一跳执行都是不合适的。
  3. 来自网络子系统等显式要求检查软中断的部分。

软中断的执行非常有意思,通过移位操作来实现按顺序check并执行32个软中断的行为,具体可以查看《Linux内核与实现》

虽然不如硬中断处理函数那么严苛,但由上可见软中断的使用还是有很大限制的,目前使用软中断的主要有网络IO、SCSI、内核定时器、tasklet等、RCU等。
但注意工作队列由内核线程eventX执行,允许被调度甚至睡眠。
Linux的外部中断处理过程可以参考文章

tasklet

相对于softirq,相同类型的tasklet在一个时刻只有一个在执行。tasklet通过TASKLET_SOFTIRQHI_SOFTIRQ两个软中断触发,实际上也是出于中断上下文之中的,所以不能睡眠。

进程与线程设施

进程模型

五状态进程模型包括新建、就绪(等待CPU)、阻塞(等待事件)、运行和退出。在五状态模型之外,还有挂起操作。挂起操作指的是将进程交换到外存,这常常是由于内存紧张或者该进程被阻塞的缘故。挂起不同于阻塞或者就绪,被挂起的进程犹如进入一个平行世界,当等待的事件到达时,它能够从挂起阻塞直接切换成挂起就绪。一个挂起的进程必须脱去挂起这层壳之后才能重新进入五状态模型,如一个挂起就绪态进程必须换回到内存切换成就绪态才能被调度。
在Linux中,进程的状态主要有TASK_RUNNINGTASK_INTERRUPTIBLETASK_UNINTERRUPTIBLE_TASK_TRACED_TASK_STOPPED。其中TASK_RUNNING表示进程处于执行或者排队等待执行的状态,此时进程可能位于用户态,也可能位于内核态。TASK_INTERRUPTIBLE表示进程正在被阻塞,等待条件达成或者信号。TASK_UNINTERRUPTIBLE是不可中断的睡眠状态,这是指这个进程不响应SIGKILL等外部信号,因此必须是十分短暂的,在ps -aux命令中显示为D。顺带一提另一令不能响应外部信号的僵尸进程Z(区别于失去父进程的孤儿进程),僵尸进程的资源已经被全部释放,只留下包括task_struct在内的一点信息用来给父进程提供返回码等信息,如果此时父进程被阻塞而不能回收子进程,那么子进程就会进入僵尸状态。

线程模型

对Linux内核来说,线程和进程是不区分的,无论是fork()还是pthread_create(),最后都是调用do_fork(),普通进程和轻量级进程(线程)的区别在于调用参数不同。线程的创建一般会使用CLONE_VM | CLONE_FS | CLONE_FILES | CLONE_SIGHANDclone_flags参数选项,表示共享地址空间、文件系统、描述符和信号处理,而fork只有CLONE_CHLD
pthread(POSIX threads)是POSIX下的一套线程模型,它运行在内核外。在Linux平台上,在glibc库源码的nptl目录下可以看到pthread的具体实现。NPTL(Native POSIX Thread Library)是POSIX线程模型的新实现,它的性能和稳定性方面都优于从前用进程模拟线程的的LinuxThreads。
fork出的子进程默认会拷贝父进程的一系列资源,包括内存(包括堆栈)、文件描述符(特别地exec也会保留文件描述符,可以参考我有关subprocess的文章)、信号设定、Nice优先级值、工作目录等。

信号

Linux中信号是一个实现异步的机制。信号分为同步信号(synchronous signal)和异步信号(asynchronous signal)。同步信号类似于SIGFPE、SIGSEGV,通常标记着一个无法恢复的错误,来自当前执行上下文中,通常由一个trap陷入内核,并由一个trap handler触发。异步信号则来自当前的执行上下文之外

系统调用

在稍后的论述中,我们将看到进程在用户态和内核态之间切换时常伴随有很多奇妙的工作,不过首先来先行讨论一下系统调用。进程从用户态进入内核态可以通过系统调用、异常(如缺页异常)和来自外部设备的硬中断,而系统调用是最常见的一种,也是应用程序唯一主动进入内核的机制。
Linux的系统调用以asmlinkage long sys_打头,据说asmlinkage保证从栈中提取函数的参数,其实就是CPP_ASMLINKAGE __attribure__((syscall_linkage)),其中CPP_ASMLINKAGE就是在C++上用extern "C"导出符号。system_call系统调用负责系统调用并陷入内核,从前它是一个int 0x80指令,现在它是sysenter指令,它精简了int的流程。这是因为保护模式中CPU首先从中断描述符表(IDT)中取出对应的门描述符,比较中断描述符级别DPL和int调用者级别CPL,我们禁止CPL>DPL的调用,但是对于系统调用来说比较描述符的这一步是能省掉的,因为我们可以确定这个CPL和DPL。system_calleax中获得系统调用号,并定位到对应的NR_调用位置。我们需要特别注意的是由于内核抢占和SMP的关系,我的系统调用一定要是可重入的。

调度

多任务系统常使用非抢占式(cooperative)或者抢占式(preemptive)的调度策略,Linux支持抢占式的任务调度。和处理器的流水线设计类似,一个好的调度策略需要balance延迟和吞吐量,也就是说它应该能在一定程度上识别预测出IO型和计算型的程序,并且分别给予它们更短的响应时间或者吞吐量。此外,调度的时间间隔,或者说每个进程所拥有的时间片的大小也对应着处理器流水线深度的情况,太频繁的调度会导致系统在调度器上浪费太多的资源,所以我们也要控制调度的切换代价。Linux的调度算法从简单粗暴,变为了O(1),并在最后使用了RSDL也就是目前的CFS算法。

CFS调度算法

CFS根据NICE给每个进程分配一个处理器使用比,一个实际消耗使用比小于当前进程的进程将抢占当前进程。注意到这种策略相对于传统的,按照NICE值给高优先级进程分配更长时间片要更为贴近现实,因为NICE值较高(也就是优先级较低)的进程通常是计算密集型的,而如果按照NICE值,这些进程将获得较短的时间片,反而是优先级高的获得较多的时间片,旱的旱死,涝的涝死。此外,直接用NICE去标定时间片还会涉及定时器节拍改变导致时间片改变、优化NICE到时间片映射的问题,最终的结论是进程之间的切换频率如果非固定的,效果会更好。
CFS理论上希望一个进程能运行多久取决于处理器使用比,因此指定目标延迟(如20ms)的情况下,每个进程实际能得到的运行时间就和它占到的比例以及总的可运行进程数有关了。当然,我们的分配也是要有最小粒度的,例如最小的时间片长度不能小于1ms。Linux使用task_struct中的sched_entity结构来描述。在具体实现的时候我们并不会望文生义地来做,而是在结构sched_entity中使用vruntime表示加权的虚拟运行时间,这里的加权指的是默认进程的权重比上当前进程的权重。在update_urr中,Linux使用now - curr->exec_start计算出delta_exec时间并使用__update_curr,将未加权的加入到sum_exec_runtime,加权的加入到vruntime中。根据CFS的原则,我们现在应当从CFS队列cfs_rq(这里rq表示run queue的意思,在Linux源码中有很多这样的简写)中调用__pick_next_entity()选取vruntime最小的进程,Linux在这里使用了红黑树来实现。

Linux的调度器

schedule()函数负责选中最高优先级的调度类中最高优先级的进程,整个过程最后会到idle,对应到Linux中的0号(idle)进程。有多个调度类的原因是由于Linux中不但存在普通进程(对应于CFS),还存在实时进程。在选中之后,schedule()会调用context_switch()来进行上下文切换。
一般来说,schedule()函数是由进程调用的,当进程死亡(调用do_exit)、阻塞等需要放弃当前时间片时就会调用schedule()。我们现在考虑最主要的等待某个条件而陷入阻塞的情况,这时候线程会将自己放入等待队列,等待一个condition。这个过程涉及到多个函数,其中DEFINE_WAIT宏用来创建一个wait_queue_entry结构,add_wait_queue负责将一个进程增加到等待队列,prepare_to_wait负责等待队列上的一个进程置入睡眠,finish_wait负责将一个进程移出等待队列。这些函数中都使用了内部结构spin_lock来维护等待队列。《Linux内核设计与实现》书中举了下面的典型的用例,我们看到整个过程用while所维护,期间会发生若干次schedule()。此外我们发现进程可能会被信号所唤醒,也就是所谓的虚假唤醒/伪唤醒,这时候我们同样退出循环,而不是继续循环等待,这也就是为什么我们在后面看到使用条件变量要while-wait的原因。唤醒操作由wake_up()函数负责,它会负责调用try_to_wake_up,该函数会唤醒等待队列上的所有进程,注意系统中有很多个这样的等待队列,每个等待队列上的所有进程等待同一个事件。一般来说,wake_up的调用者就是使得条件达成的那一方,这个和条件变量的逻辑是相同的。
刚才说到有虚假唤醒,其实还有无效唤醒,也就是进程带着condition进入睡眠。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
// 3.11.10
// inotify_read
DEFINE_WAIT(wait);
start = buf;
group = file->private_data;
while (1) {
prepare_to_wait(&group->notification_waitq, &wait, TASK_INTERRUPTIBLE);
mutex_lock(&group->notification_mutex);
kevent = get_one_event(group, count);
mutex_unlock(&group->notification_mutex);
if (kevent) {
...
ret = copy_event_to_user(group, kevent, buf);
fsnotify_put_event(kevent);
...
continue;
}
...
if (signal_pending(current))
break;
...
schedule();
}
finish_wait(&group->notification_waitq, &wait);
...

Linux的抢占

被动式的调度指的是使用一个调度器决定哪一个任务会在下一刻运行,而不是由进程主动放弃处理器。由调度器决定暂时停止一个任务并让另一个任务开始执行的行为就是抢占式(preempt)调度。抢占式调度分为用户抢占和内核抢占两种。
先前提到,进程可以通过主动调用schedule()来放弃剩余时间片,但对于时间片耗尽的情况,内核中的scheduler_tick()函数(此时关中断,需要在2.6版本左右查看)会通过set_tsk_need_resched(p)函数设置一个need_resched标志,这个标志从逻辑上讲应该是全局性的,但为了方便从高速缓存中读取,它自2.2版本后被放到了每一个进程的task_struct中,自2.6内核后放到thread_info中。此外在刚才的try_to_wake_up()唤醒过程中,如果被唤醒的进程的优先级更高,这个标志也会被设置。从系统调用/中断处理程序中返回的情况也包括在内,这也对应着用户抢占。

用户抢占

Linux的用户抢占并不是在用户态下,而是发生在进程即将从内核态返回用户态时,这对应两种情况,从系统调用返回和从中断处理程序返回。如果不开启内核抢占,抢占式调度会给每一个进程的运行分配一个固定或者动态计算的时间片(timeslice),进程在内核态的运行会直至结束(主动放弃(yield)/时间片耗尽/阻塞)。这样的假设方便了内核的编写,特别是在单处理器(UP)下,因为我们考虑到在内核态中不存在对进程上下文的切换,所以内核并不需要考虑对临界资源的竞争访问问题,因此用户程序也可以假设在一次系统调用过程中不需要保护内核临界资源。但需要注意的是中断仍然存在,所以在进入临界区前还需要关中断。

内核抢占

通过内核抢占,系统允许高优先级的进程抢占低优先级的进程的内核态,这将能提高系统的实时性能。内核抢占可能发生在中断处理程序返回到内核空间前、内核代码具有抢占性、内核代码显式调用调度器schedule、内核中的任务被阻塞时。此外,内核抢占是可以被关闭的,即所谓的关抢占的几种情况:

  1. 当内核调度器scheduler正在运行时
    这个是显然的,我们可以直接在schedule()函数中看到相应实现

    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    // 2.6.11
    // /include/linux/preempt.
    #define preempt_disable() \
    do { \
    inc_preempt_count(); \
    barrier(); \
    } while (0)
    // /kernel/sched.c
    need_resched:
    preempt_disable();
    prev = current;
    release_kernel_lock(prev);
    need_resched_nonpreemptible:
    rq = this_rq();

    另及,Linux通过内核抢占锁preempt_count来跟踪一个进程的可抢占性,当内核代码具有抢占性时(此时preempt_count为0,且need_resched被设置),则调用preempt_schedule_irq -> schedule进行内核抢占,这过程的发生场景之一就是从时间中断返回。我们看到这里关中断实际上是自增了preempt_count使它不等于0。这里的do{}while(0)是出于宏的考虑,防止和其他语法结构(如没有括号的if)混用导致错误。

  2. 中断的bottom half
    通常有上文所述的三种方式,当内核执行软中断和tasklet禁止内核抢占,但注意此时可以被硬中断打断。
  3. 当进程持有自旋锁读写锁
    这实际上是为了保护临界资源,在有关自旋锁的讨论中会详细说明。事实上当进程持有锁时preempt_count会自增,释放锁时preempt_count会自减,这也就是使用preempt_count的缘由,进程持有锁的数量直接影响到它的可抢占性。
  4. 内核正在处理中断
    这时候也就是所谓的中断上半部,中断在操作系统中拥有最高的优先级,我们也在前文中论述了中断上下文中不能睡眠的原因,这里不能抢占的原因也是类似的。
  5. 内核操作Per-CPU data structures
    在SMP架构中,不同的CPU仍然会维护一些私有数据,此时抢占可能造成一个进程被调度到另一个CPU上去,此时Per-CPU变量就会发生改变。我们可以查看相关代码

    1
    2
    3
    4
    // 2.6.39.4
    // /include/linux/smp.h
    #define get_cpu() ({ preempt_disable(); smp_processor_id(); })
    #define put_cpu() preempt_enable()

    get_cpu就是获得当前的CPU,我们看到首先这一步骤就关了抢占,其中smp_processor_id会调用raw_smp_processor_id这个宏,然后根据不同的CPU来讨论。例如对x86来说,这个宏是#define raw_smp_processor_id() (percpu_read(cpu_number)),然后是#define percpu_read(var) percpu_from_op("mov", var, "m" (var))。然后我们就可以根据获得的CPU号操作unsigned long my_percpu[NR_CPUS]这样的每个CPU独有的数据了。注意所有这样的操作是不需要加锁的,因为这是当前CPU独有的数据,而内核抢占又被关闭了,所以不存在并发访问问题。
    《Linux内核设计与实现》中指出了对每个CPU维护私有数据的好处:

    1. 减少了竞态条件和锁的使用,这个在上面已经提到
    2. 减少了缓存失效的可能
      根据MESI,同一数据可能存在于多个CPU的缓存中,这样一个CPU造成的修改会导致缓存失效。而一些性能不佳的代码会造成缓存抖动,也就是缓存的不停刷新,这样极大地降低了效率。Linux2.6内核提供了per_cpu接口能够缓存对齐(cache-align)所有数据,这样可以保证不会讲其他处理器的数据带到同一缓存上。
  6. 关中断
    这是一种特殊情况,关中断后抢占机制自然无法实现了,Linux关中断通常借助于local_irq_disable或者local_irq_save

可重入函数

考虑单线程模型下的并发,一个典型的模型是signal机制(借助于软中断实现)。一些条件会影响函数的可重入性,例如全局变量的使用,假如说在函数func涉及读写全局变量errno,在运行过程中被signal中断,而中断处理程序也会访问这个errno,那么当继续进行func时就可能读到无效的errno。容易联想到一些和硬件有副作用的函数也是不可重入的,如fprintfmalloc等。
异步可重入和线程安全是两个不同的概念,一般来说线程安全的函数不一定是可重入的,如malloc,而反之则可以使用锁机制来避免。所以我们可以体会到Linux提供的signal机制虽然能够实现异步,但是却十分不体面。

内核和运行时向外提供的并发与同步设施

多进程与多线程

进程是UNIX设计模型中的一个重要部分,UNIX推崇以多进程+IPC的方式组成应用系统,充分贯彻了KISS的方针。在UNIX产生的年代,这种方式无疑是很健壮的。
从定义上看,进程是资源分配的最小单位,那么线程是程序执行的最小单位。线程通常和同一程序下的其他线程共享一套地址空间,其中包含应用程序拥有的程序代码与资源,每个线程自己维护一套运行上下文。

同步与异步

在进程IPC中同步和异步是两种编程模型,描述了IPC中被调用者的行为。在同步模型中一个调用只有在等到结果时才返回,被调用者并不会进行通知。而异步模型中调用会立即返回,而由被调用者选择在结果达到后通过回调函数或者信号等机制进行通知。UNP书中还强调异步过程中用户不需要手动将数据从内核复制到用户(即不需要在被通知后调用read等函数)。
需要和同步异步进行区分的是阻塞和非阻塞的概念,这两个描述了调用结果未到达时调用者的状态。阻塞调用会直接睡眠线程。而非阻塞调用不会阻塞线程,这时候线程可以继续执行下面的逻辑,消息到达时线程收到一个异步信号或者回调,或者采用类似协程的方法,这时候适用异步非阻塞模型。当然线程也可以选择在原地自旋进行轮询,这就是同步非阻塞模型。
IO多路复用是一种特殊的同步模型,这是因为在poll函数中仍然需要手动将数据从内核复制回来,并且它们在消息到来前必须在一个循环中轮询,而不是立即返回,跳出循环。不过IO多路复用并不属于同步阻塞模型,因为当一个fd在等待结果时,线程可能在处理来自其它fd的返回结果。IO多路复用也不属于同步非阻塞模型,因为事实上线程还是会被阻塞的。
以UNIX套接口为例,SS_NBIOSS_ASYNC标志分别表示非阻塞和异步的选项,其中非阻塞套接口在请求资源不满足时会返回EWOULDBLOCK,而异步套接口则会通过SIGIO信号来通知进程。

协程

协程(Coroutine)是具有多个入口点的函数,协程内部可通过yield进行中断,此时处理器可能被调度到执行其他的函数。虽然线程也可以被睡眠,睡眠本身涉及用户态与内核态的上下文切换,开销较大。对于套接字等IO密集型的应用,协程在提高CPU使用率上比线程轻很多。
协程通常分为stackful和stackless两种。stackless实现不保存调用栈和寄存器等上下文,因此效率比较高。stackless的协程实现例如C++中的setjmp、Python中的生成器等。stackful即栈式构造,这时候协程拥有自己的堆栈等上下文。stackful的协程类似于C++中的ucontext、libco等。

C++下协程的实现方式

基于ucontext

ucontext是POSIX上的一套用于保存上下文的库,这里上下文包括寄存器(通用和浮点)、信号等。

黑科技1

Protothreads基于C的是围绕switch展开的一系列黑科技实现的协程。我们首先看它定义的一些原语,原来这就是一个简易的,关于行号的状态机。我在这里模仿Protothreads实现了一个简易版的协程

1
2
3
4
5
#define PT_BEGIN() bool ptYielded = true; (void) ptYielded; switch (_ptLine) { case 0:
#define PT_YIELD() \
do { ptYielded = false; _ptLine = __LINE__; case __LINE__: \
if (!ptYielded) return true; } while (0)
#define PT_END() default: ; } Stop(); return false;

为了理解这段代码,我们回想C++实现协程需要实现的一个核心问题,并不是调度,而是实现从多个入口点进入。这就意味着我们需要在退出协程函数时记录下已经到了哪里,然后在下一次进入函数时回到这个地方。对于第一个需求,我们可以借助于__LINE__这个宏表示执行到的行号,每次将这个行号赋值给一个int。对于第二个需求,我们可以借助于switch实现跳转。这里用switch而不用goto的原因是switch能够根据int变量来实现goto的功能,而goto需要依赖静态的label。这种巧妙的机制不禁让我想起了称为Duff’s device的优化方案。
不过这种机制有一个语法缺陷就是在里面嵌套的switch语句会产生干扰。

基于setjmp和longjmp

setjmp和longjmp类似跨栈帧的带上下文的goto。
int setjmp(jmp_buf env)负责将函数当前的上下文保存在jmp_buf中。

约束线程间并发行为

线程间实现互斥与同步的任务常具有下面两种形式:

  1. 多个线程同时访问一个共享资源,需要维护该共享资源的完整性。这就是Race condition问题,将在本节讨论。
  2. 一个线程向另一个线程通告自己的结果/等待另一个线程的结果,这将在章节数据共享中讨论。

总的来说,为了实现同步,等待资源的一方可以处于用户态(忙等)或者内核态(睡眠),而获得资源的一方可以选择锁进行同步,或者使用原子操作保证自己访问不被打断。这分别下面的基于锁和原子操作的工具。Linux、Windows和C++11的标准库中都对这些工具提供了不同程度的支持,具体可参考文档

Race condition

竞态条件常出现在逻辑电路这样的硬件环境中,对于软件环境而言,当两个线程竞争同一资源时,如果对资源的访问顺序敏感,就称存在竞态条件(race condition),导致竞态条件发生的代码区称作临界区。

volatile

在有关CPU缓存一致性的章节中,我们讨论的volatile、寄存器、主存和CPU缓存之间的关系。这里我们讨论语言(C/C++)层面的volatile的有关特性。
C++中的volatile并不以任何形式保证线程安全,它仅用来告知编译器期修饰的变量是易变的,要避免对其进行优化,这里所谓的优化常常是将变量缓存到寄存器中而不是每次都从内存读取。volatile并不蕴含禁止编译器或者处理器进行重排或乱序,我们需要通过编译器屏障或者内存屏障来实现这一点。
volatile关键字有时是有用的,例如可以在自旋锁中防止多次循环中始终读取寄存器中的值。但滥用volatile不仅不会提高程序的安全性,而且会导致程序变慢。对于以为可以加“volatile”就可以解决的问题,一般可以使用std::atomic来避免使用内核锁的开销。

内核锁

内核锁一般基于内核对象互斥量/信号量,它们通常是阻塞锁,会导致线程进入睡眠。锁的存在通常限制了并发范围,变并行访问为串行访问。在使用锁维护临界资源时应当争取让序列化访问最小化,真实并发最大化。

互斥量

在C++中一般不直接使用std::mutex,而使用lock_guardunique_locklock_guardunique_lock利用了RAII来自动管理加锁解锁操作,它们能够应用到包括互斥量的所有Lockable的对象上。unique_lock相比于lock_guard更灵活,用户能够手动地加/解锁,例如在条件变量中就需要unique_lock以便解锁,而在取得对临界资源后进行处理时也可以暂时解锁。unique_lock还是可移动的,以下面的代码为例,这里1处是一个直接返回lk,编译器可能进行NRVO,当然即使不这么做,2作为一个直接初始化操作,也可以接受get_lock返回的将亡值,从而完成移动构造std::unique_lock<std::mutex>。因此这里锁的控制权从get_lock转移到了process_data

1
2
3
4
5
6
7
8
9
10
11
12
std::unique_lock<std::mutex> get_lock()
{
extern std::mutex some_mutex;
std::unique_lock<std::mutex> lk(some_mutex);
prepare_data();
return lk; // 1
}
void process_data()
{
std::unique_lock<std::mutex> lk(get_lock()); // 2
do_something();
}

一般会将mutex和临界资源一起放到一个类中进行管理,此时宜保证该临界资源是非public的,并且不会以引用或者指针的形式传出。这些场景例如成员函数直接返回指针或者引用,友元函数,或者一个接受函数指针P作为参数的函数,且P接受了临界成员的指针或引用,并将其泄露到类外(C++ Concurrency in Action),因此我们还要避免在持有锁时调用用户提供的代码。
虽然我们的愿景是希望最大化真实并发,因此要追求较小粒度的锁(small granularity),一个较小的粒度表现在锁所保护的临界数据较少,并且持有锁的时间较短,但粒度不可能无限小。例如考虑删除双向链表中的一个节点,需要修改三个节点的数据,如果对这三个修改单独加锁,其实等于没有加锁。因此一个直截了当的解决方案是在删除过程中锁住整个链表。如果是仍然希望为每个节点维护一把锁,那么对于删除操作必须获得被删除节点和其相邻的共三把锁,当重新连接节点时,必须在获得当前节点锁的前提下尝试获取下一节点的锁(但一旦持有了下一节点的锁就可以释放当前节点的锁了),以防后继节点产生变化。与此同时,我们还要考虑在遍历的时候需要对要访问的节点上锁,因此遍历时同样需要按照和删除相同的上锁步骤。
下面我们考虑一个线程安全的栈,其中实现了empty()top()size()push()pop()等常见方法。在多线程下,下面的代码中pop()可能使得top()的结果无效,这是因为在1和2两个方法可能有另一个线程执行了pop()。容易看出无论这些方法内部怎么加锁都无法避免这种情况,因为这里的竞态发生在这些方法之间,C++ Concurrency in Action特别指出这属于接口设计的问题。在后面内存模型的部分,我们能看到类似的问题,原子操作虽然避免了竞态,但原子操作之间可能存在的乱序必须要被考虑。书中还指出了另一个更严重的2和3之间竞争的错误,假设有两个线程并行执行该段代码,我们期望的顺序是top[1] -> pop[1] -> top[2] -> pop[2],中括号表示执行该方法的线程。然而实际执行顺序可能是top[1] -> top[2] -> pop[1] -> pop[2]。这就导致了从栈顶开始的两个元素有一个被处理了两次,另一个完全没有被处理。一个简单的解决方案是将这两个调用合并成一个带返回值的pop,使用一个mutex来管理,但Tom Cargill指出这处理方式是有问题的。这涉及到为什么标准库在设计时选择将top()pop()两个方法,原因是可能发生在成功将元素pop后而拷贝函数失败,这个元素就被丢失,所以先取top()pop()保证了top()失败情况下可以选择不执行pop()

1
2
3
4
5
6
stack<int> s;
if (! s.empty()){ // 1
int const value = s.top(); // 2
s.pop(); // 3
do_something(value);
}

死锁

使用互斥量的另一个问题是死锁,这时候锁机制避免了竞态,却可能产生线程对锁的竞争,即线程间互相等待对方的锁。死锁的产生满足四个条件:互斥、占有且请求、不可抢占和循环等待。其中等待且请求条件很关键,当完成一个操作需要获得两个及以上的锁时,死锁往往就会发生。为了解决死锁就要想办法破坏它的四个条件之一。从占有且求的角度来解决,我们可以借助于Dijkstra的银行家算法。在C++11中,我们可以使用标准库函数std::lock来同时上锁,不过现实情境下我们往往难以在申请锁时就确定自己需要哪些锁。从破坏循环等待条件的解读来设计的一个解决方案是永远按照一个顺序来获得锁,以哲学家就餐问题为例,我们将筷子进行编号,并约定哲学家们总是先尝试获得编号较低的筷子,用完后总是先释放编号较高的筷子,这样就能避免死锁问题。注意到约定哲学家们总是先拿起左手边的筷子,再拿起右手边的筷子恰恰会导致死锁问题,因为在这里我们并不是对每一个哲学家的操作指定一个相对的规则,而是为所有的资源(锁)的获取直接指定一个绝对的顺序。于是我们发现有时候去确定一个顺序并不是很容易。对于swap函数来说,我们可以按照参数的顺序来加锁,例如先获得第一个参数的锁,再获得第二个参数的锁。可惜这个是相对的,例如考虑如下面代码所示的两个规则,容易发现这两个线程并行执行时死锁就会发生了。

1
2
3
4
// thread 1
swap(a, b);
// thread 2
swap(b, a);

为了解决这个问题,一个方法是对每个对象求Hash,从而进行排序,另一种是借助于std::lock函数。这个函数的作用是将多个互斥量同时上锁(失败时则抛出异常并释放已经获得的锁),下面代码展示了一个线程安全的swap

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
class some_big_object;
void swap(some_big_object& lhs,some_big_object& rhs);
class X
{
private:
some_big_object some_detail;
std::mutex m;
public:
X(some_big_object const& sd):some_detail(sd){}

friend void swap(X& lhs, X& rhs)
{
if(&lhs==&rhs)
return;
std::lock(lhs.m, rhs.m); // 1
// std::adopt_lock告知这个lock_guard已获得锁
std::lock_guard<std::mutex> lock_a(lhs.m, std::adopt_lock); // 2
std::lock_guard<std::mutex> lock_b(rhs.m, std::adopt_lock); // 3
swap(lhs.some_detail, rhs.some_detail);
}
};
// 注意1/2/3也可以换为以下代码
std::unique_lock<std::mutex> lock_a(lhs.m, std::defer_lock);
std::unique_lock<std::mutex> lock_b(rhs.m, std::defer_lock); // unique_lock 不对互斥量上锁
std::lock(lock_a, lock_b); // 互斥量在这里上锁

std::lock的实现借助了try_lock_Try_lock

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
template<class _Lock0, class _Lock1, class... _LockN> inline
void lock(_Lock0& _Lk0, _Lock1& _Lk1, _LockN&... _LkN)
{ // lock N mutexes
int _Res = 0;
while (_Res != -1)
_Res = _Try_lock(_Lk0, _Lk1, _LkN...);
}

template<class _Lock0> inline
int _Try_lock(_Lock0& _Lk0)
{ // try to lock one mutex
if (!_Lk0.try_lock())
return (0);
else
return (-1);
}

template<class _Lock0, class _Lock1, class... _LockN> inline
int _Try_lock(_Lock0& _Lk0, _Lock1& _Lk1, _LockN&... _LkN)
{ // try to lock n-1 mutexes
int _Res;
// 如果第一个锁_Lk0直接失败,则返回失败0
if (!_Lk0.try_lock())
return (0);
try{
// 否则递归地尝试获取第二个锁_Lk1,如果失败则解开第一个锁并修改_Res为失败0
if ((_Res = std:: try_lock(_Lk1, _LkN...)) != -1)
{ // tail lock failed
_Lk0.unlock();
++_Res;
}
}catch(...){
// 如果出现异常同样解开第一个锁并返回失败0
_Lk0.unlock();
throw;
}
return (_Res);
}

值得注意的是引起死锁的并不一定是锁,而可以扩展到造成互相等待的其他情况,例如一组线程互相join等待对方,相互阻塞,这导致整个程序无法往下运行,或者线程在持有锁时同时等待其他线程。
另一种解决死锁的办法从锁的角度,为锁提供层级。一个已持有低层级锁的线程是不能试图获得高层级的锁的,试图违反这一约定的行为将导致抛出异常或终止程序。注意到不能同时持有相同层级上的锁,所以这些互斥量往往形成一条链。一个层次互斥量hierarchical_mutex的实现可以对每个线程使用一个thread_local全局变量进行维护当前线程所持有的锁的层级,默认取UINT_MAX,这样线程可以获得任何层级的互斥量。
但即使可以避免死锁也要注意锁的粒度(保护数据规模与持有时间)对性能的影响,一个总的原则是一个锁应当被持有尽可能少的时间,并且在持有过程中只应该去完成必要的工作。特别地,持有一个锁的同时等待另一个锁,即使不造成死锁,也要考虑其性能问题。以判定两个int是否相等为例,在先前我们看到了一个swap函数的实现方案,同时对两个互斥量进行加锁。但这里考虑到实际上int非常小,所以比较好的是分别对两个int加锁,复制副本,并比较两个副本,从而避免同时持有两个锁。注意和前面top()pop()所遇到的问题一样,在对intA和intB的读操作(LOAD)间可能发生另一个线程对intA的写操作,导致先前读取到的是旧值。

自旋锁

自旋锁是一种忙等锁(busy waiting),它适用于短时间锁定某个资源,这样可以避免内核锁所需要的线程睡眠(两次线程上下文切换)等一系列的开销,但持有过长的自旋锁会导致浪费大量CPU资源。特别是在单核CPU上,自旋锁的使用需要审慎考虑,因为在单核CPU上同一时间只能运行一个线程,这时候如果等待锁的线程先运行,那么它势必进入空等直到时间片用完,因为获得锁的线程势必不能运行以释放锁。因此在单核CPU上使用内核锁进入睡眠是一个好的选择。自旋锁常被用在对称多处理器(SMP)系统中,在多CPU的情况下保护临界区。

自旋锁与中断处理

处理中断时不能使用互斥量、信号量等让线程进入睡眠的锁,因此自旋锁常用于内核中有关中断处理的部分。内核锁必须在进程上下文(对应于中断上下文)中才能使用,这里的关闭中断的目的是为了关闭调度,因为关闭了时钟中断(时钟中断是可以被关闭的),调度器就无法运转了。这样就产生了睡死的现象。

持有自旋锁时不能进入睡眠

自旋锁适用于不能睡眠的场景,但双向地来说,持有自旋锁时也不能进入睡眠,否则会引起死锁。
为了理解原因,首先要了解为什么自旋锁常伴随关中断关抢占。Linux中提供了各个品种的自旋锁操作函数。其中spin_lock系列的关闭了抢占而不关中断,而spin_lock_irqsavespin_lock_irqspin_lock_bh会一道把中断也关了。
关抢占的原因是如果一个低优先级的线程A获得自旋锁而紧接着被一个高优先级的进程B抢占,那么会造成这两个线程死锁,直到时间片用尽,也就是所谓的优先级反转(优先级倒置)现象,会严重影响性能。
关中断的原因是如果一个进程A获得自旋锁然后被一个中断打断,如果这个中断处理器也试图获得同一个自旋锁,那么就会造成在中断内部的死锁(自旋锁不能嵌套上锁,否则会造成自死锁现象),并且中断处理无法被抢占(但可以被其他中断打断)。可以参考文章知乎
既然使用自旋锁应当关闭中断或者调度,那么原因就很明显了,如果进程A获得了自旋锁并且阻塞在内核态,此时内核调度了进程B(阻塞可导致调度),而B也试图获得自旋锁,那么B将永远自旋,A将永远睡眠,这类似于开中断时在中断内的死锁情况,不过在这种情况下仍有可能B时间片用完从而再次重新调度。此外另一种解释认为对于不关中断的自旋锁在睡眠后可能会被重新调度,从而造成自死锁的现象。
这种思想同样值得用在处理异常、信号等会破坏程序执行顺序的地方。

临界区

相对于互锁访问函数这种“有限的”原子操作,临界区允许对一段代码进行“原子操作”,临界区相对于互斥量比较轻便,这是由于互斥量能够跨进程,并且支持等待多个内核对象。同时线程在进入一个被占用的临界区时会首先尝试自旋锁,在自旋锁循环一定次数失败后再让线程进入睡眠。

临界资源的初始化

我们考虑临界资源的初始化问题,一个重要的情景就是实现单例模式。在C++11后,可以方便地使用局部静态变量(Meyers Singleton满足初始化和定义完全在一个线程中发生,并且发生在所有其他线程访问之前)或者std::call_once(在C++11前我们只能使用Linux系统中的替代品pthread_once)实现线程安全的单例模式。
不过首先先看看一个使用锁的朴素的,也是开销巨大的方案。查看下面的代码,我们可以发现这里用一把大锁保证了不会有两个线程竞争创建/访问p_singleton的实例。但同时需要意识到当实例被唯一地创建好后,这个函数就不需要要锁来保护了,因为在这种情况下它简单到可以作为原子操作,然而事与愿违的是每次调用这个函数都需要获得锁。因此我们需要一种仅保护临界资源初始化过程的机制

1
2
3
4
5
6
7
8
Singleton * p_singleton = nullptr;
Singleton * get_singleton() {
std::lock_guard<std::mutex> lock(mtx);
if (p_singleton == nullptr) {
p_singleton = new Singleton();
}
return p_singleton;
}

双重检查锁定模式(Double-checked locking pattern, DCLP)指的是在加锁前先进行一次验证是否可以加锁。下面使用双重检查锁定模式来减少加锁的开销,具体的做法是先检查一遍p_singleton是否为nullptr

1
2
3
4
5
6
7
8
9
10
Singleton * p_singleton = nullptr;
Singleton * get_singleton() {
if (p_singleton == nullptr) { // a
std::lock_guard<std::mutex> lock(mtx);
if (p_singleton == nullptr) { // b
p_singleton = new Singleton();
}
}
return p_singleton;
}

但其实这种加锁方式也是有理论上的风险的,例如我们的p_singleton不是原子的,甚至都不是volatile的,基于我们与编译器的约定,编译器完全可以认为p_singleton的值不会发生变化,因此直接将两层if削掉一层。
但是即使我们将p_singleton套上std::atomic、加上volatile,这个代码仍然是错误的。原因在于p_singleton = new Singleton()这条语句不是原子的。我们可以把该语句分为三步

  1. 分配内存
  2. 构造对象
  3. 指针指向对象

编译器在理论上(但实践中编译器没有理由去进行这样的重排)会在2构造对象前执行1内存分配和3指针指向操作。假设线程在1/3步骤完毕之后被挂起而来不及执行2步骤,而另一个线程开始访问a处的代码,注意到此时p_singleton已经不是nullptr了,于是函数会返回一个未初始化的内存。继续思考我们发现,这里的问题是由于第一个线程此时已经在初始化p_singleton,这第二个线程就不应该有机会执行到a处的代码,试想即使第二个线程知道初始化再被另一个线程执行,那它也做不了任何事情,因为代码中写了要么初始化并返回指针,要么直接返回指针。选择前者会破坏第一个线程的初始化过程,选择后一个会造成上面说的结果。因此在a处对p_singleton进行保护是非常有必要的。在稍后的章节中,我们将对双重检查锁定模式进行进一步的讨论。

因此实际上使用上面提到的std::call_once是一个更好的解决方案。在下面的代码中,只会输出一行Called once

1
2
3
4
5
6
7
8
9
10
11
12
13
std::once_flag flag;  
void do_once()
{
std::call_once(flag, [](){ std::cout << "Called once" << std::endl; });
}
int main()
{
std::thread t1(do_once);
std::thread t2(do_once);

t1.join();
t2.join();
}

这里的std::once_flag不能被拷贝和移动,其实相当于一个锁,call_once实现如下

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
template<class _Fn,
class... _Args> inline
void (call_once)(once_flag& _Flag, _Fn&& _Fx, _Args&&... _Ax)
{ // call _Fx(_Ax...) once
// 定义一个_Tuple类型
typedef tuple<_Fn&&, _Args&&..., _XSTD exception_ptr&> _Tuple;
// _Seq的值索引上面的_Tuple
typedef make_integer_sequence<size_t, 1 + sizeof...(_Args)> _Seq;

_XSTD exception_ptr _Exc;
// 将回调函数参数打包到_Tuple类型里面,最后一个是exception_ptr
_Tuple _Tup(_STD forward<_Fn>(_Fx), _STD forward<_Args>(_Ax)..., _Exc);

// 使用_Tup里面的上下文特化_Callback_once函数模板
_Lambda_fp_t _Fp = &_Callback_once<_Tuple, _Seq, 1 + sizeof...(_Args)>;

// 在xonce.cpp中实际调用了__crtInitOnceExecuteOnce的WINAPI
if (_Execute_once(_Flag, _Fp, _STD addressof(_Tup)) != 0)
return;

if (_Exc)
_XSTD rethrow_exception(_Exc);

_XGetLastError();
}

// xonce.cpp
_STD_BEGIN
_CRTIMP2_PURE int __CLRCALL_PURE_OR_CDECL _Execute_once(
once_flag& _Flag, _Lambda_fp_t _Lambda_fp, void *_Pv) _NOEXCEPT
{ // wrap Win32 InitOnceExecuteOnce()
static_assert(sizeof(_Flag._Opaque) == sizeof(INIT_ONCE), "invalid size");

return (__crtInitOnceExecuteOnce(
reinterpret_cast<PINIT_ONCE>(&_Flag._Opaque),
reinterpret_cast<PINIT_ONCE_FN>(_Lambda_fp),
_Pv, 0));
}

读写锁

相对于临界区,读写锁能提供更精细的控制,它适用于写操作远少于读操作的数据结构。读写锁允许一个写线程独占访问,而多个读线程并行访问。当写线程需要独占访问时,它需要获得一个排它锁,如果此时有另外的线程持有排它锁或者共享锁,那么本线程就会被阻塞。当读线程需要共享访问时,只要没有线程持有排它锁,那么他就可以立即获得共享锁。读写锁的过程可以参照来自博文的论述

1
2
3
4
5
6
7
8
9
10
11
12
13
Read object begin
P(object.lock)
AtomicAdd(object.activeReader, 1)
V(object.lock)
Do Actual Read
AtomicAdd(object.activeReaders, −1)
end
Write object begin
P(object.lock)
while object.activeReaders != 0 do delay
Do Actual Write
V(object.lock)
end

对读写锁Windows API提供了所谓的SRW系列函数,Linux提供了rwlock系列函数。在C++14中终于提供了std::shared_timed_mutex来实现读写锁,C++17中提供了std::shared_mutex来实现一个没有定时的读写锁,从设计上看来这是有点本末倒置的,为什么要在C++17实现一个功能更少的类呢?SoF指出这是出于性能原因,在C++14制定时,原本的std::shared_mutex拥有定时功能,但稍后一些人指出去掉定时功能能够提高效率,例如在Windows上的SRWLOCK机制提供了高效简便地实现一个没有定时的读写锁的方案,因此后来C++14版本的std::shared_mutex被重命名到std::shared_timed_mutex,而一个没有定时的std::shared_mutex在C++17提供。在C++17中我们的读锁可以声明为std::shared_lock<std::shared_mutex>(shared_lock来自C++14),写锁可以声明为std::unique_lock<std::shared_mutex>(unique_lock来自C++11),如此跨越三个版本的标准才最终完成的实现,你只有在C++中才能看到。
从实现上来看,无论是关键段、互斥量、信号量,甚至是条件变量都可以实现读写锁。

  1. 关键段的实现方式
    这里摘录了CSDN上的一个实现。其思想如下
  2. 互斥量的实现方式
    使用互斥量时我们需要注意在进行读操作时我们要获取写锁以免脏读,但可能出现多个读线程竞争写锁的情况,所以我们需要一个读锁。只有竞争到读锁的线程才能去锁定写锁。其过程如下

    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    16
    17
    // 写
    lock(mutex_write);
    write();
    unlock(mutex_write);

    // 读
    lock(mutex_read);
    if(readers == 0)
    lock(mutex_write);
    readers++;
    unlock(mutex_read);
    read();
    lock(mutex_read);
    readers--;
    if(readers == 0)
    unlock(mutex_write);
    unlock(mutex_read);
  3. 信号量的实现方式
    这里的Swait(sem, t, d)表示信号量sem的P操作需要t个资源,并且会消耗d个资源,Ssignal(sem, d)表示信号量sem的V操作产生d个资源。这里Swait类似std::lock,可以同时对若干个信号量上锁,从而避免死锁。

    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    // 初始化
    max_reader = n; // 最多允许n个读者读
    Sinit(sem_read, max_reader);
    Sinit(sem_write, 1);

    // 写
    Swait(sem_write, 1, 1; sem_read, max_reader, 0);
    write();
    Ssignal(sem_write, 1);

    // 读
    Swait(sem_write, 1, 0; sem_read, 1, 1);
    write();
    Ssignal(sem_read, 1);
  4. 条件变量的实现方式

不变量与恶性条件竞争

不变量(invariant)是某个特定数据结构始终保持的特性。例如通过链表的前向指针始终能到达前驱结点。不变量对应的特性常常在更新过程中被破坏,特别是当更新涉及到修改多个值,或者需要多个过程时。这就类似于在最终一致性系统的窗口内强一致性被破坏。当不变量遭到破坏时,才会产生竞态条件(C++ Concurrency in Action: Ch3)。
为了解决竞态条件,一种方法是确保只有当前进行修改的线程才能看到不变量被修改的中间状态,也就是将临界资源保护起来,前面看到的互斥量等属于这种机制。另一种方法是借助于锁无关编程技术,这种技术将对数据结构的修改分解为若干个不破坏不变量的原子操作。还有一种办法是借助于事务的STM技术,将所需要的操作存储于日志中,再合并提交。

锁无关

锁无关(Lock-Free)是一种比无干扰(Obstruction-Free)高层次的并发模型,它是一个容易混淆的概念。锁无关与其他模型的本质区别并不是不用锁(Lockless),而是确保各个线程在访问共享资源时之间不会互相阻塞,从而使得整个程序整体上能够始终向后执行。相对应地,如果使用内核锁,如果一个获得内核锁的线程被挂起或者挂掉,这容易导致其他拥有锁的线程陷入永久等待。但即使借助于原子操作,也会产生死锁、竞态的问题,例如自旋锁的死锁问题和使用CAS时可能出现的ABA问题
加锁操作通常存在着一些问题,锁无关的编程虽然复杂,但相对于使用锁,锁无关的可伸缩性和性能方面会强于锁相关的算法,并且如果我们能够有序地组织各个线程“各行其道”,就能减少锁的使用。通常来说一个基于锁的算法在高竞争的系统中有较好的效率,因为当发生竞态时线程进行睡眠而不是立即重试,但在一般的情境中,不使用锁往往能避免上下文切换的开销。
相应的还有一个无阻塞(Non-blocking)的概念,无阻塞的限制条件要弱于锁无关。属于无阻塞算法而不属于无锁算法的常见例子包括自旋锁。在自旋锁中,所有的线程都不会进入睡眠,因此是非阻塞算法;而考虑当获得锁的线程因为一些原因被暂停时,所有的其他线程仍然需要在原地自旋忙等,因而自旋锁不是无锁算法。由此可见,锁无关编程中定义了如原子操作、CAS之类的范式,它们本身都是不涉及到锁的,但为了确保我们的读写成功,往往需要尝试若干次,但这个多次尝试造成的等待本身不属于锁无关的范畴。其实锁机制本身也是和等待没有关系的,因为有些操作中的等待是客观的,我们不能看到锁,就想到阻塞,就想到等待,这是不正确的,即使我们不停while去轮询,没有锁,但还是存在等待。

原子操作

原子操作是常见的实现锁无关编程的方式,常见的原子操作有CAS、FAA、TAS、TTAS等。原子操作指的是不可被中断的一系列操作,在原子操作保证当前操作中不发生线程切换,因此保证了其他的线程不可能访问这个资源。原子操作一般有两种实现方式,第一个是使用锁或者CPU的特殊指令等机制来维持原子性,第二个是当出现并发写等破坏原子性的情况时让操作失败,因此对于第二种情况需要使用一个循环不断尝试。这里需要注意的是,原子操作并不一定就能够提高效率,也就是所谓的scalability,这是由于涉及对共享对象操作的原子指令都可能造成cache invalidation,也就是需要重新刷新缓存行。另外,原子操作本身也是很慢的,如下图所示

我们知道x86汇编要求对任意位置的1字节,以及对2/4/8对齐的2/4/8长度的整型读取都是原子的,但是C++11前我们却不能假设甚至是对一个int赋值的操作是原子的,而在C++11后我们需要使用std::atomic来显式声明一个原子的变量。这一方面是由于C++无法保证通过一条指令从内存的存取(考虑一些违反strict aliasing的胡乱cast破坏了对齐)。另一方面也是C++11前根本没有考虑对多线程提供语言级别的支持(这点Java就做得比较好),C++标准规定data race,即并发地去修改一个对象是UB的,所以编译器可以不考虑多线程的情况而进行优化,产生错误。因此通常的方式是直接使用操作系统提供的API,一般来说如果能够有一个原子的CAS,那么就能够借助它实现其他的原子操作。
为了展示问题的复杂性,下面展示了一个早期GCC编译器的问题

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
extern int v;

void f(int set_v)
{
if (set_v)
v = 1;
}
// GCC 3.3.4--4.3.0 O1
f:
pushl %ebp
movl %esp, %ebp
cmpl $0, 8(%ebp)
movl $1, %eax
cmove v, %eax ; load (maybe)
movl %eax, v ; store (always)
popl %ebp
ret

在这个问题中考虑调用f(0),理想情况下v的值无论如何都不会变动的,但是在gcc生成的代码中,我们看到一个始终执行的存储movl %eax, v。显然编译器认为f并不会被修改,而且这对仅面向单线程优化的编译器是完全有理由说得通的。但如果在load和store之间发生了切换,并导致竞态。
除了上面的例子,一些常见的优化,例如循环展开都会造成严重后果。

互锁访问函数和CAS操作

操作系统提供的原子API常借助于某些CPU(比如Intel处理器)提供的指令,能够对某些类型实现某些原子操作。注意到对SMP架构而言,会出现多个核心并发写的情况,这个涉及到后面的内存模型,并且在这里我们可以暂时忽略这个问题。
Windows API提供了一系列Interlocked开头的互锁访问函数,这些函数在处理器层面被保证独占访问。其中一个很关键的便是InterlockedCompareExchange(PLONG dest, LONG value, LONG old)函数,这个函数提供了对LONG类型的原子的CAS操作。InterlockedCompareExchange*destold进行比较,如果相等就将*dest设为value。显然,通过内核锁能够方便地实现原子语义,但原子操作通常会借助于这样的CAS操作,因为这样能避免线程进入睡眠。借助于CAS可以实现其他的原子操作,例如下面的对LONG进行原子赋值的InterlockedExchange函数。在C++11的std::atomic类型中,我们会看到更多的CAS的应用。

1
2
3
4
5
6
7
LONG InterlockedExchange(LONG volatile * target, LONG value){
LONG old;
do{
old = *target;
}while(! InterlockedCompareExchange(target, value, old));
return old;
}

这里的do-while循环保证了在InterlockedCompareExchange失败之后再来一遍能够再来一遍直到成功,但是不能将这个循环和自旋锁中的忙等混淆,从而认为CAS不是锁无关的。这是因为CAS实际上并没有“持有”临界资源,它只需要一个指令就能结束战斗。因此任意一个线程的暂停并不会使得其他线程进入忙等,甚至能够使得CAS的成功率更高。因此可以看出CAS在这里对竞争访问实际上是“消极防御”的态度,也就是所谓的乐观锁(Optimistic Locking),乐观锁是一种非独占锁,它并不是向内核锁一样直接让竞争者们睡眠,而是返回一个失败的状态。相对应的,之前的内核锁和自旋锁等机制属于悲观锁、独占锁。相比乐观锁,悲观锁有以下的弱点(也可以理解为基于锁算法的弱点)

  1. 上下文切换造成的性能开销
  2. 可能造成的死锁问题
  3. 优先级倒置

使用CAS操作实现的并发缓冲队列

常见的用CAS实现的Lockfree算法例如缓冲队列。首先对于一读一写的模型我们可以仅通过约束读指针和写指针的行为即可实现,并不需要接触并发模型。下面我们考虑多对多的模型,以论文Implementing Lock-Free Queues中的论述为例。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
Enqueue(x) {
q = new record();
q->value = x;
q->next = NULL;

do {
p = tail; // 使用tail维护链表尾指针的位置
} while( CAS(p->next, NULL, q) != true); // 1

CAS(tail, p, q); // 2
}

DeQueue() {
do{
p = head;
if (p->next == NULL){
return ERR_EMPTY_QUEUE;
}
} while( CAS(head, p, p->next) != TRUE );
return p->next->value;
}

这里有一个疑问,就是为什么2这句不使用循环保护起来,这是因为这个语句是始终能够成功的。我们考虑成功进行了1处CAS的线程T1,它使得tail->next不为NULL了,假如此时另一个线程T2执行到1,那么它的CAS一定是失败的。这个过程一直到语句2之后tail被成功更新成q,因此实际上可以把tail->next看成一个锁一样的东西。既然如此,我们容易发现一个违背锁无关性质的可能性,也就是当线程T1在执行语句2时挂掉了,那就会阻塞所有其他在循环中的线程。因此我们提出下面的改良版

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
EnQueue(x)
{
q = new record();
q->value = x;
q->next = NULL;

p = tail;
oldp = tail;
do {
while (p->next != NULL)
p = p->next;
} while( CAS(p->next, NULL, q) != TRUE); // 1

CAS(tail, oldp, q); // 2
}

考虑到可能来自其他线程的未提交(指未执行语句2)的添加,我们发现语句p = tail中的tail并不一定是结尾,这也导致了为了维护离开循环时p必须指向结尾这个特性,线程需要在循环内自旋,从而导致上述的死锁现象的产生。为了解决问题,在这一版本中我们索性放宽假设,认为tail只是“接近”结尾,因此现在我们需要使用一个内层的while循环来从tail开始尝试更新结尾。这样即使T1线程挂在语句2,没能更新完tail指针,线程T2也可以自动跟踪到T1在1处的修改。此时我们也不要担心语句2的失败问题,因为有的时候它应该失败。考虑下面的执行顺序:原先链表中只有一个元素1,此时线程T1添加了一个元素2,并且成功执行语句1,将p指向了元素2的位置。此时发生了调度,线程T2获得处理器,它需要在队列中加一个元素3,T2在刚进入循环时它发现自己的tail是指向1的,但它在内层的while循环中根据pnext指针走到了刚被T1添加进去的元素2处。因此T2在元素2的末尾增加了元素3,并且更新自己的p指向元素3。T2继续执行语句2,此时tail == oldp指向元素1,所以CAS成功,tail指向了元素3。接着T1重新获得了处理器,此时tail已经被T2修改到指向元素3了,于是不能匹配oldp,这个CAS就会失败。容易看到这个失败不会影响tail指向精确的队列结尾。但是如果我们稍稍修改下上面的运行顺序,按照T1添加元素2 => T2添加元素3 => T1修改tail => T2修改tail(失败)来执行,那么我们就会发现tail被更新到指向元素2而不是元素3。所以我们看到先前我们放宽的假设是非常有必要的,在论文中作者指出这种情况下tail指针距离列表的准确结束位置最多相差2 * p - 1个节点。其实这个“最多”还是有点多的,所以在实践中我们常常结合两种方案来使用。

CPU提供的原子操作

如果细究上面WINAPI的InterlockedCompareExchange,容易猜到它的实现方式来自于CPU的硬件支持,因为它只能为特定数据类型提供服务。事实上,这里用了x86中的cmpxchg命令
CPU原子操作的实现借助于总线锁、缓存锁等机制。

C++的原子操作库

在上面的章节中,我们概览了锁无关编程的一些思想。从现在开始,我们将讨论C++11标准库提供的原子操作支持。

std::atomic_flag

std::atomic_flag是C++11原子库的一个基础设施,它被广泛地运用到下面的std::atomic类模板的实现中。std::atomic_flag基于TAS(test-and-set)操作维护了一个布尔量flag,提供了test_and_setclear两个方法,可以保证对flag的写不会冲突,读不会脏读。test_and_set尝试将flag从false设为true,如果发现flag已被设置,否则原子地设置flag为true,该函数返回的是flag先前的值。由于std::atomic_flag原子地维护了一个flag,它常被用来实现自旋锁。下面的代码来自MSVC的atomic库,它在std::atomic_Atomic_copy方法中被调用。

1
2
3
4
5
6
7
8
9
10
11
12
inline void _Lock_spin_lock(
volatile _Atomic_flag_t *_Flag)
{
while (_ATOMIC_FLAG_TEST_AND_SET(_Flag, memory_order_acquire))
_YIELD_PROCESSOR;
}

inline void _Unlock_spin_lock(
volatile _Atomic_flag_t *_Flag)
{
_ATOMIC_FLAG_CLEAR(_Flag, memory_order_release);
}

此外,容易发现TAS操作也可以通过CAS实现,其代码很简单

1
return InterlockedCompareExchange(&flag, true, false);

在MSVC的标准库实现中test_and_set借助了Interlock互锁访问函数,保证了访问的原子性

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
 #if defined(_M_ARM) || defined(_M_ARM64)
#define _INTRIN_RELAXED(x) _CONCAT(x, _nf)
#define _INTRIN_ACQUIRE(x) _CONCAT(x, _acq)
#define _INTRIN_RELEASE(x) _CONCAT(x, _rel)
#define _INTRIN_SEQ_CST(x) x
#else /* defined(_M_ARM) || defined(_M_ARM64) */
#define _INTRIN_RELAXED(x) x
#define _INTRIN_ACQUIRE(x) x
#define _INTRIN_RELEASE(x) x
#define _INTRIN_SEQ_CST(x) x
#endif /* defined(_M_ARM) || defined(_M_ARM64) */
inline int _Atomic_flag_test_and_set(volatile _Atomic_flag_t *_Flag,
memory_order _Order)
{ /* atomically test flag and set to true */
switch (_Order)
{
case memory_order_relaxed:
return (_INTRIN_RELAXED(_interlockedbittestandset)(_Flag, 0));

case memory_order_consume:
case memory_order_acquire:
return (_INTRIN_ACQUIRE(_interlockedbittestandset)(_Flag, 0));

case memory_order_release:
return (_INTRIN_RELEASE(_interlockedbittestandset)(_Flag, 0));

case memory_order_acq_rel:
case memory_order_seq_cst:
return (_INTRIN_SEQ_CST(_interlockedbittestandset)(_Flag, 0));

default:
_INVALID_MEMORY_ORDER;
return (0);
}
}
inline void _Atomic_flag_clear(volatile _Atomic_flag_t *_Flag,
memory_order _Order)
{ /* atomically clear flag */
static_assert(sizeof(_Atomic_flag_t) == sizeof(_Uint4_t),
"Unexpected _Atomic_flag_t size");

switch (_Order)
{
case memory_order_relaxed:
case memory_order_release:
case memory_order_seq_cst:
_Atomic_store_4((volatile _Uint4_t *)_Flag, 0, _Order);
break;

default:
_INVALID_MEMORY_ORDER;
break;
}
}

std::atomic

在前面的讨论中我们已经明白C++中的基本类型并不保证是原子的,所以std::atomic<T>定义了一系列具有原子行为类型std::atomic禁用了复制构造函数和复制赋值运算符(同时也不允许移动)。这是由于这两个操作发生在两个对象间,势必要破坏原子性,而一个std::atomic的所有操作都是原子的。
对于用户自定义类型(UDT)typename Tstd::atomic主模板要求T满足standard layout、trivial default constructor和trivial destructor,即编译器可以使用memcpy等进行bitwise的复制并使用memcmp进行bitwise的比较。在CAS操作的实现中调用了memcpymemcmp。这看起来限制很大,我们希望有用户自定义的复制构造函数,这样我们就可以进行member-wise的操作了,对此想法,C++ Concurrency in Action一书中指出,如果有自定义的复制构造函数,那么就势必要将锁定区域(下文中会交待其实std::atomic的主模板实现中可能会有自旋锁)内的数据交给这些用户代码,如果在用户代码中再使用了锁,就可能产生死锁的现象。容易发现我们常用的智能指针std::shared_ptr并不能被放到std::atomic里面,但是我们又确实有这个需要,因此标准库通过重载std::atomic_系列函数为std::shared_ptr提供了原子操作的支持。而对于std::atomic类型,我们既可以通过std::atomic_系列函数,也可以通过std::atomic模板中提供了compare_exchange_weakcompare_exchange_strongloadstore等操作。
一般标准库会对一些大小满足能够直接使用某些处理器的原子指令的类型进行特化,例如指针类型、integral类型和一些用户定义类型。对于指针类型,std::atomic会进行偏特化。原子的指针运算可以通过fetch_开头的函数和相应的operator运算符来实现。对integeral类型std::atomic类模板也会进行特化,其实现类似指针类型,并且添加了对位运算的支持。对于“复杂”的整型计算如乘法,虽然atomic未提供,但可以通过compare_exchange_weak等函数间接实现。从C++20开始,std::atomic类模板提供对浮点类型的特化。注意在这之前,compare_exchange_strong等CAS方法对浮点数可能出现问题,原因显而易见是memcmp的锅,C++浮点数之间比较时甚至都不能使用==,遑论memcmp
在上文中提到,除了std::atomic_flagstd::atomic<typename T>类模板都是不保证不使用锁的(情况特定于处理器和标准库实现),用户可通过bool is_lock_free()函数判断是否Lockfree的。以PJ Plauger的实现为例,主模板的load()内部就使用了上文提到的用std::atomic_flag实现的自旋锁。

1
2
3
4
5
6
7
inline void _Atomic_copy(volatile _Atomic_flag_t *_Flag, size_t _Size,
volatile void *_Tgt, volatile const void *_Src, memory_order _Order)
{
_Lock_spin_lock(_Flag);
_CSTD memcpy((void *)_Tgt, (void *)_Src, _Size);
_Unlock_spin_lock(_Flag);
}

这里要提一句,主模板的template<class _Ty> struct atomic的实现继承了_Atomic_base<_Ty, sizeof (_Ty)>。这个_Atomic_base<_Ty, sizeof (_Ty)>模板又继承了_Atomic_impl<_Bytes>模板,其作用相当于把_Atomic_impl<_Bytes>中全void *的东西封装回了_Ty。我们查看最核心的_Atomic_impl<_Bytes>模板,它是和数据字节数相关的,分别对1/2/4/8字节的进行了偏特化。模板里面定义了最重要的_Is_lock_free_Store_Load_Exchange_Compare_exchange_weak_Compare_exchange_strong等操作,全部是void*的。我们刚才看到的_Atomic_copy来自于主模板。我们下面看看_Atomic_impl<_Bytes>的偏特化版本,如对于一个uint2_t是如何实现的

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
  #define _Compiler_barrier()   _ReadWriteBarrier()

#if defined(_M_ARM)
#define _Memory_barrier() __dmb(_ARM_BARRIER_ISH)
#endif /* defined(_M_ARM) */

#if defined(_M_ARM64)
#define _Memory_barrier() __dmb(_ARM64_BARRIER_ISH)
#endif /* defined(_M_ARM64) *

/* _Atomic_load_2 */
inline _Uint2_t _Load_seq_cst_2(volatile _Uint2_t *_Tgt)
{
_Uint2_t _Value;

#if defined(_M_ARM) || defined(_M_ARM64)
_Value = __iso_volatile_load16((volatile short *)_Tgt);
_Memory_barrier();

#else
_Value = *_Tgt;
_Compiler_barrier();
#endif

return (_Value);
}

inline _Uint2_t _Load_relaxed_2(volatile _Uint2_t *_Tgt)
{
_Uint2_t _Value;

#if defined(_M_ARM) || defined(_M_ARM64)
_Value = __iso_volatile_load16((volatile short *)_Tgt);

#else
_Value = *_Tgt;
#endif
return (_Value);
}

inline _Uint2_t _Load_acquire_2(volatile _Uint2_t *_Tgt)
{
return (_Load_seq_cst_2(_Tgt));
}

可以发现在这个偏特化版本的实现中直接借助了处理器提供的设施,而避免了自旋锁的使用。

weak和strong版本的CAS

类似与Windows API的互锁访问函数,atomic库通过bool atomic::compare_exchange_weak(old, value)bool atomic::compare_exchange_strong(old, value)提供了对CAS操作的支持。这两个函数监测该std::atomic中维护的std::atomic_flag _My_flag(在_Atomic_impl模板中定义)的值,如果等于old就改为value,函数返回一个表示修改是否成功的bool量。因此容易发现这两个函数不总是成功的,因为CPU可能仅对某些类型提供了相应的CAS原子指令,对于其他的类型则必须通过使用自旋锁甚至内核锁来实现。

1
2
3
4
5
6
7
8
9
10
11
12
13
inline int _Atomic_compare_exchange_weak(volatile _Atomic_flag_t *_Flag, size_t _Size, volatile void *_Tgt, volatile void *_Exp, const volatile void *_Src, memory_order _Order1, memory_order _Order2)
{ /* atomically compare and exchange with memory ordering */
int _Result;

_Lock_spin_lock(_Flag);
_Result = _CSTD memcmp((const void *)_Tgt, (const void *)_Exp, _Size) == 0;
if (_Result != 0)
_CSTD memcpy((void *)_Tgt, (void *)_Src, _Size);
else
_CSTD memcpy((void *)_Exp, (void *)_Tgt, _Size);
_Unlock_spin_lock(_Flag);
return (_Result);
}

这两个函数有一些细致的区别,compare_exchange_weak在可能会False Negative,这是由于weak允许spurious failure。在某些平台(不错ARM、PowerPC又被点名了)上的CAS是通过LL/SC实现的,而不像x86上那样只通过一条指令,所以可能存在问题。因此使用compare_exchange_weak的时候需要一个循环。注意到这个!expected不是必要的。

1
2
3
bool expected=false;
extern atomic<bool> b; // set somewhere else
while(!b.compare_exchange_weak(expected,true) && !expected);

Stackoverflow上相关问题的整理中提到了这两者之间的性能比较,由于weak会忽视检查,所以一般weak比strong快。但是如果使用strong能避免weak+loop,那么选择strong是适合的。注意到即使使用strong,loop也不是就一定可以避免的,因为原子操作本来就存在使用乐观锁的情况。

ABA问题

伴随着CAS的是可能存在的ABA问题。ABA问题的根源是从内存中取出值和CAS这两个操作不是原子的,因此可能在这两个过程中发生切换。

原子操作与锁的关系

在上面的讨论中,我们能够直观地发现原子操作和锁的关系。原子操作看起来是“独善其身”的只能管住自己,原子操作之间、原子操作和非原子操作之间可能发生乱序或重排;而锁像大哥,能护住一段代码。由此我们思考如何通过原子操作来组织其他的那些非原子操作呢?这就要引入下面讨论的内存模型的问题。

其他的同步原语

在上面的几个章节中,我们论述了基于锁和基于原子操作的同步原语。还有一些其他的同步原语,例如RCU、MCS Lock等。在Linux中使用了

Hazard Pointer

Hazard Pointer类似于面向多线程的智能指针,它能够无等待地进行线程安全的垃圾回收。

RCU

RCU(Read Copy Update)是Linux2.6引入的一种替代读写锁的方法,在Linux内核中被广泛使用。其思想是在多个读者能和一个写者并行,写者在访问值时首先拷贝一个副本,对副本进行修改,再在适当的时候将新的数据一次性写回,这个时机就是所有读者完成读取之后。由此看出这种方法能够很好地适应读多写少的情况,因为现在读者端不需要任何锁来保证同步,也不会被写者阻塞。但是写者端的任务加重了,一方面它仍然要处理和其他写者的竞争问题,另一方面它具有复制开销,还需要监听所有读者的信号以确定数据的修改时间。下面描述了写操作的流程:

  1. 对旧数据建立一块副本,对副本进行修改
  2. 等待前面所有的读者完成读取
    这时候RCU实际上向专门的垃圾回收器注册一个callback并等待器通知,这段时间称为grace period。
  3. 写者将原来的数据替换为新的数据
  4. 写者删除旧的数据

通过内存模型约束线程对变量的读写顺序

锁无关编程的难点之一就是需要从编译器与CPU两个层面考虑行为对线程间的同步造成的的影响,也就是考虑编译器重排和CPU缓存与乱序对读写逻辑可能造成的影响。当我们试图使用原子操作去解决非原子操作间的竞态问题时,那么我们需要谨慎选择使用恰当的内存模型,这样能够在提升效率的同时保证安全性。

原子操作的线程间顺序

我们知道从C++语言到运行程序得到结果之间需要经历编译器优化和处理器优化两道坎,处理器优化包括高速缓存和指令乱序,编译器优化可能进行重排。编译器和处理器达成的协议是不能改变单线程程序的行为。以编译器优化为例,下面展示的代码在O0下,g++7按照1-2的原始顺序来编译,但开启O1后,g++7就会进行Store-Store重排,将2提到1前面,先对b赋值,再对a赋值;并且还去除了一部分没用的代码。对单线程来说,这样的优化并没有任何问题。但对于多线程来说则可能出现问题。一方面,由于去除部分代码的原因,汇编O1事实上不能在a处观察到a == 1 && b == 1的情况,而假设O0汇编在进行到b处被抢占,那么其他的线程有机会看到以上的情况。另一方面,在O1中先对b赋值再对a赋值,仍然会出现问题。假如说在3和4间线程被强占,那么另外一个线程观察ab,得到b为123,而a为不确定值(或者1,如果前面赋初值语句没有被删去的话),而如果编译器不进行重排,我们理想中的原始结果是a为43,b为不确定值。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
int main(){
int a = 1, b = 1; // 0
// a
a = b + 42; // 1
// c
b = 123; // 2
printf("%d %d", a, b);
return 0;
}
// compile with -O0
movl $1, -8(%rbp)
movl $1, -4(%rbp)
// b
movl -4(%rbp), %eax
addl $42, %eax
movl %eax, -8(%rbp)
movl $123, -4(%rbp)
movl -4(%rbp), %edx
movl -8(%rbp), %eax
// compile with -O1
movl $123, %ecx // 3
movl $43, %edx // 4

出于性能方面的考虑,对多线程的程序而言并不存在和单线程一样的硬性要求。在使用原子操作等锁无关技术时,不能假设所有环境下程序最后行为一如我们希望代码所“暗示”一样。事实上编译器或处理器可以在不同线程间不同变量间进行读写乱序,而这在多线程中会造成问题。例如在单线程中将对B的写提到对A的读前面是没有问题的,但是对多线程来说,这往往就会出现问题。虽然大部分时候我们不需要操心这个问题,这是因为一方面在使用mutex等内核锁时,内核帮我们做了相关工作,另一方面部分处理器(如x86)也提供了(近似,实际上是TSO)acquire/release的保障,并也可以通过一些指令命令编译器在某些地方减少优化,但这依然是一个客观存在的问题。

可见性和有序性

可见性指一个线程对变量的写操作对其它线程后续的读操作可见,这里见的是结果。可见性要求CPU在对缓存行写操作后能够保证至少在某个时间点前必须写回内存,从而保证其他线程能读到最新的值。有序性指的是数据不相关变量在并发的情况下,实际执行的结果和单线程的执行结果和单线程的执行结果是一样的,不会因为重排/乱序的问题导致结果不可预知。

根据以上的定义,我们引入下面的两个概念:
如果操作A先行发生(happen-before)于操作B,那么A造成的修改能够被B观察到。我们以知乎上举出的一个例子来理解,根据上面有关原子操作的线程间顺序的讨论,我们知道下面的代码在单线程条件下断言是始终成立的,即使语句1和2之间发生了重排。容易看出happen-before强调的是一个现象,C++保证在单线程中happen-before现象是始终成立的,不管后面编译器和CPU如何进行重排。

1
2
3
4
5
6
int a, b;
void foo() {
a = 42; // 1
b = a; // 2
assert(b == 42);
}

如果A同步发生(synchronizes-with)于B,那么某个线程中A的修改能够被另一个线程中的B观察到。它实际上是建立一种方法,使得一个时间点前内存的变化能够被其他线程看到。我们引用同样的来源的一个例子,由于重排的问题,在下面的代码中语句4的断言不一定成立(我们不考虑x86 CPU的TSO模型)。这就说明在Relax等无约束或者少约束的内存模型下,在多线程中试图通过某原子量来同步非原子量并不是可靠的

1
2
3
4
5
6
7
8
9
10
11
12
13
14
int data;
std::atomic_bool flag { false };

// Execute in thread A
void producer() {
data = 42; // (1)
flag.store(true, memory_order_relaxed); // (2)
}

// Execute in thread B
void consume() {
while (!flag.load(memory_order_relaxed)); // (3)
assert(data == 42); // (4)
}

内存一致性模型

广义上的一致性模型包括Strict Consistency、Sequential Consistency、Causal Consistency、Processor Consistency、FIFO consistency、Cache Consistency、Slow Consistency、Release consistency、Entry Consistency、General Consistency、Local Consistency等。在std::atomic中提供了六种内存模型(memory order)来描述不同线程之间相同/不同数据的读写的顺序。
顺序一致性(sequential consistency, SC)是最强的模型,要求程序中的行为从任意角度来看,序列顺序都是一致的(have single total order);这是在说这段多线程程序的行为和一段单线程程序的行为是一致的,类似于不是并行的并发。这个模型禁止了任何四种类型的读写重排,因此我们可以认为SC下每次读到的都是最新值。在C++ Concurrency in Action中举了一个例子,使用四个线程运行下面四个函数,断言无论如何z永远不可能为0。这说明了在read_x_then_yread_y_then_x两个函数至少有一个能看到xy同时被设为true。容易看出将1/2/3/4任意排序,那么上面的断言是显然的(注意到即使read_x_then_ywrite_x前被调用也有while循环兜底)。但是如果(1 -> 3)(2 -> 4)这两个步骤并行发生的话,上面断言就不成立了。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
std::atomic<bool> x = false, y = false;
std::atomic<int> z = 0;
void write_x()
{
x.store(true,std::memory_order_seq_cst); // 1
}
void write_y()
{
y.store(true,std::memory_order_seq_cst); // 2
}
void read_x_then_y()
{
while(!x.load(std::memory_order_seq_cst)); // a
if(y.load(std::memory_order_seq_cst)) // 3
++z;
}
void read_y_then_x()
{
while(!y.load(std::memory_order_seq_cst)); // b
if(x.load(std::memory_order_seq_cst)) // 4
++z;
}

自由模型(Relaxed ordering)是最弱的模型,它对线程间的执行顺序不做任何synchronizes-with的假设,但同线程的变量仍然遵循happens-before假设,即它除了禁止重排单线程上对单个变量的访问顺序,并不作任何额外的事情。下面展示的一个C++11对应的自由模型std::memory_order_relaxed的例子,注意在后面的详述中可以看到,由于特定平台的一致性模型要强于自由模型,所以std::memory_order_relaxed只是保证强于等于自由模型。注意到这时候仍然保证了1先于2、3先于4,且3读到true,但是z就可能为0了。这是由于xy是两个不同变量,自由模型不关注它们之间的关系。关注一下图5.4,我们发现这张图非常反直觉,例如4步骤还会返回false,这是出于什么机理呢?但是在这之前,先在x86下的MSVC2015上测试一下,发现z始终不为0,这是为什么呢?我在StackOverflow上提了个问题。回答首先指出std::memory_order_relaxed的副作用实际上是禁止编译器(注意区分编译器的重排行为和处理器的乱序行为)重排xy的Store-Store,但是断言失败还可能由于CPU决定颠倒写x和写y的顺序(虽然一般不会进行这种乱序),或者CPU的缓存导致了x延迟写入内存。回答接着解释了为什么x86上不会assertion fail,这是因为x84提供了acquire/release语义,保证了当3是true时,在2前的对3后的可见。注意到这个并不违反x86对Store-Load可能的乱序,它实际上利用了Store-Store不会乱序的特性。回顾之前的最严格的顺序一致模型,它要求对于每个共享变量,Load-Load、Load-Store、Store-Load、Store-Store都不乱序。

1
2
3
4
5
6
7
8
9
10
11
12
13
std::atomic<bool> x = false, y = false;
std::atomic<int> z = 0;
void write_x_then_y()
{
x.store(true,std::memory_order_relaxed); // 1
y.store(true,std::memory_order_relaxed); // 2
}
void read_y_then_x()
{
while(!y.load(std::memory_order_relaxed)); // 3
if(x.load(std::memory_order_relaxed)) // 4
++z;
}

acquire/release模型相对灵活一点,也是x86实现的语义。release可以理解为写操作,acquire可以理解为读操作。acquire fence要求其后面的RW不能与其前面的R重排,也就是RW不能重排到(原本在自己前面的)R前,例如R1 W2不能变成W2 R1,否则R1读到的就是脏值的。release fence要求其前面的RW不能和其后面的W重排,也就是RW不能重排到(原本在自己后面的)W后,例如R1 W2不能重排为W2 R1,否则R1又脏读。但是这两个模型即使组合起来也不能禁止其前面的W和后面的R重排。
C++11使用剩下四个内存模型常数来实现这一机制,memory_order_releasememory_order_acquire表示B线程在使用memory_order_acquire读时,线程A在memory_order_release前的所有写操作都是可见的。而稍弱一点的memory_order_releasememory_order_consume只用来保证当前操作涉及到的对象的可见性。

Store-Load乱序问题

在前面的讨论中提到了x86的Store-Load乱序问题,对于x86,Loads May Be Reordered with Earlier Stores to Different Locations,但对于相同地址则不会乱序
这篇博文中记录了一个有关Store-Load乱序的实验。在实验中,X的写操作可能被延迟到Y的读操作之后,尽管我们插入了compiler barrier。这时候我们需要一个full/general memory barrier,也就是实现让它前面所有的Load/Store操作对它后面的Load/Store操作都是可见的,包括了止Store-Load类型的乱序。在同一篇博文中指出可以使用插入一个mfence,以实现full/general memory barrier。

1
2
3
4
5
// gcc
asm volatile("mfence" ::: "memory")
// c++11
// https://stackoverflow.com/questions/25478029/does-atomic-thread-fencememory-order-seq-cst-have-the-semantics-of-a-full-memo
std::atomic_thread_fence(std::memory_order_seq_cst)

SoF上指出虽然atomic(seq_cst)atomic(seq_cst)始终不会重排,但是在atomic(seq_cst)附近的non-atomic甚至是atomic(non-seq_cst)形式的STORE-LOAD都会被重排。例如在下面的代码中1和3的STORE-LOAD肯定不会被重排,但2和3的STORE-LOAD就可能被重排,所以一定要注意。

1
2
3
4
std::atomic<int> a, b, c;
a.store(2, std::memory_order_seq_cst); // 1: movl 2,[a]; mfence;
c.store(4, std::memory_order_release); // 2: movl 4,[c];
int tmp = b.load(std::memory_order_seq_cst); // 3: movl [b],[tmp];

因此在x86上至少要对LOAD/STORE其中的一个加上MFENCE,或者用一个LOCK指令,而这也实现了类似memory_order_seq_cst的效果。在章节内存模型的实现中还有更多说明。
下面的代码不一定是等价的

1
2
3
4
5
6
7
8
9
10
atomic<int> x, y

y.store(1, memory_order_relaxed); //(1)
atomic_thread_fence(memory_order_seq_cst); //(2)
x.load(memory_order_relaxed); //(3)

atomic<int> x, y;
y.store(1, memory_order_seq_cst); //(1)
// Nothing
x.load(memory_order_seq_cst); //(3)

fence

常见的fence包括thread fence(memory/CPU barrier)和signal fence(compiler barrier)。

signal fence

signal fence类似下面的东西,参考Wikipedia

1
2
3
4
5
6
// gcc
asm volatile("" ::: "memory");
// msvc
__MACHINE(void _ReadWriteBarrier(void))
// c++11
std::atomic_signal_fence(memory_order_acq_rel)

对于gcc版本,asmvolatilememory三个关键字的作用可以参考SoF上的回答
对于MSVC版本,根据MSDN_ReadWriteBarrier限制了编译器可能的重排。C++11标准库中的std::atomic_signal_fence在MSVC上也是利用Compiler Barrier实现的。
一个signal fence的作用是

  1. 强制单线程和该线程上的异步中断之间的顺序性
  2. 强制单核上运行的多线程之间的顺序性
    注意到在SMP架构下这一点难以保证,所以对于多线程程序往往需要更强的thread fence。

thread fence

thread fence也就是所谓的内存屏障,我们可以使用下面的语句进行声明,此外,我们还可以声明一个单独的Store/Load Barrier。这里补充一下Store Barrier强制所有屏障的store指令,都在屏障指令执行之前被执行,并把store缓冲区的数据都刷到主存。Load Barrier强制所有屏障的load指令,都在屏障指令执行之后被执行,并且一直等到load缓冲区被该CPU读完才能执行之后的load指令。而一个full barrier兼而有之。

1
2
3
4
5
6
7
8
9
10
// x86
asm volatile("mfence":::"memory")
// gcc
__sync_synchronize
// msvc
MemoryBarrier()
// c++11
std::atomic_thread_fence(memory_order_seq_cst)
// other methods
_mm_mfence

我们需要注意的是内存屏障是相当耗时的操作,甚至还要超过原子操作,内存屏障还会干扰CPU的流水线,导致性能的降低。下面我们查看一下标准库atomic_thread_fence函数的实现

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
// MSVC
inline void _Atomic_thread_fence(memory_order _Order)
{ /* force memory visibility and inhibit compiler reordering */
#if defined(_M_ARM) || defined(_M_ARM64)
if (_Order != memory_order_relaxed)
{
_Memory_barrier();
}
#else
_Compiler_barrier();
if (_Order == memory_order_seq_cst)
{ /* force visibility */
static _Uint4_t _Guard;
_Atomic_exchange_4(&_Guard, 0, memory_order_seq_cst);
_Compiler_barrier();
}
#endif
}
// GCC
inline void
atomic_thread_fence(memory_order __m) noexcept
{ __atomic_thread_fence(__m); }

可以发现由于x86自带的acquire/release语义,除非是最强的memory_order_seq_cst,否则atomic_thread_fence等价于atomic_signal_fence。而memory_order_seq_cst下的thread fence的full barrier实现则比较奇特,仔细查看这个full barrier的实现这里为啥不直接插入一个MemoryBarrier(),而是利用了一个原子操作呢?

内存模型的实现

六种内存模型通过加入Compiler Barrier和Memory Barrier来实现。
SoF中指出acquire/release相当于在relax后面加一道栅栏,即下面的代码是等价的。但如果不显示加入fence的话,编译器可以视情况生成等价的更好的代码。而在x86等强内存模型架构cpu上,也不一定生成fence,例如单独的atomic_thread_fence(memory_order_acquire)就可以简化为nop。

1
2
3
4
5
// 1
a.load(memory_order_acquire)
// 2
a.load(memory_order_relaxed)
atomic_thread_fence(memory_order_acquire)

memory_order_seq_cst则需要额外的MFENCE或者LOCK,可参考上节所述。
下面的代码是等价的

1
2
3
4
5
6
7
8
9
10
if (var.load(std::memory_order_acquire) == 0)
{
assert(a==123);
}

if (var.load(std::memory_order_relaxed) == 0)
{
std::atomic_thread_fence(std::memory_order_acquire);
assert(a==123);
}

使用内存模型解决双重检查锁定模式(DCLP)存在的问题

在之前的章节中,我们曾经提到过双重检查锁定模式中存在的问题,对此文章Double-Checked Locking is Fixed In C++11指出我们可以通过适当的内存屏障或者atomic store/load语义来解决。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
std::atomic<Singleton*> Singleton::m_instance;
std::mutex Singleton::m_mutex;

Singleton* Singleton::getInstance() {
Singleton* tmp = m_instance.load(std::memory_order_relaxed);
std::atomic_thread_fence(std::memory_order_acquire);
if (tmp == nullptr) {
std::lock_guard<std::mutex> lock(m_mutex);
tmp = m_instance.load(std::memory_order_relaxed);
if (tmp == nullptr) {
tmp = new Singleton;
std::atomic_thread_fence(std::memory_order_release);
m_instance.store(tmp, std::memory_order_relaxed);
}
}
return tmp;
}

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
std::atomic<Singleton*> Singleton::m_instance;
std::mutex Singleton::m_mutex;

Singleton* Singleton::getInstance() {
Singleton* tmp = m_instance.load();
if (tmp == nullptr) {
std::lock_guard<std::mutex> lock(m_mutex);
tmp = m_instance.load();
if (tmp == nullptr) {
tmp = new Singleton;
m_instance.store(tmp);
}
}
return tmp;
}

无等待

无等待(Wait-Free)是比锁无关更高层面的并发。无等待指的是程序中的每个线程都可以一直运行下去而不阻塞。

并发编程中数据共享措施

常见的并发模型有共享变量、Communicating Sequential Process(CSP)、Actor等模型。共享变量最为常见,多个线程通过锁或者原子操作对变量进行有序访问。CSP模型可以参照Go语言中的channel。Actor模型可以参照Mapreduce模型。

condition variable

CV和mutex

lock_guardunique_lock能够保证线程之间的互斥访问临界资源,但是不能控制这些访问的先后顺序。自然而然地可以想到可以用锁维护一个共享变量记录状态,以实现线程间同步的措施,例如在生产者/消费者模型中用它来描述产品数量。一个比较naive的方式是轮询(poll),注意在实现时仍然是需要锁的,否则可能两个互相竞争的线程同时获得flag。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
bool flag;
std::mutex m;

void wait_for_flag()
{
std::unique_lock<std::mutex> lk(m);
while(!flag)
{
lk.unlock(); // 1 解锁互斥量
std::this_thread::sleep_for(std::chrono::milliseconds(100)); // 2 休眠100ms
lk.lock(); // 3 再锁互斥量
}
flag = false;
}

另一种更好的方法是利用条件变量,条件变量(condition variable)是利用共享的变量进行线程之间同步的一种机制。条件变量维护一个等待列表,相对于前面的轮询,条件变量应用了推的机制,当某一个线程所需要的条件不满足时,它会被阻塞。拥有锁的线程在退出临界区时使用notify_one/notify_all发出信号通知条件变量,条件变量会唤醒一个/所有正在等待的线程。
使用条件变量等待事件通常是类似下面的形式,容易发现期间需要解锁,所以这里只能用unique_lock而不能用lock_guard

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
void wait_for_event()
{
std::unique_lock<std::mutex> uni_lock(mtx);
while (!condition)
{
// 调用wait后发生的操作:
// 1. 将线程挂到等待队列上
// 2. 释放锁(前面说过带着锁睡觉会死锁)
// 3. 进入睡眠阻塞线程
cv.wait(uni_lock, [](){return condition;}); // wait until condition
// wait返回时会重新获得锁
}
}
void signal_event()
{
std::unique_lock<std::mutex> uni_lock(mtx);
condition = true;
cv.notify_one();
}
void broadcast_event()
{
std::unique_lock<std::mutex> uni_lock(mtx);
// 在notify前需要在锁保护下修改condition
condition = true;
// 理论上notify的时候不一定要拥有锁,但是性能上可能差一点
// 原因是 https://stackoverflow.com/questions/4544234/calling-pthread-cond-signal-without-locking-mutex
cv.notify_all();
}

下面我们回答两个问题:

  1. 为什么使用mutex的时候要CV
    我认为这是因为从逻辑上来讲,CV提供了一种新的同步原语。如果我们使用mutex来进行同步,其实也是可以的。如下面代码所示,我们两个线程竞争一个mut,以类似轮询的方式来获得同步,不仅很耗资源,而且从逻辑上不够整齐。而CV把consumer的轮询变成了阻塞wait,以provider的通知notify来唤醒,节省了资源,从逻辑上也有了一个完整过程的抽象。

    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    def provider(mut):
    while 1:
    lock(mut)
    count++
    unlock(mut)
    def consumer(mut):
    while 1:
    lock(mut)
    if count:
    count--
    unlock(mut)
  2. 为什么使用CV的时候要mutex
    首先我们看signal的源码,我们修改了维护的condition,然后notify。我们知道condition是一个临界变量,所以必须要用锁来保护。

    另一个原因是cv.wait对应的一系列步骤必须要是原子的,否则会造成丢失signal的问题。例如当条件满足!condition时程序应当释放锁并阻塞在CV上进入睡眠,这个过程必须是原子的。我们考虑在释放锁之后另一个线程上的一个signal获得了锁,并发送了signal,但此时本线程上还没有来得及阻塞在CV上,这个信号就不能被本线程收到了。这一现象广泛出现在使用边缘触发(Edge triggered)机制的程序中,例如Linux的信号(Linux的信号属于一种边缘触发),pthread_cond_系列也是边缘触发。相对于水平触发,边缘触发只会唤醒已经等待在wait上的线程,因此可能出现丢失信号的问题,例如当notify操作早于wait操作时,这个notify就会丢失了。注意到上面的实现中,cv.notify_onecv.notify_all函数始终是出现在修改condition之后的,这也是为了保证当睡眠线程在收到信号后能够及时观察到条件满足了。

关于使用CV时使用锁的错误用法在陈硕大牛的博客中还指出了更多的例子。例如不为signal部分上锁是错误的,因为可能在wait部分在检测到condition不满足准备pthread_cond_wait睡眠前,另一个线程修改conditionpthread_cond_signal,这样进入pthread_cond_wait的wait部分代码就会丢失这次的signal,如下代码所示。所以我们在signal部分加锁的目的是为了确保自己现在可以signal。

Process A                             Process B

pthread_mutex_lock(&mutex);
while (condition == FALSE)

                                      condition = TRUE;
                                      pthread_cond_signal(&cond);

pthread_cond_wait(&cond, &mutex);

不过即使为signal部分上锁还可能丢失信号。原因是生产者和消费者竞争同一把锁虽然能够保证wait和signal是串行的顺序,但可能整个signal过程都在wait过程前面。

虚假唤醒

使用条件变量时可能发生虚假唤醒/伪唤醒(spurious wakeup)的问题,虚假唤醒指的是被wait中阻塞的线程在没有notify的情况下被唤醒,或者一次notify_one唤醒多个线程(惊群效应)。虚假唤醒可能发生在多处理器系统和接收Linux信号时,条件变量设计者出于性能因素容忍了虚假唤醒的存在。APUE指出pthread_cond_signal函数可能唤起多个线程。SoF中指出,当等待队列中的Linux线程收到一个系统信号时,会得到虚假唤醒。在另一个回答中,Jon Skeet大神指出其深层次原因是没有任何的保障是一个被唤醒(awakened)的线程一定会被调度(scheduled),很可能当一个等待队列中的线程被唤醒后准备获得锁时,另一个线程已经捷足先登了获得了锁,并且重置了条件condition的值。但注意,即使是虚假唤醒的情况,cv.wait也是在获得锁之后再返回,但这时候条件condition可能已经不满足了,这时候就出现了虚假唤醒。解决虚假唤醒的方案很简单,如上文wait_for_event所示,可以将wait包裹在一个while循环里面。在SF上记录了一番实验,强行产生虚假唤醒。

1
2
3
4
5
con_var.notify_one();
// trigger the spurious wakeup
lock.unlock();
std::this_thread::sleep_for(std::chrono::seconds(2));
condition = true;

可以发现在notify和unlock两个过程之后的两秒内消费者线程已经被唤醒了,但在拿到锁和条件变量后它发现其实condition值并不为true,这就产生了一次虚假唤醒。
在使用notify_all()唤醒时需要注意惊群问题,如果我们只需要唤醒任意一个线程,那么notify_one()就行了,但是如果我们希望唤起特定的几个,或者我们不清楚需要唤醒几个,就得使用notify_all()唤醒全部。前者的情况类似于accept,当多个线程的套接口listen同一个fd时,内核只notify等待队列最顶部的套接口。后者的情况类似epoll,内核不知道究竟有多少个线程需要被唤醒。

事件

Windows中通过事件的机制类似于条件变量的机制。

信号量的实现

在POSIX和Linux中分别提供了两种信号量的实现方式。
其实我们也可以方便地通过mutex+cv+counter来实现信号量。

future

条件变量的一个重要的应用就是生产者/消费者模型,但对于一些平凡的情况,消费者只等待一次(one-off)来自生产者的结果。一个普遍的场景是启动一个计算线程并异步获取它的结果,不过std::thread不能直接提供获取返回值的方法,此时可以考虑使用全局变量或者传入指针和回调函数。
另一个较为方便的做法是使用std::future来获取异步任务中的结果。从一定意义上讲,future类似于一个callback。C++中提供了std::futurestd::shared_future,可以触发一个或多个事件。当多个线程访问std::future时,需要锁来保护线程安全。

async

如下面代码所示,future常和std::async一并使用,容易看到,它类似于Python中的subprocess.call,是个高层面的封装。

1
2
3
4
5
6
int main()
{
std::future<int> the_answer = std::async(get_integer());
do_other_stuff();
std::cout<< the_answer.get() << std::endl;
}

这里面的std::async用来实现一次异步调用。std::async是一个模板函数,它可以接受一个std::launch类型的参数,其中使用std::launch::defered表示异步调用延迟到.wait().get()再执行。而std::launch::async表示在一个独立线程中运行。默认是std::launch::defered | std::launch::async表示这两个二选一。std::async能够接受函数指针、函数对象(的左值、右值。引用和std::ref),也能够通过类似std::bind一样的机制以引用、std::ref等方式传入对象的上下文。
注意在仅C++11中,future的析构函数可能会阻塞线程

packaged_task

不同于std::asyncstd::packaged_task是个函数对象,这个函数对象有点类似std::function(但std::function还能够复制构造),因此std::packaged_task需要手动调用以运行。

1
2
3
4
5
6
7
std::packaged_task<int()> task(sleep);
auto f = task.get_future();
// 在主线程中运行
task();
// 启动另一个线程运行,注意只能移动packaged_task
std::thread myThread(std::move(task));
f.get();

由于std::packaged_task能移动到某个std::thread中,因此适合用来实现线程池,负责打包待计算的任务。std::packaged_task还可以被用来向另一个线程派发任务。

promise

在上面的两种派发任务-等待获取结果的模型中,我们的主线程是作为等待任务结果的一方,而将执行任务的异步线程则是产生结果的一方。std::future对象由主线程持有,主线程会调用std::async或者packaged_task启动一个任务,并且在需要结果时调用f.get()获得异步线程的结果。对于异步线程来说,自己并不需要额外工作,正常返回。但是这在promise中就不一样了,在使用promise时,异步线程需要显式向主线程设置值p.set_value()或传递异常p.set_exception()

future.then

future.then目前还没有进入C++标准,其风格类似于CPS和Haskell中的Monad,相比先前介绍的三种机制,future.then利用了回调链的机制避免了手动的同步。

co_await、co_yield、co_return

future.then机制降低了流程操控的复杂度,不过写起来仍然很繁琐,而且存在回调地狱的问题。在链式调用的过程中,整个流程难以在中间直接abort,为此Rust先后提出了try!?.这样的机制。截至目前这三个函数尚未进入C++标准,但这种由CS流行开来的异步编程范式其实十分优雅,我们将在一个单独的专题进行讨论。

并发编程中的基础架构

在设计高性能的并发代码时,我们需要注意以下几点:

  1. 充分利用局部性假设,是同一线程中的数据紧密联系
  2. 减少线程上所需的数据量
  3. 让不同线程访问不同位置,避免伪共享,这里也是将内存分配对齐到缓存行大小的一个重要原因。

线程安全的队列

线程池

一个简单的线程池的实现需要借助一个线程安全队列。