在了解了LevelDB的相关模块的实现后,本文时序地展示LevelDB的流程概览。至少要先了解:
- Memtable
- SSTable
- Compaction机制
先跑一个Demo。
1 | leveldb::DB* db; |
创建
创建的逻辑实际上是在打开逻辑DB::Open
里面分出来的。但由于这部分逻辑简单独立,并且有益于理解整个数据库的layout所以提出来单独讲。
首先设置几个数:
SetLogNumber
将日志号设置为0DescriptorFileName
生成Manifest文件,序号为1SetNextFile
设置为21
2
3
4
5
6
7Status DBImpl::NewDB() {
VersionEdit new_db;
new_db.SetComparatorName(user_comparator()->Name());
new_db.SetLogNumber(0);
new_db.SetNextFile(2);
new_db.SetLastSequence(0);
const std::string manifest = DescriptorFileName(dbname_, 1);
下面创建Manifest文件。
1 | WritableFile* file; |
下面一连串操作,就是把new_db
去Encode到log
里面,并且刷盘
1 | { |
设置CURRENT指向最新的Manifest
1 | if (s.ok()) { |
打开
调用链如下所示
DB::Open
DBImpl
的构造函数只是一个初始化成员列表,并不包含其他逻辑了。
在得到DBImpl
对象后,我们首先加锁,并且调用Recover
方法。这个方法内容是加载Manifest文件,并恢复故障。
值得注意的是save_manifest
这个参数,会被通过调用链传得很深,具体作用是:
- 在
RecoverLogFile
中可能出现Memtable被Dump的情况 - 在
Version::Recover
中,如果不能ReuseManifest
1
2
3
4
5
6
7
8
9Status DB::Open(const Options& options, const std::string& dbname, DB** dbptr) {
*dbptr = nullptr;
DBImpl* impl = new DBImpl(options, dbname);
impl->mutex_.Lock();
VersionEdit edit;
// Recover handles create_if_missing, error_if_exists
bool save_manifest = false;
Status s = impl->Recover(&edit, &save_manifest);
创建一个新的log文件。如果没有Memtable,需要创建一个。
1 | if (s.ok() && impl->mem_ == nullptr) { |
【Q】有个问题,这里为啥还需要调用LogAndApply?因为在VersionSet::Recover
里面已经看到有类似的过程了。
1 | if (s.ok() && save_manifest) { |
DBImpl::Recover
首先创建数据库目录,并且加文件锁,也就是目录下的LOCK
文件,这个函数很有意思,后面专门来讲。
1 | Status DBImpl::Recover(VersionEdit* edit, bool* save_manifest) { |
下面我们检查db目录下有没有CURRENT文件。如果没有,我们认为数据库就不存在,如果此时设置了options_.create_if_missing
,就创建,否则返回错误。
1 | if (!env_->FileExists(CurrentFileName(dbname_))) { |
下面调用VersionSet里面的Recover函数。这个函数负责读取Manifest文件,恢复版本信息。
1 | s = versions_->Recover(save_manifest); |
下面,我们要分析Log文件,如果有Log文件大于Manifest中记录的值,就说明这些日志是上次关闭时丢失的数据,我们需要恢复这些日志。
注意PrevLogNumber
不再使用了,但是出于兼容性,我们依旧关注这个字段。
1 | SequenceNumber max_sequence(0); |
filenames
表示数据库目录下面的所有文件,我们依次遍历这些文件,并用ParseFileName
解析出他们的number。这里的number就是诸如MANIFEST-000002
里面的2,应该也是对应到FileMetaData
里面的number字段。
1 | for (size_t i = 0; i < filenames.size(); i++) { |
RecoverLogFile
的作用是回放日志,既然这样,就需要对日志进行排序。回放日志会修改VersionEdit,并且可能会导致Compaction。
1 | // Recover in the order in which the logs were generated |
MarkFileNumberUsed
的作用就是设置next_file_number_
,确保next_file_number_
要严格大于传入的logs[i]
。即,如果小于等于传入的logs[i]
,就将它设置为logs[i]+1
。
1 | // The previous incarnation may not have written any MANIFEST |
VersionSet::Recover
1 | Status VersionSet::Recover(bool* save_manifest) { |
首先读取CURRENT文件内容,得到当前用的Manifest文件。注意,到这里为止,肯定是存在CURRENT文件的,如果不存在,DBImpl::Recover
流程就已经会去创建了。
1 | // Read "CURRENT" file, which contains a pointer to the current manifest file |
如果没找到Manifest,就返回一个错误。对于这种情况,应该也是能处理的。
1 | std::string dscname = dbname_ + "/" + current; |
下面就是根据Manifest文件里面的内容,读取并设置VersionSet。
【Q】在哪里写入的呢?答案是在VersionEdit::EncodeTo
和Writer::AddRecord
里面,这个函数在LogAndApply的时候被调用。
1 | bool have_log_number = false; |
下面,我们用一个while循环,从reader中读取记录。ReadRecord
这个函数,将下一个record读入*record
中,如果读取成功,返回true;如果EOF了,就返回false。可能会使用*scratch
作为临时存储。*record
是有效的,直到下一个对reader
的变化操作,或者对*scratch
的变化操作。
1 | Slice record; |
Manifest里面会记录当时的Comparator(用文本编辑框打开这个文件,能看到一个类名一样的东西),VersionEdit会比较这两个是否一致。
1 | if (s.ok()) { |
【Q】在LogAndApply实现中,builder.Apply
之后还会跟着builder.SaveTo
,这里为啥不跟了?稍等,Apply是一条记录Apply一次,SaveTo是最后全搞好了,一次SaveTo。我们往后看,就能看到对SaveTo
的调用了。
1 | if (s.ok()) { |
到此为止,这个文件就读取完毕了,我们释放这个文件。
1 | delete file; |
下面就是SaveTo、Finalize、AppendVersion的流程,和LogAndApply
是类似的
1 | if (s.ok()) { |
检查是继续用现有的Manifest文件,还是重新建一个。这个可能修改descriptor_file_
,从而影响到LogAndApply
,但是这样的影响只会存在于Recover里面。
【Q】这么处理的目的是什么呢?
目的是为了解决Manifest文件过大的问题。
1 | // See if we can reuse the existing MANIFEST file. |
DBImpl::RecoverLogFile
【在阅读这个函数前,需要先学习VersionSet::Recover
】
RecoverLogFile用于读取Log,并且将应用尚未Apply到版本的Log。
1 | Status DBImpl::RecoverLogFile(uint64_t log_number, bool last_log, |
现在,我们开始循环读取日志到record
中。接着调用InsertInto
方法将它写到Memtable中,这个方法原理我们在介绍DB::Write
时讲解。
1 | while (reader.ReadRecord(&record, &scratch) && status.ok()) { |
接着我们更新last_seq
。【Q】有点奇怪,这里为啥要加Count?参考写那一部分的分析。
1 | const SequenceNumber last_seq = WriteBatchInternal::Sequence(&batch) + |
如果Memtable内存超限了,就开启Minor Compaction。当然,这里是一个局部的Compaction,因为不需要维护版本,所以没有LogAndApply调用。因为也不会产生多余的文件,所以也没有RemoveObsoleteFiles
调用。回忆一下WriteLevel0Table的实现,我们实际要做的是:
- 生成SSTable
- 计算SSTable放到那哪一层
- 写VersionEdit
如果需要将Memtable落盘,那么就要设置save_manifest
为true。这个值是从DBImpl::Open
开始一层一层传下来的。
1 | if (mem->ApproximateMemoryUsage() > options_.write_buffer_size) { |
到现在为止,上面的while循环就结束了,我们释放掉这个日志文件。但是这里同样要看一下是否可以重新利用log文件fname
。
1 | delete file; |
如果重新利用Log,就不需要走到后面的WriteLevel0Table
了。
1 | Log(options_.info_log, "Reusing old log %s \n", fname.c_str()); |
1 | if (mem != nullptr) { |
文件锁
PosixLockTable
PosixLockTable
这个类用来管理所有通过LockFile
锁住的文件。
需要注意的是fcntl(F_SETLK)
也可以实现文件锁,但是它不能保证同一个进程中的并发访问,所以在此之外,还需要再包一层。
【Q】为什么进程中还会有并发访问?在下文中解释。
1 | class PosixLockTable { |
LockFile
为了加锁,我们首先得往自己进程中的PosixLockTable locks_
中加入加锁记录。如果加锁失败,说明这个锁已经被我们进程持有了,就退出。
1 | Status LockFile(const std::string& filename, FileLock** lock) override { |
如果我们进程没有持有锁,再调用LockOrUnlock
加文件锁。如果加锁失败,说明锁已经被其他进程占用了,这时候就要将它从locks_
移除出去。
1 | if (LockOrUnlock(fd, true) == -1) { |
LockOrUnlock
LockOrUnlock
根据传入的lock
对文件进行F_SETLK
操作。F_SETLK
是非阻塞的,还有一个F_SETLKW
函数是阻塞的。F_SETLK
可以锁定文件的某些部分,在这里,设置l_start
和l_len
都为0,表示锁定整个文件。
1 | int LockOrUnlock(int fd, bool lock) { |
有关Linux进程和线程的补充说明
这里需要注意,Linux中pthread库创建出来的线程可能具有相同的PID,不同的TID,我们可以从下面的代码看到。
1 |
|
写
LevelDB可以通过WriteBatch
支持批量更新的功能。当然了,作为对Write
函数的一个简易化封装,Put
只会更新一个字段。
1 | Status DB::Put(const WriteOptions& opt, const Slice& key, const Slice& value) { |
写数据库的流程:
- 写WAL
- 写MemTable
- 更新Sequence Number
如下所示,写是可以并发的,因此会有类似于InnoDB中的组提交机制。
DBImpl::Write
首先,全局有个writers_
队列,维护所有的写。
1 | class DBImpl : public DB { |
我们新创建一个DBImpl::Writer
这个对象,这个对象中有一个关联到mutex_
的条件变量w.cv
。
接着将这个Writer对象放到writers_
中,然后我们等待下面的条件:
w.done()
表示其他线程已经帮w
写完了。w == writers_.front()
表示这个Writer位于队头,并且抢到了锁。1
2
3
4
5Status DBImpl::Write(const WriteOptions& options, WriteBatch* updates) {
Writer w(&mutex_);
w.batch = updates;
w.sync = options.sync;
w.done = false;
所以当一个写线程进入时,首先先要获得锁,这个锁可能会被其他的写入(的部分阶段)持有,或者被后台Compaction(的部分阶段)线程持有。获得锁之后,它能做的其实也就是把自己的Writer
挂到writers_
队列上,然后如果现在不是队头,就要去等待信号量。
1 | MutexLock l(&mutex_); |
如果从条件变量上醒过来,还是要再检查一下有没有w.done()
,因为可能是另一个条件醒过来的。
1 | if (w.done) { |
下面调用MakeRoomForWrite
,如果updates
是nullptr的话,force就是1,强制MakeRoomForWrite
进行Compaction。
【Q】什么时候updates
是nullptr呢?DBImpl::TEST_CompactMemTable
里面有个注释,说如果设置为nullptr,就是在催促。
1 | // May temporarily unlock and wait. |
在MakeRoomForWrite
之后,肯定是可以往数据库里面写东西的了。
我们需要得到一个Sequence Number才能写,我们首先取出上一次写的Sequence Number。
1 | uint64_t last_sequence = versions_->LastSequence(); |
BuildBatchGroup
会合并队列里的多个写入到tmp_batch_
里面。这个batch算作一次更新,具有全局唯一的一个Sequence Number,从之前递增而来。在合并的时候需要考虑:
- 总写入数据大小
- 如果有请求是
sync==false
了,那么就不加入sync==true
的
在合并结束后,BuildBatchGroup
会更新last_writer
,表示最后一个写入。
【Q】是不是可能在Memtable有两个record,他们的Sequence Number是相同的?现在看来是有可能的,这是因为批量写的话只会有一个Sequence Number。但是假如有Count个一次性写入,那么Sequence Number会在这个之后增加Count次。有点奇怪。1
2
3
4if (status.ok() && updates != nullptr) { // nullptr batch is for compactions
WriteBatch* write_batch = BuildBatchGroup(&last_writer);
WriteBatchInternal::SetSequence(write_batch, last_sequence + 1);
last_sequence += WriteBatchInternal::Count(write_batch);
下面是写日志的操作对应AddRecord
。
【Q】根据注释,这个操作是不需要加锁的,为什么呢?文章说,这样可以先让其他请求进入队列中排队。
这样做是安全的,因为只有一个写,就是&w
。
同时,可以看出这一步会给写入速度带来比较好的提升,因为只有拿到锁才能往writers_
里面push。
1 | // Add to log and apply to memtable. We can release the lock |
数据库的通用原理,写完日志,状态OK了,才能写Memtable,对应InsertInto
。
1 | if (status.ok()) { |
逐个弹出writers_
里的元素,并唤起等待write的线程,直到遇到last_writer
。
1 | while (true) { |
我们处理完writers
队列中的一个项目了,应当Signal一下,通知下一个项目进来。
1 | // Notify new head of write queue |
DBImpl::BuildBatchGroup
1 | // REQUIRES: Writer list must be non-empty |
讨论第一个batch的大小来设置max_size
:
- 如果比较小
就设置为size + (128 << 10)
- 如果还可以
就设置为1 << 20
1
2
3
4
5
6
7
8
9size_t size = WriteBatchInternal::ByteSize(first->batch);
// Allow the group to grow up to a maximum size, but if the
// original write is small, limit the growth so we do not slow
// down the small write too much.
size_t max_size = 1 << 20;
if (size <= (128 << 10)) { // 128 << 10 == 1 << 17
max_size = size + (128 << 10);
}
first
是writers_
队头,下面,我们就遍历整个writers_
队列,直到:
- 如果
first
是non sync的话,那么我们会在遇到第一个要加入的sync请求的时候就break掉。反之,如果first
是sync的话,那么可以兼容non sync的请求的。 - 大小超限
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16*last_writer = first;
std::deque<Writer*>::iterator iter = writers_.begin();
++iter; // Advance past "first"
for (; iter != writers_.end(); ++iter) {
Writer* w = *iter;
if (w->sync && !first->sync) {
// Do not include a sync write into a batch handled by a non-sync write.
break;
}
if (w->batch != nullptr) {
size += WriteBatchInternal::ByteSize(w->batch);
if (size > max_size) {
// Do not make batch too big
break;
}
我们把这些batch,全部加到result
里面。如果涉及多个batch,result就指向tmp_batch_
,否则就指向first->batch
1 | // Append to *result |
DBImpl::MakeRoomForWrite
MakeRoomForWrite
用来确保我们有空间写入,如果此时Memtable满了,就需要去dump成Immutable Memtable。如果现在Level0负荷过重,那么就要延迟一下写入速度。
在研究这个函数时,我们要特别注意各个if条件的判断顺序,这体现了优先级。
1 | // REQUIRES: mutex_ is held |
一进来,首先一个while循环。唔,这个功能为啥要有while?原因是因为里面要等待信号量的。还有一个原因是,当产生Immutable Memtable之后,我们需要等待它刷盘。
1 | while (true) { |
如果force为false,也就是不强制执行Compaction,就认为是允许延迟的。【Q】其实我没搞懂这个逻辑。
如果允许延迟,并且Level0的文件数达到至少8个,那么就开始慢速写。注意,Level0层最大文件数不是4,这是个误区。当有4个文件的时候开始Compaction,当有12个文件的时候,才停止写入。
慢速写的实现就是主线程睡1000ms,这个时候后台的Compaction线程是可以开始Compact的。在睡眠结束之后,要将allow_delay
设为false,也就是说对于一次写,我们只慢速一次。
1 | } else if (allow_delay && versions_->NumLevelFiles(0) >= |
下面,如果不强制Compaction,并且Memtable的大小没有超标,那么就啥都不要做,这个应该是最通常的情况。
1 | } else if (!force && |
如果此时上一轮Immutable Memtable还没有Minor Compact完毕,那么我们就在background_work_finished_signal_
这个条件变量上面等待。
我们注意到在进入这个函数时是持有mutex_
的,所以这个生产者消费者模式是安全的。
1 | } else if (imm_ != nullptr) { |
同理,如果Level0满了,即达到12个文件了,那我们同样要在信号量上等待。
1 | } else if (versions_->NumLevelFiles(0) >= config::kL0_StopWritesTrigger) { |
对于剩余的情况,我们要将Memtable改成Immutable Memtable。
同时,我们注意到这个分支并不会在最后break掉!这是因为此时有了Immutable Memtable了,我们需要等它被刷成SSTable落盘,所以至少还需要一次while循环。
这个这个刷盘过程等到什么时候呢?
- 对于
CompactMemTable
来说,至少要执行完LogAndApply之后,才会将imm_
设置为nullptr。 - 而这个条件变量,在
MaybeScheduleCompaction
调用完之后会被Signal。当然,需要注意,在Major Compaction过程中,如果有Immutable Memtable需要落盘,那么还是要先执行CompactMemTable
的,在这个之后,也会触发一次Signal。
注意,这一次刷盘还可能会导致Level0文件达到上限,那就要等更久了。1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26} else {
// Attempt to switch to a new memtable and trigger compaction of old
assert(versions_->PrevLogNumber() == 0);
uint64_t new_log_number = versions_->NewFileNumber();
WritableFile* lfile = nullptr;
s = env_->NewWritableFile(LogFileName(dbname_, new_log_number), &lfile);
if (!s.ok()) {
// Avoid chewing through file number space in a tight loop.
versions_->ReuseFileNumber(new_log_number);
break;
}
delete log_;
delete logfile_;
logfile_ = lfile;
logfile_number_ = new_log_number;
log_ = new log::Writer(lfile);
imm_ = mem_;
has_imm_.store(true, std::memory_order_release);
mem_ = new MemTable(internal_comparator_);
mem_->Ref();
force = false; // Do not force another compaction if have room
MaybeScheduleCompaction();
}
}
return s;
}
读
【Q】思考
- 读要加锁么?
我们首先考虑分布式共识这一块,为了实现一致读写,Raft即使是读请求,也需要走一遍LogEntry的。而ZK的话,可以选择直接读,所以未必是一致读。
当然,这个离题了。我觉得根据LevelDB的MVCC模式,其实至少有一部分是可以不加锁的。 - 在哪些地方可以非线性地查找?
在非0层找SSTable时,见FindFile。
在BlockReader返回Iterator之后,可以通过Seek来二分。 - 在读取的时候会做缓存么?
LevelDB在Table和Block两个层面进行缓存。
在Table层面通过TableCache。
在Block层面通过BlockReader里面的table->rep_->options.block_cache
分支。
DBImpl::Get
1 | Status DBImpl::Get(const ReadOptions& options, const Slice& key, |
可以看到,在获取了current
之后,就可以解锁了。
【Q】这里还取出了mem_
和imm_
,是不是在MVCC下面,可能同时存在多个mem_
和imm_
?此时,永远写最新的Memtable,但是可能会读旧的Memtable。
下面就是经典的读取三部曲:
- 首先看Memtable
- 然后看Immutable Memtable
- 然后就去SSTable里面找,具体是调用
current->Get
1 | // Unlock while reading from files and memtables |
到此为止,锁要重新加回来。
【Q】看起来读操作也会触发Compaction。
1 | if (have_stat_update && current->UpdateStats(stats)) { |
Version::Get
辅助函数
这个函数根据smallest和largest找到对应的文件。
容易想到func
的作用是在文件里面找key。
1 | void Version::ForEachOverlapping(Slice user_key, Slice internal_key, void* arg, |
从Compaction一文的介绍中了解到,files_
里面存放了当前Version中所有SSTable的元信息。
我们首先要遍历第0层的所有文件,放到tmp
里面,按照f->number
排序。排完序,我们就开始查找,在文件中查找需要借助于传入的func
,实际上是State::Match
这个函数。
1 | tmp.reserve(files_[0].size()); |
下面,就可以用之前介绍过的FindFile
来二分查找了。
1 | // Search other levels. |
State类
State类中主要定义了从SSTable中找对应Key的函数Match
。
在研究之前,我们先来复习一下SSTable的格式:
- data block
- meta block
- meta index block
- index block
记录每个data block的“largest”,满足两个性质。
注意,这里的largest不是单纯的largest,而要进行一些修正,它实际上是分隔两个Data Block的最短Key, - footer
记录index block和meta index block的位置
所以,我们要先通过index block去定位data block,得到这个data block。
接着,我们复习一下block的格式
- record
- restart
- 额外信息
num restarts
type
crc32
所以,我们要用LookupKey先去找restart,然后从restart开始找。
同时,我们注意由于,meta block的存在,会有一些优化。
1 | struct State { |
TableCache::Get和TableCache::FindTable
TableCache这一块是一个缓存层,如果缓存中没有,才去读SSTable,并把它加到缓存里面。Get的第一步是FindTable,先介绍这个。
首先在cache_
里面查文件的handle,如果没找到,就新建一个,并且调用Table::Open
从文件中读取。
1 | Status TableCache::FindTable(uint64_t file_number, uint64_t file_size, |
TableAndFile
就是打包RandomAccessFile*
和Table*
。
1 | if (!s.ok()) { |
下面来看Get函数,现在我们已经能得到对应的Table*
了,此时调用InternalGet
方法。
1 | Status TableCache::Get(const ReadOptions& options, uint64_t file_number, |
Table::InternalGet
1 | Status Table::InternalGet(const ReadOptions& options, const Slice& k, void* arg, |
首先,可以通过布隆过滤器判断这个block里面有没有。
1 | if (filter != nullptr && handle.DecodeFrom(&handle_value).ok() && |
由于布隆过滤器可能假阳,所以这边还需要实际Seek一下。我们先前介绍过BlockReader
,这个函数返回一个Iterator。实际上是一个Block::Iter
对象。
当时他被用在创建TwoLevelIterator
里面,这个双层迭代器实际上就是index block上的迭代器和data block上的迭代器的组合。
1 | Iterator* block_iter = BlockReader(this, options, iiter->value()); |
Table::Open
【这一部分可以先不读,因为所有对SSTable的读key请求,最后都是从Cache里面处理了】Table::Open
负责读取SSTable到表对象Table
中。
1 | Status Table::Open(const Options& options, RandomAccessFile* file, |
先读取footer。
1 | char footer_space[Footer::kEncodedLength]; |
再读取block。
1 | // Read the index block |
主体函数
主要就是构造一个state,然后调用ForEachOverlapping
1 | Status Version::Get(const ReadOptions& options, const LookupKey& k, |
故障恢复
Manifest损坏/丢失
Reference
- http://luodw.cc/2015/10/30/leveldb-14/
介绍WriteBatch
- https://zhuanlan.zhihu.com/p/340804308
介绍Revocer逻辑
- https://blog.csdn.net/sparkliang/article/details/9311487
介绍RecoverLogFile
- https://izualzhy.cn/leveldb-write-read
介绍了LevelDB读写流程,我使用了它的部分图片 - https://leeshine.github.io/2019/01/24/leveldb-put-get/
- https://sf-zhou.github.io/leveldb/leveldb_10_details.html
讲述多线程写的demo,很值得一看 - http://1feng.github.io/2016/08/24/mvcc-and-manifest/
介绍MVCC机制,很好 - https://www.cnblogs.com/cobbliu/p/6194072.html
介绍SSTable、Block的格式,一张大图,非常屌 - https://blog.csdn.net/weixin_42663840/article/details/82629473
我见过最屌有关读写的注释