Apollo Cyber RT 通信(下)

Apollo Cyber RT 通信(下)

前言

欢迎回来!今天我继续和大家聊 Cyber RT 的通信,上文我探讨了 Cyber RT 的两种通信方式和三种通信模型,并从通信架构的角度,一层层地给大家详细介绍了通信时代码的具体工作情况。由于通信内容过多,全放在一篇博客理论太长,于是我将这些内容简单地一分为二,事实上它们应当是有机的整体。

上文末尾,我介绍了 Blocker 类的功能,今天这篇博客,我们继续向深处进军↖(^ω^)↗。

## Receiver & Transmitter

我们在上文已经认识了 ReaderWriter ,现在继续往下走。如果你仔细看代码的话,在 ReaderWriter 初始化时,都会分别构建好底层的 ReceiverTransmitter 对象。为了描述简便,我之前有意地忽略了,现在把它拿出来。Reader 部分的比较复杂,还使用了 ReceiverManager 进行管理;Writer 就比较直接了,在 Init() 函数中可以直接看到。

1
2
3
4
5
6
7
8
9
bool Reader<MessageT>::Init() {
receiver_ = ReceiverManager<MessageT>::Instance()->GetReceiver(role_attr_);
}

bool Writer<MessageT>::Init() {
/* .... */
transmitter_ =
transport::Transport::Instance()->CreateTransmitter<MessageT>(role_attr_);
}

我们不妨把这里作为突破口,打开新世界的大门。

ReceiverManager

之前提到过,Reader 在初始化时,需要用 ReceiverManager::GetReceiver() 获得 Receiver 对象。它的内部分封装了一个 unordered_map 表,将信道名字和与之对应的 Receiver 对象保存在表中。再看看下面的代码,可得出一个结论,如果同一个进程内,不同的 Reader 对象订阅同一个信道,事实上使用的是同一个 Receiver 2。再看看那个很长的 CreateReceiver() 函数调用,除了传递一个配置信息参数外,还有一个很长的回调函数。回调函数会做:

  • 加入一个 Transport 事件,类型为 Dispatch
  • 调用数据分发器 DataDispatcher::Dispatch() 函数
  • 加入一个 Transport 事件,类型为 Notify

这些步骤具体做了什么?我们目前还不知道,求知的欲望驱使着我们继续往下探索。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
template <typename MessageT>
auto ReceiverManager<MessageT>::GetReceiver(const proto::RoleAttributes& role_attr)
-> typename std::shared_ptr<transport::Receiver<MessageT>> {
// because multi reader for one channel will write datacache multi times,
// so reader for datacache we use map to keep one instance for per channel
const std::string& channel_name = role_attr.channel_name();
// 如果信道名字对应的 Receiver 还没有创建 那就创建
if (receiver_map_.count(channel_name) == 0) {
// 一个巨长的 CreateReceiver() 函数调用
receiver_map_[channel_name] = transport::Transport::Instance()->CreateReceiver<MessageT>(
role_attr, [](const std::shared_ptr<MessageT>& msg,
const transport::MessageInfo& msg_info,
const proto::RoleAttributes& reader_attr) {
(void)msg_info;
(void)reader_attr;
PerfEventCache::Instance()->AddTransportEvent(
TransPerf::DISPATCH, reader_attr.channel_id(),
msg_info.seq_num());
data::DataDispatcher<MessageT>::Instance()->Dispatch(
reader_attr.channel_id(), msg);
PerfEventCache::Instance()->AddTransportEvent(
TransPerf::NOTIFY, reader_attr.channel_id(),
msg_info.seq_num());
});
}
// 如果已经有了 直接返回
return receiver_map_[channel_name];
}

Transport

Transport 类,单例模式,有如下指针成员,emm……在还没完全弄懂底层代码的情况下,也很难告诉你们这些类的具体作用:

  • Participant 类 FastRtps 相关
  • Notifier 类 与 Shm 相关
  • IntraDispatcher 类 Intra 的分发器
  • ShmDispatcher 类 Shm 的分发器
  • RtpsDispatcher 类 Rtps 的分发器
