概述

版本:v5.15.10

RocksDB的写入操作是通过Put()接口完成的。RocksDB内部维护一个Writer队列,只有队头的Leader可以执行写入,Leader可以将自己和排在后面的Writer任务合并成一个批次一起写入。

为了防止数据丢失,RocksDB先写入WAL文件,再写入Memtable。在队头Leader完成WAL写入任务后,假如参数allow_concurrent_memtable_write被设置为true,并且整个队列中Writer的数量大于1,RocksDB就会采取并行写Memtable的策略,即Leader唤醒后面的Writer任务,将它们的状态设置为STATE_PARALLEL_MEMTABLE_WRITER,后面的Writer就会各自完成自己的Memtable写入任务。

假如RocksDB没有设置allow_concurrent_memtable_write参数,Leader会代为写入Memtable,完成之后唤醒其他Writer任务,将它们的状态设置为STATE_COMPLETED,然后指定一个新的队头Leader,最后各个Writer分别返回自己的写入状态。

具体实现

写入的具体实现位于db/db_impl_write.cc中的DBImpl::WriteImpl()。RocksDB的Put()接口先组装一个batch,然后调用Write():

// Convenience default for DB subclasses: wraps a single key/value pair in a
// WriteBatch and hands that batch to Write().
//
// Returns the status of the batch-level Put() if it fails; otherwise returns
// whatever Write() returns.
Status DB::Put(const WriteOptions& opt, ColumnFamilyHandle* column_family,
               const Slice& key, const Slice& value) {
  // Size hint for the batch: key bytes + value bytes + 24 extra bytes
  // (presumably headroom for the batch header and record encoding overhead).
  const auto reserved_bytes = key.size() + value.size() + 24;
  WriteBatch batch(reserved_bytes);

  Status put_status = batch.Put(column_family, key, value);
  if (put_status.ok()) {
    // The record was staged successfully; submit the batch.
    return Write(opt, &batch);
  }
  return put_status;
}

Write()内部会调用WriteImpl(),因此这里我们直接来看WriteImpl()的实现:

