LevelDB之流程概览

在了解了 LevelDB 的相关模块的实现后:

  1. Memtable
  2. SSTable
  3. Compaction 机制

本文时序地展示 LevelDB 的读、写、Recover流程。

目录:

  1. LevelDB之Memtable实现
  2. LevelDB之SSTable实现
  3. LevelDB之Version
  4. LevelDB之Compaction
  5. LevelDB之流程概览

先跑一个 Demo。

1
2
3
4
5
6
7
8
9
10
11
leveldb::DB* db;
leveldb::Options options;
options.create_if_missing = true;

leveldb::Status status = leveldb::DB::Open(options,"./testdb",&db);

std::string key = "calvinneo";
std::string value = "calvinneo@calvinneo.com";

status = db->Put(leveldb::WriteOptions(), key, value);//添加
status = db->Get(leveldb::ReadOptions(), key, &value);//获取

创建

创建的逻辑实际上是在打开逻辑DB::Open里面分出来的。但由于这部分逻辑简单独立,并且有益于理解整个数据库的layout所以提出来单独讲。
首先设置几个数:

  1. SetLogNumber将日志号设置为0
  2. DescriptorFileName生成Manifest文件,序号为1
  3. SetNextFile设置为2
1
2
3
4
5
6
7
8
Status DBImpl::NewDB() {
VersionEdit new_db;
new_db.SetComparatorName(user_comparator()->Name());
new_db.SetLogNumber(0);
new_db.SetNextFile(2);
new_db.SetLastSequence(0);
const std::string manifest = DescriptorFileName(dbname_, 1);
...

下面创建Manifest文件。

1
2
3
4
5
6
7
...
WritableFile* file;
Status s = env_->NewWritableFile(manifest, &file);
if (!s.ok()) {
return s;
}
...

下面一连串操作,就是把new_db去Encode到log里面,并且刷盘

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
...
{
log::Writer log(file);
std::string record;
new_db.EncodeTo(&record);
s = log.AddRecord(record);
if (s.ok()) {
s = file->Sync();
}
if (s.ok()) {
s = file->Close();
}
}
delete file;
...

设置CURRENT指向最新的Manifest

1
2
3
4
5
6
7
8
9
...
if (s.ok()) {
// Make "CURRENT" file that points to the new manifest file.
s = SetCurrentFile(env_, dbname_, 1);
} else {
env_->RemoveFile(manifest);
}
return s;
}

打开

调用链如下所示

DB::Open

DBImpl的构造函数只是一个初始化成员列表,并不包含其他逻辑了。
在得到DBImpl对象后,我们首先加锁,并且调用Recover方法。这个方法内容是加载Manifest文件,并恢复故障。
值得注意的是save_manifest这个参数,会被通过调用链传得很深,具体作用是:

  1. RecoverLogFile中可能出现Memtable被Dump的情况
  2. Version::Recover中,如果不能ReuseManifest
1
2
3
4
5
6
7
8
9
10
Status DB::Open(const Options& options, const std::string& dbname, DB** dbptr) {
*dbptr = nullptr;

DBImpl* impl = new DBImpl(options, dbname);
impl->mutex_.Lock();
VersionEdit edit;
// Recover handles create_if_missing, error_if_exists
bool save_manifest = false;
Status s = impl->Recover(&edit, &save_manifest);
...

创建一个新的log文件。如果没有Memtable,需要创建一个。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
...
if (s.ok() && impl->mem_ == nullptr) {
// Create new log and a corresponding memtable.
uint64_t new_log_number = impl->versions_->NewFileNumber();
WritableFile* lfile;
s = options.env->NewWritableFile(LogFileName(dbname, new_log_number),
&lfile);
if (s.ok()) {
edit.SetLogNumber(new_log_number);
impl->logfile_ = lfile;
impl->logfile_number_ = new_log_number;
impl->log_ = new log::Writer(lfile);
impl->mem_ = new MemTable(impl->internal_comparator_);
impl->mem_->Ref();
}
}
...

【Q】有个问题,这里为啥还需要调用LogAndApply?因为在VersionSet::Recover里面已经看到有类似的过程了。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
...
if (s.ok() && save_manifest) {
edit.SetPrevLogNumber(0); // No older logs needed after recovery.
edit.SetLogNumber(impl->logfile_number_);
s = impl->versions_->LogAndApply(&edit, &impl->mutex_);
}
if (s.ok()) {
impl->RemoveObsoleteFiles();
impl->MaybeScheduleCompaction();
}
impl->mutex_.Unlock();
if (s.ok()) {
assert(impl->mem_ != nullptr);
*dbptr = impl;
} else {
delete impl;
}
return s;
}

DBImpl::Recover

首先创建数据库目录,并且加文件锁,也就是目录下的LOCK文件,这个函数很有意思,后面专门来讲。

