TiCI 的设计考量

主要介绍 TiCI 的一些实现。

TiCI 要解决什么问题?

拓展 TiFlash 的能力

TiFlash 中已经实现了 Vector Index 和 Inverted Index 了,在一些版本中也支持 FTS 了。但是在 TiFlash 中使用这些索引存在一些问题:

  • TiFlash 中的 Region 的分布随 TiKV 而定,这导致优化空间较小
    • Region 的迁入和迁出会导致重建 Inverted Index 字典和 HNSW 图,开销大。
    • TiFlash 需要和 TiKV 共用一个 PK,导致 TiFlash 无法选择更适合列存的 Sharding 方式。在“PK 的选择”这一章节中,我们会进一步阐述。
  • TiFlash 的并发较低,但本质上,并发低的原因还是和索引有关
  • TiFlash 默认提供了强一致和 SI 的隔离级别,但是这些特性并不是全部必须的,而它们又带来很高的成本,如 MVCC、Learner Read 等。新的引擎能够从架构上摆脱这些臃肿的功能。

支持 Hybrid Index

传统的列存的访问模式是 scan,它是顺序访问一个流,但是可以通过 min-max、延迟物化等特性,去 skip 掉某些 block 的读取,从而减少 IO 开销和编解码的开销。这种方式对于高 selectivity 的数据(也就是留下的数据多)是友好的,但是对于复杂 OR / 模糊匹配 / Top-K 这些场景就不友好了。在 这篇文章的《延迟物化》 这一章节中,举了一个因为 min-max 索引无效,从而退化为全表扫描的例子。

1
2
3
for row in block:
if predicate(row):
aggregate / output

倒排索引的访问模式是 lookup + intersect,它是根据多个条件去反查 doc ID。这种方式实际上是对 term 做点查,而传统行存是对 PK / key 做点查。倒排索引的方式能够返回一组行号,又可以理解为将本来要下推给行存的一部分条件过滤前置了。这样的方式,对于低 selectivity 的数据(也就是留下的数据少)是友好的。

1
2
3
posting = intersect(term1, term2)
for doc in posting:
score(doc)

查询规划核心问题其实是:“哪个 predicate 能最快把 selectivity 降下来?”

  • 先用低 selectivity 条件 缩小候选集
    如倒排索引、主键、稀疏 bitmap
  • 再用高 selectivity 条件 做 scan + aggregate
    如列存、向量化

架构

PK 的选择

关于 TiKV、TiDB、TiFlash 的一些思考 的“为什么 TiFlash 实现 HTAP 基于 Raft?”中介绍了在 HTAP 场景中,因为 TiFlash 使用了 Raft Group 复制,获得了一些 benefit。但是,Raft 会导致 TiFlash 和 TiKV 绑定很死,特别是 TiFlash 没办法换 PK,这就一定程度上影响了 QPS。

使用 TiCI 的架构,用 CDC 同步,可以避免 Raft 那一层的影响。主要体现在可以为列存指定一个新的 PK,并按照新 PK 做 sharding。这里的问题是:

  • 在任何时候,新的 PK 和老的 PK 一定是一一对应的。但是这个对应关系可能会变。例如 (new_key_col1, new_key_col2) -> old_key_col1 中如果 new_key_col2 的值变了,那么它在 TiCI 的分片可能就会变,但是它在 TiKV 的分片是不可能变的。这就体现了一行可能从一个 Shard 移动到另一个 Shard 中。
  • 后续支持 Snapshot Read 就会比较麻烦。
  • Import into 在导入的时候,数据会非常散。

数据源

Push or Pull

TiCI 设计主要以 TiCDC 作为数据源。所以一开始是每个 Shard 会不停 Poll S3 上的某个 prefix,复杂度是 O(shard_count*log_count)。为了减小 LIST 的读放大,对 CDC 的日志进行了编号,这样只需要不停 GET 就行。

但是如果 Shard 很多,那么每个 Shard 的 Poll 依然开销很大。所以,引入了一个 per node 的 Poller,去代理这个 poll 的过程。

延迟

根据 TiCDC 的原理,以 TiCDC 为数据源,很难将延迟降低到秒以内。

