顺着 log 的使用,其实可以找到 LSM-Tree 的主线:根据写 WAL 可以找到 MemTable 的写流程,根据写 MANIFEST 可以顺出 Compaction 的流程。这篇文章主要关注 MemTable。

Arean

Arean 是 LevelDB 自己实现的内容管理,其主要是与 SkipList 绑定(实际上也是唯一的用途),而 SkipList 又是与 MemTable 进行绑定的。MemTable 内部是使用引用计数法来管理的,每当使用 MemTable 时引用会加一,结束使用时,引用会减一。当 MemTable 变为 Immutable MemTable 写入 SSTable 时引用会减为 0,这时就会销毁 MemTable,同时也会销毁内部的 SkipList 和 Arean

classDiagram class Arena{ - char* alloc_ptr_ - size_t alloc_bytes_remaining_ - vector~char*~ blocks_ - atomic~size_t~ memory_usage_ + Allocate() char* + AllocateAligned() char* + MemoryUsage() size_t - AllocateFallback(size_t bytes) char* - AllocateNewBlock(size_t block_bytes) char* }

Arean 对外只提供了 AllocateAllocateAligned 两种分配的方法,区别在于是否需要对齐。Arean 内部也是以 block 为单位进行管理的,这一点从 Aren 的成员变量就能看出来。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
inline char* Arena::Allocate(size_t bytes) {
// The semantics of what to return are a bit messy if we allow
// 0-byte allocations, so we disallow them here (we don't need
// them for our internal use).
assert(bytes > 0);
if (bytes <= alloc_bytes_remaining_) {
char* result = alloc_ptr_;
alloc_ptr_ += bytes;
alloc_bytes_remaining_ -= bytes;
return result;
}
return AllocateFallback(bytes);
}
char* Arena::AllocateFallback(size_t bytes) {
if (bytes > kBlockSize / 4) {
// Object is more than a quarter of our block size. Allocate it separately
// to avoid wasting too much space in leftover bytes.
// 大于 1/4 block 单独分配一个 特定大小的 block,避免浪费
char* result = AllocateNewBlock(bytes);
return result;
}

// We waste the remaining space in the current block.
alloc_ptr_ = AllocateNewBlock(kBlockSize);
alloc_bytes_remaining_ = kBlockSize;

char* result = alloc_ptr_;
alloc_ptr_ += bytes;
alloc_bytes_remaining_ -= bytes;
return result;
}

如果需要分配的字节数量小于 block 中剩余的字节数量,那么就可以直接分配,否则就需要考虑换一个 block 存储了。如果要分配的字节数量大于 block 大小的 1/4,那么就会为其单独分配一个特定大小的 block,原来的 block 仍然可以用于后续的分配操作。这样做的好处是可以将大对象单独放在一个特定大小的 block,还支持分配大于一个 block size 的对象。如果要分配的字节数量小于 block 大小的 1/4,那么说明当前 block 剩余空间不足 1/4,那么就直接换一个 block(原来的 block 不再用于分配内存了)

下面是 SkipList 使用 Arean 的方式

1
2
3
4
5
6
7
template <typename Key, class Comparator>
typename SkipList<Key, Comparator>::Node* SkipList<Key, Comparator>::NewNode(
const Key& key, int height) {
char* const node_memory = arena_->AllocateAligned(
sizeof(Node) + sizeof(std::atomic<Node*>) * (height - 1));
return new (node_memory) Node(key);
}

SkipList

SkipList 是一个多级链表结构,可以实现 O(LogN)O(Log{N}) 复杂度的插入和查询操作,其相较于平衡树,SkipList 的优势在于实现简单。

