C++ atomic 库

因为《并发编程重要概念及比较》文章过长,所以将其中无锁队列部分拆出来。

std::atomic_flag

std::atomic_flag是C++11原子库的一个基础设施,它被广泛地运用到下面的std::atomic类模板的实现中。std::atomic_flag基于TAS(test-and-set)操作维护了一个布尔量flag,提供了test_and_setclear两个方法:

  1. test_and_set
    尝试原子设置为true,并且返回它持有的先前的值
  2. clear

std::atomic_flag可以保证对flag的写不会冲突,读不会脏读。由于std::atomic_flag原子地维护了一个flag,它常被用来实现自旋锁。下面的代码来自MSVC的atomic库,它在std::atomic_Atomic_copy方法中被调用。

1
2
3
4
5
6
7
8
9
10
11
12
inline void _Lock_spin_lock(
volatile _Atomic_flag_t *_Flag)
{
while (_ATOMIC_FLAG_TEST_AND_SET(_Flag, memory_order_acquire))
_YIELD_PROCESSOR;
}

inline void _Unlock_spin_lock(
volatile _Atomic_flag_t *_Flag)
{
_ATOMIC_FLAG_CLEAR(_Flag, memory_order_release);
}

上面的_YIELD_PROCESSOR取决于有没有__yield关键字,如果没有,就是空的。
此外,容易发现TAS操作也可以通过CAS实现,其代码很简单

1
return InterlockedCompareExchange(&flag, true, false);

在MSVC的标准库实现中test_and_set借助了Interlock互锁访问函数,保证了访问的原子性

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
 #if defined(_M_ARM) || defined(_M_ARM64)
#define _INTRIN_RELAXED(x) _CONCAT(x, _nf)
#define _INTRIN_ACQUIRE(x) _CONCAT(x, _acq)
#define _INTRIN_RELEASE(x) _CONCAT(x, _rel)
#define _INTRIN_SEQ_CST(x) x
#else /* defined(_M_ARM) || defined(_M_ARM64) */
#define _INTRIN_RELAXED(x) x
#define _INTRIN_ACQUIRE(x) x
#define _INTRIN_RELEASE(x) x
#define _INTRIN_SEQ_CST(x) x
#endif /* defined(_M_ARM) || defined(_M_ARM64) */
inline int _Atomic_flag_test_and_set(volatile _Atomic_flag_t *_Flag,
memory_order _Order)
{ /* atomically test flag and set to true */
switch (_Order)
{
case memory_order_relaxed:
return (_INTRIN_RELAXED(_interlockedbittestandset)(_Flag, 0));

case memory_order_consume:
case memory_order_acquire:
return (_INTRIN_ACQUIRE(_interlockedbittestandset)(_Flag, 0));

case memory_order_release:
return (_INTRIN_RELEASE(_interlockedbittestandset)(_Flag, 0));

case memory_order_acq_rel:
case memory_order_seq_cst:
return (_INTRIN_SEQ_CST(_interlockedbittestandset)(_Flag, 0));

default:
_INVALID_MEMORY_ORDER;
return (0);
}
}
inline void _Atomic_flag_clear(volatile _Atomic_flag_t *_Flag,
memory_order _Order)
{ /* atomically clear flag */
static_assert(sizeof(_Atomic_flag_t) == sizeof(_Uint4_t),
"Unexpected _Atomic_flag_t size");

switch (_Order)
{
case memory_order_relaxed:
case memory_order_release:
case memory_order_seq_cst:
_Atomic_store_4((volatile _Uint4_t *)_Flag, 0, _Order);
break;

default:
_INVALID_MEMORY_ORDER;
break;
}
}

std::atomic

我们已经明白C++中的基本类型并不保证是原子的,例如a++这样的并不一定是原子的,除非编译器愿意实现为lock inc,可是谁会愿意这样锁总线呢?

所以std::atomic<T>定义了一系列具有原子行为类型

对typename T的限制

