并发编程重要概念及比较

在本篇中比较了各种并发、并行技术。
并发(concurrency)强调的是逻辑上的同时发生,在实际上并发程序可能是由一个处理器同时(simultaneously)处理多个任务,因此并发过程中常出现一个线程等待其他资源的情况。此时常伴随着线程的阻塞和调度。并发过程通常可划分为多个级别:Blocking、Obstruction-Free、Lock-Free、Wait-Free,其中后三种统称为Non-blocking的。
并行(parallelism)属于并发,并行强调这两个并发事件实际上也是同时发生的,例如在多个处理器上运行的多个任务。
【未完待续】

处理器层面的并发

这里不详细论述处理器层面的并发相关,例如SMP架构、流水线、分支预测、多级缓存等技术。

流水线

CPU流水线的目的是为了提高吞吐量(throughput),但会增加每条指令的延迟(latency),这是因为执行一条指令需要经过更多的流水线寄存器。特别地,一个较深的流水线可能带来很大的流水线寄存器延迟,从而制约了吞吐量能提高的上限。根据CSAPP,一条指令在CPU中被执行会经过取指(fetch)、译码(decode)、执行(execute)、访存(memory)、写回(write back)、更新PC(在SEQ+中和取指进行了合并)等阶段。容易想到在某一时钟周期内,每一个阶段都可以独立运行。例如当指令PC在执行阶段时,我们可以对指令PC+1进行译码,这样译码器就不会闲置,这就是流水线化的一个简单思路。CPU流水线中通过暂停(stalling)和转发(forward)/旁路(bypassing)的机制维护了指令级别的并发。

CPU的缓存和缓存一致性

x86往往都有高速缓存Cache,而且有多级。高速缓存基于静态RAM(SRAM)技术,区别于主存的动态RAM(DRAM)技术。现代CPU中寄存器与内存之间没有直接的渠道,而必须通过多级的高速缓存才能到内存。高速缓存的作用依然是为了弥补CPU和内存在速度上的差异,高速缓存提高效率的原理是基于时间局部性和空间局部性,也就是说被引用过一次的存储器位置很可能在不远的将来再次被引用,而存储器中的某一个地址被引用过,那么它附近的地址很可能也会被使用。在CSAPP中专门对高速缓存有将近一章的讨论。
虽然高速缓存对用户来说是透明的,我们的代码要不直接操作寄存器,要不直接操作内存,但它并不是不存在,如果深究下去会发现如何保障内存和对应CPU缓存的同步是个问题。但是我们不要担心诸如此类的“一个核心写另一个核心脏读”的情况,x86会在一个CPU修改高速缓存行后自动作废其他CPU的高速缓存行,而缓存一致性协议能够解决内存和多核CPU缓存之间的同步性问题。

缓存一致性和volatile

既然我们CPU总是能读到内存里面最新的值,那为啥还需要volatile呢?原因有二

  1. CPU不一定会读缓存
    这时候要注意区分CPU缓存一致性和volatile之间的关系。例如出于优化的角度,编译器可能把一个在内存的值放到到寄存器里面,以避免访问内存,既然不访问内存那和缓存一致也没啥关系了。但这样就会出现问题,如果某个线程修改了这个值对应的内存,那么寄存器是不知道的,所以这时候volatile强制说不要在寄存器里面读啦,直接从内存里面读,这时候缓存就能发挥应该有的作用了。所以CPU缓存一致性解决的是CPU的Lx缓存和内存之间之间的问题,而不是CPU寄存器和内存之间的问题
  2. 有的缓存一致性也不保证是任意时刻的
    经过简单的了解,CPU缓存行的写操作也分为直写(write though)和回写(write back)两种策略。对于直写来说,确实可以做到任意时刻各缓存中的内容等于内存中内容;但回写就不一定是任意时刻了,因为它并不是立即更新缓存,而是值修改本级缓存,而将对应缓存标记为脏段,只有当所有脏段被会邂逅我们才能达到一致性。具体还可以参看缓存一致性

缓存一致性协议

MESI是实现缓存一致性的一个基础协议,MESI分别表示缓存行可能处于的四个状态Modified、Exclusive、Shared、Invalid。其中Intel使用了扩展的MESIF,增加了状态Forward,而AMD使用了MOESI,增加了状态Owned。MESI协议需要正确处理local read(LR)、local write(LW)、remote read(RR)、remote write(RW)四个场景,其中local表示处理器对本地缓存读写,而remote则表示处理器需要对主存读写。一个缓存行Cache0初始状态为I,因为它并不缓存任何数据。当本地处理器向该缓存行请求时,CPU会向其他缓存行询问,如果存在缓存行Cache1拥有该缓存,则将对应缓存行设为S状态,表示目前有多个缓存行缓存有该数据;否则从内存中加载到Cache0,并设为E状态,表示目前只有一个缓存行缓存有该数据。当本地处理器写入时,缓存行状态变为M,此时缓存与主存之间的数据不一致,CPU通知所有对应的其他的缓存行失效,状态变为I。

伪共享

伪共享(False Sharing)是在MESI模型下多个线程对同一缓存行竞争写所导致的性能降低。我们考虑这篇博文中的一个场景一个数组int32_t arr[]被同时加载到CPU0和CPU1的L1缓存Cache0和Cache1上。现在两个线程A和B试图分别修改arr[0]arr[1]。这种情况下是不存在race condition的,但是可能导致伪共享。我们考虑初始情况下Cache0和Cache1都处于S状态,现在CPU0收到线程A的请求,写arr[0],Cache0状态变为M,而Cache1收到通知后也作废了自己的缓存行。接下来CPU1发起了写操作,根据MESI模型,CPU0会先将arr[0]写回内存,此时Cache0变为I,之后CPU1才能从内存重新读取,Cache1变成E,然后CPU1才能修改arr[1]

测试缓存大小

内核层面的并发

中断

在现在的多核CPU(特别是SMP架构)背景下,中断的机制和单核有所区别。在x86中使用的是高级可编程中断控制器(APIC),APIC由本地APIC和IO APIC组成,其中本地APIC与每个处理器核心对应,IO APIC负责采集和转发来自IO设备的中断信号。
根据Intel的规定,广义上的中断可分为同步中断和异步中断。同步中断又称异常,实际上是由CPU产生的,因此显然不能被屏蔽,异常分为故障(fault)、陷阱(trap)和终止(abort),异常对应到中断号的0-15。异步中断又称中断,分为外部非屏蔽中断(NMI)和外部可屏蔽中断(INTR),分别对应中断号的16-31和32-47。中断处理的原则是快,否则很容易被覆盖。在Intel中将非屏蔽中断也归入异常,所以异常一般为来自外设或CPU中的非法或故障状态。除了一些硬件故障,来自外部IO设备的中断常是可以等待的,所以属于可屏蔽中断,当IF标志为1时,CPU可以不响应可屏蔽中断,而是将它缓存起来,在开中断后会传给CPU。当CPU在响应一个异常时,所有的可屏蔽中断都将被屏蔽,而如果此时再出现一个异常,即产生了double fault故障,一般来说系统就会宕机。

软中断和硬中断

在操作系统如Linux中,中断还可以被分为软中断(内部中断)和硬中断,硬中断是来自硬件的中断,它可以是可屏蔽的,也可以是不可屏蔽的,软中断一般是由int指令产生的,由操作系统提供,在Linux中软中断对应于中断号48-255。软中断是不可屏蔽的(不然干嘛调用int呢),但在操作系统中软中断也可能是由一个硬中断产生的,例如一个来自打印机的硬中断可能产生一个软中断给内核中的相关处理程序,这就是所谓的中断推后处理机制。
Linux中硬中断是可嵌套的,也就是说它可以被非同种中断打断,Linux禁止来自同种类中断的打断,而是挂起后来的中断,这主要是为了防止重入现象的发生。Linux通过巧妙的办法来防止同种类中断重入。
如果不希望中断的嵌套的发生,可以进行关中断操作,因为它屏蔽的可屏蔽硬件中断具有抢占性。在关中断时要注意关中断会导致异步IO、调度等一系列依赖中断的功能失效,所以屏蔽中断后一定要尽快执行完临界区内代码。
在Linux内核中是允许在硬中断中出现中断嵌套的。这是指在中断处理函数handle_IRQ_event中开启了中断,但是在这个函数前的一条调用链中都是关中断的,可参考这篇文章。所以说只要非同种中断发生就可以抢占内核是不准确的。
研究Linux中的进程状态时,我们可以看到一个TASK_UNINTERRUPTIBLE,即不可中断的睡眠状态,这是指这个进程不响应SIGKILL等外部信号,因此必须是十分短暂的,在ps -aux命令中显示为D。顺带一提另一中不能响应外部信号的僵尸进程Z,僵尸进程的资源已经被全部释放,只留下包括task_struct在内的一点信息用来给父进程提供返回码等信息,如果此时父进程被阻塞而不能回收子进程,那么子进程就会进入僵尸状态。
Linux中,软中断是不能嵌套的,但可以被硬中断打断,所以可以认为硬中断具有更高的“优先级”,但Linux中并没有中断优先级的概念(不过也有个中断线程化的东西)。软中断可以在SMP的不同CPU上并行执行,Linux上每一个CPU都拥有一个中断栈。
Linux中使用一个softirq_action结构维护软中断

1
2
3
struct softirq_action {  
void (*action) (struct softirq_action *); /* 软中断的处理函数 */
};

可以看出,软中断实际上很像一个回调函数。那么软中断和回调函数之间有什么区别呢:

  1. 中断能够实现不同优先级代码的跳转
  2. 中断的重入性和回调函数不同

中断下半部和软中断

Linux的外部中断处理过程可以参考文章
中断延时处理和中断下半部分别是来自Windows和Linux的中断推后处理机制。
Linux中完整的中断处理包含上半部和下半部,其中上半部是真正的中断处理程序,它短小精悍,运行时需要关中断。
Linux中的中断下半部的实现有三种机制:Orignial Bottom Half机制、Task Queue机制、软中断Softirq机制、tasklet和工作队列,其中前两种已被替代。注意到很多博客提到众多下半部不属于中断上下文,但又有说软中断处理函数属于中断上下文,从而不允许休眠。但注意工作队列由内核线程eventX执行,允许被调度甚至睡眠

调度

任务

抢占式调度

