Apollo Cyber RT 定时器

Apollo Cyber RT 定时器

前言

首先回顾一下之前的内容。Apollo 是百度开发的自动驾驶开源框架,具有高性能和高灵活性的特点,我主要介绍 Apollo 5.5 版本。Apollo Cyber RT 是 Apollo 团队在注意到 ROS 无法满足自动驾驶系统的鲁棒性和性能要求后,专门为自动驾驶场景设计的开源、高性能运行时框架。Cyber RT 的调度系统给出了两个调度策略,使用协程来处理任务,并以优先级为主要依据调度协程。

OK,介绍完 Apollo 自动驾驶系统和 Cyber RT 运行时框架的调度部分后,今天我来介绍一下 Cyber RT 的定时器。当然,本人水平有限,对搜寻的资料的概括和对代码的理解难免有些错误、遗漏,恳请大家讨论、指正。

什么是定时器

在操作系统内核中,时间管理与定时器是重要的组成部分。相对于事件驱动而言,内核中有大量的函数都是基于时间驱动的,其中有些是周期性的,有些是等待一个相对时间后执行,有些是在绝对时间上执行。定时器是管理系统流逝的时间的基础,能够使工作在指定时间点上执行。

定时器的使用很简单,你只需要执行一些初始化工作,设置一个超时时间,指定超时后执行的函数,最后激活定时器就可以了。

Linux 中的定时器

为了给大家留下一个深刻的印象,我先拿如何使用 Linux 内核中的定时器来说明。Linux 的内核定时器定义在文件 <linux/timer.h> 中,其结构如下1(2.6 版本):

1
2
3
4
5
6
7
struct timer_list {
struct list_head entry; // 定时器链表的入口
unsigned long expires; // 以 jiffies 为单位的定时器
void *(function)(unsigned long); // 定时器处理函数
unsigned long data; // 传给处理函数的长整型参数
// ...
}

首先,我们需要定义一个定时器变量,以创建定时器。

1
struct timer_list my_timer;

接着需要通过一个辅助函数来初始化定时器内部的数值,初始化必须在使用其他定时器管理函数对定时器操作前完成。

1
init_timer(&my_timer);

然后,我们就可以设置数据结构中的值了。

1
2
3
my_timer.expires = jiffies + delay;	// 定时器超时时的节拍数
my_timer.data = 0; // 给定时器处理函数传入 0 值
my_timer.function = my_function; // 定时器超时时调用的函数

只有当当前的 jiffies 值(全局变量,用来记录自系统启动以来产生的节拍总数)大于或等于 my_timer.expires 时,my_timer.function 所指向的处理函数就会开始执行,当然函数还需要长整型参数 my_timer.data ,也就是说,处理函数必须符合以下原型:

1
void my_timer_function(unsigned long data);

最后,激活定时器。大功告成!

1
add_timer(&my_timer);

需要特别注意到的是,Linux 系统中的实时调度算法,以及上面所说的定时器都是软实时的。所谓软实时,对于实时调度算法来说,就是指系统会尽可能使进程在它的限定时间到来前运行,但不保证总能满足这些进程的要求。对于定时器来讲,就是系统在指定时间到来后,可能会推迟定时器处理函数的执行。所以不能用它们来实现任何硬实时任务

定时器——算法与数据结构

双向链表

在 Linux 系统中,所有的定时器都以双向链表的形式存放在一起(timer_list 数据结构说明了这点),但是内核每次都需要遍历整个队列来找到超时的定时器,这在队列很长时是无法忍受的。虽然我们可以很方便地实现插入、删除定时器的操作,但查询的时间过慢。

另一种方案,将链表以超时时刻进行排序也很难令人满意。因为这样虽然可以优化查询时间,但增删一个定时器就会非常费时。为此,Linux 内核采取的方案是,将定时器按它们的超时时刻划分为五组,当定时器超时时间接近时,定时器将随组一起移动。采取分组定时器的方法可以在执行软中断的多数情况下,确保内核尽可能减少搜索超时定时器所带来的负担,但不能用定时器来实现硬实时任务。

时间轮

那么,Apollo Cyber RT 中的定时器是怎么实现的呢?Cyber RT 的定时器有一个时间轮2,负责安排定时任务的启动顺序。时间轮最早由 George Varghese 提出,目的就是为解决传统算法中操作系统定时器的任务启动与管理的低效率问题。使用时间轮调度定时器任务,可以在 O(1) 时间内完成启动、停止和查找管理定时器。

下面举个例子来说明时间轮的工作原理3。如下图所示,这是一个简单的时间轮。

它一共有 8 格槽位(bucket),每个槽位代表了一个单位时间。整个轮子类似于时钟,每过一个单位时间,上面的指针就会往下走一格。开始时指向第 0 个槽位,过了一个单位时间后,指向第 1 个槽位。每个槽位中可能会有多个任务,用链表将他们连接起来。