// The main write queue. This is the only write queue that updates LastSequence.
// When using one write queue, the same sequence also indicates the last
// published sequence.
//
// NOTE: this listing is an abridged excerpt. "// ..." marks omitted code, so
// several locals used below (status, write_group, parallel, in_parallel_group,
// last_sequence, seq_inc, log_writer, need_log_sync, need_log_dir_sync) are
// declared in the omitted parts.
//
// Flow visible in this excerpt:
//   1. Delegate to WriteImplWALOnly() or PipelinedWriteImpl() when the
//      corresponding options apply.
//   2. Create a Writer and join the writer queue, then dispatch on w.state:
//      - STATE_PARALLEL_MEMTABLE_WRITER: insert this writer's own batch into
//        the memtable, then possibly perform the group exit.
//      - STATE_COMPLETED: the write is already done; return its final status.
//      - STATE_GROUP_LEADER: build the write group, write the WAL, then either
//        insert the whole group into the memtable or launch parallel writers.
//
// Out-parameters (both optional, may be nullptr):
//   log_used - receives w.log_used on the STATE_COMPLETED path; it is also
//              passed down to the WAL write calls on the leader path.
//   seq_used - receives w.sequence.
// Returns the resulting Status of the write.
Status DBImpl::WriteImpl(const WriteOptions& write_options,
                         WriteBatch* my_batch, WriteCallback* callback,
                         uint64_t* log_used, uint64_t log_ref,
                         bool disable_memtable, uint64_t* seq_used,
                         size_t batch_cnt,
                         PreReleaseCallback* pre_release_callback) {
  // With two write queues and the memtable disabled for this write, delegate
  // to the WAL-only path.
  if (two_write_queues_ && disable_memtable) {
    return WriteImplWALOnly(write_options, my_batch, callback, log_used,
                            log_ref, seq_used, batch_cnt, pre_release_callback);
  }

  // Pipelined write is enabled: delegate to the pipelined implementation.
  if (immutable_db_options_.enable_pipelined_write) {
    return PipelinedWriteImpl(write_options, my_batch, callback, log_used,
                              log_ref, disable_memtable, seq_used);
  }

  // Create the Writer that carries this write request.
  WriteThread::Writer w(write_options, my_batch, callback, log_ref,
                        disable_memtable, batch_cnt, pre_release_callback);

  // Join the writer queue. The branches below dispatch on w.state as it is
  // after this call returns.
  write_thread_.JoinBatchGroup(&w);
  // State: parallel memtable writer. This writer (not the group leader)
  // inserts its own batch into the memtable.
  if (w.state == WriteThread::STATE_PARALLEL_MEMTABLE_WRITER) {
    // Insert only if this writer actually needs a memtable write.
    if (w.ShouldWriteToMemtable()) {
      PERF_TIMER_STOP(write_pre_and_post_process_time);
      PERF_TIMER_GUARD(write_memtable_time);

      ColumnFamilyMemTablesImpl column_family_memtables(
          versions_->GetColumnFamilySet());
      w.status = WriteBatchInternal::InsertInto(
          &w, w.sequence, &column_family_memtables, &flush_scheduler_,
          write_options.ignore_missing_column_families, 0 /*log_number*/, this,
          true /*concurrent_memtable_writes*/, seq_per_batch_, w.batch_cnt);

      PERF_TIMER_START(write_pre_and_post_process_time);
    }

    // CompleteParallelMemTableWriter() returns true if this thread should
    // handle the batch-group exit, false if somebody else did (same contract
    // as noted at the leader-side call further below).
    if (write_thread_.CompleteParallelMemTableWriter(&w)) {
      // we're responsible for exit batch group
      // Run each writer's pre-release callback (skipping writers whose
      // callback failed); stop at the first callback error.
      for (auto* writer : *(w.write_group)) {
        if (!writer->CallbackFailed() && writer->pre_release_callback) {
          assert(writer->sequence != kMaxSequenceNumber);
          Status ws = writer->pre_release_callback->Callback(writer->sequence,
                                                             disable_memtable);
          if (!ws.ok()) {
            status = ws;
            break;
          }
        }
      }
      // Publish the group's last sequence, check the memtable insert status,
      // then perform the group exit through ExitAsBatchGroupFollower().
      auto last_sequence = w.write_group->last_sequence;
      versions_->SetLastSequence(last_sequence);
      MemTableInsertStatusCheck(w.status);
      write_thread_.ExitAsBatchGroupFollower(&w);
    }
    assert(w.state == WriteThread::STATE_COMPLETED);
    // STATE_COMPLETED conditional below handles exit

    status = w.FinalStatus();
  }
  // State: completed. The write is done; report the log number and sequence
  // through the optional out-parameters and return.
  if (w.state == WriteThread::STATE_COMPLETED) {
    if (log_used != nullptr) {
      *log_used = w.log_used;
    }
    if (seq_used != nullptr) {
      *seq_used = w.sequence;
    }
    // write is complete and leader has updated sequence
    return w.FinalStatus();
  }
  // From here on this writer is the leader of the whole batch group.
  assert(w.state == WriteThread::STATE_GROUP_LEADER);

  // The leader builds the write group by selecting other writers to join it;
  // the returned value is recorded as the size of this batch group.
  last_batch_group_size_ =
      write_thread_.EnterAsBatchGroupLeader(&w, &write_group);

  if (status.ok()) {
    // ...
    if (!two_write_queues_) {
      if (status.ok() && !write_options.disableWAL) {
        PERF_TIMER_GUARD(write_wal_time);
        // Write the group to the WAL.
        status = WriteToWAL(write_group, log_writer, log_used, need_log_sync,
                            need_log_dir_sync, last_sequence + 1);
      }
    } else {
      if (status.ok() && !write_options.disableWAL) {
        PERF_TIMER_GUARD(write_wal_time);
        // LastAllocatedSequence is increased inside WriteToWAL under
        // wal_write_mutex_ to ensure ordered events in WAL
        status = ConcurrentWriteToWAL(write_group, log_used, &last_sequence,
                                      seq_inc);
      } else {
        // Otherwise we inc seq number for memtable writes
        last_sequence = versions_->FetchAddLastAllocatedSequence(seq_inc);
      }
    }
    assert(last_sequence != kMaxSequenceNumber);
    // First sequence number of this group; last_sequence then advances by the
    // group's sequence increment.
    const SequenceNumber current_sequence = last_sequence + 1;
    last_sequence += seq_inc;

    if (status.ok()) {
      PERF_TIMER_GUARD(write_memtable_time);

      if (!parallel) {
        // Non-parallel mode: the leader inserts the whole write group into
        // the memtable on behalf of every writer.
        w.status = WriteBatchInternal::InsertInto(
            write_group, current_sequence, column_family_memtables_.get(),
            &flush_scheduler_, write_options.ignore_missing_column_families,
            0 /*recovery_log_number*/, this, parallel, seq_per_batch_);
      } else {
        // ...
        // Launch the other writers so that each inserts its own batch into
        // the memtable.
        write_thread_.LaunchParallelMemTableWriters(&write_group);
        in_parallel_group = true;

        // The leader inserts only its own batch, and only if it needs a
        // memtable write.
        if (w.ShouldWriteToMemtable()) {
          ColumnFamilyMemTablesImpl column_family_memtables(
              versions_->GetColumnFamilySet());
          assert(w.sequence == current_sequence);
          w.status = WriteBatchInternal::InsertInto(
              &w, w.sequence, &column_family_memtables, &flush_scheduler_,
              write_options.ignore_missing_column_families, 0 /*log_number*/,
              this, true /*concurrent_memtable_writes*/, seq_per_batch_,
              w.batch_cnt);
        }
      }
      if (seq_used != nullptr) {
        *seq_used = w.sequence;
      }
    }
  }
  PERF_TIMER_START(write_pre_and_post_process_time);

  // ...
  // In parallel mode, find out whether this thread is the one that must exit
  // the batch group; in non-parallel mode the leader always does.
  bool should_exit_batch_group = true;
  if (in_parallel_group) {
    // CompleteParallelWorker returns true if this thread should
    // handle exit, false means somebody else did
    should_exit_batch_group = write_thread_.CompleteParallelMemTableWriter(&w);
  }
  // If the leader is responsible for the exit, it leaves the group as leader.
  // Per the original author's note this sets the other writers' states and
  // picks the leader of the next batch (not visible in this excerpt).
  if (should_exit_batch_group) {
    // ...
    write_thread_.ExitAsBatchGroupLeader(write_group, status);
  }

  if (status.ok()) {
    status = w.FinalStatus();
  }
  return status;
}