被动式的调度指的是使用一个调度器决定哪一个任务会在下一刻运行,而不是由进程主动放弃处理器。调度器暂时停止一个任务并让另一个任务开始执行的行为就是抢占式(preempt)调度。抢占式调度分为用户抢占和内核抢占两种。
以Linux为例,调度器工作在内核态,所以用户抢占并不是在用户态下,而是发生在进程即将从内核态返回用户态时,这对应两种情况,从系统调用返回和从中断处理程序返回。
如果不开启内核抢占,那么进程在内核态的运行会直至结束(主动放弃/时间片耗尽/阻塞),这样的假设方便了内核的编写,特别是在单处理器(UP)下。考虑到在内核态中不存在进程上下文的切换,内核并不需要考虑对临界资源的竞争访问问题,而用户程序也可以假设在一次系统调用过程中不需要保护内核临界资源。但需要注意的是中断仍然存在,所以在进入临界区前还需要关中断。
通过内核抢占,系统允许高优先级的进程抢占低优先级的进程的内核态,这将能提高系统的实时性能。内核抢占是可以被关闭的,即所谓的关抢占的几种情况:

  1. 内核正在处理中断
    这时候也就是所谓的中断上半部,中断在操作系统中拥有最高的优先级
  2. 中断的bottom half
    通常有上文所述的三种方式,当内核执行软中断和tasklet是禁止内核抢占(注意此时可以被硬中断打断)
  3. 当进程持有自旋锁读写锁
    这实际上是为了保护临界资源,在有关自旋锁的讨论中会详细说明
  4. 当内核调度器scheduler正在运行时
  5. 内核操作Per-CPU data structures
    在SMP架构中,不同的CPU仍然会维护一些私有数据,此时抢占可能造成一个进程被调度到另一个CPU上去,此时Per-CPU变量就会发生改变
  6. 关中断
    这是一种特殊情况,关中断后抢占机制自然无法实现了

Linux通过内核抢占锁preempt_count来跟踪一个进程的可抢占性,当内核代码具有抢占性时,则调用preempt_schedule_irq进行内核抢占。
因此可以总结到内核抢占可能发生在中断处理程序返回到内核空间前、内核代码具有抢占性、内核代码显式调用调度器schedule、内核中的任务被阻塞。

可重入函数

考虑单线程模型下的并发,一个典型的模型是signal机制(借助于软中断实现)。一些条件会影响函数的可重入性,例如全局变量的使用,假如说在函数func涉及读写全局变量errno,在运行过程中被signal中断,而中断处理程序也会访问这个errno,那么当继续进行func时就可能读到无效的errno。容易联想到一些和硬件有副作用的函数也是不可重入的,如fprintfmalloc等。
异步可重入和线程安全是两个不同的概念,一般来说线程安全的函数不一定是可重入的,如malloc,而反之则可以使用锁机制来避免。所以我们可以体会到Linux提供的signal机制虽然能够实现异步,但是却十分不体面。

内核向外提供的并发设施

多进程

进程是UNIX设计模型中的一个重要部分,UNIX推崇以多进程+IPC的方式组成应用系统,充分贯彻了KISS的方针。在UNIX产生的年代,这种方式无疑是很健壮的。

进程模型

五状态进程模型包括新建、就绪(等待CPU)、阻塞(等待事件)、运行和退出。在五状态模型之外,还有挂起操作。挂起操作指的是将进程交换到外存,这常常是由于内存紧张或者该进程被阻塞的缘故。挂起不同于阻塞或者就绪,被挂起的进程犹如进入一个平行世界,当等待的事件到达时,它能够从挂起阻塞直接切换成挂起就绪。一个挂起的进程必须脱去挂起这层壳之后才能重新进入五状态模型,如一个挂起就绪态进程必须换回到内存切换成就绪态才能被调度。

同步与异步

在进程IPC中同步和异步是两种编程模型,描述了IPC中被调用者的行为。在同步模型中一个调用只有在等到结果时才返回,被调用者并不会进行通知。而异步模型中调用会立即返回,而由被调用者选择在结果达到后通过回调函数或者信号等机制进行通知。UNP书中还强调异步过程中用户不需要手动将数据从内核复制到用户(即不需要在被通知后调用read等函数)。
需要和同步异步进行区分的是阻塞和非阻塞的概念,这两个描述了调用结果未到达时调用者的状态。阻塞调用会直接睡眠线程。而非阻塞调用不会阻塞线程,这时候线程可以继续执行下面的逻辑,消息到达时线程收到一个异步信号或者回调,或者采用类似协程的方法,这时候适用异步非阻塞模型。当然线程也可以选择在原地自旋进行轮询,这就是同步非阻塞模型。
IO多路复用是一种特殊的同步模型,这是因为在poll函数中仍然需要手动将数据从内核复制回来,并且它们在消息到来前必须在一个循环中轮询,而不是立即返回,跳出循环。不过IO多路复用并不属于同步阻塞模型,因为当一个fd在等待结果时,线程可能在处理来自其它fd的返回结果。IO多路复用也不属于同步非阻塞模型,因为事实上线程还是会被阻塞的。
以UNIX套接口为例,SS_NBIOSS_ASYNC标志分别表示非阻塞和异步的选项,其中非阻塞套接口在请求资源不满足时会返回EWOULDBLOCK,而异步套接口则会通过SIGIO信号来通知进程。

多线程

从定义上看,进程是资源分配的最小单位,而线程是程序执行的最小单位。线程通常和同一程序下的其他线程共享一套地址空间,其中包含应用程序拥有的程序代码与资源,每个线程自己维护一套运行上下文。

Linux的线程模型

对Linux内核来说,线程和进程是不区分的,无论是fork()还是pthread_create(),最后都是调用do_fork(),不过是普通进程和轻量级进程的区别。pthread(POSIX threads)是POSIX下的一套线程模型,它运行在内核外。在Linux平台上,在glibc库源码的nptl目录下可以看到pthread的具体实现。NPTL(Native POSIX Thread Library)是POSIX线程模型的新实现,它的性能和稳定性方面都优于从前用进程模拟线程的的LinuxThreads。
fork出的子进程默认会拷贝父进程的一系列资源,包括内存(包括堆栈)、文件(Python的subprocess库中提供了关闭文件的功能)、信号设定、Nice优先级值、工作目录等

协程

协程(Coroutine)是具有多个入口点的函数,协程内部可通过yield进行中断,此时处理器可能被调度到执行其他的函数。虽然线程也可以被睡眠,睡眠本身涉及用户态与内核态的上下文切换,开销较大。对于套接字等IO密集型的应用,协程在提高CPU使用率上比线程轻很多。

stackful和stackless

stackless实现不保存调用栈和寄存器等上下文,因此效率比较高。
stackful即栈式构造,这时候协程拥有自己的堆栈等上下文。

约束线程间并发行为

线程间实现互斥与同步的任务常具有下面两种形式:

  1. 多个线程同时访问一个共享资源,需要维护该共享资源的完整性。这就是Race condition问题,将在本节讨论。
  2. 一个线程向另一个线程通告自己的结果/等待另一个线程的结果,这将在章节数据共享中讨论。

总的来说,为了实现同步,等待资源的一方可以处于用户态(忙等)或者内核态(睡眠),而获得资源的一方可以选择锁进行同步,或者使用原子操作保证自己访问不被打断。这分别下面的基于锁和原子操作的工具。Linux、Windows和C++11的标准库中都对这些工具提供了不同程度的支持,具体可参考文档

Race condition

当两个线程竞争同一资源时(但竞态条件也会出现在逻辑电路等地方),如果对资源的访问顺序敏感,就称存在竞态条件(race condition),导致竞态条件发生的代码区称作临界区。

volatile

在有关CPU缓存一致性的章节中,我们讨论的volatile、寄存器、主存和CPU缓存之间的关系。这里我们讨论语言(C/C++)层面的volatile的有关特性。
C++中的volatile并不以任何形式保证线程安全,它仅用来告知编译器期修饰的变量是易变的,要避免对其进行优化,这里所谓的优化常常是将变量缓存到寄存器中而不是每次都从内存读取。volatile并不蕴含禁止编译器或者处理器进行重排或乱序,我们需要通过编译器屏障或者内存屏障来实现这一点。
volatile关键字有时是有用的,例如可以在自旋锁中防止多次循环中始终读取寄存器中的值。但滥用volatile不仅不会提高程序的安全性,而且会导致程序变慢。对于以为可以加“volatile”就可以解决的问题,一般可以使用std::atomic来避免使用内核锁的开销。

内核锁

内核锁一般基于内核对象互斥量/信号量,它们通常是阻塞锁,会导致线程进入睡眠。锁的存在通常限制了并发范围,变并行访问为串行访问。在使用锁维护临界资源时应当争取让序列化访问最小化,真实并发最大化。

互斥量

在C++中一般不直接使用std::mutex,而使用lock_guardunique_locklock_guardunique_lock利用了RAII来自动管理加锁解锁操作,它们能够应用到包括互斥量的所有Lockable的对象上。unique_lock相比于lock_guard更灵活,用户能够手动地加/解锁,例如在条件变量中就需要unique_lock以便解锁,而在取得对临界资源后进行处理时也可以暂时解锁。unique_lock还是可移动的,以下面的代码为例,这里1处是一个直接返回lk,编译器可能进行NRVO,当然即使不这么做,2作为一个直接初始化操作,也可以接受get_lock返回的将亡值,从而完成移动构造std::unique_lock<std::mutex>。因此这里锁的控制权从get_lock转移到了process_data

1
2
3
4
5
6
7
8
9
10
11
12
std::unique_lock<std::mutex> get_lock()
{
extern std::mutex some_mutex;
std::unique_lock<std::mutex> lk(some_mutex);
prepare_data();
return lk; // 1
}
void process_data()
{
std::unique_lock<std::mutex> lk(get_lock()); // 2
do_something();
}

一般会将mutex和临界资源一起放到一个类中进行管理,此时宜保证该临界资源是非public的,并且不会以引用或者指针的形式传出。这些场景例如成员函数直接返回指针或者引用,友元函数,或者一个接受函数指针P作为参数的函数,且P接受了临界成员的指针或引用,并将其泄露到类外(C++ Concurrency in Action),因此我们还要避免在持有锁时调用用户提供的代码。
虽然我们的愿景是希望最大化真实并发,因此要追求较小粒度的锁(small granularity),一个较小的粒度表现在锁所保护的临界数据较少,并且持有锁的时间较短,但粒度不可能无限小。例如考虑删除双向链表中的一个节点,需要修改三个节点的数据,如果对这三个修改单独加锁,其实等于没有加锁。因此一个直截了当的解决方案是在删除过程中锁住整个链表。如果是仍然希望为每个节点维护一把锁,那么对于删除操作必须获得被删除节点和其相邻的共三把锁,当重新连接节点时,必须在获得当前节点锁的前提下尝试获取下一节点的锁(但一旦持有了下一节点的锁就可以释放当前节点的锁了),以防后继节点产生变化。与此同时,我们还要考虑在遍历的时候需要对要访问的节点上锁,因此遍历时同样需要按照和删除相同的上锁步骤。
下面我们考虑一个线程安全的栈,其中实现了empty()top()size()push()pop()等常见方法。在多线程下,下面的代码中pop()可能使得top()的结果无效,这是因为在1和2两个方法可能有另一个线程执行了pop()。容易看出无论这些方法内部怎么加锁都无法避免这种情况,因为这里的竞态发生在这些方法之间,C++ Concurrency in Action特别指出这属于接口设计的问题。在后面内存模型的部分,我们能看到类似的问题,原子操作虽然避免了竞态,但原子操作之间可能存在的乱序必须要被考虑。书中还指出了另一个更严重的2和3之间竞争的错误,假设有两个线程并行执行该段代码,我们期望的顺序是top[1] -> pop[1] -> top[2] -> pop[2],中括号表示执行该方法的线程。然而实际执行顺序可能是top[1] -> top[2] -> pop[1] -> pop[2]。这就导致了从栈顶开始的两个元素有一个被处理了两次,另一个完全没有被处理。一个简单的解决方案是将这两个调用合并成一个带返回值的pop,使用一个mutex来管理,但Tom Cargill指出这处理方式是有问题的。这涉及到为什么标准库在设计时选择将top()pop()两个方法,原因是可能发生在成功将元素pop后而拷贝函数失败,这个元素就被丢失,所以先取top()pop()保证了top()失败情况下可以选择不执行pop()

