LevelDB之SSTable实现

本文介绍LevelDB的SSTable相关功能。
SSTable是LevelDB的内存数据结构。当一个Memtable满之后,会被变成Immutable Memtable,并写入SSTable Level0。Level0的SSTable是没有经过归并的,各个Key可能互相重叠。经过Compaction达到Level1之后,就是有序的了。

SSTable格式

SSTable体现在后缀为.sst或者.ldb的文件。
其实在官方文档中,对SSTable的格式已经有了介绍。

1
2
3
4
5
6
7
8
9
10
11
12
<beginning_of_file>
[data block 1]
[data block 2]
...
[data block N]
[meta block 1]
...
[meta block K]
[metaindex block]
[index block]
[Footer] (fixed size; starts at file_size - sizeof(Footer))
<end_of_file>

  1. data block
    放KV对,是有序的(doc里面说的)。因此在查询SSTable文件的时候,也可以二分。【待确认】
    关于Block的组织,我们将专门讨论。
  2. meta block
    用来快速定位key是否在data block中
  3. metaindex block
    每个metaindex block一条记录。其中K是meta block的名字,V是指向这个meta block的BlockHandle。
    BlockHandle类似于指针,具有下面的结构。容易看到,这里面也用了VarInt结构。

    1
    2
    offset:   varint64
    size: varint64

    有两种meta block类型,filter和stats:

    1. 如果在数据库启动时指定了某个FilterPolicy,就会创建一个filter block。
    2. 统计信息
  4. index block
    每个data block一条entry。这个index block entry的.key()大于等于指向的data block最后一个K【性质1】,但是严格小于下一个data block的第一个K【性质2】。
    因此,我们可以通过和index block比较来快速定位data block。
  5. footer
    包含:
    1. metaindex handle
    2. index handle
    3. padding
    4. magic number

Block实现

SST的构建主要集中在table_builder.h/ccblock_builder.h/cc中。
SST的读取主要集中在table.h/ccblock.h/cc中。
从前面可以看到,SSTable主要有两层结构,Table(SSTable)和Block(data/index等)。
Table由多个Block构成,所以从Block开始分析。

BlockBuilder/Block原理

BlockBuilder负责生成诸如data block、index block等所有block。
Block对象负责读取这些block。

共享前缀

因为BlockBuilder是有序的,所以可以使用共享前缀来节约空间,我们比较下面两种方式:

  1. 普通

    1
    2
    Hello World
    Hello William
  2. 共享

    1
    2
    Hello World
    illiam

Block的构造如下图所示

其中:

  1. shared_bytes
    Key的共享前缀的长度,这里的共享指的是上一个entry
  2. key_delta/unshared_bytes
    Key共享前缀后的剩余串和其长度
  3. value/value_length
    值的串和其长度

那么如何读呢?

  1. 最坏状况,我们需要读到第一个Entry,考虑下面的Key

    1
    2
    3
    a
    ab
    abc
  2. 最好状况,我们读当前的就行

    1
    2
    a
    b

restart

可见共享前缀会给读带来困难,因此又引入restart机制,即每隔block_restart_interval之后会去存储一次完整的key,对应的entry的位置称为restart point。在block中,会存储下所有的restart point。
因为数字存储是有序的,所以我们能通过二分restart point来加速读取,具体代码在Block::Iter中。读取需求一般是给定target,要求找到第一个K大于等于target的entry。因此我们可以得到如下二分

  1. mid < target
    则搜索[mid, right]
  2. mid >= target
    搜索[left, mid-1]
    因为这是一个TTT…F/T的二分,所以我们要令mid = (left + right + 1)/2

filter block

在每个data block内部,借助于二分restart可以实现$log(n)$复杂度的查询,那么能在data block之间二分么?

可以通过filter block来判断某个key是否属于该data block,实现是bloom filter。

BlockBuilder实现

方法

  1. void Add(const Slice& key, const Slice& value);
    每一次Add的Key,必须是有序的,从小到大的。
  2. Slice Finish();
  3. void Reset();

BlockBuilder::Add

