引言

DuckDB 的执行器采用了 Push-based Vectorized Execution Model, 与传统的 Pull-based 模型相比, Push 模型在并行化,内存效率都有着显著优势.

传统的执行模型火山模型 (Volcano Pull Model) 存在缓存局部性差、并行度低的问题,其中最为关键的是算子内部缺乏水平并行能力;若想让多个线程同时处理同一算子的不同数据分片,需要借助 EXCHANGE 等算子复制工作管道。至于垂直并行,Volcano 模型本身就采用“上游拉取下游”方式串接算子,这一层面并不是瓶颈。

这在目前的单机多核的趋势下存在很大的性能问题, 关于 Pull-based vs Push-based, Volcano vs Pipeline 网上有大量的资料讨论过这个问题:

DuckDB 的整个执行器都围绕着 Pipeline 的模型来设计, Pipeline 提供了结构化的并行理念, 将复杂的算子抽象成一个 DAG 有向无环图, 在整个 DAG 任务流中根据依赖关系可以将复杂的查询拆分为多个 Pipeline, 互不依赖的 Pipeline 可以并行执行.

DuckDB 采用 Morsel-Driven Parallelism 并行模型, 将数据划分为小的 “morsels”, 然后动态地将这些 morsels 分配给工作线程, 实现自适应的任务调度和更好的负载均衡. 这种机制解决了传统并行执行中的负载均衡问题, 特别是在处理数据倾斜的情况下能够动态调整任务分配.

执行器

duckdb-executor_1.png

在之前的文章DuckDB 源码分析 - Logical Plan 逻辑计划我们介绍了 DuckDB 的逻辑计划, 逻辑计划转换为物理计划后经过执行器被转换为高效的并行执行 Pipeline.

执行器核心抽象与数据结构

逻辑计划构建完成以后进行物理计划的构建, 然后我们根据物理计划开始构建整个执行任务流. 物理计划生成的核心组件包括物理计划生成器 (PhysicalPlanGenerator), 物理操作符 (PhysicalOperator) 和物理计划 (PhysicalPlan). 这些组件协同工作, 将逻辑计划转换为可执行的物理计划.

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
/* src/main/client_context.cpp */

shared_ptr<PreparedStatementData>
ClientContext::CreatePreparedStatementInternal(ClientContextLock &lock, const string &query,
unique_ptr<SQLStatement> statement,
optional_ptr<case_insensitive_map_t<BoundParameterData>> values) {
/* ... */

if (config.enable_optimizer && logical_plan->RequireOptimizer()) {
profiler.StartPhase(MetricsType::ALL_OPTIMIZERS);

/* 进行逻辑计划的优化. */
Optimizer optimizer(*logical_planner.binder, *this);
logical_plan = optimizer.Optimize(std::move(logical_plan));

D_ASSERT(logical_plan);
profiler.EndPhase();

#ifdef DEBUG
logical_plan->Verify(*this);
#endif
}

/* 将逻辑计划转为物理计划. */
profiler.StartPhase(MetricsType::PHYSICAL_PLANNER);
PhysicalPlanGenerator physical_planner(*this);
result->physical_plan = physical_planner.Plan(std::move(logical_plan));
profiler.EndPhase();
D_ASSERT(result->physical_plan);
return result;
}

duckdb-executor_2.png

Pipeline 流水线机制

基本概念

  • 执行模型: DuckDB 采用向量化 (DataChunk 为单位)的流水线执行. 每条 Pipeline 由 “一个 Source + 若干个中间 PhysicalOperator + 一个 Sink” 构成.

  • 数据流向: Source 产出数据块 -> 中间算子按批变换 -> Sink 消费并汇总/产出最终结果或中间结果 (如哈希表).

  • 调度体系: Pipeline 被拆分为可并行的任务, 由任务调度器在多线程环境下执行; PipelineExecutor 负责在本地线程上驱动这条管道.

  • MetaPipeline

    MetaPipeline 是 “pipeline 的编排组合”, 定义 pipeline 的元数据,参数,依赖关系. 围绕同一个 Sink, 把多个以不同 Source 开始但共享同一 Sink 的 Pipeline 组织成一组, 并在组内与组间管理依赖与完成事件. MetaPipeline 还负责管理 Pipeline 之间的执行顺序和复杂依赖关系, 特别是在处理 Join 策略时协调 Build 侧和 Probe 侧的执行.

  • Pipeline

    一连串的物理计划操作符被切割成可并行,可调度的执行单元, 这个执行单元就是 pipeline. 明确一条从 Source 到 Sink 的线性数据流, 中间是一串可融合的算子.