1
2
3
4
5
6
7
8
class Transport {
private:
ParticipantPtr participant_ = nullptr;
NotifierPtr notifier_ = nullptr;
IntraDispatcherPtr intra_dispatcher_ = nullptr;
ShmDispatcherPtr shm_dispatcher_ = nullptr;
RtpsDispatcherPtr rtps_dispatcher_ = nullptr;
}

和之前 Cyber RT 给人的感觉一样,一旦它要创建什么重要的东西,不调用个几层是根本不可能完成的。在这里,Transport 类的两个函数 CreateTransmitterCreateReceiver 都会根据传入的 mode ,去构造出对应的子类对象,分别对应我在这篇博客开头提到的四种不同场景下的传输实现类。哦,提醒一下,默认的 mode 是 Hybrid ,也就是三种模式混用。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
template <typename M>
auto Transport::CreateTransmitter(const RoleAttributes& attr,
const OptionalMode& mode) -> typename std::shared_ptr<Transmitter<M>> {
/* .... */
// 往 modified_attr 中加入 qos profile
// 根据各种模式 创建相应的 Transmitter 子类
switch (mode) {
case OptionalMode::INTRA:
transmitter = std::make_shared<IntraTransmitter<M>>(modified_attr);
break;
case OptionalMode::SHM:
transmitter = std::make_shared<ShmTransmitter<M>>(modified_attr);
break;
case OptionalMode::RTPS:
transmitter =
std::make_shared<RtpsTransmitter<M>>(modified_attr, participant());
break;
default:
transmitter =
std::make_shared<HybridTransmitter<M>>(modified_attr, participant());
break;
}
if (mode != OptionalMode::HYBRID)
transmitter->Enable();
return transmitter;
}

Transmitter 类是写消息,而 Receiver 类是读消息,因此 Receiver 类比 Transmitter 类多了一个参数 MessageListener ,其本质就是个回调函数:

1
using MessageListener = std::function<void(const MessagePtr&, const MessageInfo&, const RoleAttributes&)>;
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
template <typename M>
auto Transport::CreateReceiver(const RoleAttributes& attr,
const typename Receiver<M>::MessageListener& msg_listener,
const OptionalMode& mode) -> typename std::shared_ptr<Receiver<M>> {
/* .... */
// 往 modified_attr 中加入 qos profile
// 根据各种模式 创建相应的 Receiver 子类
switch (mode) {
case OptionalMode::INTRA:
receiver =
std::make_shared<IntraReceiver<M>>(modified_attr, msg_listener);
break;
case OptionalMode::SHM:
receiver = std::make_shared<ShmReceiver<M>>(modified_attr, msg_listener);
break;
case OptionalMode::RTPS:
receiver = std::make_shared<RtpsReceiver<M>>(modified_attr, msg_listener);
break;
default:
receiver = std::make_shared<HybridReceiver<M>>(
modified_attr, msg_listener, participant());
break;
}
if (mode != OptionalMode::HYBRID)
receiver->Enable();
return receiver;
}

最后一部分代码:在 Receiver 对象被创建时,只要模式不是 Hybrid,都会立刻调用 Receiver::Enable() 函数开启接收。

Receiver & Transmitter

这两个类是 EndPoint 的子类。关于 Endpoint5,那可就是整个架构的类继承的终点了,其内部有三个成员

  • bool enabled_ 用来标记是否被启用
  • Identity id_ 用于标识,对于每个 Endpoint 对象拥有唯一的 id 号,其子类也是用这个来进行标识
  • RoleAttributes attr_ 用来记录配置文件中的数据。

其中 Receiver 类只有一个回调函数 msg_listener_,该回调函数就是 Receiver 构造时传入的函数。在新消息到来时,会被调用👇:

1
2
3
4
5
6
template <typename M>
void Receiver<M>::OnNewMessage(const MessagePtr& msg,
const MessageInfo& msg_info) {
if (msg_listener_ != nullptr)
msg_listener_(msg, msg_info, attr_);
}

ReceiverManager 那一小节中,已经说到该函数有三个步骤,其中5,调用 DataDispatcher::Dispatch 非常关键。因为从这步可以看出上层和底层最终完成了闭环。当底层 Receiver 的回调函数 msg_listener 收到消息被调用时,上层的分发器 DataDispatcher 会把来自底层的消息发到等待的消息缓存里,然后调用上层的通知器 DataNotifier::Notify() ,唤醒对应的 Component 的封装了 Process() 的协程,让协程处理这些消息。

再看看 Receiver 的四个子类,每个子类都包含了相应的 Dispatcher 的指针,例如,RtpsReceiver 类含有 RtpsDispatcherPtr 成员。这些 Dispatcher 的功能就是增删监听者,从而让 Receiver 关闭或开启,比如:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
template <typename M>
void RtpsReceiver<M>::Enable() {
/* .... */
dispatcher_->AddListener<M>(
this->attr_, std::bind(&RtpsReceiver<M>::OnNewMessage, this,
std::placeholders::_1, std::placeholders::_2));
this->enabled_ = true;
}

template <typename M>
void RtpsReceiver<M>::Disable() {
dispatcher_->RemoveListener<M>(this->attr_);
this->enabled_ = false;
}

简单地介绍一下 Dispatcher 类(别和下面的 DataDispatcher 搞混了)。它主要负责记录一个 channel_id 和对应 ListenerHandlerBasePtr 的关系表。而AddListener()RemoveListener() 函数是从关系表中,拿到给定信道的对应 ListenerHandlerBase ,并在这上面连接(Connect)或不连接(Disconnect)相应的回调函数。由于时间精力有限,这边解释得比较混乱,若想具体了解其工作机制,可以参考文章最后的文献。总的来说,这有点像在 Qt 中实现的信号槽机制:信号在特定情况下被发射出去,对应的信号响应函数在槽中监听。信号与槽通过 Connect 函数关联,一个信号可以发射到多个槽,多个信号可以被一个槽监听。


再来看看 Transmitter 类,它是真正的数据写者的基类。它有两个成员,分别为序列号 seq_num_ 和消息信息 msg_info_

1
2
3
4
5
6
7
8
9
10
11
12
13
14
template <typename M>
class Transmitter : public Endpoint {
public:
explicit Transmitter(const RoleAttributes& attr);
virtual bool Transmit(const MessagePtr& msg);
virtual bool Transmit(const MessagePtr& msg, const MessageInfo& msg_info) = 0;

uint64_t NextSeqNum() { return ++seq_num_; }
uint64_t seq_num() const { return seq_num_; }
protected:
uint64_t seq_num_;
MessageInfo msg_info_;
};

我主要研究其中的 Transmit() 函数,其四个子类都具体实现了 Transmit() 函数,如果你仔细看过前一篇博客,就知道这也是 Writer 类一直在调用的函数。那么这个 Transmit() 函数有什么具体步骤呢?

  • Writer 调用 Transmitter::Transmit() 函数
  • 设置 msg_info::seq_num = NextSeqNum 消息的序列号
  • 加入一个 Transport事件,类型为 Transmit Begin
  • 调用子类实现的 Transmit() 函数,该函数通过传入一条消息即可以完成数据写入任务。
1
2
3
4
5
6
7
template <typename M>
bool Transmitter<M>::Transmit(const MessagePtr& msg) {
msg_info_.set_seq_num(NextSeqNum());
PerfEventCache::Instance()->AddTransportEvent(
TransPerf::TRANSMIT_BEGIN, attr_.channel_id(), msg_info_.seq_num());
return Transmit(msg, msg_info_);
}