Meta Service

Outline

Shard 管理

在 shard 的大小默认设置为 4 GiB,这样对于 80 TiB 的集群规模,会有 20k+ 个 shard。一个 shard 中有多个 fragment,fragment 之间是可以 overlap 的,我们也按照传统方法引入了 leveled compaction。

过多的 shard/fragment 意味更小的 shard/fragment 的 size,这对于查询是不利的:

  • 过多的 fragment,会导致每个 fragment 中的词典会很小,压缩率不高
  • 此外,一个查询如果覆盖多个 fragment,就需要按照每个 fragment 的词典都过滤一遍,再 merge,这样并发上不去

设计能向后兼容的初始策略

要实现好 split、compaction 等是困难的,但是我们给出了最基础的策略:

  • Split
    根据 shard 的大小以及行数。同时,我们还比较写入速度和 L0 的 Compaction 速度,如果发现 L0 的 Compaction 速度很慢,则会 split。
  • Compaction
    根据 fragment 的数量,以及 segment 的数量,可以按照 level 配置
  • Merge
    根据当前 shard 的大小,以及写入的流量。并且要确保 merge 后不会立即 split。

这里面另一个简化是,除了 merge 是定期检查外,Split 和 Compaction 只在写入后检查。

此外,因为 Compaction 的策略并不是很明确,所以我们在存储格式上,特别注意将 fragment 按照线性表的形式存起来,而将分层信息单独存放。
因为 L0 和 L1 都是 Tiered 策略,所以原有的线性表在没有分层信息的情况下,依然是可以用的。Apply compaction 实际上就是一个 split。这样一来,就使得后续我们如果要支持更复杂的 compaction 策略,就不会涉及到改动已有数据的兼容性。

大量小表

考虑大量小表的场景,Shard 就会很多。因为这些 Shard 的写入带宽都不会很大,所以在无状态架构下,基本定时都会上传一个小文件到 S3 上。

写入路径

S3 的 Slowdown

https://zhuanlan.zhihu.com/p/606711742 来看,prefix 怎么分完全依赖 S3 内部实现。⽽且请求量上去之后,S3 怎么扩容、拆分不同分⽚的流量,⽀撑起所需 qps 的时间没有 qos 保证。

基于我们的需求,做了测试:

因为并发较大,所以要达到精确的 QPS。对此需要关注:

  • 因为不同的 Worker 达到稳定的时间不一致,所以一个 Worker 在并发达到稳定之后,应当能够保持负载
  • 通常需要让每个 Worker 睡眠一段时间来实现精确的 QPS,这里的 sleep 时间不能过长,否则会影响实际的 QPS,因此对于不同并发的要求,Worker 的数量是有合适区间的

测试结论如下:

  • 访问区域影响访问耗时
    从 Local 访问 AWS S3 us-west-2 ⼤概耗时 1.7s,在 us-west-2 EC2 上访问耗时⼤概在 0.1/0.6s。在⼀些情况下,upload 的时间会显著增⻓,达到 1s+,此时 QPS 会受到严重影响。
    在极个别的时间段中,upload 的时间达到 10s+,此时 S3 的可⽤性下降明显,⽆法达到最⾼的 QPS。
    这种情况下,不仅能到 Slowdown 报错,还能看到 InternalError 报错。
  • 先前有过前⼀次“扩容”,也并不会缩短后⼀次“扩容”所需的时间
    这个和 AWS 的客服的回复是不一样的,暂时不知道具体原因。
  • 3000 QPS 的上限是比较准确的,扩容也是根据 prefix 来进行的,速度大概是在秒级别
    prefix 数量为2,最⾼能压到 6500 QPS,达到稳定耗时 30s+。压到 6000 QPS 内,达到稳定耗时在 20s 左右。

Split & Compaction

理论上 Compaction 结束后,遍历所有的 Shard,将其中匹配 compaction.input 的串替换为 compaction.output 就行了。但是因为 TiCI 是 cloud native 的架构,所以 Split 只能是逻辑分裂。因此,它的 manifest 中需要记录每一个 fragment 它到底大概引用了多少比例的数据。那么考虑:

  • Shard 1 开始 Compact
  • Shard 1 分裂为 Shard 2 和 3
  • Shard 1 的 Compaction 完成,需要替换到 Shard 2 和 3 上