1
2
3
4
5
6
stack<int> s;
if (! s.empty()){ // 1
int const value = s.top(); // 2
s.pop(); // 3
do_something(value);
}

死锁

使用互斥量的另一个问题是死锁,这时候锁机制避免了竞态,却可能产生线程对锁的竞争,即线程间互相等待对方的锁。死锁的产生满足四个条件:互斥、占有且请求、不可抢占和循环等待。其中等待且请求条件很关键,当完成一个操作需要获得两个及以上的锁时,死锁往往就会发生。为了解决死锁就要想办法破坏它的四个条件之一。从占有且求的角度来解决,我们可以借助于Dijkstra的银行家算法。在C++11中,我们可以使用标准库函数std::lock来同时上锁,不过现实情境下我们往往难以在申请锁时就确定自己需要哪些锁。从破坏循环等待条件的解读来设计的一个解决方案是永远按照一个顺序来获得锁,以哲学家就餐问题为例,我们将筷子进行编号,并约定哲学家们总是先尝试获得编号较低的筷子,用完后总是先释放编号较高的筷子,这样就能避免死锁问题。注意到约定哲学家们总是先拿起左手边的筷子,再拿起右手边的筷子恰恰会导致死锁问题,因为在这里我们并不是对每一个哲学家的操作指定一个相对的规则,而是为所有的资源(锁)的获取直接指定一个绝对的顺序。于是我们发现有时候去确定一个顺序并不是很容易。对于swap函数来说,我们可以按照参数的顺序来加锁,例如先获得第一个参数的锁,再获得第二个参数的锁。可惜这个是相对的,例如考虑如下面代码所示的两个规则,容易发现这两个线程并行执行时死锁就会发生了。

1
2
3
4
// thread 1
swap(a, b);
// thread 2
swap(b, a);

为了解决这个问题,一个方法是对每个对象求Hash,从而进行排序,另一种是借助于std::lock函数。这个函数的作用是将多个互斥量同时上锁(失败时则抛出异常并释放已经获得的锁),下面代码展示了一个线程安全的swap

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
class some_big_object;
void swap(some_big_object& lhs,some_big_object& rhs);
class X
{
private:
some_big_object some_detail;
std::mutex m;
public:
X(some_big_object const& sd):some_detail(sd){}

friend void swap(X& lhs, X& rhs)
{
if(&lhs==&rhs)
return;
std::lock(lhs.m, rhs.m); // 1
// std::adopt_lock告知这个lock_guard已获得锁
std::lock_guard<std::mutex> lock_a(lhs.m, std::adopt_lock); // 2
std::lock_guard<std::mutex> lock_b(rhs.m, std::adopt_lock); // 3
swap(lhs.some_detail, rhs.some_detail);
}
};
// 注意1/2/3也可以换为以下代码
std::unique_lock<std::mutex> lock_a(lhs.m, std::defer_lock);
std::unique_lock<std::mutex> lock_b(rhs.m, std::defer_lock); // unique_lock 不对互斥量上锁
std::lock(lock_a, lock_b); // 互斥量在这里上锁

std::lock的实现借助了try_lock_Try_lock

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
template<class _Lock0, class _Lock1, class... _LockN> inline
void lock(_Lock0& _Lk0, _Lock1& _Lk1, _LockN&... _LkN)
{ // lock N mutexes
int _Res = 0;
while (_Res != -1)
_Res = _Try_lock(_Lk0, _Lk1, _LkN...);
}

template<class _Lock0> inline
int _Try_lock(_Lock0& _Lk0)
{ // try to lock one mutex
if (!_Lk0.try_lock())
return (0);
else
return (-1);
}

template<class _Lock0, class _Lock1, class... _LockN> inline
int _Try_lock(_Lock0& _Lk0, _Lock1& _Lk1, _LockN&... _LkN)
{ // try to lock n-1 mutexes
int _Res;
// 如果第一个锁_Lk0直接失败,则返回失败0
if (!_Lk0.try_lock())
return (0);
try{
// 否则递归地尝试获取第二个锁_Lk1,如果失败则解开第一个锁并修改_Res为失败0
if ((_Res = std:: try_lock(_Lk1, _LkN...)) != -1)
{ // tail lock failed
_Lk0.unlock();
++_Res;
}
}catch(...){
// 如果出现异常同样解开第一个锁并返回失败0
_Lk0.unlock();
throw;
}
return (_Res);
}

值得注意的是引起死锁的并不一定是锁,而可以扩展到造成互相等待的其他情况,例如一组线程互相join等待对方,相互阻塞,这导致整个程序无法往下运行,或者线程在持有锁时同时等待其他线程。
另一种解决死锁的办法从锁的角度,为锁提供层级。一个已持有低层级锁的线程是不能试图获得高层级的锁的,试图违反这一约定的行为将导致抛出异常或终止程序。注意到不能同时持有相同层级上的锁,所以这些互斥量往往形成一条链。一个层次互斥量hierarchical_mutex的实现可以对每个线程使用一个thread_local全局变量进行维护当前线程所持有的锁的层级,默认取UINT_MAX,这样线程可以获得任何层级的互斥量。
但即使可以避免死锁也要注意锁的粒度(保护数据规模与持有时间)对性能的影响,一个总的原则是一个锁应当被持有尽可能少的时间,并且在持有过程中只应该去完成必要的工作。特别地,持有一个锁的同时等待另一个锁,即使不造成死锁,也要考虑其性能问题。以判定两个int是否相等为例,在先前我们看到了一个swap函数的实现方案,同时对两个互斥量进行加锁。但这里考虑到实际上int非常小,所以比较好的是分别对两个int加锁,复制副本,并比较两个副本,从而避免同时持有两个锁。注意和前面top()pop()所遇到的问题一样,在对intA和intB的读操作(LOAD)间可能发生另一个线程对intA的写操作,导致先前读取到的是旧值。

自旋锁

自旋锁是一种忙等锁(busy waiting),它适用于短时间锁定某个资源,这样可以避免内核锁所需要的线程睡眠(两次线程上下文切换)等一系列的开销,但持有过长的自旋锁会导致浪费大量CPU资源。特别是在单核CPU上,自旋锁的使用需要审慎考虑,因为在单核CPU上同一时间只能运行一个线程,这时候如果等待锁的线程先运行,那么它势必进入空等直到时间片用完,因为获得锁的线程势必不能运行以释放锁。因此在单核CPU上使用内核锁进入睡眠是一个好的选择。自旋锁常被用在对称多处理器(SMP)系统中,在多CPU的情况下保护临界区。

自旋锁与中断处理

处理中断时不能使用互斥量、信号量等让线程进入睡眠的锁,因此自旋锁常用于内核中有关中断处理的部分。内核锁必须在进程上下文中才能使用,这里的关闭中断的目的是为了关闭调度,因为关闭了时钟中断(时钟中断是可以被关闭的),调度器就无法运转了。这样就产生了睡死的现象。

持有自旋锁时不能进入睡眠

自旋锁适用于不能睡眠的场景,但双向地来说,持有自旋锁时也不能进入睡眠,否则会引起死锁。
为了理解原因,首先要了解为什么自旋锁常伴随关中断关抢占。Linux中提供了各个品种的自旋锁操作函数。其中spin_lock系列的关闭了抢占而不关中断,而spin_lock_irqsavespin_lock_irqspin_lock_bh会一道把中断也关了。
关抢占的原因是如果一个低优先级的线程A获得自旋锁而紧接着被一个高优先级的进程B抢占,那么会造成这两个线程死锁,直到时间片用尽,也就是所谓的优先级反转(优先级倒置)现象,会严重影响性能。
关中断的原因是如果一个进程A获得自旋锁然后被一个中断打断,如果这个中断处理器也试图获得同一个自旋锁,那么就会造成在中断内部的死锁(自旋锁不能嵌套上锁,否则会造成自死锁现象),并且中断处理无法被抢占(但可以被其他中断打断)。可以参考文章知乎
既然使用自旋锁应当关闭中断或者调度,那么原因就很明显了,如果进程A获得了自旋锁并且阻塞在内核态,此时内核调度了进程B(阻塞可导致调度),而B也试图获得自旋锁,那么B将永远自旋,A将永远睡眠,这类似于开中断时在中断内的死锁情况,不过在这种情况下仍有可能B时间片用完从而再次重新调度。此外另一种解释认为对于不关中断的自旋锁在睡眠后可能会被重新调度,从而造成自死锁的现象。
这种思想同样值得用在处理异常、信号等会破坏程序执行顺序的地方。

临界区

相对于互锁访问函数这种“有限的”原子操作,临界区允许对一段代码进行“原子操作”,临界区相对于互斥量比较轻便,这是由于互斥量能够跨进程,并且支持等待多个内核对象。同时线程在进入一个被占用的临界区时会首先尝试自旋锁,在自旋锁循环一定次数失败后再让线程进入睡眠。

临界资源的初始化

我们考虑临界资源的初始化问题,一个重要的情景就是实现单例模式。在C++11后,可以方便地使用局部静态变量(Meyers Singleton满足初始化和定义完全在一个线程中发生,并且发生在所有其他线程访问之前)或者std::call_once(在C++11前我们只能使用Linux系统中的替代品pthread_once)实现线程安全的单例模式。
不过首先先看看一个使用锁的朴素的,也是开销巨大的方案。查看下面的代码,我们可以发现这里用一把大锁保证了不会有两个线程竞争创建/访问p_singleton的实例。但同时需要意识到当实例被唯一地创建好后,这个函数就不需要要锁来保护了,因为在这种情况下它简单到可以作为原子操作,然而事与愿违的是每次调用这个函数都需要获得锁。因此我们需要一种仅保护临界资源初始化过程的机制

1
2
3
4
5
6
7
8
Singleton * p_singleton = nullptr;
Singleton * get_singleton() {
std::lock_guard<std::mutex> lock(mtx);
if (p_singleton == nullptr) {
p_singleton = new Singleton();
}
return p_singleton;
}

双重检查锁定模式(Double-checked locking pattern, DCLP)指的是在加锁前先进行一次验证是否可以加锁。下面使用双重检查锁定模式来减少加锁的开销,具体的做法是先检查一遍p_singleton是否为nullptr