好,我对 ReceiverTransmitter 类的介绍就到这里了。我不打算再继续介绍它们的子类以及更加底层的设计是如何实现的,因为这边牵扯到非常多的技术细节,弄懂这些会消耗非常多的精力,而且这些东西也远远超出了课题组目前的要求。

Data 部分

通信架构的内容已经全部介绍完了(至少对于发布—订阅通信方式来说),但还是感觉缺了什么东西,把这两者有效地连接起来😅。没错,我至今还没有说清楚,Writer 发布的消息是如何让 Reader 看到的。而这就牵扯到 cyber/data 中实现的类了。

DataVisitor

先来说说数据访问类 DataVisitor ,它是 DataVisitorBase 的子类。它的主要成员和构造函数如下。先来说几个明显可以得到的结论:

  • DataVisitor 对象都会有若干个缓存类 ChannelBuffer ,还有一个 DataFusion 对象,融合了所有的消息类型
  • 在初始化的时候,首先构建这些 ChannelBuffer ,然后它们都会被加入到相应类型DataDispatcher 的管理中
  • data_notifier_ (存在于 DataVisitorBase 中),会向信道 0 加入一个 notifier_ ,类型为 struct Notifier { std::function<void()> callback; };,这表明信道 0 注定与其他信道不一般 :thinking:
  • data_fusion_ 会构建并指向 AllLatest 对象,从类型来看,它们整合了所有的消息类型,应当用于信息的最后收集发送
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
template <typename M0, typename M1>
class DataVisitor<M0, M1, NullType, NullType> : public DataVisitorBase {
public:
explicit DataVisitor(const std::vector<VisitorConfig>& configs)
: buffer_m0_(configs[0].channel_id, new BufferType<M0>(configs[0].queue_size)),
buffer_m1_(configs[1].channel_id, new BufferType<M1>(configs[1].queue_size)) {
DataDispatcher<M0>::Instance()->AddBuffer(buffer_m0_);
DataDispatcher<M1>::Instance()->AddBuffer(buffer_m1_);
data_notifier_->AddNotifier(buffer_m0_.channel_id(), notifier_);
data_fusion_ = new fusion::AllLatest<M0, M1>(buffer_m0_, buffer_m1_);
}
private:
fusion::DataFusion<M0, M1>* data_fusion_ = nullptr;
ChannelBuffer<M0> buffer_m0_;
ChannelBuffer<M1> buffer_m1_;
};

/** DataVisitorBase 内有
* 指向 DataNotifier 的指针
* 封装为 Notifier 的回调函数 */
class DataVisitorBase {
public:
void RegisterNotifyCallback(std::function<void()>&& callback) {
notifier_->callback = callback;
}
protected:
uint64_t next_msg_index_ = 0;
DataNotifier* data_notifier_ = DataNotifier::Instance();
std::shared_ptr<Notifier> notifier_;
};

该类主要用于消息数据的访问,存放到来的消息数据,并提供接口供消息读取。事实上,我们之前在讨论组件的初始化的最后部分时见过它,也在订阅者初始化时遇见过它,但当时我都有意地略过了。现在我们重拾这部分内容(为了简单且不失一般性,我以两个信道的情况为例),把这部分彻底搞明白:

1
2
3
4
5
6
7
8
9
10
11
12
template<typename M0, typename M1>
bool Component<M0, M1, NullType, NullType>::Initialize() {
/* .... */
// 创建 DataVisitor 和 RoutineFactory 最后创建任务
std::vector<data::VisitorConfig> config_list;
for (auto& reader : readers_)
config_list.emplace_back(reader->ChannelId(), reader->PendingQueueSize());
// 创建了两个信道的 DataVisitor 并用在了协程工厂类
auto dv = std::make_shared<data::DataVisitor<M0, M1>>(config_list);
croutine::RoutineFactory factory = croutine::CreateRoutineFactory<M0, M1>(func, dv);
return sched->CreateTask(factory, node_->Name());
}