我们通过一个例子来具体说明时间轮是如何添加、删除和查找任务。假设单位时间为 1 秒。当前有 2 个定时任务A、B,分别需要 3 秒、11 秒执行一次。见下图,目前指针指在 0 的位置,3 秒钟之后指针将指向 bucket[3] 的位置,因此我们把任务A放入 bucket[3] 中,接下来我们再看如何放置任务 B,任务 B 是 11 秒之后执行,也就是说时间轮转一圈之后,再过 3 秒种,任务 B 才执行。为标记任务的圈数,引入了 round ,round 为 1 就表示需要 1圈,同理推广到其它圈数。我们把 B 任务也放入 bucket[3],但是设置它的 round 为 1。 我们先看下任务 A 和任务 B 的执行过程,3 秒钟之后时间轮转到 bucket[3],这时候检查 bucket[3] 中的任务,只执行 round 为 0 的任务,这里执行任务 A,然后把 bucket[3] 中所有任务的 round 减1,这时候任务B的 round 数为 0 了,等到时间轮转一圈之后,就会执行任务B了。任务 A 执行完成之后,会把任务 A 从 bucket[3] 中删除,然后重新计算时间 3+3,放入 bucket[6] 中,等到 bucket[6] 执行完成之后,然后再放入 (6+3)% 8 取余,放入 bucket[1] 中。每次任务执行完成之后,都需要重新计算任务的时间,确定 bucket 的位置,然后放入对应的 bucket 中。

通过上面这个简单的例子,大家应该可以明白,时间轮算法的插入、删除、查找执行的复杂度都是O(1),由时间轮实现的定时器非常高效。

Cyber 定时器

cyber/timer/timer.h 中,classTimer 用于执行单次或周期性的定时任务。它的某些用户接口如下:

  • Timer(uint32_t period, std::function<void()> callback, bool oneshot) 
    
    1
    2
    3
    4
    5
    6
    7
    8
    9

    创建一个新的 [Timer](https://cyber-rt.readthedocs.io/en/latest/api/cppapi.html#classapollo_1_1cyber_1_1_timer) 对象。

    - `period`:定时器的周期,单位为 ms
    - `callback`:定时器需要执行的任务
    - `oneshot`:True:仅在接下来第一个周期中执行 `callback` 。False:在每个周期中都执行 `callback`

    - ```c
    void SetTimerOption(TimerOption opt)
    设置 [TimerOption](https://cyber-rt.readthedocs.io/en/latest/api/cppapi.html#_CPPv4N6apollo5cyber11TimerOptionE) 对象。其中 `TimerOption` 对象的三个成员恰好就是上面函数的三个参数。
  • void Start() 
    
    1
    2
    3
    4
    5

    开始计时

    - ```c
    void Stop()
    停止计时

和我们的理论一样,定时器最重要的无非就是这三个:1)触发任务的时刻(或者说周期)。2)需要执行什么任务(称之为回调函数)。3)考虑到周期性要求,加上是否为单次触发的标志。

初始化与定时

让我们先抛开时间轮这个复杂的玩意,先从 Timer 出发,了解一下 Timer 的初始化与定时过程,之后再逐步了解时间轮的工作机制。Timer 初始化过程很简单,就是在 Timer 对象中创建 Task 任务并注册回调函数(callback)。

但回调函数在到指定时刻后,会:

  1. 开始执行具体的函数操作
  2. 得到累计时间误差和实际执行时间,并重新计算任务下一次执行的间隔
    1. 如果执行的时间已经大于给定的周期,那么在下一个 tick 中马上执行。
    2. 如果出现累计误差,进行调整补偿(下文会提到如何补偿)
  3. 向时间轮中增加定时任务(限定于周期性)

我简单地画了一张图,方便大家理解。考虑到每个周期执行完后,都会因各种各样的原因产生时间误差,这些时间误差累积起来会对整个系统的时间确定性产生致命的影响,因此必须采取措施消除误差。参看下图,只要重新计算 next_fire_duration = interval - execute_time - error_time ,就可以校正误差。

重头戏:时间轮

在一切开始之前,我们先明确一下:时间轮是一个单例模式,而定时器会有多个。这意味着所有的定时器的所有定时任务都会出现在这一个时间轮上。

好,再来看一下 cyber/timer/timing_wheel.h 文件,我们可以了解到时间轮的详细设计情况。说实话,比我想象的要复杂一些😓。Cyber 有两级时间轮,其中,主时间轮(第一级时间轮)有 512 个槽位,而辅助时间轮(第二级时间轮)有 64 个槽位,主时间轮的最小单位时间为 2 ms,而辅助时间轮的单位时间是 512 * 2 ms,这是因为辅助时间轮是在主时间轮不够表示比较长的时间周期的情况下,使用的,意思是说主时间轮转一圈,辅助时间轮走一格。这就意味着,Cyber 支持的最长的周期就是 512 * 64 * 2 ms,大概是 10.75 分钟。

