Apollo Cyber RT 通信(下)
Apollo Cyber RT 通信(下)
前言
欢迎回来!今天我继续和大家聊 Cyber RT 的通信,上文我探讨了 Cyber RT 的两种通信方式和三种通信模型,并从通信架构的角度,一层层地给大家详细介绍了通信时代码的具体工作情况。由于通信内容过多,全放在一篇博客理论太长,于是我将这些内容简单地一分为二,事实上它们应当是有机的整体。
上文末尾,我介绍了 Blocker 类的功能,今天这篇博客,我们继续向深处进军↖(^ω^)↗。
我们在上文已经认识了 Reader 和 Writer ,现在继续往下走。如果你仔细看代码的话,在 Reader 和 Writer 初始化时,都会分别构建好底层的 Receiver 和 Transmitter 对象。为了描述简便,我之前有意地忽略了,现在把它拿出来。Reader 部分的比较复杂,还使用了 ReceiverManager 进行管理;Writer 就比较直接了,在 Init() 函数中可以直接看到。
1 | |
我们不妨把这里作为突破口,打开新世界的大门。
ReceiverManager
之前提到过,Reader 在初始化时,需要用 ReceiverManager::GetReceiver() 获得 Receiver 对象。它的内部分封装了一个 unordered_map 表,将信道名字和与之对应的 Receiver 对象保存在表中。再看看下面的代码,可得出一个结论,如果同一个进程内,不同的 Reader 对象订阅同一个信道,事实上使用的是同一个 Receiver 2。再看看那个很长的 CreateReceiver() 函数调用,除了传递一个配置信息参数外,还有一个很长的回调函数。回调函数会做:
- 加入一个 Transport 事件,类型为 Dispatch
- 调用数据分发器
DataDispatcher::Dispatch()函数 - 加入一个 Transport 事件,类型为 Notify
这些步骤具体做了什么?我们目前还不知道,求知的欲望驱使着我们继续往下探索。
1 | |
Transport
Transport 类,单例模式,有如下指针成员,emm……在还没完全弄懂底层代码的情况下,也很难告诉你们这些类的具体作用:
Participant类 FastRtps 相关Notifier类 与 Shm 相关IntraDispatcher类 Intra 的分发器ShmDispatcher类 Shm 的分发器RtpsDispatcher类 Rtps 的分发器
1 | |
和之前 Cyber RT 给人的感觉一样,一旦它要创建什么重要的东西,不调用个几层是根本不可能完成的。在这里,Transport 类的两个函数 CreateTransmitter 和 CreateReceiver 都会根据传入的 mode ,去构造出对应的子类对象,分别对应我在这篇博客开头提到的四种不同场景下的传输实现类。哦,提醒一下,默认的 mode 是 Hybrid ,也就是三种模式混用。
1 | |
Transmitter 类是写消息,而 Receiver 类是读消息,因此 Receiver 类比 Transmitter 类多了一个参数 MessageListener ,其本质就是个回调函数:
1 | |
1 | |
最后一部分代码:在 Receiver 对象被创建时,只要模式不是 Hybrid,都会立刻调用 Receiver::Enable() 函数开启接收。
Receiver & Transmitter
这两个类是 EndPoint 的子类。关于 Endpoint 类5,那可就是整个架构的类继承的终点了,其内部有三个成员
bool enabled_用来标记是否被启用Identity id_用于标识,对于每个Endpoint对象拥有唯一的 id 号,其子类也是用这个来进行标识RoleAttributes attr_用来记录配置文件中的数据。
其中 Receiver 类只有一个回调函数 msg_listener_,该回调函数就是 Receiver 构造时传入的函数。在新消息到来时,会被调用👇:
1 | |
在 ReceiverManager 那一小节中,已经说到该函数有三个步骤,其中5,调用 DataDispatcher::Dispatch 非常关键。因为从这步可以看出上层和底层最终完成了闭环。当底层 Receiver 的回调函数 msg_listener 收到消息被调用时,上层的分发器 DataDispatcher 会把来自底层的消息发到等待的消息缓存里,然后调用上层的通知器 DataNotifier::Notify() ,唤醒对应的 Component 的封装了 Process() 的协程,让协程处理这些消息。
再看看 Receiver 的四个子类,每个子类都包含了相应的 Dispatcher 的指针,例如,RtpsReceiver 类含有 RtpsDispatcherPtr 成员。这些 Dispatcher 的功能就是增删监听者,从而让 Receiver 关闭或开启,比如:
1 | |
简单地介绍一下 Dispatcher 类(别和下面的 DataDispatcher 搞混了)。它主要负责记录一个 channel_id 和对应 ListenerHandlerBasePtr 的关系表。而AddListener() 和 RemoveListener() 函数是从关系表中,拿到给定信道的对应 ListenerHandlerBase ,并在这上面连接(Connect)或不连接(Disconnect)相应的回调函数。由于时间精力有限,这边解释得比较混乱,若想具体了解其工作机制,可以参考文章最后的文献。总的来说,这有点像在 Qt 中实现的信号槽机制:信号在特定情况下被发射出去,对应的信号响应函数在槽中监听。信号与槽通过 Connect 函数关联,一个信号可以发射到多个槽,多个信号可以被一个槽监听。
再来看看 Transmitter 类,它是真正的数据写者的基类。它有两个成员,分别为序列号 seq_num_ 和消息信息 msg_info_ 。
1 | |
我主要研究其中的 Transmit() 函数,其四个子类都具体实现了 Transmit() 函数,如果你仔细看过前一篇博客,就知道这也是 Writer 类一直在调用的函数。那么这个 Transmit() 函数有什么具体步骤呢?
Writer调用Transmitter::Transmit()函数- 设置
msg_info::seq_num = NextSeqNum消息的序列号 - 加入一个 Transport事件,类型为 Transmit Begin
- 调用子类实现的
Transmit()函数,该函数通过传入一条消息即可以完成数据写入任务。
1 | |
好,我对 Receiver 和 Transmitter 类的介绍就到这里了。我不打算再继续介绍它们的子类以及更加底层的设计是如何实现的,因为这边牵扯到非常多的技术细节,弄懂这些会消耗非常多的精力,而且这些东西也远远超出了课题组目前的要求。
Data 部分
通信架构的内容已经全部介绍完了(至少对于发布—订阅通信方式来说),但还是感觉缺了什么东西,把这两者有效地连接起来😅。没错,我至今还没有说清楚,Writer 发布的消息是如何让 Reader 看到的。而这就牵扯到 cyber/data 中实现的类了。
DataVisitor
先来说说数据访问类 DataVisitor ,它是 DataVisitorBase 的子类。它的主要成员和构造函数如下。先来说几个明显可以得到的结论:
DataVisitor对象都会有若干个缓存类ChannelBuffer,还有一个DataFusion对象,融合了所有的消息类型- 在初始化的时候,首先构建这些
ChannelBuffer,然后它们都会被加入到相应类型的DataDispatcher的管理中 data_notifier_(存在于DataVisitorBase中),会向信道 0 加入一个notifier_,类型为struct Notifier { std::function<void()> callback; };,这表明信道 0 注定与其他信道不一般 :thinking:data_fusion_会构建并指向AllLatest对象,从类型来看,它们整合了所有的消息类型,应当用于信息的最后收集发送
1 | |
该类主要用于消息数据的访问,存放到来的消息数据,并提供接口供消息读取。事实上,我们之前在讨论组件的初始化的最后部分时见过它,也在订阅者初始化时遇见过它,但当时我都有意地略过了。现在我们重拾这部分内容(为了简单且不失一般性,我以两个信道的情况为例),把这部分彻底搞明白:
1 | |
上面代码做了:
-
收集了所有的
Reader对象的所读信道 id 和消息队列大小,放入到config_list后就创建DataVisitor对象 -
在上面
DataVisitor类的构造函数中,根据传入的信道 id 和消息队列尺寸,DataVisitor内为其创建了两个ChannelBuffer作为缓冲区- 调用数据分发器将它们加入到
DataDispatcher的管理中 - 调用
DataNotifier::AddNotifier()函数,传入第 0 个读者的信道 id ,加入到DataNotifier的管理中。DataDispatcher与DataNotifier类均为单例,之后我们会对它们做详细介绍 - 创建
DataFusion对象,这也是之后要了解的╮(╯▽╰)╭
- 调用数据分发器将它们加入到
-
创建协程工厂,并构建出要封装为协程的函数:
1
2
3
4
5
6
7
8
9
10
11
12factory.create_routine = [=]() {
return [=]() {
std::shared_ptr<M0> msg0; std::shared_ptr<M1> msg1;
for (;;) {
CRoutine::GetCurrentRoutine()->set_state(RoutineState::DATA_WAIT);
if (dv->TryFetch(msg0, msg1)) { // 取数据
f(msg0, msg1); // f 函数就是组件初始化时创建的 func 函数
CRoutine::Yield(RoutineState::READY);
} else
CRoutine::Yield();
}
}; };协程做的就是调用
dv->TryFetch()取数据(下文会详细说明),如果成功就调用组件的Process()函数,且协程的状态从等待数据转变为了就绪,而一旦协程就绪,就可以被Processor类运行 -
Scheduler创建任务,在CreateTask()函数中,调用visitor->RegisterNotifyCallback()函数1
2
3
4
5visitor->RegisterNotifyCallback([this, task_id]() {
if (cyber_unlikely(stop_.load()))
return;
this->NotifyProcessor(task_id);
});上面的函数被赋值给了
DataVisitorBase::notifier_,用于唤醒相应的协程来处理该新消息
说完这部分过程后,我发现我们至少还要理解一下 DataDispatcher 、 DataNotifier 、 DataFusion 和 AllLatest 类😂😂😂。
DataDispatcher
千里之行,始于足下。先来看看数据分发类 DataDispatcher ,顾名思义,它将底层传来的数据进行分发,具体来说,当新消息到来时,通过 Dispatch() 函数把它们放到该信道下的所有消息缓冲区中。它是个单例模式,但是个模板类,意味着每一个消息类型会有对应的一个唯一的 DataDispatcher 对象。类内记录了一个信道 id 与多个 CacheBuffer 对象对应的表。注意到:一个信道可以有多个订阅者 Reader ,每个订阅者拥有一个 CacheBuffer 缓冲区,而这个缓冲区就是之前 DataVisitor 类在构造时给每个消息类型创建的 ChannelBuffer 。
1 | |
DataDispatcher::Dispatch() 函数非常重要,在 ReceiverManager 和 Receiver && Transmitter 中我们已经提到,他是连接上层和底层的最关键一环。在 Receiver 对象每次收到该信道的消息时,就会调用DataDispatcher::Dispatch() 函数分发刚收到的数据,函数会先从表中取出所有对应信道的缓冲区,然后调用 CacheBuffer::Fill() 函数来给缓冲区填数据(稍后介绍这个函数),最后调用 DataNotifier::Notify() 函数,唤醒它们对应的协程来取数据并运行。现在你应该明白,为何 DataVisitor 在构造时,需要把刚刚建立的缓冲区给 DataDispatcher 管理,不然的话,缓冲区拿不到消息啊。
DataNotifier
再来看看 DataNotifier 类。它是个单例模式,类内有一个信道 id 与多个 Notifer 对应的表,这是考虑到一个信道可以有多个订阅者。很显然,前文提到的在 DataVisitor 构造时,调用 AddNotifier() 就是要把自己的 Notifier 存到这个表中。
1 | |
至于 AddNotifier() 函数的实现,emm……很平凡,无非就是找到对应的信道 id ,然后将参数中的 Notifier 放入到数组里(如果没有数组,新建一个)。重要的是唤醒函数 Notify() ,该函数内部会调用 notifier->callback() ,回顾一下,这个 notifier 是在 Scheduler 创建任务时被设置的,内含有 NotifyProcessor() 函数,可以唤醒协程。在 Receiver && Transmitter 和 ReceiverManager 也提到,第二步的分发器最终会调用该函数,唤醒所有监听该信道的协程,来处理到来的消息。这样,你也就明白为什么 DataVisitor 类在构造时要把 notifier 加入进去了,不然的话信道来了个消息,就没法唤醒协程,Reader 就不知道了呀。
1 | |
DataFusion & AllLatest
再来看看 DataVisitor 构造函数的最后一步,创建 DataFusion 对象,看看名字,就应该明白该对象用于信道数据的融合。DataFusion 是 AllLatest 的基类,DataFusion 类十分简单,仅提供了一个 Fusion() 接口,具体由 AllLatest 实现。所以,我们重点看一下 AllLatest 类,哈,听名字就知道它会取所有信道中的最新值,再结合它是 DataFusion 的子类,所以主要功能应该是融合多个信道的最新数据。
我还是以两个信道的情况为例,该类成员有几个 ChannelBuffer 类,其中一个比较特殊,类型是数据融合的 buffer_fusion_ 。
1 | |
在构造函数中,特殊的信道 0 的消息缓冲区会调用 SetFusionCallback() 来设置回调函数 :point_down:,为方便表述,我给它起名 FusionFunc 。从下面的代码中看出, FusionFunc 先判断是否所有信道都有消息,并获取最新的消息,如果都有消息的话就将这些消息融合,即用 std::tuple 封装,再调用 Fill() 函数填入到 buffer_fusion_ 的 CacheBuffer 中。
1 | |
何时调用 FusionFunc 呢?答案在这个 Fill() 函数(见下代码)中,它诡计多端——如果 CacheBuffer 有回调函数 FusionFunc,会调用回调函数;如果没有,会把接收到的数据放入缓冲区中。很显然在上面的构造函数中,只有信道 0 设置了回调函数 FusionFunc。因此当信道 0 有数据到来, DataDispatcher::Dispatch() 被调用时(代码见 DataDispatcher 部分),进而调用 Fill() 函数时, FusionFunc 才会被调用,将最新的消息融合,并将融合的消息填入到 buffer_fusion_ 中。而其他信道的数据到来时, Fill() 函数只是单纯往对应的 CacheBuffer 中填数据。
1 | |
在协程的处理函数中(回顾一下, DataVisitor 的创建协程工厂中提到的)会调用 DataVisitor::TryFetch() 函数,再调用 Fusion() 函数(代码如下),它从融合数据的缓冲区 buffer_fusion_ 中拿走(Fetch)融合消息,这也就意味着同时拿多个信道上的最新消息,保证了每次给 Component::Process() 函数的参数都必须“全员到齐”,并且所有信息都是最新的。综上所述,只有信道 0 收到消息后,才会融合其他信道的消息,往往主导通信处理的节奏,因此信道 0 的选取就比较关键了。
1 | |
ChannelBuffer & CacheBuffer
前文我一直提到一个词,叫做缓冲区。事实上这个词具体指向了两个类,ChannelBuffer 和 CacheBuffer 类。为了让读者更好地理解“缓冲区”这个词,我简要地介绍一下这两个类。ChannelBuffer 类包含了两个成员:信道 id 和 指向 CacheBuffer 的指针。它的函数 Fetch() 和 Latest() 分别用于取对应索引的消息和取得最新消息。而 CacheBuffer 类,其实质就是一个循环队列,用来放置某个信道产生的数据。需要注意的是,CacheBuffer 占用的内存是恒定的,因为里面的数组长度一开始就被限定了,所以,一旦缓冲区装满了,它会毫不犹豫地丢弃最旧的消息,推入最新的消息。具体的队列实现在 cyber/data/cache_buffer.h 和 cyber/data/channel_buffer.h,很简单,有兴趣可以直接读代码。
总结
(⊙o⊙)哦终于,我把 Cyber RT 所有的通信机制都看完了,哦,BTW,上篇通信链接在这里。现在,在把所有的内容都搞明白后,让我们理一理自己昏沉的头脑,跳脱出代码的边边框框,从上帝视角审视一下数据是如何流通的。
这里有两张非常好的图,感谢 [2],我可以不用自己动手,花费数小时画图了😀