1
2
3
4
5
6
7
8
9
10
11
12
13
Status DBImpl::Recover(VersionEdit* edit, bool* save_manifest) {
mutex_.AssertHeld();

// Ignore error from CreateDir since the creation of the DB is
// committed only when the descriptor is created, and this directory
// may already exist from a previous failed creation attempt.
env_->CreateDir(dbname_);
assert(db_lock_ == nullptr);
Status s = env_->LockFile(LockFileName(dbname_), &db_lock_);
if (!s.ok()) {
return s;
}
...

下面我们检查db目录下有没有CURRENT文件。如果没有就认为数据库就不存在,如果此时设置了options_.create_if_missing,就创建,否则返回错误。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
...
if (!env_->FileExists(CurrentFileName(dbname_))) {
if (options_.create_if_missing) {
Log(options_.info_log, "Creating DB %s since it was missing.",
dbname_.c_str());
s = NewDB();
if (!s.ok()) {
return s;
}
} else {
return Status::InvalidArgument(
dbname_, "does not exist (create_if_missing is false)");
}
} else {
if (options_.error_if_exists) {
return Status::InvalidArgument(dbname_,
"exists (error_if_exists is true)");
}
}
...

下面调用VersionSet里面的Recover函数。这个函数负责读取Manifest文件,恢复版本信息。

1
2
3
4
5
6
...
s = versions_->Recover(save_manifest);
if (!s.ok()) {
return s;
}
...

下面,要分析Log文件,如果有Log文件大于Manifest中记录的值,就说明这些日志是上次关闭时丢失的数据,需要恢复这些日志。
注意PrevLogNumber不再使用了,但是出于兼容性,我们依旧关注这个字段。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
...
SequenceNumber max_sequence(0);
// Recover from all newer log files than the ones named in the
// descriptor (new log files may have been added by the previous
// incarnation without registering them in the descriptor).
//
const uint64_t min_log = versions_->LogNumber();
const uint64_t prev_log = versions_->PrevLogNumber();
std::vector<std::string> filenames;
s = env_->GetChildren(dbname_, &filenames);
if (!s.ok()) {
return s;
}
std::set<uint64_t> expected;
versions_->AddLiveFiles(&expected);
uint64_t number;
FileType type;
std::vector<uint64_t> logs;
...

filenames 表示数据库目录下面的所有文件,我们依次遍历这些文件,并用ParseFileName解析出他们的number。这里的number就是诸如MANIFEST-000002里面的2,应该也是对应到FileMetaData里面的number字段。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
...
for (size_t i = 0; i < filenames.size(); i++) {
if (ParseFileName(filenames[i], &number, &type)) {
expected.erase(number);
if (type == kLogFile && ((number >= min_log) || (number == prev_log)))
logs.push_back(number);
}
}
if (!expected.empty()) {
char buf[50];
std::snprintf(buf, sizeof(buf), "%d missing files; e.g.",
static_cast<int>(expected.size()));
return Status::Corruption(buf, TableFileName(dbname_, *(expected.begin())));
}
...

RecoverLogFile的作用是回放日志,既然这样,就需要对日志进行排序。回放日志会修改VersionEdit,并且可能会导致Compaction。

1
2
3
4
5
6
7
8
9
10
...
// Recover in the order in which the logs were generated
std::sort(logs.begin(), logs.end());
for (size_t i = 0; i < logs.size(); i++) {
s = RecoverLogFile(logs[i], (i == logs.size() - 1), save_manifest, edit,
&max_sequence);
if (!s.ok()) {
return s;
}
...

MarkFileNumberUsed的作用就是设置next_file_number_,确保next_file_number_要严格大于传入的logs[i]。即,如果小于等于传入的logs[i],就将它设置为logs[i]+1

1
2
3
4
5
6
7
8
9
10
11
12
13
...
// The previous incarnation may not have written any MANIFEST
// records after allocating this log number. So we manually
// update the file number allocation counter in VersionSet.
versions_->MarkFileNumberUsed(logs[i]);
}

if (versions_->LastSequence() < max_sequence) {
versions_->SetLastSequence(max_sequence);
}

return Status::OK();
}

VersionSet::Recover

1
2
3
4
5
6
7
8
Status VersionSet::Recover(bool* save_manifest) {
struct LogReporter : public log::Reader::Reporter {
Status* status;
void Corruption(size_t bytes, const Status& s) override {
if (this->status->ok()) *this->status = s;
}
};
...

首先读取CURRENT文件内容,得到当前用的Manifest文件。注意,到这里为止,肯定是存在CURRENT文件的,如果不存在,DBImpl::Recover流程就已经会去创建了。

1
2
3
4
5
6
7
8
9
10
11
...
// Read "CURRENT" file, which contains a pointer to the current manifest file
std::string current;
Status s = ReadFileToString(env_, CurrentFileName(dbname_), &current);
if (!s.ok()) {
return s;
}
if (current.empty() || current[current.size() - 1] != '\n') {
return Status::Corruption("CURRENT file does not end with newline");
}
current.resize(current.size() - 1);

如果没找到Manifest,就返回一个错误。对于这种情况,应该也是能处理的

1
2
3
4
5
6
7
8
9
10
11
  std::string dscname = dbname_ + "/" + current;
SequentialFile* file;
s = env_->NewSequentialFile(dscname, &file);
if (!s.ok()) {
if (s.IsNotFound()) {
return Status::Corruption("CURRENT points to a non-existent file",
s.ToString());
}
return s;
}
...

下面就是根据Manifest文件里面的内容,读取并设置VersionSet。
【Q】在哪里写入的呢?答案是在VersionEdit::EncodeToWriter::AddRecord里面,这个函数在LogAndApply的时候被调用。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
...
bool have_log_number = false;
bool have_prev_log_number = false;
bool have_next_file = false;
bool have_last_sequence = false;
uint64_t next_file = 0;
uint64_t last_sequence = 0;
uint64_t log_number = 0;
uint64_t prev_log_number = 0;
Builder builder(this, current_);
int read_records = 0;

{
LogReporter reporter;
reporter.status = &s;
log::Reader reader(file, &reporter, true /*checksum*/,
0 /*initial_offset*/);
...

下面,我们用一个while循环,从reader中读取记录。
ReadRecord这个函数,将下一个record读入*record中,如果读取成功,返回true;如果EOF了,就返回false。可能会使用*scratch作为临时存储。*record是有效的,直到下一个对reader的变化操作,或者对*scratch的变化操作。

1
2
3
4
5
6
7
8
...
Slice record;
std::string scratch;
while (reader.ReadRecord(&record, &scratch) && s.ok()) {
++read_records;
VersionEdit edit;
s = edit.DecodeFrom(record);
...

Manifest里面会记录当时的Comparator(用文本编辑框打开这个文件,能看到一个类名一样的东西),VersionEdit会比较这两个是否一致。

1
2
3
4
5
6
7
8
9
10
...
if (s.ok()) {
if (edit.has_comparator_ &&
edit.comparator_ != icmp_.user_comparator()->Name()) {
s = Status::InvalidArgument(
edit.comparator_ + " does not match existing comparator ",
icmp_.user_comparator()->Name());
}
}
...

【Q】在LogAndApply实现中,builder.Apply之后还会跟着builder.SaveTo,这里为啥不跟了?稍等,Apply是一条记录Apply一次,SaveTo是最后全搞好了,一次SaveTo。我们往后看,就能看到对SaveTo的调用了。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
...
if (s.ok()) {
builder.Apply(&edit);
}

if (edit.has_log_number_) {
log_number = edit.log_number_;
have_log_number = true;
}

if (edit.has_prev_log_number_) {
prev_log_number = edit.prev_log_number_;
have_prev_log_number = true;
}

if (edit.has_next_file_number_) {
next_file = edit.next_file_number_;
have_next_file = true;
}

if (edit.has_last_sequence_) {
last_sequence = edit.last_sequence_;
have_last_sequence = true;
}
}
}
...

到此为止,这个文件就读取完毕了,我们释放这个文件。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
...
delete file;
file = nullptr;

if (s.ok()) {
if (!have_next_file) {
s = Status::Corruption("no meta-nextfile entry in descriptor");
} else if (!have_log_number) {
s = Status::Corruption("no meta-lognumber entry in descriptor");
} else if (!have_last_sequence) {
s = Status::Corruption("no last-sequence-number entry in descriptor");
}

if (!have_prev_log_number) {
prev_log_number = 0;
}

MarkFileNumberUsed(prev_log_number);
MarkFileNumberUsed(log_number);
}
...

下面就是SaveTo、Finalize、AppendVersion的流程,和LogAndApply是类似的

1
2
3
4
5
6
7
8
9
10
11
12
13
...
if (s.ok()) {
Version* v = new Version(this);
builder.SaveTo(v);
// Install recovered version
Finalize(v);
AppendVersion(v);
manifest_file_number_ = next_file;
next_file_number_ = next_file + 1;
last_sequence_ = last_sequence;
log_number_ = log_number;
prev_log_number_ = prev_log_number;
...

检查是继续用现有的Manifest文件,还是重新建一个。这个可能修改descriptor_file_,从而影响到LogAndApply,但是这样的影响只会存在于Recover里面。
【Q】这么处理的目的是什么呢?
目的是为了解决Manifest文件过大的问题。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
...
// See if we can reuse the existing MANIFEST file.
if (ReuseManifest(dscname, current)) {
// No need to save new manifest
} else {
*save_manifest = true;
}
} else {
std::string error = s.ToString();
Log(options_->info_log, "Error recovering version set with %d records: %s",
read_records, error.c_str());
}

return s;
}