RocksDB为了防止数据丢失先写入WAL文件,再写入Memtable,我们来看一下WAL日志文件的写入实现WriteToWAL函数:

// Writes one write group to the WAL: merges the group's batches into a single
// batch, appends it through the single-batch WriteToWAL() overload, and
// optionally syncs the log files and the WAL directory.
//
// NOTE: abridged excerpt. "// ..." marks omitted code; status, tmp_batch_,
// write_with_wal, to_be_cached_state and log_size come from the omitted parts
// or from class members.
//
// Returns the first non-OK status encountered, or the status of the last step.
Status DBImpl::WriteToWAL(const WriteThread::WriteGroup& write_group,
                          log::Writer* log_writer, uint64_t* log_used,
                          bool need_log_sync, bool need_log_dir_sync,
                          SequenceNumber sequence) {
  // ...
  // Merge the batches of the write group into one batch.
  WriteBatch* merged_batch = MergeBatch(write_group, &tmp_batch_,
                                        &write_with_wal, &to_be_cached_state);
  // Append the merged batch to the WAL file.
  status = WriteToWAL(*merged_batch, log_writer, log_used, &log_size);
  // When need_log_sync is set, every log in logs_ is synced to disk.
  // NOTE(review): the original author equates need_log_sync with
  // WriteOptions::sync == true; that assignment is not shown here - confirm.
  if (status.ok() && need_log_sync) {
    // Sync each log file, stopping at the first failure.
    for (auto& log : logs_) {
      status = log.writer->file()->Sync(immutable_db_options_.use_fsync);
      if (!status.ok()) {
        break;
      }
    }
    // If all log syncs succeeded and a directory sync was requested, fsync
    // the WAL directory as well.
	if (status.ok() && need_log_dir_sync) {
      status = directories_.GetWalDir()->Fsync();
    }
  }
  // ...
  return status;
  }

