Apollo Cyber RT 组件

Apollo Cyber RT 组件

前言

今天,我来给大家介绍一下 Apollo Cyber RT 中组件(Component)的相关知识。老规矩,在这之前,先回顾一下之前的内容。根据课题组的安排,我这段时间一直在研究 Apollo 系统,它是百度开发的自动驾驶开源框架,具有高性能和高灵活性的特点,我主要介绍 Apollo 5.5 版本。其中的 Apollo Cyber RT 是 Apollo 团队在注意到 ROS 无法满足自动驾驶系统的鲁棒性和性能要求后,专门为自动驾驶场景设计的开源、高性能运行时框架。在之前的博客中,我介绍了 Cyber RT 中定时器的相关知识,主要介绍了定时器的算法、实现以及定时器组件如何使用定时器。由此引发了我对组件实现的兴趣,那么这篇博客就详细地介绍一下组件吧。

组件 Component

根据百度 Apollo 团队提供的 Cyber RT 文档1组件(Component)是 Cyber RT 用于构建应用模块的基本类。每个特定的应用模块都可以继承 Component 类并定义自己的函数 Init()Proc() ,之后,该模块就可以被装载入 Cyber RT 框架中。

一般来说,用户有两种选择来使用 Cyber RT 框架:

  • 基于二进制。应用被分别编译成二进制文件,并使用自己创建的 ReaderWriter 来与其他 Cyber RT 模块进行通信
  • 基于组件。应用被编译成一个共享库(Shared library),通过继承 Component 类并写好相应的 dag 文件,Cyber RT 框架会自动装载并运行该应用

不难看出,使用基于组件的方案有明显的优点:

  • 组件可以被不同的进程装载,部署非常灵活
  • 当需要改变接受信道的名字(或者其他属性),可以直接更改 dag 文件,不需要重新编译
  • 组件支持接收多种类型的数据
  • 组件支持提供多种混合策略

用户自定义组件

要创建并启动一个算法组件2,需要通过以下 4 个步骤:

  • 初如化组件的文件结构
  • 实现组件类
  • 设置配置文件
  • 启动组件

官方文档介绍的很详细,在这里我就不啰嗦了。

组件类

在开始前,为更方便大家的理解,建议阅读 Cyber RT 的术语解释Cyber RT Terms ,因为我会反复提到其中的某些术语。

从代码上看,组件基类 ComponentBase 是组件类 Component 和时间组件类 TimerComponent 的基类。仔细看下图(淡蓝色为背景表示它是 privateprotected 的),有几点发现:

  • 一个 Component 类只含有一个 Node ,但可以有多个 Reader
  • Init()Proc() 这两个用户自己定义的函数,都是不可以被直接调用的
  • 用户只能使用 Initialize()Process() 函数来调用自己写的 Init()Proc()

根据代码,Component 类最多可以处理 4 个消息信道( channels of messages),这些信道——即 Reader 对象,最后都会被放入到 ComponentBase::readers_ 变量中,没错,这些所谓的信道在代码实现中就是 Reader ,Apollo 团队并没有设计出 Channel 这样的类🐶。

先从简单的 Process() 函数抓起吧。Process() 非常好理解,就是先判断一下有没有关闭该 Component 类,再调用 Proc() 函数。

1
2
3
4
5
6
7
template <typename M0, typename M1>
bool Component<M0, M1, NullType, NullType>::Process(
const std::shared_ptr<M0>& msg0, const std::shared_ptr<M1>& msg1) {
if (is_shutdown_.load())
return true;
return Proc(msg0, msg1);
}

我最关心的就是 Component 类的初始化过程,即 Initialize() 函数,一旦搞清楚了这一点,那么我们就可以更好地理解其他 Cyber RT 部分在整个系统中的作用。经过对代码的详细了解后,我总结出了以下过程:

  1. 创建 Node 节点
  2. 读取配置文件
  3. 调用用户定义的函数 Init()
  4. 创建信道对象,或者说读者 Reader<M>>
    1. 根据配置文件的相关内容,填充信道的相关配置信息 reader_cfg
    2. 创建消息收到时,就会触发的激活函数 invoke func
    3. 如果 is_reality_mode 为真 ,那么直接根据配置信息 reader_cfg 创建信道,不加入激活函数;若不是,需要再加入invoke func 创建信道
  5. is_reality_mode 为真,那么就需要创建数据访问类 DataVisitor 、 协程工厂、调度器,并创建调度任务。