DBImpl::RecoverLogFile

【在阅读这个函数前,需要先学习VersionSet::Recover
RecoverLogFile用于读取Log,并且将应用尚未Apply到版本的Log。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
Status DBImpl::RecoverLogFile(uint64_t log_number, bool last_log,
bool* save_manifest, VersionEdit* edit,
SequenceNumber* max_sequence) {
struct LogReporter : public log::Reader::Reporter {
Env* env;
Logger* info_log;
const char* fname;
Status* status; // null if options_.paranoid_checks==false
void Corruption(size_t bytes, const Status& s) override {
Log(info_log, "%s%s: dropping %d bytes; %s",
(this->status == nullptr ? "(ignoring error) " : ""), fname,
static_cast<int>(bytes), s.ToString().c_str());
if (this->status != nullptr && this->status->ok()) *this->status = s;
}
};

mutex_.AssertHeld();

// Open the log file
std::string fname = LogFileName(dbname_, log_number);
SequentialFile* file;
Status status = env_->NewSequentialFile(fname, &file);
if (!status.ok()) {
MaybeIgnoreError(&status);
return status;
}

// Create the log reader.
LogReporter reporter;
reporter.env = env_;
reporter.info_log = options_.info_log;
reporter.fname = fname.c_str();
reporter.status = (options_.paranoid_checks ? &status : nullptr);
// We intentionally make log::Reader do checksumming even if
// paranoid_checks==false so that corruptions cause entire commits
// to be skipped instead of propagating bad information (like overly
// large sequence numbers).
log::Reader reader(file, &reporter, true /*checksum*/, 0 /*initial_offset*/);
Log(options_.info_log, "Recovering log #%llu",
(unsigned long long)log_number);

// Read all the records and add to a memtable
std::string scratch;
Slice record;
WriteBatch batch;
int compactions = 0;
MemTable* mem = nullptr;
...

现在开始循环读取日志到record中。接着调用InsertInto方法将它写到Memtable中,这个方法原理在介绍DB::Write时讲解。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
...
while (reader.ReadRecord(&record, &scratch) && status.ok()) {
if (record.size() < 12) {
reporter.Corruption(record.size(),
Status::Corruption("log record too small"));
continue;
}
WriteBatchInternal::SetContents(&batch, record);

if (mem == nullptr) {
mem = new MemTable(internal_comparator_);
mem->Ref();
}
status = WriteBatchInternal::InsertInto(&batch, mem);
MaybeIgnoreError(&status);
if (!status.ok()) {
break;
}
...

接着更新last_seq。【Q】有点奇怪,这里为啥要加Count?参考写那一部分的分析。

1
2
3
4
5
6
7
...
const SequenceNumber last_seq = WriteBatchInternal::Sequence(&batch) +
WriteBatchInternal::Count(&batch) - 1;
if (last_seq > *max_sequence) {
*max_sequence = last_seq;
}
...

如果Memtable内存超限了,就开启Minor Compaction。当然,这里是一个局部的Compaction,因为不需要维护版本,所以没有LogAndApply调用。因为也不会产生多余的文件,所以也没有RemoveObsoleteFiles调用。回忆一下WriteLevel0Table的实现,我们实际要做的是:

  1. 生成SSTable
  2. 计算SSTable放到那哪一层
  3. 写VersionEdit

如果需要将Memtable落盘,那么就要设置save_manifest为true。这个值是从DBImpl::Open开始一层一层传下来的。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
...
if (mem->ApproximateMemoryUsage() > options_.write_buffer_size) {
compactions++;
*save_manifest = true;
status = WriteLevel0Table(mem, edit, nullptr);
mem->Unref();
mem = nullptr;
if (!status.ok()) {
// Reflect errors immediately so that conditions like full
// file-systems cause the DB::Open() to fail.
break;
}
}
}
...

到现在为止,上面的while循环就结束了,我们释放掉这个日志文件。但是这里同样要看一下是否可以重新利用log文件fname

1
2
3
4
5
6
7
8
9
10
11
...
delete file;

// See if we should keep reusing the last log file.
if (status.ok() && options_.reuse_logs && last_log && compactions == 0) {
assert(logfile_ == nullptr);
assert(log_ == nullptr);
assert(mem_ == nullptr);
uint64_t lfile_size;
if (env_->GetFileSize(fname, &lfile_size).ok() &&
env_->NewAppendableFile(fname, &logfile_).ok()) {

如果重新利用Log,就不需要走到后面的WriteLevel0Table了。

1
2
3
4
5
6
7
8
9
10
11
12
13
    Log(options_.info_log, "Reusing old log %s \n", fname.c_str());
log_ = new log::Writer(logfile_, lfile_size);
logfile_number_ = log_number;
if (mem != nullptr) {
mem_ = mem;
mem = nullptr;
} else {
// mem can be nullptr if lognum exists but was empty.
mem_ = new MemTable(internal_comparator_);
mem_->Ref();
}
}
}
1
2
3
4
5
6
7
8
9
10
11
  if (mem != nullptr) {
// mem did not get reused; compact it.
if (status.ok()) {
*save_manifest = true;
status = WriteLevel0Table(mem, edit, nullptr);
}
mem->Unref();
}

return status;
}

文件锁

PosixLockTable

