LevelDB: Workflow Overview

Having looked at how LevelDB's individual modules are implemented, this article walks through LevelDB's overall workflow in execution order. At a minimum, you should already be familiar with:

  1. Memtable
  2. SSTable
  3. The Compaction mechanism

Let's start by running a demo.

#include <cassert>
#include <string>
#include "leveldb/db.h"

int main() {
  leveldb::DB* db;
  leveldb::Options options;
  options.create_if_missing = true;

  leveldb::Status status = leveldb::DB::Open(options, "./testdb", &db);
  assert(status.ok());

  std::string key = "calvinneo";
  std::string value = "calvinneo@calvinneo.com";

  status = db->Put(leveldb::WriteOptions(), key, value);  // put
  status = db->Get(leveldb::ReadOptions(), key, &value);  // get

  delete db;
  return 0;
}
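
Assuming LevelDB has already been built and installed so that the header and library are visible to the compiler, the demo can be compiled with something along the lines of g++ -std=c++11 demo.cc -o demo -lleveldb -lpthread (demo.cc being whatever name the file was saved under).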

Creation

The creation logic is actually carved out of the open path, DB::Open. Because it is simple, self-contained, and helps in understanding the overall layout of the database, it is covered separately here.
First, a few numbers are set up:

  1. SetLogNumber sets the log number to 0
  2. DescriptorFileName produces the Manifest file name, with file number 1
  3. SetNextFile sets the next file number to 2
    Status DBImpl::NewDB() {
    VersionEdit new_db;
    new_db.SetComparatorName(user_comparator()->Name());
    new_db.SetLogNumber(0);
    new_db.SetNextFile(2);
    new_db.SetLastSequence(0);
    const std::string manifest = DescriptorFileName(dbname_, 1);

Next, the Manifest file is created.

WritableFile* file;
Status s = env_->NewWritableFile(manifest, &file);
if (!s.ok()) {
return s;
}

The following chain of operations encodes new_db and appends it to the Manifest (which is itself in log format), then syncs it to disk.

{
log::Writer log(file);
std::string record;
new_db.EncodeTo(&record);
s = log.AddRecord(record);
if (s.ok()) {
s = file->Sync();
}
if (s.ok()) {
s = file->Close();
}
}
delete file;

Finally, CURRENT is set to point at the newest Manifest (a sketch of SetCurrentFile follows the code below).

  if (s.ok()) {
// Make "CURRENT" file that points to the new manifest file.
s = SetCurrentFile(env_, dbname_, 1);
} else {
env_->RemoveFile(manifest);
}
return s;
}
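
For completeness, SetCurrentFile makes this pointer switch safe by first writing the manifest name to a temporary file and then renaming it to CURRENT. A simplified sketch, paraphrased from the upstream filename.cc rather than copied verbatim:

Status SetCurrentFile(Env* env, const std::string& dbname,
                      uint64_t descriptor_number) {
  // CURRENT stores just "MANIFEST-000001\n", without the "dbname/" prefix.
  std::string manifest = DescriptorFileName(dbname, descriptor_number);
  std::string contents = manifest.substr(dbname.size() + 1);
  std::string tmp = TempFileName(dbname, descriptor_number);
  Status s = WriteStringToFile(env, contents + "\n", tmp);
  if (s.ok()) {
    // The rename is what flips CURRENT over to the new Manifest.
    s = env->RenameFile(tmp, CurrentFileName(dbname));
  }
  if (!s.ok()) {
    env->RemoveFile(tmp);
  }
  return s;
}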

Open

The call chain is as follows.

DB::Open

DBImpl's constructor is just a member initializer list; it contains no other logic.
Once the DBImpl object exists, we first take the lock and call Recover, which loads the Manifest file and performs crash recovery.
Note the save_manifest parameter, which is passed very deep down the call chain. It ends up being set when:

  1. in RecoverLogFile, a memtable has to be dumped
  2. in VersionSet::Recover, the existing Manifest cannot be reused (ReuseManifest)
    Status DB::Open(const Options& options, const std::string& dbname, DB** dbptr) {
    *dbptr = nullptr;

    DBImpl* impl = new DBImpl(options, dbname);
    impl->mutex_.Lock();
    VersionEdit edit;
    // Recover handles create_if_missing, error_if_exists
    bool save_manifest = false;
    Status s = impl->Recover(&edit, &save_manifest);

If there is no memtable yet, create a new log file and a corresponding memtable.

if (s.ok() && impl->mem_ == nullptr) {
// Create new log and a corresponding memtable.
uint64_t new_log_number = impl->versions_->NewFileNumber();
WritableFile* lfile;
s = options.env->NewWritableFile(LogFileName(dbname, new_log_number),
&lfile);
if (s.ok()) {
edit.SetLogNumber(new_log_number);
impl->logfile_ = lfile;
impl->logfile_number_ = new_log_number;
impl->log_ = new log::Writer(lfile);
impl->mem_ = new MemTable(impl->internal_comparator_);
impl->mem_->Ref();
}
}

【Q】Why does LogAndApply still need to be called here, given that a similar step already happens inside VersionSet::Recover?

  if (s.ok() && save_manifest) {
edit.SetPrevLogNumber(0); // No older logs needed after recovery.
edit.SetLogNumber(impl->logfile_number_);
s = impl->versions_->LogAndApply(&edit, &impl->mutex_);
}
if (s.ok()) {
impl->RemoveObsoleteFiles();
impl->MaybeScheduleCompaction();
}
impl->mutex_.Unlock();
if (s.ok()) {
assert(impl->mem_ != nullptr);
*dbptr = impl;
} else {
delete impl;
}
return s;
}

DBImpl::Recover

First, create the database directory and take the file lock, i.e. the LOCK file under that directory. The locking code is interesting and is covered in its own section later.

Status DBImpl::Recover(VersionEdit* edit, bool* save_manifest) {
mutex_.AssertHeld();

// Ignore error from CreateDir since the creation of the DB is
// committed only when the descriptor is created, and this directory
// may already exist from a previous failed creation attempt.
env_->CreateDir(dbname_);
assert(db_lock_ == nullptr);
Status s = env_->LockFile(LockFileName(dbname_), &db_lock_);
if (!s.ok()) {
return s;
}

Next, check whether the CURRENT file exists under the db directory. If it does not, the database is considered nonexistent: if options_.create_if_missing is set, it is created; otherwise an error is returned.

if (!env_->FileExists(CurrentFileName(dbname_))) {
if (options_.create_if_missing) {
Log(options_.info_log, "Creating DB %s since it was missing.",
dbname_.c_str());
s = NewDB();
if (!s.ok()) {
return s;
}
} else {
return Status::InvalidArgument(
dbname_, "does not exist (create_if_missing is false)");
}
} else {
if (options_.error_if_exists) {
return Status::InvalidArgument(dbname_,
"exists (error_if_exists is true)");
}
}

Next, VersionSet::Recover is called. It reads the Manifest file and restores the version information.

s = versions_->Recover(save_manifest);
if (!s.ok()) {
return s;
}

Next we examine the log files. Any log not older than the one recorded in the Manifest holds data that had not been persisted when the database was last closed, so those logs must be replayed.
Note that PrevLogNumber is no longer used, but for compatibility with older databases we still honor this field.

SequenceNumber max_sequence(0);
// Recover from all newer log files than the ones named in the
// descriptor (new log files may have been added by the previous
// incarnation without registering them in the descriptor).
//
const uint64_t min_log = versions_->LogNumber();
const uint64_t prev_log = versions_->PrevLogNumber();
std::vector<std::string> filenames;
s = env_->GetChildren(dbname_, &filenames);
if (!s.ok()) {
return s;
}
std::set<uint64_t> expected;
versions_->AddLiveFiles(&expected);
uint64_t number;
FileType type;
std::vector<uint64_t> logs;

filenames lists every file under the database directory. We iterate over them and use ParseFileName to extract each file's number; the number is the 2 in a name like MANIFEST-000002, and for table files it corresponds to the number field in FileMetaData.

for (size_t i = 0; i < filenames.size(); i++) {
if (ParseFileName(filenames[i], &number, &type)) {
expected.erase(number);
if (type == kLogFile && ((number >= min_log) || (number == prev_log)))
logs.push_back(number);
}
}
if (!expected.empty()) {
char buf[50];
std::snprintf(buf, sizeof(buf), "%d missing files; e.g.",
static_cast<int>(expected.size()));
return Status::Corruption(buf, TableFileName(dbname_, *(expected.begin())));
}

RecoverLogFile replays the logs, so the logs must first be sorted into the order in which they were generated. Replaying a log modifies the VersionEdit and may trigger a compaction.

// Recover in the order in which the logs were generated
std::sort(logs.begin(), logs.end());
for (size_t i = 0; i < logs.size(); i++) {
s = RecoverLogFile(logs[i], (i == logs.size() - 1), save_manifest, edit,
&max_sequence);
if (!s.ok()) {
return s;
}

MarkFileNumberUsed adjusts next_file_number_ so that it stays strictly greater than the given logs[i]: if it is less than or equal to logs[i], it is set to logs[i] + 1 (a sketch of the function follows the code below).

    // The previous incarnation may not have written any MANIFEST
// records after allocating this log number. So we manually
// update the file number allocation counter in VersionSet.
versions_->MarkFileNumberUsed(logs[i]);
}

if (versions_->LastSequence() < max_sequence) {
versions_->SetLastSequence(max_sequence);
}

return Status::OK();
}
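
For reference, MarkFileNumberUsed itself is tiny; roughly, paraphrasing the upstream VersionSet code:

void VersionSet::MarkFileNumberUsed(uint64_t number) {
  // Make sure NewFileNumber() will never hand out `number` again.
  if (next_file_number_ <= number) {
    next_file_number_ = number + 1;
  }
}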

VersionSet::Recover

Status VersionSet::Recover(bool* save_manifest) {
struct LogReporter : public log::Reader::Reporter {
Status* status;
void Corruption(size_t bytes, const Status& s) override {
if (this->status->ok()) *this->status = s;
}
};

First, the contents of the CURRENT file are read to find the Manifest currently in use. Note that by this point CURRENT must exist; if it did not, the DBImpl::Recover path would already have created it.

// Read "CURRENT" file, which contains a pointer to the current manifest file
std::string current;
Status s = ReadFileToString(env_, CurrentFileName(dbname_), &current);
if (!s.ok()) {
return s;
}
if (current.empty() || current[current.size() - 1] != '\n') {
return Status::Corruption("CURRENT file does not end with newline");
}
current.resize(current.size() - 1);

If the Manifest pointed to by CURRENT cannot be opened, an error is returned. In principle this case should also be recoverable, but here it is simply reported as corruption.

std::string dscname = dbname_ + "/" + current;
SequentialFile* file;
s = env_->NewSequentialFile(dscname, &file);
if (!s.ok()) {
if (s.IsNotFound()) {
return Status::Corruption("CURRENT points to a non-existent file",
s.ToString());
}
return s;
}

What follows reads the Manifest's contents and uses them to populate the VersionSet.
【Q】Where were these contents written? In VersionEdit::EncodeTo followed by log::Writer::AddRecord, which are invoked during LogAndApply.

bool have_log_number = false;
bool have_prev_log_number = false;
bool have_next_file = false;
bool have_last_sequence = false;
uint64_t next_file = 0;
uint64_t last_sequence = 0;
uint64_t log_number = 0;
uint64_t prev_log_number = 0;
Builder builder(this, current_);
int read_records = 0;

{
LogReporter reporter;
reporter.status = &s;
log::Reader reader(file, &reporter, true /*checksum*/,
0 /*initial_offset*/);

Next, a while loop reads records from the reader.
ReadRecord reads the next record into *record, returning true on success and false on EOF. It may use *scratch as temporary storage. *record stays valid until the next mutating operation on the reader or on *scratch.

Slice record;
std::string scratch;
while (reader.ReadRecord(&record, &scratch) && s.ok()) {
++read_records;
VersionEdit edit;
s = edit.DecodeFrom(record);

The Manifest records the Comparator that was in use at the time (open the file in a text editor and you can spot something that looks like a class name); the VersionEdit checks whether it matches the current comparator.

if (s.ok()) {
if (edit.has_comparator_ &&
edit.comparator_ != icmp_.user_comparator()->Name()) {
s = Status::InvalidArgument(
edit.comparator_ + " does not match existing comparator ",
icmp_.user_comparator()->Name());
}
}

【Q】In LogAndApply, builder.Apply is followed by builder.SaveTo; why not here? Because Apply is invoked once per record, while SaveTo is invoked once at the end after everything has been applied. Reading on, we will indeed see the SaveTo call.

    if (s.ok()) {
builder.Apply(&edit);
}

if (edit.has_log_number_) {
log_number = edit.log_number_;
have_log_number = true;
}

if (edit.has_prev_log_number_) {
prev_log_number = edit.prev_log_number_;
have_prev_log_number = true;
}

if (edit.has_next_file_number_) {
next_file = edit.next_file_number_;
have_next_file = true;
}

if (edit.has_last_sequence_) {
last_sequence = edit.last_sequence_;
have_last_sequence = true;
}
}
}

At this point the Manifest has been fully read, so the file is released.

delete file;
file = nullptr;

if (s.ok()) {
if (!have_next_file) {
s = Status::Corruption("no meta-nextfile entry in descriptor");
} else if (!have_log_number) {
s = Status::Corruption("no meta-lognumber entry in descriptor");
} else if (!have_last_sequence) {
s = Status::Corruption("no last-sequence-number entry in descriptor");
}

if (!have_prev_log_number) {
prev_log_number = 0;
}

MarkFileNumberUsed(prev_log_number);
MarkFileNumberUsed(log_number);
}

What follows is the SaveTo / Finalize / AppendVersion sequence, much like in LogAndApply.

if (s.ok()) {
Version* v = new Version(this);
builder.SaveTo(v);
// Install recovered version
Finalize(v);
AppendVersion(v);
manifest_file_number_ = next_file;
next_file_number_ = next_file + 1;
last_sequence_ = last_sequence;
log_number_ = log_number;
prev_log_number_ = prev_log_number;

Then we check whether to keep using the existing Manifest file or to start a new one. This may set descriptor_file_ and thereby affect LogAndApply, but the effect only arises from Recover.
【Q】What is the point of this? It is there to deal with the Manifest file growing too large (ReuseManifest is sketched after this function).

    // See if we can reuse the existing MANIFEST file.
if (ReuseManifest(dscname, current)) {
// No need to save new manifest
} else {
*save_manifest = true;
}
} else {
std::string error = s.ToString();
Log(options_->info_log, "Error recovering version set with %d records: %s",
read_records, error.c_str());
}

return s;
}
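
For context on the ReuseManifest call above: broadly, the old MANIFEST is reused only when options_->reuse_logs is set and the file is still small enough; otherwise a fresh, compacted MANIFEST gets written. A heavily simplified sketch, paraphrased from memory (kTargetManifestSize stands in for the size limit the real code derives from the options):

bool VersionSet::ReuseManifest(const std::string& dscname,
                               const std::string& dscbase) {
  if (!options_->reuse_logs) return false;
  FileType type;
  uint64_t number, size;
  if (!ParseFileName(dscbase, &number, &type) || type != kDescriptorFile ||
      !env_->GetFileSize(dscname, &size).ok() || size >= kTargetManifestSize) {
    return false;  // unparsable or too big: write a new, compacted MANIFEST
  }
  // Reopen the old MANIFEST in append mode and keep logging edits to it.
  Status r = env_->NewAppendableFile(dscname, &descriptor_file_);
  if (!r.ok()) return false;
  descriptor_log_ = new log::Writer(descriptor_file_, size);
  manifest_file_number_ = number;
  return true;
}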

DBImpl::RecoverLogFile

[Before reading this function, study VersionSet::Recover first.]
RecoverLogFile reads a log file and replays the updates that have not yet been applied to the current version.

Status DBImpl::RecoverLogFile(uint64_t log_number, bool last_log,
bool* save_manifest, VersionEdit* edit,
SequenceNumber* max_sequence) {
struct LogReporter : public log::Reader::Reporter {
Env* env;
Logger* info_log;
const char* fname;
Status* status; // null if options_.paranoid_checks==false
void Corruption(size_t bytes, const Status& s) override {
Log(info_log, "%s%s: dropping %d bytes; %s",
(this->status == nullptr ? "(ignoring error) " : ""), fname,
static_cast<int>(bytes), s.ToString().c_str());
if (this->status != nullptr && this->status->ok()) *this->status = s;
}
};

mutex_.AssertHeld();

// Open the log file
std::string fname = LogFileName(dbname_, log_number);
SequentialFile* file;
Status status = env_->NewSequentialFile(fname, &file);
if (!status.ok()) {
MaybeIgnoreError(&status);
return status;
}

// Create the log reader.
LogReporter reporter;
reporter.env = env_;
reporter.info_log = options_.info_log;
reporter.fname = fname.c_str();
reporter.status = (options_.paranoid_checks ? &status : nullptr);
// We intentionally make log::Reader do checksumming even if
// paranoid_checks==false so that corruptions cause entire commits
// to be skipped instead of propagating bad information (like overly
// large sequence numbers).
log::Reader reader(file, &reporter, true /*checksum*/, 0 /*initial_offset*/);
Log(options_.info_log, "Recovering log #%llu",
(unsigned long long)log_number);

// Read all the records and add to a memtable
std::string scratch;
Slice record;
WriteBatch batch;
int compactions = 0;
MemTable* mem = nullptr;

Now we loop, reading log records into record and applying each one to the memtable via InsertInto; how InsertInto works is explained in the DB::Write section.

while (reader.ReadRecord(&record, &scratch) && status.ok()) {
if (record.size() < 12) {
reporter.Corruption(record.size(),
Status::Corruption("log record too small"));
continue;
}
WriteBatchInternal::SetContents(&batch, record);

if (mem == nullptr) {
mem = new MemTable(internal_comparator_);
mem->Ref();
}
status = WriteBatchInternal::InsertInto(&batch, mem);
MaybeIgnoreError(&status);
if (!status.ok()) {
break;
}

Then last_seq is updated. 【Q】Why is Count added here? See the analysis in the write section, and the worked example after the code below.

const SequenceNumber last_seq = WriteBatchInternal::Sequence(&batch) +
WriteBatchInternal::Count(&batch) - 1;
if (last_seq > *max_sequence) {
*max_sequence = last_seq;
}
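
A quick sanity check of this arithmetic: WriteBatchInternal::Sequence(&batch) is the sequence number assigned to the first entry of the batch, and each subsequent entry is inserted into the memtable with the next consecutive number. So a batch whose header carries sequence 100 and which contains 3 entries occupies sequence numbers 100, 101 and 102, giving last_seq = 100 + 3 - 1 = 102.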

If the memtable exceeds its memory budget, a minor compaction is started. This is a purely local compaction: no version needs to be maintained, so there is no LogAndApply call, and since no obsolete files are produced, there is no RemoveObsoleteFiles call either. Recalling the implementation of WriteLevel0Table, what actually needs to happen is:

  1. build an SSTable
  2. decide which level the SSTable should be placed in
  3. record it in the VersionEdit

If the memtable has to be flushed to disk, save_manifest is set to true; this value is threaded all the way down from DB::Open.

  if (mem->ApproximateMemoryUsage() > options_.write_buffer_size) {
compactions++;
*save_manifest = true;
status = WriteLevel0Table(mem, edit, nullptr);
mem->Unref();
mem = nullptr;
if (!status.ok()) {
// Reflect errors immediately so that conditions like full
// file-systems cause the DB::Open() to fail.
break;
}
}
}

At this point the while loop above has finished and the log file is released. But we still check whether the log file fname can be reused.

delete file;

// See if we should keep reusing the last log file.
if (status.ok() && options_.reuse_logs && last_log && compactions == 0) {
assert(logfile_ == nullptr);
assert(log_ == nullptr);
assert(mem_ == nullptr);
uint64_t lfile_size;
if (env_->GetFileSize(fname, &lfile_size).ok() &&
env_->NewAppendableFile(fname, &logfile_).ok()) {

If the log is reused, there is no need to fall through to the WriteLevel0Table call further down.

    Log(options_.info_log, "Reusing old log %s \n", fname.c_str());
log_ = new log::Writer(logfile_, lfile_size);
logfile_number_ = log_number;
if (mem != nullptr) {
mem_ = mem;
mem = nullptr;
} else {
// mem can be nullptr if lognum exists but was empty.
mem_ = new MemTable(internal_comparator_);
mem_->Ref();
}
}
}
  if (mem != nullptr) {
// mem did not get reused; compact it.
if (status.ok()) {
*save_manifest = true;
status = WriteLevel0Table(mem, edit, nullptr);
}
mem->Unref();
}

return status;
}

File Locking

PosixLockTable

The PosixLockTable class tracks every file locked through LockFile.
Note that fcntl(F_SETLK) by itself already provides a file lock, but it gives no protection against concurrent access from within the same process, so an extra layer is wrapped around it (a small demo follows the class below).
【Q】Why would there be concurrent access within a single process? This is explained further below.

class PosixLockTable {
public:
bool Insert(const std::string& fname) LOCKS_EXCLUDED(mu_) {
mu_.Lock();
bool succeeded = locked_files_.insert(fname).second;
mu_.Unlock();
return succeeded;
}
void Remove(const std::string& fname) LOCKS_EXCLUDED(mu_) {
mu_.Lock();
locked_files_.erase(fname);
mu_.Unlock();
}

private:
port::Mutex mu_;
std::set<std::string> locked_files_ GUARDED_BY(mu_);
};
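
A small standalone demo of why this extra in-process bookkeeping is needed (the file name LOCK_DEMO is made up for the example): fcntl record locks are per-process, so a second lock request from the same process, even through a different file descriptor, simply succeeds instead of conflicting.

#include <fcntl.h>
#include <cstdio>
#include <cstring>

// Try to write-lock the whole file behind fd; returns fcntl's result (0 = ok).
static int LockWholeFile(int fd) {
  struct flock fl;
  std::memset(&fl, 0, sizeof(fl));
  fl.l_type = F_WRLCK;
  fl.l_whence = SEEK_SET;
  fl.l_start = 0;
  fl.l_len = 0;  // 0 means "to EOF", i.e. the entire file
  return fcntl(fd, F_SETLK, &fl);
}

int main() {
  int fd1 = open("LOCK_DEMO", O_RDWR | O_CREAT, 0644);
  int fd2 = open("LOCK_DEMO", O_RDWR | O_CREAT, 0644);
  std::printf("first lock:  %d\n", LockWholeFile(fd1));  // 0: success
  std::printf("second lock: %d\n", LockWholeFile(fd2));  // also 0: no conflict inside one process
  return 0;
}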

LockFile

To take the lock, we first record it in our own process's PosixLockTable locks_. If the insertion fails, the lock is already held by this process, so we bail out.

Status LockFile(const std::string& filename, FileLock** lock) override {
*lock = nullptr;

int fd = ::open(filename.c_str(), O_RDWR | O_CREAT | kOpenBaseFlags, 0644);
if (fd < 0) {
return PosixError(filename, errno);
}

if (!locks_.Insert(filename)) {
::close(fd);
return Status::IOError("lock " + filename, "already held by process");
}

If this process does not already hold the lock, LockOrUnlock is then called to take the file lock. If that fails, the lock is held by another process, and the entry has to be removed from locks_ again.

  if (LockOrUnlock(fd, true) == -1) {
int lock_errno = errno;
::close(fd);
locks_.Remove(filename);
return PosixError("lock " + filename, lock_errno);
}

*lock = new PosixFileLock(fd, filename);
return Status::OK();
}

PosixLockTable locks_;

LockOrUnlock

LockOrUnlock performs an F_SETLK operation on the file, locking or unlocking it depending on the lock flag. F_SETLK is non-blocking; the related F_SETLKW command blocks instead.
F_SETLK can lock just a portion of a file; here l_start and l_len are both set to 0, meaning the whole file is locked.

int LockOrUnlock(int fd, bool lock) {
errno = 0;
struct ::flock file_lock_info;
std::memset(&file_lock_info, 0, sizeof(file_lock_info));
file_lock_info.l_type = (lock ? F_WRLCK : F_UNLCK);
file_lock_info.l_whence = SEEK_SET;
file_lock_info.l_start = 0;
file_lock_info.l_len = 0; // Lock/unlock entire file.
return ::fcntl(fd, F_SETLK, &file_lock_info);
}

A note on Linux processes and threads

Note that on Linux, threads created with the pthread library share the same PID while having different TIDs, as the code below shows.

#include <stdio.h>
#include <stdlib.h>
#include <string.h>
#include <unistd.h>
#include <pthread.h>
#include <sys/syscall.h>
#define gettid() syscall(SYS_gettid)

void *start_routine(void* index) {
char msg[99] = "";
snprintf(msg, sizeof(msg)-1, "thd %d: getpid %d gettid %ld\n", *(int*)index, getpid(), (long)gettid());
while (1) {
write(1, msg, strlen(msg));
sleep(1);
}
}

int main() {

int th1 = 1;
pthread_t tid1;
pthread_create(&tid1, NULL, start_routine, &th1);

int th2 = 2;
pthread_t tid2;
pthread_create(&tid2, NULL, start_routine, &th2);

const char *msg = "main: i am main\n";
while (1) {
write(1, msg, strlen(msg));
sleep(1);
}

return 0;
}
/*
main: i am main
thd 1: getpid 31270 gettid 31271
thd 2: getpid 31270 gettid 31272
main: i am main
thd 1: getpid 31270 gettid 31271
thd 2: getpid 31270 gettid 31272
main: i am main
thd 1: getpid 31270 gettid 31271
thd 2: getpid 31270 gettid 31272
*/
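
To try the demo (assuming it is saved as main.c): gcc main.c -o main -pthread, then run ./main.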

Write

LevelDB supports batched updates through WriteBatch. Put, of course, is just a thin wrapper around Write that updates a single key.

Status DB::Put(const WriteOptions& opt, const Slice& key, const Slice& value) {
WriteBatch batch;
batch.Put(key, value);
return Write(opt, &batch);
}

The write path is:

  1. write the WAL
  2. write the MemTable
  3. update the Sequence Number

Writes may be issued concurrently, so LevelDB uses a group-commit mechanism similar to InnoDB's.

DBImpl::Write

First of all, there is a global writers_ queue that holds all pending writes.

class DBImpl : public DB {
...
std::deque<Writer*> writers_ GUARDED_BY(mutex_);
WriteBatch* tmp_batch_ GUARDED_BY(mutex_);
...
}

We create a DBImpl::Writer object, which carries a condition variable w.cv tied to mutex_.
The Writer is then pushed onto writers_, and we wait for one of the following (a sketch of the Writer struct follows the code below):

  1. w.done
    meaning some other thread has already performed w's write on its behalf
  2. w == writers_.front()
    meaning this Writer is at the head of the queue and holds the lock
    Status DBImpl::Write(const WriteOptions& options, WriteBatch* updates) {
    Writer w(&mutex_);
    w.batch = updates;
    w.sync = options.sync;
    w.done = false;
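
For reference, the Writer being queued here is a small struct defined in db_impl.cc; roughly, paraphrasing the upstream source:

// One pending write request, parked on the writers_ queue.
struct DBImpl::Writer {
  explicit Writer(port::Mutex* mu)
      : batch(nullptr), sync(false), done(false), cv(mu) {}

  Status status;      // result, filled in by whichever thread performs the write
  WriteBatch* batch;  // the updates this writer wants applied
  bool sync;          // whether the caller asked for a durable (synced) write
  bool done;          // set once some thread has completed this write
  port::CondVar cv;   // signalled when done, or when this writer reaches the front
};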

So when a writing thread enters, it must first acquire the lock, which may be held by (parts of) other writes or by (parts of) the background compaction thread. Once it has the lock, all it really does is hang its own Writer onto the writers_ queue; if it is not at the head of the queue, it then waits on the condition variable.

MutexLock l(&mutex_);
writers_.push_back(&w);
while (!w.done && &w != writers_.front()) {
w.cv.Wait();
}

After waking up from the condition variable, we still need to re-check w.done, because we may have been woken for the other condition.

if (w.done) {
return w.status;
}

Next comes MakeRoomForWrite. If updates is nullptr, force is true, which forces MakeRoomForWrite to start a compaction.
【Q】When is updates nullptr? A comment in DBImpl::TEST_CompactMemTable says a nullptr batch is used to nudge a compaction along.

// May temporarily unlock and wait.
Status status = MakeRoomForWrite(updates == nullptr);

Once MakeRoomForWrite returns, there is definitely room to write.
A Sequence Number is needed before writing, so we first fetch the Sequence Number of the previous write.

uint64_t last_sequence = versions_->LastSequence();
Writer* last_writer = &w;

BuildBatchGroup merges several pending writes from the queue into tmp_batch_. The merged batch counts as one update and is assigned a starting Sequence Number, incremented from the previous one. The merge has to take into account:

  1. the total size of the data being written
  2. if the first request has sync == false, no sync == true request is merged into it

After merging, BuildBatchGroup updates last_writer to point at the last writer that was merged.
【Q】Could two records in the memtable end up with the same Sequence Number? It might look that way, since a batched write is assigned only one starting Sequence Number; but when the batch is inserted, each of its Count entries receives its own consecutive number, and LastSequence is then advanced by Count, so the numbers stay distinct.
    if (status.ok() && updates != nullptr) {  // nullptr batch is for compactions
    WriteBatch* write_batch = BuildBatchGroup(&last_writer);
    WriteBatchInternal::SetSequence(write_batch, last_sequence + 1);
    last_sequence += WriteBatchInternal::Count(write_batch);

Next comes writing the log, i.e. AddRecord.
【Q】According to the comment, this step does not need to hold the lock; why? Releasing the lock lets other requests enter the queue and wait in line.
Doing so is safe because exactly one writer, namely &w at the head of the queue, is doing the logging.
This step also improves write throughput noticeably, since the lock is only required to push onto writers_.

// Add to log and apply to memtable.  We can release the lock
// during this phase since &w is currently responsible for logging
// and protects against concurrent loggers and concurrent writes
// into mem_.
{
mutex_.Unlock();
status = log_->AddRecord(WriteBatchInternal::Contents(write_batch));
bool sync_error = false;
if (status.ok() && options.sync) {
status = logfile_->Sync();
if (!status.ok()) {
sync_error = true;
}
}

As with databases in general, only after the log write has succeeded may the batch be applied to the memtable, which is what InsertInto does.

    if (status.ok()) {
status = WriteBatchInternal::InsertInto(write_batch, mem_);
}
mutex_.Lock();
if (sync_error) {
// The state of the log file is indeterminate: the log record we
// just added may or may not show up when the DB is re-opened.
// So we force the DB into a mode where all future writes fail.
RecordBackgroundError(status);
}
}
if (write_batch == tmp_batch_) tmp_batch_->Clear();

versions_->SetLastSequence(last_sequence);
}

Pop the elements of writers_ one by one, waking the threads waiting on them, until last_writer is reached.

while (true) {
Writer* ready = writers_.front();
writers_.pop_front();
if (ready != &w) {
ready->status = status;
ready->done = true;
ready->cv.Signal();
}
if (ready == last_writer) break;
}

Having finished this group from the writers_ queue, we Signal the new head of the queue so the next write can proceed.

  // Notify new head of write queue
if (!writers_.empty()) {
writers_.front()->cv.Signal();
}

return status;
}

DBImpl::BuildBatchGroup

// REQUIRES: Writer list must be non-empty
// REQUIRES: First writer must have a non-null batch
WriteBatch* DBImpl::BuildBatchGroup(Writer** last_writer) {
mutex_.AssertHeld();
assert(!writers_.empty());
Writer* first = writers_.front();
WriteBatch* result = first->batch;
assert(result != nullptr);

max_size is chosen based on the size of the first batch (a worked example follows the code below):

  1. if it is small (at most 128 KB)
    max_size is set to size + (128 << 10)
  2. otherwise
    max_size is set to 1 << 20
    size_t size = WriteBatchInternal::ByteSize(first->batch);

    // Allow the group to grow up to a maximum size, but if the
    // original write is small, limit the growth so we do not slow
    // down the small write too much.
    size_t max_size = 1 << 20;
    if (size <= (128 << 10)) { // 128 << 10 == 1 << 17
    max_size = size + (128 << 10);
    }
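
A quick worked example: if the first batch is 4 KB, max_size becomes 4 KB + 128 KB = 132 KB, so a tiny write is never held back by a huge group; if the first batch is already 512 KB, max_size stays at the 1 MB cap.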

first is the head of writers_. We then walk the rest of the writers_ queue until:

  1. if first is non-sync, we stop at the first sync request we meet; conversely, if first is sync, non-sync requests can still be folded in
  2. the accumulated size exceeds the limit
    *last_writer = first;
    std::deque<Writer*>::iterator iter = writers_.begin();
    ++iter; // Advance past "first"
    for (; iter != writers_.end(); ++iter) {
    Writer* w = *iter;
    if (w->sync && !first->sync) {
    // Do not include a sync write into a batch handled by a non-sync write.
    break;
    }

    if (w->batch != nullptr) {
    size += WriteBatchInternal::ByteSize(w->batch);
    if (size > max_size) {
    // Do not make batch too big
    break;
    }

All of these batches are appended into result. If more than one batch is involved, result points to tmp_batch_; otherwise it remains first->batch.

      // Append to *result
if (result == first->batch) {
// Switch to temporary batch instead of disturbing caller's batch
result = tmp_batch_;
assert(WriteBatchInternal::Count(result) == 0);
WriteBatchInternal::Append(result, first->batch);
}
WriteBatchInternal::Append(result, w->batch);
}
*last_writer = w;
}
return result;
}

DBImpl::MakeRoomForWrite

MakeRoomForWrite makes sure there is room to write: if the memtable is full, it has to be turned into an immutable memtable and dumped, and if level 0 is overloaded, writes are slowed down.
When studying this function, pay close attention to the order in which the if conditions are checked; it reflects their priority.

// REQUIRES: mutex_ is held
// REQUIRES: this thread is currently at the front of the writer queue
Status DBImpl::MakeRoomForWrite(bool force) {
mutex_.AssertHeld();
assert(!writers_.empty());
bool allow_delay = !force;
Status s;

The body is one big while loop. Why a loop? Because the body may block on a condition variable, and because once an immutable memtable has been produced we have to come around again and wait for it to be flushed.

while (true) {
if (!bg_error_.ok()) {
// Yield previous error
s = bg_error_;
break;

If force is false, i.e. a compaction is not being forced, delaying is allowed. 【Q】I have not fully understood the reasoning behind this.
If delaying is allowed and level 0 has reached at least 8 files, writes are slowed down. Note that 4 is not the maximum number of level-0 files, which is a common misconception: compaction starts at 4 files, and writes only stop at 12 files (the relevant constants are sketched below).
The slow write is implemented by having the writer thread sleep for 1 ms, during which the background compaction thread can make progress. After the sleep, allow_delay is set to false, so a single write is delayed at most once.
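
The level-0 thresholds mentioned above are compile-time constants in db/dbformat.h; their values, quoted from the upstream source from memory:

namespace config {
// Level-0 compaction is started when we hit this many files.
static const int kL0_CompactionTrigger = 4;
// Soft limit on the number of level-0 files; writes are slowed down here.
static const int kL0_SlowdownWritesTrigger = 8;
// Maximum number of level-0 files; writes stop at this point.
static const int kL0_StopWritesTrigger = 12;
}  // namespace config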

} else if (allow_delay && versions_->NumLevelFiles(0) >=
config::kL0_SlowdownWritesTrigger) {
// We are getting close to hitting a hard limit on the number of
// L0 files. Rather than delaying a single write by several
// seconds when we hit the hard limit, start delaying each
// individual write by 1ms to reduce latency variance. Also,
// this delay hands over some CPU to the compaction thread in
// case it is sharing the same core as the writer.
mutex_.Unlock();
env_->SleepForMicroseconds(1000);
allow_delay = false; // Do not delay a single write more than once
mutex_.Lock();

Next, if a compaction is not being forced and the memtable has not exceeded its size budget, nothing needs to be done; this should be the most common case.

} else if (!force &&
(mem_->ApproximateMemoryUsage() <= options_.write_buffer_size)) {
// There is room in current memtable
break;

If the previous immutable memtable has not yet finished its minor compaction, we wait on the background_work_finished_signal_ condition variable.
Note that mutex_ is held on entry to this function, so this producer/consumer pattern is safe.

} else if (imm_ != nullptr) {
// We have filled up the current memtable, but the previous
// one is still being compacted, so we wait.
Log(options_.info_log, "Current memtable full; waiting...\n");
background_work_finished_signal_.Wait();

Similarly, if level 0 is full, i.e. has reached 12 files, we also wait on the condition variable.

} else if (versions_->NumLevelFiles(0) >= config::kL0_StopWritesTrigger) {
// There are too many level-0 files.
Log(options_.info_log, "Too many L0 files; waiting...\n");
background_work_finished_signal_.Wait();

In the remaining case, the memtable is turned into an immutable memtable.
Note that this branch does not break out of the loop at the end! Now that an immutable memtable exists, we must wait for it to be flushed to an SSTable, so at least one more iteration of the while loop is needed.
How long does the flush take?

  1. For CompactMemTable, imm_ is only set to nullptr after LogAndApply has completed.
  2. The condition variable is signalled after the background work scheduled by MaybeScheduleCompaction has run. Note that during a major compaction, if an immutable memtable needs to be flushed, CompactMemTable runs first, and a Signal is issued after that as well.

Note that this flush may in turn push level 0 up to its file limit, in which case the wait becomes even longer.
        } else {
    // Attempt to switch to a new memtable and trigger compaction of old
    assert(versions_->PrevLogNumber() == 0);
    uint64_t new_log_number = versions_->NewFileNumber();
    WritableFile* lfile = nullptr;
    s = env_->NewWritableFile(LogFileName(dbname_, new_log_number), &lfile);
    if (!s.ok()) {
    // Avoid chewing through file number space in a tight loop.
    versions_->ReuseFileNumber(new_log_number);
    break;
    }
    delete log_;
    delete logfile_;
    logfile_ = lfile;
    logfile_number_ = new_log_number;
    log_ = new log::Writer(lfile);
    imm_ = mem_;
    has_imm_.store(true, std::memory_order_release);
    mem_ = new MemTable(internal_comparator_);
    mem_->Ref();
    force = false; // Do not force another compaction if have room
    MaybeScheduleCompaction();
    }
    }
    return s;
    }

【Q】Some further thoughts

  1. Does reading require the lock?
    Looking at distributed consensus for comparison first: to guarantee consistent reads, Raft routes even read requests through the log, while ZooKeeper allows reading a replica directly, so its reads are not necessarily consistent. That is a digression, though; given LevelDB's MVCC scheme, at least part of the read path can proceed without holding the lock.
  2. Where can lookups avoid a linear scan?
    When locating an SSTable in levels above 0, see FindFile.
    After BlockReader returns an Iterator, Seek can binary-search within the block.
  3. Is anything cached during reads?
    LevelDB caches at both the Table and the Block level:
    at the Table level through TableCache;
    at the Block level through the table->rep_->options.block_cache branch inside BlockReader.

DBImpl::Get

Status DBImpl::Get(const ReadOptions& options, const Slice& key,
std::string* value) {
Status s;
MutexLock l(&mutex_);
SequenceNumber snapshot;
if (options.snapshot != nullptr) {
snapshot =
static_cast<const SnapshotImpl*>(options.snapshot)->sequence_number();
} else {
snapshot = versions_->LastSequence();
}

MemTable* mem = mem_;
MemTable* imm = imm_;
Version* current = versions_->current();
mem->Ref();
if (imm != nullptr) imm->Ref();
current->Ref();

bool have_stat_update = false;
Version::GetStats stats;

As we can see, once current has been obtained, the lock can be released.
【Q】mem_ and imm_ are also grabbed here; under MVCC, can several mem_/imm_ coexist? Writes always go to the newest memtable, but reads may still be looking at an older one.
What follows is the classic three-step lookup:

  1. first check the memtable
  2. then the immutable memtable
  3. then the SSTables, by calling current->Get
// Unlock while reading from files and memtables
{
mutex_.Unlock();
// First look in the memtable, then in the immutable memtable (if any).
LookupKey lkey(key, snapshot);
if (mem->Get(lkey, value, &s)) {
// Done
} else if (imm != nullptr && imm->Get(lkey, value, &s)) {
// Done
} else {
s = current->Get(options, lkey, value, &stats);
have_stat_update = true;
}
mutex_.Lock();
}

At this point the lock is taken again.
【Q】It appears that reads can also trigger a compaction, via UpdateStats.

  if (have_stat_update && current->UpdateStats(stats)) {
MaybeScheduleCompaction();
}
mem->Unref();
if (imm != nullptr) imm->Unref();
current->Unref();
return s;
}

Version::Get

Helper functions

This function uses each file's smallest and largest keys to find the files that may contain the key.
As one would expect, func is what actually looks for the key inside a file.

void Version::ForEachOverlapping(Slice user_key, Slice internal_key, void* arg,
bool (*func)(void*, int, FileMetaData*)) {
const Comparator* ucmp = vset_->icmp_.user_comparator();

// Search level-0 in order from newest to oldest.
std::vector<FileMetaData*> tmp;

As covered in the article on Compaction, files_ holds the metadata of all SSTables in the current Version.
We first collect all level-0 files whose range covers the key into tmp and sort them by f->number (newest first). After sorting, we search the files one by one; searching inside a file is delegated to the func passed in, which in practice is State::Match.

tmp.reserve(files_[0].size());
for (uint32_t i = 0; i < files_[0].size(); i++) {
FileMetaData* f = files_[0][i];
if (ucmp->Compare(user_key, f->smallest.user_key()) >= 0 &&
ucmp->Compare(user_key, f->largest.user_key()) <= 0) {
tmp.push_back(f);
}
}
if (!tmp.empty()) {
std::sort(tmp.begin(), tmp.end(), NewestFirst);
for (uint32_t i = 0; i < tmp.size(); i++) {
if (!(*func)(arg, 0, tmp[i])) {
return;
}
}
}

For the other levels, the previously introduced FindFile can be used to binary-search.

  // Search other levels.
for (int level = 1; level < config::kNumLevels; level++) {
size_t num_files = files_[level].size();
if (num_files == 0) continue;

// Binary search to find earliest index whose largest key >= internal_key.
uint32_t index = FindFile(vset_->icmp_, files_[level], internal_key);
if (index < num_files) {
FileMetaData* f = files_[level][index];
if (ucmp->Compare(user_key, f->smallest.user_key()) < 0) {
// All of "f" is past any data for user_key
} else {
if (!(*func)(arg, level, f)) {
return;
}
}
}
}
}

The State class

The State class mainly defines Match, the function that looks up a key in an SSTable.
Before digging in, recall the SSTable layout:

  1. data blocks
  2. meta block
  3. meta index block
  4. index block
    records a "largest" key for each data block; note that this is not literally the largest key but an adjusted one, namely the shortest key that separates two adjacent data blocks
  5. footer
    records the locations of the index block and the meta index block

So we first go through the index block to locate the data block that may hold the key.

Next, recall the layout of a block:

  1. records
  2. restart points
  3. trailer information
    num restarts
    type
    crc32

So within a block, the LookupKey is first used to locate the right restart point, and the scan proceeds from there.

Also note that thanks to the meta (filter) block, some lookups can be cut short.

struct State {
Saver saver;
GetStats* stats;
const ReadOptions* options;
Slice ikey;
FileMetaData* last_file_read;
int last_file_read_level;

VersionSet* vset;
Status s;
bool found;

static bool Match(void* arg, int level, FileMetaData* f) {
State* state = reinterpret_cast<State*>(arg);

if (state->stats->seek_file == nullptr &&
state->last_file_read != nullptr) {
// We have had more than one seek for this read. Charge the 1st file.
state->stats->seek_file = state->last_file_read;
state->stats->seek_file_level = state->last_file_read_level;
}

state->last_file_read = f;
state->last_file_read_level = level;

state->s = state->vset->table_cache_->Get(*state->options, f->number,
f->file_size, state->ikey,
&state->saver, SaveValue);
if (!state->s.ok()) {
state->found = true;
return false;
}
switch (state->saver.state) {
case kNotFound:
return true; // Keep searching in other files
case kFound:
state->found = true;
return false;
case kDeleted:
return false;
case kCorrupt:
state->s =
Status::Corruption("corrupted key for ", state->saver.user_key);
state->found = true;
return false;
}

// Not reached. Added to avoid false compilation warnings of
// "control reaches end of non-void function".
return false;
}
};

TableCache::Get and TableCache::FindTable

TableCache is a caching layer: only on a cache miss is the SSTable read from disk and inserted into the cache. The first step of Get is FindTable, so we look at that first.
It looks up the file's handle in cache_; if it is not there, a new entry is created and Table::Open is called to read the table from the file.

Status TableCache::FindTable(uint64_t file_number, uint64_t file_size,
Cache::Handle** handle) {
Status s;
char buf[sizeof(file_number)];
EncodeFixed64(buf, file_number);
Slice key(buf, sizeof(buf));
*handle = cache_->Lookup(key);
if (*handle == nullptr) {
std::string fname = TableFileName(dbname_, file_number);
RandomAccessFile* file = nullptr;
Table* table = nullptr;
s = env_->NewRandomAccessFile(fname, &file);
if (!s.ok()) {
std::string old_fname = SSTTableFileName(dbname_, file_number);
if (env_->NewRandomAccessFile(old_fname, &file).ok()) {
s = Status::OK();
}
}
if (s.ok()) {
s = Table::Open(options_, file, file_size, &table);
}

TableAndFile simply bundles a RandomAccessFile* and a Table* together.

    if (!s.ok()) {
assert(table == nullptr);
delete file;
// We do not cache error results so that if the error is transient,
// or somebody repairs the file, we recover automatically.
} else {
TableAndFile* tf = new TableAndFile;
tf->file = file;
tf->table = table;
*handle = cache_->Insert(key, tf, 1, &DeleteEntry);
}
}
return s;
}

Now for Get: with the corresponding Table* in hand, it calls InternalGet.

Status TableCache::Get(const ReadOptions& options, uint64_t file_number,
uint64_t file_size, const Slice& k, void* arg,
void (*handle_result)(void*, const Slice&,
const Slice&)) {
Cache::Handle* handle = nullptr;
Status s = FindTable(file_number, file_size, &handle);
if (s.ok()) {
Table* t = reinterpret_cast<TableAndFile*>(cache_->Value(handle))->table;
s = t->InternalGet(options, k, arg, handle_result);
cache_->Release(handle);
}
return s;
}

Table::InternalGet

Status Table::InternalGet(const ReadOptions& options, const Slice& k, void* arg,
void (*handle_result)(void*, const Slice&,
const Slice&)) {
Status s;
Iterator* iiter = rep_->index_block->NewIterator(rep_->options.comparator);
iiter->Seek(k);
if (iiter->Valid()) {
Slice handle_value = iiter->value();
FilterBlockReader* filter = rep_->filter;
BlockHandle handle;

First, the Bloom filter can tell us whether the key is definitely absent from this block.

if (filter != nullptr && handle.DecodeFrom(&handle_value).ok() &&
!filter->KeyMayMatch(handle.offset(), k)) {
// Not found
} else {

Since the Bloom filter can return false positives, an actual Seek is still needed. BlockReader was introduced earlier; it returns an Iterator, concretely a Block::Iter object.
It was previously used to build the TwoLevelIterator, which is simply the combination of an iterator over the index block and an iterator over a data block.

      Iterator* block_iter = BlockReader(this, options, iiter->value());
block_iter->Seek(k);
if (block_iter->Valid()) {
(*handle_result)(arg, block_iter->key(), block_iter->value());
}
s = block_iter->status();
delete block_iter;
}
}
if (s.ok()) {
s = iiter->status();
}
delete iiter;
return s;
}

Table::Open

[This part can be skipped on a first read, since all key lookups against SSTables end up being served through the cache.]
Table::Open reads an SSTable file into a Table object.

Status Table::Open(const Options& options, RandomAccessFile* file,
uint64_t size, Table** table) {
*table = nullptr;
if (size < Footer::kEncodedLength) {
return Status::Corruption("file is too short to be an sstable");
}

First, read the footer.

char footer_space[Footer::kEncodedLength];
Slice footer_input;
Status s = file->Read(size - Footer::kEncodedLength, Footer::kEncodedLength,
&footer_input, footer_space);
if (!s.ok()) return s;

Footer footer;
s = footer.DecodeFrom(&footer_input);
if (!s.ok()) return s;

Then read the index block.

  // Read the index block
BlockContents index_block_contents;
ReadOptions opt;
if (options.paranoid_checks) {
opt.verify_checksums = true;
}
s = ReadBlock(file, opt, footer.index_handle(), &index_block_contents);

if (s.ok()) {
// We've successfully read the footer and the index block: we're
// ready to serve requests.
Block* index_block = new Block(index_block_contents);
Rep* rep = new Table::Rep;
rep->options = options;
rep->file = file;
rep->metaindex_handle = footer.metaindex_handle();
rep->index_block = index_block;
rep->cache_id = (options.block_cache ? options.block_cache->NewId() : 0);
rep->filter_data = nullptr;
rep->filter = nullptr;
*table = new Table(rep);
(*table)->ReadMeta(footer);
}

return s;
}

The main function

It mainly constructs a State and then calls ForEachOverlapping.

Status Version::Get(const ReadOptions& options, const LookupKey& k,
std::string* value, GetStats* stats) {
stats->seek_file = nullptr;
stats->seek_file_level = -1;

struct State {
...
};

State state;
state.found = false;
state.stats = stats;
state.last_file_read = nullptr;
state.last_file_read_level = -1;

state.options = &options;
state.ikey = k.internal_key();
state.vset = vset_;

state.saver.state = kNotFound;
state.saver.ucmp = vset_->icmp_.user_comparator();
state.saver.user_key = k.user_key();
state.saver.value = value;

ForEachOverlapping(state.saver.user_key, state.ikey, &state, &State::Match);

return state.found ? state.s : Status::NotFound(Slice());
}

Failure Recovery

Manifest corruption or loss

Reference

  1. http://luodw.cc/2015/10/30/leveldb-14/
     Introduces WriteBatch.
  2. https://zhuanlan.zhihu.com/p/340804308
     Introduces the Recover logic.
  3. https://blog.csdn.net/sparkliang/article/details/9311487
     Introduces RecoverLogFile.
  4. https://izualzhy.cn/leveldb-write-read
     Covers LevelDB's read and write paths; some of its figures are used here.
  5. https://leeshine.github.io/2019/01/24/leveldb-put-get/
  6. https://sf-zhou.github.io/leveldb/leveldb_10_details.html
     A multi-threaded write demo; well worth reading.
  7. http://1feng.github.io/2016/08/24/mvcc-and-manifest/
     A very good introduction to the MVCC mechanism and the Manifest.
  8. https://www.cnblogs.com/cobbliu/p/6194072.html
     Explains the SSTable and Block formats with one big, excellent diagram.
  9. https://blog.csdn.net/weixin_42663840/article/details/82629473
     The best-annotated walkthrough of the read/write path I have seen.