一般情况下,MetaPipeline 围绕同一个 Sink 组织执行,通常只有一个基础 Pipeline (base pipeline)。多数算子(Projection/Filter/TableScan 等中间或 Source 算子)不会在该 MetaPipeline 下再生成额外 Pipeline。
但是也有例外,比如 Join 算子。

PhysicalOperator: 物理算子

Source: 源端算子, 作为整个 pipeline 的起点, 每个算子具体是 Source 算子还是 Sink 算子是初始化定义的, 比如 PhysicalTableScan 就是 Source 算子.

Sink: Pipeline Breaker 的算子.

有的算子即是 Source 算子, 也是 Sink 算子. 这类算子被称为 Pipeline Breaker, 它们会将一个 Pipeline 分割成多个独立的 Pipeline. 典型的 Pipeline Breaker 包括 Hash Join,Aggregation 等需要分阶段执行的算子.

Pipeline 构建

调度过程由 Pipeline 和 PipelineExecutor 管理. 一个查询可能被划分为多个相互依赖的 Pipeline. PipelineExecutor 负责协调这些流水线的执行.

在 DuckDB 中,每一个查询语句都会被包装成以 PhysicalResultCollector 作为 root (顶层 Sink) 的物理计划,PhysicalResultCollector::GetResultCollector 会选择具体的 Collector 变体并作为 root operator 交给执行器初始化。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
void Executor::InitializeInternal(PhysicalOperator &plan) {

/* 从当前 ClientContext 获取全局的任务调度器. */
auto &scheduler = TaskScheduler::GetScheduler(context);

{
lock_guard<mutex> elock(executor_lock);
physical_plan = &plan;

this->profiler = ClientData::Get(context).profiler;
profiler->Initialize(plan);
this->producer = scheduler.CreateProducer();

/* 构建所有的 pipelines. */
PipelineBuildState state;

/* 从 root 的 MetaPipeline 开始. */
auto root_pipeline = make_shared_ptr<MetaPipeline>(*this, state, nullptr);

/* 调用 Build 从物理计划根节点递归构建所有相关的 Pipeline, 并按 Pipeline 构建原则建立 Pipeline Breaker 与依赖. */
root_pipeline->Build(*physical_plan);

/* 调用 Ready 为所有 pipelines 生成并准备事件序列 (initialize -> event -> prepare finish -> finish -> complete),
* 使它们可被调度执行. */
root_pipeline->Ready();

/* ... */

// set root pipelines, i.e., all pipelines that end in the final sink
root_pipeline->GetPipelines(root_pipelines, false);
root_pipeline_idx = 0;

// collect all meta-pipelines from the root pipeline
/* 收集所有待调度的 MetaPipeline (递归搜索). */
vector<shared_ptr<MetaPipeline>> to_schedule;
root_pipeline->GetMetaPipelines(to_schedule, true, true);

// number of 'PipelineCompleteEvent's is equal to the number of meta pipelines, so we have to set it here
total_pipelines = to_schedule.size();

// collect all pipelines from the root pipelines (recursively) for the progress bar and verify them
root_pipeline->GetPipelines(pipelines, true);

// finally, verify and schedule
VerifyPipelines();
ScheduleEvents(to_schedule);
}
}

我们构建一个样例数据, 使用 EXPLAIN 来理解物理计划如何转换成 pipelines 集合的:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
-- 建表与数据
CREATE TABLE products(id INTEGER, category VARCHAR);
INSERT INTO products VALUES (1, 'A'), (2, 'B'), (3, 'C');
CREATE TABLE sales(product_id INTEGER, amount DOUBLE, sale_date DATE);
INSERT INTO sales VALUES
(1, 10.0, DATE '2024-01-10'),
(1, 20.0, DATE '2023-12-31'),
(2, 30.0, DATE '2024-06-15'),
(3, 40.0, DATE '2024-07-01'),
(2, 5.0, DATE '2025-01-01');