写入WAL文件成功后,接下来写入Memtable。我们简单地来看一下针对单个batch的Memtable写入实现,即InsertInto函数:

// Inserts the contents of a single WriteBatch into the memtables.
//
// Builds a MemTableInserter seeded with the batch's sequence number and lets
// the batch iterate over its records with the inserter as the handler.
//
// NOTE: abridged excerpt. "// ..." marks omitted code, so next_seq is not
// used in the visible part.
//
// Returns the status produced by batch->Iterate().
Status WriteBatchInternal::InsertInto(
    const WriteBatch* batch, ColumnFamilyMemTables* memtables,
    FlushScheduler* flush_scheduler, bool ignore_missing_column_families,
    uint64_t log_number, DB* db, bool concurrent_memtable_writes,
    SequenceNumber* next_seq, bool* has_valid_writes, bool seq_per_batch) {
  // Construct the handler that applies each record to the memtable.
  MemTableInserter inserter(Sequence(batch), memtables, flush_scheduler,
                            ignore_missing_column_families, log_number, db,
                            concurrent_memtable_writes, has_valid_writes,
                            seq_per_batch);
  // Walk the batch; each record is dispatched to the inserter.
  Status s = batch->Iterate(&inserter);
  // ...
  return s;
}

batch->Iterate(&inserter)就是对Memtable的具体写入操作:

// Decodes the records stored in this batch one by one and dispatches each to
// the matching Handler callback according to its type tag.
//
// NOTE: abridged excerpt. "// ..." marks omitted code (declarations of s,
// input, tag, column_family, key, value, blob, xid, the remaining switch
// cases, and the closing brace of the loop), so braces do not balance here.
Status WriteBatch::Iterate(Handler* handler) const {
  // ...
  // Keep going while the status is OK (or TryAgain), input remains, and the
  // handler wants to continue.
  while ((s.ok() || UNLIKELY(s.IsTryAgain())) && !input.empty() &&
         handler->Continue()) {
    if (LIKELY(!s.IsTryAgain())) {
      tag = 0;
      column_family = 0;  // default
      // Read the next record from the batch.
      s = ReadRecordFromWriteBatch(&input, &tag, &column_family, &key, &value,
                                   &blob, &xid);
      if (!s.ok()) {
        return s;
      }
    } else {
      // TryAgain: no new record is read, so the previously decoded
      // tag/key/value are dispatched again with the status reset to OK.
      assert(s.IsTryAgain());
      s = Status::OK();
    }
	
    // Dispatch on the record type.
    switch (tag) {
      case kTypeColumnFamilyValue:
      case kTypeValue:
        // ...
        s = handler->PutCF(column_family, key, value);
        // ...
        break;
      case kTypeColumnFamilyDeletion:
      case kTypeDeletion:
        // ...
        s = handler->DeleteCF(column_family, key);
        // ...
  }
  // ...
}

所有的Handler操作均实现在db/write_batch.cc中。我们挑选其中的Delete操作来看看具体实现细节,DeleteCF()的具体实现函数是DeleteImpl():

// Shared implementation behind the delete-style handler callbacks: adds an
// entry of type delete_type for `key` to the memtable.
//
// NOTE: abridged excerpt. "// ..." marks omitted code, including the use of
// column_family_id, the handling of mem_res, and the return statement.
Status DeleteImpl(uint32_t column_family_id, const Slice& key,
                    const Slice& value, ValueType delete_type) {
    // ...
    // Fetch the memtable from cf_mems_ (presumably the one for the column
    // family selected in the omitted code above - confirm).
    MemTable* mem = cf_mems_->GetMemTable();
    // Write to the memtable through Add(); note that the ValueType argument
    // passed in is delete_type.
    bool mem_res =
        mem->Add(sequence_, delete_type, key, value,
                 concurrent_memtable_writes_, get_post_process_info(mem));
    // ...
  }

对Memtable的写入直接调用Add()接口,具体细节已在《RocksDB中的Memtable》一文中详细介绍。