概述

因为RocksDB采用LSM-Tree的算法实现,用户使用RocksDB进行插入或删除操作key-value,首先写入内存的数据结构—Memtable,一旦一个Memtable被写满,它就会转为Immutable,然后被替换为一个新的Memtable.

Memtable 和 Immutable Memtable

在RocksDB中,Memtable 和 Immutable memtable都位于内存, 它俩属于一样的数据结构,唯一的区别是Memtable可读可写,而Immutable memtable是只读的,不允许写入。RocksDB引入了Column family的概念,在一个Column family中只有一个Memtable,但允许存在多个Immutable memtable. 与LevelDB不同的是,RocksDB实现多个数据结构类型的Memtable,默认的是SkipList,即跳跃表。用户可以根据不同的需求,选择不同的Memtable实现,参考Wiki-Memtable

Memtable 与Immutable Memtable的转换

在一个column family中,维护一个Memtable数据结构mem_和一个Immutable Memtable的列表imm_:

  MemTable* mem_;
  MemTableList imm_;

当达到限定条件,需要将Memtable转为Immutable Memtable时:首先构造一个新的Memtable, 然后将旧的Memtable添加至imm_,增加新的Memtable的引用计数,最后将新的Memtable添加至column family:

  new_mem = cfd->ConstructNewMemtable(mutable_cf_options, seq);
  // ...
  cfd->imm()->Add(cfd->mem(), &context->memtables_to_free_);
  new_mem->Ref();
  cfd->SetMemtable(new_mem);

Memtable的数据结构

我们以默认的SkipList来分析Memtable, 源码在db/memtable.cc,在一个Memtable中维护两个SkipList,其中范围删除插入range_del_table_,其余的操作写入table_。首先来看Memtable定义的操作接口: Add()Get()

Add()

bool MemTable::Add(SequenceNumber s, ValueType type,
                   const Slice& key, /* user key */
                   const Slice& value, bool allow_concurrent,
                   MemTablePostProcessInfo* post_process_info) {
  // 一条key-value Entry的数据格式
  //  key_size     : varint32 of internal_key.size()
  //  key bytes    : char[internal_key.size()]
  //  value_size   : varint32 of value.size()
  //  value bytes  : char[value.size()]
  uint32_t key_size = static_cast<uint32_t>(key.size());
  uint32_t val_size = static_cast<uint32_t>(value.size());
  uint32_t internal_key_size = key_size + 8;
  const uint32_t encoded_len = VarintLength(internal_key_size) +
                               internal_key_size + VarintLength(val_size) +
                               val_size;
  char* buf = nullptr;
  // 通过判断key-value的类型来选择memtable, 范围删除的kv插入range_del_table_
  std::unique_ptr<MemTableRep>& table =
      type == kTypeRangeDeletion ? range_del_table_ : table_;
  KeyHandle handle = table->Allocate(encoded_len, &buf);
  //...
  // 是否允许并发插入
  if (!allow_concurrent) {
    // 是否制定了函数提取key的前缀
    if (insert_with_hint_prefix_extractor_ != nullptr &&
        insert_with_hint_prefix_extractor_->InDomain(key_slice)) {
      // ...
      bool res = table->InsertWithHint(handle, &insert_hints_[prefix]);
    } else {
      // 插入key-value pair
      bool res = table->Insert(handle);
    }
  } else {
    // 插入key-value pair
    bool res = table->InsertConcurrently(handle);
  }
  return true;
}

Add()函数将用户的key和value封装成一个buf,然后根据不同的条件调用table->Insert()插入至Memtable。table就是Memtable的工厂类实现,默认SkiplistRep, 即通过调用SkipListInsert()完成key的插入。

Get()