在上面这张图中,仍然以两个信道的 Component 为例,从左上角出发:
Component初始化建成了两个Reader对象,然后创建了一个DataVisitor<M0, M1>对象- 两个
Reader对象也分别创建了一个DataVisitor<>对象,回顾一下上篇提到的Reader的初始化过程 - 这些
DataVisitor对象会分别创建出ChannelBuffer,并使用DataDispatcher管理这些缓冲区(注意看它内部的表) - 当接收到新消息后,
DataDispatcher会给对应的信道上的所有缓冲区进行分派,特别地,若信道 0 有新消息,还会对其他消息进行融合(注意看AllLatest对象) - 之后,
DataVisitor会使用DataNotifier对象,唤醒相应的协程,处理收到的数据
现在,我们再来看看底层的通信架构,虽然有些部分我略去未讲,但这张图我们还是可以看明白的🐶。这次,我们从最右边开始看起。

- 可以看出两个
Reader对象共用了一个Receiver。这是因为在同一个进程内,不同的Reader对象订阅同一个信道,就会使用同一个Receiver - 默认选择的
Receiver是 hybrid 的,因此需要三个底层接收类IntraReceiver、ShmReceiver和RtpsReceiver配合 - 在创建
Receiver时,监听者处理函数msg_listener_就已经“准备就绪”了。 Dispatcher的表中记录了监听者和它们负责的信道,并把它们连接Connect起来,如同 Qt 中的信号槽机制- 一有新消息到达(最左边的函数是我陌生的),那么就会立刻触发信号槽机制,调用
msg_listener函数,之后就是上层DataDispatcher的工作了
最后的最后,事实上我还是落了一个东西:服务—客户通信方式😓😓😂😂。的确,这篇博客的内容全是关于发布—订阅通信方式的,对于 Service 和 Client ,几乎没有提及,之后,我就会补上这一部分服务发现的内容。
参考
[2] 自动驾驶平台Apollo 5.5阅读手记:Cyber RT中的通信传输
[3] 百度Apollo系统学习-Cyber RT 通信-上层
[4] 百度 Apollo Cyber RT简介、基本概念以及与 ROS 对照
[5] 百度Apollo系统学习-Cyber RT 通信-底层
[7] 自动驾驶汽车平台技术基础/杨世春等编著. —北京:清华大学出版社