classDiagram direction LR SkipList *-- Iterator class SkipList { - compare_ : Comparator - arena_ Arena* : const - head_ : Node* const - max_height_ : atomic~int~ - rnd_ : Random + Insert(const Key& key) void + Contains(const Key& key) bool - GetMaxHeight() int - NewNode(const Key& key, int height) Node* - RandomHeight() int - Equal(const Key& a, const Key& b) bool - KeyIsAfterNode(const Key& key, Node* n) bool - FindGreaterOrEqual(const Key& key, Node** prev) Node* - FindLessThan(const Key& key) Node* - FindLast() Node* } class Iterator { - list_ : SkipList* - node_ Node* +Valid() bool +key() Key& +Next() void +Prev() void +Seek(const Key& target) void +SeekToFirst() void +SeekToLast() void }
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
class SkipList {
private:
struct Node;

public:
explicit SkipList(Comparator cmp, Arena* arena);
SkipList(const SkipList&) = delete;
SkipList& operator=(const SkipList&) = delete;

// Insert key into the list.
// REQUIRES: nothing that compares equal to key is currently in the list.
void Insert(const Key& key);

// Returns true iff an entry that compares equal to key is in the list.
bool Contains(const Key& key) const;

// Iteration over the contents of a skip list
class Iterator {
public:
explicit Iterator(const SkipList* list);
bool Valid() const;
const Key& key() const;
void Next();
void Prev();
void Seek(const Key& target);
void SeekToFirst();
void SeekToLast();

private:
const SkipList* list_;
Node* node_;
// Intentionally copyable
};

private:
enum { kMaxHeight = 12 };

inline int GetMaxHeight() const {
return max_height_.load(std::memory_order_relaxed);
}

Node* NewNode(const Key& key, int height);
int RandomHeight();
bool Equal(const Key& a, const Key& b) const { return (compare_(a, b) == 0); }

// Return true if key is greater than the data stored in "n"
bool KeyIsAfterNode(const Key& key, Node* n) const;

// Return the earliest node that comes at or after key.
// Return nullptr if there is no such node.
//
// If prev is non-null, fills prev[level] with pointer to previous
// node at "level" for every level in [0..max_height_-1].
Node* FindGreaterOrEqual(const Key& key, Node** prev) const;

// Return the latest node with a key < key.
// Return head_ if there is no such node.
Node* FindLessThan(const Key& key) const;

// Return the last node in the list.
// Return head_ if list is empty.
Node* FindLast() const;

Comparator const compare_;
Arena* const arena_; // Arena used for allocations of nodes
Node* const head_;
std::atomic<int> max_height_; // Height of the entire list
Random rnd_;
};

SkipList 对外只提供了插入方法 Insert 和一个迭代器 Iterator,其中插入方法只支持插入 key。在内部,SKipList 实现了 FindGreateOrEqual 等辅助方法,用于帮助实现 SKipList 的功能,这里只列出 FindGreatOrEqual 的实现,因为其与 SKipList 的许多操作密切相关

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
// 找到第一个大于等于 key 的节点,并将前驱结点存储到 prev 中
template <typename Key, class Comparator>
typename SkipList<Key, Comparator>::Node*
SkipList<Key, Comparator>::FindGreaterOrEqual(const Key& key,
Node** prev) const {
Node* x = head_;
int level = GetMaxHeight() - 1;
while (true) {
Node* next = x->Next(level);
if (KeyIsAfterNode(key, next)) { // 如果查找 key > next->key,说明要查找的 key 在后面,需要继续向前查找
// Keep searching in this list ; key > next->key
x = next;
} else { // 否则 key <= next->key,可以将该 node 作为 prev 前驱结点
// key <= next->key
if (prev != nullptr) prev[level] = x; // 记录前驱节点,可以用于插入操作
if (level == 0) { // 如果已经查到第 0 层了,可以直接返回了
return next;
} else {
// Switch to next list
level--; // 否则查找下一层
}
}
}
}

在 SkipList 的插入和迭代查找操作中,都调用了 FindGreatOrEqual 方法,这得益于 key 在 SkipList 中排序的方式。SkipList 中的 key 是按照 user key 的大小升序排列的,如果 user key 相同,按照 sequence 降序排列。这样 FindGreatOrEqual 方法就能找到最接近某个版本的数据了。下面是查找和插入操作的实现

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
template <typename Key, class Comparator>
inline void SkipList<Key, Comparator>::Iterator::Seek(const Key& target) {
node_ = list_->FindGreaterOrEqual(target, nullptr);
}