-- 查询
SELECT p.category, SUM(s.amount) AS total
FROM sales s
JOIN products p ON s.product_id = p.id
WHERE p.category IN ('A','B')
AND s.sale_date BETWEEN DATE '2024-01-01' AND DATE '2024-12-31'
GROUP BY p.category
ORDER BY total DESC
LIMIT 10;

下图是 PhysicalOperator 描述的物理计划:

DuckDB物理计划图
  1. 入口: PhysicalResultCollector

    PhysicalResultCollector 是 Sink 操作符, 负责收集查询结果.

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
void PhysicalResultCollector::BuildPipelines(Pipeline &current, MetaPipeline &meta_pipeline) {
/* 1. 重置为 sink 状态. */
sink_state.reset();

/* 2. ResultCollector 没有直接子节点, 通过 plan 字段引用(PhysicalOperator &plan). */
D_ASSERT(children.empty());

/* 3. 将自己设置为当前 pipeline 的 Sink. */
auto &state = meta_pipeline.GetState();
state.SetPipelineSink(current, *this);

/* 4. 创建子 MetaPipeline 来构建实际的查询计划. */
auto &child_meta_pipeline = meta_pipeline.CreateChildMetaPipeline(current, *this);
child_meta_pipeline.Build(plan); /* plan = PhysicalExplainAnalyze. */
}
  1. PhysicalExplainAnalyze 操作符

    PhysicalExplainAnalyze 使用默认的 BuildPipelines() 逻辑:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
void PhysicalOperator::BuildPipelines(Pipeline &current, MetaPipeline &meta_pipeline) {
op_state.reset();

auto &state = meta_pipeline.GetState();

/* ... */

if (IsSink()) {
/* 1. PhysicalExplainAnalyze 既是 Sink 操作符也是 Source 操作符. */
sink_state.reset();

/* 2. 将自己设置为当前 Pipeline 的 Source. */
state.SetPipelineSource(current, *this);

/* 3. 创建一个新的 MetaPipeline 来递归子操作符. */
auto &child_meta_pipeline = meta_pipeline.CreateChildMetaPipeline(current, *this);
child_meta_pipeline.Build(children[0].get());

return;
}

/* ... */
}
  1. PhysicalTopN 操作符

    PhysicalTopN 既是 Source 又是 Sink,也使用默认的 BuildPipelines() 逻辑:

  2. PhysicalProjection 操作符

    PhysicalProjection 既不是 Sink 也不是 Source,但是也使用默认的 BuildPipelines() 逻辑:

1
2
3
4
5
6
7
8
9
10
11
12
void PhysicalOperator::BuildPipelines(Pipeline &current, MetaPipeline &meta_pipeline) {
op_state.reset();

/* Projection 不是Sink, 不是Source, 有子节点. */
auto &state = meta_pipeline.GetState();

/* ... */

/* 添加到当前 pipeline ( pipeline.operators.push_back(op)) 并递归. */
state.AddPipelineOperator(current, *this);
children[0].get().BuildPipelines(current, meta_pipeline); /* 调用 PhysicalPerfectHashGroupBy 继续构建 pipeline. */
}
  1. PhysicalPerfectHashAggregate 操作符

    PhysicalPerfectHashAggregate 既是 Sink 也是 Source,使用默认的 BuildPipelines() 逻辑:

从 PhysicalPerfectHashAggregate 下来有连续两个 PhysicalProjection 操作符, 然后来到了 Hash Join 操作符.

  1. PhysicalHashJoin 操作符

    PhysicalHashJoin 是一个比较复杂的操作符, 它会创建 Build 侧和 Probe 侧两个不同的 Pipeline:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