首先是三个assert:

  1. 第一个很好理解,如果finished_,相当于已经调用了Finish或者Abandon等方法。
  2. block_restart_interval表示每过多少个key就要设置一个restarts。设置完之后,counter_会被重置为0,所以这个不等式是成立的。
  3. 最后一个是有序性检验,要不是空的,要不新来的key要大于老的last_key_piece
    1
    2
    3
    4
    5
    6
    void BlockBuilder::Add(const Slice& key, const Slice& value) {
    Slice last_key_piece(last_key_);
    assert(!finished_);
    assert(counter_ <= options_->block_restart_interval);
    assert(buffer_.empty() // No values yet?
    || options_->comparator->Compare(key, last_key_piece) > 0);

下面判断要不要restart:

  1. 如果不要,和前一个key即last_key_比较,算出来能share多少长度。
  2. 如果要,就restart。
    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    size_t shared = 0;
    if (counter_ < options_->block_restart_interval) {
    // See how much sharing to do with previous string
    const size_t min_length = std::min(last_key_piece.size(), key.size());
    while ((shared < min_length) && (last_key_piece[shared] == key[shared])) {
    shared++;
    }
    } else {
    // Restart compression
    restarts_.push_back(buffer_.size());
    counter_ = 0;
    }
    const size_t non_shared = key.size() - shared;

下面就是往buffer_里面写数据了。

1
2
3
4
5
6
7
8
// Add "<shared><non_shared><value_size>" to buffer_
PutVarint32(&buffer_, shared);
PutVarint32(&buffer_, non_shared);
PutVarint32(&buffer_, value.size());

// Add string delta to buffer_ followed by value
buffer_.append(key.data() + shared, non_shared);
buffer_.append(value.data(), value.size());

下面这个优化点也很有趣,首先last_key_保存的是一个完整的key。但是我们可以复用之前一个key的shared部分,这个是安全的。接着我们把non shared部分append上去。

1
2
3
4
5
6
  // Update state
last_key_.resize(shared);
last_key_.append(key.data() + shared, non_shared);
assert(Slice(last_key_) == key);
counter_++;
}

BlockBuilder::Finish

为啥restarts_不用VarInt存呢?

1
2
3
4
5
6
7
8
9
Slice BlockBuilder::Finish() {
// Append restart array
for (size_t i = 0; i < restarts_.size(); i++) {
PutFixed32(&buffer_, restarts_[i]);
}
PutFixed32(&buffer_, restarts_.size());
finished_ = true;
return Slice(buffer_);
}

Block实现

  1. uint32_t NumRestarts() const;
    之前了解过block的结构,在Block的最后,是restart点的个数。

    1
    2
    3
    4
    inline uint32_t Block::NumRestarts() const {
    assert(size_ >= sizeof(uint32_t));
    return DecodeFixed32(data_ + size_ - sizeof(uint32_t));
    }
  2. const char* data_;/size_t size_;
    也就是这个Block的指针和长度。

  3. uint32_t restart_offset_;
    表示restart的开始位置

    1
    size_ - (1 + NumRestarts()) * sizeof(uint32_t)
  4. bool owned_;
    取决于构造函数传入的BlockContentsowned_字段。

Block::Iter

Seek

这个函数是一个二分的实现。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
void Seek(const Slice& target) override {
// Binary search in restart array to find the last restart point
// with a key < target
uint32_t left = 0;
uint32_t right = num_restarts_ - 1;
int current_key_compare = 0;

if (Valid()) {
// If we're already scanning, use the current position as a starting
// point. This is beneficial if the key we're seeking to is ahead of the
// current position.
current_key_compare = Compare(key_, target);
if (current_key_compare < 0) {
// key_ is smaller than target
left = restart_index_;
} else if (current_key_compare > 0) {
right = restart_index_;
} else {
// We're seeking to the key we're already at.
return;
}
}

while (left < right) {
uint32_t mid = (left + right + 1) / 2;
uint32_t region_offset = GetRestartPoint(mid);
uint32_t shared, non_shared, value_length;
const char* key_ptr =
DecodeEntry(data_ + region_offset, data_ + restarts_, &shared,
&non_shared, &value_length);
if (key_ptr == nullptr || (shared != 0)) {
CorruptionError();
return;
}
Slice mid_key(key_ptr, non_shared);
if (Compare(mid_key, target) < 0) {
// Key at "mid" is smaller than "target". Therefore all
// blocks before "mid" are uninteresting.
left = mid;
} else {
// Key at "mid" is >= "target". Therefore all blocks at or
// after "mid" are uninteresting.
right = mid - 1;
}
}

// We might be able to use our current position within the restart block.
// This is true if we determined the key we desire is in the current block
// and is after than the current key.
assert(current_key_compare == 0 || Valid());
bool skip_seek = left == restart_index_ && current_key_compare < 0;
if (!skip_seek) {
SeekToRestartPoint(left);
}
// Linear search (within restart block) for first key >= target
while (true) {
if (!ParseNextKey()) {
return;
}
if (Compare(key_, target) >= 0) {
return;
}
}
}