1
2
3
4
5
6
7
8
9
10
Singleton * p_singleton = nullptr;
Singleton * get_singleton() {
if (p_singleton == nullptr) { // a
std::lock_guard<std::mutex> lock(mtx);
if (p_singleton == nullptr) { // b
p_singleton = new Singleton();
}
}
return p_singleton;
}

但其实这种加锁方式也是有理论上的风险的,例如我们的p_singleton不是原子的,甚至都不是volatile的,基于我们与编译器的约定,编译器完全可以认为p_singleton的值不会发生变化,因此直接将两层if削掉一层。
但是即使我们将p_singleton套上std::atomic、加上volatile,这个代码仍然是错误的。原因在于p_singleton = new Singleton()这条语句不是原子的。我们可以把该语句分为三步

  1. 分配内存
  2. 构造对象
  3. 指针指向对象

编译器在理论上(但实践中编译器没有理由去进行这样的重排)会在2构造对象前执行1内存分配和3指针指向操作。假设线程在1/3步骤完毕之后被挂起而来不及执行2步骤,而另一个线程开始访问a处的代码,注意到此时p_singleton已经不是nullptr了,于是函数会返回一个未初始化的内存。继续思考我们发现,这里的问题是由于第一个线程此时已经在初始化p_singleton,这第二个线程就不应该有机会执行到a处的代码,试想即使第二个线程知道初始化再被另一个线程执行,那它也做不了任何事情,因为代码中写了要么初始化并返回指针,要么直接返回指针。选择前者会破坏第一个线程的初始化过程,选择后一个会造成上面说的结果。因此在a处对p_singleton进行保护是非常有必要的。在稍后的章节中,我们将对双重检查锁定模式进行进一步的讨论。

因此实际上使用上面提到的std::call_once是一个更好的解决方案。在下面的代码中,只会输出一行Called once

1
2
3
4
5
6
7
8
9
10
11
12
13
std::once_flag flag;  
void do_once()
{
std::call_once(flag, [](){ std::cout << "Called once" << std::endl; });
}
int main()
{
std::thread t1(do_once);
std::thread t2(do_once);

t1.join();
t2.join();
}

这里的std::once_flag不能被拷贝和移动,其实相当于一个锁,call_once实现如下

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
template<class _Fn,
class... _Args> inline
void (call_once)(once_flag& _Flag, _Fn&& _Fx, _Args&&... _Ax)
{ // call _Fx(_Ax...) once
// 定义一个_Tuple类型
typedef tuple<_Fn&&, _Args&&..., _XSTD exception_ptr&> _Tuple;
// _Seq的值索引上面的_Tuple
typedef make_integer_sequence<size_t, 1 + sizeof...(_Args)> _Seq;

_XSTD exception_ptr _Exc;
// 将回调函数参数打包到_Tuple类型里面,最后一个是exception_ptr
_Tuple _Tup(_STD forward<_Fn>(_Fx), _STD forward<_Args>(_Ax)..., _Exc);

// 使用_Tup里面的上下文特化_Callback_once函数模板
_Lambda_fp_t _Fp = &_Callback_once<_Tuple, _Seq, 1 + sizeof...(_Args)>;

// 在xonce.cpp中实际调用了__crtInitOnceExecuteOnce的WINAPI
if (_Execute_once(_Flag, _Fp, _STD addressof(_Tup)) != 0)
return;

if (_Exc)
_XSTD rethrow_exception(_Exc);

_XGetLastError();
}

// xonce.cpp
_STD_BEGIN
_CRTIMP2_PURE int __CLRCALL_PURE_OR_CDECL _Execute_once(
once_flag& _Flag, _Lambda_fp_t _Lambda_fp, void *_Pv) _NOEXCEPT
{ // wrap Win32 InitOnceExecuteOnce()
static_assert(sizeof(_Flag._Opaque) == sizeof(INIT_ONCE), "invalid size");

return (__crtInitOnceExecuteOnce(
reinterpret_cast<PINIT_ONCE>(&_Flag._Opaque),
reinterpret_cast<PINIT_ONCE_FN>(_Lambda_fp),
_Pv, 0));
}

读写锁

相对于临界区,读写锁能提供更精细的控制,它适用于写操作远少于读操作的数据结构。读写锁允许一个写线程独占访问,而多个读线程并行访问。当写线程需要独占访问时,它需要获得一个排它锁,如果此时有另外的线程持有排它锁或者共享锁,那么本线程就会被阻塞。当读线程需要共享访问时,只要没有线程持有排它锁,那么他就可以立即获得共享锁。读写锁的过程可以参照来自博文的论述

1
2
3
4
5
6
7
8
9
10
11
12
13
Read object begin
P(object.lock)
AtomicAdd(object.activeReader, 1)
V(object.lock)
Do Actual Read
AtomicAdd(object.activeReaders, −1)
end
Write object begin
P(object.lock)
while object.activeReaders != 0 do delay
Do Actual Write
V(object.lock)
end

对读写锁Windows API提供了所谓的SRW系列函数,Linux提供了rwlock系列函数。在C++17中终于提供了std::shared_mutex来实现读写锁,此时我们的读锁可以声明为std::shared_lock<std::shared_mutex>(C++14),写锁可以声明为std::unique_lock<std::shared_mutex>(C++11),如此跨越三个版本的标准才能完成的实现,你只有在C++中才能看到。
从实现上来看,无论是关键段、互斥量、信号量,甚至是条件变量都可以实现读写锁。

  1. 关键段的实现方式
    这里摘录了CSDN上的一个实现。其思想如下
  2. 互斥量的实现方式
    使用互斥量时我们需要注意在进行读操作时我们要获取写锁以免脏读,但可能出现多个读线程竞争写锁的情况,所以我们需要一个读锁。只有竞争到读锁的线程才能去锁定写锁。其过程如下

    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    16
    17
    // 写
    lock(mutex_write);
    write();
    unlock(mutex_write);

    // 读
    lock(mutex_read);
    if(readers == 0)
    lock(mutex_write);
    readers++;
    unlock(mutex_read);
    read();
    lock(mutex_read);
    readers--;
    if(readers == 0)
    unlock(mutex_write);
    unlock(mutex_read);
  3. 信号量的实现方式
    这里的Swait(sem, t, d)表示信号量sem的P操作需要t个资源,并且会消耗d个资源,Ssignal(sem, d)表示信号量sem的V操作产生d个资源。这里Swait类似std::lock,可以同时对若干个信号量上锁,从而避免死锁。

    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    // 初始化
    max_reader = n; // 最多允许n个读者读
    Sinit(sem_read, max_reader);
    Sinit(sem_write, 1);

    // 写
    Swait(sem_write, 1, 1; sem_read, max_reader, 0);
    write();
    Ssignal(sem_write, 1);

    // 读
    Swait(sem_write, 1, 0; sem_read, 1, 1);
    write();
    Ssignal(sem_read, 1);
  4. 条件变量的实现方式

不变量与恶性条件竞争

不变量(invariant)是某个特定数据结构始终保持的特性。例如通过链表的前向指针始终能到达前驱结点。不变量对应的特性常常在更新过程中被破坏,特别是当更新涉及到修改多个值,或者需要多个过程时。这就类似于在最终一致性系统的窗口内强一致性被破坏。当不变量遭到破坏时,才会产生竞态条件(C++ Concurrency in Action: Ch3)。
为了解决竞态条件,一种方法是确保只有当前进行修改的线程才能看到不变量被修改的中间状态,也就是将临界资源保护起来,前面看到的互斥量等属于这种机制。另一种方法是借助于锁无关编程技术,这种技术将对数据结构的修改分解为若干个不破坏不变量的原子操作。还有一种办法是借助于事务的STM技术,将所需要的操作存储于日志中,再合并提交。

锁无关

锁无关(Lock-Free)是一种比无干扰(Obstruction-Free)高层次的并发模型,它是一个容易混淆的概念。锁无关与其他模型的本质区别并不是不用锁(Lockless),而是确保各个线程在访问共享资源时之间不会互相阻塞,从而使得整个程序整体上能够始终向后执行。相对应地,如果使用内核锁,如果一个获得内核锁的线程被挂起或者挂掉,这容易导致其他拥有锁的线程陷入永久等待。但即使借助于原子操作,也会产生死锁、竞态的问题,例如自旋锁的死锁问题和使用CAS时可能出现的ABA问题
加锁操作通常存在着一些问题,锁无关的编程虽然复杂,但相对于使用锁,锁无关的可伸缩性和性能方面会强于锁相关的算法,并且如果我们能够有序地组织各个线程“各行其道”,就能减少锁的使用。通常来说一个基于锁的算法在高竞争的系统中有较好的效率,因为当发生竞态时线程进行睡眠而不是立即重试,但在一般的情境中,不使用锁往往能避免上下文切换的开销。
相应的还有一个无阻塞(Non-blocking)的概念,无阻塞的限制条件要弱于锁无关。属于无阻塞算法而不属于无锁算法的常见例子包括自旋锁。在自旋锁中,所有的线程都不会进入睡眠,因此是非阻塞算法;而考虑当获得锁的线程因为一些原因被暂停时,所有的其他线程仍然需要在原地自旋忙等,因而自旋锁不是无锁算法。

原子操作

原子操作是常见的实现锁无关编程的方式,常见的原子操作有CAS、FAA、TAS、TTAS等。原子操作指的是不可被中断的一系列操作,在原子操作保证当前操作中不发生线程切换,因此保证了其他的线程不可能访问这个资源。原子操作一般有两种实现方式,第一个是使用锁或者CPU的特殊指令等机制来维持原子性,第二个是当出现并发写等破坏原子性的情况时让操作失败,因此对于第二种情况需要使用一个循环不断尝试。这里需要注意的是,原子操作并不一定就能够提高效率,也就是所谓的scalability,这是由于涉及对共享对象操作的原子指令都可能造成cache invalidation,也就是需要重新刷新缓存行。另外,原子操作本身也是很慢的,如下图所示

我们知道x86汇编要求对任意位置的1字节,以及对2/4/8对齐的2/4/8长度的整型读取都是原子的,但是C++11前我们却不能假设甚至是对一个int赋值的操作是原子的,而在C++11后我们需要使用std::atomic来显式声明一个原子的变量。这一方面是由于C++无法保证通过一条指令从内存的存取(考虑一些违反strict aliasing的胡乱cast破坏了对齐)。另一方面也是C++11前根本没有考虑对多线程提供语言级别的支持(这点Java就做得比较好),C++标准规定data race,即并发地去修改一个对象是UB的,所以编译器可以不考虑多线程的情况而进行优化,产生错误。因此通常的方式是直接使用操作系统提供的API,一般来说如果能够有一个原子的CAS,那么就能够借助它实现其他的原子操作。
为了展示问题的复杂性,下面展示了一个早期GCC编译器的问题

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
extern int v;

void f(int set_v)
{
if (set_v)
v = 1;
}
// GCC 3.3.4--4.3.0 O1
f:
pushl %ebp
movl %esp, %ebp
cmpl $0, 8(%ebp)
movl $1, %eax
cmove v, %eax ; load (maybe)
movl %eax, v ; store (always)
popl %ebp
ret