此时,很难推断出 Shard 2 和 Shard 3 上应该各有多少行,因为我们不知道 Compaction 到底清理了多少数据。当然,也可以粗略地按照 range 截取一下。

有状态还是无状态的 Worker

使用无状态的 worker,则:

  • 不需要挂载 EBS,成本低
  • 如果要支持高频写入,则对 S3 的 QPS 有要求。后续对 Compaction 的速度也有要求
  • 如果不支持高频写入,则数据的 freshness 就会比较差

使用有状态的 worker,则:

  • 需要挂载一个小容量的 EBS,定期 flush 到 S3
  • 查询的时候,需要从 EBS 上 merge
    类似 TiFlash 的方案

CDC 的延迟问题

CDC 这一套架构的延迟是有上限的,基本上只能保证秒级。其中原理在 “CDC 的延迟”中已经有介绍。因此,我觉得要想获得更好的延迟,就得有一套支持直接写入的 API 和 client。

心跳

分布式架构和高并发相关场景中介绍了心跳的一些基本的概念。这里不再详细论述。

tantivy

保序

同时支持 ASC 和 DESC

Tantivy 的倒排表文件是顺序压缩存储的,例如:

  • doc_id 列表经过 delta-encoded
    如果要支持逆向,则需要保存两个方向的差值,十分麻烦
  • 然后通过 bit-packed 或 block-wise 压缩
  • 压缩的 Block 按顺序写入文件
    如果要支持逆向,则需要能定位到前一个 Block 的 offset。
  • 查找时通过 BlockReader 顺序解码

这么做的目的是因为顺序访问能够提高吞吐量。而对于随机访问,tantivy 主要是通过如下方式进行快速的 seek:

  • skiplist

对于搜索引擎而言,存储空间通常比随机访问代价更便宜,尤其是 posting list 占比主要来自长尾词,常见高频词列表短,存两份并不会显著膨胀。

为什么常见高频词反而消耗 posting list 的物理空间更小?

  • 因为足够高频,所以 delta-encode + bitpack 的优化效果非常好
  • 高频词可能是 Stopword,不会为它生成倒排表
  • 基于布尔逻辑,或者范围过滤的短路逻辑,常常能够跳过这些高频词的 posting list 的读取

内存

tantivy 在读取的时候需要借助 MmapDirectory 结构,它实际上是把 segment 文件 mmap 读的。

问题:

  • 如果要实现 CMEK,则难度相当大,因为 CMEK 在 S3 和本地盘上都需要加密。感觉一种做法是用 gocryptfs 糊一层。

返回流式数据

关于 vibe coding

编程风格和编程理念

这里,我也想进一步探讨编程风格问题,我的想法比较复古:

  • 坚持测试、可观测性
    特别的,对于“莽一版”,需要重视:
    • 写测试的过程,也代表了对需求的理解,对边界情况、对性能的评估。如果不写测试,那么很难有自信说自己写的代码就是完全符合需求的。
    • 莽的目的是能赶快有个版本上线给客户看,因此避免灾难性事故犹在提供功能之上。
  • 相比于论证架构的性能上限,我更看重先论证架构的安全性
    可以参考跨 Region 提交事务?这一章。
  • 强调建立机制,而不是引入人力
    人写代码就是会犯错的,而机器就是比人少犯错。因此应当:
    • 引入检查,让“烂”代码无法通过 review
    • 强制有测试才能合入代码
  • 不隐藏复杂度,只封装复杂度
    系统的复杂度是由需求定义的,一个复杂的需求一定导致一个复杂的系统设计。
    这里“隐藏”复杂度指的是下列行为:

关于 vibe coding,我认为有常见如下的 pitfall:

  • 用一个单薄的实现去解决一个复杂场景
  • 用一个臃肿实现去解决一个简单场景
  • 认识不到风险场景,缺乏管控
  • 写出的代码无法在系统中迭代维护