Table实现

TableBuilder

对于Add/Flush/Finish/Abandon,此时Adandon和Finish不能已经被调用。

  1. Status ChangeOptions(const Options& options);
    修改option,但是只有某些可以在构建之后被修改。对于那些不能修改的,会报错。
  2. void Add(const Slice& key, const Slice& value);
    增加一个KV对。
    key is after any previously added key according to comparator,往TableBuilder里面加KV,必须是有序的?
  3. void Flush();
    刷盘,一般不直接用。主要被用来保证两个相邻的Entry不会在同一个data block中。
  4. Status status() const;
  5. Status Finish();
    结束当前表的构建。
  6. void Abandon();
    表示需要丢弃当前缓存内容,并且结束表的构建。
  7. uint64_t NumEntries() const;
    Add了多少次,实际上返回的是TableBuilder::Rep里面的num_entries
  8. uint64_t FileSize() const;
  9. void WriteBlock(BlockBuilder* block, BlockHandle* handle);
  10. void WriteRawBlock(const Slice& data, CompressionType, BlockHandle* handle);

此外,TableBuilder持有一个Req类型的对象指针,用来隐藏相关实现。

TableBuilder::Rep

具有下面的:

  1. Options options;
  2. Options index_block_options;
  3. WritableFile* file;
    WritableFile是一个接口,具体实现可以分为随机读写文件,顺序读写文件等。
  4. uint64_t offset;
  5. Status status;
    ok()的返回值,表示是否发生了错误。一般,错误会在file里面的Append和Flush方法中出现。
  6. BlockBuilder data_block;
  7. BlockBuilder index_block;
  8. std::string last_key;
    每一次Add会更新这个字段。因为Add是有序的,所以实际上就表示了当前最大的key。
  9. int64_t num_entries;
    NumEntries
  10. bool closed;
    Finish和Abandon会设置为true。
  11. FilterBlockBuilder* filter_block;
  12. bool pending_index_entry;
    当写完一个data block之后,设置pending_index_entry,表示需要更新index block。
    因此这里有个不变量,当且仅当data_block为空的时候pending_index_entry才是true。也就是说当写入一个data block后(注意一个table中有多个data block),设置pending_index_entry为true,之后更新index block。
    原因是直到我们见到下一个data block的第一个key的时候才能写index。这样我们可以在写index的时候用尽可能短的key。例如我们已经知道第一个data block中最大的是”the quick brown fox”,而第二个data block中最小的是”the who”。这样通过之前提到的FindShortestSeparator,我们就可以用”the r”作为index block中的entry。这个entry满足条件,即大于等于第一个block中的所有key,并且小于第二个block中的所有key。
    因此,顺序是:
    1. 写一个data block
    2. 接到下一个Add请求
    3. 根据last_key和当前传入的Key,写index
    4. 正常处理该Add请求
  13. BlockHandle pending_handle;
    Handle to add to index block。不是说用这个handle来写index block,而是会把这个handle里面的值写到index block里面作为index。
  14. std::string compressed_output;

TableBuilder::Add