//===--------------------------------------------------------------------===//
// Pipeline Construction
//===--------------------------------------------------------------------===//
void PhysicalJoin::BuildJoinPipelines(Pipeline &current, MetaPipeline &meta_pipeline, PhysicalOperator &op,
bool build_rhs) {
op.op_state.reset();
op.sink_state.reset();

/* 把 Join 操作符加入当前 pipeline. */
auto &state = meta_pipeline.GetState();
state.AddPipelineOperator(current, op);

vector<shared_ptr<Pipeline>> pipelines_so_far;
meta_pipeline.GetPipelines(pipelines_so_far, false);
auto &last_pipeline = *pipelines_so_far.back();

vector<shared_ptr<Pipeline>> dependencies;
optional_ptr<MetaPipeline> last_child_ptr;
if (build_rhs) {
/* 构建右子树 (Build 侧) 的 MetaPipeline. */

// on the RHS (build side), we construct a child MetaPipeline with this operator as its sink
auto &child_meta_pipeline = meta_pipeline.CreateChildMetaPipeline(current, op, MetaPipelineType::JOIN_BUILD);

/* 根据 PhysicalOperator 递归构建右边的 pipelines. */
child_meta_pipeline.Build(op.children[1]);

if (op.children[1].get().CanSaturateThreads(current.GetClientContext())) {
// if the build side can saturate all available threads,
// we don't just make the LHS pipeline depend on the RHS, but recursively all LHS children too.
// this prevents breadth-first plan evaluation

/* 这个 if 判断是一个性能优化和执行顺序控制的关键逻辑.
* 如果 Build 阶段能占满所有可用线程, 要防止广度优先(breadth-first)的策略被先行执行,
* 也就是左子树所有的 pipelines 都要依赖右子树, 让 Build 阶段先完成再来调度左边的 Probe 阶段. */
child_meta_pipeline.GetPipelines(dependencies, false);
last_child_ptr = meta_pipeline.GetLastChild();
}
}

// continue building the current pipeline on the LHS (probe side)
/* 继续当前 MetaPipeline, 构建左子树 (Probe 侧) 的 pipelines. */
op.children[0].get().BuildPipelines(current, meta_pipeline);

if (last_child_ptr) {
// the pointer was set, set up the dependencies

/* 让左子树中所有后续创建的子 MetaPipeline 的所有 Pipeline, 都依赖于右表 Build 侧的所有 Pipeline. */
meta_pipeline.AddRecursiveDependencies(dependencies, *last_child_ptr);
}

/* 特殊 Join 类型处理. */
switch (op.type) {
case PhysicalOperatorType::POSITIONAL_JOIN:
// Positional joins are always outer
meta_pipeline.CreateChildPipeline(current, op, last_pipeline);
return;
case PhysicalOperatorType::CROSS_PRODUCT:
return;
default:
break;
}

// Join can become a source operator if it's RIGHT/OUTER, or if the hash join goes out-of-core
/* 某些 Join 操作会主动产生额外的输出数据 (Source角色), 所以需要创建一个额外的 Pipeline 来处理这些输出. */
if (op.Cast<PhysicalJoin>().IsSource()) {
meta_pipeline.CreateChildPipeline(current, op, last_pipeline);
}
}

PhysicalHashJoin 算子会创建 Build 阶段和 Probe 阶段两个不同的 Pipeline:

  1. Build 阶段(右子树): 负责构建哈希表
  2. Probe 阶段(左子树): 负责探测哈希表

Build 阶段必须在 Probe 阶段之前完成, 两个阶段之间有明确的依赖关系. 这种设计确保了在探测哈希表之前, 哈希表已经完全构建完成.

  1. PhysicalTableScan 操作符

Join 操作符经过几个投影算子 + PhysicalFilter 算子以后就来到了 PhysicalTableScan 操作符, PhysicalTableScan 只是一个 Source 算子, 使用默认的 BuildPipelines() 逻辑:

DuckDB执行计划图4

这条 SQL 在当前物理计划下有 4 个 MetaPipelines:

1: ExplainAnalyze (顶层 Sink+Source, 用于统计与包装输出).
2: TopN (最终排序与限流的 Sink + Source, 承接聚合结果).
3: PerfectHashGroupBy (聚合 Sink, 承接 Join 后的数据).
4: HashJoin.

MetaPipelines 之间是存在依赖关系,依赖关系和构建顺序是相反的,父 MetaPipeline 依赖子 MetaPipeline,依赖关系的构建时机是在父 MetaPipeline 构建子 MetaPipeline 时确立:

1
2
3
4
5
6
7
8
9
10
11
12
MetaPipeline &MetaPipeline::CreateChildMetaPipeline(Pipeline &current, PhysicalOperator &op, MetaPipelineType type) {
children.push_back(make_shared_ptr<MetaPipeline>(executor, state, &op, type));
auto &child_meta_pipeline = *children.back().get();
// store the parent
child_meta_pipeline.parent = &current;
// child MetaPipeline must finish completely before this MetaPipeline can start
/* 当前的 pipeline 依赖子 MetaPipeline 的 base pipeline 完成. */
current.AddDependency(child_meta_pipeline.GetBasePipeline());
// child meta pipeline is part of the recursive CTE too
child_meta_pipeline.recursive_cte = recursive_cte;
return child_meta_pipeline;
}

Pipeline 任务调度和执行

从 PhysicalTableScan 开始, Source 算子每次产出一批数据, 单位就是 DataChunk, 中间算子按批消费/产生 DataChunk. Sink 算子消费 DataChunk, 如果它同时充当 Source 时, 再以 DataChunk 形式向下游提供结果.

在构建完成 Pipelines 流以后, DuckDB 开始进入 Pipelines 的执行流程:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
void Executor::InitializeInternal(PhysicalOperator &plan) {

/* 从当前 ClientContext 获取全局的任务调度器. */
auto &scheduler = TaskScheduler::GetScheduler(context);

{
lock_guard<mutex> elock(executor_lock);
physical_plan = &plan;

this->profiler = ClientData::Get(context).profiler;
profiler->Initialize(plan);
this->producer = scheduler.CreateProducer();

/* 构建所有的 pipelines. */
PipelineBuildState state;

/* 从 root 的 MetaPipeline 开始. */
auto root_pipeline = make_shared_ptr<MetaPipeline>(*this, state, nullptr);

/* 调用 Build 从物理计划根节点递归构建所有相关的 Pipeline,并按 Pipeline 构建原则建立 Pipeline Breaker 与依赖关系. */
root_pipeline->Build(*physical_plan);

/* 整个 MetaPipeline 树上的所有 Pipeline 切换到 "可执行态", 并将每条 Pipeline 的算子链调整为执行所需的顺序. */
root_pipeline->Ready();

/* ... */

// set root pipelines, i.e., all pipelines that end in the final sink
root_pipeline->GetPipelines(root_pipelines, false);
root_pipeline_idx = 0;

// collect all meta-pipelines from the root pipeline
/* 收集所有待调度的 MetaPipeline (递归搜索). */
vector<shared_ptr<MetaPipeline>> to_schedule;
root_pipeline->GetMetaPipelines(to_schedule, true, true);

// number of 'PipelineCompleteEvent's is equal to the number of meta pipelines, so we have to set it here
total_pipelines = to_schedule.size();

// collect all pipelines from the root pipelines (recursively) for the progress bar and verify them
/* 收集所有待调度的 pipelines (递归搜索). */
root_pipeline->GetPipelines(pipelines, true);

// finally, verify and schedule
VerifyPipelines();
/* 为所有 MetaPipeline 及其内的 Pipeline 构建 "事件调度栈" (Event Stack). */
ScheduleEvents(to_schedule);
}
}

DuckDB 采用一个事件驱动的模型来完成 Pipeline 的执行, Pipeline Event 和 PipelineTask 是两个核心组件, 它们协同工作来实现高效的查询执行.

一个 Pipeline 对应多个 Event:

  • PipelineInitializeEvent: 负责初始化 Pipeline
  • PipelineEvent: 负责调度 Pipeline 的主要执行任务
  • PipelinePrepareFinishEvent: 负责准备完成阶段
  • PipelineFinishEvent: 负责完成阶段
  • PipelineCompleteEvent: 负责整个 Pipeline 的完成

ScheduleEvents() 是将构建完成的 Pipelines 转换为事件驱动执行的核心函数, 每条 Pipeline 都会生成一个事件栈 (关于 PipelineEvent 的一个组合), 依赖关系是:

1
2
3
4
5
// dependencies: initialize -> event -> prepare finish -> finish -> complete
base_stack.pipeline_event.AddDependency(base_stack.pipeline_initialize_event);
base_stack.pipeline_prepare_finish_event.AddDependency(base_stack.pipeline_event);
base_stack.pipeline_finish_event.AddDependency(base_stack.pipeline_prepare_finish_event);
base_stack.pipeline_complete_event.AddDependency(base_stack.pipeline_finish_event);

每个 PipelineEvent 通过以下流程创建 PipelineTask:

  1. 调用 PipelineEvent::Schedule() 方法
  2. 调用 Pipeline::Schedule() 方法
  3. 判断是否可以并行执行:
    • 如果可以并行, 则调用 LaunchScanTasks() 创建多个 PipelineTask
    • 如果不能并行, 则调用 ScheduleSequentialTask() 创建单个 PipelineTask
  4. 通过 event->SetTasks() 将创建的PipelineTask提交给TaskScheduler

因为有一部分算子其实是可以并行执行的, 所以 Pipeline 和 PipelineTask 并不是完全一一对应的关系, 一个 Pipeline 可以根据估算拆分多个 PipelineTask 并行执行:

比如 PhysicalTableScan 算子所在的 Pipeline:

1
2
3
4
5
6
7
8
9
10
Pipeline (扫描大表)
├── Source: PhysicalTableScan (估算4个并行分片)
├── Operators: [...]
└── Sink: [...]

执行时分解为:
PipelineTask-1 (处理分片1)
PipelineTask-2 (处理分片2)
PipelineTask-3 (处理分片3)
PipelineTask-4 (处理分片4)
  • 一个 Pipeline 可以对应多个 PipelineTask.
  • 共享内容: PipelineTask 共享同一条 Pipeline 的算子链和必要的全局状态,但每个任务都会维护独立的本地 Source/Operator/Sink 状态。

PipelineTask 的执行:

  1. PipelineTask 继承自 ExecutorTask, 负责实际执行 Pipeline 工作
  2. 创建PipelineExecutor: 每个 PipelineTask 会创建一个 PipelineExecutor 实例
  3. 执行Pipeline: 调用 PipelineExecutor 执行具体的查询逻辑
  4. 执行完成后调用 event->FinishTask() 通知 PipelineEvent

Pipeline 并行执行

我们已经探讨了 Pipeline 的整个生成流程,了解了 Pipeline 如何被分解为多个 PipelineTask,并通过 PipelineEvent 进行调度管理。这些 PipelineTask 最终由 DuckDB 的 TaskScheduler 来真正执行。

DuckDB 在启动时创建一个全局的 TaskScheduler, 创建的 Worker 数量是参数决定的:

1
2
scheduler->SetThreads(config.options.maximum_threads, config.options.external_threads);
scheduler->RelaunchThreads();

duckdb-executor_5.png

每个 Worker 线程调度函数TaskScheduler::ExecuteForever()处理全局任务队列中的任务, 任务队列是一个 multi-producer, multi-consumer 线程安全的无锁队列.
需要注意的是使用一个 multi-producer, multi-consumer 线程安全的无锁队列, 多个工作线程可以从同一个队列中获取任务, 所以这里隐含了负载均衡.

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
void TaskScheduler::ExecuteForever(atomic<bool> *marker) {
#ifndef DUCKDB_NO_THREADS
static constexpr const int64_t INITIAL_FLUSH_WAIT = 500000; // initial wait time of 0.5s (in mus) before flushing

auto &config = DBConfig::GetConfig(db);
shared_ptr<Task> task;

/* 线程主循环. */
while (*marker) {
/* ... */
if (queue->Dequeue(task)) {
/* 从无锁任务队列中获取任务. */
auto process_mode = config.options.scheduler_process_partial ? TaskExecutionMode::PROCESS_PARTIAL
: TaskExecutionMode::PROCESS_ALL;
/* 执行任务. */
auto execute_result = task->Execute(process_mode);

/* 根据执行结果处理任务. */
switch (execute_result) {
case TaskExecutionResult::TASK_FINISHED:
case TaskExecutionResult::TASK_ERROR:
task.reset(); /* 任务完成, 释放资源. */
break;
case TaskExecutionResult::TASK_NOT_FINISHED: {
/* 任务未完成, 重新入队. */
auto &token = *task->token;
queue->Enqueue(token, std::move(task));
break;
}
case TaskExecutionResult::TASK_BLOCKED:
/* 任务被阻塞, 取消调度. */
task->Deschedule();
task.reset();
break;
}
} else if (queue->GetTasksInQueue() > 0) {
// failed to dequeue but there are still tasks remaining - signal again to retry
queue->semaphore.signal(1);
}
}

/* ... */
}