PosixLockTable这个类用来管理所有通过LockFile锁住的文件。
需要注意的是fcntl(F_SETLK)也可以实现文件锁,但是它不能保证同一个进程中的并发访问,所以在此之外,还需要再包一层。
【Q】为什么进程中还会有并发访问?在下文中解释。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
class PosixLockTable {
public:
bool Insert(const std::string& fname) LOCKS_EXCLUDED(mu_) {
mu_.Lock();
bool succeeded = locked_files_.insert(fname).second;
mu_.Unlock();
return succeeded;
}
void Remove(const std::string& fname) LOCKS_EXCLUDED(mu_) {
mu_.Lock();
locked_files_.erase(fname);
mu_.Unlock();
}

private:
port::Mutex mu_;
std::set<std::string> locked_files_ GUARDED_BY(mu_);
};

LockFile

为了加锁,我们首先得往自己进程中的PosixLockTable locks_中加入加锁记录。如果加锁失败,说明这个锁已经被我们进程持有了,就退出。

1
2
3
4
5
6
7
8
9
10
11
12
Status LockFile(const std::string& filename, FileLock** lock) override {
*lock = nullptr;

int fd = ::open(filename.c_str(), O_RDWR | O_CREAT | kOpenBaseFlags, 0644);
if (fd < 0) {
return PosixError(filename, errno);
}

if (!locks_.Insert(filename)) {
::close(fd);
return Status::IOError("lock " + filename, "already held by process");
}

如果我们进程没有持有锁,再调用LockOrUnlock加文件锁。如果加锁失败,说明锁已经被其他进程占用了,这时候就要将它从locks_移除出去。

1
2
3
4
5
6
7
8
9
10
11
12
  if (LockOrUnlock(fd, true) == -1) {
int lock_errno = errno;
::close(fd);
locks_.Remove(filename);
return PosixError("lock " + filename, lock_errno);
}

*lock = new PosixFileLock(fd, filename);
return Status::OK();
}

PosixLockTable locks_;

LockOrUnlock

LockOrUnlock根据传入的lock对文件进行F_SETLK操作。F_SETLK是非阻塞的,还有一个F_SETLKW函数是阻塞的。
F_SETLK可以锁定文件的某些部分,在这里,设置l_startl_len都为0,表示锁定整个文件。

1
2
3
4
5
6
7
8
9
10
int LockOrUnlock(int fd, bool lock) {
errno = 0;
struct ::flock file_lock_info;
std::memset(&file_lock_info, 0, sizeof(file_lock_info));
file_lock_info.l_type = (lock ? F_WRLCK : F_UNLCK);
file_lock_info.l_whence = SEEK_SET;
file_lock_info.l_start = 0;
file_lock_info.l_len = 0; // Lock/unlock entire file.
return ::fcntl(fd, F_SETLK, &file_lock_info);
}

有关Linux进程和线程的补充说明

这里需要注意,Linux中pthread库创建出来的线程可能具有相同的PID,不同的TID,我们可以从下面的代码看到。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
#include <stdio.h>
#include <stdlib.h>
#include <string.h>
#include <unistd.h>
#include <pthread.h>
#include <sys/syscall.h>
#define gettid() syscall(SYS_gettid)

void *start_routine(void* index) {
char msg[99] = "";
snprintf(msg, sizeof(msg)-1, "thd %d: getpid %d gettid %d\n", *(int*)index, getpid(), gettid());
while (1) {
write(1, msg, strlen(msg));
sleep(1);
}
}

int main() {

int th1 = 1;
pthread_t tid1;
pthread_create(&tid1, NULL, start_routine, &th1);

int th2 = 2;
pthread_t tid2;
pthread_create(&tid2, NULL, start_routine, &th2);

const char *msg = "main: i am main\n";
while (1) {
write(1, msg, strlen(msg));
sleep(1);
}

return 0;
}
/*
main: i am main
thd 1: getpid 31270 gettid 31271
thd 2: getpid 31270 gettid 31272
main: i am main
thd 1: getpid 31270 gettid 31271
thd 2: getpid 31270 gettid 31272
main: i am main
thd 1: getpid 31270 gettid 31271
thd 2: getpid 31270 gettid 31272
*/

LevelDB可以通过WriteBatch支持批量更新的功能。当然了,作为对Write函数的一个简易化封装,Put只会更新一个字段。

1
2
3
4
5
Status DB::Put(const WriteOptions& opt, const Slice& key, const Slice& value) {
WriteBatch batch;
batch.Put(key, value);
return Write(opt, &batch);
}

写数据库的流程:

  1. 写WAL
  2. 写MemTable
  3. 更新Sequence Number

如下所示,写是可以并发的,因此会有类似于 InnoDB 中的 Group Commit。

DBImpl::Write

首先,全局有个writers_队列,维护所有的写。

1
2
3
4
5
6
class DBImpl : public DB {
...
std::deque<Writer*> writers_ GUARDED_BY(mutex_);
WriteBatch* tmp_batch_ GUARDED_BY(mutex_);
...
}

