Apollo Cyber RT 通信(上)

Apollo Cyber RT 通信(上)

前言

之前的博客中,我和大家讨论了 Apollo Cyber RT 组件的相关内容,在介绍组件的内容时,我们谈到每个组件都有自己的读者信道 Reader<M>Node 节点类对象, DataVisitor 数据访问类在创建协程工厂时也起到了关键作用。由此可见,通信问题已经成为了在学习 Cyber RT 过程中绕不过去的槛儿,那么,Cyber RT 怎么保证这么多的组件能够齐心协力,高效地完成一件件任务呢?今天我就来带领大家探讨一下这类问题。由于通信部分的内容过多,写在一篇博客里太长,所以我把它分成了两部分。

ROS 的历史遗留

之前在研究 Apollo 时,我了解到 Apollo 3.5 版本前,各个模块直接通过一个简单的运行时框架,构建在 ROS 之上。

而之后的版本,都加上了 Cyber RT,它不仅仅是一个运行时框架,还承担了数据通信和任务调度,以及记录日志等任务。但是从软件工程的角度出发,对底层的大改必然会牵动上层,为了尽可能地不影响上层代码,Cyber RT 不得不依照 ROS 的“规矩”,提供有相同名称的功能近似的接口,这之后我们可以将 ROS 与 Cyber RT 进行比较。

于是我就发现,与 ROS & ROS2 中类似的是,Cyber RT 也支持两种通信方式,详情见术语介绍

  • 发布—订阅通信方式(Publish-Subscribe),也叫基于信道的通信方式,常用于数据流处理中节点间通信。即发布者(Publisher)在信道(channel,ROS 中对应地称为 topic)上发布消息,相应的订阅者(Subscriber)便会收到消息数据
  • 服务—客户通信方式(Service-Client),也叫基于服务的通信方式,常用于双向的、需要有请求和应答的通信场景。

三种通信模型7

在 Cyber RT 中提供了不同的通信方式,以应对各类自动驾驶需求。根据上层模块所处的进程,可以将模块间的关系分为:

  • 同一进程内。在同一个进程节点之间的相互通信,对于进程内的数据,直接传递消息对象的指针,可以避免消息数据复制的开销,尤其是一些较大的消息,如点云和图像等
  • 同主机进程间。在不同进程之间的节点传播或交换信息,可以利用共享内存传输,这样不仅可以减少传输中的数据复制,显著提升传输效率,还能够满足一对多的传输场景
  • 跨主机。跨主机的数据利用 socket 传输,跨主机通信采用了第三方的开源库 Fast RTPS(Real Time Publish Subscribe Protocol,实时发布订阅协议),是 DDS(Data Distribution Service)标准的一个非常流行的开源实现,支持 RTPS 协议版本的一个订阅发布消息组件,具有高性能,实时性高,多平台支持等优点

通信架构

首先来看一下通信的层级划分(上图)2,再重复一遍,自动驾驶系统中的各个模块基本都由 Component 类实现,而一个 Component 对象包含一个 Node 对象。Node 对象会根据需要创建和管理 WriterReaderServiceClient 对象。而在通信类下面, TrasmitterReceiver 类。前者用于数据发送,后者用于数据接收。它们是数据传输层的抽象,而底下还有多个用于不同场景下的传输实现类,比如对于 Trasmitter 类,就有 IntraTransmitter 类,ShmTransmitter 类,RtpsTransmitter 类和 HybridTransmitter 类。同样地, Receiver 类也有类似的情况 。在这里我就简单地说明一下这些底层类的作用,因为这些东西过于细节,我往后也不会重点研究,但是还是有必要稍微了解一下的😀:

  • RTPS 基于 eProsimar 的 Fast RTPS,介绍同上
  • Shared memory 共享内存模式
  • Intra-Process 用于进程内通信
  • Hybrid 混合使用以上几种通信模式

OK,接下来,我们放慢脚步,一层一层地剥开通信的实现情况🐶。

节点 Node

Node 是整个数据拓扑网络中的基本单元。Node 对象会根据需要创建和管理 WriterReaderServiceClient 对象。ReaderWriter,用于发布—订阅模式。ServiceClient,用于服务—客户模式。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
 class Node {
private
std::string node_name_;
std::string name_space_;
std::map<std::string, std::shared_ptr<ReaderBase>> readers_;
std::unique_ptr<NodeChannelImpl> node_channel_impl_ = nullptr;
std::unique_ptr<NodeServiceImpl> node_service_impl_ = nullptr;
};