std::atomic禁用了复制构造函数和复制赋值运算符,相当于不允许复制语义。同时,atomic也是不允许移动的。这是由于这两个操作发生在两个对象间,势必要破坏原子性,而一个std::atomic的所有操作都是原子的。

对于用户自定义类型(UDT)typename Tstd::atomic主模板要求T满足standard layout、trivial default constructor和trivial destructor,即编译器可以使用memcpy等进行bitwise的复制并使用memcmp进行bitwise的比较。在CAS操作的实现中调用了memcpymemcmp。这看起来限制很大,我们希望有用户自定义的复制构造函数,这样我们就可以进行member-wise的操作了,对此想法,C++ Concurrency in Action一书中指出,如果有自定义的复制构造函数,那么就势必要将锁定区域(下文中会交待其实std::atomic的主模板实现中可能会有自旋锁)内的数据交给这些用户代码,如果在用户代码中再使用了锁,就可能产生死锁的现象。

容易发现这样一来,std::atomic<T>里面能放的东西,有很多限制。例如,我们常用的智能指针std::shared_ptr并不能被放到std::atomic里面,但是我们又确实有这个需要,因此标准库通过重载std::atomic_系列函数为std::shared_ptr提供了原子操作的支持。而对于std::atomic类型,我们既可以通过std::atomic_系列函数,也可以通过std::atomic模板中提供了compare_exchange_weakcompare_exchange_strongloadstore等操作。

atomic偏特化版本的实现

一般标准库会对一些大小满足能够直接使用某些处理器的原子指令的类型进行特化,例如指针类型、integral类型和一些用户定义类型。对于指针类型,std::atomic会进行偏特化。原子的指针运算可以通过fetch_开头的函数和相应的operator运算符来实现。对integeral类型std::atomic类模板也会进行特化,其实现类似指针类型,并且添加了对位运算的支持。对于“复杂”的整型计算如乘法,虽然atomic未提供,但可以通过compare_exchange_weak等函数间接实现。从C++20开始,std::atomic类模板提供对浮点类型的特化。注意在这之前,compare_exchange_strong等CAS方法对浮点数可能出现问题,原因显而易见是memcmp的锅,C++浮点数之间比较时甚至都不能使用==,遑论memcmp

atomic主模板的实现

在上文中提到,除了std::atomic_flagstd::atomic<typename T>类模板都是不保证不使用锁的(情况特定于处理器和标准库实现),用户可通过bool is_lock_free()函数判断是否Lockfree的。以PJ Plauger的实现为例,主模板的load()就用了std::atomic_flag实现的自旋锁

1
2
3
4
5
6
7
inline void _Atomic_copy(volatile _Atomic_flag_t *_Flag, size_t _Size,
volatile void *_Tgt, volatile const void *_Src, memory_order _Order)
{
_Lock_spin_lock(_Flag);
_CSTD memcpy((void *)_Tgt, (void *)_Src, _Size);
_Unlock_spin_lock(_Flag);
}

主模板的template<class _Ty> struct atomic的实现继承了_Atomic_base<_Ty, sizeof (_Ty)>
这个_Atomic_base<_Ty, sizeof (_Ty)>模板又继承了_Atomic_impl<_Bytes>模板,其作用相当于把_Atomic_impl<_Bytes>中全void *的东西封装回了_Ty。至此,我们将各个类型的atomic模板类的实现按照长度进行了分类
我们查看最核心的_Atomic_impl<_Bytes>模板,它是和数据字节数相关的,分别对1/2/4/8字节的进行了偏特化。模板里面定义了最重要的_Is_lock_free_Store_Load_Exchange_Compare_exchange_weak_Compare_exchange_strong等操作,全部是void*的。我们刚才看到的_Atomic_copy来自于主模板。

_Atomic_impl<_Bytes>偏特化版本实现

我们下面看看_Atomic_impl<_Bytes>的偏特化版本,如对于一个uint2_t是如何实现的

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
  #define _Compiler_barrier()   _ReadWriteBarrier()