1
2
3
4
5
6
7
void TableBuilder::Add(const Slice& key, const Slice& value) {
Rep* r = rep_;
assert(!r->closed);
if (!ok()) return;
if (r->num_entries > 0) {
assert(r->options.comparator->Compare(key, Slice(r->last_key)) > 0);
}

如果pending_index_entry是true,说明之前已经写入了一个data block。于是我们就要插入index,并且清空pending_index_entry标志。这一部分原理在pending_index_entry讲解过了。

1
2
3
4
5
6
7
8
if (r->pending_index_entry) {
assert(r->data_block.empty());
r->options.comparator->FindShortestSeparator(&r->last_key, key);
std::string handle_encoding;
r->pending_handle.EncodeTo(&handle_encoding);
r->index_block.Add(r->last_key, Slice(handle_encoding));
r->pending_index_entry = false;
}

下面是添加的逻辑,先处理filter block。

1
2
3
if (r->filter_block != nullptr) {
r->filter_block->AddKey(key);
}

然后设置last_key,并向当前的data block中添加KV。
估算当前data block的大小,如果超过配置的阈值options.block_size就进行dump。这个估算实际上就是统计所有entry以及restart的总大小。相比Spark里面的大小估计,感觉LevelDB/Redis里面的大小估计要简单很多,感觉得益于C/C++能自己管理内存。

1
2
3
4
5
6
7
8
9
  r->last_key.assign(key.data(), key.size());
r->num_entries++;
r->data_block.Add(key, value);

const size_t estimated_block_size = r->data_block.CurrentSizeEstimate();
if (estimated_block_size >= r->options.block_size) {
Flush();
}
}

TableBuilder::WriteBlock

WriteBlock

Table要写某个Block,首先Finish这个block,也就是说把所有restart都写到文件里面。

1
2
3
4
void TableBuilder::WriteBlock(BlockBuilder* block, BlockHandle* handle) {
assert(ok());
Rep* r = rep_;
Slice raw = block->Finish();

接着,是写data block。实际写入的block_contents可能是被压缩了的,也可能是没有被压缩的。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
Slice block_contents;
CompressionType type = r->options.compression;
// TODO(postrelease): Support more compression options: zlib?
switch (type) {
case kNoCompression:
block_contents = raw;
break;

case kSnappyCompression: {
std::string* compressed = &r->compressed_output;
if (port::Snappy_Compress(raw.data(), raw.size(), compressed) &&
compressed->size() < raw.size() - (raw.size() / 8u)) {
block_contents = *compressed;
} else {
// Snappy not supported, or compressed less than 12.5%, so just
// store uncompressed form
block_contents = raw;
type = kNoCompression;
}
break;
}
}
WriteRawBlock(block_contents, type, handle);
r->compressed_output.clear();

清空这个block里面buffer_restarts_等状态。

1
2
  block->Reset();
}

TableBuilder::WriteRawBlock

每一个block包含:

  1. data
  2. type 表示有没有压缩
  3. crc32
    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    16
    17
    18
    void TableBuilder::WriteRawBlock(const Slice& block_contents,
    CompressionType type, BlockHandle* handle) {
    Rep* r = rep_;
    handle->set_offset(r->offset);
    handle->set_size(block_contents.size());
    r->status = r->file->Append(block_contents);
    if (r->status.ok()) {
    char trailer[kBlockTrailerSize];
    trailer[0] = type;
    uint32_t crc = crc32c::Value(block_contents.data(), block_contents.size());
    crc = crc32c::Extend(crc, trailer, 1); // Extend crc to cover block type
    EncodeFixed32(trailer + 1, crc32c::Mask(crc));
    r->status = r->file->Append(Slice(trailer, kBlockTrailerSize));
    if (r->status.ok()) {
    r->offset += block_contents.size() + kBlockTrailerSize;
    }
    }
    }

TableBuilder::Flush

前面是判断一些条件。如果说data block是空的,那么就直接返回。接着断言pending_index_entry这个是true,这个是之前提到的不变量。

1
2
3
4
5
6
void TableBuilder::Flush() {
Rep* r = rep_;
assert(!r->closed);
if (!ok()) return;
if (r->data_block.empty()) return;
assert(!r->pending_index_entry);

根据pending_handle的说明,它的值会被写到index block里面。

1
2
3
4
5
6
7
8
9
  WriteBlock(&r->data_block, &r->pending_handle);
if (ok()) {
r->pending_index_entry = true;
r->status = r->file->Flush();
}
if (r->filter_block != nullptr) {
r->filter_block->StartBlock(r->offset);
}
}

TableBuilder::Finish

Finish操作用来生成一个SSTable。
首先先Flush,也就是把data_block写盘。

