基于 tokio 1.46.0 版本
mpsc
mpsc 有 bounded 和 unbounded 两种形式。通过不同的 semaphore 来区别。
Chan 结构
对于 unbounded semaphore,其最低的 bit 表示这个 channel 有没有关闭。
1 | // ===== impl Semaphore for (::Semaphore, capacity) ===== |
无论是 unbounded 还是 bounded,最终都是到 chan::Tx 和 chan::Rx 这两个结构里面。这两个类都持有一个 Arc<Chan<T, S>>。
我们会看到有两个 Tx:
- chan::Tx 是一个
Arc<Chan<T, S>>,它实际上封装了 Chan,更上层一点 - list::Tx 是一个 Block 的链表。它是
Chan<T, S>的 filed,更底层一点
list::tx 是一个无锁队列。这个队列的内存是以 Block 为基础分配的,每个 block 能装 const BLOCK_CAP: usize = 32; 个 Value。所以,Chan 中实现了解耦:
- list 模块只负责无锁队列的实现
- Chan 的其他部分负责容量控制、notify/waker 的逻辑
1 | pub(super) struct Chan<T, S> { |
block 的实现
1 |
|
看 Block::grow,就是一个无锁链表的实现
1 | let mut curr = next; |
为什么是关于 Block 的无锁队列?
为什么是关于 Block 的无锁队列,而不是关于 Value 的呢?
- 减少锁竞争和原子操作 (Reduced Contention and Atomic Operations)
集中操作 (Batch Operations): 在高并发场景下,如果每次发送一个 Value 就需要对共享队列(如链表或原子指针)进行一次原子操作(例如 CAS - Compare-and-Swap)来添加节点,那么多个发送者 (Multi-Producer) 之间的竞争会非常激烈,导致性能瓶颈。
Block 机制: 使用 Block(块),每个 Block 中可以存放多个 Value。这样,发送者可以一次性分配一个 Block,并填充多个 Value。在将这个 Block 链接到队列末尾时,只需要进行一次原子操作来更新队列的尾指针。这大大减少了对核心共享数据结构(队列头/尾)的原子操作次数,从而降低了锁竞争和系统开销。
局部性 (Locality): 一旦一个发送者成功获取并填充了一个 Block,它就可以在没有竞争的情况下,向该 Block 中写入若干条消息,这利用了 CPU 缓存的局部性原理。 - 优化内存分配 (Optimized Memory Allocation)
批量分配: 每次发送一个 Value 就进行一次内存分配是低效的。Block 允许一次性分配一块较大的内存,用于存储多个 Value。
更少的元数据: 如果每条 Value 都是一个独立的链表节点,那么每个节点都需要存储一个指针(指向下一个节点)作为元数据。在 Block 方案中,只有 Block 之间有链接指针,一个 Block 内部的多个 Value 可以紧凑存储,减少了内存开销。
channel 创建
send 实现
unbounded
unbounded 的 send 的实现,可以看到,最终调用了 Chan 的 send,后面会详细介绍。
1 | pub fn send(&self, message: T) -> Result<(), SendError<T>> { |
inc_num_messages 的逻辑如下,主要是每次增加 2,保证末尾是 0
1 | if curr & 1 == 1 { |
bounded
1 | async fn reserve_inner(&self, n: usize) -> Result<(), SendError<()>> { |
Chan 的 send 实现
Chan 的 send 的实现如下
1 | fn send(&self, value: T) { |
这个 rx_waker 是一个 AtomicWaker 对象。
1 | pub(crate) struct AtomicWaker { |