Apollo Cyber RT 调度系统

Apollo Cyber RT 调度系统解析

前言

上篇博客中,我简要地给大家介绍了 Apollo 系统,以及它的代码文件结构,并说明了一下 Cyber RT 在 Apollo 系统中的地位(是的,我调整了一下博客的内容,使之变得更加均衡合理)。Cyber RT 在系统的任务调度方面有重要的作用,又和实时系统要求密切相关。因此,我打算将调度系统作为一个切入点,在本篇博客中,我将会给大家介绍一下 Cyber RT 的调度系统。

Cyber中的调度

自动驾驶系统的有三大流程:感知、决策、执行。例如,车上的传感器感知到障碍物,判断其类型、运动轨迹等,再做出决策,最后再到刹车、油门和方向的控制,这会经过一系列模块的计算。这些模块在计算过程中会产生数据依赖,如果用箭头将模块间的数据依赖表示出来,就会形成图的拓扑结构。

在 Apollo 项目中,通常使用 DAG file 来描述计算图的拓扑结构。由于自动驾驶系统牵扯到很多的步骤,有很复杂的流程,相应的,这些计算图也十分庞大。因此,如何调度整个计算图使系统能满足各种时间约束,达到系统的实时性和确定性,是个巨大的挑战。

接下来,我们就详细地剖析一下 Cyber RT 的调度系统💪!

conf 配置文件

Cyber 调度的配置文件在 cyber/conf 文件夹中,配置文件详细说明了线程名、线程的 CPU 亲和性、调度策略,对于协程,还有分组情况、协程的优先级等等。

方便起见,我从官方文档中举例,文档对每项设置的说明都非常具体:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
scheduler_conf {
// 1. 设置调度器策略
policy: "classic"
// 2. 设置cpu set
process_level_cpuset: "0-7,16-23" # all threads in the process are on the cpuset
// 3. 设置线程的cpuset,调度策略和优先级
threads: [
{
name: "async_log"
cpuset: "1"
policy: "SCHED_OTHER" # policy: SCHED_OTHER,SCHED_RR,SCHED_FIFO
prio: 0
}, {
name: "shm"
cpuset: "2"
policy: "SCHED_FIFO"
prio: 10
}
]
classic_conf {
// 4. 设置分组,线程组的cpuset,cpu亲和性,调度策略和优先级
// 设置调度器创建"processor"对象的个数,以及协程的优先级。
groups: [
{
name: "group1"
processor_num: 16
affinity: "range"
cpuset: "0-7,16-23"
processor_policy: "SCHED_OTHER"
// policy:
// SCHED_OTHER 默认策略 分时调度策略
// SCHED_RR 实时调度策略 时间片轮转
// SCHED_FIFO 队列 先到先服务策略
processor_prio: 0
tasks: [
{
name: "E"
prio: 0
}
]
},{
name: "group2"
processor_num: 16
affinity: "1to1"
cpuset: "8-15,24-31"
processor_policy: "SCHED_OTHER"
processor_prio: 0
tasks: [
{
name: "A"
prio: 0
},{
name: "B"
prio: 1
},{
name: "C"
prio: 2
},{
name: "D"
prio: 3
}
]
}
]
}
}

那么上面的 conf 文档描述了怎么样的调度策略呢?

还是参考官方文档,对于配置文件中已经编排好的任务,其拓扑结构就依据优先级确定了。我们根据上面的 conf 文档,可以简单画出任务的优先级拓扑情况,如下图,A、B、C、D 任务在第一个 group 中执行,E在第二个 group 中执行,对于没有出现在配置中的任务,比如F默认会放到第一个 group 中执行(下文会提到哦)。 而且配置中我们对于任务进行了优先级设置,A、B、C、D 的任务优先级依次增大,正好对应下图的拓扑依赖关系,在链路中越靠后的任务优先级越高。其实,数据也是这样在任务拓扑图中传递,数据走到最后,执行的任务优先级越高,这是为保证整个流程可以快速走完,不被其他流程的任务打断。

调度策略