Pipeline 执行的核心函数是PipelineExecutor::Execute(), 负责驱动数据从源到终点的完整流程:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
PipelineExecuteResult PipelineExecutor::Execute(idx_t max_chunks) {
/* 确保当前 Pipeline 具有 Sink (数据接收端). */
D_ASSERT(pipeline.sink);

/* 获取源数据, 如果这个 Pipeline 没有中间操作符, 直接使用 final_chunk,
* 否则使用第一个中间数据块 intermediate_chunks[0]. */
auto &source_chunk = pipeline.operators.empty() ? final_chunk : *intermediate_chunks[0];

/* ExecutionBudget 用来限制单次调用 Execute() 最多处理多少个 Chunk 的数据, 超过以后会结束这轮任务. */
ExecutionBudget chunk_budget(max_chunks);

/* 主执行循环. */
do {

/* 检查客户端是否已中断. */
if (context.client.interrupted) {
throw InterruptException();
}

/* 根据不同状态选择执行路径, 这些状态符 (exhausted_source, done_flushing..) 初始化都是 false. */
OperatorResultType result;

if (exhausted_source && done_flushing && !remaining_sink_chunk && !next_batch_blocked &&
in_process_operators.empty()) {
/* 所有条件都满足, 退出循环. */
break;
} else if (remaining_sink_chunk) {
// The pipeline was interrupted by the Sink. We should retry sinking the final chunk.
/* Sink 操作符之前是被阻塞的, 重新尝试将最终数据块推送到 Sink. */
result = ExecutePushInternal(final_chunk, chunk_budget);
D_ASSERT(result != OperatorResultType::HAVE_MORE_OUTPUT);
remaining_sink_chunk = false;
} else if (!in_process_operators.empty() && !started_flushing) {
// Operator(s) in the pipeline have returned `HAVE_MORE_OUTPUT` in the last Execute call
// the operators have to be called with the same input chunk to produce the rest of the output

/* PhysicalOperator 在上次执行中返回了 HAVE_MORE_OUTPUT, 需要再次调用这个操作符产生剩下的输出结果. */
D_ASSERT(source_chunk.size() > 0);
result = ExecutePushInternal(source_chunk, chunk_budget);
} else if (exhausted_source && !next_batch_blocked && !done_flushing) {
// The source was exhausted, try flushing all operators
/* 数据源已消费完, 尝试刷新支持缓存的操作符. */
auto flush_completed = TryFlushCachingOperators(chunk_budget);
if (flush_completed) {
done_flushing = true;
break;
} else {
if (remaining_sink_chunk) {
return PipelineExecuteResult::INTERRUPTED;
} else {
D_ASSERT(chunk_budget.IsDepleted());
return PipelineExecuteResult::NOT_FINISHED;
}
}
} else if (!exhausted_source || next_batch_blocked) {
/* 通过 Pipeline 的 Source PhysicalOperator 获取新数据并处理. */
SourceResultType source_result;
if (!next_batch_blocked) {
// "Regular" path: fetch a chunk from the source and push it through the pipeline
source_chunk.Reset();
source_result = FetchFromSource(source_chunk);

if (source_result == SourceResultType::BLOCKED) {
/* 如果数据源被阻塞, 返回中断状态. */
return PipelineExecuteResult::INTERRUPTED;
}

/* 如果数据源已经消费完成, 标记 FINISHED. */
if (source_result == SourceResultType::FINISHED) {
exhausted_source = true;
}
}

if (required_partition_info.AnyRequired()) {
auto next_batch_result = NextBatch(source_chunk);
next_batch_blocked = next_batch_result == SinkNextBatchType::BLOCKED;
if (next_batch_blocked) {
return PipelineExecuteResult::INTERRUPTED;
}
}

if (exhausted_source && source_chunk.size() == 0) {
// To ensure that we're not early-terminating the pipeline
continue;
}

/* 将数据块推送到 Pipeline 中的 PhysicalOperator 处理. */
result = ExecutePushInternal(source_chunk, chunk_budget);
} else {
throw InternalException("Unexpected state reached in pipeline executor");
}

if (result == OperatorResultType::BLOCKED) {
/* 处理 Sink 操作中断, 如果 Sink 操作符被阻塞, 设置 remaining_sink_chunk = true, 返回中断状态. */
remaining_sink_chunk = true;
return PipelineExecuteResult::INTERRUPTED;
}

if (result == OperatorResultType::FINISHED) {
/* 整个 Pipeline 已经处理完成, 直接返回. */
break;
}
} while (chunk_budget.Next());

/* 检查 Pipeline 是否完成:
* 1. Source 数据尚未被完全消费.
* 2. 缓存操作符的刷新操作没有执行.
* 3. 整个 Pipeline 还没有完成. */
if ((!exhausted_source || !done_flushing) && !IsFinished()) {
return PipelineExecuteResult::NOT_FINISHED;
}

/* 整个 Pipeline 已经完成, 执行最终化操作. */
return PushFinalize();
}