#if defined(_M_ARM)
#define _Memory_barrier() __dmb(_ARM_BARRIER_ISH)
#endif /* defined(_M_ARM) */

#if defined(_M_ARM64)
#define _Memory_barrier() __dmb(_ARM64_BARRIER_ISH)
#endif /* defined(_M_ARM64) *

/* _Atomic_load_2 */
inline _Uint2_t _Load_seq_cst_2(volatile _Uint2_t *_Tgt)
{
_Uint2_t _Value;

#if defined(_M_ARM) || defined(_M_ARM64)
_Value = __iso_volatile_load16((volatile short *)_Tgt);
_Memory_barrier();

#else
_Value = *_Tgt;
_Compiler_barrier();
#endif

return (_Value);
}

inline _Uint2_t _Load_relaxed_2(volatile _Uint2_t *_Tgt)
{
_Uint2_t _Value;

#if defined(_M_ARM) || defined(_M_ARM64)
_Value = __iso_volatile_load16((volatile short *)_Tgt);

#else
_Value = *_Tgt;
#endif
return (_Value);
}

inline _Uint2_t _Load_acquire_2(volatile _Uint2_t *_Tgt)
{
return (_Load_seq_cst_2(_Tgt));
}

可以发现在这个偏特化版本的实现中直接借助了处理器提供的设施,而避免了自旋锁的使用。

weak和strong版本的CAS

类似与Windows API的互锁访问函数,atomic库通过bool atomic::compare_exchange_weak(old, value)bool atomic::compare_exchange_strong(old, value)提供了对CAS操作的支持。这两个函数监测该std::atomic中维护的std::atomic_flag _My_flag(在_Atomic_impl模板中定义)的值,如果等于old就改为value,函数返回一个表示修改是否成功的bool量。
我们看一下某一个重载版本的函数签名,容易发现,这里面的参数,并不是std::atomic<T>,而是T&类型的。

1
2
3
bool std::atomic<T>::compare_exchange_weak( T& expected, T desired,
std::memory_order success,
std::memory_order failure ) noexcept;

容易发现这两个函数不总是成功的,因为CPU可能仅对某些类型提供了相应的CAS原子指令,对于其他的类型则必须通过使用自旋锁甚至内核锁来实现。
在这里,需要使用volatile,从而强制从内存中读取。

1
2
3
4
5
6
7
8
9
10
11
12
13
inline int _Atomic_compare_exchange_weak(volatile _Atomic_flag_t *_Flag, size_t _Size, volatile void *_Tgt, volatile void *_Exp, const volatile void *_Src, memory_order _Order1, memory_order _Order2)
{ /* atomically compare and exchange with memory ordering */
int _Result;

_Lock_spin_lock(_Flag);
_Result = _CSTD memcmp((const void *)_Tgt, (const void *)_Exp, _Size) == 0;
if (_Result != 0)
_CSTD memcpy((void *)_Tgt, (void *)_Src, _Size);
else
_CSTD memcpy((void *)_Exp, (void *)_Tgt, _Size);
_Unlock_spin_lock(_Flag);
return (_Result);
}

这两个函数有一些细致的区别,compare_exchange_weak在可能会False Negative,也就是说。这是由于weak允许spurious failure。在某些平台(不错ARM、PowerPC又被点名了)上的CAS是通过LL/SC实现的,而不像x86上那样只通过一条指令,所以可能存在问题。因此使用compare_exchange_weak的时候需要一个循环。注意到这个!expected不是必要的。

1
2
3
bool expected=false;
extern atomic<bool> b; // set somewhere else
while(!b.compare_exchange_weak(expected,true) && !expected);

Stackoverflow上相关问题的整理中提到了这两者之间的性能比较,由于weak会忽视检查,所以一般weak比strong快。但是如果使用strong能避免weak+loop,那么选择strong是适合的。注意到即使使用strong,loop也不是就一定可以避免的,因为原子操作本来就存在使用乐观锁的情况。