template <typename Key, class Comparator>
void SkipList<Key, Comparator>::Insert(const Key& key) {
// TODO(opt): We can use a barrier-free variant of FindGreaterOrEqual()
// here since Insert() is externally synchronized.
Node* prev[kMaxHeight];
Node* x = FindGreaterOrEqual(key, prev);

// Our data structure does not allow duplicate insertion
assert(x == nullptr || !Equal(key, x->key));

int height = RandomHeight();
if (height > GetMaxHeight()) {
// 更新当前最大高度,并将新增高度的前驱结点设置为 head_
for (int i = GetMaxHeight(); i < height; i++) {
prev[i] = head_;
}
max_height_.store(height, std::memory_order_relaxed);
}

// 插入节点
x = NewNode(key, height);
for (int i = 0; i < height; i++) {
// NoBarrier_SetNext() suffices since we will add a barrier when
// we publish a pointer to "x" in prev[i].
x->NoBarrier_SetNext(i, prev[i]->NoBarrier_Next(i));
prev[i]->SetNext(i, x);
}
}

MemTable

classDiagram class MemTable { - refs_ : int - arena_ : Arena - table_ : Table + MemTable(InternalKeyComparator comparator) - ~MemTable() + Ref() void + Unref() void + ApproximateMemoryUsage() size_t + NewIterator() Iterator + Add(SequenceNumber seq, ValueType type, Slice& key, Slice& value) void + Get(const LookupKey& key, std::string* value, Status* s) bool }

MemTable 对外提供了 Get()Add() 方法,分别对应于查找和插入元素。由于 SkipList 只支持插入一个 key,所以 MemTable 需要将 key 和 value 合起来作为一个 插入 key 插入到 SkipList 中。key 的结构如下所示

1
2
3
4
5
6
7
8
9
10
|         Lookup key        |
| internal key |
| user key|
+-----------------------------------------------+
| ikey_size | key | tag | value_size | value |
+-----------------------------------------------+
/ \
+------------+
| seq | type |
+------------+

Add 方法很简单,就是按照上面的格式将数据组织成 key,然后插入 SkipList 中。需要说明的是 key_size 和 value_size 都是使用 varint32 变长编码存储的。然后就是 tag 的生成方式是采用 seq << 8 | type 的方式生成的,也就是说高 7 位用作序列号,低 1 位用作 type,type 表明这是插入操作还是删除操作

LevelDB 提供了两种写入操作,分别是插入单个 key 和原子地插入多个 key,其实最终都会调用后者的方法,也就是说在插入 MemTable 前只会写一次 WAL 日志,然后再批量插入 MemTable 中。但是所谓的批量插入 MemTable 也只是循环调用了 Add 方法

1
2
Status DB::Put(const WriteOptions& opt, const Slice& key, const Slice& value);
Status Write(const WriteOptions& options, WriteBatch* updates) = 0;

DBImpl::Write

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
if (status.ok() && updates != nullptr) {  // nullptr batch is for compactions
WriteBatch* write_batch = BuildBatchGroup(&last_writer);
WriteBatchInternal::SetSequence(write_batch, last_sequence + 1);
last_sequence += WriteBatchInternal::Count(write_batch);

// Add to log and apply to memtable. We can release the lock
// during this phase since &w is currently responsible for logging
// and protects against concurrent loggers and concurrent writes
// into mem_.
{
mutex_.Unlock();
// 先写 log
status = log_->AddRecord(WriteBatchInternal::Contents(write_batch));
bool sync_error = false;
if (status.ok() && options.sync) {
// 如果开启了同步选项,将文件刷盘
status = logfile_->Sync();
}
if (status.ok()) {
// 插入 memTable
status = WriteBatchInternal::InsertInto(write_batch, mem_);
}
mutex_.Lock();
}
if (write_batch == tmp_batch_) tmp_batch_->Clear();

versions_->SetLastSequence(last_sequence);
}

从上面的代码可以看出,确实只会写入一次 WAL,然后再调用 WriteBatchInternal::InsertInto 批量插入 MemTable。需要注意的是 Sequence 的修改,它只会将下一个 seq 设置到 WriteBatch 中(WriteBatch 的结构可以查看之前的文章),并更新一次(更新为 last_sequence + count + 1)

WriteBatchInternal::InsertInto 最终会调用 WriteBatch::Iterate 方法,将 WriteBatch 中的 key/value 全部插入到 MemTable 中。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
Status WriteBatch::Iterate(Handler* handler) const {
Slice input(rep_);

input.remove_prefix(kHeader);
Slice key, value;
int found = 0;
while (!input.empty()) {
found++;
char tag = input[0];
input.remove_prefix(1);
switch (tag) {
case kTypeValue:
if (GetLengthPrefixedSlice(&input, &key) &&
GetLengthPrefixedSlice(&input, &value)) {
// 插入 memTable 时,会调用 MemTableInserter::Put(key, value)
handler->Put(key, value);
}
break;
case kTypeDeletion:
if (GetLengthPrefixedSlice(&input, &key)) {
handler->Delete(key);
}
break;
default:
return Status::Corruption("unknown WriteBatch tag");
}
}
return Status::OK();
}