上面代码做了:

  • 收集了所有的 Reader 对象的所读信道 id 和消息队列大小,放入到 config_list 后就创建 DataVisitor 对象

  • 在上面 DataVisitor 类的构造函数中,根据传入的信道 id 和消息队列尺寸,DataVisitor 内为其创建了两个 ChannelBuffer 作为缓冲区

    • 调用数据分发器将它们加入到 DataDispatcher 的管理中
    • 调用 DataNotifier::AddNotifier() 函数,传入第 0 个读者的信道 id ,加入到 DataNotifier 的管理中。DataDispatcherDataNotifier 类均为单例,之后我们会对它们做详细介绍
    • 创建 DataFusion 对象,这也是之后要了解的╮(╯▽╰)╭
  • 创建协程工厂,并构建出要封装为协程的函数:

    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    factory.create_routine = [=]() {
    return [=]() {
    std::shared_ptr<M0> msg0; std::shared_ptr<M1> msg1;
    for (;;) {
    CRoutine::GetCurrentRoutine()->set_state(RoutineState::DATA_WAIT);
    if (dv->TryFetch(msg0, msg1)) { // 取数据
    f(msg0, msg1); // f 函数就是组件初始化时创建的 func 函数
    CRoutine::Yield(RoutineState::READY);
    } else
    CRoutine::Yield();
    }
    }; };

    协程做的就是调用 dv->TryFetch() 取数据(下文会详细说明),如果成功就调用组件的 Process() 函数,且协程的状态从等待数据转变为了就绪,而一旦协程就绪,就可以被 Processor 类运行

  • Scheduler 创建任务,在CreateTask() 函数中,调用 visitor->RegisterNotifyCallback() 函数

    1
    2
    3
    4
    5
    visitor->RegisterNotifyCallback([this, task_id]() {
    if (cyber_unlikely(stop_.load()))
    return;
    this->NotifyProcessor(task_id);
    });

    上面的函数被赋值给了 DataVisitorBase::notifier_ ,用于唤醒相应的协程来处理该新消息

说完这部分过程后,我发现我们至少还要理解一下 DataDispatcherDataNotifierDataFusionAllLatest 类😂😂😂。

DataDispatcher

千里之行,始于足下。先来看看数据分发类 DataDispatcher ,顾名思义,它将底层传来的数据进行分发,具体来说,当新消息到来时,通过 Dispatch() 函数把它们放到该信道下的所有消息缓冲区中。它是个单例模式,但是个模板类,意味着每一个消息类型会有对应的一个唯一的 DataDispatcher 对象。类内记录了一个信道 id 与多个 CacheBuffer 对象对应的表。注意到:一个信道可以有多个订阅者 Reader ,每个订阅者拥有一个 CacheBuffer 缓冲区,而这个缓冲区就是之前 DataVisitor 类在构造时给每个消息类型创建的 ChannelBuffer

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
template <typename T>
class DataDispatcher {
public:
void AddBuffer(const ChannelBuffer<T>& channel_buffer);
bool Dispatch(const uint64_t channel_id, const std::shared_ptr<T>& msg);
private:
DataNotifier* notifier_ = DataNotifier::Instance();
AtomicHashMap<uint64_t, BufferVector> buffers_map_;
};

template <typename T>
bool DataDispatcher<T>::Dispatch(const uint64_t channel_id,
const std::shared_ptr<T>& msg) {
BufferVector* buffers = nullptr;
if (apollo::cyber::IsShutdown())
return false;
// 每次收到该信道的消息时,就会给所有缓冲区都放一份消息
if (buffers_map_.Get(channel_id, &buffers)) {
for (auto& buffer_wptr : *buffers)
if (auto buffer = buffer_wptr.lock()) {
std::lock_guard<std::mutex> lock(buffer->Mutex());
// 向 CacheBuffer 填入数据
buffer->Fill(msg);
}
} else
return false;
return notifier_->Notify(channel_id);
}