Apollo 提供了两种调度策略,一种是 classic 策略,在代码中,用 SchedulerClassic 类实现;另一种是 Choreography 策略,代码中用 SchedulerChoreography 类实现。对于这两个策略,我们先给一个大致的描述,好让大家理解之间的差异,稍后在代码分析中,会详细解释这些实现 :smile: 。

  • classic 策略

    • 较为通用的调度策略
    • 如果对当前自动驾驶车辆上的 DAG 结构不清楚,建议使用此策略
    • 相关协程任务以组为单位与线程作绑定
    • SchedulerClassic 采用了协程池的概念,协程不会绑定到具体的 Processor,而是放在全局的优先级队列中。Processor 运行时,每次从最高优先级的任务开始调度执行。
  • choreography 策略

    • 需要对车上的任务、结构足够熟悉
    • 根据任务的执行依赖关系、任务的执行时长、任务 CPU 消耗情况、消息频率等,对某些任务进行编排
    • SchedulerChoreography 类采用了本地队列和全局队列相结合的方式。他将主链路(choreography开头的配置)进行编排;而对非主链路的任务放到线程池中使用 classic 策略执行。

当使用 choreography 策略时,具体该怎么办?Well,根据任务优先级、执行时长、频率与调度之间的关系,任务编排有如下几个依据(经验):

  • 在同一个路径上的任务尽量编排在同一个 Processor 中,如果 Processor 负载过高,可考虑将部分任务拆分到其他 Processor
  • 为防止优先级倒挂,同一个路径上的任务从开始到结束,优先级应逐级升高
  • 不同路径上的任务尽量混排
  • 高频且短耗时任务尽量编排在同一个 Processor

另:据 Dig-into-Apollo中的说法,该调度策略与 Go 语言中的 GPM 模型相似。

Scheduler、Processor & Context

我从参考文献1中找到一张不错的类关系图,可以很好地帮我说明一下这些类的关系:

Scheduler

Scheduler 类使用单例模式Instance() 方法被第一次调用时会加载 conf 文件,此时会根据配置文件中指定的类型创建 SchedulerClassic 或者 SchedulerChoreography 对象。

这就涉及到 Scheduler 类的构造函数,在这里,我不想展示具体代码的任何细节,因为这只会把原本就复杂的事物越搞越乱(如果你真的想仔细了解,还是亲自下场看一下代码吧),我经过总结,认为其步骤:

  1. 读取 conf 配置文件,如果读取配置文件失败,会设置默认值
  2. 将内部线程信息保存到查询表中,并设置线程的亲和性和优先级等属性
  3. 将所有的任务按照配置文件的要求分为 group ,设置进程级别的cpuset
  4. 根据配置文件要求创建线程,并包装为 Processor(执行器),绑定上下文。

这边我要多提一句,对于 choreography 策略,在创建线程时会比较特殊,它依照配置文件的要求,把所有线程分为两个部分(就如你在本篇博客最末尾看到的那样),其中一部分线程与 ClassicContext 绑定,另一部分与 Choreography 绑定,这意味着 choreography 策略负责编排一部分任务:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
void SchedulerChoreography::CreateProcessor() {
for (uint32_t i = 0; i < proc_num_; i++) {
auto proc = std::make_shared<Processor>();
auto ctx = std::make_shared<ChoreographyContext>();
proc->BindContext(ctx);
SetSchedAffinity(proc->Thread(), choreography_cpuset_,
choreography_affinity_, i);
SetSchedPolicy(proc->Thread(), choreography_processor_policy_,
choreography_processor_prio_, proc->Tid());
pctxs_.emplace_back(ctx);
processors_.emplace_back(proc);
}

for (uint32_t i = 0; i < task_pool_size_; i++) {
auto proc = std::make_shared<Processor>();
auto ctx = std::make_shared<ClassicContext>();
proc->BindContext(ctx);
SetSchedAffinity(proc->Thread(), pool_cpuset_, pool_affinity_, i);
SetSchedPolicy(proc->Thread(), pool_processor_policy_, pool_processor_prio_,
proc->Tid());
pctxs_.emplace_back(ctx);
processors_.emplace_back(proc);
}
}

