主要介绍 TiCI 的一些实现。
TiCI 要解决什么问题?
拓展 TiFlash 的能力
TiFlash 中已经实现了 Vector Index 和 Inverted Index 了,在一些版本中也支持 FTS 了。但是在 TiFlash 中使用这些索引存在一些问题:
- TiFlash 中的 Region 的分布随 TiKV 而定,这导致优化空间较小
- Region 的迁入和迁出会导致重建 Vector Index 字典和 HNSW 图,开销大。理想状态下,一个 Index 应该有一个自己的结构,以及自己的分区策略,而不是始终跟随者 TiKV 的策略进行 sharding。
- TiFlash 需要和 TiKV 共用一个 PK,导致 TiFlash 无法选择更适合列存的 Sharding 方式。在“PK 的选择”这一章节中,我们会进一步阐述。
- TiFlash 的并发较低,但本质上,并发低的原因还是和索引有关。
- TiFlash 默认提供了强一致和 SI 的隔离级别,但是这些特性并不是全部必须的,而它们又带来很高的成本,如 MVCC、Learner Read 等。新的引擎能够从架构上摆脱这些臃肿的功能。
- 半结构化的数据如 json,非结构化的数据如文本,TiFlash 不能有效处理。
- 我们需要一个真正的索引,而 TiFlash 更像是一种数据格式或者物化视图。这里也很大程度上是因为数据格式导致的。
支持 Hybrid Index
传统的列存的访问模式是 scan,它是顺序访问一个流,但是可以通过 min-max、延迟物化等特性,去 skip 掉某些 block 的读取,从而减少 IO 开销和编解码的开销。这种方式对于高 selectivity 的数据(也就是留下的数据多)是友好的,但是对于复杂 OR / 模糊匹配 / Top-K 这些场景就不友好了。在 这篇文章的《延迟物化》 这一章节中,举了一个因为 min-max 索引无效,从而退化为全表扫描的例子。
1 | for row in block: |
倒排索引的访问模式是 lookup + intersect,它是根据多个条件去反查 doc ID。这种方式实际上是对 term 做点查,而传统行存是对 PK / key 做点查。倒排索引的方式能够返回一组行号,又可以理解为将本来要下推给行存的一部分条件过滤前置了。这样的方式,对于低 selectivity 的数据(也就是留下的数据少)是友好的。
1 | posting = intersect(term1, term2) |
查询规划核心问题其实是:“哪个 predicate 能最快把 selectivity 降下来?”
- 先用低 selectivity 条件 缩小候选集
如倒排索引、主键、稀疏 bitmap - 再用高 selectivity 条件 做 scan + aggregate
如列存、向量化
架构
PK 的选择
在 关于 TiKV、TiDB、TiFlash 的一些思考 的“为什么 TiFlash 实现 HTAP 基于 Raft?”中介绍了在 HTAP 场景中,因为 TiFlash 使用了 Raft Group 复制,获得了一些 benefit。但是,Raft 会导致 TiFlash 和 TiKV 绑定很死,特别是 TiFlash 没办法换 PK,这就一定程度上影响了 QPS。
而行存和列存在访问模式上天然是有差别的,行存以点查和事务为主,通过打散热点可以获得更好的 QPS,而列存以扫表和聚合为主,更希望数据有空间局部性。
使用 TiCI 的架构,用 CDC 同步,可以避免 Raft 那一层的影响。主要体现在可以为列存指定一个新的 PK,并按照新 PK 做 sharding。这里的问题是:
- 在任何时候,新的 PK 和老的 PK 一定是一一对应的。但是这个对应关系可能会变。例如
(new_key_col1, new_key_col2) -> old_key_col1中如果new_key_col2的值变了,那么它在 TiCI 的分片可能就会变,但是它在 TiKV 的分片是不可能变的。这就体现了一行可能从一个 Shard 移动到另一个 Shard 中。 - 后续支持 Snapshot Read 就会比较麻烦。
- Import into 在导入的时候,数据会非常散。
一些人会 argue 认为 (new_key) -> (old_key) 的对应关系可能会变。这会导致一行数据在 TiCI 内部发生跨 Shard 移动。这种 “逻辑上的行移动” 在 CDC 异步同步下,对最终一致性会产生考验。然而这种观点并不正确:
- 跨 shard 移动实际被建模成一条 CDC UPDATE 的两个效果:
- 对旧 TiCI key 发 tombstone:delete(old_key, commit_ts)
- 对新 TiCI key 发 upsert:put(new_key, row, commit_ts)
- CDCLog 会用 CDC 的 old 字段算 old_sharding_key,如果 sharding key 变了,cdc_file_reader 先把 delete_doc(old_key, version) 发到旧 shard,再把新 doc 发到新 shard。每个 doc 都带同一个 commit_ts/version,后续 buffer/compaction 按 key 保留最大 version,并保留/清理 delete mark。
- 如前面所说,在强一致性上,受到不同 shard 上写入进度是异步的影响,会存在不一致窗口。
- 如果先写新 shard,则会重复
- 如果先删老 shard,则会漏
- 实现强一致,需要借助于 ts
数据源
Push or Pull
TiCI 设计主要以 TiCDC 作为数据源。所以一开始是每个 Shard 会不停 Poll S3 上的某个 prefix,复杂度是 O(shard_count*log_count)。另外,LIST 的价格和 PUT 是一个级别,大约为 GET 的 10 倍还多。为了减小 LIST 的读放大,对 CDC 的日志进行了编号,这样只需要不停 GET 就行。但是如果 Shard 很多,那么每个 Shard 的 Poll 依然开销很大。所以,引入了一个 per node 的 Poller,去代理这个 poll 的过程。
随后发现,在一些 DDL 之后,CDC 的目录会发生变更。因此,对于这种情况,需要通过 LIST 找到新的 prefix。而这种 fallback 就会对要求在一定阈值之后,就 LIST 一次,这给低频更新的表造成了很大的负担。优雅的做法是支持 Push,但这个需要 CDC 的配合。我觉得比较好的 workaround,是 CDC 提供一个 api 供 TiCI poll,而不是 Poll S3。
实时性和延迟
根据 TiCDC 的原理,以 TiCDC 为数据源,很难将延迟降低到秒以内。
TiCI 的取舍不是“每条更新立即可搜索”,而是 CDC → 小批量 fragment → meta seq → reader 本地 refresh 的 bounded-staleness 模型。它用批量化降低写入和搜索端刷新成本,用 seq/heartbeat/cache 确认可搜索版本。
和 MySQL 的兼容
Meta Service
Outline
Shard 管理
在 shard 的大小默认设置为 4 GiB,这样对于 80 TiB 的集群规模,会有 20k+ 个 shard。一个 shard 中有多个 fragment,fragment 之间是可以 overlap 的,我们也按照传统方法引入了 leveled compaction。
过多的 shard/fragment 意味更小的 shard/fragment 的 size,这对于查询是不利的:
- 过多的 fragment,会导致每个 fragment 中的词典会很小,压缩率不高。
- 此外,一个查询如果覆盖多个 fragment,就需要按照每个 fragment 的词典都过滤一遍,再 merge,这样并发上不去。
设计能向后兼容的初始策略
要实现好 split、compaction 等是困难的,但是我们给出了最基础的策略:
- Split
根据 shard 的大小以及行数。同时还比较写入速度和 L0 的 Compaction 速度,如果发现 L0 的 Compaction 速度很慢,则会 split。 - Compaction
根据 fragment 的数量,以及 segment 的数量,可以按照 level 配置。 - Merge
根据当前 shard 的大小,以及写入的流量。并且要确保 merge 后不会立即 split。
这里面另一个简化是,除了 merge 是定期检查外,Split 和 Compaction 只在写入后检查。
此外,因为 Compaction 的策略并不是很明确,所以在存储格式上,特别注意将 fragment 按照线性表的形式存起来,而将分层信息单独存放。
因为 L0 和 L1 都是 Tiered 策略,所以原有的线性表在没有分层信息的情况下,依然是可以用的。Apply compaction 实际上就是一个 split。这样一来,后续要支持更复杂的 compaction 策略,就不会涉及到改动已有数据的兼容性。
大量小表
主要有几个问题:
- Shard 就会很多
- 碎片
因为这些 Shard 的写入带宽都不会很大,所以在无状态架构下,基本定时都会上传一个小文件到 S3 上。这会产生大量的小文件。 - CDC LIST
LIST 会有很大压力。 - CDC log advance
但是因为每个表是一个单独的 cdc prefix,所以相比于单个大表,这些 shard 上对应的 append empty 的场景不会很多。因此在 shard 数量相同的情况下,meta 的写入压力要小于单个大表。
- 碎片
写入路径
S3 的 Slowdown
从 https://zhuanlan.zhihu.com/p/606711742 来看,prefix 怎么分完全依赖 S3 内部实现。⽽且请求量上去之后,S3 怎么扩容、拆分不同分⽚的流量,⽀撑起所需 qps 的时间没有 qos 保证。
基于需求,做了测试:
- https://github.com/CalvinNeo/terraform-cloud-native-tiflash/tree/test-s3
- https://github.com/pingcap-inc/tici/pull/344
因为并发较大,所以要达到精确的 QPS。对此需要关注:
- 因为不同的 Worker 达到稳定的时间不一致,所以一个 Worker 在并发达到稳定之后,应当能够保持负载。
- 通常需要让每个 Worker 睡眠一段时间来实现精确的 QPS,这里的 sleep 时间不能过长,否则会影响实际的 QPS,因此对于不同并发的要求,Worker 的数量是有合适区间的。
测试结论如下:
- 访问区域影响访问耗时
从 Local 访问 AWS S3 us-west-2 ⼤概耗时 1.7s,在 us-west-2 EC2 上访问耗时⼤概在 0.1/0.6s。在⼀些情况下,upload 的时间会显著增⻓,达到 1s+,此时 QPS 会受到严重影响。
在极个别的时间段中,upload 的时间达到 10s+,此时 S3 的可⽤性下降明显,⽆法达到最⾼的 QPS。
这种情况下,不仅能到 Slowdown 报错,还能看到 InternalError 报错。 - 先前有过前⼀次“扩容”,也并不会缩短后⼀次“扩容”所需的时间
这个和 AWS 的客服的回复是不一样的,暂时不知道具体原因。 - 3000 QPS 的上限是比较准确的,扩容也是根据 prefix 来进行的,速度大概是在秒级别
prefix 数量为2,最⾼能压到 6500 QPS,达到稳定耗时 30s+。压到 6000 QPS 内,达到稳定耗时在 20s 左右。
Split & Compaction
理论上 Compaction 结束后,遍历所有的 Shard,将其中匹配 compaction.input 的串替换为 compaction.output 就行了。但是因为 TiCI 是 cloud native 的架构,所以 Split 只能是逻辑分裂。因此,它的 manifest 中需要记录每一个 fragment 它到底大概引用了多少比例的数据。那么考虑:
- Shard 1 开始 Compaction
- Shard 1 分裂为 Shard 2 和 3
- Shard 1 的 Compaction 完成,需要替换到 Shard 2 和 3 上
此时,很难推断出 Shard 2 和 Shard 3 上应该各有多少行,因为我们不知道 Compaction 到底清理了多少数据。当然,也可以粗略地按照 range 截取一下。
Compaction 的策略
在 LevelDB 之 Compaction 实现 中主要介绍了两种 LSM 树的构造:
- Tiered:同一层中的 sst 按照创建的时间进行排序,组成多个 run。这种方式不保证上层的数据一定比下层的更新。
- Leveled:同一层中的 sst 按照 user key(或者叫 pk)进行排序,组成一个 run。这种方式在 Compaction 的时候,需要同时重写下层 overlap 的 sst。
TiCI 中的 Fragment 大致对应 sst,目前将 Manifest 作为一个线性表来存储。而 Compaction 就是选择一段区间的 Fragments,将它们合并为一个 Fragment 后,再 splice 回去,实际上就是 Tiered 的策略。出于为了实现简单的“合并大小相近的 Fragment”的策略,所以人为分了两层,但实际上是不必要的。因此,在实现上,我并没有把分层的策略写死,而是做成了外挂式的。
如果要实现 Leveled 策略,那么 Shard Compactor 就需要感知到下层的 sst 的排布,对实现要求较高。而如果沿用现在的 Tiered 策略,则 Shard Reader 本身就有一个 dedup 的操作,读放大不是那么糟糕。另外,考虑到倒排索引或者全文索引的特性,我们实际上是希望 Fragment 能尽可能大一点的,而 Leveled 策略需要分出很多相等大小的 sst,所以 Tiered 策略也是更优的。
如果基于 Tiered 构造 LSM 树,则分层显得不必要且不灵活。依旧可以将 Manifest 看做一个头部写入的有序的线性表,表中的元素是 Fragment。要实现的方案需要做到:
每次 Compaction 必须选择连续的 Fragment,然后用合并完的 Fragment 进行 Splice 替换,这样能够保留原有的先后关系
需要保证合并之后的 Fragment 大小是相近的
例如对于下面的 Manifest1
[1,1,3,3,3,3,10,10]
那么 Compaction 的结果可以是
1
[2,3,3,3,3,10,10]
或者
1
[1,1,3,9,10,10]
而下面这种“头重脚轻”的就不是很好了
1
[14,10,10]
如果写入很频繁,那么要保证 Manifest 的尾部也能被 Compact 到
总而言之,我提出了一个 Uni Compaction 的方案,可以参考 https://github.com/pingcap-inc/tici/pull/971 中的设计。
关于“Full Compaction”
Full Compaction 突然会变得非常重要,当我们发现:
- Base 上的 Fragment 会一直逻辑分裂,导致读取性能很差
从性质上来说:
- Base 层会“好”一点,主要体现在虽然它有重复的 PK,但它们的值是一样的,所以大概能有一个 run。
- Delta 层是全序的,每一个 Fragment 都需要带一个 delete set,表示它会删除前面。
我曾经提出一个方案,想直接根据 delete set 为空来分割 Base 和 Delta。但并不是 delete set 为空,就是 base 了。例如 delta 的前几个 fragment,也可能就全是 insert。当然,因为全是 insert,所以把这一部分 delta 放到 base 也是安全的(考虑到我们认为 Delta 始终比 Base 新)。
在实现 Full Compaction 的时候,通常实现就是引入一个原子的 Switch 操作。这个操作非常轻,只是一个保证,表示后续不会再往 base 写入数据了。在这个操作之后,Base 和 Delta 就会合并。但遇到的最大讨论点是在 Compaction 之后,是否还需要区分 Base 和 Delta。
最终方案是:
- 只在内存中维护 Base 和 Delta,持久化的时候合并为一个以保持灵活性。记录一个 offset 来区分。这里的灵活性指的是,即使丢失 offset,数据也是完整的,因为这是一个全序的列表。我们完全可以根据有没有 delete set 去重新划分一个新的 Base 和 Delta。
- 在 Switch 之后,Base 就不可以写了,并且会把 Base 都 prepend 到 Delta 前面。在下次 Compaction 的时候,会按照 Uni Compaction 的方法生成 Compaction。
- 但是会对 Compaction 进行校验:
- Full Compaction:这种 Compaction 的产物不需要保留 delete set,其实也不应该有。需要满足已经 Switch 且 Base 为空,并且这个 Compaction 是从 0 开始的。
- Delta Compaction:这种 Compaction 需要保留 delte set。包含
[b2, ..., bn, d0, d1]这种包含了部分 base 的情况。
关于 Base 和 Delta 的 TSO
有状态还是无状态的 Worker
使用无状态的 worker,则:
- 不需要挂载 EBS,成本低
- 如果要支持高频写入,则对 S3 的 QPS 有要求。后续对 Compaction 的速度也有要求
- 如果不支持高频写入,则数据的 freshness 就会比较差
使用有状态的 worker,则:
- 需要挂载一个小容量的 EBS,定期 flush 到 S3
- 查询的时候,需要从 EBS 上 merge
类似 TiFlash 的方案
CDC 的延迟问题
CDC 这一套架构的延迟是有上限的,基本上只能保证秒级。其中原理在 “CDC 的延迟”中已经有介绍。因此,我觉得要想获得更好的延迟,就得有一套支持直接写入的 API 和 client。
心跳
在分布式架构和高并发相关场景中介绍了心跳的一些基本的概念。这里不再详细论述。
读取路径
Refactor
希望满足:
- lazy 地往 reader 上缓存 shard。
- TiDB 请求 shardcache,⼀定能返回⼀个全部 warmup 好的 range。
- 处理好同步和异步问题
从物理规律上 warmup 和写⼊是异步的。
但是 pubsub 处理写⼊只能同步,处理 warmup 请求也只能同步。 - 【可选】请求 shardcache 的时候,返回完整的 shard。比如如果 1 分裂为 2 和 3,在 2 和 3 都 ready 前,只返回 1。
曾经 vibe 了一版实现,但是存在 bug。我认为这是设计问题,比如:
- 状态信息维护不全,shardcache 总共有 existence 和 allocate 和 ready 三个状态,其两两转换是异步的:
- 必须 ready 才能对 reader 可⻅
- shard 变更 split 和 merge 必须 in-order 处理
- warmup 必须阻塞,最多做到类似 cq 优化
- 现有的逻辑是只维护 existence 态和 ready 态,从原理上就⽆法做到完备的描述整个过程。因此针对于这个⽅案的⼩修⼩补是不够的
- 状态信息更新存在冲突
例如存在很多异步更新:- 读请求:分配未存在的
- 读请求:分配 reader 宕机的
- shardevent: split merge 这些场景中,都需要阻塞整个 pubsub 队列做 warmup,⾮常慢。并且会导致 pubsub 队列阻塞出现 Lag 的报错。
- ⼼跳
- 因为 warmup 消息提前修改 reader 端元信息,所以导致⼼跳冲突误删 shard
- 后续可能会做成异步补副本,从⽽引发冲突
复杂的需求导致复杂的设计。良好的设计只能封装/转移复杂性,⽽不能减少复杂性。
详细的讨论在 20260424.7z 中的 《WIP TiCI Reader refactor 讨论.pdf》 里面,因为太细碎,就不 cover 了。
tantivy
为什么选择 tantivy
Tantivy 实际上就是类似 Lucene 这样的实现。Tantivy 是 Rust 写的,对 Rust 项目来说有天然优势:
- 零成本抽象 + 无 GC
避免像 Apache Lucene 那样依赖 JVM 和 GC。 - 内存安全 + 并发友好
Rust 的 ownership 模型让并发写索引更安全。 - 在保持了原来的 term+posting list,以及 segment+merge 的思路后,抛弃了一些陈旧无用的特性
- tantivy 进行了一些性能优化
例如使用了 mmap,尽管如此,我们并不觉得这个是一个比较好的选择,特别是当它被链接到外部程序中的时候。
tantivy 和多版本
tantivy 自己有 delete 机制,但是他没有版本号的东西。
它使用 bitset 来维护 docId 的删除机制。
1 | doc_id: 0 1 2 3 4 5 |
做 merge on read 类似的:
- 从 posting list 拿 docID 们
- 检查 delete bitset
- 跳过被删除的 doc
需要注意,delete 并不是 O(1) 的,因为要遍历 posting list。
然而,TiDB 自己也有自己的 MVCC 版本号机制,在 TiFlash 中表示为 PK, version, delmark,tantivy 的那一套 bitset 机制实际上是作用在这三个 key 之上的。TiDB 会多路归并,计算出那些 PK 要删除(去重),然后根据这些 PK 生成每个 frag 对应的 bitset,读的时候作为查询条件传给 tantivy。
保序
同时支持 ASC 和 DESC
Tantivy 的倒排表文件是顺序压缩存储的,例如:
- doc_id 列表经过 delta-encoded
如果要支持逆向,则需要保存两个方向的差值,十分麻烦。 - 然后通过 bit-packed 或 block-wise 压缩
- 压缩的 Block 按顺序写入文件
如果要支持逆向,则需要能定位到前一个 Block 的 offset。 - 查找时通过 BlockReader 顺序解码
这么做的目的是因为顺序访问能够提高吞吐量。而对于随机访问,tantivy 主要是通过如下方式进行快速的 seek:
- skiplist
对于搜索引擎而言,存储空间通常比随机访问代价更便宜,尤其是 posting list 占比主要来自长尾词,常见高频词列表短,存两份并不会显著膨胀。
为什么常见高频词反而消耗 posting list 的物理空间更小?
- 因为足够高频,所以 delta-encode + bitpack 的优化效果非常好
- 高频词可能是 Stopword,不会为它生成倒排表
- 基于布尔逻辑,或者范围过滤的短路逻辑,常常能够跳过这些高频词的 posting list 的读取
支持
内存
tantivy 在读取的时候需要借助 MmapDirectory 结构,它实际上是把 segment 文件 mmap 读的。
问题:
- 如果要实现 CMEK,则难度相当大,因为 CMEK 在 S3 和本地盘上都需要加密。感觉一种做法是用 gocryptfs 糊一层。
返回流式数据
关于 vibe coding
编程风格和编程理念
这里,我也想进一步探讨编程风格问题,我的想法比较复古:
- 坚持测试、可观测性
特别的,对于“莽一版”,需要重视:- 写测试的过程,也代表了对需求的理解,对边界情况、对性能的评估。如果不写测试,那么很难有自信说自己写的代码就是完全符合需求的。
- 莽的目的是能赶快有个版本上线给客户看,因此避免灾难性事故犹在提供功能之上。
- 相比于论证架构的性能上限,我更看重先论证架构的安全性
可以参考跨 Region 提交事务?这一章。 - 强调建立机制,而不是引入人力
人写代码就是会犯错的,而机器就是比人少犯错。因此应当:- 引入检查,让“烂”代码无法通过 review
- 强制有测试才能合入代码
- 不隐藏复杂度,只封装复杂度
系统的复杂度是由需求定义的,一个复杂的需求一定导致一个复杂的系统设计。
这里“隐藏”复杂度指的是下列行为:
关于 vibe coding,我认为有常见如下的 pitfall:
- 用一个单薄的实现去解决一个复杂场景
- 用一个臃肿实现去解决一个简单场景
- 认识不到风险场景,缺乏管控
- 写出的代码无法在系统中迭代维护