DataDispatcher::Dispatch() 函数非常重要,在 ReceiverManagerReceiver && Transmitter 中我们已经提到,他是连接上层和底层的最关键一环。在 Receiver 对象每次收到该信道的消息时,就会调用DataDispatcher::Dispatch() 函数分发刚收到的数据,函数会先从表中取出所有对应信道的缓冲区,然后调用 CacheBuffer::Fill() 函数来给缓冲区填数据(稍后介绍这个函数),最后调用 DataNotifier::Notify() 函数,唤醒它们对应的协程来取数据并运行。现在你应该明白,为何 DataVisitor 在构造时,需要把刚刚建立的缓冲区给 DataDispatcher 管理,不然的话,缓冲区拿不到消息啊。

DataNotifier

再来看看 DataNotifier 类。它是个单例模式,类内有一个信道 id 与多个 Notifer 对应的表,这是考虑到一个信道可以有多个订阅者。很显然,前文提到的在 DataVisitor 构造时,调用 AddNotifier() 就是要把自己的 Notifier 存到这个表中。

1
2
3
4
5
6
7
class DataNotifier {
public:
void AddNotifier(uint64_t channel_id, const std::shared_ptr<Notifier>& notifier);
bool Notify(const uint64_t channel_id);
private:
AtomicHashMap<uint64_t, NotifyVector> notifies_map_;
};

至于 AddNotifier() 函数的实现,emm……很平凡,无非就是找到对应的信道 id ,然后将参数中的 Notifier 放入到数组里(如果没有数组,新建一个)。重要的是唤醒函数 Notify() ,该函数内部会调用 notifier->callback() ,回顾一下,这个 notifier 是在 Scheduler 创建任务时被设置的,内含有 NotifyProcessor() 函数,可以唤醒协程。在 Receiver && TransmitterReceiverManager 也提到,第二步的分发器最终会调用该函数,唤醒所有监听该信道的协程,来处理到来的消息。这样,你也就明白为什么 DataVisitor 类在构造时要把 notifier 加入进去了,不然的话信道来了个消息,就没法唤醒协程,Reader 就不知道了呀。