看完我的语言描述后,我觉得是时候上代码给你们看一下它的真面目了。为确保代码简单而又不失一般性,我选择了一个含有两个信道Component 类初始化函数。这部分代码非常重要,后文我们会反复使用。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
template <typename M0, typename M1>		// 消息的类型 表示 这里有两个信道
bool Component<M0, M1, NullType, NullType>::Initialize(const ComponentConfig& config) {
// 1. 创建 Node 节点
node_.reset(new Node(config.name()));
// 2. 读取配置文件
LoadConfigFiles(config);
if (config.readers_size() < 2)
/* 错误处理 */
if (!Init())
/* 错误处理 */
bool is_reality_mode = GlobalData::Instance()->IsRealityMode();
// 信道的配置信息
ReaderConfig reader_cfg;
reader_cfg.channel_name = config.readers(1).channel();
reader_cfg.qos_profile.CopyFrom(config.readers(1).qos_profile());
reader_cfg.pending_queue_size = config.readers(1).pending_queue_size();
// 第 1 个 读信道已经创建
auto reader1 = node_->template CreateReader<M1>(reader_cfg);

reader_cfg.channel_name = config.readers(0).channel();
reader_cfg.qos_profile.CopyFrom(config.readers(0).qos_profile());
reader_cfg.pending_queue_size = config.readers(0).pending_queue_size();
// 第 0 个读信道要根据是否 is_reality_mode 来决定使用何种函数创建
std::shared_ptr<Reader<M0>> reader0 = nullptr;
if (cyber_likely(is_reality_mode)) {
reader0 = node_->template CreateReader<M0>(reader_cfg);
} else {
std::weak_ptr<Component<M0, M1>> self =
std::dynamic_pointer_cast<Component<M0, M1>>(shared_from_this());

auto blocker1 = blocker::BlockerManager::Instance()->GetBlocker<M1>(
config.readers(1).channel());

auto func = [self, blocker1](const std::shared_ptr<M0>& msg0) {
auto ptr = self.lock();
if (ptr) {
if (!blocker1->IsPublishedEmpty()) {
auto msg1 = blocker1->GetLatestPublishedPtr();
ptr->Process(msg0, msg1);
}
} else
/* 错误处理 */
};
reader0 = node_->template CreateReader<M0>(reader_cfg, func);
}
if (reader0 == nullptr || reader1 == nullptr)
/* 错误处理 */
// 信道创建完毕,全部存入到 readers_ 数组中
readers_.push_back(std::move(reader0));
readers_.push_back(std::move(reader1));

if (cyber_unlikely(!is_reality_mode)) {
return true;
}
// 创建 scheduler
auto sched = scheduler::Instance();
std::weak_ptr<Component<M0, M1>> self =
std::dynamic_pointer_cast<Component<M0, M1>>(shared_from_this());
auto func = [self](const std::shared_ptr<M0>& msg0, const std::shared_ptr<M1>& msg1) {
auto ptr = self.lock();
if (ptr) {
ptr->Process(msg0, msg1);
} else
/* 错误处理 */
};
// 创建 DataVisitor 和 RoutineFactory 最后创建任务
std::vector<data::VisitorConfig> config_list;
for (auto& reader : readers_)
config_list.emplace_back(reader->ChannelId(), reader->PendingQueueSize());
auto dv = std::make_shared<data::DataVisitor<M0, M1>>(config_list);
croutine::RoutineFactory factory = croutine::CreateRoutineFactory<M0, M1>(func, dv);
return sched->CreateTask(factory, node_->Name());
}

不得不承认,初始化的过程复杂度超乎我的想象。看完代码,你应该明白之前的只言片语只是对这一复杂过程的笼统概括😅。接下来,我还需要对上述过程中提到的术语做一些解释。如果你不想看这些繁杂的细节,可以直接跳过

Node 节点