我们新创建一个DBImpl::Writer这个对象,这个对象中有一个关联到mutex_的条件变量w.cv
接着将这个Writer对象放到writers_中,然后我们等待下面的条件:

  1. w.done()
    表示其他线程已经帮w写完了。
  2. w == writers_.front()
    表示这个Writer位于队头,并且抢到了锁。
    1
    2
    3
    4
    5
    6
    Status DBImpl::Write(const WriteOptions& options, WriteBatch* updates) {
    Writer w(&mutex_);
    w.batch = updates;
    w.sync = options.sync;
    w.done = false;
    ...

所以当一个写线程进入时,首先先要获得锁,这个锁可能会被其他的写入(的部分阶段)持有,或者被后台Compaction(的部分阶段)线程持有。获得锁之后,它能做的其实也就是把自己的Writer挂到writers_队列上,然后如果现在不是队头,就要去等待信号量。

1
2
3
4
5
6
7
...
MutexLock l(&mutex_);
writers_.push_back(&w);
while (!w.done && &w != writers_.front()) {
w.cv.Wait();
}
...

如果从条件变量上醒过来,还是要再检查一下有没有w.done(),因为可能是另一个条件醒过来的。

1
2
3
4
5
...
if (w.done) {
return w.status;
}
...

下面调用MakeRoomForWrite,如果updates是nullptr的话,force就是1,强制MakeRoomForWrite进行Compaction。
【Q】什么时候updates是nullptr呢?DBImpl::TEST_CompactMemTable里面有个注释,说如果设置为nullptr,就是在催促。

1
2
3
4
...
// May temporarily unlock and wait.
Status status = MakeRoomForWrite(updates == nullptr);
...

MakeRoomForWrite之后,肯定是可以往数据库里面写东西的了。
我们需要得到一个Sequence Number才能写,所以首先取出上一次写的Sequence Number。

1
2
3
4
...
uint64_t last_sequence = versions_->LastSequence();
Writer* last_writer = &w;
...

下面是一个Group Commit的过程。
BuildBatchGroup会合并队列里的多个写入到tmp_batch_里面。这个batch算作一次更新,具有全局唯一的一个Sequence Number,从之前递增而来。在合并的时候需要考虑:

  1. 总写入数据大小
  2. 如果有请求是sync==false了,那么就不加入sync==true
    在合并结束后,BuildBatchGroup会更新last_writer,表示最后一个写入。

【Q】是不是可能在Memtable有两个record,他们的Sequence Number是相同的?现在看来是有可能的,这是因为批量写的话只会有一个Sequence Number。但是假如有Count个一次性写入,那么Sequence Number会在这个之后增加Count次。有点奇怪。

1
2
3
4
5
...
if (status.ok() && updates != nullptr) { // nullptr batch is for compactions
WriteBatch* write_batch = BuildBatchGroup(&last_writer);
WriteBatchInternal::SetSequence(write_batch, last_sequence + 1);
last_sequence += WriteBatchInternal::Count(write_batch);

下面是写日志的操作对应AddRecord
【Q】根据注释,这个操作是不需要加锁的,为什么呢?文章说,这样可以先让其他请求进入队列中排队。
这样做是安全的,因为只有一个写,就是&w
同时,可以看出这一步会给写入速度带来比较好的提升,因为只有拿到锁才能往writers_里面push。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
...
// Add to log and apply to memtable. We can release the lock
// during this phase since &w is currently responsible for logging
// and protects against concurrent loggers and concurrent writes
// into mem_.
{
mutex_.Unlock();
status = log_->AddRecord(WriteBatchInternal::Contents(write_batch));
bool sync_error = false;
if (status.ok() && options.sync) {
status = logfile_->Sync();
if (!status.ok()) {
sync_error = true;
}
}
...

先写日志,写完才能写Memtable,对应InsertInto

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
...
if (status.ok()) {
status = WriteBatchInternal::InsertInto(write_batch, mem_);
}
mutex_.Lock();
if (sync_error) {
// The state of the log file is indeterminate: the log record we
// just added may or may not show up when the DB is re-opened.
// So we force the DB into a mode where all future writes fail.
RecordBackgroundError(status);
}
}
if (write_batch == tmp_batch_) tmp_batch_->Clear();

versions_->SetLastSequence(last_sequence);
}
...

逐个弹出writers_里的元素,并唤起等待write的线程,直到遇到last_writer。它表示本 write group 中的最后一个 writer。

1
2
3
4
5
6
7
8
9
10
while (true) {
Writer* ready = writers_.front();
writers_.pop_front();
if (ready != &w) {
ready->status = status;
ready->done = true;
ready->cv.Signal();
}
if (ready == last_writer) break;
}

处理完writers队列,应当Signal一下。

1
2
3
4
5
6
7
  // Notify new head of write queue
if (!writers_.empty()) {
writers_.front()->cv.Signal();
}

return status;
}

DBImpl::BuildBatchGroup

1
2
3
4
5
6
7
8
// REQUIRES: Writer list must be non-empty
// REQUIRES: First writer must have a non-null batch
WriteBatch* DBImpl::BuildBatchGroup(Writer** last_writer) {
mutex_.AssertHeld();
assert(!writers_.empty());
Writer* first = writers_.front();
WriteBatch* result = first->batch;
assert(result != nullptr);

讨论第一个batch的大小来设置max_size

  1. 如果比较小
    就设置为size + (128 << 10)
  2. 如果还可以
    就设置为1 << 20
1
2
3
4
5
6
7
8
9
size_t size = WriteBatchInternal::ByteSize(first->batch);

// Allow the group to grow up to a maximum size, but if the
// original write is small, limit the growth so we do not slow
// down the small write too much.
size_t max_size = 1 << 20;
if (size <= (128 << 10)) { // 128 << 10 == 1 << 17
max_size = size + (128 << 10);
}

firstwriters_队头,下面,我们就遍历整个writers_队列,直到:

  1. 如果first是non sync的话,那么我们会在遇到第一个要加入的sync请求的时候就break掉。反之,如果first是sync的话,那么可以兼容non sync的请求的
  2. 大小超限
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
...
*last_writer = first;
std::deque<Writer*>::iterator iter = writers_.begin();
++iter; // Advance past "first"
for (; iter != writers_.end(); ++iter) {
Writer* w = *iter;
if (w->sync && !first->sync) {
// Do not include a sync write into a batch handled by a non-sync write.
break;
}

if (w->batch != nullptr) {
size += WriteBatchInternal::ByteSize(w->batch);
if (size > max_size) {
// Do not make batch too big
break;
}
...

我们把这些batch,全部加到result里面。如果涉及多个batch,result就指向tmp_batch_,否则就指向first->batch

1
2
3
4
5
6
7
8
9
10
11
12
13
14
...
// Append to *result
if (result == first->batch) {
// Switch to temporary batch instead of disturbing caller's batch
result = tmp_batch_;
assert(WriteBatchInternal::Count(result) == 0);
WriteBatchInternal::Append(result, first->batch);
}
WriteBatchInternal::Append(result, w->batch);
}
*last_writer = w;
}
return result;
}

DBImpl::MakeRoomForWrite

MakeRoomForWrite 用来确保有空间写入,如果此时 Memtable 满了,就需要去 dump 成 Immutable Memtable。如果现在 Level0 负荷过重,那么就要延迟一下写入速度。
在研究这个函数时,要注意各个 if 条件的判断顺序,这体现了优先级。