1
2
3
4
5
6
7
8
9
inline bool DataNotifier::Notify(const uint64_t channel_id) {
NotifyVector* notifies = nullptr;
if (notifies_map_.Get(channel_id, &notifies)) {
for (auto& notifier : *notifies)
if (notifier && notifier->callback)
notifier->callback();
return true;
return false;
}

DataFusion & AllLatest

再来看看 DataVisitor 构造函数的最后一步,创建 DataFusion 对象,看看名字,就应该明白该对象用于信道数据的融合。DataFusionAllLatest 的基类,DataFusion 类十分简单,仅提供了一个 Fusion() 接口,具体由 AllLatest 实现。所以,我们重点看一下 AllLatest 类,哈,听名字就知道它会取所有信道中的最新值,再结合它是 DataFusion 的子类,所以主要功能应该是融合多个信道的最新数据

我还是以两个信道的情况为例,该类成员有几个 ChannelBuffer 类,其中一个比较特殊,类型是数据融合的 buffer_fusion_

1
2
3
4
5
6
7
8
9
template <typename M0, typename M1>
class AllLatest<M0, M1, NullType, NullType> : public DataFusion<M0, M1> {
// 所谓融合消息,就是放在 tuple 里
using FusionDataType = std::tuple<std::shared_ptr<M0>, std::shared_ptr<M1>>;
private:
ChannelBuffer<M0> buffer_m0_;
ChannelBuffer<M1> buffer_m1_;
ChannelBuffer<FusionDataType> buffer_fusion_;
};

在构造函数中,特殊的信道 0 的消息缓冲区会调用 SetFusionCallback() 来设置回调函数 :point_down:,为方便表述,我给它起名 FusionFunc 。从下面的代码中看出, FusionFunc 先判断是否所有信道都有消息,并获取最新的消息,如果都有消息的话就将这些消息融合,即用 std::tuple 封装,再调用 Fill() 函数填入到 buffer_fusion_CacheBuffer 中。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
AllLatest(const ChannelBuffer<M0>& buffer_0, const ChannelBuffer<M1>& buffer_1)
: buffer_m0_(buffer_0), buffer_m1_(buffer_1),
buffer_fusion_(buffer_m0_.channel_id(),
new CacheBuffer<std::shared_ptr<FusionDataType>>(
buffer_0.Buffer()->Capacity() - uint64_t(1))) {
buffer_m0_.Buffer()->SetFusionCallback( // buffer0 设置融合的回调函数
[this](const std::shared_ptr<M0>& m0) {
std::shared_ptr<M1> m1;
if (!buffer_m1_.Latest(m1)) // 信道内是否有消息 有的话取出最后一个
return;
auto data = std::make_shared<FusionDataType>(m0, m1);
std::lock_guard<std::mutex> lg(buffer_fusion_.Buffer()->Mutex());
// 填充到消息中
buffer_fusion_.Buffer()->Fill(data);
});
}

何时调用 FusionFunc 呢?答案在这个 Fill() 函数(见下代码)中,它诡计多端——如果 CacheBuffer 有回调函数 FusionFunc,会调用回调函数;如果没有,会把接收到的数据放入缓冲区中。很显然在上面的构造函数中,只有信道 0 设置了回调函数 FusionFunc因此当信道 0 有数据到来, DataDispatcher::Dispatch() 被调用时(代码见 DataDispatcher 部分),进而调用 Fill() 函数时, FusionFunc 才会被调用,将最新的消息融合,并将融合的消息填入到 buffer_fusion_ 中。而其他信道的数据到来时, Fill() 函数只是单纯往对应的 CacheBuffer 中填数据。

1
2
3
4
5
6
7
8
9
10
11
12
13
void CacheBuffer::Fill(const T& value) {
if (fusion_callback_)
fusion_callback_(value); // buffer 0 运行这个
else {
if (Full()) {
buffer_[GetIndex(head_)] = value;
++head_; ++tail_;
} else {
buffer_[GetIndex(tail_ + 1)] = value;
++tail_;
}
}
}

在协程的处理函数中(回顾一下, DataVisitor 的创建协程工厂中提到的)会调用 DataVisitor::TryFetch() 函数,再调用 Fusion() 函数(代码如下),它从融合数据的缓冲区 buffer_fusion_ 中拿走(Fetch)融合消息,这也就意味着同时拿多个信道上的最新消息,保证了每次给 Component::Process() 函数的参数都必须“全员到齐”,并且所有信息都是最新的。综上所述,只有信道 0 收到消息后,才会融合其他信道的消息,往往主导通信处理的节奏,因此信道 0 的选取就比较关键了。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
bool DataVisitor::TryFetch(std::shared_ptr<M0>& m0, std::shared_ptr<M1>& m1) {
if (data_fusion_->Fusion(&next_msg_index_, m0, m1)) {
next_msg_index_++;
return true;
}
return false;
}

bool AllLatest::Fusion(uint64_t* index, std::shared_ptr<M0>& m0,
std::shared_ptr<M1>& m1) override {
std::shared_ptr<FusionDataType> fusion_data;
// 从 fusion 缓冲区中取数据
if (!buffer_fusion_.Fetch(index, fusion_data))
return false;
// 得到了数据 分别赋值给 m0 m1
m0 = std::get<0>(*fusion_data);
m1 = std::get<1>(*fusion_data);
return true;
}

ChannelBuffer & CacheBuffer

前文我一直提到一个词,叫做缓冲区。事实上这个词具体指向了两个类,ChannelBufferCacheBuffer 类。为了让读者更好地理解“缓冲区”这个词,我简要地介绍一下这两个类。ChannelBuffer 类包含了两个成员:信道 id 和 指向 CacheBuffer 的指针。它的函数 Fetch()Latest() 分别用于取对应索引的消息和取得最新消息。而 CacheBuffer 类,其实质就是一个循环队列,用来放置某个信道产生的数据。需要注意的是,CacheBuffer 占用的内存是恒定的,因为里面的数组长度一开始就被限定了,所以,一旦缓冲区装满了,它会毫不犹豫地丢弃最旧的消息,推入最新的消息。具体的队列实现在 cyber/data/cache_buffer.hcyber/data/channel_buffer.h,很简单,有兴趣可以直接读代码。

总结

(⊙o⊙)哦终于,我把 Cyber RT 所有的通信机制都看完了,哦,BTW,上篇通信链接在这里。现在,在把所有的内容都搞明白后,让我们理一理自己昏沉的头脑,跳脱出代码的边边框框,从上帝视角审视一下数据是如何流通的。

这里有两张非常好的图,感谢 [2],我可以不用自己动手,花费数小时画图了😀

在上面这张图中,仍然以两个信道的 Component 为例,从左上角出发:

  • Component 初始化建成了两个 Reader 对象,然后创建了一个 DataVisitor<M0, M1> 对象
  • 两个 Reader 对象也分别创建了一个 DataVisitor<> 对象,回顾一下上篇提到的 Reader 的初始化过程
  • 这些 DataVisitor 对象会分别创建出 ChannelBuffer ,并使用 DataDispatcher 管理这些缓冲区(注意看它内部的表)
  • 当接收到新消息后,DataDispatcher 会给对应的信道上的所有缓冲区进行分派,特别地,若信道 0 有新消息,还会对其他消息进行融合(注意看 AllLatest 对象)
  • 之后,DataVisitor 会使用 DataNotifier 对象,唤醒相应的协程,处理收到的数据

现在,我们再来看看底层的通信架构,虽然有些部分我略去未讲,但这张图我们还是可以看明白的🐶。这次,我们从最右边开始看起。

  • 可以看出两个 Reader 对象共用了一个 Receiver 。这是因为在同一个进程内,不同的 Reader 对象订阅同一个信道,就会使用同一个 Receiver
  • 默认选择的 Receiver 是 hybrid 的,因此需要三个底层接收类 IntraReceiverShmReceiverRtpsReceiver 配合
  • 在创建 Receiver 时,监听者处理函数 msg_listener_ 就已经“准备就绪”了。
  • Dispatcher 的表中记录了监听者和它们负责的信道,并把它们连接 Connect 起来,如同 Qt 中的信号槽机制
  • 一有新消息到达(最左边的函数是我陌生的),那么就会立刻触发信号槽机制,调用 msg_listener 函数,之后就是上层 DataDispatcher 的工作了

最后的最后,事实上我还是落了一个东西:服务—客户通信方式😓😓😂😂。的确,这篇博客的内容全是关于发布—订阅通信方式的,对于 ServiceClient ,几乎没有提及,之后,我就会补上这一部分服务发现的内容。

参考

[1] Dig into Apollo - Cyber

[2] 自动驾驶平台Apollo 5.5阅读手记:Cyber RT中的通信传输

[3] 百度Apollo系统学习-Cyber RT 通信-上层

[4] 百度 Apollo Cyber RT简介、基本概念以及与 ROS 对照

[5] 百度Apollo系统学习-Cyber RT 通信-底层

[6] cyber-rt.readthedocs.io

[7] 自动驾驶汽车平台技术基础/杨世春等编著. —北京:清华大学出版社


Apollo Cyber RT 通信(下)
https://dingfen.github.io/2020/11/07/2020-11-7-CyberCommu2/
作者
Bill Ding
发布于
2020年11月7日
更新于
2024年4月9日
许可协议