tokio channel 实现

基于 tokio 1.46.0 版本

mpsc

mpsc 有 bounded 和 unbounded 两种形式。通过不同的 semaphore 来区别。

Chan 结构

对于 unbounded semaphore,其最低的 bit 表示这个 channel 有没有关闭。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
// ===== impl Semaphore for (::Semaphore, capacity) =====

impl Semaphore for bounded::Semaphore {
fn add_permit(&self) {
self.semaphore.release(1);
}

fn add_permits(&self, n: usize) {
self.semaphore.release(n)
}

fn is_idle(&self) -> bool {
self.semaphore.available_permits() == self.bound
}

fn close(&self) {
self.semaphore.close();
}

fn is_closed(&self) -> bool {
self.semaphore.is_closed()
}
}

// ===== impl Semaphore for AtomicUsize =====

impl Semaphore for unbounded::Semaphore {
fn add_permit(&self) {
let prev = self.0.fetch_sub(2, Release);

if prev >> 1 == 0 {
// Something went wrong
process::abort();
}
}

fn add_permits(&self, n: usize) {
let prev = self.0.fetch_sub(n << 1, Release);

if (prev >> 1) < n {
// Something went wrong
process::abort();
}
}

fn is_idle(&self) -> bool {
self.0.load(Acquire) >> 1 == 0
}

fn close(&self) {
self.0.fetch_or(1, Release);
}

fn is_closed(&self) -> bool {
self.0.load(Acquire) & 1 == 1
}
}

无论是 unbounded 还是 bounded,最终都是到 chan::Tx 和 chan::Rx 这两个结构里面。这两个类都持有一个 Arc<Chan<T, S>>

我们会看到有两个 Tx:

  1. chan::Tx 是一个 Arc<Chan<T, S>>,它实际上封装了 Chan,更上层一点
  2. list::Tx 是一个 Block 的链表。它是 Chan<T, S> 的 filed,更底层一点

list::tx 是一个无锁队列。这个队列的内存是以 Block 为基础分配的,每个 block 能装 const BLOCK_CAP: usize = 32; 个 Value。所以,Chan 中实现了解耦:

  • list 模块只负责无锁队列的实现
  • Chan 的其他部分负责容量控制、notify/waker 的逻辑
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
pub(super) struct Chan<T, S> {
/// Handle to the push half of the lock-free list.
tx: CachePadded<list::Tx<T>>,

/// Receiver waker. Notified when a value is pushed into the channel.
rx_waker: CachePadded<AtomicWaker>,

/// Notifies all tasks listening for the receiver being dropped.
notify_rx_closed: Notify,

/// Coordinates access to channel's capacity.
semaphore: S,

/// Tracks the number of outstanding sender handles.
///
/// When this drops to zero, the send half of the channel is closed.
tx_count: AtomicUsize,

/// Tracks the number of outstanding weak sender handles.
tx_weak_count: AtomicUsize,

/// Only accessed by `Rx` handle.
rx_fields: UnsafeCell<RxFields<T>>,
}

block 的实现

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
#[cfg(all(target_pointer_width = "64", not(loom)))]
const BLOCK_CAP: usize = 32;

#[repr(transparent)]
struct Values<T>([UnsafeCell<MaybeUninit<T>>; BLOCK_CAP]);

pub(crate) struct Block<T> {
/// The header fields.
header: BlockHeader<T>,

/// Array containing values pushed into the block. Values are stored in a
/// continuous array in order to improve cache line behavior when reading.
/// The values must be manually dropped.
values: Values<T>,
}

/// Extra fields for a `Block<T>`.
struct BlockHeader<T> {
/// The start index of this block.
///
/// Slots in this block have indices in `start_index .. start_index + BLOCK_CAP`.
start_index: usize,

/// The next block in the linked list.
next: AtomicPtr<Block<T>>,

/// Bitfield tracking slots that are ready to have their values consumed.
ready_slots: AtomicUsize,

/// The observed `tail_position` value *after* the block has been passed by
/// `block_tail`.
observed_tail_position: UnsafeCell<usize>,
}

看 Block::grow,就是一个无锁链表的实现

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
let mut curr = next;

// TODO: Should this iteration be capped?
loop {
let actual = unsafe { curr.as_ref().try_push(&mut new_block, AcqRel, Acquire) };

curr = match actual {
Ok(()) => {
return next;
}
Err(curr) => curr,
};

crate::loom::thread::yield_now();
}

为什么是关于 Block 的无锁队列?

为什么是关于 Block 的无锁队列,而不是关于 Value 的呢?

  1. 减少锁竞争和原子操作 (Reduced Contention and Atomic Operations)
    集中操作 (Batch Operations): 在高并发场景下,如果每次发送一个 Value 就需要对共享队列(如链表或原子指针)进行一次原子操作(例如 CAS - Compare-and-Swap)来添加节点,那么多个发送者 (Multi-Producer) 之间的竞争会非常激烈,导致性能瓶颈。
    Block 机制: 使用 Block(块),每个 Block 中可以存放多个 Value。这样,发送者可以一次性分配一个 Block,并填充多个 Value。在将这个 Block 链接到队列末尾时,只需要进行一次原子操作来更新队列的尾指针。这大大减少了对核心共享数据结构(队列头/尾)的原子操作次数,从而降低了锁竞争和系统开销。
    局部性 (Locality): 一旦一个发送者成功获取并填充了一个 Block,它就可以在没有竞争的情况下,向该 Block 中写入若干条消息,这利用了 CPU 缓存的局部性原理。
  2. 优化内存分配 (Optimized Memory Allocation)
    批量分配: 每次发送一个 Value 就进行一次内存分配是低效的。Block 允许一次性分配一块较大的内存,用于存储多个 Value。
    更少的元数据: 如果每条 Value 都是一个独立的链表节点,那么每个节点都需要存储一个指针(指向下一个节点)作为元数据。在 Block 方案中,只有 Block 之间有链接指针,一个 Block 内部的多个 Value 可以紧凑存储,减少了内存开销。

channel 创建

send 实现

unbounded

unbounded 的 send 的实现,可以看到,最终调用了 Chan 的 send,后面会详细介绍。

1
2
3
4
5
6
7
8
pub fn send(&self, message: T) -> Result<(), SendError<T>> {
if !self.inc_num_messages() {
return Err(SendError(message));
}

self.chan.send(message);
Ok(())
}

inc_num_messages 的逻辑如下,主要是每次增加 2,保证末尾是 0

1
2
3
4
5
if curr & 1 == 1 {
return false;
}

.compare_exchange(curr, curr + 2, AcqRel, Acquire)

bounded

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
async fn reserve_inner(&self, n: usize) -> Result<(), SendError<()>> {
crate::trace::async_trace_leaf().await;

if n > self.max_capacity() {
return Err(SendError(()));
}
match self.chan.semaphore().semaphore.acquire(n).await {
Ok(()) => Ok(()),
Err(_) => Err(SendError(())),
}
}

pub fn max_capacity(&self) -> usize {
self.chan.semaphore().bound
}

pub(crate) fn acquire(&self, num_permits: usize) -> Acquire<'_> {
Acquire::new(self, num_permits)
}

Chan 的 send 实现

Chan 的 send 的实现如下

1
2
3
4
5
6
7
fn send(&self, value: T) {
// Push the value
self.tx.push(value);

// Notify the rx task
self.rx_waker.wake();
}

这个 rx_waker 是一个 AtomicWaker 对象。

1
2
3
4
pub(crate) struct AtomicWaker {
state: AtomicUsize,
waker: UnsafeCell<Option<std::task::Waker>>,
}

Reference