1
2
3
4
5
6
7
8
// REQUIRES: mutex_ is held
// REQUIRES: this thread is currently at the front of the writer queue
Status DBImpl::MakeRoomForWrite(bool force) {
mutex_.AssertHeld();
assert(!writers_.empty());
bool allow_delay = !force;
Status s;
...

一进来,首先一个 while 循环。唔,这个功能为啥要有 while?原因是因为里面要等待信号量的。还有一个原因是,当产生 Immutable Memtable 之后,需要等待它落盘。

1
2
3
4
5
6
7
...
while (true) {
if (!bg_error_.ok()) {
// Yield previous error
s = bg_error_;
break;
...

如果 force 为 false,也就是不强制执行 Compaction,就认为是允许延迟的。【Q】其实我没搞懂这个逻辑。
如果允许延迟,并且 Level0 的文件数达到至少8个,那么就开始慢速写。注意,Level0 层最大文件数不是4,这是个误区。当有4个文件的时候开始 Compaction,当有12个文件的时候,才停止写入。
慢速写的实现就是主线程睡1000ms,这个时候后台的 Compaction 线程是可以开始 Compact 的。在睡眠结束之后,要将 allow_delay设为 false,也就是说对于一次写,只慢速一次。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
...
} else if (allow_delay && versions_->NumLevelFiles(0) >=
config::kL0_SlowdownWritesTrigger) {
// We are getting close to hitting a hard limit on the number of
// L0 files. Rather than delaying a single write by several
// seconds when we hit the hard limit, start delaying each
// individual write by 1ms to reduce latency variance. Also,
// this delay hands over some CPU to the compaction thread in
// case it is sharing the same core as the writer.
mutex_.Unlock();
env_->SleepForMicroseconds(1000);
allow_delay = false; // Do not delay a single write more than once
mutex_.Lock();
...

下面,如果不强制Compaction,并且Memtable的大小没有超标,那么就啥都不要做,这个应该是最通常的情况。

1
2
3
4
5
6
...
} else if (!force &&
(mem_->ApproximateMemoryUsage() <= options_.write_buffer_size)) {
// There is room in current memtable
break;
...

如果此时上一轮Immutable Memtable还没有Minor Compact完毕,那就在background_work_finished_signal_这个条件变量上面等待。
注意到在进入这个函数时是持有mutex_的,所以这个生产者消费者模式是安全的。

1
2
3
4
5
6
7
...
} else if (imm_ != nullptr) {
// We have filled up the current memtable, but the previous
// one is still being compacted, so we wait.
Log(options_.info_log, "Current memtable full; waiting...\n");
background_work_finished_signal_.Wait();
...

同理,如果Level0满了,即达到12个文件了,那我们同样要在信号量上等待。

1
2
3
4
5
6
...
} else if (versions_->NumLevelFiles(0) >= config::kL0_StopWritesTrigger) {
// There are too many level-0 files.
Log(options_.info_log, "Too many L0 files; waiting...\n");
background_work_finished_signal_.Wait();
...

对于剩余的情况,我们要将 Memtable 改成 Immutable Memtable。然后创建一个新的 Memtable,并为这个新 Memtable 使用新的 log。
要注意到这个分支并不会在最后 break 掉!这因为此时有了 Immutable Memtable 了,我们需要等它被刷成 SSTable 落盘,所以至少还需要一次while循环。
这个落盘过程等到什么时候呢?

  1. 对于 CompactMemTable 来说,至少要执行完 LogAndApply 之后,才会将 imm_ 设置为nullptr。
  2. 而这个条件变量,在 MaybeScheduleCompaction 调用完之后会被 Signal。当然,需要注意,在 Major Compaction 过程中,如果有 Immutable Memtable 需要落盘,那么还是要先执行 CompactMemTable 的,在这个之后,也会触发一次 Signal。
    注意,这一次刷盘还可能会导致 Level0 文件达到上限,那就要等更久了。
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
...
} else {
// Attempt to switch to a new memtable and trigger compaction of old
assert(versions_->PrevLogNumber() == 0);
uint64_t new_log_number = versions_->NewFileNumber();
WritableFile* lfile = nullptr;
s = env_->NewWritableFile(LogFileName(dbname_, new_log_number), &lfile);
if (!s.ok()) {
// Avoid chewing through file number space in a tight loop.
versions_->ReuseFileNumber(new_log_number);
break;
}
delete log_;
delete logfile_;
logfile_ = lfile;
logfile_number_ = new_log_number;
log_ = new log::Writer(lfile);
imm_ = mem_;
has_imm_.store(true, std::memory_order_release);
mem_ = new MemTable(internal_comparator_);
mem_->Ref();
force = false; // Do not force another compaction if have room
MaybeScheduleCompaction();
}
}
return s;
}

【Q】思考

  1. 读要加锁么?
    首先考虑分布式共识这一块,为了实现一致读写,Raft 即使是读请求,也需要走一遍 LogEntry 的。而 ZK 的话,可以选择直接读,所以未必是一致读。
    当然,这个离题了。我觉得根据 LevelDB 的 MVCC 模式,其实至少有一部分是可以不加锁的。
  2. 在哪些地方可以非线性地查找?
    在非0层找 SSTable 时,见 FindFile。
    在 BlockReader 返回 Iterator 之后,可以通过 Seek 来二分。
  3. 在读取的时候会做缓存么?
    LevelDB 在 Table 和 Block 两个层面进行缓存。
    在 Table 层面通过 TableCache。
    在 Block 层面通过 BlockReader 里面的 table->rep_->options.block_cache 分支。

迭代器

在前几篇文章中已经介绍了各种迭代器了,这里只是统一做一个分类。

  1. Iterator
    这个是所有迭代器的基类。
  2. MemTableIterator
  3. Block::Iter
    Block::NewIterator 返回类型,用来遍历一个 Block。
  4. TwoLevelIterator
    Table::NewIterator 返回类型,用来遍历一个 SSTable。此时指定 block_function 为 BlockReader。
  5. DBIter
  6. MergingIterator

DBImpl::Get

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
Status DBImpl::Get(const ReadOptions& options, const Slice& key,
std::string* value) {
Status s;
MutexLock l(&mutex_);
SequenceNumber snapshot;
if (options.snapshot != nullptr) {
snapshot =
static_cast<const SnapshotImpl*>(options.snapshot)->sequence_number();
} else {
snapshot = versions_->LastSequence();
}

MemTable* mem = mem_;
MemTable* imm = imm_;
Version* current = versions_->current();
mem->Ref();
if (imm != nullptr) imm->Ref();
current->Ref();

bool have_stat_update = false;
Version::GetStats stats;
...

可以看到,在获取了当前版本current之后,就可以解锁了。
【Q】这里还取出了mem_imm_,是不是在MVCC下面,可能同时存在多个mem_imm_我想从 Compaction 的逻辑来看,不会出现这种情况。这里取出来的目的一方面是增加引用计数,防止被 gc 掉。另一方面也是方便解锁后读。
下面就是经典的读取三部曲:

  1. 首先查 Memtable
    MemTable::Get 的实现很简单,就是用专门的 Iterator 去读。
  2. 然后查 Immutable Memtable
  3. 然后就去 SSTable 里面找,具体是调用 current->Get
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
...
// Unlock while reading from files and memtables
{
mutex_.Unlock();
// First look in the memtable, then in the immutable memtable (if any).
LookupKey lkey(key, snapshot);
if (mem->Get(lkey, value, &s)) {
// Done
} else if (imm != nullptr && imm->Get(lkey, value, &s)) {
// Done
} else {
s = current->Get(options, lkey, value, &stats);
have_stat_update = true;
}
mutex_.Lock();
}
...

读完之后,锁要重新加回来。
读操作也会触发 Compaction,毕竟有 seek compaction 的嘛。

1
2
3
4
5
6
7
8
9
...
if (have_stat_update && current->UpdateStats(stats)) {
MaybeScheduleCompaction();
}
mem->Unref();
if (imm != nullptr) imm->Unref();
current->Unref();
return s;
}