在这个问题中考虑调用f(0),理想情况下v的值无论如何都不会变动的,但是在gcc生成的代码中,我们看到一个始终执行的存储movl %eax, v。显然编译器认为f并不会被修改,而且这对仅面向单线程优化的编译器是完全有理由说得通的。但如果在load和store之间发生了切换,并导致竞态。
除了上面的例子,一些常见的优化,例如循环展开都会造成严重后果。

互锁访问函数和CAS操作

操作系统提供的原子API常借助于某些CPU(比如Intel处理器)提供的指令,能够对某些类型实现某些原子操作。注意到对SMP架构而言,会出现多个核心并发写的情况,这个涉及到后面的内存模型,并且在这里我们可以暂时忽略这个问题。
Windows API提供了一系列Interlocked开头的互锁访问函数,这些函数在处理器层面被保证独占访问。其中一个很关键的便是InterlockedCompareExchange(PLONG dest, LONG value, LONG old)函数,这个函数提供了对LONG类型的原子的CAS操作。InterlockedCompareExchange*destold进行比较,如果相等就将*dest设为value。显然,通过内核锁能够方便地实现原子语义,但原子操作通常会借助于这样的CAS操作,因为这样能避免线程进入睡眠。借助于CAS可以实现其他的原子操作,例如下面的对LONG进行原子赋值的InterlockedExchange函数。在C++11的std::atomic类型中,我们会看到更多的CAS的应用。

1
2
3
4
5
6
7
LONG InterlockedExchange(LONG volatile * target, LONG value){
LONG old;
do{
old = *target;
}while(! InterlockedCompareExchange(target, value, old));
return old;
}

这里的do-while循环保证了在InterlockedCompareExchange失败之后再来一遍能够再来一遍直到成功,但是不能将这个循环和自旋锁中的忙等混淆,从而认为CAS不是锁无关的。这是因为CAS实际上并没有“持有”临界资源,它只需要一个指令就能结束战斗。因此任意一个线程的暂停并不会使得其他线程进入忙等,甚至能够使得CAS的成功率更高。因此可以看出CAS在这里对竞争访问实际上是“消极防御”的态度,也就是所谓的乐观锁(Optimistic Locking),乐观锁是一种非独占锁,它并不是向内核锁一样直接让竞争者们睡眠,而是返回一个失败的状态。相对应的,之前的内核锁和自旋锁等机制属于悲观锁、独占锁。相比乐观锁,悲观锁有以下的弱点(也可以理解为基于锁算法的弱点)

  1. 上下文切换造成的性能开销
  2. 可能造成的死锁问题
  3. 优先级倒置

使用CAS操作实现的并发缓冲队列

常见的用CAS实现的Lockfree算法例如缓冲队列。首先对于一读一写的模型我们可以仅通过约束读指针和写指针的行为即可实现,并不需要接触并发模型。下面我们考虑多对多的模型,以论文Implementing Lock-Free Queues中的论述为例。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
Enqueue(x) {
q = new record();
q->value = x;
q->next = NULL;

do {
p = tail; // 使用tail维护链表尾指针的位置
} while( CAS(p->next, NULL, q) != true); // 1

CAS(tail, p, q); // 2
}

DeQueue() {
do{
p = head;
if (p->next == NULL){
return ERR_EMPTY_QUEUE;
}
} while( CAS(head, p, p->next) != TRUE );
return p->next->value;
}

这里有一个疑问,就是为什么2这句不使用循环保护起来,这是因为这个语句是始终能够成功的。我们考虑成功进行了1处CAS的线程T1,它使得tail->next不为NULL了,假如此时另一个线程T2执行到1,那么它的CAS一定是失败的。这个过程一直到语句2之后tail被成功更新成q,因此实际上可以把tail->next看成一个锁一样的东西。既然如此,我们容易发现一个违背锁无关性质的可能性,也就是当线程T1在执行语句2时挂掉了,那就会阻塞所有其他在循环中的线程。因此我们提出下面的改良版

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
EnQueue(x)
{
q = new record();
q->value = x;
q->next = NULL;

p = tail;
oldp = tail;
do {
while (p->next != NULL)
p = p->next;
} while( CAS(p->next, NULL, q) != TRUE); // 1

CAS(tail, oldp, q); // 2
}

考虑到可能来自其他线程的未提交(指未执行语句2)的添加,我们发现语句p = tail中的tail并不一定是结尾,这也导致了为了维护离开循环时p必须指向结尾这个特性,线程需要在循环内自旋,从而导致上述的死锁现象的产生。为了解决问题,在这一版本中我们索性放宽假设,认为tail只是“接近”结尾,因此现在我们需要使用一个内层的while循环来从tail开始尝试更新结尾。这样即使T1线程挂在语句2,没能更新完tail指针,线程T2也可以自动跟踪到T1在1处的修改。此时我们也不要担心语句2的失败问题,因为有的时候它应该失败。考虑下面的执行顺序:原先链表中只有一个元素1,此时线程T1添加了一个元素2,并且成功执行语句1,将p指向了元素2的位置。此时发生了调度,线程T2获得处理器,它需要在队列中加一个元素3,T2在刚进入循环时它发现自己的tail是指向1的,但它在内层的while循环中根据pnext指针走到了刚被T1添加进去的元素2处。因此T2在元素2的末尾增加了元素3,并且更新自己的p指向元素3。T2继续执行语句2,此时tail == oldp指向元素1,所以CAS成功,tail指向了元素3。接着T1重新获得了处理器,此时tail已经被T2修改到指向元素3了,于是不能匹配oldp,这个CAS就会失败。容易看到这个失败不会影响tail指向精确的队列结尾。但是如果我们稍稍修改下上面的运行顺序,按照T1添加元素2 => T2添加元素3 => T1修改tail => T2修改tail(失败)来执行,那么我们就会发现tail被更新到指向元素2而不是元素3。所以我们看到先前我们放宽的假设是非常有必要的,在论文中作者指出这种情况下tail指针距离列表的准确结束位置最多相差2 * p - 1个节点。其实这个“最多”还是有点多的,所以在实践中我们常常结合两种方案来使用。

CPU提供的原子操作

如果细究上面WINAPI的InterlockedCompareExchange,容易猜到它的实现方式来自于CPU的硬件支持,因为它只能为特定数据类型提供服务。事实上,这里用了x86中的cmpxchg命令
CPU原子操作的实现借助于总线锁、缓存锁等机制。

C++的原子操作库

在上面的章节中,我们概览了锁无关编程的一些思想。从现在开始,我们将讨论C++11标准库提供的原子操作支持。

std::atomic_flag

std::atomic_flag是C++11原子库的一个基础设施,它被广泛地运用到下面的std::atomic类模板的实现中。std::atomic_flag基于TAS(test-and-set)操作维护了一个布尔量flag,提供了test_and_setclear两个方法,可以保证对flag的写不会冲突,读不会脏读。test_and_set尝试将flag从false设为true,如果发现flag已被设置,否则原子地设置flag为true,该函数返回的是flag先前的值。由于std::atomic_flag原子地维护了一个flag,它常被用来实现自旋锁。下面的代码来自MSVC的atomic库,它在std::atomic_Atomic_copy方法中被调用。

1
2
3
4
5
6
7
8
9
10
11
12
inline void _Lock_spin_lock(
volatile _Atomic_flag_t *_Flag)
{
while (_ATOMIC_FLAG_TEST_AND_SET(_Flag, memory_order_acquire))
_YIELD_PROCESSOR;
}

inline void _Unlock_spin_lock(
volatile _Atomic_flag_t *_Flag)
{
_ATOMIC_FLAG_CLEAR(_Flag, memory_order_release);
}

此外,容易发现TAS操作也可以通过CAS实现,其代码很简单

1
return InterlockedCompareExchange(&flag, true, false);

在MSVC的标准库实现中test_and_set借助了Interlock互锁访问函数,保证了访问的原子性

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
 #if defined(_M_ARM) || defined(_M_ARM64)
#define _INTRIN_RELAXED(x) _CONCAT(x, _nf)
#define _INTRIN_ACQUIRE(x) _CONCAT(x, _acq)
#define _INTRIN_RELEASE(x) _CONCAT(x, _rel)
#define _INTRIN_SEQ_CST(x) x
#else /* defined(_M_ARM) || defined(_M_ARM64) */
#define _INTRIN_RELAXED(x) x
#define _INTRIN_ACQUIRE(x) x
#define _INTRIN_RELEASE(x) x
#define _INTRIN_SEQ_CST(x) x
#endif /* defined(_M_ARM) || defined(_M_ARM64) */
inline int _Atomic_flag_test_and_set(volatile _Atomic_flag_t *_Flag,
memory_order _Order)
{ /* atomically test flag and set to true */
switch (_Order)
{
case memory_order_relaxed:
return (_INTRIN_RELAXED(_interlockedbittestandset)(_Flag, 0));

case memory_order_consume:
case memory_order_acquire:
return (_INTRIN_ACQUIRE(_interlockedbittestandset)(_Flag, 0));

case memory_order_release:
return (_INTRIN_RELEASE(_interlockedbittestandset)(_Flag, 0));

case memory_order_acq_rel:
case memory_order_seq_cst:
return (_INTRIN_SEQ_CST(_interlockedbittestandset)(_Flag, 0));

default:
_INVALID_MEMORY_ORDER;
return (0);
}
}
inline void _Atomic_flag_clear(volatile _Atomic_flag_t *_Flag,
memory_order _Order)
{ /* atomically clear flag */
static_assert(sizeof(_Atomic_flag_t) == sizeof(_Uint4_t),
"Unexpected _Atomic_flag_t size");

switch (_Order)
{
case memory_order_relaxed:
case memory_order_release:
case memory_order_seq_cst:
_Atomic_store_4((volatile _Uint4_t *)_Flag, 0, _Order);
break;

default:
_INVALID_MEMORY_ORDER;
break;
}
}

std::atomic

