-- Query:
SELECT p.category, SUM(s.amount) AS total
FROM sales s
JOIN products p ON s.product_id = p.id
WHERE p.category IN ('A', 'B')
  AND s.sale_date BETWEEN DATE '2024-01-01' AND DATE '2024-12-31'
GROUP BY p.category
ORDER BY total DESC
LIMIT 10;
// on the RHS (build side), we construct a child MetaPipeline with this operator as its sink auto &child_meta_pipeline = meta_pipeline.CreateChildMetaPipeline(current, op, MetaPipelineType::JOIN_BUILD);
if (op.children[1].get().CanSaturateThreads(current.GetClientContext())) { // if the build side can saturate all available threads, // we don't just make the LHS pipeline depend on the RHS, but recursively all LHS children too. // this prevents breadth-first plan evaluation
// continue building the current pipeline on the LHS (probe side) /* 继续当前 MetaPipeline, 构建左子树 (Probe 侧) 的 pipelines. */ op.children[0].get().BuildPipelines(current, meta_pipeline);
if (last_child_ptr) { // the pointer was set, set up the dependencies
/* 特殊 Join 类型处理. */ switch (op.type) { case PhysicalOperatorType::POSITIONAL_JOIN: // Positional joins are always outer meta_pipeline.CreateChildPipeline(current, op, last_pipeline); return; case PhysicalOperatorType::CROSS_PRODUCT: return; default: break; }
// Join can become a source operator if it's RIGHT/OUTER, or if the hash join goes out-of-core /* 某些 Join 操作会主动产生额外的输出数据 (Source角色), 所以需要创建一个额外的 Pipeline 来处理这些输出. */ if (op.Cast<PhysicalJoin>().IsSource()) { meta_pipeline.CreateChildPipeline(current, op, last_pipeline); } }
/// Create a child MetaPipeline whose sink is `op`, store it in `children`, and wire up its dependencies.
///
/// @param current The pipeline of this MetaPipeline that consumes the child's result. It is recorded as the
///                child's parent and is made to depend on the child's base pipeline.
/// @param op      The operator passed to the child MetaPipeline as its sink.
/// @param type    The kind of child MetaPipeline to create (e.g. MetaPipelineType::JOIN_BUILD for a join's build side).
/// @return A reference to the newly created child MetaPipeline. The object itself is held in `children`.
MetaPipeline &MetaPipeline::CreateChildMetaPipeline(Pipeline &current, PhysicalOperator &op, MetaPipelineType type) {
	children.push_back(make_shared_ptr<MetaPipeline>(executor, state, &op, type));
	auto &child_meta_pipeline = *children.back().get();
	// store the parent
	child_meta_pipeline.parent = &current;
	// child MetaPipeline must finish completely before this MetaPipeline can start:
	// the current pipeline depends on the child MetaPipeline's base pipeline
	current.AddDependency(child_meta_pipeline.GetBasePipeline());
	// child meta pipeline is part of the recursive CTE too
	child_meta_pipeline.recursive_cte = recursive_cte;
	return child_meta_pipeline;
}
// set root pipelines, i.e., all pipelines that end in the final sink root_pipeline->GetPipelines(root_pipelines, false); root_pipeline_idx = 0;
// collect all meta-pipelines from the root pipeline /* 收集所有待调度的 MetaPipeline (递归搜索). */ vector<shared_ptr<MetaPipeline>> to_schedule; root_pipeline->GetMetaPipelines(to_schedule, true, true);
// number of 'PipelineCompleteEvent's is equal to the number of meta pipelines, so we have to set it here total_pipelines = to_schedule.size();
// collect all pipelines from the root pipelines (recursively) for the progress bar and verify them /* 收集所有待调度的 pipelines (递归搜索). */ root_pipeline->GetPipelines(pipelines, true);
voidTaskScheduler::ExecuteForever(atomic<bool> *marker){ #ifndef DUCKDB_NO_THREADS staticconstexprconstint64_t INITIAL_FLUSH_WAIT = 500000; // initial wait time of 0.5s (in mus) before flushing
auto &config = DBConfig::GetConfig(db); shared_ptr<Task> task;
/* 线程主循环. */ while (*marker) { /* ... */ if (queue->Dequeue(task)) { /* 从无锁任务队列中获取任务. */ auto process_mode = config.options.scheduler_process_partial ? TaskExecutionMode::PROCESS_PARTIAL : TaskExecutionMode::PROCESS_ALL; /* 执行任务. */ auto execute_result = task->Execute(process_mode);
/* 根据执行结果处理任务. */ switch (execute_result) { case TaskExecutionResult::TASK_FINISHED: case TaskExecutionResult::TASK_ERROR: task.reset(); /* 任务完成, 释放资源. */ break; case TaskExecutionResult::TASK_NOT_FINISHED: { /* 任务未完成, 重新入队. */ auto &token = *task->token; queue->Enqueue(token, std::move(task)); break; } case TaskExecutionResult::TASK_BLOCKED: /* 任务被阻塞, 取消调度. */ task->Deschedule(); task.reset(); break; } } elseif (queue->GetTasksInQueue() > 0) { // failed to dequeue but there are still tasks remaining - signal again to retry queue->semaphore.signal(1); } }
if (exhausted_source && done_flushing && !remaining_sink_chunk && !next_batch_blocked && in_process_operators.empty()) { /* 所有条件都满足, 退出循环. */ break; } elseif (remaining_sink_chunk) { // The pipeline was interrupted by the Sink. We should retry sinking the final chunk. /* Sink 操作符之前是被阻塞的, 重新尝试将最终数据块推送到 Sink. */ result = ExecutePushInternal(final_chunk, chunk_budget); D_ASSERT(result != OperatorResultType::HAVE_MORE_OUTPUT); remaining_sink_chunk = false; } elseif (!in_process_operators.empty() && !started_flushing) { // Operator(s) in the pipeline have returned `HAVE_MORE_OUTPUT` in the last Execute call // the operators have to be called with the same input chunk to produce the rest of the output
/* PhysicalOperator 在上次执行中返回了 HAVE_MORE_OUTPUT, 需要再次调用这个操作符产生剩下的输出结果. */ D_ASSERT(source_chunk.size() > 0); result = ExecutePushInternal(source_chunk, chunk_budget); } elseif (exhausted_source && !next_batch_blocked && !done_flushing) { // The source was exhausted, try flushing all operators /* 数据源已消费完, 尝试刷新支持缓存的操作符. */ auto flush_completed = TryFlushCachingOperators(chunk_budget); if (flush_completed) { done_flushing = true; break; } else { if (remaining_sink_chunk) { return PipelineExecuteResult::INTERRUPTED; } else { D_ASSERT(chunk_budget.IsDepleted()); return PipelineExecuteResult::NOT_FINISHED; } } } elseif (!exhausted_source || next_batch_blocked) { /* 通过 Pipeline 的 Source PhysicalOperator 获取新数据并处理. */ SourceResultType source_result; if (!next_batch_blocked) { // "Regular" path: fetch a chunk from the source and push it through the pipeline source_chunk.Reset(); source_result = FetchFromSource(source_chunk);