Version::Get

主体函数

主要就是构造一个 state,然后调用ForEachOverlapping

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
Status Version::Get(const ReadOptions& options, const LookupKey& k,
std::string* value, GetStats* stats) {
stats->seek_file = nullptr;
stats->seek_file_level = -1;

struct State {
...
};

State state;
state.found = false;
state.stats = stats;
state.last_file_read = nullptr;
state.last_file_read_level = -1;

state.options = &options;
state.ikey = k.internal_key();
state.vset = vset_;

state.saver.state = kNotFound;
state.saver.ucmp = vset_->icmp_.user_comparator();
state.saver.user_key = k.user_key();
state.saver.value = value;

ForEachOverlapping(state.saver.user_key, state.ikey, &state, &State::Match);

return state.found ? state.s : Status::NotFound(Slice());
}

ForEachOverlapping

ForEachOverlapping 根据 smallest 和 largest 找到对应的文件。

  1. func 的作用是在文件里面找 key。
    实际上是 State::Match 这个函数。这个函数返回 true 说明需要继续搜索其它的文件,返回 false 则说明搜索结束。
    进一步比较 State::Status 和 State::found 可以判断是否找到,以及是否发生了错误。
  2. arg 实际上是 State 对象,但 ForEachOverlapping 并不关心其具体内容,而是作为参数传给 func
1
2
3
4
5
6
7
void Version::ForEachOverlapping(Slice user_key, Slice internal_key, void* arg,
bool (*func)(void*, int, FileMetaData*)) {
const Comparator* ucmp = vset_->icmp_.user_comparator();

// Search level-0 in order from newest to oldest.
std::vector<FileMetaData*> tmp;
...

从 Compaction 一文的介绍中了解到,files_ 里面存放了当前 Version 中所有 SSTable 的元信息。
首先遍历第0层的所有文件,放到 tmp 里面,按照 f->number 排序。排完序就借助于 func 查找。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
...
tmp.reserve(files_[0].size());
for (uint32_t i = 0; i < files_[0].size(); i++) {
FileMetaData* f = files_[0][i];
if (ucmp->Compare(user_key, f->smallest.user_key()) >= 0 && // user_ley >= smallest
ucmp->Compare(user_key, f->largest.user_key()) <= 0) { // user_ley <= smallest
tmp.push_back(f);
}
}
if (!tmp.empty()) {
std::sort(tmp.begin(), tmp.end(), NewestFirst);
for (uint32_t i = 0; i < tmp.size(); i++) {
if (!(*func)(arg, 0, tmp[i])) {
return;
}
}
}
...

下面,就可以用之前介绍过的 FindFile 来二分查找剩余的层了。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
...
// Search other levels.
for (int level = 1; level < config::kNumLevels; level++) {
size_t num_files = files_[level].size();
if (num_files == 0) continue;

// Binary search to find earliest index whose largest key >= internal_key.
uint32_t index = FindFile(vset_->icmp_, files_[level], internal_key);
if (index < num_files) {
FileMetaData* f = files_[level][index];
if (ucmp->Compare(user_key, f->smallest.user_key()) < 0) {
// All of "f" is past any data for user_key
} else {
if (!(*func)(arg, level, f)) {
return;
}
}
}
}
}

State类

State 类中主要定义了从 SSTable 中找对应 Key 的函数 Match
Match 函数的过程如下:

  1. 设置 seek_file,其作用是判断何时调度 seek_compaction
  2. 尝试从刚才找到的文件 f 对应的 Table Cache 中获得对应的 internal key
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
...
struct GetStats {
FileMetaData* seek_file;
int seek_file_level;
};
...
struct State {
Saver saver;
GetStats* stats;
const ReadOptions* options;
Slice ikey;
FileMetaData* last_file_read;
int last_file_read_level;

VersionSet* vset;
Status s;
bool found;

static bool Match(void* arg, int level, FileMetaData* f) {
State* state = reinterpret_cast<State*>(arg);

if (state->stats->seek_file == nullptr &&
state->last_file_read != nullptr) {
// We have had more than one seek for this read. Charge the 1st file.
state->stats->seek_file = state->last_file_read;
state->stats->seek_file_level = state->last_file_read_level;
}

state->last_file_read = f;
state->last_file_read_level = level;

state->s = state->vset->table_cache_->Get(*state->options, f->number,
f->file_size, state->ikey,
&state->saver, SaveValue);
if (!state->s.ok()) {
state->found = true;
return false;
}
switch (state->saver.state) {
case kNotFound:
return true; // Keep searching in other files
case kFound:
state->found = true;
return false;
case kDeleted:
return false;
case kCorrupt:
state->s =
Status::Corruption("corrupted key for ", state->saver.user_key);
state->found = true;
return false;
}

// Not reached. Added to avoid false compilation warnings of
// "control reaches end of non-void function".
return false;
}
};
...

TableCache::Get

TableCache 这一块是一个缓存层,如果缓存中没有,才去读 SSTable,并把它加到缓存里面。

  1. 通过 TableCache::FindTable 找到对应的 TableAndFile 对象
  2. Table::InternalGet
1
2
3
4
5
6
7
8
9
10
11
12
13
Status TableCache::Get(const ReadOptions& options, uint64_t file_number,
uint64_t file_size, const Slice& k, void* arg,
void (*handle_result)(void*, const Slice&,
const Slice&)) {
Cache::Handle* handle = nullptr;
Status s = FindTable(file_number, file_size, &handle);
if (s.ok()) {
Table* t = reinterpret_cast<TableAndFile*>(cache_->Value(handle))->table;
s = t->InternalGet(options, k, arg, handle_result);
cache_->Release(handle);
}
return s;
}

TableCache::FindTable