在前面的讨论中我们已经明白C++中的基本类型并不保证是原子的,所以std::atomic<T>定义了一系列具有原子行为类型std::atomic禁用了复制构造函数和复制赋值运算符(同时也不允许移动)。这是由于这两个操作发生在两个对象间,势必要破坏原子性,而一个std::atomic的所有操作都是原子的。
对于用户自定义类型(UDT)typename Tstd::atomic主模板要求T满足standard layout、trivial default constructor和trivial destructor,即编译器可以使用memcpy等进行bitwise的复制并使用memcmp进行bitwise的比较。在CAS操作的实现中调用了memcpymemcmp。这看起来限制很大,我们希望有用户自定义的复制构造函数,这样我们就可以进行member-wise的操作了,对此想法,C++ Concurrency in Action一书中指出,如果有自定义的复制构造函数,那么就势必要将锁定区域(下文中会交待其实std::atomic的主模板实现中可能会有自旋锁)内的数据交给这些用户代码,如果在用户代码中再使用了锁,就可能产生死锁的现象。容易发现我们常用的智能指针std::shared_ptr并不能被放到std::atomic里面,但是我们又确实有这个需要,因此标准库通过重载std::atomic_系列函数为std::shared_ptr提供了原子操作的支持。而对于std::atomic类型,我们既可以通过std::atomic_系列函数,也可以通过std::atomic模板中提供了compare_exchange_weakcompare_exchange_strongloadstore等操作。
一般标准库会对一些大小满足能够直接使用某些处理器的原子指令的类型进行特化,例如指针类型、integral类型和一些用户定义类型。对于指针类型,std::atomic会进行偏特化。原子的指针运算可以通过fetch_开头的函数和相应的operator运算符来实现。对integeral类型std::atomic类模板也会进行特化,其实现类似指针类型,并且添加了对位运算的支持。对于“复杂”的整型计算如乘法,虽然atomic未提供,但可以通过compare_exchange_weak等函数间接实现。从C++20开始,std::atomic类模板提供对浮点类型的特化。注意在这之前,compare_exchange_strong等CAS方法对浮点数可能出现问题,原因显而易见是memcmp的锅,C++浮点数之间比较时甚至都不能使用==,遑论memcmp
在上文中提到,除了std::atomic_flagstd::atomic<typename T>类模板都是不保证不使用锁的(情况特定于处理器和标准库实现),用户可通过bool is_lock_free()函数判断是否Lockfree的。以PJ Plauger的实现为例,主模板的load()内部就使用了上文提到的用std::atomic_flag实现的自旋锁。

1
2
3
4
5
6
7
inline void _Atomic_copy(volatile _Atomic_flag_t *_Flag, size_t _Size,
volatile void *_Tgt, volatile const void *_Src, memory_order _Order)
{
_Lock_spin_lock(_Flag);
_CSTD memcpy((void *)_Tgt, (void *)_Src, _Size);
_Unlock_spin_lock(_Flag);
}

这里要提一句,主模板的template<class _Ty> struct atomic的实现继承了_Atomic_base<_Ty, sizeof (_Ty)>。这个_Atomic_base<_Ty, sizeof (_Ty)>模板又继承了_Atomic_impl<_Bytes>模板,其作用相当于把_Atomic_impl<_Bytes>中全void *的东西封装回了_Ty。我们查看最核心的_Atomic_impl<_Bytes>模板,它是和数据字节数相关的,分别对1/2/4/8字节的进行了偏特化。模板里面定义了最重要的_Is_lock_free_Store_Load_Exchange_Compare_exchange_weak_Compare_exchange_strong等操作,全部是void*的。我们刚才看到的_Atomic_copy来自于主模板。我们下面看看_Atomic_impl<_Bytes>的偏特化版本,如对于一个uint2_t是如何实现的

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
  #define _Compiler_barrier()	_ReadWriteBarrier()

#if defined(_M_ARM)
#define _Memory_barrier() __dmb(_ARM_BARRIER_ISH)
#endif /* defined(_M_ARM) */

#if defined(_M_ARM64)
#define _Memory_barrier() __dmb(_ARM64_BARRIER_ISH)
#endif /* defined(_M_ARM64) *

/* _Atomic_load_2 */
inline _Uint2_t _Load_seq_cst_2(volatile _Uint2_t *_Tgt)
{
_Uint2_t _Value;

#if defined(_M_ARM) || defined(_M_ARM64)
_Value = __iso_volatile_load16((volatile short *)_Tgt);
_Memory_barrier();

#else
_Value = *_Tgt;
_Compiler_barrier();
#endif

return (_Value);
}

inline _Uint2_t _Load_relaxed_2(volatile _Uint2_t *_Tgt)
{
_Uint2_t _Value;

#if defined(_M_ARM) || defined(_M_ARM64)
_Value = __iso_volatile_load16((volatile short *)_Tgt);

#else
_Value = *_Tgt;
#endif
return (_Value);
}

inline _Uint2_t _Load_acquire_2(volatile _Uint2_t *_Tgt)
{
return (_Load_seq_cst_2(_Tgt));
}

可以发现在这个偏特化版本的实现中直接借助了处理器提供的设施,而避免了自旋锁的使用。

weak和strong版本的CAS

类似与Windows API的互锁访问函数,atomic库通过bool atomic::compare_exchange_weak(old, value)bool atomic::compare_exchange_strong(old, value)提供了对CAS操作的支持。这两个函数监测该std::atomic中维护的std::atomic_flag _My_flag(在_Atomic_impl模板中定义)的值,如果等于old就改为value,函数返回一个表示修改是否成功的bool量。因此容易发现这两个函数不总是成功的,因为CPU可能仅对某些类型提供了相应的CAS原子指令,对于其他的类型则必须通过使用自旋锁甚至内核锁来实现。

1
2
3
4
5
6
7
8
9
10
11
12
13
inline int _Atomic_compare_exchange_weak(volatile _Atomic_flag_t *_Flag, size_t _Size, volatile void *_Tgt, volatile void *_Exp, const volatile void *_Src, memory_order _Order1, memory_order _Order2)
{ /* atomically compare and exchange with memory ordering */
int _Result;

_Lock_spin_lock(_Flag);
_Result = _CSTD memcmp((const void *)_Tgt, (const void *)_Exp, _Size) == 0;
if (_Result != 0)
_CSTD memcpy((void *)_Tgt, (void *)_Src, _Size);
else
_CSTD memcpy((void *)_Exp, (void *)_Tgt, _Size);
_Unlock_spin_lock(_Flag);
return (_Result);
}

这两个函数有一些细致的区别,compare_exchange_weak在可能会False Negative,这是由于weak允许spurious failure。在某些平台(不错ARM、PowerPC又被点名了)上的CAS是通过LL/SC实现的,而不像x86上那样只通过一条指令,所以可能存在问题。因此使用compare_exchange_weak的时候需要一个循环。注意到这个!expected不是必要的。

1
2
3
bool expected=false;
extern atomic<bool> b; // set somewhere else
while(!b.compare_exchange_weak(expected,true) && !expected);

Stackoverflow上相关问题的整理中提到了这两者之间的性能比较,由于weak会忽视检查,所以一般weak比strong快。但是如果使用strong能避免weak+loop,那么选择strong是适合的。注意到即使使用strong,loop也不是就一定可以避免的,因为原子操作本来就存在使用乐观锁的情况。

ABA问题

伴随着CAS的是可能存在的ABA问题。ABA问题的根源是从内存中取出值和CAS这两个操作不是原子的,因此可能在这两个过程中发生切换。

原子操作与锁的关系

在上面的讨论中,我们能够直观地发现原子操作和锁的关系。原子操作看起来是“独善其身”的只能管住自己,原子操作之间、原子操作和非原子操作之间可能发生乱序或重排;而锁像大哥,能护住一段代码。由此我们思考如何通过原子操作来组织其他的那些非原子操作呢?这就要引入下面讨论的内存模型的问题。

其他的同步原语

在上面的几个章节中,我们论述了基于锁和基于原子操作的同步原语。还有一些其他的同步原语,例如RCU、MCS Lock等。在Linux中使用了

Hazard Pointer

Hazard Pointer类似于面向多线程的智能指针,它能够无等待地进行线程安全的垃圾回收。

RCU

RCU(Read Copy Update)是一种替代读写锁的方法,在Linux内核中被广泛使用。其思想是

  1. 对于写操作
    1. 从数据结构中移除对应的指针,因此后面的读者将不能成功读取
    2. 等待前面的读者完成读取

通过内存模型约束线程对变量的读写顺序

锁无关编程的难点之一就是需要从编译器与CPU两个层面考虑行为对线程间的同步造成的的影响,也就是考虑编译器重排和CPU缓存与乱序对读写逻辑可能造成的影响。当我们试图使用原子操作去解决非原子操作间的竞态问题时,那么我们需要谨慎选择使用恰当的内存模型,这样能够在提升效率的同时保证安全性。

原子操作的线程间顺序

我们知道从C++语言到运行程序得到结果之间需要经历编译器优化和处理器优化两道坎,处理器优化包括高速缓存和指令乱序,编译器优化可能进行重排。编译器和处理器达成的协议是不能改变单线程程序的行为。以编译器优化为例,下面展示的代码在O0下,g++7按照1-2的原始顺序来编译,但开启O1后,g++7就会进行Store-Store重排,将2提到1前面,先对b赋值,再对a赋值;并且还去除了一部分没用的代码。对单线程来说,这样的优化并没有任何问题。但对于多线程来说则可能出现问题。一方面,由于去除部分代码的原因,汇编O1事实上不能在a处观察到a == 1 && b == 1的情况,而假设O0汇编在进行到b处被抢占,那么其他的线程有机会看到以上的情况。另一方面,在O1中先对b赋值再对a赋值,仍然会出现问题。假如说在3和4间线程被强占,那么另外一个线程观察ab,得到b为123,而a为不确定值(或者1,如果前面赋初值语句没有被删去的话),而如果编译器不进行重排,我们理想中的原始结果是a为43,b为不确定值。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
int main(){
int a = 1, b = 1; // 0
// a
a = b + 42; // 1
// c
b = 123; // 2
printf("%d %d", a, b);
return 0;
}
// compile with -O0
movl $1, -8(%rbp)
movl $1, -4(%rbp)
// b
movl -4(%rbp), %eax
addl $42, %eax
movl %eax, -8(%rbp)
movl $123, -4(%rbp)
movl -4(%rbp), %edx
movl -8(%rbp), %eax
// compile with -O1
movl $123, %ecx // 3
movl $43, %edx // 4

出于性能方面的考虑,对多线程的程序而言并不存在和单线程一样的硬性要求。在使用原子操作等锁无关技术时,不能假设所有环境下程序最后行为一如我们希望代码所“暗示”一样。事实上编译器或处理器可以在不同线程间不同变量间进行读写乱序,而这在多线程中会造成问题。例如在单线程中将对B的写提到对A的读前面是没有问题的,但是对多线程来说,这往往就会出现问题。虽然大部分时候我们不需要操心这个问题,这是因为一方面在使用mutex等内核锁时,内核帮我们做了相关工作,另一方面部分处理器(如x86)也提供了(近似,实际上是TSO)acquire/release的保障,并也可以通过一些指令命令编译器在某些地方减少优化,但这依然是一个客观存在的问题。

可见性和有序性

可见性指一个线程对变量的写操作对其它线程后续的读操作可见,这里见的是结果。可见性要求CPU在对缓存行写操作后能够保证至少在某个时间点前必须写回内存,从而保证其他线程能读到最新的值。有序性指的是数据不相关变量在并发的情况下,实际执行的结果和单线程的执行结果和单线程的执行结果是一样的,不会因为重排/乱序的问题导致结果不可预知。

根据以上的定义,我们引入下面的两个概念:
如果操作A先行发生(happen-before)于操作B,那么A造成的修改能够被B观察到。我们以知乎上举出的一个例子来理解,根据上面有关原子操作的线程间顺序的讨论,我们知道下面的代码在单线程条件下断言是始终成立的,即使语句1和2之间发生了重排。容易看出happen-before强调的是一个现象,C++保证在单线程中happen-before现象是始终成立的,不管后面编译器和CPU如何进行重排。