Initialize() 函数一开始,Component 类就创建了 Node 类对象。那么什么是 Node 类?根据官方文档给出的解释3Node 节点类是 Cyber RT 的基本组成部分;每个 Component 对象都包含一个节点,可通过节点进行通信。通过定义节点中的 read/write 和/或 service/client,模块可以具有不同类型的通信模式。Node 对象负责创建 ReaderWriterServiceClient 信道对象来帮该组件获取或传输信息。

这么说好像有点抽象?那我们再来看一下上文的类继承图以及下面的代码块。从类成员的角度看,Node 对象有

  • std::string node_name_ 它的名字
  • std::string name_space_命名空间
  • map<string, ReaderBase> 类型的 readers_ ,它其实就是一个表格,负责保存信道名字 channel_name 与对应的 Reader 读者对象
  • 一个 NodeChannelImpl 指针。 NodeChannelImpl 类是 Node 用来创建与信道相关的 ReaderWriter 对象的类,在真实模式下(下文会介绍),创建的对象是 ReaderWriter,而在模拟模式下,创建的是 IntraReaderIntraWriter 对象,创建后向通信拓扑注册当前节点
  • 一个 NodeServiceImpl 指针。和NodeChannelImpl类似,只不过它创建的是 ServiceClient 对象,创建后也会注册 service
  • 上面提到的两个指针,都指向创建之前提到的四种信道对象的创建器
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
class Node {
public:
template <typename M0, typename M1, typename M2, typename M3>
friend class Component;
friend class TimerComponent;
private:
explicit Node(const std::string& node_name,
const std::string& name_space = "");

std::string node_name_;
std::string name_space_;

std::map<std::string, std::shared_ptr<ReaderBase>> readers_;
std::unique_ptr<NodeChannelImpl> node_channel_impl_ = nullptr;
std::unique_ptr<NodeServiceImpl> node_service_impl_ = nullptr;
};

回到 Initialize() 函数,Node 类创建完后,其主要任务就是创建读者信道:

1
reader0 = node_->template CreateReader<M0>(reader_cfg, func);

信道与配置信息

接下来的两步,读取配置文件和调用 Init() 函数都非常直白,我们直接来看读者信息的配置,或者说,看它配置了哪些读者信息。

1
2
3
4
ReaderConfig reader_cfg;
reader_cfg.channel_name = config.readers(1).channel();
reader_cfg.qos_profile.CopyFrom(config.readers(1).qos_profile());
reader_cfg.pending_queue_size = config.readers(1).pending_queue_size();

一共三个变量被赋值(事实上 ReaderConfig 也只有这三个变量)

  • string channel_name 信道的名字。要求信道的名字不能重复
  • QosProfile qos_profile qos 属性,通信的服务质量
  • uint32_t pending_queue_size 信道缓冲区的长度,如果溢出了,会丢弃较早的消息

在真实模式(下文会提到)中,会调用 node_->template CreateReader<M1>(reader_cfg) 创建一个 Reader 对象,不加入激活函数。而若要仔细检视如何创建 Reader 对象,需要看一下 Node Channel Impl::CreateReader 。这里简单说一下,主要步骤有:

  1. 设定 RoleAttribute 的相关信息,包括信道名字,qos_file ,host_id,node_id 等等
  2. 将新的属性(Attr),激活函数和缓冲区大小作为参数,构造出 Reader<MessageT> 的对象

当然,这里面的故事还没有结束,如果有时间的话,可以进一步研究一下 Cyber RT 的通信模式。

真实模式与模拟模式

接下来,解决一下 is_reality_mode 的问题。通过 GlobalData::Instance()->IsRealityMode() 猜测,它是一个全局数据,再进一步调查发现,它只有两个值:

1
2
3
4
enum RunMode {
MODE_REALITY = 0;
MODE_SIMULATION = 1;
}

那么,什么时候它是 reality,什么时候是 simulation 呢?一般来说,simulation 模式多用于测试,多出现在测试文件中。其默认值为 reality 模式,但如果在测试文件(_test.cc)中,调用了如下函数,就会切换为模拟模式(simulation 模式)。而 reality 模式,即真实模式,根据我的理解,可能就是在系统真实运作、控制自动驾驶系统时的运行模式。在真实模式下,初始化工作非常的明确:为每个信道创建一个 Reader,然后创建回调函数用于调用 Process(),最后创建出对应的协程,让 Scheduler 来运行管理。

