RocksDB 的 Memtable
概述
因为RocksDB采用LSM-Tree的算法实现,用户使用RocksDB进行插入或删除操作key-value,首先写入内存的数据结构—Memtable,一旦一个Memtable被写满,它就会转为Immutable,然后被替换为一个新的Memtable.
Memtable 和 Immutable Memtable
在RocksDB中,Memtable 和 Immutable memtable都位于内存, 它俩属于一样的数据结构,唯一的区别是Memtable可读可写,而Immutable memtable是只读的,不允许写入。RocksDB引入了Column family的概念,在一个Column family中只有一个Memtable,但允许存在多个Immutable memtable. 与LevelDB不同的是,RocksDB实现多个数据结构类型的Memtable,默认的是SkipList,即跳跃表。用户可以根据不同的需求,选择不同的Memtable实现,参考Wiki-Memtable。
Memtable 与Immutable Memtable的转换
在一个column family中,维护一个Memtable数据结构 mem_ 和一个Immutable Memtable的列表 imm_:
MemTable* mem_;
MemTableList imm_;当达到限定条件,需要将Memtable转为Immutable Memtable时:首先构造一个新的Memtable, 然后将旧的Memtable添加至 imm_,增加新的Memtable的引用计数,最后将新的Memtable添加至column family:
new_mem = cfd->ConstructNewMemtable(mutable_cf_options, seq);
// ...
cfd->imm()->Add(cfd->mem(), &context->memtables_to_free_);
new_mem->Ref();
cfd->SetMemtable(new_mem);Memtable的数据结构
我们以默认的SkipList来分析Memtable, 源码在 db/memtable.cc,在一个Memtable中维护两个SkipList,其中范围删除插入 range_del_table_,其余的操作写入 table_。首先来看Memtable定义的操作接口: Add(),Get()
Add()
bool MemTable::Add(SequenceNumber s, ValueType type,
const Slice& key, /* user key */
const Slice& value, bool allow_concurrent,
MemTablePostProcessInfo* post_process_info) {
// 一条key-value Entry的数据格式
// key_size : varint32 of internal_key.size()
// key bytes : char[internal_key.size()]
// value_size : varint32 of value.size()
// value bytes : char[value.size()]
uint32_t key_size = static_cast<uint32_t>(key.size());
uint32_t val_size = static_cast<uint32_t>(value.size());
uint32_t internal_key_size = key_size + 8;
const uint32_t encoded_len = VarintLength(internal_key_size) +
internal_key_size + VarintLength(val_size) +
val_size;
char* buf = nullptr;
// 通过判断key-value的类型来选择memtable, 范围删除的kv插入range_del_table_
std::unique_ptr<MemTableRep>& table =
type == kTypeRangeDeletion ? range_del_table_ : table_;
KeyHandle handle = table->Allocate(encoded_len, &buf);
//...
// 是否允许并发插入
if (!allow_concurrent) {
// 是否制定了函数提取key的前缀
if (insert_with_hint_prefix_extractor_ != nullptr &&
insert_with_hint_prefix_extractor_->InDomain(key_slice)) {
// ...
bool res = table->InsertWithHint(handle, &insert_hints_[prefix]);
} else {
// 插入key-value pair
bool res = table->Insert(handle);
}
} else {
// 插入key-value pair
bool res = table->InsertConcurrently(handle);
}
return true;
}Add() 函数将用户的key和value封装成一个buf,然后根据不同的条件调用 table->Insert() 插入至Memtable。table 就是Memtable的工厂类实现,默认 SkiplistRep, 即通过调用 SkipList 的 Insert() 完成key的插入。
Get()
bool MemTable::Get(const LookupKey& key, std::string* value, Status* s,
MergeContext* merge_context,
RangeDelAggregator* range_del_agg, SequenceNumber* seq,
const ReadOptions& read_opts, ReadCallback* callback,
bool* is_blob_index) {
// 在range_del_table_上初始化一个迭代器
std::unique_ptr<InternalIterator> range_del_iter(
NewRangeTombstoneIterator(read_opts));
Status status = range_del_agg->AddTombstones(std::move(range_del_iter));
if (!status.ok()) {
*s = status;
return false;
}
Slice user_key = key.user_key();
// 利用前缀提取过滤判断key是否存在
bool const may_contain =
nullptr == prefix_bloom_
? false
: prefix_bloom_->MayContain(prefix_extractor_->Transform(user_key));
if (prefix_bloom_ && !may_contain) {
// 如果前缀过滤器通知这个key不存在
PERF_COUNTER_ADD(bloom_memtable_miss_count, 1);
*seq = kMaxSequenceNumber;
} else {
Saver saver;
saver.status = s;
saver.found_final_value = &found_final_value;
saver.merge_in_progress = &merge_in_progress;
saver.key = &key;
saver.value = value;
saver.seq = kMaxSequenceNumber;
saver.mem = this;
saver.merge_context = merge_context;
saver.range_del_agg = range_del_agg;
// ...
// 进入默认的SkipList进行查找,利用SkipList的Iterator查找
table_->Get(key, &saver, SaveValue);
*seq = saver.seq;
}
// ...
return found_final_value;
}Memtable的 Get() 调用了 SkipListRep 的 Get() 接口,通过SkipList的 FindGreaterOrEqual() 来查找。查找出来的key会被传入的回调函数 SaveValue() 根据 type 处理,例如 kTypeDeletion 就返回 NotFound()。
我们顺便来看一下SkipList的实现: memtable/skiplist.h
SkipList数据结构
class SkipList {
private:
struct Node;
public:
explicit SkipList(Comparator cmp, Allocator* allocator,
int32_t max_height = 12, int32_t branching_factor = 4);
void Insert(const Key& key);
bool Contains(const Key& key) const;
uint64_t EstimateCount(const Key& key) const;
// 用来迭代SkipList的迭代器
class Iterator {
// ...
};
private:
// ...
Comparator const compare_;
Allocator* const allocator_; // Allocator used for allocations of nodes
Node* const head_;
std::atomic<int> max_height_; // Height of the entire list
};SkipList本质上是一个带有分层的有序链表:
Node的数据结构
template<typename Key, class Comparator>
struct SkipList<Key, Comparator>::Node {
explicit Node(const Key& k) : key(k) { }
Key const key;
// ...
private:
// 存放该节点的next_节点的数组
// 数组大小为该节点的height,当调用NewNode()分配内存初始化整个数组
std::atomic<Node*> next_[1];
};SkipList的插入
这个就是Memtable的 Add() 最后写入Skiplist调用的接口 Insert()。插入的时候随机生成一个高度height,将构造的新的节点从第0层到height层分别插入。
template<typename Key, class Comparator>
void SkipList<Key, Comparator>::Insert(const Key& key) {
// 这里对连续插入做了优化,prev_[0]记录了上次插入的Node,prev_height_记录了上次
// 插入的最大层数。如果新的key大于prev_[0]的值并且小于prev_[0]的Next节点的值,即
// prev_[0]就是新的key的前节点。
if (!KeyIsAfterNode(key, prev_[0]->NoBarrier_Next(0)) &&
(prev_[0] == head_ || KeyIsAfterNode(key, prev_[0]))) {
//...
// 按照层级将每层的前节点补齐。
for (int i = 1; i < prev_height_; i++) {
prev_[i] = prev_[0];
}
} else {
// 逐层查找该key待插入位置的前节点
FindLessThan(key, prev_);
}
// skiplist不允许重复插入
assert(prev_[0]->Next(0) == nullptr || !Equal(key, prev_[0]->Next(0)->key));
int height = RandomHeight();
// 如果当前节点的高度大于最高节点,则高出部分的的前节点都是头节点。
if (height > GetMaxHeight()) {
for (int i = GetMaxHeight(); i < height; i++) {
prev_[i] = head_;
}
// ...
}
Node* x = NewNode(key, height);
for (int i = 0; i < height; i++) {
// 从0层开始插入,直到height层,原理就是简单的链表插入
x->NoBarrier_SetNext(i, prev_[i]->NoBarrier_Next(i));
prev_[i]->SetNext(i, x);
}
// 这里记录了插入的节点和height,为了连续插入做优化
prev_[0] = x;
prev_height_ = height;
}SkipList的迭代
SkipList的Iterator迭代比较简单,直接从第0层的单链表根据next_[0]顺序迭代。