1
2
3
4
5
Status TableBuilder::Finish() {
Rep* r = rep_;
Flush();
assert(!r->closed);
r->closed = true;

什么时候不ok呢?也就是发生错误的情况。
下面,写入filter block。

1
2
3
4
5
6
BlockHandle filter_block_handle, metaindex_block_handle, index_block_handle;
// Write filter block
if (ok() && r->filter_block != nullptr) {
WriteRawBlock(r->filter_block->Finish(), kNoCompression,
&filter_block_handle);
}

如果上面的写入是成功的,接着写入meta index block。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
// Write metaindex block
if (ok()) {
BlockBuilder meta_index_block(&r->options);
if (r->filter_block != nullptr) {
// Add mapping from "filter.Name" to location of filter data
std::string key = "filter.";
key.append(r->options.filter_policy->Name());
std::string handle_encoding;
filter_block_handle.EncodeTo(&handle_encoding);
meta_index_block.Add(key, handle_encoding);
}

// TODO(postrelease): Add stats and other meta blocks
WriteBlock(&meta_index_block, &metaindex_block_handle);
}

如果上面的写入是成功的,接着写入index block。

1
2
3
4
5
6
7
8
9
10
11
// Write index block
if (ok()) {
if (r->pending_index_entry) {
r->options.comparator->FindShortSuccessor(&r->last_key);
std::string handle_encoding;
r->pending_handle.EncodeTo(&handle_encoding);
r->index_block.Add(r->last_key, Slice(handle_encoding));
r->pending_index_entry = false;
}
WriteBlock(&r->index_block, &index_block_handle);
}

如果上面的写入是成功的,接着写入footer。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
  // Write footer
if (ok()) {
Footer footer;
footer.set_metaindex_handle(metaindex_block_handle);
footer.set_index_handle(index_block_handle);
std::string footer_encoding;
footer.EncodeTo(&footer_encoding);
r->status = r->file->Append(footer_encoding);
if (r->status.ok()) {
r->offset += footer_encoding.size();
}
}
return r->status;
}

Table

下面介绍Table类,它的作用是负责读取SSTable。

  1. static Status Open(const Options& options, RandomAccessFile* file, uint64_t file_size, Table** table);
    解析传入的file
    如果成功,返回ok,并且设置*table,这是一个指针,由调用方释放。
    如果失败,返回一个非ok,并且设置*table为nullptr。
    Does not take ownership of “*source”, but the client must ensure that “source” remains live for the duration of the returned table’s lifetime.
  2. Iterator* NewIterator(const ReadOptions&) const;
  3. uint64_t ApproximateOffsetOf(const Slice& key) const;
    传入一个key,返回它在文件中的大概位置。对不存在的key,返回如果存在,那么大概在的位置。
  4. static Iterator* BlockReader(void*, const ReadOptions&, const Slice&);
    TwoLevelIterator需要这个函数,通过它来构建一个data_iter_
  5. Status InternalGet(const ReadOptions&, const Slice& key, void* arg, void (*handle_result)(void* arg, const Slice& k, const Slice& v));
  6. void ReadMeta(const Footer& footer);
  7. void ReadFilter(const Slice& filter_handle_value);

Table::Rep

  1. Options options;
  2. Status status;
  3. RandomAccessFile* file;
  4. uint64_t cache_id;
  5. FilterBlockReader* filter;
  6. const char* filter_data;
  7. BlockHandle metaindex_handle;
  8. Block* index_block;

Table::Open

首先解析出footer。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
Status Table::Open(const Options& options, RandomAccessFile* file,
uint64_t size, Table** table) {
*table = nullptr;
if (size < Footer::kEncodedLength) {
return Status::Corruption("file is too short to be an sstable");
}

char footer_space[Footer::kEncodedLength];
Slice footer_input;
Status s = file->Read(size - Footer::kEncodedLength, Footer::kEncodedLength,
&footer_input, footer_space);
if (!s.ok()) return s;

Footer footer;
s = footer.DecodeFrom(&footer_input);
if (!s.ok()) return s;

接下来,解析index block。

1
2
3
4
5
6
7
// Read the index block
BlockContents index_block_contents;
ReadOptions opt;
if (options.paranoid_checks) {
opt.verify_checksums = true;
}
s = ReadBlock(file, opt, footer.index_handle(), &index_block_contents);

初始化rep_字段。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
  if (s.ok()) {
// We've successfully read the footer and the index block: we're
// ready to serve requests.
Block* index_block = new Block(index_block_contents);
Rep* rep = new Table::Rep;
rep->options = options;
rep->file = file;
rep->metaindex_handle = footer.metaindex_handle();
rep->index_block = index_block;
rep->cache_id = (options.block_cache ? options.block_cache->NewId() : 0);
rep->filter_data = nullptr;
rep->filter = nullptr;
*table = new Table(rep);
(*table)->ReadMeta(footer);
}

return s;
}

