LevelDB之Compaction实现

本文介绍LevelDB的SSTable之间的Compaction。Compaction分两种:

  1. Minor Compaction
    对应Memtable到SSTable的过程。
  2. Major Compaction
    对应SSTable文件之间的归并。涉及到两个Level的SSTable文件。
    Major Compaction中还可以细分,比如是否Manual等。对于非Manual,还有seek compaction和size compaction。

在本文中,还会介绍Version和VersionEdit概念,它们有助于理解LevelDB对MVCC的实现。

同样的,文章中的【Q】表示我在阅读源码的过程中产生的疑问,有的我找到的解答,或者自己产生了思考,有的则未必清楚。

我们首先来回顾一下LevelDB的整体架构

之前提到过,当一个Memtable满了之后,会转化为Immutable Memtable。Immutable Memtable会被Dump成SSTable文件,SSTable文件是不可变的。
这里GUARDED_BY(m)实际上是__attribute__(guarded_by(m))这个线程安全注解,方便编译器帮助检查有没有遗漏掉加锁的情况。

1
2
3
4
5
6
class DBImpl : public DB {
...
MemTable* mem_;
MemTable* imm_ GUARDED_BY(mutex_); // Memtable being compacted
...
}

前置知识

LCS和STCS

有两种Compacton方案:Size-Tiered Compaction Strategy(STCS)和Leveled Compaction Strategy(LCS)。

STCS

Memtable刷成小sstable。当这些小的sstable达到一定个数时,会被compact成一个稍大些的sstable。当稍大些的sstable又达到一定个数时,又会被一起compact成更大的sstable。当然,如果说某些Key的更新频率比较高,那么在Compact的时候只会取最新的Sequence Number,这种情况下,可能不会增加太多。
下图是STCS的一个示意,可以看到,每层的SSTable数量不变,但是大小越来越大

LCS(Classic Leveled)

STCS存在一些问题,是可以被优化的:

  1. 存储放大1
    因为Compaction时,在新SSTable生成前,旧的SSTable不能删除(当然LevelDB中有Version的概念,其实更复杂点),所以可能会造成额外一倍的开销。
    于是我们临机一动,我们增加SSTable数量,而控制大小不变,不就能控制这额外一倍开销的绝对数量么?
  2. 存储放大2
    如果Key更新频繁,可能导致同一个Level以及不同Level中的SSTable中存在相同的Key。这里的Key实际上就是LevelDB里面的user key,而不是带有Sequence Number的InternalKey。
    【Q】为什么不同Level会存在呢?

为此,我们就得到了LCS:

  1. 当Level0层数量达到Level0层阈值时,将这些SSTable和L1层的所有SSTable做Compaction。
    实际上,具体涉及哪些SSTable,在LevelDB中控制更为精细。并且Compaction的条件也更复杂。
  2. 如果Level1层的SSTable数量还是超过L1层的阈值,再把这些超出的SSTable向上做Compaction。
  3. 除了Level0,其他层的所有SSTable中的key都是不重叠的。

下图是LCS的一个示意

我们注意到,LCS中,SSTable的大小不变,但是数量会增多,Level N+1的文件数量是Level N的10倍。【Q】这里看上去和LevelDB的实现还有区别,LevelDB里面的MaxBytesForLevel函数更多的是计算了10倍的大小,Why?
所以,假如Level1有10个文件,Level2就有100个文件。但是key在两个level中都是均匀分布的,因此我Level1拿出一个文件出来,Level2中估计只会有10个文件和它重叠,所以我们只需要合并重叠的这些文件就行了。
当然,Level0彼此重叠,所以还是emmmm。。。

LCS的缺点是写放大会比STCS显著提高。

【Q】既然LCS的写放大高了很多,为什么说基于LSM的写性能很好呢?可能是因为下面几点

  1. SSTable是顺序写,性能好【Q】
  2. 根据RocksDB的文档,在一些情况下写放大不是很严重
    首先是按key顺序的写,对于这种情况RocksDB可以优化。
    其次是有skew的写,会导致只有小部分的key被更新。

Level-N

相比LCS(Classic Leveled)有更高的读放大,和更小的写放大。

Tiered+Leveled

常见文件

需要注意的是,LevelDB是一个单机的数据库,所以实际承载的SSTable文件都位于一台机器上。

文件类型

1
2
3
4
5
6
7
8
9
enum FileType {
kLogFile,
kDBLockFile,
kTableFile,
kDescriptorFile,
kCurrentFile,
kTempFile,
kInfoLogFile // Either the current one, or an old one
};
  1. kLogFile:WAL日志文件,文件名数字.log
  2. kDBLockFile:db锁文件,文件名LOCK
  3. kTableFile:SSTable文件,文件名数字.sst
  4. kDescriptorFile:Manifest文件,存储VersionEdit信息,文件名为MANIFEST-数字
    对应descriptor_file_这个字段。
    Manifest文件中维护了所有的SSTable的key范围,层级,以及其他的元信息。
  5. kCurrentFile:记录当前的Manifest文件,文件名为CURRENT
  6. kTempFile:临时文件,db在修复【?】过程中会产生临时文件,文件名为数字.dbtmp
  7. kInfoLogFile:日志文件,文件名为LOG

Manifest

每一个VersionEdit对应Manifest里面的一个Entry。如下所示,包含

  1. 增加的SSTable
  2. 删除的SSTable
  3. 当前Compaction的下标
  4. 日志文件编号
  5. SeqNumber

Current

记录当前的Manifest文件名。

Version机制

大前提,Compaction过程是通过独立线程异步并发执行的。因此可能出现压缩前后的新老SSTable并存的情况。同时,我们不能立即删除老的SSTable文件,这可能是因为这个SSTable还在被读取,而要等到老SSTable的引用计数为0才行。因此Version机制可以用来辨别这些SSTable的版本。借助于Version机制,也能实现MVCC。

新版本New-Version由Version类和VersionEdit类来描述。即VersionEdit是New-Version相对于Version的改动。

1
New-Version = Version + VersionEdit

LevelDB将所有的Version置于一个双向链表之中,因此所有的Version组成一个名为VersionSet的集合。这个集合也代表了当前DB的状态,包含了最新的Version,以及其他正在服务的Version。

MVCC

根据Wikipedia,MVCC意图解决读写锁造成的多个、长时间的读操作饿死写操作问题。
每个事务读到的数据项都是一个snapshot并依赖于实现的隔离级别。写操作不覆盖已有数据项,而是创建一个新的版本,直至所在操作提交时才变为可见。快照隔离使得事务看到它启动时的数据状态。

具体来说:

  1. 当事务Ti读取对象P时,只有比Ti时间戳更早的最新对象版本是可见的。
  2. 当事务Ti写入对象P时,如果Tk要写同一对象,那么Ti必须早于Tk才能成功。【Q】这个说法就很奇怪,那对称地,Tk不是也要早于Ti么?

VersionEdit

介绍作为桥梁作用的VersionEdit类。这个类里面的方法大部分是用来读写里面的私有成员的,所以只介绍私有成员。

  1. std::string comparator_;
  2. uint64_t log_number_;
    包含

    1
    void SetLogNumber(uint64_t num)

    log文件的file number,也就是000003.log的这个3。
    小于这个值的Log是可以被删除的
    【Q】这个字段的作用是什么呢?
    目前来看,在Recover的时候会用到。
    【Q】为什么VersionSet里面也有?
    其实VersionSet里面的才是主要的,VersionEdit里面的这个字段,是在LogAndApply的时候,由VersionSet设置过来的。
    【Q】这个number,和版本的关系是什么,是一一对应的么?比如一次Compaction之后就要换个log?因为在实现上,可以看到NewFileNumber会产生log(DB::Open)和SSTable(WriteLevel0Table)文件的序列号。

  3. uint64_t prev_log_number_;/bool has_prev_log_number_;
    包括void SetPrevLogNumber(uint64_t num)这个函数。
    这篇文章prev_log_number_已经废弃了,出于兼容性才保留的。
  4. uint64_t next_file_number_;/bool has_next_file_number_;
    下一个可用的file number。VersionSet里面也有类似字段,详细介绍见VersionSet。
    包含

    1
    void SetNextFile(uint64_t num)
  5. SequenceNumber last_sequence_;/bool has_last_sequence_;
    SSTable 中的最大的Sequence Number。VersionSet里面也有个平行的。

  6. bool has_comparator_;
  7. bool has_log_number_;
  8. std::vector<std::pair<int, InternalKey>> compact_pointers_;
    主要用于Major Compaction的时候选择文件。first表示每个level。
    【Q】在Compaction类和VersionSet类里面也有一个这个字段。它们的作用是什么呢?
  9. DeletedFileSet deleted_files_;

    1
    typedef std::set<std::pair<int, uint64_t>> DeletedFileSet;

    pair存储了level和file。表示将第level层中的file删除。

  10. std::vector<std::pair<int, FileMetaData>> new_files_;
    FileMetaData存储了文件大小,以及文件中最小的Key和最大的Key。

Version

相关字段

  1. VersionSet相关
    指向这个Version所属的VersionSet,以及双向链表和引用计数。
    所以说每个Version只能属于一个VersionSet,这个也是很好理解的,

    1
    2
    3
    4
    VersionSet* vset_;  // VersionSet to which this Version belongs
    Version* next_; // Next version in linked list
    Version* prev_; // Previous version in linked list
    int refs_; // Number of live refs to this version
  2. SSTable相关
    files_表示LevelDB中每一层中所有的SSTable的文件信息。
    file_to_compact(_level)_标记下一个要Compact的文件以及属于的Level。

    1
    2
    3
    4
    5
    6
    // List of files per level
    std::vector<FileMetaData*> files_[config::kNumLevels];

    // Next file to compact based on seek stats.
    FileMetaData* file_to_compact_;
    int file_to_compact_level_;
  3. 其他字段
    compaction_score_计算最迫切需要Compaction的Level,所以可以决定是否需要发起Major Compaction。这个分数取决于某一层所有SSTable的大小。
    NeedsCompaction会读取这个字段,计算是否需要根据Version的情况来Compaction,并呈递给MaybeScheduleCompaction

    1
    2
    3
    4
    // Level that should be compacted next and its compaction score.
    // Score < 1 means compaction is not strictly needed.
    double compaction_score_;
    int compaction_level_;