1
2
3
4
5
6
int a, b;
void foo() {
a = 42; // 1
b = a; // 2
assert(b == 42);
}

如果A同步发生(synchronizes-with)于B,那么某个线程中A的修改能够被另一个线程中的B观察到。它实际上是建立一种方法,使得一个时间点前内存的变化能够被其他线程看到。我们引用同样的来源的一个例子,由于重排的问题,在下面的代码中语句4的断言不一定成立(我们不考虑x86 CPU的TSO模型)。这就说明在Relax等无约束或者少约束的内存模型下,在多线程中试图通过某原子量来同步非原子量并不是可靠的

1
2
3
4
5
6
7
8
9
10
11
12
13
14
int data;
std::atomic_bool flag { false };

// Execute in thread A
void producer() {
data = 42; // (1)
flag.store(true, memory_order_relaxed); // (2)
}

// Execute in thread B
void consume() {
while (!flag.load(memory_order_relaxed)); // (3)
assert(data == 42); // (4)
}

内存一致性模型

广义上的一致性模型包括Strict Consistency、Sequential Consistency、Causal Consistency、Processor Consistency、FIFO consistency、Cache Consistency、Slow Consistency、Release consistency、Entry Consistency、General Consistency、Local Consistency等。在std::atomic中提供了六种内存模型(memory order)来描述不同线程之间相同/不同数据的读写的顺序。
顺序一致性(sequential consistency, SC)是最强的模型,要求程序中的行为从任意角度来看,序列顺序都是一致的(have single total order);这是在说这段多线程程序的行为和一段单线程程序的行为是一致的,类似于不是并行的并发。这个模型禁止了任何四种类型的读写重排,因此我们可以认为SC下每次读到的都是最新值。在C++ Concurrency in Action中举了一个例子,使用四个线程运行下面四个函数,断言无论如何z永远不可能为0。这说明了在read_x_then_yread_y_then_x两个函数至少有一个能看到xy同时被设为true。容易看出将1/2/3/4任意排序,那么上面的断言是显然的(注意到即使read_x_then_ywrite_x前被调用也有while循环兜底)。但是如果(1 -> 3)(2 -> 4)这两个步骤并行发生的话,上面断言就不成立了。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
std::atomic<bool> x = false, y = false;
std::atomic<int> z = 0;
void write_x()
{
x.store(true,std::memory_order_seq_cst); // 1
}
void write_y()
{
y.store(true,std::memory_order_seq_cst); // 2
}
void read_x_then_y()
{
while(!x.load(std::memory_order_seq_cst)); // a
if(y.load(std::memory_order_seq_cst)) // 3
++z;
}
void read_y_then_x()
{
while(!y.load(std::memory_order_seq_cst)); // b
if(x.load(std::memory_order_seq_cst)) // 4
++z;
}

自由模型(Relaxed ordering)是最弱的模型,它对线程间的执行顺序不做任何synchronizes-with的假设,但同线程的变量仍然遵循happens-before假设,即它除了禁止重排单线程上对单个变量的访问顺序,并不作任何额外的事情。下面展示的一个C++11对应的自由模型std::memory_order_relaxed的例子,注意在后面的详述中可以看到,由于特定平台的一致性模型要强于自由模型,所以std::memory_order_relaxed只是保证强于等于自由模型。注意到这时候仍然保证了1先于2、3先于4,且3读到true,但是z就可能为0了。这是由于xy是两个不同变量,自由模型不关注它们之间的关系。关注一下图5.4,我们发现这张图非常反直觉,例如4步骤还会返回false,这是出于什么机理呢?但是在这之前,先在x86下的MSVC2015上测试一下,发现z始终不为0,这是为什么呢?我在StackOverflow上提了个问题。回答首先指出std::memory_order_relaxed的副作用实际上是禁止编译器(注意区分编译器的重排行为和处理器的乱序行为)重排xy的Store-Store,但是断言失败还可能由于CPU决定颠倒写x和写y的顺序(虽然一般不会进行这种乱序),或者CPU的缓存导致了x延迟写入内存。回答接着解释了为什么x86上不会assertion fail,这是因为x84提供了acquire/release语义,保证了当3是true时,在2前的对3后的可见。注意到这个并不违反x86对Store-Load可能的乱序,它实际上利用了Store-Store不会乱序的特性。回顾之前的最严格的顺序一致模型,它要求对于每个共享变量,Load-Load、Load-Store、Store-Load、Store-Store都不乱序。

1
2
3
4
5
6
7
8
9
10
11
12
13
std::atomic<bool> x = false, y = false;
std::atomic<int> z = 0;
void write_x_then_y()
{
x.store(true,std::memory_order_relaxed); // 1
y.store(true,std::memory_order_relaxed); // 2
}
void read_y_then_x()
{
while(!y.load(std::memory_order_relaxed)); // 3
if(x.load(std::memory_order_relaxed)) // 4
++z;
}

acquire/release模型相对灵活一点,也是x86实现的语义。release可以理解为写操作,acquire可以理解为读操作。acquire fence要求其后面的RW不能与其前面的R重排,也就是RW不能重排到(原本在自己前面的)R前,例如R1 W2不能变成W2 R1,否则R1读到的就是脏值的。release fence要求其前面的RW不能和其后面的W重排,也就是RW不能重排到(原本在自己后面的)W后,例如R1 W2不能重排为W2 R1,否则R1又脏读。但是这两个模型即使组合起来也不能禁止其前面的W和后面的R重排。
C++11使用剩下四个内存模型常数来实现这一机制,memory_order_releasememory_order_acquire表示B线程在使用memory_order_acquire读时,线程A在memory_order_release前的所有写操作都是可见的。而稍弱一点的memory_order_releasememory_order_consume只用来保证当前操作涉及到的对象的可见性。

Store-Load乱序问题

在前面的讨论中提到了x86的Store-Load乱序问题,对于x86,Loads May Be Reordered with Earlier Stores to Different Locations,但对于相同地址则不会乱序
这篇博文中记录了一个有关Store-Load乱序的实验。在实验中,X的写操作可能被延迟到Y的读操作之后,尽管我们插入了compiler barrier。这时候我们需要一个full/general memory barrier,也就是实现让它前面所有的Load/Store操作对它后面的Load/Store操作都是可见的,包括了止Store-Load类型的乱序。在同一篇博文中指出可以使用插入一个mfence,以实现full/general memory barrier。

1
2
3
4
5
// gcc
asm volatile("mfence" ::: "memory")
// c++11
// https://stackoverflow.com/questions/25478029/does-atomic-thread-fencememory-order-seq-cst-have-the-semantics-of-a-full-memo
std::atomic_thread_fence(std::memory_order_seq_cst)

SoF上指出虽然atomic(seq_cst)atomic(seq_cst)始终不会重排,但是在atomic(seq_cst)附近的non-atomic甚至是atomic(non-seq_cst)形式的STORE-LOAD都会被重排。例如在下面的代码中1和3的STORE-LOAD肯定不会被重排,但2和3的STORE-LOAD就可能被重排,所以一定要注意。

1
2
3
4
std::atomic<int> a, b, c;
a.store(2, std::memory_order_seq_cst); // 1: movl 2,[a]; mfence;
c.store(4, std::memory_order_release); // 2: movl 4,[c];
int tmp = b.load(std::memory_order_seq_cst); // 3: movl [b],[tmp];

因此在x86上至少要对LOAD/STORE其中的一个加上MFENCE,或者用一个LOCK指令,而这也实现了类似memory_order_seq_cst的效果。在章节内存模型的实现中还有更多说明。
下面的代码不一定是等价的

1
2
3
4
5
6
7
8
9
10
atomic<int> x, y

y.store(1, memory_order_relaxed); //(1)
atomic_thread_fence(memory_order_seq_cst); //(2)
x.load(memory_order_relaxed); //(3)

atomic<int> x, y;
y.store(1, memory_order_seq_cst); //(1)
// Nothing
x.load(memory_order_seq_cst); //(3)

fence

常见的fence包括thread fence(memory/CPU barrier)和signal fence(compiler barrier)。

signal fence

signal fence类似下面的东西,参考Wikipedia

1
2
3
4
5
6
// gcc
asm volatile("" ::: "memory");
// msvc
__MACHINE(void _ReadWriteBarrier(void))
// c++11
std::atomic_signal_fence(memory_order_acq_rel)

对于gcc版本,asmvolatilememory三个关键字的作用可以参考SoF上的回答
对于MSVC版本,根据MSDN_ReadWriteBarrier限制了编译器可能的重排。C++11标准库中的std::atomic_signal_fence在MSVC上也是利用Compiler Barrier实现的。
一个signal fence的作用是

  1. 强制单线程和该线程上的异步中断之间的顺序性
  2. 强制单核上运行的多线程之间的顺序性
    注意到在SMP架构下这一点难以保证,所以对于多线程程序往往需要更强的thread fence。

thread fence

thread fence也就是所谓的内存屏障,我们可以使用下面的语句进行声明,此外,我们还可以声明一个单独的Store/Load Barrier。这里补充一下Store Barrier强制所有屏障的store指令,都在屏障指令执行之前被执行,并把store缓冲区的数据都刷到主存。Load Barrier强制所有屏障的load指令,都在屏障指令执行之后被执行,并且一直等到load缓冲区被该CPU读完才能执行之后的load指令。而一个full barrier兼而有之。

1
2
3
4
5
6
7
8
9
10
// x86
asm volatile("mfence":::"memory")
// gcc
__sync_synchronize
// msvc
MemoryBarrier()
// c++11
std::atomic_thread_fence(memory_order_seq_cst)
// other methods
_mm_mfence

我们需要注意的是内存屏障是相当耗时的操作,甚至还要超过原子操作,内存屏障还会干扰CPU的流水线,导致性能的降低。下面我们查看一下标准库atomic_thread_fence函数的实现

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
// MSVC
inline void _Atomic_thread_fence(memory_order _Order)
{ /* force memory visibility and inhibit compiler reordering */
#if defined(_M_ARM) || defined(_M_ARM64)
if (_Order != memory_order_relaxed)
{
_Memory_barrier();
}
#else
_Compiler_barrier();
if (_Order == memory_order_seq_cst)
{ /* force visibility */
static _Uint4_t _Guard;
_Atomic_exchange_4(&_Guard, 0, memory_order_seq_cst);
_Compiler_barrier();
}
#endif
}
// GCC
inline void
atomic_thread_fence(memory_order __m) noexcept
{ __atomic_thread_fence(__m); }

可以发现由于x86自带的acquire/release语义,除非是最强的memory_order_seq_cst,否则atomic_thread_fence等价于atomic_signal_fence。而memory_order_seq_cst下的thread fence的full barrier实现则比较奇特,仔细查看这个full barrier的实现这里为啥不直接插入一个MemoryBarrier(),而是利用了一个原子操作呢?

内存模型的实现

