Apollo Cyber RT 通信(上)
Apollo Cyber RT 通信(上)
前言
在之前的博客中,我和大家讨论了 Apollo Cyber RT 组件的相关内容,在介绍组件的内容时,我们谈到每个组件都有自己的读者信道 Reader<M>
和 Node
节点类对象, DataVisitor
数据访问类在创建协程工厂时也起到了关键作用。由此可见,通信问题已经成为了在学习 Cyber RT 过程中绕不过去的槛儿,那么,Cyber RT 怎么保证这么多的组件能够齐心协力,高效地完成一件件任务呢?今天我就来带领大家探讨一下这类问题。由于通信部分的内容过多,写在一篇博客里太长,所以我把它分成了两部分。
ROS 的历史遗留
之前在研究 Apollo 时,我了解到 Apollo 3.5 版本前,各个模块直接通过一个简单的运行时框架,构建在 ROS 之上。
而之后的版本,都加上了 Cyber RT,它不仅仅是一个运行时框架,还承担了数据通信和任务调度,以及记录日志等任务。但是从软件工程的角度出发,对底层的大改必然会牵动上层,为了尽可能地不影响上层代码,Cyber RT 不得不依照 ROS 的“规矩”,提供有相同名称的功能近似的接口,这之后我们可以将 ROS 与 Cyber RT 进行比较。
于是我就发现,与 ROS & ROS2 中类似的是,Cyber RT 也支持两种通信方式,详情见术语介绍:
- 发布—订阅通信方式(Publish-Subscribe),也叫基于信道的通信方式,常用于数据流处理中节点间通信。即发布者(Publisher)在信道(channel,ROS 中对应地称为 topic)上发布消息,相应的订阅者(Subscriber)便会收到消息数据
- 服务—客户通信方式(Service-Client),也叫基于服务的通信方式,常用于双向的、需要有请求和应答的通信场景。
三种通信模型7
在 Cyber RT 中提供了不同的通信方式,以应对各类自动驾驶需求。根据上层模块所处的进程,可以将模块间的关系分为:
- 同一进程内。在同一个进程节点之间的相互通信,对于进程内的数据,直接传递消息对象的指针,可以避免消息数据复制的开销,尤其是一些较大的消息,如点云和图像等
- 同主机进程间。在不同进程之间的节点传播或交换信息,可以利用共享内存传输,这样不仅可以减少传输中的数据复制,显著提升传输效率,还能够满足一对多的传输场景
- 跨主机。跨主机的数据利用 socket 传输,跨主机通信采用了第三方的开源库 Fast RTPS(Real Time Publish Subscribe Protocol,实时发布订阅协议),是 DDS(Data Distribution Service)标准的一个非常流行的开源实现,支持 RTPS 协议版本的一个订阅发布消息组件,具有高性能,实时性高,多平台支持等优点
通信架构
首先来看一下通信的层级划分(上图)2,再重复一遍,自动驾驶系统中的各个模块基本都由 Component
类实现,而一个 Component
对象包含一个 Node
对象。Node
对象会根据需要创建和管理 Writer
,Reader
,Service
和 Client
对象。而在通信类下面, Trasmitter
和 Receiver
类。前者用于数据发送,后者用于数据接收。它们是数据传输层的抽象,而底下还有多个用于不同场景下的传输实现类,比如对于 Trasmitter
类,就有 IntraTransmitter
类,ShmTransmitter
类,RtpsTransmitter
类和 HybridTransmitter
类。同样地, Receiver
类也有类似的情况 。在这里我就简单地说明一下这些底层类的作用,因为这些东西过于细节,我往后也不会重点研究,但是还是有必要稍微了解一下的😀:
- RTPS 基于 eProsimar 的 Fast RTPS,介绍同上
- Shared memory 共享内存模式
- Intra-Process 用于进程内通信
- Hybrid 混合使用以上几种通信模式
OK,接下来,我们放慢脚步,一层一层地剥开通信的实现情况🐶。
节点 Node
Node
是整个数据拓扑网络中的基本单元。Node
对象会根据需要创建和管理 Writer
,Reader
,Service
和 Client
对象。Reader
和 Writer
,用于发布—订阅模式。Service
和 Client
,用于服务—客户模式。
1 |
|
其中 Node
类的成员变量:NodeChannelImpl
指针和 NodeServiceImpl
指针,对于我们进一步了解通信非常重要,因为在 Node::CreateReader
函数中,Node
使用了 NodeChannelImpl
指针创建 Reader
对象。同理于其他三个,Node
类都是使用相应的指针来创建对象,没错,它们才是幕后黑手。
NodeChannelImpl 与 NodeServiceImpl ——幕后创建者
为方便讨论,我以 NodeChannelImpl
类为例,并从 CreateReader
函数出发,在 Node
类中调用了 NodeChannelImpl::CreateReader()
后,
- 创建
RoleAttributes
对象,把该对象的一些配置数据全部填好(如果你对该对象包含了什么很感兴趣,那么请看cyber/proto/role_attributes.proto
) - 根据现在的模式(真实模式还是模拟模式),来决定如何创建读者类。根据代码,真实模式时使用
Reader<M>
,模拟模式使用IntraReader<M>
1 |
|
令人感到安心的是,NodeChannelImpl
类在创建 Write
对象时,步骤与 Reader
几乎一致,甚至可以说,他们只是把几个关键词换了一下而已。
那么,对于 NodeServiceImpl
类,情况有没有改变呢?emm……情况稍有变化:(以 Service
类为例)
- 直接创建
Service
对象,并进行初始化(初始化里干的事情日后再聊) - 将创建好的
Service
指针放入到数组serivce_list_
中 - 调用服务发现的拓扑管理类
service_discovery::TopologyManager
中的Join()
函数(这边是个大坑,日后再聊+1 🐶)
Reader && Writer
比起服务—客户的通信模式,我更关注发布—订阅模式;比起模拟模式,我更关注真实模式,因此先作重点介绍。
Reader 类
那么兜兜转转,我们终于来到了读者类。首先来看一下 Reader
的基本情况。唔,它继承自 ReaderBase
类,它包含一个 Blocker
和一个 Receiver
对象,它们是我们要重点关注的。
1 |
|
根据我们在上一小节的理解,Reader
对象只会在真实模式下被创建。既然已经说到了创建,我们就先来看一下 Reader
的创建过程吧😀。在 Reader
类的构造函数中,其类的 Blocker
成员也进行构造,这边的过程很简单。而进一步对 Reader
进行初始化,调用 Reader::Init()
函数时,情况就复杂了起来:
- 创建回调函数。用
Reader::Enqueue()
+ 传入的回调函数封装出一个新的回调函数func
- 与组件的初始化过程的最后几步相似,
Reader
类的初始化中,也拿出了Scheduler
类,创建了DataVisitor
对象(通信下篇会对其详细阐述)和协程工厂对象,并用它们创建了一个任务,目的就是将第一步中封装好的回调函数func
包装为协程,最后根据调度算法的安排,在合适的时候调用。此外还把对应的协程名字记录到croutine_name_
中 - 根据
Reader
对象的属性,从接收器管理类ReceiverManager<M>
中取出相对应的Receiver
对象,该对象用于接受消息 - 从
service_discovery::TopologyManager
那里拿到信道管理器channel_manager
,最后把这个Reader
对象加入到拓扑结构中
1 |
|
好,那么 Reader
主要功能是什么呢?根据官方文档6,Reader
类订阅了一个信道,然后就有两个主要功能,这些都牵扯到后面要介绍的类,因此在这里就简单说明一下:
- 传入一个回调函数
CallbackFunc
,用来处理刚刚到达的消息。“处理消息”的意思就是把消息压入Blocker
类的队列中,然后用回调函数处理。 - 可以观察到
Blocker
类中的缓存消息。用户可以使用函数Observe()
将消息从发布队列取出,放入到观察队列中。一个Reader
使用一个ChannelBuffer
,处理了的消息会被存放在这里。
好,除了初始化和主要的功能,暂时不讨论其他 Reader
相关的函数,因为它们要么过于平凡,要么涉及到了服务发现的内容(我无法在一篇博客中讨论这么多东西),接下来,我们可以去看看 Reader
类的反面,Writer
类是怎么做的。
Writer 类
同样,从 Writer
类的组成开始,它继承 WriterBase
类,但组成比 Reader
类简单很多,只有一个 Transmitter
需要我们重点关注。
1 |
|
也一样,Writer
类只会在真实模式下被创建,其构建和初始化过程比 Reader
类对象简单:
- 构建好
WriterBase::role_attr
,把基本的属性数据填充好 - 创建
Transmitter
对象(下文会提到) - 从
service_discovery::TopologyManager
那里拿到信道管理器channel_manager
,最后把该Writer
对象加入到拓扑结构中
1 |
|
Writer
类的主要功能想必大家也猜得到:向对应的信道写数据,其 Write()
函数会调用 transport::Transmitter->Transmit()
函数,每个 Writer
只能向一个信道写东西,但一个信道可以有多个 Writer
对象。
Blocker——缓存者
Blocker
类是 Reader
的一个成员,它继承自 BlockerBase
类,还是和之前一样,我们先来看一下 Blocker
类的组成。
1 |
|
前面我提到,Reader
类中有成员 Blocker
,用于缓存消息,因此有一个发布队列。当调用 Blocker::Enqueue()
函数时,Blocker
会将得到的新消息推入到队首,当队列已满时,就自动把队尾的旧消息移除。而为了用户能观察到队列中的消息,Blocker
类又加上了一个观察队列。当调用 Blocker::Observe()
时,就将发布队列拷贝一份给了观察队列。
对于每一个 Blocker
类,它保存了一张 callback_id
和回调函数指针的一一对应关系表 published_callbacks_
,记录了其所在 Reader
对象的一些回调函数。之所以是一些,是因为 Blocker
的主要功能应该是提供一个管理者获取数据的入口,方便调试、记录日志、运行模拟环境和监控整个系统,所以在 Blocker
类里注册的回调函数应该都是管理员注册的监控函数,和系统主逻辑没关系3😅。其实在真实模式下,主逻辑完全都没有到过这里,这是我花了很多时间反复确认的。
好,这时候结合 Reader
类的部分内容,就可以更好地理解 Blocker
类的作用了。Reader
类调用 Enqueue()
函数时,先更新时间参数,再调用 Blocker::Publish()
函数,其中 Enqueue()
将消息推入到发布队列中,然后 Notify()
函数用 published_callbacks_
内的所有的回调函数去处理消息。
1 |
|
这个 Publish()
函数非常重要,在讨论下文的 BlockerManager
会用到。
1 |
|
那么 published_callbacks_
是如何得到的呢?答案是,通过 Subscribe()
函数加入的。只需要调用 Blocker::Subscribe()
函数,就可以将回调函数 id 和回调函数指针一同放入到这张表中。
BlockerManager——模拟模式助手
好,接下来我们将目光暂时转移到模拟模式下。先再次介绍一下 IntraReader
类与 IntraWriter
类。它们俩都是在模拟模式下才会出现的对象。代码文件位于 cyber/blocker/intra_reader.h
和 cyber/bolcker/intra_writer.h
,它们的实现与 BlockerManager
类关系密切,这也是为何它们的代码文件会位于 cyber/blocker
而不是和 Reader
一样位于 cyber/node
的重要原因(也是为什么我选择在这里讲它们俩的原因😅)。它们分别继承自 Reader
和 Writer
。因为这是模拟模式下创建的对象,所以它们不会去创建协程,去获取传感器的数据。
老规矩,我们先看看 IntraReader
和 IntraWriter
“肚子里有什么东西"👇。看来货不多,IntraReader
类只有一个回调函数,IntraWriter
类只有个 BlockerManager
类的指针。
1 |
|
再来看看为什么称 BlockerManager
为模拟模式的助手。因为 IntraReader
和 IntraWriter
都或多或少与它有点关系。看看它们的初始化,IntraReader
类重载了 Reader::Init()
函数。在初始化中,它不会创建协程,而是直接调用 BlockerManager::Subscribe()
,把 IntraReader::OnMessage()
函数(记录时间并调用 IntraReader
创建时传入的回调函数)注册为该信道对应 Blocker
的回调函数。
1 |
|
对于 IntraWriter
类,就更直接了,在初始化时它直接用 BlockManager
创建了一个 Blocker
对象。
1 |
|
BlockerManager
类(单例模式)在模拟模式下发挥了非常大的作用!还是老样子,看看它有什么东西,哦,只有一个 BlockerMap
,它将信道名字与对应 Blocker
记录了下来。 IntraReader::Enqueue()
和 IntraWriter::Write()
都调用了 BlockerManager::Publish()
函数
再来看看它们工作时会怎么干吧。先来看一下 IntraReader::Enqueue()
函数,它会调用 BlockerManager::Publish()
函数,IntraReader::Observe()
函数也是从 BlockerManager
中取出一个 Blocker
对象,后调用 Observe()
。对于 IntraWriter
类,其 Write()
函数也是直接调用 BlockerManager::Publish()
函数来放入数据,而 BlockerManager::Publish()
函数👇会先选取(或构造)出对应信道的 Blocker
对象,然后调用 Blocker::Publish()
函数,这个函数我在前文已经讨论过了,Blocker::Publish()
函数会先调用 Enqueue()
将消息推入到发布队列中,然后 Notify()
函数通知 published_callbacks_
内的所有的回调函数去处理消息。
1 |
|
小结
在这篇博客中,我先介绍了一下 Cyber RT 的通信架构和三种通信模型,然后,开始一层层地介绍涉及到通信的类。在下篇博客中,我会将剩下的东西介绍清楚,最后我会给出一个更加完整的总结。
参考
[2] 自动驾驶平台Apollo 5.5阅读手记:Cyber RT中的通信传输
[3] 百度Apollo系统学习-Cyber RT 通信-上层
[4] 百度 Apollo Cyber RT简介、基本概念以及与 ROS 对照
[5] 百度Apollo系统学习-Cyber RT 通信-底层
[7] 自动驾驶汽车平台技术基础/杨世春等编著. —北京:清华大学出版社