相关函数

  1. int PickLevelForMemTableOutput(const Slice& smallest_user_key, const Slice& largest_user_key);
    给定一个Memtable里面的Key的范围,返回这个Memtable被Dump的话要放到第几层。
  2. Compaction* PickCompaction();
    用来处理size compaction和seek compaction。
    这个函数,在“Compaction主函数”这个章节介绍。
  3. Compaction* CompactRange(int level, const InternalKey* begin, const InternalKey* end);

Version::PickLevelForMemTableOutput

OverlapInLevel

先介绍辅助函数OverlapInLevel,作用是判断范围[smallest_user_key,largest_user_key]和level中的文件有没有Overlap。

1
2
3
4
5
bool Version::OverlapInLevel(int level, const Slice* smallest_user_key,
const Slice* largest_user_key) {
return SomeFileOverlapsRange(vset_->icmp_, (level > 0), files_[level],
smallest_user_key, largest_user_key);
}

SomeFileOverlapsRange返回files中有没有在范围[smallest_user_key,largest_user_key]中的key。
disjoint_sorted_files表示传入的files里面的key是不是不相交的,一般除了Level0,其他都是不相交的。
AfterFileBeforeFile都比较FileMetaData里面的largest/smallestuser_key()字段。他们的类型是InternalKey,也就是不带Sequence Number和Value Type的。
对于普通情况,对于一个文件f,如果smallest_user_key大于该文件中的最大值,或者largest_user_key小于最小值,那么认为是不重叠的。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
bool SomeFileOverlapsRange(const InternalKeyComparator& icmp,
bool disjoint_sorted_files,
const std::vector<FileMetaData*>& files,
const Slice* smallest_user_key,
const Slice* largest_user_key) {
const Comparator* ucmp = icmp.user_comparator();
if (!disjoint_sorted_files) {
// Need to check against all files
for (size_t i = 0; i < files.size(); i++) {
const FileMetaData* f = files[i];
if (AfterFile(ucmp, smallest_user_key, f) ||
BeforeFile(ucmp, largest_user_key, f)) {
// No overlap
} else {
return true; // Overlap
}
}
return false;
}

如果是不相交的文件,就可以基于FindFilefiles集合二分查找。
可以思考一下我们用什么做二分的key呢?答案是每个file的largest。我们要找到第一个largest大于等于smallest_user_key的文件。

1
2
3
4
5
6
7
8
9
10
11
12
13
// Binary search over file list
uint32_t index = 0;
if (smallest_user_key != nullptr) {
// Find the earliest possible internal key for smallest_user_key
InternalKey small_key(*smallest_user_key, kMaxSequenceNumber,
kValueTypeForSeek);
index = FindFile(icmp, files, small_key.Encode());
}

if (index >= files.size()) {
// beginning of range is after all files, so no overlap.
return false;
}

二分法找到可能存在的文件files[index]后,不要忘了在判断下这个文件实际有没有overlap。

1
2
  return !BeforeFile(ucmp, largest_user_key, files[index]);
}

主流程

1
2
3
4
int Version::PickLevelForMemTableOutput(const Slice& smallest_user_key,
const Slice& largest_user_key) {
int level = 0;
...

首先判断我们要加入的文件的[smallest_user_key,largest_user_key]和Level0有没有交叠。如果有交叠,就进不了这个if,直接放到第一层,等后面Major Compaction了。
如果没有交叠,我们尝试能否将它下放到config::kMaxMemCompactLevel之前的层。【Q】为什么我们要设置上限kMaxMemCompactLevel呢?

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
...
if (!OverlapInLevel(0, &smallest_user_key, &largest_user_key)) {
// Push to next level if there is no overlap in next level,
// and the #bytes overlapping in the level after that are limited.
InternalKey start(smallest_user_key, kMaxSequenceNumber, kValueTypeForSeek);
InternalKey limit(largest_user_key, 0, static_cast<ValueType>(0));
std::vector<FileMetaData*> overlaps;
while (level < config::kMaxMemCompactLevel) {
if (OverlapInLevel(level + 1, &smallest_user_key, &largest_user_key)) {
break;
}
// 为什么会有这个?下面讲。
if (level + 2 < config::kNumLevels) {
// Check that file does not overlap too many grandparent bytes.
...
}
level++;
}
}
return level;
}

判断level + 2层情况的分支详解

这里需要着重讲解一下level + 2 < config::kNumLevels这个分支的含义。