六种内存模型通过加入Compiler Barrier和Memory Barrier来实现。
SoF中指出acquire/release相当于在relax后面加一道栅栏,即下面的代码是等价的。但如果不显示加入fence的话,编译器可以视情况生成等价的更好的代码。而在x86等强内存模型架构cpu上,也不一定生成fence,例如单独的atomic_thread_fence(memory_order_acquire)就可以简化为nop。

1
2
3
4
5
// 1
a.load(memory_order_acquire)
// 2
a.load(memory_order_relaxed)
atomic_thread_fence(memory_order_acquire)

memory_order_seq_cst则需要额外的MFENCE或者LOCK,可参考上节所述。
下面的代码是等价的

1
2
3
4
5
6
7
8
9
10
if (var.load(std::memory_order_acquire) == 0)
{
assert(a==123);
}

if (var.load(std::memory_order_relaxed) == 0)
{
std::atomic_thread_fence(std::memory_order_acquire);
assert(a==123);
}

使用内存模型解决双重检查锁定模式(DCLP)存在的问题

在之前的章节中,我们曾经提到过双重检查锁定模式中存在的问题,对此文章Double-Checked Locking is Fixed In C++11指出我们可以通过适当的内存屏障或者atomic store/load语义来解决。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
std::atomic<Singleton*> Singleton::m_instance;
std::mutex Singleton::m_mutex;

Singleton* Singleton::getInstance() {
Singleton* tmp = m_instance.load(std::memory_order_relaxed);
std::atomic_thread_fence(std::memory_order_acquire);
if (tmp == nullptr) {
std::lock_guard<std::mutex> lock(m_mutex);
tmp = m_instance.load(std::memory_order_relaxed);
if (tmp == nullptr) {
tmp = new Singleton;
std::atomic_thread_fence(std::memory_order_release);
m_instance.store(tmp, std::memory_order_relaxed);
}
}
return tmp;
}

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
std::atomic<Singleton*> Singleton::m_instance;
std::mutex Singleton::m_mutex;

Singleton* Singleton::getInstance() {
Singleton* tmp = m_instance.load();
if (tmp == nullptr) {
std::lock_guard<std::mutex> lock(m_mutex);
tmp = m_instance.load();
if (tmp == nullptr) {
tmp = new Singleton;
m_instance.store(tmp);
}
}
return tmp;
}

无等待

无等待(Wait-Free)是比锁无关更高层面的并发。无等待指的是程序中的每个线程都可以一直运行下去而不阻塞。

并发编程中数据共享措施

常见的并发模型有共享变量、Communicating Sequential Process(CSP)、Actor等模型。共享变量最为常见,多个线程通过锁或者原子操作对变量进行有序访问。CSP模型可以参照Go语言中的channel。Actor模型可以参照Mapreduce模型。

condition variable

lock_guardunique_lock能够保证线程之间的互斥访问临界资源,但是不能控制这些访问的先后顺序。自然而然地可以想到可以用锁维护一个共享变量记录状态,以实现线程间同步的措施,例如在生产者/消费者模型中用它来描述产品数量。一个比较naive的方式是轮询(poll)

1
2
3
4
5
6
7
8
9
10
11
12
13
14
bool flag;
std::mutex m;

void wait_for_flag()
{
std::unique_lock<std::mutex> lk(m);
while(!flag)
{
lk.unlock(); // 1 解锁互斥量
std::this_thread::sleep_for(std::chrono::milliseconds(100)); // 2 休眠100ms
lk.lock(); // 3 再锁互斥量
}
flag = false;
}

注意这里这里仍然是需要锁的,否则可能两个互相竞争的线程同时获得flag。
另一种更好的方法是利用条件变量,条件变量(condition variable)是利用共享的变量进行线程之间同步的一种机制。条件变量维护一个等待列表,相对于前面的轮询,条件变量应用了推的机制,当某一个线程所需要的条件不满足时,它会被阻塞。拥有锁的线程在退出临界区时使用notify_one/notify_all发出信号通知(注意并不是拥有锁才能notify)条件变量,条件变量会唤醒一个/所有正在等待的线程。
使用条件变量等待事件通常是类似下面的形式

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
void wait_for_event()
{
std::unique_lock<std::mutex> uni_lock(mtx);
while (!condition)
{
cv.wait(uni_lock, [](){return condition;}); // wait until condition
}
}
void signal_event()
{
std::unique_lock<std::mutex> uni_lock(mtx);
condition = true;
cv.notify_one();
}
void broadcast_event()
{
std::unique_lock<std::mutex> uni_lock(mtx);
condition = true;
cv.notify_all();
}

其中cv.wait会将线程挂到等待队列上,然后释放锁,并使用进入睡眠阻塞线程,否则带着锁睡觉会死锁。当cv.wait返回时,它会重新获得锁。特别地,上面wait对应的一系列步骤必须要是原子的,否则会造成丢失signal的问题。例如当条件满足!condition时,程序进入cv.wait等待,cv.wait会释放锁并准备进入睡眠,此时一个signal产生了,但是线程却并不能收到这个信号。这一现象广泛出现在使用边缘触发(Edge triggered)机制的程序中,常见的边缘触发还有Linux的信号。相对于水平触发(select/poll等),边缘触发只会唤醒已经等待在wait上的线程,因此可能出现丢失信号的问题,例如当notify操作早于wait操作时,这个notify就会丢失了。注意到上面的实现中,cv.notify_onecv.notify_all函数始终是出现在修改condition之后的,这也是为了保证当睡眠线程在收到信号后能够及时观察到条件满足了。
在陈硕大牛的博客中还指出了更多的例子。例如不为signal部分上锁是错误的,因为可能在wait部分的while循环和pthread_cond_wait函数之间发生修改conditionpthread_cond_signal,这样进入pthread_cond_wait的wait部分代码就会丢失这次的signal。不过即使为signal部分上锁还可能丢失信号。原因是生产者和消费者竞争同一把锁虽然能够保证wait和signal是串行的顺序,但可能整个signal过程都在wait过程前面。
使用条件变量时可能发生虚假唤醒(spurious wakeup)的问题,虚假唤醒指的是被wait中阻塞的线程在没有notify的情况下被唤醒,或者一次signal_one唤醒多个线程。虚假唤醒可能发生在多处理器系统和接收Linux信号时,条件变量设计者出于性能因素容忍了虚假唤醒的存在。APUE指出pthread_cond_signal函数可能唤起多个线程。SoF中指出,当等待队列中的Linux线程收到一个系统信号时,会得到虚假唤醒。在另一个回答中,Jon Skeet大神指出其深层次原因是没有任何的保障是一个被唤醒(awakened)的线程一定会被调度(scheduled),很可能当一个等待队列中的线程被唤醒后准备获得锁时,另一个线程已经捷足先登了获得了锁,并且重置了条件condition的值。但注意,即使是虚假唤醒的情况,cv.wait也是在获得锁之后再返回,但这时候条件condition可能已经不满足了,这时候就出现了虚假唤醒。解决虚假唤醒的方案很简单,如上文wait_for_event所示,可以将wait包裹在一个while循环里面。在SF上记录了一番实验,强行产生虚假唤醒。

1
2
3
4
5
con_var.notify_one();
// trigger the spurious wakeup
lock.unlock();
std::this_thread::sleep_for(std::chrono::seconds(2));
condition = true;

可以发现在notify和unlock两个过程之后的两秒内消费者线程已经被唤醒了,但在拿到锁和条件变量后它发现其实condition值并不为true,这就产生了一次虚假唤醒。

事件

Windows中通过事件的机制类似于条件变量的机制。

信号量的实现

future

条件变量的一个重要的应用就是生产者/消费者模型,但对于一些平凡的情况,消费者只等待一次(one-off)来自生产者的结果。一个普遍的场景是启动一个计算线程并异步获取它的结果,不过std::thread不能直接提供获取返回值的方法,此时可以考虑使用全局变量或者传入指针和回调函数。
另一个较为方便的做法是使用std::future来获取异步任务中的结果。从一定意义上讲,future类似于一个callback。C++中提供了std::futurestd::shared_future,可以触发一个或多个事件。当多个线程访问std::future时,需要锁来保护线程安全。

async

如下面代码所示,future常和std::async一并使用,容易看到,它类似于Python中的subprocess.call,是个高层面的封装。

1
2
3
4
5
6
int main()
{
std::future<int> the_answer = std::async(get_integer());
do_other_stuff();
std::cout<< the_answer.get() << std::endl;
}

这里面的std::async用来实现一次异步调用。std::async是一个模板函数,它可以接受一个std::launch类型的参数,其中使用std::launch::defered表示异步调用延迟到.wait().get()再执行。而std::launch::async表示在一个独立线程中运行。默认是std::launch::defered | std::launch::async表示这两个二选一。std::async能够接受函数指针、函数对象(的左值、右值。引用和std::ref),也能够通过类似std::bind一样的机制以引用、std::ref等方式传入对象的上下文。
注意在仅C++11中,future的析构函数可能会阻塞线程

packaged_task

不同于std::asyncstd::packaged_task是个函数对象,这个函数对象有点类似std::function(但std::function还能够复制构造),因此std::packaged_task需要手动调用以运行。

1
2
3
4
5
6
7
std::packaged_task<int()> task(sleep);
auto f = task.get_future();
// 在主线程中运行
task();
// 启动另一个线程运行,注意只能移动packaged_task
std::thread myThread(std::move(task));
f.get();

由于std::packaged_task能移动到某个std::thread中,因此适合用来实现线程池,负责打包待计算的任务。std::packaged_task还可以被用来向另一个线程派发任务。

promise

在上面的两种派发任务-等待获取结果的模型中,我们的主线程是作为等待任务结果的一方,而将执行任务的异步线程则是产生结果的一方。std::future对象由主线程持有,主线程会调用std::async或者packaged_task启动一个任务,并且在需要结果时调用f.get()获得异步线程的结果。对于异步线程来说,自己并不需要额外工作,正常返回。但是这在promise中就不一样了,在使用promise时,异步线程需要显式向主线程设置值p.set_value()或传递异常p.set_exception()

future.then

future.then目前还没有进入C++标准,其风格类似于CPS和Haskell中的Monad,相比先前介绍的三种机制,future.then利用了回调链的机制避免了手动的同步。

co_await、co_yield、co_return

future.then机制降低了流程操控的复杂度,不过写起来仍然很繁琐,而且存在回调地狱的问题。在链式调用的过程中,整个流程难以在中间直接abort,为此Rust先后提出了try!?.这样的机制。截至目前这三个函数尚未进入C++标准,但这种由CS流行开来的异步编程范式其实十分优雅,我们将在一个单独的专题进行讨论。

并发编程中的基础架构

在设计高性能的并发代码时,我们需要注意以下几点:

  1. 充分利用局部性假设,是同一线程中的数据紧密联系
  2. 减少线程上所需的数据量
  3. 让不同线程访问不同位置,避免伪共享

线程安全的队列

线程池

一个简单的线程池的实现需要借助一个线程安全队列。