1
2
3
void GlobalData::EnableSimulationMode() {
run_mode_ = RunMode::MODE_SIMULATION;
}

模拟模式与真实模式的最大差别就是,数据来源不是真实传感器实时获取的数据了。那么,模拟模式的数据从哪获得呢?在代码中,模拟模式的信道由 IntraReaderIntraWriter 类实现,这些类获取的数据也不是从协程中获得,而是通过 Blocker 类获得模拟数据(或历史数据)。为方便说明,把 Component::Initialize() 函数的模拟模式部分截取过来👇:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
if (is_reality_mode) {
// ...
} else {
std::weak_ptr<Component<M0, M1>> self =
std::dynamic_pointer_cast<Component<M0, M1>>(shared_from_this());
auto blocker1 = blocker::BlockerManager::Instance()->GetBlocker<M1>(
config.readers(1).channel());
// 特殊的回调函数
auto func = [self, blocker1](const std::shared_ptr<M0>& msg0) {
auto ptr = self.lock();
if (ptr) {
if (!blocker1->IsPublishedEmpty()) {
auto msg1 = blocker1->GetLatestPublishedPtr();
ptr->Process(msg0, msg1);
}
} else {
AERROR << "Component object has been destroyed.";
}
};
reader0 = node_->template CreateReader<M0>(reader_cfg, func);
}

NodeChannelImpl::CreateReader() {
// ....
if (!is_reality_mode_) {
reader_ptr =
std::make_shared<blocker::IntraReader<MessageT>>(new_attr, reader_func);
} else {
// ...
}
}

可以看到,在模拟模式下,如果有 n 个信道(这里 n = 2),初始化程序会先给后 n-1 个信道创建 IntraReader 对象(日后我们再讨论这些东西),然后对于 Reader<M0> 信道,它会创建一个特殊的回调函数,该回调函数的基本情况如下:

  • 在信道 0 接收到消息时触发
  • 触发时,函数会从其他 n-1 个信道的 IntraReaderBlocker::published_msg_queue_ 队列中各拿出 1 个消息,并把这些消息一起交给 Process() 函数执行
  • 该函数在创建 IntraReader 对象时,就被当成参数传入,并在IntraReader::init() 中被注册到该 Blocker 对象内的回调函数表中(日后讨论+1)

接下来看看初始化函数的最后一部分,我们现在只考虑真实模式,因为此部分代码只有在真实模式下进行。首先,获取了调度器单例对象,并建立的 func ,而 func 内容也很简单,就是在线程安全的前提下直接调用 Porcess()Process() 会调用用户自己定义的函数 Proc(),进而处理组件接收到的所有消息。

1
2
3
4
5
6
7
8
9
auto func = [self](const std::shared_ptr<M0>& msg0,
const std::shared_ptr<M1>& msg1) {
auto ptr = self.lock();
if (ptr) {
ptr->Process(msg0, msg1);
} else {
AERROR << "Component object has been destroyed.";
}
};

func 会被当做参数传给 CRoutine::CreateRoutineFactory ,看这名字就知道,该函数用于创建协程工厂(工厂模式),此外该函数还涉及了消息融合,数据访问和数据分发等等,我们先略过不说。复杂的代码理解不了,还是看一个简单点的代码吧:

1
2
3
4
5
6
template <typename Function>
RoutineFactory CreateRoutineFactory(Function&& f) {
RoutineFactory factory;
factory.create_routine = [f = std::forward<Function&&>(f)]() { return f; };
return factory;
}

很简单的代码,只是设定了一下 factory.create_routine。在返回 factory 后,调用了 Scheduler::CreateTask。此后,又继续调用一个函数,注意:CreateTask 的第一个参数 func 就是在 Component::Initialize() 中创建的原函数。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
bool Scheduler::CreateTask(const RoutineFactory& factory, const std::string& name) {
return CreateTask(factory.create_routine(), name, factory.GetDataVisitor());
}