除了构造函数之外,Scheduler 还负责分发、移除任务,也可以唤醒(Notify)某个任务。

Update at 11.00 29th Oct in 2020.


一开始写这篇博客时,并没有把Scheduler 类的创建、分发、唤醒、移除任务讲清楚,那么今天我来把这个坑补上。

首先是创建任务,在 Cyber RT 组件中,我说过 Component::Initialize() 中创建的处理消息函数,会被首先用于创建协程工厂,然后 Scheduler 会使用 CreateTask() 函数创建协程。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
bool Scheduler::CreateTask(std::function<void()>&& func,
const std::string& name,
std::shared_ptr<DataVisitorBase> visitor) {
if (cyber_unlikely(stop_.load())) {
/* 错误处理 */
}
// 登记任务名字,获得id
// GlobalData 中有一个 id 与 name 一一对应的表格
// 比起直译map这个单词 我更喜欢用表 因为这更容易让人理解
auto task_id = GlobalData::RegisterTaskName(name);
// 创建协程
auto cr = std::make_shared<CRoutine>(func);
cr->set_id(task_id);
cr->set_name(name);
// 分配任务
if (!DispatchTask(cr))
return false;
// 数据访问者 登记回调函数
if (visitor != nullptr) {
visitor->RegisterNotifyCallback([this, task_id]() {
if (cyber_unlikely(stop_.load()))
return;
// 唤醒 Processor 让它开始处理任务
this->NotifyProcessor(task_id);
});
}
return true;
}

在上面的代码中,可以看到协程创建完毕后,还会被分配任务,那么分配任务具体做什么呢?在 SchedulerClassic 类中,DispatchTask() 首先把协程 id 和其指针一一对应起来,放入到 id_cr_ 表中(下图),然后根据配置文件设置这个协程的优先级和协程组,如果配置文件中没有提到该协程,默认放入组 0 中,最后将该协程根据其优先级,放入到队列(下图就有)中,最后唤醒所在组的上下文;对于 SchedulerChoreography 类,还需要多一步来判断协程对应的 Processor 类。那么相似的道理,移除任务就是先将协程停止,从表中、队列中移除。

最后 Scheduler::NotifyProcessor() 会唤醒 Processor ,它首先检查传入 id 对应的协程的状态,然后就调用了上下文的 Notify() 函数,让控制线程的条件变量唤醒一个线程(Processor 类)。那么等待一个线程呢?也是控制这个条件变量就行了😂,使用 Wait_for 函数就可以满足要求。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
void ClassicContext::Notify(const std::string& group_name) {
(&mtx_wq_[group_name])->Mutex().lock();
notify_grp_[group_name]++;
(&mtx_wq_[group_name])->Mutex().unlock();
cv_wq_[group_name].Cv().notify_one();
}

void ClassicContext::Wait() {
std::unique_lock<std::mutex> lk(mtx_wrapper_->Mutex());
cw_->Cv().wait_for(lk, std::chrono::milliseconds(1000),
[&]() { return notify_grp_[current_grp] > 0; });
if (notify_grp_[current_grp] > 0)
notify_grp_[current_grp]--;
}

Processor

前面说到,调度器中会创建线程,并包装为 ProcessorScheduler 根据 conf 文件初始化线程,并创建若干个 Processor。注意,我再此强调 Processor 并不是物理上的处理器,本质就是一个线程而已。为了加强印象,也为了后续方便理解,我在这里给出 Processor 的定义代码:

1
2
3
4
5
6
7
8
9
10
11
class Processor {
public:
//...
private:
std::shared_ptr<ProcessorContext> context_; // 上下文
std::condition_variable cv_ctx_; // 条件变量
std::once_flag thread_flag_; // 线程 flag
std::mutex mtx_ctx_; // 互斥锁
std::thread thread_; // 线程
std::atomic<pid_t> tid_{-1}; // 线程id
std::atomic<bool> running_{false}; // 是否运行

好了,Processor 最重要的部分还是它如何运行。重要到什么程度?额嗯,我甚至可以花大篇幅把这部分代码完整地列出来。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
void Processor::Run() {
// 1. 获取线程的PID,系统内唯一
tid_.store(static_cast<int>(syscall(SYS_gettid)));
snap_shot_->processor_id.store(tid_);

while (cyber_likely(running_.load())) {
if (cyber_likely(context_ != nullptr)) {
// 2. 获取优先级最高并且准备就绪的协程
auto croutine = context_->NextRoutine();
if (croutine) {
snap_shot_->execute_start_time.store(cyber::Time::Now().ToNanosecond());
snap_shot_->routine_name = croutine->name();
// 3. 执行协程任务,完成后释放协程
croutine->Resume();
croutine->Release();
} else {
snap_shot_->execute_start_time.store(0);
// 4. 如果协程组中没有空闲的协程,则等待
context_->Wait();
}
} else {
// 5. 如果上下文为空,则线程阻塞10毫秒
std::unique_lock<std::mutex> lk(mtx_ctx_);
cv_ctx_.wait_for(lk, std::chrono::milliseconds(10));
}
}
}

其实 Processor::Run() 的逻辑很简单,就是不断地调用 ProcessorContext::NextRoutine() 函数,取得下一个协程(任务)。如果没取到,就调用 ProcessorContext::Wait() 等待。如果取到了,就调用 CRoutine::Resume() ,让任务继续运行。

看起来关键是 NextRoutine() 是如何挑选下一个任务的。参考上图,ProcessorContext 有两个派生类 ClassicContextChoreographyContext 。它们的实现因调度策略不同而不同。前者是按优先级从高到低从所在 group 对应的任务队列中取任务,取到后,需要判断其状态是否为 READY;后者也是按优先级从高到低的顺序,会将主链路上的任务与非主链路的任务分开列队,并且会为主链路上的任务提供指定的 Processor 。具体的内容很快我们就会在下一小节看到😀。

Context

ProcessorContext 类是一个抽象基类,它的实现非常简单,你甚至不用怀疑你的第一直觉,没错,NextRoutine()Wait() 函数就是它最重要的部分。

1
2
3
4
5
6
7
8
9
class ProcessorContext {
public:
virtual void Shutdown();
virtual std::shared_ptr<CRoutine> NextRoutine() = 0;
virtual void Wait() = 0;

protected:
std::atomic<bool> stop_{false};
};

我们重点关注 ClassicContextChoreographyContext 。首先,看一下最关键的 ProcessorContext::NextRoutine() 的代码:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
//  classic 调度策略的 NextRoutine()
std::shared_ptr<CRoutine> ClassicContext::NextRoutine() {
if (cyber_unlikely(stop_.load()))
return nullptr;
// 1. 从优先级最高的队列开始遍历
for (int i = MAX_PRIO - 1; i >= 0; --i) {
// 2. 获取当前优先级队列的锁
ReadLockGuard<AtomicRWLock> lk(lq_->at(i));
for (auto& cr : multi_pri_rq_->at(i)) {
if (!cr->Acquire())
continue;
// 3. 返回状态就绪的协程
if (cr->UpdateState() == RoutineState::READY)
return cr;
cr->Release();
}
}
return nullptr;
}
// Choreography 的调度策略的 NextRoutine()
std::shared_ptr<CRoutine> ChoreographyContext::NextRoutine() {
if (cyber_unlikely(stop_.load()))
return nullptr;
ReadLockGuard<AtomicRWLock> lock(rq_lk_);
for (auto it : cr_queue_) {
auto cr = it.second;
if (!cr->Acquire())
continue;
if (cr->UpdateState() == RoutineState::READY)
return cr;
cr->Release();
}
return nullptr;
}

我们仔细比较一下上文的两个函数,emm……乍一看似乎没什么区别,确实,但细节往往隐藏了海量信息。注意到,ClassicContext 使用的队列是 multi_pri_rq_ChoreographyContext 使用的队列是 cr_queue_ ,两者类型如下:

1
2
3
4
5
6
7
using CROUTINE_QUEUE = std::vector<std::shared_ptr<CRoutine>>;
using MULTI_PRIO_QUEUE = std::array<CROUTINE_QUEUE, 20>;
using CR_GROUP = std::unordered_map<std::string, MULTI_PRIO_QUEUE>;

MULTI_PRIO_QUEUE *multi_pri_rq_; // 这是 ClassicContext 内的一个私有成员
std::multimap<uint32_t, std::shared_ptr<CRoutine>, std::greater<uint32_t>> cr_queue_;
alignas(CACHELINE_SIZE) static CR_GROUP cr_group_;

看清楚了吧,事实上 ClassicContext 使用了多个优先级队列,而ChoreographyContext 用的“队列”只是了一个表(对于 multimapmap ,我更喜欢用表格称呼它们)。如果你想进一步了解 multi_pri_rq_,那么可以仔细看一下下面这张图。再结合上面的代码,可以得出两点结论:

  • CR_GROUP 相当于组名与优先级队列的表格,是一个全局变量,有序地存放了系统中所有的协程。所以说,在 Classic 策略中,“协程是放在全局的优先级队列中处理的”。
  • 每一个协程组与一个 ClassicContext 对象对应,一个 ClassicContext 也与一个 Processor 绑定,即与一个线程绑定。这就是我前文所说的“相关协程以组为单位与线程做绑定”的直接证据。
而反观 Choreography 策略,`ChoreographyContext` 只管理那些**主链路**上的协程(任务),不会对协程进行分组。而对于非主链路上的任务,Choreography 策略会把它们扔给 `ClassicContext` 上下文处理。

说完这个,我们再看看 Processor 的线程阻塞和唤醒是怎么操作的:没错,通过上下文的 Wait()Notify() 函数 :man_shrugging:。

1
2
3
4
5
6
7
8
9
10
11
void ClassicContext::Wait() {
// 1. 获取锁
std::unique_lock<std::mutex> lk(mtx_wrapper_->Mutex());
// 2. 等待条件大于0
cw_->Cv().wait_for(lk, std::chrono::milliseconds(1000),
[&]() { return notify_grp_[current_grp] > 0; });
// 3. 对应协程组的唤醒条件减1
if (notify_grp_[current_grp] > 0) {
notify_grp_[current_grp]--;
}
}
1
2
3
4
5
6
7
8
9
void ClassicContext::Notify(const std::string& group_name) {
// 1. 加锁
(&mtx_wq_[group_name])->Mutex().lock();
// 2. 协程唤醒条件加1
notify_grp_[group_name]++;
(&mtx_wq_[group_name])->Mutex().unlock();
// 3. 唤醒线程
cv_wq_[group_name].Cv().notify_one();
}

看来唤醒和等待都是通过上下文的条件变量 cw_ 实现的,对于 Choreography 策略来说,实现也是类似的。

整体结构

现在,我们跳出这些恼人的代码,俯视整个调度结构。幸运的是,参考文献 [1] 已经提供了两张非常好的图片:我在这里也不厌其烦地重复说一下吧😓

  • 调度系统中,Scheduler 类统管了所有资源,很显然,它必然是单例。
  • 每个 Processor 封装了一个 std::thread ,并于一个 ProcessorContext 对象绑定。
  • 切换协程(任务)由上下文完成,过程几乎都是优先级从高到底遍历,选中已经就绪的协程开始运行。
  • 在一个完整的动作流程中,任务的优先级几乎都是从低到高的。
  • Choreography 调度策略,主要是针对主链路上的任务进行编排,这些任务会被分配到程序员指定的 Processor 上,且执行先后关系明确,需要对系统有足够深入的了解。

参考文献

[1] 自动驾驶平台Apollo 5.5阅读手记:Cyber RT中的任务调度

[2] 百度 Apollo Cyber Docs

[3] Dig-into-Apollo

[4] Golang 中的协程

[5] 自动驾驶平台Apollo 3.5阅读手记:Cyber RT中的协程


Apollo Cyber RT 调度系统
https://dingfen.github.io/2020/10/17/2020-10-17-Cyber RT/
作者
Bill Ding
发布于
2020年10月17日
更新于
2024年4月9日
许可协议