void Put(const Slice& key, const Slice& value) override {
mem_->Add(sequence_, kTypeValue, key, value);
sequence_++;
}
void Delete(const Slice& key) override {
mem_->Add(sequence_, kTypeDeletion, key, Slice());
sequence_++;
}

可以看到 WriteBatch::Iterate 确实会遍历 WriteBatch 中的所有 key/value,然后插入 MemTable。需要注意的是,不管是 Put 还是 Delete,都会增加 sequence,这与有些博客说的 WriteBatch 中的数据公用一个序列号的说法不一致

为了方便查找,LevelDB 封装了一个 LookupKey,里面包含需要查找的 user key 和一个序列号,以及 LookupKey 的长度

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
LookupKey::LookupKey(const Slice& user_key, SequenceNumber s) {
size_t usize = user_key.size();
size_t needed = usize + 13; // A conservative estimate
char* dst;
if (needed <= sizeof(space_)) { // char space_[200] 是为了避免 key 太短了
dst = space_;
} else {
dst = new char[needed];
}
start_ = dst;
dst = EncodeVarint32(dst, usize + 8); // 写入 size
kstart_ = dst; // 记录 user key 开始的位置
std::memcpy(dst, user_key.data(), usize);
dst += usize;
EncodeFixed64(dst, PackSequenceAndType(s, kValueTypeForSeek));
dst += 8;
end_ = dst; // 记录 LookupKey 结束的位置
}

查找的时候,会将 sequence 设置为当前最新的 sequence,由于 SkipList 中查找时会将第一个大于等于查找 key 的元素返回,再结合 key 在 SkipList 中的排列顺序,可以发现如果 key 存在于 SkipList 中,就会返回最新版本的 key。

DBImpl::Get

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
SequenceNumber snapshot;
if (options.snapshot != nullptr) {
snapshot =
static_cast<const SnapshotImpl*>(options.snapshot)->sequence_number();
} else {
snapshot = versions_->LastSequence();
}
{
mutex_.Unlock();
// First look in the memtable, then in the immutable memtable (if any).
// 先从 memtable 中查找,其次是 immMemtable. 最后从 sst 中查找
LookupKey lkey(key, snapshot);
if (mem->Get(lkey, value, &s)) { // 传入的是当前最新的 key
// Done
} else if (imm != nullptr && imm->Get(lkey, value, &s)) {
// Done
} else {
s = current->Get(options, lkey, value, &stats);
have_stat_update = true;
}
mutex_.Lock();
}
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
bool MemTable::Get(const LookupKey& key, std::string* value, Status* s) {
Slice memkey = key.memtable_key(); // memKey 是 internal_key_size + user key + tag 部分
Table::Iterator iter(&table_);
iter.Seek(memkey.data()); // 查找第一个大于等于 memkey 的元素
if (iter.Valid()) {
const char* entry = iter.key();
uint32_t key_length;
const char* key_ptr = GetVarint32Ptr(entry, entry + 5, &key_length);
// 再次查看是否是我们需要的 key,因为 key 有可能不存在
if (comparator_.comparator.user_comparator()->Compare(
Slice(key_ptr, key_length - 8), key.user_key()) == 0) {
// Correct user key
const uint64_t tag = DecodeFixed64(key_ptr + key_length - 8);
switch (static_cast<ValueType>(tag & 0xff)) {
case kTypeValue: {
Slice v = GetLengthPrefixedSlice(key_ptr + key_length);
value->assign(v.data(), v.size());
return true;
}
case kTypeDeletion: // key 已经被删除了,所以返回 NotFound
*s = Status::NotFound(Slice());
return true;
}
}
}
return false;
}

可以看到,找到元素后需要再比较一次,因为 SkipList 返回的是第一个大于等于 key 的元素。如果 key 不存在,返回的元素就不是当前元素了,所以需要再比较一次