TableCache 实际上是用的一个通用的 Cache 的实现。这个函数通过参数返回一个 Cache::Handle 指针,实际上是 TableAndFile 就是打包 RandomAccessFile*Table*
首先在 cache_ 里面查文件的 file_number,如果没找到说明 Cache Miss,就得调用 Table::Open 去读文件了。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
Status TableCache::FindTable(uint64_t file_number, uint64_t file_size,
Cache::Handle** handle) {
Status s;
char buf[sizeof(file_number)];
EncodeFixed64(buf, file_number);
Slice key(buf, sizeof(buf));
*handle = cache_->Lookup(key);
if (*handle == nullptr) {
std::string fname = TableFileName(dbname_, file_number);
RandomAccessFile* file = nullptr;
Table* table = nullptr;
s = env_->NewRandomAccessFile(fname, &file);
if (!s.ok()) {
std::string old_fname = SSTTableFileName(dbname_, file_number);
if (env_->NewRandomAccessFile(old_fname, &file).ok()) {
s = Status::OK();
}
}
if (s.ok()) {
s = Table::Open(options_, file, file_size, &table);
}
...

最后调用 Cache::Insert 将这个文件加到 Cache 中。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
...
if (!s.ok()) {
assert(table == nullptr);
delete file;
// We do not cache error results so that if the error is transient,
// or somebody repairs the file, we recover automatically.
} else {
TableAndFile* tf = new TableAndFile;
tf->file = file;
tf->table = table;
*handle = cache_->Insert(key, tf, 1, &DeleteEntry);
}
}
return s;
}

SSTable 格式的回顾

在介绍 Table::InternalGet 前,先来复习一下 SSTable的格式:

  1. data block
  2. meta block
  3. meta index block
  4. index block
    记录每个 data block 的“largest”,满足两个性质。也就是大于等于当前 block 的 key,但是严格小鱼下一个 block 的 key。
    注意,从之前介绍的 FindShortestSeparator 来看,这里的 largest 不是单纯的 largest,而要进行一些修正。它实际上是分隔两个 Data Block 的最短 Key。
  5. footer
    记录 index block 和 meta index block 的位置

所以要先通过 index block 去定位 data block。

接着复习一下 block 的格式

  1. record
  2. restart
  3. 额外信息
    num restarts
    type
    crc32

所以要用 LookupKey 先去找 restart,然后从 restart 开始找。同时注意由于 meta block 的存在,会有一些优化。

Table::InternalGet

首先,遍历 index block。

1
2
3
4
5
6
7
8
9
10
11
Status Table::InternalGet(const ReadOptions& options, const Slice& k, void* arg,
void (*handle_result)(void*, const Slice&,
const Slice&)) {
Status s;
Iterator* iiter = rep_->index_block->NewIterator(rep_->options.comparator);
iiter->Seek(k);
if (iiter->Valid()) {
Slice handle_value = iiter->value();
FilterBlockReader* filter = rep_->filter;
BlockHandle handle;
...

可以通过布隆过滤器判断这个 block 里面有没有。

1
2
3
4
5
6
...
if (filter != nullptr && handle.DecodeFrom(&handle_value).ok() &&
!filter->KeyMayMatch(handle.offset(), k)) {
// Not found
} else {
...

布隆过滤器可能假阳,所以后续还要再用 BlockReader 去 Seek 一下。先前介绍过 BlockReader,这个函数返回一个 Iterator,实际上是一个 Block::Iter 对象。
当时他被用在创建 TwoLevelIterator 里面,这个双层迭代器实际上指 index block 上的迭代器和 data block 上的迭代器。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
...
Iterator* block_iter = BlockReader(this, options, iiter->value());
block_iter->Seek(k);
if (block_iter->Valid()) {
(*handle_result)(arg, block_iter->key(), block_iter->value());
}
s = block_iter->status();
delete block_iter;
}
}
if (s.ok()) {
s = iiter->status();
}
delete iiter;
return s;
}

Table::Open

【这一部分可以先不读,因为是个非主要路径】
Table::Open 负责读取 SSTable 到表对象 Table 中。

1
2
3
4
5
6
7
Status Table::Open(const Options& options, RandomAccessFile* file,
uint64_t size, Table** table) {
*table = nullptr;
if (size < Footer::kEncodedLength) {
return Status::Corruption("file is too short to be an sstable");
}
...

先读取 footer。

1
2
3
4
5
6
7
8
9
10
11
...
char footer_space[Footer::kEncodedLength];
Slice footer_input;
Status s = file->Read(size - Footer::kEncodedLength, Footer::kEncodedLength,
&footer_input, footer_space);
if (!s.ok()) return s;

Footer footer;
s = footer.DecodeFrom(&footer_input);
if (!s.ok()) return s;
...

再读取 block。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
...
// Read the index block
BlockContents index_block_contents;
ReadOptions opt;
if (options.paranoid_checks) {
opt.verify_checksums = true;
}
s = ReadBlock(file, opt, footer.index_handle(), &index_block_contents);

if (s.ok()) {
// We've successfully read the footer and the index block: we're
// ready to serve requests.
Block* index_block = new Block(index_block_contents);
Rep* rep = new Table::Rep;
rep->options = options;
rep->file = file;
rep->metaindex_handle = footer.metaindex_handle();
rep->index_block = index_block;
rep->cache_id = (options.block_cache ? options.block_cache->NewId() : 0);
rep->filter_data = nullptr;
rep->filter = nullptr;
*table = new Table(rep);
(*table)->ReadMeta(footer);
}

return s;
}

WAL 格式

在介绍故障恢复之前,先介绍 LevelDB 的 WAL 格式。

故障恢复

Manifest 损坏/丢失

Reference

  1. http://luodw.cc/2015/10/30/leveldb-14/
     介绍WriteBatch
    
  2. https://zhuanlan.zhihu.com/p/340804308
     介绍Revocer逻辑
    
  3. https://blog.csdn.net/sparkliang/article/details/9311487
     介绍RecoverLogFile
    
  4. https://izualzhy.cn/leveldb-write-read
    介绍了LevelDB读写流程,我使用了它的部分图片
  5. https://leeshine.github.io/2019/01/24/leveldb-put-get/
  6. https://sf-zhou.github.io/leveldb/leveldb_10_details.html
    讲述多线程写的demo,很值得一看
  7. http://1feng.github.io/2016/08/24/mvcc-and-manifest/
    介绍MVCC机制,很好
  8. https://www.cnblogs.com/cobbliu/p/6194072.html
    介绍SSTable、Block的格式,一张大图,非常屌
  9. https://blog.csdn.net/weixin_42663840/article/details/82629473
    我见过最屌有关读写的注释
  10. https://github.com/facebook/rocksdb/wiki/Write-Ahead-Log-File-Format
    介绍WAL格式