编码

LevelDB 提供了两种编码方式:定长编码和变长编码

  • 定长编码,编码时,将数据以字节为单位放入字符数组中。解码时,每次读取一个字节的数据,放入字符数组中
  • varint 变长编码,不显示地存储数据地长度,它在每个字节的 8 bit 中留出 1 位用来表示数据是否结束。
    • 如果该位为 1,表示数据还没有读取完,需要继续读取;
    • 如果为 0,表示这是数据的最后一个字节。比如数据 1234 的存储格式为 0000 0100 1101 0010

varint 变长编码的实现如下:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
char* EncodeVarint32(char* dst, uint32_t v) {
// Operate on characters as unsigneds
uint8_t* ptr = reinterpret_cast<uint8_t*>(dst);
static const int B = 128; // 1000 0000 | v 获取低 7 位的值
if (v < (1 << 7)) {
*(ptr++) = v;
} else if (v < (1 << 14)) {
*(ptr++) = v | B;
*(ptr++) = v >> 7;
} else if (v < (1 << 21)) {
*(ptr++) = v | B;
*(ptr++) = (v >> 7) | B;
*(ptr++) = v >> 14;
} else if (v < (1 << 28)) {
*(ptr++) = v | B;
*(ptr++) = (v >> 7) | B;
*(ptr++) = (v >> 14) | B;
*(ptr++) = v >> 21;
} else {
*(ptr++) = v | B;
*(ptr++) = (v >> 7) | B;
*(ptr++) = (v >> 14) | B;
*(ptr++) = (v >> 21) | B;
*(ptr++) = v >> 28;
}
return reinterpret_cast<char*>(ptr);
}

const char* GetVarint32PtrFallback(const char* p, const char* limit,
uint32_t* value) {
uint32_t result = 0;
for (uint32_t shift = 0; shift <= 28 && p < limit; shift += 7) {
uint32_t byte = *(reinterpret_cast<const uint8_t*>(p));
p++;
if (byte & 128) { // 高 1 位为 1,表示还没有结束
// More bytes are present
result |= ((byte & 127) << shift);
} else {
result |= (byte << shift);
*value = result;
return reinterpret_cast<const char*>(p);
}
}
return nullptr;
}

Slice

为了避免操作字节数组或字符串时不必要的拷贝,LevelDB 自己实现了一个 Slice

1
2
3
4
5
class LEVELDB_EXPORT Slice {
private:
const char* data_; // 切片指针
size_t size_; // 切片大小
};

Slice 并不关心数据的内容,甚至不会为数据分配新的内存空间,只是简单地用指针指向原数据,这从 Slice 定义的 4 个构造函数可以看出

1
2
3
4
5
6
7
Slice() : data_(""), size_(0) {}  
// Create a slice that refers to d[0,n-1].
Slice(const char* d, size_t n) : data_(d), size_(n) {}
// Create a slice that refers to the contents of "s"
Slice(const std::string& s) : data_(s.data()), size_(s.size()) {}
// Create a slice that refers to s[0,strlen(s)-1]
Slice(const char* s) : data_(s), size_(strlen(s)) {}

Slice 还重载了[]、=、== 、!= 操作符,让 Slice 支持比较和根据下表取值等操作

1
2
3
4
inline bool operator==(const Slice& x, const Slice& y) {  
return ((x.size() == y.size()) &&
(memcmp(x.data(), y.data(), x.size()) == 0));
}

同时 Slice 还自定义了 Compare 方法,用于比较两个 Slice 的大小,其具体逻辑是比较两个字符串的字典序

1
2
3
4
5
6
7
8
9
10
11
inline int Slice::compare(const Slice& b) const {  
const size_t min_len = (size_ < b.size_) ? size_ : b.size_;
int r = memcmp(data_, b.data_, min_len);
if (r == 0) {
if (size_ < b.size_)
r = -1;
else if (size_ > b.size_)
r = +1;
}
return r;
}

WriteBatch

WriteBatch holds a collection of updates to apply atomically to a DB.
The updates are applied in the order in which they are added to the WriteBatch.

WriteBatch 可以用于批量更新,它内部保证了更新的原子性,这是通过 wal 日志实现的,即对于一个 WriteBatch 里的所有更新,wal 只会写一次日志。

WriteBatch 只有一个成员变量 rep_,其结构如下所示

1
2
3
4
5
6
7
8
9
10
11
12
13
std::string rep_; 

+--------------------------+
| seq(8) | count(4) | data |
+--------------------------+
/ \
+----------------------------------------+
| record1 | record2 | ... | record count |
+----------------------------------------+
/ \
+--------------------+
| type | key | value |
+--------------------+
  • Sequence:序列号,占 8 字节,写入 db 前才赋值为 LastSequence + 1
  • CountWriteBatch 中 key 的数量
  • Data:一个 Record 数组,每个 Record 由 1 字节的 kTypeValue 和 Varint32 编码的 key,value 组成。如果 kTypeValue 值为 delete,则不存储 value

WriteBatch 在 向rep_ 中写入数据时,并不会先写入 Sequence,只是为其空出了一个 8 字节的空间而已。只有当向 memTable 中插入前才会设置 Sequence。 在插入 memTable 的过程中,Sequence 也会递增,作为插入 key 的一部分插入 memTable。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
void WriteBatch::Put(const Slice& key, const Slice& value) {  
WriteBatchInternal::SetCount(this, WriteBatchInternal::Count(this) + 1);
rep_.push_back(static_cast<char>(kTypeValue));
PutLengthPrefixedSlice(&rep_, key);
PutLengthPrefixedSlice(&rep_, value);
}

// insert
// ->| DBImpl::Write
// ->| WriteBatchInternal::InsertInto
// ->| WriteBatch::Iterate
// ->| MemTableInserter::Put
// # MemTableInserter::Put
void Put(const Slice& key, const Slice& value) override {
mem_->Add(sequence_, kTypeValue, key, value);
sequence_++;
}