那么有了这些预备知识后,前文我们提到的“定时器会向时间轮内增加任务”的步骤,就比较清楚明白了:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
void TimingWheel::AddTask(const std::shared_ptr<TimerTask>& task,
const uint64_t current_work_wheel_index) {
if (!running_) {
Start(); /* 稍后来关注如何 Start */
}

// 先计算一下 任务应该被分配到哪个槽位
auto work_wheel_index = current_work_wheel_index +
static_cast<uint64_t>(std::ceil(
static_cast<double>(task->next_fire_duration_ms) /
TIMER_RESOLUTION_MS));
// 如果超出最大值,产生了绕圈
if (work_wheel_index >= WORK_WHEEL_SIZE) {
auto real_work_wheel_index = GetWorkWheelIndex(work_wheel_index);
task->remainder_interval_ms = real_work_wheel_index;
auto assistant_ticks = work_wheel_index / WORK_WHEEL_SIZE;
// 没有被套圈,那就直接加入到槽位里面
if (assistant_ticks == 1 &&
real_work_wheel_index < current_work_wheel_index_) {
work_wheel_[real_work_wheel_index].AddTask(task);
} else {
// 被套圈了,需要加入到辅助时间轮内,否则会被过早地运行
auto assistant_wheel_index = 0;
{
std::lock_guard<std::mutex> lock(current_assistant_wheel_index_mutex_);
assistant_wheel_index = GetAssistantWheelIndex(
current_assistant_wheel_index_ + assistant_ticks);
assistant_wheel_[assistant_wheel_index].AddTask(task);
}
}
} else {
// 一切正常,直接加入到槽位
work_wheel_[work_wheel_index].AddTask(task);
}
}

那么这一切是怎么开始的呢?我们回到一开始忽略的 TimingWheel::Start() 函数:

1
2
3
4
5
6
7
8
void TimingWheel::Start() {
std::lock_guard<std::mutex> lock(running_mutex_);
if (!running_) {
running_ = true;
tick_thread_ = std::thread([this]() { this->TickFunc(); });
scheduler::Instance()->SetInnerThreadAttr("timer", &tick_thread_);
}
}

哎嘿,有没有发现,这里居然需要用到之前讲的 scheduler 类,把时间轮内的线程加载到 scheduler 类的框架中统一管理,详见Apollo Cyber RT 调度系统解析

问题自然就来了,这个时间轮自带的线程需要干嘛呢?我们看一下它调用的 TickFunc() 函数。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
void TimingWheel::Tick() {
// 取出主时间轮中 当前指向的槽位的列表
auto& bucket = work_wheel_[current_work_wheel_index_];
{
std::lock_guard<std::mutex> lock(bucket.mutex());
auto ite = bucket.task_list().begin();
// 遍历整个列表 依次执行所有任务
while (ite != bucket.task_list().end()) {
auto task = ite->lock();
if (task) {
auto* callback =
reinterpret_cast<std::function<void()>*>(&(task->callback));
cyber::Async([this, callback] {
if (this->running_) {
(*callback)();
}
});
}
// 执行完一个删一个
ite = bucket.task_list().erase(ite);
}
}
}

void TimingWheel::Cascade(const uint64_t assistant_wheel_index) {
// 辅助时间轮移动了一个
auto& bucket = assistant_wheel_[assistant_wheel_index];
std::lock_guard<std::mutex> lock(bucket.mutex());
auto ite = bucket.task_list().begin();
// 把辅助时间轮中的任务全部取出来,扔到主时间轮中 因为它们马上就要执行了!
while (ite != bucket.task_list().end()) {
auto task = ite->lock();
if (task) {
work_wheel_[task->remainder_interval_ms].AddTask(task);
}
ite = bucket.task_list().erase(ite);
}
}

void TimingWheel::TickFunc() {
Rate rate(TIMER_RESOLUTION_MS * 1000000); // ms to ns
while (running_) {
Tick();
tick_count_++;
rate.Sleep();
// 主时间轮移动一格
{
std::lock_guard<std::mutex> lock(current_work_wheel_index_mutex_);
current_work_wheel_index_ =
GetWorkWheelIndex(current_work_wheel_index_ + 1);
}
// 当主时间轮转完一圈后,辅助时间轮会移动到下一格
if (current_work_wheel_index_ == 0) {
{
std::lock_guard<std::mutex> lock(current_assistant_wheel_index_mutex_);
current_assistant_wheel_index_ =
GetAssistantWheelIndex(current_assistant_wheel_index_ + 1);
}
// 辅助时间轮 指向的槽位 要把所有槽位内的任务都移入到主时间轮中
Cascade(current_assistant_wheel_index_);
}
}
}