Table::NewIterator

实际上调用NewTwoLevelIterator得到一个TwoLevelIterator

1
2
3
4
5
6
7
8
9
Iterator* Table::NewIterator(const ReadOptions& options) const {
return NewTwoLevelIterator(
rep_->index_block->NewIterator(rep_->options.comparator),
&Table::BlockReader, const_cast<Table*>(this), options);
}

Iterator* NewTwoLevelIterator(Iterator* index_iter,
BlockFunction block_function, void* arg,
const ReadOptions& options);

方法NewIterator的实现如下。如果没有restart点,那么就创建一个空的迭代器,否则创建一个Block::Iter

1
2
3
4
5
6
7
8
9
10
11
Iterator* Block::NewIterator(const Comparator* comparator) {
if (size_ < sizeof(uint32_t)) {
return NewErrorIterator(Status::Corruption("bad block contents"));
}
const uint32_t num_restarts = NumRestarts();
if (num_restarts == 0) {
return NewEmptyIterator();
} else {
return new Iter(comparator, data_, restart_offset_, num_restarts);
}
}

两个Level的含义是:

  1. IteratorWrapper index_iter_负责查询index block,找到key所在的data block。
    IteratorWrapper封装了Iterator,可以理解为一层对valid()key()的cache。Iterator是个接口,实际类型应该是Block::Iter【待确认】。
  2. IteratorWrapper data_iter_负责在这个block里面查找。

TwoLevelIterator

  1. BlockFunction block_function_;
    block_function_可以从一个index_iter_创建一个data_iter_
    在Table的实现中,是Table::BlockReader这个函数。我们将在后面详细分析这个函数。
  2. void* arg_;
    在Table的实现中,传入了Table* this
  3. const ReadOptions options_;
  4. Status status_;
  5. IteratorWrapper index_iter_;
  6. IteratorWrapper data_iter_;
  7. std::string data_block_handle_;
    如果data_iter_不是null,那么data_block_handle_持有传给block_function_的那个index_iter_的值。

TwoLevelIterator::Next

存在一个问题,如果我们一直data_iter_.Next(),我们迟早会碰到一个Block的右边界,这样后面迭代器就Invalid了。因此需要检查如果data_iter_当前已经失效了,那么就递增index_iter_,获取下一个data_iter_,具体实现见下面。

1
2
3
4
5
void TwoLevelIterator::Next() {
assert(Valid());
data_iter_.Next();
SkipEmptyDataBlocksForward();
}

TwoLevelIterator::SkipEmptyDataBlocksForward

【Q】在上面说过这个函数的作用了,但是为啥这里实现是while而不是if呢?

1
2
3
4
void TwoLevelIterator::SkipEmptyDataBlocksForward() {
while (data_iter_.iter() == nullptr || !data_iter_.Valid()) {
// Move to next block
...

SetDataIterator函数接受一个迭代器作为参数,如果迭代器不是空,那么就设置为data_iter_,并且释放掉原来的iter_内存。

1
2
3
4
5
6
7
8
...
if (!index_iter_.Valid()) {
SetDataIterator(nullptr);
return;
}
index_iter_.Next();
InitDataBlock();
...

需要注意,这里需要显式将data_iter_移动到当前data block的开头。

1
2
3
4
...
if (data_iter_.iter() != nullptr) data_iter_.SeekToFirst();
}
}

TwoLevelIterator::InitDataBlock

InitDataBlock作用是从index_iter_构建(解析)出一个data block。
如果index_iter_无效,那么设置data_iter_也无效。
如果data_iter_不为空,并且等于之前的data_block_handle_,说明data_iter_现在就指向的这个data block,那么就跳过。
否则,以index_iter_为参数,通过block_function_生成一个新的data_iter_