作为普通人呢,我觉得判断完OverlapInLevel(level + 1,...就可以直接level++了啊,但是大佬肯定是不平凡的。
大佬觉得现在我们想把文件放到level + 1层,但是要先打住,看看level + 2层是什么情况,也就对应到下面的代码。
我们要计算所有重叠的文件的总大小,如果这个大小超过了阈值,那么我们就不把这个SSTable进行下放。这个调用是防止level + 1和level + 2的重叠范围太大,导致这两层进行Compaction时涉及的SSTable过多,耗时过长。

1
2
3
4
5
6
7
8
if (level + 2 < config::kNumLevels) {
// Check that file does not overlap too many grandparent bytes.
GetOverlappingInputs(level + 2, &start, &limit, &overlaps);
const int64_t sum = TotalFileSize(overlaps);
if (sum > MaxGrandParentOverlapBytes(vset_->options_)) {
break;
}
}

于是,先要用GetOverlappingInputs这个函数,计算level + 2层中到底有哪些文件和[smallest_user_key,largest_user_key]有交叠,这些文件会放到overlaps里面。
TotalFileSize这个函数就是对FileMetaData::file_size求和。

GetOverlappingInputs/MaxGrandParentOverlapBytes

GetOverlappingInputs的目标是找到level中和[begin,end]重叠的所有文件,并放到inputs里面。这个函数对Level0有特殊的处理。
user_beginuser_end是从InternalKey中提取出的user key。如果传入nullptr,表示在比较时,begin永远小于任何key。
【Q】这里为什么去找的user key而不是InternalKey呢?貌似很多地方都是找user key。在这篇文章中,作者指出了一个其实我们很容易注意到的性质,就是除了Level0,每一层Level都是有序的。进一步地,由于LevelDB使用leveled策略(LCS),即强调一个key在每一层至多只有1条记录,不存在冗余记录。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
// Store in "*inputs" all files in "level" that overlap [begin,end]
void Version::GetOverlappingInputs(int level, const InternalKey* begin,
const InternalKey* end,
std::vector<FileMetaData*>* inputs) {
assert(level >= 0);
assert(level < config::kNumLevels);
inputs->clear();
Slice user_begin, user_end;
if (begin != nullptr) {
user_begin = begin->user_key();
}
if (end != nullptr) {
user_end = end->user_key();
}
const Comparator* user_cmp = vset_->icmp_.user_comparator();

默认,我们遍历这一层的所有的文件。前面两个if分别处理文件和range完全不重叠的情况。

1
2
3
4
5
6
7
8
9
for (size_t i = 0; i < files_[level].size();) {
FileMetaData* f = files_[level][i++];
const Slice file_start = f->smallest.user_key();
const Slice file_limit = f->largest.user_key();
if (begin != nullptr && user_cmp->Compare(file_limit, user_begin) < 0) {
// "f" is completely before specified range; skip it
} else if (end != nullptr && user_cmp->Compare(file_start, user_end) > 0) {
// "f" is completely after specified range; skip it
} else {

否则,就是有重叠的,我们把这个文件加入到inputs里面作为结果返回。对于PickLevelForMemTableOutput的逻辑而言,这里就到此为止了。
但是,这个函数还会在GetOverlappingInputsCompactRangeSetupOtherInputs这些函数中用到,此时,需要处理Level0的逻辑。
且慢,我们已经逐文件遍历了啊,还会有什么问题呢?

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
      inputs->push_back(f);
if (level == 0) {
// Level-0 files may overlap each other. So check if the newly
// added file has expanded the range. If so, restart search.
if (begin != nullptr && user_cmp->Compare(file_start, user_begin) < 0) {
user_begin = file_start;
inputs->clear();
i = 0;
} else if (end != nullptr &&
user_cmp->Compare(file_limit, user_end) > 0) {
user_end = file_limit;
inputs->clear();
i = 0;
}
}
}
}
}

这篇文章中,详细解释了原因。这是因为我们认为Level1的文件是比Level0要旧的,所以如果要把Level0中的某个文件f移动到Level1中,我们要把Level0中所有和fOverlap的文件都放到Level1里面。

我们进一步想,那么在Level0往Level1归并的时候,也要看到这个过程。事实上观看PickCompaction的代码实现,我们也能看到在最后有个if (level == 0)的判断。

这个应当同样解决我们在IsTrivialMove的一个疑问。

所以,当检查到user_begin在文件[file_start,file_limit]中后,需要将user_begin调整为文件的开头file_start。对user_end也是同理的。

VersionSet

成员介绍

  1. Status VersionSet::LogAndApply(VersionEdit* edit, port::Mutex* mu)
    这个函数接受一个VersionEdit,将它应用在current_,并借助于VersionSet::Builder生成一个新的Version。Builder类的实现是比较巧妙的,我们会在稍后来讲解。
    此后,它会调用Finalize函数更新compaction_level_compaction_score_
    此后,更新Manifest文件。主要是把VersionEdit中的内容EncodeTo到Manifest文件里面。
    此后,调用AppendVersion设置新的current_
  2. std::string compact_pointer_[config::kNumLevels];
    这个字段在Major Compaction过程中被用到。表示每一层上,下一次Compaction需要开始的key的位置。它要么是一个空串,要么是一个InternalKey。
    【Q】在什么时候被设置呢?
    根据文章,这个compact_pointer_实际上表示这一层上一次Compact时文件的largest。
  3. Status Recover(bool* save_manifest);
    关于Recover机制,我们不在这篇文章中介绍。详见“LevelDB之流程概览”这篇文章。

有关Sequence:

  1. uint64_t LastSequence() const { return last_sequence_; }
    还有个对应的SetLastSequence方法。
    返回最近的Sequence Number。这个是在写入记录的时候会使用并且更新。
    【Q】VersionEdit里面也有个平行的,他们之间的关系是什么呢?
    首先VersionSet的last_sequence_会随着DBImpl::Write操作更新。
    当需要进行Compact的时候,会在LogAndApply中赋给VersionEdit中的对应字段。而VersionEdit的目的,似乎只是持久化这个信息。

有关日志:

  1. prev_log_number_/log_number_
    【Q】和VersionEdit里面同名字段的关系是什么?见VersionEdit的解释。

有关文件编号:

  1. next_file_number_
    包含

    1
    2
    3
    4
    5
    6
    uint64_t NewFileNumber() { return next_file_number_++; }
    void ReuseFileNumber(uint64_t file_number) {
    if (next_file_number_ == file_number + 1) {
    next_file_number_ = file_number;
    }
    }

    这个字段用来生成系统中下个文件的编号。VersionEdit需要在LogAndApply时传入,以persist。
    【Q】这里的file number指的是SSTable的file number么?看起来并不是的,而是Manifest文件、SSTable文件啥的共用一个编号,这也是为什么一开始Log文件是0,Minifest文件是1,SetNextFile是2的原因。

  2. manifest_file_number_;
    表示Manifest文件的编号,主要在Recover时用到

疑问:

  1. VersionSet和DBImpl是一一对应的么?

VersionSet::LogAndApply

在前面已经简单介绍过这个函数的功能了。这个函数主要在下面几个地方用到:

  1. DB::Open
    当DB启动的时候,可能需要从通过DBImpl::Recover从log中恢复一部分数据。这些数据会以VersionEdit的方式被Apply。
  2. DBImpl::CompactMemTable
    Minor Compaction。
    一般在下面的地方调用:
    • BackgroundCompaction
    • DoCompactionWork:也就是在Major Compaction的过程中也要有限处理Minor Compaction。
  3. BackgroundCompaction的非manual情况(平凡情况)
    这种情况只是将某个SSTable移动到别的层。
  4. BackgroundCompaction的manual情况(一般情况)
    需要归并。

下面这里讲解一下源码。
__attribute__((exclusive_locks_required))表示检查在调用LogAndApply函数之前就要持有锁mu。因此同时只会有一个线程执行LogAndApply

1
2
Status LogAndApply(VersionEdit* edit, port::Mutex* mu)
EXCLUSIVE_LOCKS_REQUIRED(mu);

下面是把VersionSet的LogNumber传给VersionEdit。

1
2
3
4
5
6
7
8
9
10
11
Status VersionSet::LogAndApply(VersionEdit* edit, port::Mutex* mu) {
if (edit->has_log_number_) {
assert(edit->log_number_ >= log_number_);
assert(edit->log_number_ < next_file_number_);
} else {
edit->SetLogNumber(log_number_);
}

if (!edit->has_prev_log_number_) {
edit->SetPrevLogNumber(prev_log_number_);
}

我们要把VersionSet的last_sequence_传给edit,我们先前已经推断过这里的作用了。

1
2
3
4
5
6
7
8
9
10
edit->SetNextFile(next_file_number_);
edit->SetLastSequence(last_sequence_);

Version* v = new Version(this);
{
Builder builder(this, current_);
builder.Apply(edit);
builder.SaveTo(v);
}
Finalize(v);

下面的descriptor_file_就是一个Manifest文件。
如果此时descriptor_log_是NULL,根据注释,这个对应到首次打开数据库的状态。
DescriptorFileName产生一个"/MANIFEST-%06llu"格式的文件名字。
通过WriteSnapshotdescriptor_log_写到新的Manifest文件里面,这个实际上jiushi Current Version的快照。WriteSnapshot里面也会调用EncodeToAddRecord
【Q】为什么有这个函数?

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
// Initialize new descriptor log file if necessary by creating
// a temporary file that contains a snapshot of the current version.
std::string new_manifest_file;
Status s;
if (descriptor_log_ == nullptr) {
// No reason to unlock *mu here since we only hit this path in the
// first call to LogAndApply (when opening the database).
assert(descriptor_file_ == nullptr);
new_manifest_file = DescriptorFileName(dbname_, manifest_file_number_);
edit->SetNextFile(next_file_number_);
s = env_->NewWritableFile(new_manifest_file, &descriptor_file_);
if (s.ok()) {
descriptor_log_ = new log::Writer(descriptor_file_);
s = WriteSnapshot(descriptor_log_);
}
}

下面,是把VersionEdit中的内容EncodeTo到Manifest文件里面。这里不是写快照了,而是写一条Log。其实Manifest文件的格式就是Log。
在这里,将写文件的操作都集中在一起,期间是不要加锁的。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
// Unlock during expensive MANIFEST log write
{
mu->Unlock();

// Write new record to MANIFEST log
if (s.ok()) {
std::string record;
edit->EncodeTo(&record);
s = descriptor_log_->AddRecord(record);
if (s.ok()) {
s = descriptor_file_->Sync();
}
if (!s.ok()) {
Log(options_->info_log, "MANIFEST write: %s\n", s.ToString().c_str());
}
}

// If we just created a new descriptor file, install it by writing a
// new CURRENT file that points to it.
if (s.ok() && !new_manifest_file.empty()) {
s = SetCurrentFile(env_, dbname_, manifest_file_number_);
}

mu->Lock();
}

我们现在得到了一个新的Version即v,调用AppendVersion将它设置为current_。这个函数还会将v添加到VersionSet里面的那个双向链表里面。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
  // Install the new version
if (s.ok()) {
AppendVersion(v);
log_number_ = edit->log_number_;
prev_log_number_ = edit->prev_log_number_;
} else {
delete v;
if (!new_manifest_file.empty()) {
delete descriptor_log_;
delete descriptor_file_;
descriptor_log_ = nullptr;
descriptor_file_ = nullptr;
env_->RemoveFile(new_manifest_file);
}
}

return s;
}

VersionSet::AppendVersion

这里dummy_versions_是VersionSet维护的环状链表头,dummy_versions_.prev_就是current_

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
void VersionSet::AppendVersion(Version* v) {
// Make "v" current
assert(v->refs_ == 0);
assert(v != current_);
if (current_ != nullptr) {
current_->Unref();
}
current_ = v;
v->Ref();

// Append to linked list
v->prev_ = dummy_versions_.prev_;
v->next_ = &dummy_versions_;
v->prev_->next_ = v;
v->next_->prev_ = v;
}

可以通过下面的图清晰看出

VersionSet::Builder

  1. VersionSet* vset_;
    在构造时传入的VersionSet。
  2. Version* base_;
    在构造时传入的,一般为current_
  3. LevelState levels_[config::kNumLevels];
    LevelState里面记录了增加和删除的文件。
  4. void Apply(VersionEdit* edit)
    edit里面的变动应用到current_
  5. void SaveTo(Version* v)

VersionSet::Builder::Apply

首先将VersionEdit记录的compact_pointers_应用到VersionSet。

1
2
3
4
5
6
7
8
// Apply all of the edits in *edit to the current state.
void Apply(VersionEdit* edit) {
// Update compaction pointers
for (size_t i = 0; i < edit->compact_pointers_.size(); i++) {
const int level = edit->compact_pointers_[i].first;
vset_->compact_pointer_[level] =
edit->compact_pointers_[i].second.Encode().ToString();
}

然后把要增加和删除的文件记录到自己的levels_字段里面。

1
2
3
4
5
6
// Delete files
for (const auto& deleted_file_set_kvp : edit->deleted_files_) {
const int level = deleted_file_set_kvp.first;
const uint64_t number = deleted_file_set_kvp.second;
levels_[level].deleted_files.insert(number);
}

在增加文件的时候,需要处理allowed_seeks字段。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
  // Add new files
for (size_t i = 0; i < edit->new_files_.size(); i++) {
const int level = edit->new_files_[i].first;
FileMetaData* f = new FileMetaData(edit->new_files_[i].second);
f->refs = 1;

// We arrange to automatically compact this file after
// a certain number of seeks. Let's assume:
// (1) One seek costs 10ms
// (2) Writing or reading 1MB costs 10ms (100MB/s)
// (3) A compaction of 1MB does 25MB of IO:
// 1MB read from this level
// 10-12MB read from next level (boundaries may be misaligned)
// 10-12MB written to next level
// This implies that 25 seeks cost the same as the compaction
// of 1MB of data. I.e., one seek costs approximately the
// same as the compaction of 40KB of data. We are a little
// conservative and allow approximately one seek for every 16KB
// of data before triggering a compaction.
f->allowed_seeks = static_cast<int>((f->file_size / 16384U));
if (f->allowed_seeks < 100) f->allowed_seeks = 100;

levels_[level].deleted_files.erase(f->number);
levels_[level].added_files->insert(f);
}
}

VersionSet::Builder::SaveTo

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
// Save the current state in *v.
void SaveTo(Version* v) {
BySmallestKey cmp;
cmp.internal_comparator = &vset_->icmp_;
for (int level = 0; level < config::kNumLevels; level++) {
// Merge the set of added files with the set of pre-existing files.
// Drop any deleted files. Store the result in *v.
const std::vector<FileMetaData*>& base_files = base_->files_[level];
std::vector<FileMetaData*>::const_iterator base_iter = base_files.begin();
std::vector<FileMetaData*>::const_iterator base_end = base_files.end();
const FileSet* added_files = levels_[level].added_files;
v->files_[level].reserve(base_files.size() + added_files->size());
for (const auto& added_file : *added_files) {
// Add all smaller files listed in base_
for (std::vector<FileMetaData*>::const_iterator bpos =
std::upper_bound(base_iter, base_end, added_file, cmp);
base_iter != bpos; ++base_iter) {
MaybeAddFile(v, level, *base_iter);
}

MaybeAddFile(v, level, added_file);
}

// Add remaining base files
for (; base_iter != base_end; ++base_iter) {
MaybeAddFile(v, level, *base_iter);
}

在Debug的状态下,会去检查除Level0之外的层有没有重叠。检查方法也很简单,就是看后一个文件的smallest是不是一定严格大于前一个文件的largest。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
#ifndef NDEBUG
// Make sure there is no overlap in levels > 0
if (level > 0) {
for (uint32_t i = 1; i < v->files_[level].size(); i++) {
const InternalKey& prev_end = v->files_[level][i - 1]->largest;
const InternalKey& this_begin = v->files_[level][i]->smallest;
if (vset_->icmp_.Compare(prev_end, this_begin) >= 0) {
std::fprintf(stderr, "overlapping ranges in same level %s vs. %s\n",
prev_end.DebugString().c_str(),
this_begin.DebugString().c_str());
std::abort();
}
}
}
#endif
}
}

VersionSet::Finalize

1
2
3
4
5
6
7
8
void VersionSet::Finalize(Version* v) {
// Precomputed best level for next compaction
int best_level = -1;
double best_score = -1;

for (int level = 0; level < config::kNumLevels - 1; level++) {
double score;
...

下面是针对第0层的特殊情况。我们知道LevelDB的第0层最多存在4个文件【Q】(我觉得未必,详见kL0_SlowdownWritesTrigger),这就是由kL0_CompactionTrigger控制的。这里使用文件数量,注释里面列了两个原因:

  1. 允许更大的写buffer,从而减少Level0 Compaction的数量。
    这里的写buffer应该是options_.write_buffer_size这个东西。这个阈值控制Memtable何时转换成Immutable Memtable,以及在Recover的时候何时直接dump成SSTable。
    佶屈聱牙,实际上的意思是,这个意思是,如果写buffer太大,如果我们用固定的size限制死了的话,可能Level0的文件数量会很少,比如就1个,这样会导致频繁的Level0 Compaction。
  2. Level0的文件每次读取都会被Merge。我们不希望有很多个小文件(perhaps because of a small write-buffer setting, or very high compression ratios, or lots of overwrites/deletions)。
    如果写buffer很小,这样会导致更多的Level0文件。因为Level0的文件是overlap的,所以如果数量过多,每次查询需要Seek的文件数量就越多。
1
2
3
4
5
...
if (level == 0) {
score = v->files_[level].size() /
static_cast<double>(config::kL0_CompactionTrigger); // ==4
} else {

对于第1层以下的层,计算文件总大小,而不是文件数量了。MaxBytesForLevel的大概意思就是Level1总大小是10M,下面每一层翻10倍。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
      // Compute the ratio of current size to size limit.
const uint64_t level_bytes = TotalFileSize(v->files_[level]);
score =
static_cast<double>(level_bytes) / MaxBytesForLevel(options_, level);
}

if (score > best_score) {
best_level = level;
best_score = score;
}
}

v->compaction_level_ = best_level;
v->compaction_score_ = best_score;
}

MaxBytesForLevel

这个函数计算每一层的最大大小。

1
2
3
4
5
6
7
8
9
10
11
12
static double MaxBytesForLevel(const Options* options, int level) {
// Note: the result for level zero is not really used since we set
// the level-0 compaction threshold based on number of files.

// Result for both level-0 and level-1
double result = 10. * 1048576.0;
while (level > 1) {
result *= 10;
level--;
}
return result;
}

析构函数

将自己从链表中移除。
对于自己管理的所有文件,引用计数减一。【Q】这边不搞个原子操作么?

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
Version::~Version() {
assert(refs_ == 0);

// Remove from linked list
prev_->next_ = next_;
next_->prev_ = prev_;

// Drop references to files
for (int level = 0; level < config::kNumLevels; level++) {
for (size_t i = 0; i < files_[level].size(); i++) {
FileMetaData* f = files_[level][i];
assert(f->refs > 0);
f->refs--;
if (f->refs <= 0) {
delete f;
}
}
}
}

Snapshot机制

我们在这里介绍Snapshot机制,主要是为了方便说明它对Compaction的影响:导致同一个user key的不同的Sequence Number版本存在多个。

Snapshot实际上就是某个特定的Sequence Number。
【Q】Sequence Number是全局递增的么?应该是这样的,在Put和Get的实现中,看到的都是读取的VersionSet::LastSequence()这个。

1
2
3
4
const Snapshot* DBImpl::GetSnapshot() {
MutexLock l(&mutex_);
return snapshots_.New(versions_->LastSequence());
}

Compaction主函数

总览

调用路径

  1. BackgroundCompaction
    1. BackgroundCall
      1. BGWork
        1. MaybeScheduleCompaction
          会Schedule方法BGWork
          这个函数在BackgroundCall,以及诸如Get等读写方法中都会被调用。

Compaction条件

  1. Minor Compaction
    在Recover过程中ApproximateMemoryUsage检测到Memtable超限,会直接触发对Memtable的Compaction。但这个Compaction是局部的,因为我们在恢复过程中,所以不需要诸如LogAndApply这种维护Version的工作。
    存在Immutable Memtable
  2. Manual Compaction
    CompactRange调用
  3. size_compaction
    VersionSet::PickCompaction中检查并启动。
    当Level0文件数目过多,或者某个Level的总大小过大。
    在函数NeedsCompaction中判断当前Version的compaction_score_(size compaction)和file_to_compact_(seek compaction)。
  4. seek_compaction
    seek次数太多。我们知道,当一个文件找不到时,就需要到高一级的Level中去查找。假如在Level(n)中没找到,但是在Level(n+1)中找到了,就认为Level(n)有一次未命中。容易发现如果未命中次数多了,就说明Level N和Level N+1
    的文件overlap很厉害,这就需要通过一次Major Compaction来解决这个问题。

DBImpl类

LevelDB通过class DB对外暴露C++接口,这个DB的实现就是DBImpl

DBImpl::BackgroundCall

BackgroundCall是在后台线程中执行的。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
void DBImpl::BackgroundCall() {
MutexLock l(&mutex_);
assert(background_compaction_scheduled_);
if (shutting_down_.load(std::memory_order_acquire)) {
// No more background work when shutting down.
} else if (!bg_error_.ok()) {
// No more background work after a background error.
} else {
BackgroundCompaction();
}

background_compaction_scheduled_ = false;

// Previous compaction may have produced too many files in a level,
// so reschedule another compaction if needed.
MaybeScheduleCompaction();

MakeRoomForWrite函数会在background_work_finished_signal_等待Compaction结束。

1
2
  background_work_finished_signal_.SignalAll();
}

DBImpl::MaybeScheduleCompaction

函数MaybeScheduleCompaction决定是否进行Compaction。
这里需要加锁,不然可能会导致开两个后台进程,而LevelDB只允许一个后台进程。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
void DBImpl::MaybeScheduleCompaction() {
mutex_.AssertHeld();
if (background_compaction_scheduled_) {
// Already scheduled
} else if (shutting_down_.load(std::memory_order_acquire)) {
// DB is being deleted; no more background compactions
} else if (!bg_error_.ok()) {
// Already got an error; no more changes
} else if (imm_ == nullptr && manual_compaction_ == nullptr &&
!versions_->NeedsCompaction()) {
// No work to be done
} else {
background_compaction_scheduled_ = true;
env_->Schedule(&DBImpl::BGWork, this);
}
}

PosixEnv::Schedule

这里的env_的实现实际上是一个PosixEnv
我们查看源码,原来这个后台进程只有一个started_background_thread_,一开始先检查它是否存在,如果不存在,就创建一个,然后detach掉。
接下来就是一个生产者消费者模式。不过有点奇怪,是先Signal,再入队,不应该先修改条件,再Signal么。
我在文章中提过陈硕大佬的一篇博客,在CV语境中,先Signal,再设置条件flag(代码里面的Case 6)也是可以的,但只限于单waiter使用。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
void PosixEnv::Schedule(
void (*background_work_function)(void* background_work_arg),
void* background_work_arg) {
background_work_mutex_.Lock();

// Start the background thread, if we haven't done so already.
if (!started_background_thread_) {
started_background_thread_ = true;
std::thread background_thread(PosixEnv::BackgroundThreadEntryPoint, this);
background_thread.detach();
}

// If the queue is empty, the background thread may be waiting for work.
if (background_work_queue_.empty()) {
background_work_cv_.Signal();
}

background_work_queue_.emplace(background_work_function, background_work_arg);
background_work_mutex_.Unlock();
}

下面放一下消费者的代码

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
void PosixEnv::BackgroundThreadMain() {
while (true) {
background_work_mutex_.Lock();

// Wait until there is work to be done.
while (background_work_queue_.empty()) {
background_work_cv_.Wait();
}

assert(!background_work_queue_.empty());
auto background_work_function = background_work_queue_.front().function;
void* background_work_arg = background_work_queue_.front().arg;
background_work_queue_.pop();

background_work_mutex_.Unlock();
background_work_function(background_work_arg);
}
}

PosixEnv::PosixEnv()
: background_work_cv_(&background_work_mutex_),
started_background_thread_(false),
mmap_limiter_(MaxMmaps()),
fd_limiter_(MaxOpenFiles()) {}

NeedsCompaction

1
2
3
4
bool NeedsCompaction() const {
Version* v = current_;
return (v->compaction_score_ >= 1) || (v->file_to_compact_ != NULL);
}

Compaction类

定义在version_set.h文件里面。

主要成员和成员函数

  1. std::vector<FileMetaData*> inputs_[2];
    表示这个Compaction涉及的两个level的文件,也就是输入。
    其中level层是inputs_[0]。level + 1层是inputs_[1],称为parents。
  2. std::vector<FileMetaData*> grandparents_;
    level + 2层的文件,通常称为grandparents。
  3. int level() const { return level_; }
    我们将level_level_+1层进行压缩。
  4. int num_input_files(int which) const
  5. bool IsTrivialMove() const;
    是否可以直接移动,而不涉及merge或者split操作。
  6. bool ShouldStopBefore(const Slice& internal_key);
  7. VersionEdit* edit() { return &edit_; }/edit_
    这个应该很好理解,Compaction肯定会有文件增删,即使是移动,也是跨层的。所以这里需要一个VersionEdit来描述。

IsTrivialMove

这个函数用来判断在Major Compaction的时候能不能直接移动老的文件到下面一层,而不归并生成新的文件,条件有三个:

  1. level层只有一个
    【Q】疑问:如果level层有多个,level+1层没有,那么我直接移动到下面一层也是安全的?那么禁止这么做的目的是什么?
    检查对GetOverlappingInputs的分析,发现可能是不安全的。如果说Level0的某个文件f和Level1的文件有Overlap,那么就必须要扫描整个Level0层的所有文件,将与f有Overlap的文件都要移到下一层。
  2. level + 1层没有
    这个原因应该好理解,如果level+1层有,那么我们就得比较和这个文件有没有Overlap。
  3. 和level + 2层的overlap没有超过阈值(实际上是20M)
    1
    2
    3
    4
    5
    6
    7
    8
    9
    bool Compaction::IsTrivialMove() const {
    const VersionSet* vset = input_version_->vset_;
    // Avoid a move if there is lots of overlapping grandparent data.
    // Otherwise, the move could create a parent file that will require
    // a very expensive merge later on.
    return (num_input_files(0) == 1 && num_input_files(1) == 0 &&
    TotalFileSize(grandparents_) <=
    MaxGrandParentOverlapBytes(vset->options_));
    }

DBImpl::BackgroundCompaction

这个过程是Compaction的主过程,需要全程持锁。

Minor

我们首先需要去CompactMemTable,也就是Minor Compaction。这个肯定是优先级更高的,因为我们只有两个Memtable,所以我们肯定想把Immutable Memtable快速腾空。

1
2
3
4
5
6
7
void DBImpl::BackgroundCompaction() {
mutex_.AssertHeld();

if (imm_ != nullptr) {
CompactMemTable();
return;
}

Major

详见Major Compaction章节

Minor Compaction流程

CompactMemTable

主要流程三部分:

  1. WriteLevel0Table
    1. 将Immutable Memtable生成SSTable文件
      这个文件的基本信息写到FileMetaData里面,并在最后写入VersionEdit
      注意,在Recover的过程中,这里其实也可以传入Memtable。
    2. 计算添加到哪一层
      这个文件未必会放到Level0,可能会直接放到Level1甚至Level2,具体由kMaxMemCompactLevel控制。
    3. 将上面说的FileMetaData写入VersionEdit
      因此这个函数的实际返回是传入的VersionEdit* edit
  2. LogAndApply
    用我们得到的VersionEdit,去更新数据库状态,并记录。
  3. RemoveObsoleteFiles
    重置Immutable Memtable。
    删除无用文件。主要包括kLogFile/kLogFile/kTableFile等。
1
2
3
4
5
6
7
8
9
10
11
12
13
14
void DBImpl::CompactMemTable() {
mutex_.AssertHeld();
assert(imm_ != nullptr);

// Save the contents of the memtable as a new Table
VersionEdit edit;
Version* base = versions_->current();
base->Ref();
Status s = WriteLevel0Table(imm_, &edit, base);
base->Unref();

if (s.ok() && shutting_down_.load(std::memory_order_acquire)) {
s = Status::IOError("Deleting DB during memtable compaction");
}

下面,就是要把edit应用到当前的VersionSet上。
【Q】SetPrevLogNumber是啥意思?为啥要设置为0呢?

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
  // Replace immutable memtable with the generated Table
if (s.ok()) {
edit.SetPrevLogNumber(0);
edit.SetLogNumber(logfile_number_); // Earlier logs no longer needed
s = versions_->LogAndApply(&edit, &mutex_);
}

if (s.ok()) {
// Commit to the new state
imm_->Unref();
imm_ = nullptr;
has_imm_.store(false, std::memory_order_release);
RemoveObsoleteFiles();
} else {
RecordBackgroundError(s);
}
}

WriteLevel0Table

在前文中,已经介绍过了WriteLevel0Table的作用,下面看实现。

首先,我们计算出一个NewFileNumber,也就是落盘时体现的文件名。关于这个函数,我们之前已经介绍过了,体现在诸如MANIFEST-xxxxx或者yyyyy.log这里的序号。

pending_outputs_中保存了所有正在Compact的SSTable文件,这些文件不能被删除。这引发了我两个问题:

  1. 什么时候会删除?
    RemoveObsoleteFiles里面,马上就能看到了,不急不急
  2. 为什么在BuildTable之后就可以删除了?
1
2
3
4
5
6
7
8
9
10
Status DBImpl::WriteLevel0Table(MemTable* mem, VersionEdit* edit,
Version* base) {
mutex_.AssertHeld();
const uint64_t start_micros = env_->NowMicros();
FileMetaData meta;
meta.number = versions_->NewFileNumber();
pending_outputs_.insert(meta.number);
Iterator* iter = mem->NewIterator();
Log(options_.info_log, "Level-0 table #%llu: started",
(unsigned long long)meta.number);

接着,BuildTable创建一个TableBuilder写入数据。值得注意的是,这里并没有加锁。我之前认为这是因为BuildTable里面会自带加锁,但是检查代码并没有。这可能是因为Compaction是单独的线程,诸如生成并写SSTable的过程是可以单独提出来处理的。

1
2
3
4
5
6
7
8
9
10
11
12
Status s;
{
mutex_.Unlock();
s = BuildTable(dbname_, env_, options_, table_cache_, iter, &meta);
mutex_.Lock();
}

Log(options_.info_log, "Level-0 table #%llu: %lld bytes %s",
(unsigned long long)meta.number, (unsigned long long)meta.file_size,
s.ToString().c_str());
delete iter;
pending_outputs_.erase(meta.number);

新生成的文件未必会放到Level0,可能会直接放到Level1。例如,如果新的SSTable文件和Level1中的文件没有重叠,那么就有可能被放到Level1,具体还需要查看Level2和新SSTable的重叠情况。因此PickLevelForMemTableOutput会生成一个level,表示放到哪一层。
下面的edit->AddFile就是将这个SSTable加到当前的VersionEdit中。

1
2
3
4
5
6
7
8
9
10
11
12
// Note that if file_size is zero, the file has been deleted and
// should not be added to the manifest.
int level = 0;
if (s.ok() && meta.file_size > 0) {
const Slice min_user_key = meta.smallest.user_key();
const Slice max_user_key = meta.largest.user_key();
if (base != nullptr) {
level = base->PickLevelForMemTableOutput(min_user_key, max_user_key);
}
edit->AddFile(level, meta.number, meta.file_size, meta.smallest,
meta.largest);
}

env_实际上是封装了文件系统等操作。

1
2
3
4
5
6
  CompactionStats stats;
stats.micros = env_->NowMicros() - start_micros;
stats.bytes_written = meta.file_size;
stats_[level].Add(stats);
return s;
}

RemoveObsoleteFiles

搞清楚几个问题:

  1. 清理文件的范围?看env_->GetChildren的实现,应该是所有这个db下的文件。
  2. 清理文件的类型?
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
void DBImpl::RemoveObsoleteFiles() {
mutex_.AssertHeld();

if (!bg_error_.ok()) {
// After a background error, we don't know whether a new version may
// or may not have been committed, so we cannot safely garbage collect.
return;
}

// Make a set of all of the live files
std::set<uint64_t> live = pending_outputs_;
versions_->AddLiveFiles(&live);

std::vector<std::string> filenames;
env_->GetChildren(dbname_, &filenames); // Ignoring errors on purpose
uint64_t number;
FileType type;
std::vector<std::string> files_to_delete;
...
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
...
for (std::string& filename : filenames) {
if (ParseFileName(filename, &number, &type)) {
bool keep = true;
switch (type) {
case kLogFile:
keep = ((number >= versions_->LogNumber()) ||
(number == versions_->PrevLogNumber()));
break;
case kDescriptorFile:
// Keep my manifest file, and any newer incarnations'
// (in case there is a race that allows other incarnations)
keep = (number >= versions_->ManifestFileNumber());
break;
case kTableFile:
keep = (live.find(number) != live.end());
break;
case kTempFile:
// Any temp files that are currently being written to must
// be recorded in pending_outputs_, which is inserted into "live"
keep = (live.find(number) != live.end());
break;
case kCurrentFile:
case kDBLockFile:
case kInfoLogFile:
keep = true;
break;
}

if (!keep) {
files_to_delete.push_back(std::move(filename));
if (type == kTableFile) {
table_cache_->Evict(number);
}
Log(options_.info_log, "Delete type=%d #%lld\n", static_cast<int>(type),
static_cast<unsigned long long>(number));
}
}
}

// While deleting all files unblock other threads. All files being deleted
// have unique names which will not collide with newly created files and
// are therefore safe to delete while allowing other threads to proceed.
mutex_.Unlock();
for (const std::string& filename : files_to_delete) {
env_->RemoveFile(dbname_ + "/" + filename);
}
mutex_.Lock();
}

Major Compaction流程

【Q】思考

在开始研究Major Compaction前,我们主动思考这个问题

  1. 对于Level0里面的文件,是不是可以直接和Level1中的文件Merge?
    答案是不行的,见GetOverlappingInputs的论述。
  2. 如果level中的某个文件的key的range过大,它可能和level+1层的很多文件有重合,这样的compaction写放大很重,如何解决这个问题?
    首先,这也是为什么LevelDB要分成很多层的原因,在Merge的时候,最多和下一层中的所有文件Overlap,写放大是可控的。
    其次,在Compact的时候,LevelDB一直关注和level+2层的key的重叠情是否超过一定量,即MaxGrandParentOverlapBytes函数。
    • ShouldStopBefore判断是否要结束当前SSTable写入,新开文件的时候,考虑当前文件和level+2的Overlap,如果过了,就新开文件。
    • IsTrivialMove判断是否可以直接移动文件到下层的时候,考虑要移动的文件和level+2层的Overlap,如果过了,就不能移动。
    • PickLevelForMemTableOutput选择Minor Compaction的层时,考虑这个Immutable Memtable的Overlap,如果过了,就不能放在这一层。
  3. 从level到level+1的Compaction会对level+2产生什么影响?
  4. LevelDB中多个不相干的合并是可以并发进行的,这个的实现是怎样的?
    需要注意,Level0文件是彼此Overlap的,所以是相干的。
    【Q】那么当一个Major Compaction开始的时候,是如何判定是否相干,如果不相干就不Compact的呢?从LevelDB的代码来看,只有一个后台线程进行Compact操作,所以我认为虽然在设计上LSM树是允许并行Compact的,但是LevelDB并没有实现,但RocksDB肯定是实现的。
  5. LevelDB中,每个user key在一层中是不是只会出现一次?
    大多数情况是的,有两个例外。
    首先,Level0是Overlap的,可能有多个。
    其次,如果使用了Snapshot,那么在下层可能也会有user key相同,但是sequence不同的。见AddBoundaryInputs的论述。
  6. 我们往Manifest文件里面写了什么?
  7. LevelDB有容量限制么?
    应该是没有的,但是当最下面一层变得特别大之后,Compaction的开销会很大
  8. LevelDB到底是限制的每一层的文件数量还是大小?
    【Q】如果限制的是总大小,如果保证生成的SSTable的大小是大致相同的?
    对于Major Compaction来说,是在DoCompactionWork里面通过下面的代码来判断的

    1
    2
    if (compact->builder->FileSize() >=
    compact->compaction->MaxOutputFileSize()) {

    这个调用最后会转到options->max_file_size上。

  9. LevelDB每一层的文件数量有限制么?
    首先Level0肯定有,大家说是4个么?我觉得不是。参考下面的代码,4只是表示有4个文件就开始Level0的Compaction。当文件数达到12个,才是上限,这个时候就要停止写了。

    1
    2
    3
    4
    5
    6
    // Level-0 compaction is started when we hit this many files.
    static const int kL0_CompactionTrigger = 4;
    // Soft limit on number of level-0 files. We slow down writes at this point.
    static const int kL0_SlowdownWritesTrigger = 8;
    // Maximum number of level-0 files. We stop writes at this point.
    static const int kL0_StopWritesTrigger = 12;
  10. LevelDB底层SSTable中的数据永无出头之日么?
    怎么可能,只要数据被修改,那么就会先到Memtable里面。

DBImpl::BackgroundCompaction

下面是对Major Compaction的处理。

计算Compaction对象

首先,我们要处理Manual Compaction的情况。如果manual_compaction_不是null,就触发Manual Compaction。我没看到非测试的代码里面有设置manual_compaction_的,但是leveldb_compact_range这个api会显示调用CompactRange,并且DB这个接口中也有CompactRange方法,也就是说,LevelDB对外暴露这个方法。

1
2
3
class LEVELDB_EXPORT DB {
...
virtual void CompactRange(const Slice* begin, const Slice* end) = 0;

其次,我们调用PickCompaction处理size compaction和seek compaction的情况。PickCompaction会返回当前要Compact的文件,如果返回null,就啥事都不做。对于PickCompaction而言,如果既没有size compaction,又没有seek compaction,返回null。

这个过程是持锁的

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
void DBImpl::BackgroundCompaction() {
...
// 前面是对Minor Compaction的处理
Compaction* c;
bool is_manual = (manual_compaction_ != nullptr);
InternalKey manual_end;
if (is_manual) {
ManualCompaction* m = manual_compaction_;
c = versions_->CompactRange(m->level, m->begin, m->end);
m->done = (c == nullptr);
if (c != nullptr) {
manual_end = c->input(0, c->num_input_files(0) - 1)->largest;
}
Log(options_.info_log,
"Manual compaction at level-%d from %s .. %s; will stop at %s\n",
m->level, (m->begin ? m->begin->DebugString().c_str() : "(begin)"),
(m->end ? m->end->DebugString().c_str() : "(end)"),
(m->done ? "(end)" : manual_end.DebugString().c_str()));
} else {
c = versions_->PickCompaction();
}

根据Compaction对象进行Compact操作

经过上面的代码,我们就得到了一个Compaction* c对象。
如果之前PickCompaction没给出这个c,那么就说明这一次不要Compact。
如果满足IsTrivialMove条件,就可以不生成新的文件,直接将原文件移动到下一层。
对于Trivial的情况我们直接更新c->edit(),不走InstallCompactionResults的逻辑了。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
Status status;
if (c == nullptr) {
// Nothing to do
} else if (!is_manual && c->IsTrivialMove()) {
// Move file to next level
assert(c->num_input_files(0) == 1);
FileMetaData* f = c->input(0, 0);
c->edit()->RemoveFile(c->level(), f->number);
c->edit()->AddFile(c->level() + 1, f->number, f->file_size, f->smallest,
f->largest);
status = versions_->LogAndApply(c->edit(), &mutex_);
if (!status.ok()) {
RecordBackgroundError(status);
}
VersionSet::LevelSummaryStorage tmp;
Log(options_.info_log, "Moved #%lld to level-%d %lld bytes %s: %s\n",
static_cast<unsigned long long>(f->number), c->level() + 1,
static_cast<unsigned long long>(f->file_size),
status.ToString().c_str(), versions_->LevelSummary(&tmp));
} else {

如果不满足IsTrivialMove条件,就是一般情况,由DoCompactionWork处理。
DBImpl::CompactionState这个类又封装了Compaction,这是因为要处理两个Level之间的合并,所以要加一些额外的字段。
然后我们要CleanupCompaction,这个除了清空compact对象,还需要根据compact->outputs,找到pending_outputs_里面对应的文件,并移除出pending_outputs_。我们知道compact->outputs记录了每个输出文件的元信息,而pending_outputs_记录了正在compact的文件,我们compact结束,就把这些文件移出去。在Major Compaction中,文件是在DoCompactionWork -> OpenCompactionOutputFile中被加入pending_outputs_的。

1
2
3
4
5
6
7
8
9
10
  CompactionState* compact = new CompactionState(c);
status = DoCompactionWork(compact);
if (!status.ok()) {
RecordBackgroundError(status);
}
CleanupCompaction(compact);
c->ReleaseInputs();
RemoveObsoleteFiles();
}
delete c;

收尾

如果是Manual的,需要清空Manual状态。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
  if (status.ok()) {
// Done
} else if (shutting_down_.load(std::memory_order_acquire)) {
// Ignore compaction errors found during shutting down
} else {
Log(options_.info_log, "Compaction error: %s", status.ToString().c_str());
}

if (is_manual) {
ManualCompaction* m = manual_compaction_;
if (!status.ok()) {
m->done = true;
}
if (!m->done) {
// We only compacted part of the requested range. Update *m
// to the range that is left to be compacted.
m->tmp_storage = manual_end;
m->begin = &m->tmp_storage;
}
manual_compaction_ = nullptr;
}
}

Version::PickCompaction

size compaction的优先级是高于seek compaction的。
遍历current_->compaction_level_这一层的所有文件,找到第一个largest大于compact_pointer_[level]的文件,放到Compaction* cinputs_[0]中。
如果一轮循环下来没找到,说明所有的文件的largest都小于compact_pointer_[level],也就是这一层所有的key都小于compact_pointer_[level],那就把第一个文件放进去。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
Compaction* VersionSet::PickCompaction() {
Compaction* c;
int level;

// We prefer compactions triggered by too much data in a level over
// the compactions triggered by seeks.
const bool size_compaction = (current_->compaction_score_ >= 1);
const bool seek_compaction = (current_->file_to_compact_ != nullptr);
if (size_compaction) {
level = current_->compaction_level_;
assert(level >= 0);
assert(level + 1 < config::kNumLevels);
c = new Compaction(options_, level);

// Pick the first file that comes after compact_pointer_[level]
for (size_t i = 0; i < current_->files_[level].size(); i++) {
FileMetaData* f = current_->files_[level][i];
if (compact_pointer_[level].empty() ||
icmp_.Compare(f->largest.Encode(), compact_pointer_[level]) > 0) {
c->inputs_[0].push_back(f);
break;
}
}
if (c->inputs_[0].empty()) {
// Wrap-around to the beginning of the key space
c->inputs_[0].push_back(current_->files_[level][0]);
}

对于seek compaction,把要Compact的那个文件加到c->inputs_[0]就行,逻辑很简单。

1
2
3
4
5
6
7
} else if (seek_compaction) {
level = current_->file_to_compact_level_;
c = new Compaction(options_, level);
c->inputs_[0].push_back(current_->file_to_compact_);
} else {
return nullptr;
}

对于Level0,有个特别的处理,这个参考GetOverlappingInputs函数的说明。

1
2
3
4
5
6
7
8
9
10
11
12
13
c->input_version_ = current_;
c->input_version_->Ref();

// Files in level 0 may overlap each other, so pick up all overlapping ones
if (level == 0) {
InternalKey smallest, largest;
GetRange(c->inputs_[0], &smallest, &largest);
// Note that the next call will discard the file we placed in
// c->inputs_[0] earlier and replace it with an overlapping set
// which will include the picked file.
current_->GetOverlappingInputs(0, &smallest, &largest, &c->inputs_[0]);
assert(!c->inputs_[0].empty());
}

现在,我们已经得到了c->inputs_[0]除了c->inputs_[0]的情况,否则c->inputs_[0]里面都只有一个文件
通过SetupOtherInputs可以计算c->inputs_[1],也就是level+1层涉及哪些文件。

1
2
3
4
  SetupOtherInputs(c);

return c;
}

Version::SetupOtherInputs

SetupOtherInputs计算在Compaction时,level+1层涉及哪些文件。在这个函数之后,我们就得到了正确的c->inputs_数组、c->grandparents_字段,以及compact_pointer_字段。在这个函数之后,PickCompaction就结束了,BackgroundCompaction会执行后面的流程,也就是DoCompactionWork
基本的思想是:所有和level层有重叠的level+1层文件都要参与Compact。得到这些文件后,反过来看下,利用这些level+1层的文件,能不能Compact更多level层的文件?
这个函数被CompactRangePickCompaction调用,也就是所有的Major Compaction逻辑都会走到这里。

GetRange和GetRange2

GetRange计算inputs_[0]/inputs_[1]的区间。
GetRange2计算inputs_[0]inputs_[1]的区间。
GetRange很简单,遍历每一个文件,然后更新smallest和largest,这里注意都需要icmp_.Compare

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
void VersionSet::GetRange(const std::vector<FileMetaData*>& inputs,
InternalKey* smallest, InternalKey* largest) {
assert(!inputs.empty());
smallest->Clear();
largest->Clear();
for (size_t i = 0; i < inputs.size(); i++) {
FileMetaData* f = inputs[i];
if (i == 0) {
*smallest = f->smallest;
*largest = f->largest;
} else {
if (icmp_.Compare(f->smallest, *smallest) < 0) {
*smallest = f->smallest;
}
if (icmp_.Compare(f->largest, *largest) > 0) {
*largest = f->largest;
}
}
}
}

GetRange2很简单,就直接合并inputs_[0]inputs_[1]的内容到一个vector里面,然后调用GetRange。

1
2
3
4
5
6
7
void VersionSet::GetRange2(const std::vector<FileMetaData*>& inputs1,
const std::vector<FileMetaData*>& inputs2,
InternalKey* smallest, InternalKey* largest) {
std::vector<FileMetaData*> all = inputs1;
all.insert(all.end(), inputs2.begin(), inputs2.end());
GetRange(all, smallest, largest);
}

AddBoundaryInputs

AddBoundaryInputs是一个很重要的函数,但只有很少的Blog能讲明白这个函数的来龙去脉。

翻译一下AddBoundaryInputs这个函数的注释。他提取出compaction_files里面最大的文件b1,在这里是c->inputs_[0]里面最大的文件。
然后在level_files里面找到一个b2,满足b1和b2的user key是相等的,这样的b2称为boundary file。我们需要将这个b2加入到compaction_files里面,并且继续找上界。
如果有两个块(应该就是SSTable)b1和b2,他们的范围分别是(l1, u1)(l2, u2),如果我们只Compact b1,不Compact b2,那么在读取的时候就会出错。因为它只会返回b2的结果,而永远不会返回b1的结果,因为b1在b2上层了。与此同时,我们需要注意到b2的结果可能还是一个较旧的数据,因为根据Memtable里面的介绍,Sequence Number是从新到旧来排序的。

1
2
3
void AddBoundaryInputs(const InternalKeyComparator& icmp,
const std::vector<FileMetaData*>& level_files,
std::vector<FileMetaData*>* compaction_files)

【Q】看起来,这个函数做的是和GetOverlappingInputs一样的事情,他们的区别是什么呢?首先,GetOverlappingInputs的初心不是扩展边界而是计算某一层和某个range重合的文件,只是对Level0要特殊处理一下。其次,这篇文章中进行了解释。

如下图所示,两个sstable中,出现了user key相同(都为key2)但是Sequence Number不同的两个Internal Key。

所以可以看到GetOverlappingInputs的特殊处理关注的是Level0上某一个要Compact的文件中的所有key是否还会出现在其他的SSTable文件中。而AddBoundaryInputs关注的是某个Key的其他版本是否还会出现在其他的SSTable中。

【Q】这里引发了第二个疑问,为什么同一层中会出现两个相同user key的Key呢?我觉得这个可能是因为这个Key出现在两个SSTable的边界上,所以这个函数叫AddBoundaryInputs吧。

仔细回顾一下DoCompactionWork的实现,似乎是可能没处理完一个Key,就ShouldStopBefore了的,但即使这样,后面的文件里面也不会再写有关这个user key的内容了。那么究竟在什么情况下会发生这种情况呢?根据这篇文章中指出Snapshot机制会导致“同一层中会出现两个相同user key的Key”这个问题。

【Q】这里引发了第三个疑问,出现了两个user key,会不会影响读取呢?实际上只要位于同一层就不影响,因为根据Memtable里面的介绍,Sequence Number是从新到旧来排序的。我们的查找方式允许我们每一次都找到b1里面的值。

特别值得注意的是,这个问题关系到Issue 320PR 339AddBoundaryInputs函数也是在那个时候引进的。不过值得注意的是,这个patch在2016年就提了,但是2019年才被合进去。

SetupOtherInputs主体

首先AddBoundaryInputs扩充一下c->inputs_[0]
然后获得Level N的range。
然后计算Level N+1和Level N重叠的SSTable文件,并放入c->inputs_[1]
最后,计算Level N和Level N+1合并起来的range。

1
2
3
4
5
6
7
8
9
10
11
12
13
void VersionSet::SetupOtherInputs(Compaction* c) {
const int level = c->level();
InternalKey smallest, largest;

AddBoundaryInputs(icmp_, current_->files_[level], &c->inputs_[0]);
GetRange(c->inputs_[0], &smallest, &largest);

current_->GetOverlappingInputs(level + 1, &smallest, &largest,
&c->inputs_[1]);

// Get entire range covered by compaction
InternalKey all_start, all_limit;
GetRange2(c->inputs_[0], c->inputs_[1], &all_start, &all_limit);

下面的的代码,就是之前说的优化。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
// See if we can grow the number of inputs in "level" without
// changing the number of "level+1" files we pick up.
if (!c->inputs_[1].empty()) {
std::vector<FileMetaData*> expanded0;
current_->GetOverlappingInputs(level, &all_start, &all_limit, &expanded0);
AddBoundaryInputs(icmp_, current_->files_[level], &expanded0);
const int64_t inputs0_size = TotalFileSize(c->inputs_[0]);
const int64_t inputs1_size = TotalFileSize(c->inputs_[1]);
const int64_t expanded0_size = TotalFileSize(expanded0);
if (expanded0.size() > c->inputs_[0].size() &&
inputs1_size + expanded0_size <
ExpandedCompactionByteSizeLimit(options_)) {
InternalKey new_start, new_limit;
GetRange(expanded0, &new_start, &new_limit);
std::vector<FileMetaData*> expanded1;
current_->GetOverlappingInputs(level + 1, &new_start, &new_limit,
&expanded1);
if (expanded1.size() == c->inputs_[1].size()) {
Log(options_->info_log,
"Expanding@%d %d+%d (%ld+%ld bytes) to %d+%d (%ld+%ld bytes)\n",
level, int(c->inputs_[0].size()), int(c->inputs_[1].size()),
long(inputs0_size), long(inputs1_size), int(expanded0.size()),
int(expanded1.size()), long(expanded0_size), long(inputs1_size));
smallest = new_start;
largest = new_limit;
c->inputs_[0] = expanded0;
c->inputs_[1] = expanded1;
GetRange2(c->inputs_[0], c->inputs_[1], &all_start, &all_limit);
}
}
}

下面,设置一下c->grandparents_这个字段。

1
2
3
4
5
6
// Compute the set of grandparent files that overlap this compaction
// (parent == level+1; grandparent == level+2)
if (level + 2 < config::kNumLevels) {
current_->GetOverlappingInputs(level + 2, &all_start, &all_limit,
&c->grandparents_);
}

记录下一轮的压缩起始文件,也就是设置compact_pointer_。我们在这里立即更新,而不是等到VersionEdit被Apply的时候更新,这样当Compaction失败后,我们能下次能尝试一个不同的key range。

  1. 【Q】什么是压缩起始文件?
    查看PickCompaction函数,它会找到largest大于compact_pointer_[level]后的第一个文件。
    可以发现,其实每一次要Compaction的文件就是通过compact_pointer_指定的。
  2. 【Q】在这之后,Compaction会因为什么失败?
    1
    2
    3
    4
      // Update the place where we will do the next compaction for this level.
    compact_pointer_[level] = largest.Encode().ToString();
    c->edit_.SetCompactPointer(level, largest);
    }

DBImpl::CompactionState

  1. SequenceNumber smallest_snapshot;
    小于smallest_snapshot的Sequence Number是不重要的,因为我们不会为提供smallest_snapshot的snapshot。
    所以,如果我们看到Sequence Number小于等于smallest_snapshot的某个S,就可以丢弃小于S的这个key的其他版本。
    【Q】这里是不是在说,如果只有S这个独苗,那还是要写进去的?
  2. std::vector<Output> outputs;
  3. Output* current_output() { return &outputs[outputs.size() - 1]; }
    保存每个输出文件的元信息。例如smallest和largest。
  4. WritableFile* outfile;
    Major Compaction过程中,需要输出到level+1层的文件。注意,可能有多个这样的文件,参考ShouldStopBefore
  5. TableBuilder* builder;
  6. uint64_t total_bytes;

DBImpl::DoCompactionWork

这个对应了一般情况下的Compact过程,来自BackgroundCompaction的调用。
那么这个函数做啥呢,不就是个归并排序么?且慢,我们如何处理同一个user key有不同Sequence Number呢?我们的目标肯定是只保留最新的。
其中CompactionState封装了Compaction

1
2
3
4
5
6
7
8
Status DBImpl::DoCompactionWork(CompactionState* compact) {
const uint64_t start_micros = env_->NowMicros();
int64_t imm_micros = 0; // Micros spent doing imm_ compactions

Log(options_.info_log, "Compacting %d@%d + %d@%d files",
compact->compaction->num_input_files(0), compact->compaction->level(),
compact->compaction->num_input_files(1),
compact->compaction->level() + 1);

这里要assert一下,即要压缩的Level N层是要有文件的。
【Q】这个Snapshot啥回事?
根据文章,如果有 Snapshot,则保留大于 Snapshot SN 的所有 Record,以及一个小于 Snapshot SN 的 Record 中,SN 最大的 Record。

1
2
3
4
5
6
7
8
assert(versions_->NumLevelFiles(compact->compaction->level()) > 0);
assert(compact->builder == nullptr);
assert(compact->outfile == nullptr);
if (snapshots_.empty()) {
compact->smallest_snapshot = versions_->LastSequence();
} else {
compact->smallest_snapshot = snapshots_.oldest()->sequence_number();
}

我们执行MakeInputIterator,得到的迭代器可以按照key大小遍历所有冲突文件中的每个KV对。

1
2
3
4
5
6
7
8
9
10
11
Iterator* input = versions_->MakeInputIterator(compact->compaction);

// Release mutex while we're actually doing the compaction work
mutex_.Unlock();

input->SeekToFirst();
Status status;
ParsedInternalKey ikey;
std::string current_user_key;
bool has_current_user_key = false;
SequenceNumber last_sequence_for_key = kMaxSequenceNumber;

下面这个while循环遍历刚才得到的迭代器input,进行Major Compaction。
但是,且慢,每一次我们都需要先检查有没有Immatable Memtable,如果有的话,就需要先执行Minor Compaction。这也说明了Minor Compaction的优先级更高

1
2
3
4
5
6
7
8
9
10
11
12
13
while (input->Valid() && !shutting_down_.load(std::memory_order_acquire)) {
// Prioritize immutable compaction work
if (has_imm_.load(std::memory_order_relaxed)) {
const uint64_t imm_start = env_->NowMicros();
mutex_.Lock();
if (imm_ != nullptr) {
CompactMemTable();
// Wake up MakeRoomForWrite() if necessary.
background_work_finished_signal_.SignalAll();
}
mutex_.Unlock();
imm_micros += (env_->NowMicros() - imm_start);
}

检查当前输出文件(应当位于level+1层)是否与level+2层文件有过多冲突,如果是就要完成当前输出文件,并产生新的输出文件。

1
2
3
4
5
6
7
8
Slice key = input->key();
if (compact->compaction->ShouldStopBefore(key) &&
compact->builder != nullptr) {
status = FinishCompactionOutputFile(compact, input);
if (!status.ok()) {
break;
}
}

下面就是判断是不是能drop,也就是和前面计算的compact->smallest_snapshot比较。
正常情况下ParseInternalKey不会失败,我们跳过这个分支

1
2
3
4
5
6
7
8
// Handle key/value, add to state, etc.
bool drop = false;
if (!ParseInternalKey(key, &ikey)) {
// Do not hide error keys
current_user_key.clear();
has_current_user_key = false;
last_sequence_for_key = kMaxSequenceNumber;
} else {

下面这个if,判断的是current_user_key第一次出现的情况,包括处理完上一个user key,到达下一个user key,或者刚开始处理第一个user key的情况。我们设置last_sequence_for_key为最大,那么就永远不会触发drop。

1
2
3
4
5
6
7
8
if (!has_current_user_key ||
user_comparator()->Compare(ikey.user_key, Slice(current_user_key)) !=
0) {
// First occurrence of this user key
current_user_key.assign(ikey.user_key.data(), ikey.user_key.size());
has_current_user_key = true;
last_sequence_for_key = kMaxSequenceNumber;
}

下面我们比较Sequence Number,如果last_sequence_for_key都小于compact->smallest_snapshot了,那么我这个key肯定更小,这是因为Sequence Number是按照降序排列的。对于这种情况,我们省点事,直接不要了。

1
2
3
if (last_sequence_for_key <= compact->smallest_snapshot) {
// Hidden by an newer entry for same user key
drop = true; // (A)

下一个判断复杂点,表示对于特定情况下,一个删除操作也是可以丢掉的。
如果某个删除操作的版本小于快照版本,并且在更高层没有相同的user key,那么这个删除操作及其之前更早的插入操作可以同时丢弃了

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
  } else if (ikey.type == kTypeDeletion &&
ikey.sequence <= compact->smallest_snapshot &&
compact->compaction->IsBaseLevelForKey(ikey.user_key)) {
// For this user key:
// (1) there is no data in higher levels
// (2) data in lower levels will have larger sequence numbers
// (3) data in layers that are being compacted here and have
// smaller sequence numbers will be dropped in the next
// few iterations of this loop (by rule (A) above).
// Therefore this deletion marker is obsolete and can be dropped.
drop = true;
}

last_sequence_for_key = ikey.sequence;
}

如果drop条件不符合,那么就写入到compact->current_output()里面,同时更新largest。
同时我们关注文件大小,如果超限了,就FinishCompactionOutputFile。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
  if (!drop) {
// Open output file if necessary
if (compact->builder == nullptr) {
status = OpenCompactionOutputFile(compact);
if (!status.ok()) {
break;
}
}
if (compact->builder->NumEntries() == 0) {
compact->current_output()->smallest.DecodeFrom(key);
}
compact->current_output()->largest.DecodeFrom(key);
compact->builder->Add(key, input->value());

// Close output file if it is big enough
if (compact->builder->FileSize() >=
compact->compaction->MaxOutputFileSize()) {
status = FinishCompactionOutputFile(compact, input);
if (!status.ok()) {
break;
}
}
}

input->Next();
}

截至现在,我们已经遍历完迭代器了。

1
2
3
4
5
6
7
8
9
10
11
if (status.ok() && shutting_down_.load(std::memory_order_acquire)) {
status = Status::IOError("Deleting DB during compaction");
}
if (status.ok() && compact->builder != nullptr) {
status = FinishCompactionOutputFile(compact, input);
}
if (status.ok()) {
status = input->status();
}
delete input;
input = nullptr;

更新状态

1
2
3
4
5
6
7
8
9
10
CompactionStats stats;
stats.micros = env_->NowMicros() - start_micros - imm_micros;
for (int which = 0; which < 2; which++) {
for (int i = 0; i < compact->compaction->num_input_files(which); i++) {
stats.bytes_read += compact->compaction->input(which, i)->file_size;
}
}
for (size_t i = 0; i < compact->outputs.size(); i++) {
stats.bytes_written += compact->outputs[i].file_size;
}

下面,我们加锁。所以其实在遍历input这个迭代器的时候,是没有在加锁的。
InstallCompactionResults是一个关键过程,它将这次Compaction的内容加入到VersionEdit里面,并且最终调用LogAndApply。内容包括什么呢?增加和删除的文件:

  1. InstallCompactionResults会调用Compaction::AddInputDeletions,需要删除的文件,包括input_[0]input_[1]
  2. compact->compaction->edit()中添加compact->outputs中的所有文件
1
2
3
4
5
6
7
8
9
10
11
12
13
  mutex_.Lock();
stats_[compact->compaction->level() + 1].Add(stats);

if (status.ok()) {
status = InstallCompactionResults(compact);
}
if (!status.ok()) {
RecordBackgroundError(status);
}
VersionSet::LevelSummaryStorage tmp;
Log(options_.info_log, "compacted to: %s", versions_->LevelSummary(&tmp));
return status;
}

Reference

  1. https://bean-li.github.io/leveldb-version/
  2. https://zhuanlan.zhihu.com/p/34674504
  3. https://blog.csdn.net/tmshasha/article/details/47703245
  4. https://zhuanlan.zhihu.com/p/51573929
  5. https://leveldb-handbook.readthedocs.io/zh/latest/basic.html
  6. https://blog.lovezhy.cc/2020/08/17/LevelDB%E6%BA%90%E7%A0%81%E8%A7%A3%E6%9E%90%EF%BC%88%E4%BA%94%EF%BC%89-%20CURRENT%E5%92%8CManifest/
  7. https://sf-zhou.github.io/leveldb/leveldb_08_complete_process.html
  8. http://blog.jcix.top/2018-05-11/leveldb_paths/
  9. http://bean-li.github.io/leveldb-version/
  10. https://zhuanlan.zhihu.com/p/46718964
  11. http://www.hootina.org/blog/articles/leveldb%E6%BA%90%E7%A0%81%E5%88%86%E6%9E%90/leveldb%E6%BA%90%E7%A0%81%E5%88%86%E6%9E%9019.html
  12. https://sf-zhou.github.io/leveldb/leveldb_08_complete_process.html
    这是一个DB完整执行过程的表述。
  13. https://www.ravenxrz.ink/archives/1ba074b9.html
    介绍了Snapshot
  14. https://izualzhy.cn/leveldb-PickCompaction
    解释了GetOverlappingInputs的原理
  15. https://izualzhy.cn/leveldb-version
    解释了Version的实现
  16. http://lerencao.github.io/posts/lsm-tree-compaction-strategy/
  17. http://www.scylladb.com/2018/01/17/compaction-series-space-amplification/
    上面两篇文章介绍STCS和LCS
  18. https://zhuanlan.zhihu.com/p/181498475
    图解Compact过程
  19. https://github.com/facebook/rocksdb/wiki/Compaction
    RocksDB对Compaction的讲解
  20. https://blog.csdn.net/weixin_36145588/article/details/78064777
  21. https://sf-zhou.github.io/leveldb/leveldb_09_compaction.html
    这位同学解释了AddBoundaryInputs的来源
  22. http://www.petermao.com/leveldb/leveldb-8-snapshot.html
    介绍了snapshot机制
  23. https://zhuanlan.zhihu.com/p/60188395
    带Snapshot的Compaction,以及为什么会导致Issue 320的问题
  24. https://zhuanlan.zhihu.com/p/360345923
    也讲解了AddBoundaryInputs的来源,并且指出了快照会导致Issue 320的问题。
  25. https://zhuanlan.zhihu.com/p/35343043
    讲解VersionSet/VersionEdit里面出现的各种文件编号