代码有点长,还涉及了很多我没有见过的类,理解起来有点困难,但是振作一点,我们快要胜利了💪。结论先行,这个线程其实就是维持时间轮运行的“齿轮”。首先,rate(TIMER_RESOLUTION_MS * 1000000) 将时间精确到了纳秒级别,以便于精细化控制。然后就进入了主循环:

  1. 首先调用 Tick() 函数,从代码里我们不难理解,该函数就是将已经到点的定时任务启动执行,而且从 cyber::Async 名字上看,还是异步执行(或许我下一篇博客可以聊一聊这个?)。任务执行完成后,全部从列表中删除。
  2. 将主时间轮的指针往前拨动一格。
  3. 如果此时主时间轮一圈走完了,那就应当将辅助时间轮的指针往前拨动一格。
  4. 如果辅助时间轮的指针移动了一格,Cascade 函数就要把辅助时间轮的指针指向的槽位的列表里的所有任务全部取出来放入到主时间轮中,因为很快就要轮到它们执行了。

Update at 9:00 Oct. 23rd

定时器组件

我今天闲来无事,将 Apollo Cyber 的代码打开,看了许久才发现了 Cyber 中使用定时器奥秘。我今后会出一篇博客更详细的解释一下 component 的相关内容,但在这里我就直奔主题了。

打开 cyber/component/timer_component.h 文件,我们就可以一睹 TimerComponent 类的定义。

1
2
3
4
5
6
7
8
9
10
11
12
13
class TimerComponent : public ComponentBase {
public:
TimerComponent();
~TimerComponent() override;
bool Initialize(const TimerComponentConfig& config) override;
void Clear() override;
bool Process();
uint64_t GetInterval() const;
private:
virtual bool Proc() = 0;
uint64_t interval_ = 0;
std::unique_ptr<Timer> timer_; // 定时器!!
};

嗯哼,乍一看普普通通,但相信通过前面长篇大论的分析,你一定会对最后一个成员非常敏感!结合类的名字,以及 Component 类在整个系统中的地位,我认为 TimerComponent 类就是一个专门为周期性任务(或者时间敏感性任务等)作 base class 的类。

我们知道,Component 类实际上已经封装得非常好了,Apollo 团队的开发人员只给我们留下了两个接口:Proc()Init() ,而且这两个接口都是私有的,用户只能定义不能调用┑( ̄Д  ̄)┍。因此,真正的初始化和处理函数还不完全是我们自己写的函数。那么既然如此,先来看看这两个函数是怎么写的:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
bool TimerComponent::Process() {
if (is_shutdown_.load())
return true;
return Proc(); // 平平无奇,就是先看一下有没有关掉,再运行 Proc() 函数
}

bool TimerComponent::Initialize(const TimerComponentConfig& config) {
if (!config.has_name() || !config.has_interval()) {
AERROR << "Missing required field in config file.";
return false;
}
node_.reset(new Node(config.name()));
LoadConfigFiles(config);
// 把配置文件和信息全部读入后,使用 Init() 函数初始化
if (!Init())
return false;

std::shared_ptr<TimerComponent> self =
std::dynamic_pointer_cast<TimerComponent>(shared_from_this());
// 它把配置文件中的“周期”,以及 func(其实就是 Proc)扔到了定时器中!
auto func = [self]() { self->Proc(); };
timer_.reset(new Timer(config.interval(), func, false));
timer_->Start();
return true;
}

哦,原来定时器在 TimerComponent::Initialize 派上了大用场。TimerComponent 类只需要将周期从配置文件中读出来,再加上用户自己写的 Proc() 函数,就可以新建一个定时器,然后根据我们上面已经介绍的机制,就可以实现定时地做一些周期性工作了。

总结

这篇博客中,我首先介绍了一下定时器的概念,然后,从 Linux 系统出发,对定时器的使用、实现以及涉及的算法做了简单的介绍。由易到难,最终对 Cyber RT 的定时器实现有了详尽的解释。

  • 时间轮算法在启动、停止、查找管理方面的时间复杂度都是 O(1)
  • Apollo Cyber RT 使用了两级时间轮,同时满足了精确的时间单位和长时间的周期需求。
  • 定时器与 scheduler 类和 Task 的管理、运行方面都有非常紧密的关系,值得深入探究。

参考文献

[1] Robert Love Linux Kernel Development Third Edition 11th chapter

[2] George Varghese and Tony Lauck Hashed and Hierarchical Timing Wheels: Data Structures for the Efficient Implementation of a Timer Facility

[3] dig-into-Apollo


Apollo Cyber RT 定时器
https://dingfen.github.io/2020/10/21/2020-10-21-CyberTimer/
作者
Bill Ding
发布于
2020年10月21日
更新于
2024年4月9日
许可协议