bool Scheduler::CreateTask(std::function<void()>&& func,
const std::string& name,
std::shared_ptr<DataVisitorBase> visitor) {
if (cyber_unlikely(stop_.load())) {
/* 错误处理 */
}
auto task_id = GlobalData::RegisterTaskName(name);
auto cr = std::make_shared<CRoutine>(func); // 看这里!!!!
cr->set_id(task_id);
cr->set_name(name);
if (!DispatchTask(cr))
return false;
if (visitor != nullptr) {
visitor->RegisterNotifyCallback([this, task_id]() {
if (cyber_unlikely(stop_.load()))
return;
this->NotifyProcessor(task_id);
});
}
return true;
}

哇哈,终于抓住你了,看来 Component::Initialize() 中建立的 func ,最终会被 Cyber RT 用来创建一个协程。然后放入到 Scheduler 中,并根据我之前介绍的 Cyber RT 调度策略运行。在真实模式下,组件创建了一个特殊的回调函数,该函数对从信道接受来的消息进行处理。该函数最终会被封装为一个协程,并在 Scheduler 类的安排下执行。

定时组件类

定时组件类与组件类有所不同,它比组件类多了定时器,这部分内容我在 Cyber RT 定时器中已经提过,为了这篇博客的完整性,我再次强调一下。

Process() 函数和组件类一样,平平无奇。重点看一下 Initialize() 函数

  1. 创建 Node 类对象
  2. 读取配置文件
  3. 创建定时器对象,并开始计时
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
bool TimerComponent::Initialize(const TimerComponentConfig& config) {
if (!config.has_name() || !config.has_interval()) {
/* 错误处理 */
}
node_.reset(new Node(config.name()));
LoadConfigFiles(config);
if (!Init())
return false;
std::shared_ptr<TimerComponent> self =
std::dynamic_pointer_cast<TimerComponent>(shared_from_this());
auto func = [self]() { self->Proc(); };
timer_.reset(new Timer(config.interval(), func, false));
timer_->Start();
return true;
}

注意到,与组件类不同,定时组件类在 Initialize() 中没有创建任何的 Reader 读者,也没有搞出调度器、协程工厂、创建任务等一系列复杂的操作。你可能有这样的疑问:既然定时组件类并没有创建出任务和协程,那么定时组件类的处理函数需要如何被调用呢?Cyber RT 定时器中其实已经有解答了🤪。简单来说,所有的定时器会把任务全部交给时间轮处理,过一段时间后,当时间轮发现需要执行某些定时任务时,就会把它们全部取出,然后调用 cyber::Async 异步地执行。

总结

今天我重点研究了组件类,分析了组件类的继承图关系以及它们的成员变量,并着重探究了组件类的初始化过程,进而对:信道与读者类、节点类与 NodeChannelImpl 类、调度器、数据访问类、协程、协程工厂函数等等有了一个大致的了解。

在组件类中,最重要的两个可调用函数就是 Process()Initialize() ,然而用户不可以对它们直接进行更改,必须通过重载 Proc()Init() 才能操控组件。对于如何创建一个组件类,官方的解释非常详细2。在对 Initialize() 函数的进一步的研究中,我发现在真实模式下,初始化工作主要有:为每个信道创建一个 Reader 对象,然后创建回调函数用于调用 Process(),最后创建出对应的协程,让 Scheduler 来运行管理,而在模拟模式下,数据主要是从 Blocker 类中取得的历史数据,而非感知器获得的真实数据。

我承认这篇博客很多地方没有解释很清楚,这一方面是因为篇幅限制,另一方面是因为我还未对 Cyber RT 理解透彻(尤其是通信部分),相信在课题组其他成员的帮助下,了解 Apollo 系统的真面目已经不远了。

参考

[1] Cyber RT Documents

[2] 如何使用 Cyber RT 来创建一个新的组件

[3] Cyber RT Terms


Apollo Cyber RT 组件
https://dingfen.github.io/2020/10/25/2020-10-25-CyberComponent/
作者
Bill Ding
发布于
2020年10月25日
更新于
2024年4月9日
许可协议