bool MemTable::Get(const LookupKey& key, std::string* value, Status* s,
                   MergeContext* merge_context,
                   RangeDelAggregator* range_del_agg, SequenceNumber* seq,
                   const ReadOptions& read_opts, ReadCallback* callback,
                   bool* is_blob_index) {
  // 在range_del_table_上初始化一个迭代器
  std::unique_ptr<InternalIterator> range_del_iter(
      NewRangeTombstoneIterator(read_opts));
  Status status = range_del_agg->AddTombstones(std::move(range_del_iter));
  if (!status.ok()) {
    *s = status;
    return false;
  }

  Slice user_key = key.user_key();
  // 利用前缀提取过滤判断key是否存在
  bool const may_contain =
      nullptr == prefix_bloom_
          ? false
          : prefix_bloom_->MayContain(prefix_extractor_->Transform(user_key));
  if (prefix_bloom_ && !may_contain) {
    // 如果前缀过滤器通知这个key不存在
    PERF_COUNTER_ADD(bloom_memtable_miss_count, 1);
    *seq = kMaxSequenceNumber;
  } else {
    Saver saver;
    saver.status = s;
    saver.found_final_value = &found_final_value;
    saver.merge_in_progress = &merge_in_progress;
    saver.key = &key;
    saver.value = value;
    saver.seq = kMaxSequenceNumber;
    saver.mem = this;
    saver.merge_context = merge_context;
    saver.range_del_agg = range_del_agg;
    // ...
    // 进入默认的SkipList进行查找,利用SkipList的Iterator查找
    table_->Get(key, &saver, SaveValue);

    *seq = saver.seq;
  }
  // ...
  return found_final_value;
}

Memtable的Get()调用了SkipListRepGet()接口,通过SkipList的FindGreaterOrEqual()来查找。查找出来的key会被传入的回调函数SaveValue()根据type处理,例如kTypeDeletion就返回NotFound()

我们顺便来看一下SkipList的实现: memtable/skiplist.h

SkipList数据结构

class SkipList {
 private:
  struct Node;

 public:
 
  explicit SkipList(Comparator cmp, Allocator* allocator,
                    int32_t max_height = 12, int32_t branching_factor = 4);
  void Insert(const Key& key);
  bool Contains(const Key& key) const;
  uint64_t EstimateCount(const Key& key) const;

  // 用来迭代SkipList的迭代器
  class Iterator {
    // ...
  };

 private:
  // ...
  Comparator const compare_;
  Allocator* const allocator_;    // Allocator used for allocations of nodes
  Node* const head_;
  std::atomic<int> max_height_;  // Height of the entire list
};

SkipList本质上是一个带有分层的有序链表:

Node的数据结构

template<typename Key, class Comparator>
struct SkipList<Key, Comparator>::Node {
  explicit Node(const Key& k) : key(k) { }

  Key const key;
  // ...

 private:
  // 存放该节点的next_节点的数组
  // 数组大小为该节点的height,当调用NewNode()分配内存初始化整个数组
  std::atomic<Node*> next_[1];
};

SkipList的插入

这个就是Memtable的Add()最后写入Skiplist调用的接口Insert()。插入的时候随机生成一个高度height,将构造的新的节点从第0层到height层分别插入。

template<typename Key, class Comparator>
void SkipList<Key, Comparator>::Insert(const Key& key) {
  // 这里对连续插入做了优化,prev_[0]记录了上次插入的Node,prev_height_记录了上次
  // 插入的最大层数。如果新的key大于prev_[0]的值并且小于prev_[0]的Next节点的值,即
  // prev_[0]就是新的key的前节点。
  if (!KeyIsAfterNode(key, prev_[0]->NoBarrier_Next(0)) &&
      (prev_[0] == head_ || KeyIsAfterNode(key, prev_[0]))) {
	//...
	// 按照层级将每层的前节点补齐。
    for (int i = 1; i < prev_height_; i++) {
      prev_[i] = prev_[0];
    }
  } else {
    // 逐层查找该key待插入位置的前节点
    FindLessThan(key, prev_);
  }

  // skiplist不允许重复插入
  assert(prev_[0]->Next(0) == nullptr || !Equal(key, prev_[0]->Next(0)->key));

  int height = RandomHeight();
  // 如果当前节点的高度大于最高节点,则高出部分的的前节点都是头节点。
  if (height > GetMaxHeight()) {
    for (int i = GetMaxHeight(); i < height; i++) {
      prev_[i] = head_;
    }
    // ...
  }

  Node* x = NewNode(key, height);
  for (int i = 0; i < height; i++) {
    // 从0层开始插入,直到height层,原理就是简单的链表插入
    x->NoBarrier_SetNext(i, prev_[i]->NoBarrier_Next(i));
    prev_[i]->SetNext(i, x);
  }
  // 这里记录了插入的节点和height,为了连续插入做优化
  prev_[0] = x;
  prev_height_ = height;
}

SkipList的迭代

SkipList的Iterator迭代比较简单,直接从第0层的单链表根据next_[0]顺序迭代。