Apollo Cyber RT 通信(下)
Apollo Cyber RT 通信(下)
前言
欢迎回来!今天我继续和大家聊 Cyber RT 的通信,上文我探讨了 Cyber RT 的两种通信方式和三种通信模型,并从通信架构的角度,一层层地给大家详细介绍了通信时代码的具体工作情况。由于通信内容过多,全放在一篇博客理论太长,于是我将这些内容简单地一分为二,事实上它们应当是有机的整体。
上文末尾,我介绍了 Blocker
类的功能,今天这篇博客,我们继续向深处进军↖(^ω^)↗。
我们在上文已经认识了 Reader
和 Writer
,现在继续往下走。如果你仔细看代码的话,在 Reader
和 Writer
初始化时,都会分别构建好底层的 Receiver
和 Transmitter
对象。为了描述简便,我之前有意地忽略了,现在把它拿出来。Reader
部分的比较复杂,还使用了 ReceiverManager
进行管理;Writer
就比较直接了,在 Init()
函数中可以直接看到。
1 |
|
我们不妨把这里作为突破口,打开新世界的大门。
ReceiverManager
之前提到过,Reader
在初始化时,需要用 ReceiverManager::GetReceiver()
获得 Receiver
对象。它的内部分封装了一个 unordered_map
表,将信道名字和与之对应的 Receiver
对象保存在表中。再看看下面的代码,可得出一个结论,如果同一个进程内,不同的 Reader
对象订阅同一个信道,事实上使用的是同一个 Receiver
2。再看看那个很长的 CreateReceiver()
函数调用,除了传递一个配置信息参数外,还有一个很长的回调函数。回调函数会做:
- 加入一个 Transport 事件,类型为 Dispatch
- 调用数据分发器
DataDispatcher::Dispatch()
函数 - 加入一个 Transport 事件,类型为 Notify
这些步骤具体做了什么?我们目前还不知道,求知的欲望驱使着我们继续往下探索。
1 |
|
Transport
Transport
类,单例模式,有如下指针成员,emm……在还没完全弄懂底层代码的情况下,也很难告诉你们这些类的具体作用:
Participant
类 FastRtps 相关Notifier
类 与 Shm 相关IntraDispatcher
类 Intra 的分发器ShmDispatcher
类 Shm 的分发器RtpsDispatcher
类 Rtps 的分发器
1 |
|
和之前 Cyber RT 给人的感觉一样,一旦它要创建什么重要的东西,不调用个几层是根本不可能完成的。在这里,Transport
类的两个函数 CreateTransmitter
和 CreateReceiver
都会根据传入的 mode ,去构造出对应的子类对象,分别对应我在这篇博客开头提到的四种不同场景下的传输实现类。哦,提醒一下,默认的 mode 是 Hybrid ,也就是三种模式混用。
1 |
|
Transmitter
类是写消息,而 Receiver
类是读消息,因此 Receiver
类比 Transmitter
类多了一个参数 MessageListener
,其本质就是个回调函数:
1 |
|
1 |
|
最后一部分代码:在 Receiver
对象被创建时,只要模式不是 Hybrid,都会立刻调用 Receiver::Enable()
函数开启接收。
Receiver & Transmitter
这两个类是 EndPoint
的子类。关于 Endpoint
类5,那可就是整个架构的类继承的终点了,其内部有三个成员
bool enabled_
用来标记是否被启用Identity id_
用于标识,对于每个Endpoint
对象拥有唯一的 id 号,其子类也是用这个来进行标识RoleAttributes attr_
用来记录配置文件中的数据。
其中 Receiver
类只有一个回调函数 msg_listener_
,该回调函数就是 Receiver
构造时传入的函数。在新消息到来时,会被调用👇:
1 |
|
在 ReceiverManager 那一小节中,已经说到该函数有三个步骤,其中5,调用 DataDispatcher::Dispatch
非常关键。因为从这步可以看出上层和底层最终完成了闭环。当底层 Receiver
的回调函数 msg_listener
收到消息被调用时,上层的分发器 DataDispatcher
会把来自底层的消息发到等待的消息缓存里,然后调用上层的通知器 DataNotifier::Notify()
,唤醒对应的 Component
的封装了 Process()
的协程,让协程处理这些消息。
再看看 Receiver
的四个子类,每个子类都包含了相应的 Dispatcher 的指针,例如,RtpsReceiver
类含有 RtpsDispatcherPtr
成员。这些 Dispatcher 的功能就是增删监听者,从而让 Receiver
关闭或开启,比如:
1 |
|
简单地介绍一下 Dispatcher
类(别和下面的 DataDispatcher
搞混了)。它主要负责记录一个 channel_id
和对应 ListenerHandlerBasePtr
的关系表。而AddListener()
和 RemoveListener()
函数是从关系表中,拿到给定信道的对应 ListenerHandlerBase
,并在这上面连接(Connect)或不连接(Disconnect)相应的回调函数。由于时间精力有限,这边解释得比较混乱,若想具体了解其工作机制,可以参考文章最后的文献。总的来说,这有点像在 Qt 中实现的信号槽机制:信号在特定情况下被发射出去,对应的信号响应函数在槽中监听。信号与槽通过 Connect 函数关联,一个信号可以发射到多个槽,多个信号可以被一个槽监听。
再来看看 Transmitter
类,它是真正的数据写者的基类。它有两个成员,分别为序列号 seq_num_
和消息信息 msg_info_
。
1 |
|
我主要研究其中的 Transmit()
函数,其四个子类都具体实现了 Transmit()
函数,如果你仔细看过前一篇博客,就知道这也是 Writer
类一直在调用的函数。那么这个 Transmit()
函数有什么具体步骤呢?
Writer
调用Transmitter::Transmit()
函数- 设置
msg_info::seq_num = NextSeqNum
消息的序列号 - 加入一个 Transport事件,类型为 Transmit Begin
- 调用子类实现的
Transmit()
函数,该函数通过传入一条消息即可以完成数据写入任务。
1 |
|
好,我对 Receiver
和 Transmitter
类的介绍就到这里了。我不打算再继续介绍它们的子类以及更加底层的设计是如何实现的,因为这边牵扯到非常多的技术细节,弄懂这些会消耗非常多的精力,而且这些东西也远远超出了课题组目前的要求。
Data 部分
通信架构的内容已经全部介绍完了(至少对于发布—订阅通信方式来说),但还是感觉缺了什么东西,把这两者有效地连接起来😅。没错,我至今还没有说清楚,Writer
发布的消息是如何让 Reader
看到的。而这就牵扯到 cyber/data
中实现的类了。
DataVisitor
先来说说数据访问类 DataVisitor
,它是 DataVisitorBase
的子类。它的主要成员和构造函数如下。先来说几个明显可以得到的结论:
DataVisitor
对象都会有若干个缓存类ChannelBuffer
,还有一个DataFusion
对象,融合了所有的消息类型- 在初始化的时候,首先构建这些
ChannelBuffer
,然后它们都会被加入到相应类型的DataDispatcher
的管理中 data_notifier_
(存在于DataVisitorBase
中),会向信道 0 加入一个notifier_
,类型为struct Notifier { std::function<void()> callback; };
,这表明信道 0 注定与其他信道不一般 :thinking:data_fusion_
会构建并指向AllLatest
对象,从类型来看,它们整合了所有的消息类型,应当用于信息的最后收集发送
1 |
|
该类主要用于消息数据的访问,存放到来的消息数据,并提供接口供消息读取。事实上,我们之前在讨论组件的初始化的最后部分时见过它,也在订阅者初始化时遇见过它,但当时我都有意地略过了。现在我们重拾这部分内容(为了简单且不失一般性,我以两个信道的情况为例),把这部分彻底搞明白:
1 |
|
上面代码做了:
-
收集了所有的
Reader
对象的所读信道 id 和消息队列大小,放入到config_list
后就创建DataVisitor
对象 -
在上面
DataVisitor
类的构造函数中,根据传入的信道 id 和消息队列尺寸,DataVisitor
内为其创建了两个ChannelBuffer
作为缓冲区- 调用数据分发器将它们加入到
DataDispatcher
的管理中 - 调用
DataNotifier::AddNotifier()
函数,传入第 0 个读者的信道 id ,加入到DataNotifier
的管理中。DataDispatcher
与DataNotifier
类均为单例,之后我们会对它们做详细介绍 - 创建
DataFusion
对象,这也是之后要了解的╮(╯▽╰)╭
- 调用数据分发器将它们加入到
-
创建协程工厂,并构建出要封装为协程的函数:
1
2
3
4
5
6
7
8
9
10
11
12factory.create_routine = [=]() {
return [=]() {
std::shared_ptr<M0> msg0; std::shared_ptr<M1> msg1;
for (;;) {
CRoutine::GetCurrentRoutine()->set_state(RoutineState::DATA_WAIT);
if (dv->TryFetch(msg0, msg1)) { // 取数据
f(msg0, msg1); // f 函数就是组件初始化时创建的 func 函数
CRoutine::Yield(RoutineState::READY);
} else
CRoutine::Yield();
}
}; };协程做的就是调用
dv->TryFetch()
取数据(下文会详细说明),如果成功就调用组件的Process()
函数,且协程的状态从等待数据转变为了就绪,而一旦协程就绪,就可以被Processor
类运行 -
Scheduler
创建任务,在CreateTask()
函数中,调用visitor->RegisterNotifyCallback()
函数1
2
3
4
5visitor->RegisterNotifyCallback([this, task_id]() {
if (cyber_unlikely(stop_.load()))
return;
this->NotifyProcessor(task_id);
});上面的函数被赋值给了
DataVisitorBase::notifier_
,用于唤醒相应的协程来处理该新消息
说完这部分过程后,我发现我们至少还要理解一下 DataDispatcher
、 DataNotifier
、 DataFusion
和 AllLatest
类😂😂😂。
DataDispatcher
千里之行,始于足下。先来看看数据分发类 DataDispatcher
,顾名思义,它将底层传来的数据进行分发,具体来说,当新消息到来时,通过 Dispatch()
函数把它们放到该信道下的所有消息缓冲区中。它是个单例模式,但是个模板类,意味着每一个消息类型会有对应的一个唯一的 DataDispatcher
对象。类内记录了一个信道 id 与多个 CacheBuffer
对象对应的表。注意到:一个信道可以有多个订阅者 Reader
,每个订阅者拥有一个 CacheBuffer
缓冲区,而这个缓冲区就是之前 DataVisitor
类在构造时给每个消息类型创建的 ChannelBuffer
。
1 |
|
DataDispatcher::Dispatch()
函数非常重要,在 ReceiverManager 和 Receiver && Transmitter 中我们已经提到,他是连接上层和底层的最关键一环。在 Receiver
对象每次收到该信道的消息时,就会调用DataDispatcher::Dispatch()
函数分发刚收到的数据,函数会先从表中取出所有对应信道的缓冲区,然后调用 CacheBuffer::Fill()
函数来给缓冲区填数据(稍后介绍这个函数),最后调用 DataNotifier::Notify()
函数,唤醒它们对应的协程来取数据并运行。现在你应该明白,为何 DataVisitor
在构造时,需要把刚刚建立的缓冲区给 DataDispatcher
管理,不然的话,缓冲区拿不到消息啊。
DataNotifier
再来看看 DataNotifier
类。它是个单例模式,类内有一个信道 id 与多个 Notifer
对应的表,这是考虑到一个信道可以有多个订阅者。很显然,前文提到的在 DataVisitor
构造时,调用 AddNotifier()
就是要把自己的 Notifier
存到这个表中。
1 |
|
至于 AddNotifier()
函数的实现,emm……很平凡,无非就是找到对应的信道 id ,然后将参数中的 Notifier
放入到数组里(如果没有数组,新建一个)。重要的是唤醒函数 Notify()
,该函数内部会调用 notifier->callback()
,回顾一下,这个 notifier
是在 Scheduler
创建任务时被设置的,内含有 NotifyProcessor()
函数,可以唤醒协程。在 Receiver && Transmitter 和 ReceiverManager 也提到,第二步的分发器最终会调用该函数,唤醒所有监听该信道的协程,来处理到来的消息。这样,你也就明白为什么 DataVisitor
类在构造时要把 notifier
加入进去了,不然的话信道来了个消息,就没法唤醒协程,Reader
就不知道了呀。
1 |
|
DataFusion & AllLatest
再来看看 DataVisitor
构造函数的最后一步,创建 DataFusion
对象,看看名字,就应该明白该对象用于信道数据的融合。DataFusion
是 AllLatest
的基类,DataFusion
类十分简单,仅提供了一个 Fusion()
接口,具体由 AllLatest
实现。所以,我们重点看一下 AllLatest
类,哈,听名字就知道它会取所有信道中的最新值,再结合它是 DataFusion
的子类,所以主要功能应该是融合多个信道的最新数据。
我还是以两个信道的情况为例,该类成员有几个 ChannelBuffer
类,其中一个比较特殊,类型是数据融合的 buffer_fusion_
。
1 |
|
在构造函数中,特殊的信道 0 的消息缓冲区会调用 SetFusionCallback()
来设置回调函数 :point_down:,为方便表述,我给它起名 FusionFunc
。从下面的代码中看出, FusionFunc
先判断是否所有信道都有消息,并获取最新的消息,如果都有消息的话就将这些消息融合,即用 std::tuple
封装,再调用 Fill()
函数填入到 buffer_fusion_
的 CacheBuffer
中。
1 |
|
何时调用 FusionFunc
呢?答案在这个 Fill()
函数(见下代码)中,它诡计多端——如果 CacheBuffer
有回调函数 FusionFunc
,会调用回调函数;如果没有,会把接收到的数据放入缓冲区中。很显然在上面的构造函数中,只有信道 0 设置了回调函数 FusionFunc
。因此当信道 0 有数据到来, DataDispatcher::Dispatch()
被调用时(代码见 DataDispatcher 部分),进而调用 Fill()
函数时, FusionFunc
才会被调用,将最新的消息融合,并将融合的消息填入到 buffer_fusion_
中。而其他信道的数据到来时, Fill()
函数只是单纯往对应的 CacheBuffer
中填数据。
1 |
|
在协程的处理函数中(回顾一下, DataVisitor 的创建协程工厂中提到的)会调用 DataVisitor::TryFetch()
函数,再调用 Fusion()
函数(代码如下),它从融合数据的缓冲区 buffer_fusion_
中拿走(Fetch)融合消息,这也就意味着同时拿多个信道上的最新消息,保证了每次给 Component::Process()
函数的参数都必须“全员到齐”,并且所有信息都是最新的。综上所述,只有信道 0 收到消息后,才会融合其他信道的消息,往往主导通信处理的节奏,因此信道 0 的选取就比较关键了。
1 |
|
ChannelBuffer & CacheBuffer
前文我一直提到一个词,叫做缓冲区。事实上这个词具体指向了两个类,ChannelBuffer
和 CacheBuffer
类。为了让读者更好地理解“缓冲区”这个词,我简要地介绍一下这两个类。ChannelBuffer
类包含了两个成员:信道 id 和 指向 CacheBuffer
的指针。它的函数 Fetch()
和 Latest()
分别用于取对应索引的消息和取得最新消息。而 CacheBuffer
类,其实质就是一个循环队列,用来放置某个信道产生的数据。需要注意的是,CacheBuffer
占用的内存是恒定的,因为里面的数组长度一开始就被限定了,所以,一旦缓冲区装满了,它会毫不犹豫地丢弃最旧的消息,推入最新的消息。具体的队列实现在 cyber/data/cache_buffer.h
和 cyber/data/channel_buffer.h
,很简单,有兴趣可以直接读代码。
总结
(⊙o⊙)哦终于,我把 Cyber RT 所有的通信机制都看完了,哦,BTW,上篇通信链接在这里。现在,在把所有的内容都搞明白后,让我们理一理自己昏沉的头脑,跳脱出代码的边边框框,从上帝视角审视一下数据是如何流通的。
这里有两张非常好的图,感谢 [2],我可以不用自己动手,花费数小时画图了😀
在上面这张图中,仍然以两个信道的 Component
为例,从左上角出发:
Component
初始化建成了两个Reader
对象,然后创建了一个DataVisitor<M0, M1>
对象- 两个
Reader
对象也分别创建了一个DataVisitor<>
对象,回顾一下上篇提到的Reader
的初始化过程 - 这些
DataVisitor
对象会分别创建出ChannelBuffer
,并使用DataDispatcher
管理这些缓冲区(注意看它内部的表) - 当接收到新消息后,
DataDispatcher
会给对应的信道上的所有缓冲区进行分派,特别地,若信道 0 有新消息,还会对其他消息进行融合(注意看AllLatest
对象) - 之后,
DataVisitor
会使用DataNotifier
对象,唤醒相应的协程,处理收到的数据
现在,我们再来看看底层的通信架构,虽然有些部分我略去未讲,但这张图我们还是可以看明白的🐶。这次,我们从最右边开始看起。
- 可以看出两个
Reader
对象共用了一个Receiver
。这是因为在同一个进程内,不同的Reader
对象订阅同一个信道,就会使用同一个Receiver
- 默认选择的
Receiver
是 hybrid 的,因此需要三个底层接收类IntraReceiver
、ShmReceiver
和RtpsReceiver
配合 - 在创建
Receiver
时,监听者处理函数msg_listener_
就已经“准备就绪”了。 Dispatcher
的表中记录了监听者和它们负责的信道,并把它们连接Connect
起来,如同 Qt 中的信号槽机制- 一有新消息到达(最左边的函数是我陌生的),那么就会立刻触发信号槽机制,调用
msg_listener
函数,之后就是上层DataDispatcher
的工作了
最后的最后,事实上我还是落了一个东西:服务—客户通信方式😓😓😂😂。的确,这篇博客的内容全是关于发布—订阅通信方式的,对于 Service
和 Client
,几乎没有提及,之后,我就会补上这一部分服务发现的内容。
参考
[2] 自动驾驶平台Apollo 5.5阅读手记:Cyber RT中的通信传输
[3] 百度Apollo系统学习-Cyber RT 通信-上层
[4] 百度 Apollo Cyber RT简介、基本概念以及与 ROS 对照
[5] 百度Apollo系统学习-Cyber RT 通信-底层
[7] 自动驾驶汽车平台技术基础/杨世春等编著. —北京:清华大学出版社