|
// The main write queue. This is the only write queue that updates LastSequence.
// When using one write queue, the same sequence also indicates the last
// published sequence.
//
// Dispatches a write to the WAL-only path or the pipelined path when the
// corresponding options apply; otherwise wraps the batch in a
// WriteThread::Writer, joins the batch group, and then acts according to the
// state the writer ends up in (parallel memtable writer, already completed, or
// group leader).
//
// Parameters (as used in the visible code):
//   write_options        - read for disableWAL and
//                          ignore_missing_column_families.
//   my_batch, callback,
//   log_ref, batch_cnt   - forwarded to the Writer (or to the delegated write
//                          path).
//   log_used             - if non-null, receives w.log_used on the
//                          STATE_COMPLETED path; also passed to the WAL write
//                          calls on the leader path.
//   disable_memtable     - together with two_write_queues_ selects the WAL-only
//                          path; also forwarded to the Writer and to the
//                          pre-release callbacks.
//   seq_used             - if non-null, receives w.sequence.
//   pre_release_callback - forwarded to the Writer; callbacks of the writers in
//                          the group are invoked before the group is exited on
//                          the parallel-writer path.
//
// Returns the Status of the write.
//
// NOTE(review): this listing appears to be abridged (see the `// ...`
// markers). Locals such as status, write_group, last_sequence, seq_inc,
// parallel, in_parallel_group, log_writer, need_log_sync and need_log_dir_sync
// are used without a visible declaration -- presumably they are declared in the
// elided code; confirm against the full source.
Status DBImpl::WriteImpl(const WriteOptions& write_options,
WriteBatch* my_batch, WriteCallback* callback,
uint64_t* log_used, uint64_t log_ref,
bool disable_memtable, uint64_t* seq_used,
size_t batch_cnt,
PreReleaseCallback* pre_release_callback) {
// With two write queues and memtable insertion disabled, only the WAL needs
// to be written: hand off to the WAL-only write path.
if (two_write_queues_ && disable_memtable) {
return WriteImplWALOnly(write_options, my_batch, callback, log_used,
log_ref, seq_used, batch_cnt, pre_release_callback);
}
// Pipelined write is enabled: hand off to the pipelined write path.
// NOTE(review): batch_cnt and pre_release_callback are not forwarded here --
// confirm that this is intentional.
if (immutable_db_options_.enable_pipelined_write) {
return PipelinedWriteImpl(write_options, my_batch, callback, log_used,
log_ref, disable_memtable, seq_used);
}
// Create the writer that represents this write request.
WriteThread::Writer w(write_options, my_batch, callback, log_ref,
disable_memtable, batch_cnt, pre_release_callback);
// Add the writer to the batch group queue. The branches below dispatch on the
// value of w.state after this call.
write_thread_.JoinBatchGroup(&w);
// This writer was assigned the parallel-memtable-writer state (it is not the
// group leader; the leader case is handled further below).
if (w.state == WriteThread::STATE_PARALLEL_MEMTABLE_WRITER) {
// Insert this writer's own batch into the memtable if it is supposed to.
if (w.ShouldWriteToMemtable()) {
PERF_TIMER_STOP(write_pre_and_post_process_time);
PERF_TIMER_GUARD(write_memtable_time);
ColumnFamilyMemTablesImpl column_family_memtables(
versions_->GetColumnFamilySet());
w.status = WriteBatchInternal::InsertInto(
&w, w.sequence, &column_family_memtables, &flush_scheduler_,
write_options.ignore_missing_column_families, 0 /*log_number*/, this,
true /*concurrent_memtable_writes*/, seq_per_batch_, w.batch_cnt);
PERF_TIMER_START(write_pre_and_post_process_time);
}
// Report that this writer has finished its memtable write. A true return
// value means this writer is the one responsible for exiting the batch
// group (presumably because it is the last one to finish -- confirm in
// WriteThread).
if (write_thread_.CompleteParallelMemTableWriter(&w)) {
// we're responsible for exit batch group
// Run the pre-release callback of every writer in the group whose callback
// did not fail, stopping at the first callback that returns an error.
for (auto* writer : *(w.write_group)) {
if (!writer->CallbackFailed() && writer->pre_release_callback) {
assert(writer->sequence != kMaxSequenceNumber);
Status ws = writer->pre_release_callback->Callback(writer->sequence,
disable_memtable);
if (!ws.ok()) {
// NOTE(review): status is overwritten by `status = w.FinalStatus()`
// below, so this error is not returned from here as-is -- confirm
// whether it should be propagated to the write group.
status = ws;
break;
}
}
}
// Publish the group's last sequence, check the memtable insert status, and
// leave the batch group.
auto last_sequence = w.write_group->last_sequence;
versions_->SetLastSequence(last_sequence);
MemTableInsertStatusCheck(w.status);
write_thread_.ExitAsBatchGroupFollower(&w);
}
assert(w.state == WriteThread::STATE_COMPLETED);
// STATE_COMPLETED conditional below handles exit
status = w.FinalStatus();
}
// The writer is in STATE_COMPLETED: the write is done, so report the outputs
// requested by the caller and return.
if (w.state == WriteThread::STATE_COMPLETED) {
if (log_used != nullptr) {
*log_used = w.log_used;
}
if (seq_used != nullptr) {
*seq_used = w.sequence;
}
// write is complete and leader has updated sequence
return w.FinalStatus();
}
// From here on the writer must be in STATE_GROUP_LEADER, i.e. it is the leader
// of the whole batch group.
assert(w.state == WriteThread::STATE_GROUP_LEADER);
// As leader, build the write group from the other writers; the value returned
// by EnterAsBatchGroupLeader is recorded in last_batch_group_size_.
last_batch_group_size_ =
write_thread_.EnterAsBatchGroupLeader(&w, &write_group);
if (status.ok()) {
// ... (code elided in this excerpt)
if (!two_write_queues_) {
if (status.ok() && !write_options.disableWAL) {
PERF_TIMER_GUARD(write_wal_time);
// Write the group to the WAL.
status = WriteToWAL(write_group, log_writer, log_used, need_log_sync,
need_log_dir_sync, last_sequence + 1);
}
} else {
if (status.ok() && !write_options.disableWAL) {
PERF_TIMER_GUARD(write_wal_time);
// LastAllocatedSequence is increased inside WriteToWAL under
// wal_write_mutex_ to ensure ordered events in WAL
status = ConcurrentWriteToWAL(write_group, log_used, &last_sequence,
seq_inc);
} else {
// Otherwise we inc seq number for memtable writes
last_sequence = versions_->FetchAddLastAllocatedSequence(seq_inc);
}
}
assert(last_sequence != kMaxSequenceNumber);
// current_sequence is the first sequence number used for this group's
// memtable inserts; last_sequence is advanced by seq_inc.
const SequenceNumber current_sequence = last_sequence + 1;
last_sequence += seq_inc;
if (status.ok()) {
PERF_TIMER_GUARD(write_memtable_time);
if (!parallel) {
// Parallel memtable writes are not in use: insert the whole write group
// in a single InsertInto call.
w.status = WriteBatchInternal::InsertInto(
write_group, current_sequence, column_family_memtables_.get(),
&flush_scheduler_, write_options.ignore_missing_column_families,
0 /*recovery_log_number*/, this, parallel, seq_per_batch_);
} else {
// ... (code elided in this excerpt)
// Launch the other writers so that each writes its own batch into the
// memtable.
write_thread_.LaunchParallelMemTableWriters(&write_group);
in_parallel_group = true;
// The leader checks whether it has to write to the memtable itself; if so,
// it inserts only its own batch.
if (w.ShouldWriteToMemtable()) {
ColumnFamilyMemTablesImpl column_family_memtables(
versions_->GetColumnFamilySet());
assert(w.sequence == current_sequence);
w.status = WriteBatchInternal::InsertInto(
&w, w.sequence, &column_family_memtables, &flush_scheduler_,
write_options.ignore_missing_column_families, 0 /*log_number*/,
this, true /*concurrent_memtable_writes*/, seq_per_batch_,
w.batch_cnt);
}
}
// Report the leader's sequence number to the caller if requested.
if (seq_used != nullptr) {
*seq_used = w.sequence;
}
}
}
PERF_TIMER_START(write_pre_and_post_process_time);
// ... (code elided in this excerpt)
// If parallel memtable writers were launched, find out whether this thread is
// the one that must handle the group exit; without a parallel group the
// leader always handles it.
bool should_exit_batch_group = true;
if (in_parallel_group) {
// CompleteParallelWorker returns true if this thread should
// handle exit, false means somebody else did
should_exit_batch_group = write_thread_.CompleteParallelMemTableWriter(&w);
}
// The leader is responsible for the exit: leave the group as leader, passing
// the group status along (presumably this also completes the other writers
// and selects the next batch leader -- confirm in WriteThread).
if (should_exit_batch_group) {
// ... (code elided in this excerpt)
write_thread_.ExitAsBatchGroupLeader(write_group, status);
}
// If the group-level status is ok, report the leader writer's own final
// status instead.
if (status.ok()) {
status = w.FinalStatus();
}
return status;
}
|