1
2
3
4
5
6
7
8
9
10
11
12
13
14
void TwoLevelIterator::InitDataBlock() {
if (!index_iter_.Valid()) {
SetDataIterator(nullptr);
} else {
Slice handle = index_iter_.value();
if (data_iter_.iter() != nullptr &&
handle.compare(data_block_handle_) == 0) {
} else {
Iterator* iter = (*block_function_)(arg_, options_, handle);
data_block_handle_.assign(handle.data(), handle.size());
SetDataIterator(iter);
}
}
}

TwoLevelIterator::Seek

首先,在index block层Seek。下面证明只要找这个index_iter_指向的data block就行,也就是说,target不会出现在(index_iter_ - 1)(index_iter_ + 1)指向的data block里面。

  1. 因为LevelDB中的性质,Seek得到的是第一个大于等于target的指针。此时,(index_iter_ - 1)中的.key()严格小于target的。而根据index block的【性质1】,这个index block entry指向的data block中的所有K都小于等于(index_iter_ - 1).key()。因此,(index_iter_ - 1)指向的data block里面所有的K,都小于target。
  2. 此外,index_iter_.key()是大于等于target的。
  3. 下面,还要证明(index_iter_ + 1).key()指向的data block里面的所有K都大于target。根据【性质2】我们知道index_iter_.key()会严格小于它指向的下一个data block中的所有K,根据我们上一条结论可以知道target严格小于下一个data block中的所有K,所以target如果存在的话,一定是当前data block上的。
    1
    2
    void TwoLevelIterator::Seek(const Slice& target) {
    index_iter_.Seek(target);

接着初始化data_iter_。接着在data block层Seek。

1
2
3
4
  InitDataBlock();
if (data_iter_.iter() != nullptr) data_iter_.Seek(target);
SkipEmptyDataBlocksForward();
}

Table::BlockReader

接受三个参数:

  1. arg
    这个类型设置就很奇怪,实际上是一个Table*,表示我们现在读的那个Table的上下文。
  2. index_value
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
// Convert an index iterator value (i.e., an encoded BlockHandle)
// into an iterator over the contents of the corresponding block.
Iterator* Table::BlockReader(void* arg, const ReadOptions& options,
const Slice& index_value) {
Table* table = reinterpret_cast<Table*>(arg);
Cache* block_cache = table->rep_->options.block_cache;
Block* block = nullptr;
Cache::Handle* cache_handle = nullptr;

BlockHandle handle;
Slice input = index_value;
Status s = handle.DecodeFrom(&input);
// We intentionally allow extra stuff in index_value so that we
// can add more features in the future.

if (s.ok()) {
BlockContents contents;
if (block_cache != nullptr) {
char cache_key_buffer[16];
EncodeFixed64(cache_key_buffer, table->rep_->cache_id);
EncodeFixed64(cache_key_buffer + 8, handle.offset());
Slice key(cache_key_buffer, sizeof(cache_key_buffer));
cache_handle = block_cache->Lookup(key);
if (cache_handle != nullptr) {
block = reinterpret_cast<Block*>(block_cache->Value(cache_handle));
} else {
s = ReadBlock(table->rep_->file, options, handle, &contents);
if (s.ok()) {
block = new Block(contents);
if (contents.cachable && options.fill_cache) {
cache_handle = block_cache->Insert(key, block, block->size(),
&DeleteCachedBlock);
}
}
}
} else {
s = ReadBlock(table->rep_->file, options, handle, &contents);
if (s.ok()) {
block = new Block(contents);
}
}
}
1
2
3
4
5
6
7
8
9
10
11
12
13
  Iterator* iter;
if (block != nullptr) {
iter = block->NewIterator(table->rep_->options.comparator);
if (cache_handle == nullptr) {
iter->RegisterCleanup(&DeleteBlock, block, nullptr);
} else {
iter->RegisterCleanup(&ReleaseBlock, block_cache, cache_handle);
}
} else {
iter = NewErrorIterator(s);
}
return iter;
}

Reference

  1. https://izualzhy.cn/leveldb-block
  2. https://izualzhy.cn/leveldb-sstable