用户的主线程在将查询分解成多个 PipelineTask 下发给 TaskScheduler 后, 并不会陷入空闲等待状态, 而是负责监控整个查询任务的执行过程 PendingQueryResult::ExecuteInternal(), 其中除了任务过程中的异常处理和协调任务完成的获取查询结果 (ClientContext::FetchResultInternal()), 主线程也会择机选择 TaskScheduler 中属于自身查询的 Task 来执行.

总结

DuckDB 的执行器概念和细节繁多,但是 DuckDB 的开发者将整个执行流程进行了清晰的抽象,各个模块互相交互又不至于耦合过深,这体现了非常优秀的架构设计能力。学习 DuckDB 的代码设计,不仅仅是为了理解执行器的整个流程,对于系统工程师来说学习这种设计理念更为重要:

  1. 流水线的并行化:在系统架构中,Worker Pool 是一个常见的设计,但是大部分的 Worker Pool 都是以 Task 为运行粒度,Task 内部并不会再次拆分,这一方面无法利用多核处理器,在内存方面需要一次性加载所有的数据。DuckDB 将一个 Task 抽象成完全的流水线,在处理一个复杂的 SQL 时,比如多表连接,将其进一步拆分成多个小任务,使用 DAG 来组织任务,整个处理过程就变得清晰可控。拆分成流水线以后,DuckDB 还能将复杂的算子,比如一个 SCAN 操作进行并行的处理。

  2. 负载均衡:DuckDB 使用无锁队列直接就解决了任务分配的负载问题,共享队列模型本质上避免了 stealing 的需要,并且整个调度器的线程数量也可以动态的根据负载进行调整,尽可能的充分利用计算资源。

  3. 精细的任务控制:DuckDB 支持将小任务进行更精细化的拆分:

    1
    2
    3
    4
    enum class TaskExecutionMode : uint8_t {
    PROCESS_ALL, /* 完全执行任务。 */
    PROCESS_PARTIAL /* 部分执行任务。 */
    };

    DuckDB 通过 scheduler_process_partial 配置选项来控制默认的执行模式,默认情况下,这个选项是 false,但在某些验证模式下可以设置为 true。在大多数场景下,DuckDB 的 TaskScheduler 都是使用 PROCESS_ALL 模式,但是主线程(用户线程)是使用 PROCESS_PARTIAL,如果使用了 PROCESS_PARTIAL mode,通过 ExecutionBudget 会限制每批处理的 Chunk 数量,主线程还有协调任务和监控异常的额外工作,所以长期陷入任务执行是不友好的设计。

  4. 依赖关系的处理:DuckDB 引入了 MetaPipeline 的概念,代表一组具有相同 Sink 的流水线,这种设计允许更精细地管理复杂查询中多个流水线之间的依赖关系。流水线内部依赖:同一 MetaPipeline 内不同流水线之间的依赖,流水线间依赖:不同 MetaPipeline 之间的依赖,特殊操作符依赖:如 Join 操作的构建侧和探测侧之间的依赖。整个依赖体系层层递进,实现较为优雅.