template <typename MessageT>
auto Node::CreateReader(const ReaderConfig& config, const CallbackFunc<MessageT>& reader_func)
-> std::shared_ptr<cyber::Reader<MessageT>> {
/* .... */
auto reader =
node_channel_impl_->template CreateReader<MessageT>(config, reader_func);
/* .... */
return reader;
}

其中 Node 类的成员变量:NodeChannelImpl 指针和 NodeServiceImpl 指针,对于我们进一步了解通信非常重要,因为在 Node::CreateReader 函数中,Node 使用了 NodeChannelImpl 指针创建 Reader 对象。同理于其他三个,Node 类都是使用相应的指针来创建对象,没错,它们才是幕后黑手。

NodeChannelImpl 与 NodeServiceImpl ——幕后创建者

为方便讨论,我以 NodeChannelImpl 类为例,并从 CreateReader 函数出发,在 Node 类中调用了 NodeChannelImpl::CreateReader() 后,

  1. 创建 RoleAttributes 对象,把该对象的一些配置数据全部填好(如果你对该对象包含了什么很感兴趣,那么请看 cyber/proto/role_attributes.proto
  2. 根据现在的模式(真实模式还是模拟模式),来决定如何创建读者类。根据代码,真实模式时使用 Reader<M> ,模拟模式使用 IntraReader<M>
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
// 为方便说明,对代码进行裁减 真实代码情况见 cyber/node/node_channel_impl.h 文件
template <typename MessageT>
auto NodeChannelImpl::CreateReader(const proto::RoleAttributes& role_attr,
const CallbackFunc<MessageT>& reader_func,
uint32_t pending_queue_size)
-> std::shared_ptr<Reader<MessageT>> {
/* .... */
proto::RoleAttributes new_attr(role_attr);
FillInAttr<MessageT>(&new_attr);
std::shared_ptr<Reader<MessageT>> reader_ptr = nullptr;
if (!is_reality_mode_)
reader_ptr =
std::make_shared<blocker::IntraReader<MessageT>>(new_attr, reader_func);
else
reader_ptr = std::make_shared<Reader<MessageT>>(new_attr, reader_func,
pending_queue_size);
return reader_ptr;
}

令人感到安心的是,NodeChannelImpl 类在创建 Write 对象时,步骤与 Reader 几乎一致,甚至可以说,他们只是把几个关键词换了一下而已。

那么,对于 NodeServiceImpl 类,情况有没有改变呢?emm……情况稍有变化:(以 Service 类为例)

  1. 直接创建 Service 对象,并进行初始化(初始化里干的事情日后再聊)
  2. 将创建好的 Service 指针放入到数组 serivce_list_
  3. 调用服务发现的拓扑管理类 service_discovery::TopologyManager 中的 Join() 函数(这边是个大坑,日后再聊+1 🐶)

Reader && Writer

比起服务—客户的通信模式,我更关注发布—订阅模式;比起模拟模式,我更关注真实模式,因此先作重点介绍。

Reader 类

那么兜兜转转,我们终于来到了读者类。首先来看一下 Reader 的基本情况。唔,它继承自 ReaderBase 类,它包含一个 Blocker 和一个 Receiver 对象,它们是我们要重点关注的。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
class Reader : public ReaderBase {
explicit Reader(const proto::RoleAttributes& role_attr,
const CallbackFunc<MessageT>& reader_func = nullptr,
uint32_t pending_queue_size = DEFAULT_PENDING_QUEUE_SIZE);
protected:
double latest_recv_time_sec_; // 接受消息的最近时间
double second_to_lastest_recv_time_sec_ = -1.0; // 接受消息的第二近时间
uint32_t pending_queue_size_; // 消息缓冲区的最大值
private:
CallbackFunc<MessageT> reader_func_;
ReceiverPtr receiver_ = nullptr;
std::string croutine_name_;
BlockerPtr blocker_ = nullptr;
ChangeConnection change_conn_;
service_discovery::ChannelManagerPtr channel_manager_ = nullptr;
};

根据我们在上一小节的理解,Reader 对象只会在真实模式下被创建。既然已经说到了创建,我们就先来看一下 Reader 的创建过程吧😀。在 Reader 类的构造函数中,其类的 Blocker 成员也进行构造,这边的过程很简单。而进一步对 Reader 进行初始化,调用 Reader::Init() 函数时,情况就复杂了起来:

  1. 创建回调函数。用 Reader::Enqueue() + 传入的回调函数封装出一个新的回调函数 func
  2. 与组件的初始化过程的最后几步相似Reader 类的初始化中,也拿出了 Scheduler 类,创建了 DataVisitor 对象(通信下篇会对其详细阐述)和协程工厂对象,并用它们创建了一个任务,目的就是将第一步中封装好的回调函数 func 包装为协程,最后根据调度算法的安排,在合适的时候调用。此外还把对应的协程名字记录到 croutine_name_
  3. 根据 Reader 对象的属性,从接收器管理类 ReceiverManager<M> 中取出相对应的 Receiver 对象,该对象用于接受消息
  4. service_discovery::TopologyManager 那里拿到信道管理器 channel_manager ,最后把这个 Reader 对象加入到拓扑结构中
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
// 为方便说明,对代码进行裁减 真实代码情况见 cyber/node/reader.h 文件
template <typename MessageT> bool Reader<MessageT>::Init() {
/* .... */
// 第一步
std::function<void(const std::shared_ptr<MessageT>&)> func;
func = [this](const std::shared_ptr<MessageT>& msg) {
this->Enqueue(msg);
this->reader_func_(msg);
};
// 第二步
auto sched = scheduler::Instance();
croutine_name_ = role_attr_.node_name() + "_" + role_attr_.channel_name();
auto dv = std::make_shared<data::DataVisitor<MessageT>>(
role_attr_.channel_id(), pending_queue_size_);
croutine::RoutineFactory factory =
croutine::CreateRoutineFactory<MessageT>(std::move(func), dv);
sched->CreateTask(factory, croutine_name_);
// 第三步
receiver_ = ReceiverManager<MessageT>::Instance()->GetReceiver(role_attr_);
this->role_attr_.set_id(receiver_->id().HashValue());
// 第四步
channel_manager_ = service_discovery::TopologyManager::Instance()->channel_manager();
JoinTheTopology();
return true;
}

好,那么 Reader 主要功能是什么呢?根据官方文档6Reader 类订阅了一个信道,然后就有两个主要功能,这些都牵扯到后面要介绍的类,因此在这里就简单说明一下:

  • 传入一个回调函数 CallbackFunc ,用来处理刚刚到达的消息。“处理消息”的意思就是把消息压入 Blocker 类的队列中,然后用回调函数处理。
  • 可以观察到 Blocker 类中的缓存消息。用户可以使用函数 Observe() 将消息从发布队列取出,放入到观察队列中。一个 Reader 使用一个 ChannelBuffer ,处理了的消息会被存放在这里。

好,除了初始化和主要的功能,暂时不讨论其他 Reader 相关的函数,因为它们要么过于平凡,要么涉及到了服务发现的内容(我无法在一篇博客中讨论这么多东西),接下来,我们可以去看看 Reader 类的反面,Writer 类是怎么做的。

Writer 类

同样,从 Writer 类的组成开始,它继承 WriterBase 类,但组成比 Reader 类简单很多,只有一个 Transmitter 需要我们重点关注。

1
2
3
4
5
6
7
8
template <typename MessageT>
class Writer : public WriterBase {
explicit Writer(const proto::RoleAttributes& role_attr);
private:
TransmitterPtr transmitter_;
ChangeConnection change_conn_;
service_discovery::ChannelManagerPtr channel_manager_;
};

也一样,Writer 类只会在真实模式下被创建,其构建和初始化过程比 Reader 类对象简单:

  1. 构建好 WriterBase::role_attr ,把基本的属性数据填充好
  2. 创建 Transmitter 对象(下文会提到)
  3. service_discovery::TopologyManager 那里拿到信道管理器 channel_manager ,最后把该 Writer 对象加入到拓扑结构中
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
template <typename MessageT>
Writer<MessageT>::Writer(const proto::RoleAttributes& role_attr)
: WriterBase(role_attr), transmitter_(nullptr), channel_manager_(nullptr) {}

template <typename MessageT>
bool Writer<MessageT>::Init() {
/* .... */
transmitter_ =
transport::Transport::Instance()->CreateTransmitter<MessageT>(role_attr_);
/* .... */
this->role_attr_.set_id(transmitter_->id().HashValue());
channel_manager_ =
service_discovery::TopologyManager::Instance()->channel_manager();
JoinTheTopology();
return true;
}

Writer 类的主要功能想必大家也猜得到:向对应的信道写数据,其 Write() 函数会调用 transport::Transmitter->Transmit() 函数,每个 Writer 只能向一个信道写东西,但一个信道可以有多个 Writer 对象。

Blocker——缓存者

Blocker 类是 Reader 的一个成员,它继承自 BlockerBase 类,还是和之前一样,我们先来看一下 Blocker 类的组成。

1
2
3
4
5
6
7
8
9
template <typename T>
class Blocker : public BlockerBase {
BlockerAttr attr_; // Blocker的配置属性
MessageQueue observed_msg_queue_; // 观察队列
MessageQueue published_msg_queue_; // 发布队列

CallbackMap published_callbacks_; // id 与 回调函数的表 其实就是 unordered_map
MessageType dummy_msg_; // 假消息 常用于空值返回
};

前面我提到,Reader 类中有成员 Blocker ,用于缓存消息,因此有一个发布队列。当调用 Blocker::Enqueue() 函数时,Blocker 会将得到的新消息推入到队首,当队列已满时,就自动把队尾的旧消息移除。而为了用户能观察到队列中的消息,Blocker 类又加上了一个观察队列。当调用 Blocker::Observe() 时,就将发布队列拷贝一份给了观察队列。

对于每一个 Blocker 类,它保存了一张 callback_id 和回调函数指针的一一对应关系表 published_callbacks_,记录了其所在 Reader 对象的一些回调函数。之所以是一些,是因为 Blocker 的主要功能应该是提供一个管理者获取数据的入口,方便调试、记录日志、运行模拟环境和监控整个系统,所以在 Blocker 类里注册的回调函数应该都是管理员注册的监控函数,和系统主逻辑没关系3😅。其实在真实模式下,主逻辑完全都没有到过这里,这是我花了很多时间反复确认的。

好,这时候结合 Reader 类的部分内容,就可以更好地理解 Blocker 类的作用了。Reader 类调用 Enqueue() 函数时,先更新时间参数,再调用 Blocker::Publish() 函数,其中 Enqueue() 将消息推入到发布队列中,然后 Notify() 函数用 published_callbacks_ 内的所有的回调函数去处理消息。

1
2
3
4
5
template <typename T>
void Blocker<T>::Publish(const MessagePtr& msg) {
Enqueue(msg);
Notify(msg);
}

这个 Publish() 函数非常重要,在讨论下文的 BlockerManager 会用到。

1
2
3
4
5
6
7
8
9
10
11
12
13
template <typename T>
void Blocker<T>::Enqueue(const MessagePtr& msg) {
/* .... */
published_msg_queue_.push_front(msg);
while (published_msg_queue_.size() > attr_.capacity)
published_msg_queue_.pop_back();
}

template <typename T>
void Blocker<T>::Notify(const MessagePtr& msg) {
for (const auto& item : published_callbacks_)
item.second(msg);
}

那么 published_callbacks_ 是如何得到的呢?答案是,通过 Subscribe() 函数加入的。只需要调用 Blocker::Subscribe() 函数,就可以将回调函数 id 和回调函数指针一同放入到这张表中。

BlockerManager——模拟模式助手

好,接下来我们将目光暂时转移到模拟模式下。先再次介绍一下 IntraReader 类与 IntraWriter 类。它们俩都是在模拟模式下才会出现的对象。代码文件位于 cyber/blocker/intra_reader.hcyber/bolcker/intra_writer.h,它们的实现与 BlockerManager 类关系密切,这也是为何它们的代码文件会位于 cyber/blocker 而不是和 Reader 一样位于 cyber/node 的重要原因(也是为什么我选择在这里讲它们俩的原因😅)。它们分别继承自 ReaderWriter 。因为这是模拟模式下创建的对象,所以它们不会去创建协程,去获取传感器的数据。

老规矩,我们先看看 IntraReaderIntraWriter “肚子里有什么东西"👇。看来货不多,IntraReader 类只有一个回调函数,IntraWriter 类只有个 BlockerManager 类的指针。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
template <typename MessageT>
class IntraReader : public apollo::cyber::Reader<MessageT> {
public:
IntraReader(const proto::RoleAttributes& attr, const Callback& callback);
bool Init() override;
void Enqueue(const std::shared_ptr<MessageT>& msg) override;
private:
void OnMessage(const MessagePtr& msg_ptr);
Callback msg_callback_;
};

template <typename MessageT>
class IntraWriter : public apollo::cyber::Writer<MessageT> {
public:
explicit IntraWriter(const proto::RoleAttributes& attr);
bool Init() override;
bool Write(const MessagePtr& msg_ptr) override;
private:
BlockerManagerPtr blocker_manager_;
};

再来看看为什么称 BlockerManager 为模拟模式的助手。因为 IntraReaderIntraWriter 都或多或少与它有点关系。看看它们的初始化,IntraReader 类重载了 Reader::Init() 函数。在初始化中,它不会创建协程,而是直接调用 BlockerManager::Subscribe(),把 IntraReader::OnMessage() 函数(记录时间并调用 IntraReader 创建时传入的回调函数)注册为该信道对应 Blocker 的回调函数。

1
2
3
4
5
BlockerManager::Instance()->Subscribe<MessageT>(
this->role_attr_.channel_name(), this->role_attr_.qos_profile().depth(),
this->role_attr_.node_name(),
std::bind(&IntraReader<MessageT>::OnMessage, this,
std::placeholders::_1));

对于 IntraWriter 类,就更直接了,在初始化时它直接用 BlockManager 创建了一个 Blocker 对象。

1
2
3
4
5
6
7
template <typename MessageT>
bool IntraWriter<MessageT>::Init() {
blocker_manager_ = BlockerManager::Instance();
blocker_manager_->GetOrCreateBlocker<MessageT>(
BlockerAttr(this->role_attr_.channel_name()));
return true;
}

BlockerManager 类(单例模式)在模拟模式下发挥了非常大的作用!还是老样子,看看它有什么东西,哦,只有一个 BlockerMap ,它将信道名字与对应 Blocker 记录了下来。 IntraReader::Enqueue()IntraWriter::Write() 都调用了 BlockerManager::Publish() 函数

再来看看它们工作时会怎么干吧。先来看一下 IntraReader::Enqueue() 函数,它会调用 BlockerManager::Publish() 函数,IntraReader::Observe() 函数也是从 BlockerManager 中取出一个 Blocker 对象,后调用 Observe() 。对于 IntraWriter 类,其 Write() 函数也是直接调用 BlockerManager::Publish() 函数来放入数据,而 BlockerManager::Publish() 函数👇会先选取(或构造)出对应信道的 Blocker 对象,然后调用 Blocker::Publish() 函数,这个函数我在前文已经讨论过了,Blocker::Publish() 函数会先调用 Enqueue() 将消息推入到发布队列中,然后 Notify() 函数通知 published_callbacks_ 内的所有的回调函数去处理消息。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
class BlockerManager {
static const std::shared_ptr<BlockerManager>& Instance() {
static auto instance =
std::shared_ptr<BlockerManager>(new BlockerManager());
return instance;
}
private:
BlockerMap blockers_;
std::mutex blocker_mutex_;
};

template <typename T>
bool BlockerManager::Publish(const std::string& channel_name,
const typename Blocker<T>::MessagePtr& msg) {
auto blocker = GetOrCreateBlocker<T>(BlockerAttr(channel_name));
blocker->Publish(msg);
return true;
}

小结

在这篇博客中,我先介绍了一下 Cyber RT 的通信架构和三种通信模型,然后,开始一层层地介绍涉及到通信的类。在下篇博客中,我会将剩下的东西介绍清楚,最后我会给出一个更加完整的总结。

参考

[1] Dig into Apollo - Cyber

[2] 自动驾驶平台Apollo 5.5阅读手记:Cyber RT中的通信传输

[3] 百度Apollo系统学习-Cyber RT 通信-上层

[4] 百度 Apollo Cyber RT简介、基本概念以及与 ROS 对照

[5] 百度Apollo系统学习-Cyber RT 通信-底层

[6] cyber-rt.readthedocs.io

[7] 自动驾驶汽车平台技术基础/杨世春等编著. —北京:清华大学出版社


Apollo Cyber RT 通信(上)
https://dingfen.github.io/2020/11/03/2020-11-3-CyberCommu1/
作者
Bill Ding
发布于
2020年11月3日
更新于
2024年4月9日
许可协议