Apollo Cyber RT 服务与服务发现

Apollo Cyber 服务与服务发现

前言

现在我们开始介绍 Cyber RT 通信部分的最后一块内容——服务发现。在 Cyber RT 通信(上)中,我说过,Cyber RT 支持两种通信方式:

  • 发布—订阅通信方式(Publish-Subscribe),也叫基于信道的通信方式
  • 服务—客户通信方式(Service-Client),也叫基于服务的通信方式

Cyber RT 通信(上)Cyber RT 通信(下)中,我已经详细地介绍了发布—订阅通信方式,之所以先对该发布—订阅方式进行介绍,是因为该方式使用的场景比较多。考虑到对系统认识的完整性以及后面的研究工作,我觉得还是有必要说一下服务—客户通信方式。

与发布—订阅方式最大的不同是,服务—客户通信方式需要两个节点之间完成请求(Request)和应答(Response)才可完成通信,常用于节点之间双向通信的场景。

回顾在Cyber RT 通信(上)讲过的底层通信方式,一共有如下三种

  • 同一进程内。在同一个进程节点之间的相互通信,对于进程内的数据,直接传递消息对象的指针,避免消息数据复制的开销
  • 同主机进程间。在不同进程之间的节点传播信息,可以利用共享内存传输,减少传输中的数据复制,显著提升传输效率,并满足一对多的传输场景
  • 跨主机。跨主机的数据利用 socket 传输,跨主机通信采用了第三方的开源库 Fast RTPS(Real Time Publish Subscribe Protocol,实时发布订阅协议),是 DDS(Data Distribution Service)标准的一个非常流行的开源实现,支持 RTPS 协议版本的一个订阅发布消息组件,具有高性能,实时性高,多平台支持等优点

在服务—客户通信方式中,使用的是第三种通信渠道—— Fast RTPS ,之后会进行介绍。

从 NodeServiceImpl 说起

如果你看过我之前写的博客,就应该知道 Node 类通过 NodeServiceImpl 类来创建 ServiceClient ,我们以 Service 类为例,其创建过程如下:

  1. 直接创建 Service 对象,并进行初始化 Init()
  2. 将创建好的 Service 指针放入到数组 serivce_list_ 中,并注册名字获得 id
  3. 调用服务发现的拓扑管理类 service_discovery::TopologyManager 中的 Join() 函数,将其加入到整个服务发现的拓扑结构中
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
template <typename Request, typename Response>
auto NodeServiceImpl::CreateService(const std::string& service_name,
const typename Service<Request, Response>::ServiceCallback& service_callback)
-> typename std::shared_ptr<Service<Request, Response>> {
auto service_ptr = std::make_shared<Service<Request, Response>>(
node_name_, service_name, service_callback);
RETURN_VAL_IF(!service_ptr->Init(), nullptr);

service_list_.emplace_back(service_ptr);
attr_.set_service_name(service_name);
auto service_id = common::GlobalData::RegisterService(service_name);
attr_.set_service_id(service_id);
service_discovery::TopologyManager::Instance()->service_manager()->Join(
attr_, RoleType::ROLE_SERVER);
return service_ptr;
}

创建 Client 的过程与 Service 极为相似,这里不过多介绍。接下来,我们不妨从以上三步出发,仔细盘一盘 ServiceClient 类的实现过程,进而理解一下服务发现的功能。

Service & Client

Service

先来看看它的构造函数以及它们的成员。Service 模板类继承了 ServiceBase 类,此类只包含了一个服务名称,算是一个实现 Service 类的接口。在构造函数中,Service 接收三个参数,一个是所在节点的名称,一个是通信服务的名称,最后一个是服务的回调函数,在接收到消息后执行。关于 Service 具体的成员,还请仔细看一下下面的代码:point_down:,为了简洁,我删去了不少多线程相关的变量。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
Service(const std::string& node_name, const std::string& service_name, 
const ServiceCallback& service_callback)

template <typename Request, typename Response>
class Service : public ServiceBase {
private:
std::string node_name_; // 创建服务的节点名称
ServiceCallback service_callback_; // 服务的回调函数
std::function<void(const std::shared_ptr<Request>&,
const transport::MessageInfo&)> request_callback_; // 请求回调函数
std::shared_ptr<transport::Transmitter<Response>> response_transmitter_; // 应答的写者
std::shared_ptr<transport::Receiver<Request>> request_receiver_; // 请求的读者
std::string request_channel_; // 请求“信道”
std::string response_channel_; // 应答“信道”
std::thread thread_; // 线程
std::list<std::function<void()>> tasks_; // 任务队列
};

啊哈,在上篇博客我说过,ReaderWriter 的底层实现是 ReceiverTransmitter ,从包含类的成员来看,ServiceClient 的底层也是靠 ReceiverTransmitter 实现的。所以说,我之前博客里大书特书的订阅—发布通信模式,其底层的通信办法与服务—客户通信方式还是一致的。Reader 创建时用的信道,和这里 Service 中的“信道”,其实本质是一样的,它们都是字符串,在系统中充当消息传递的“桥梁”,当然为作区分以及避免误解,我在 Service 这边的“信道”打个引号。同时,Service 这边有线程也有任务队列,意味着消息会在 Service 内部得到处理,不需要理会调度器,也不需要使用那边的协程。

再来瞧一瞧 Service::Init() 函数,它在构造函数完成后,立刻被调用执行。因为代码有点长,我将不重要的部分略去,突出重点。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
bool Service<Request, Response>::Init() {
/* RoleAttribute 的赋值 初始化 */
auto transport = transport::Transport::Instance();
response_transmitter_ =
transport->CreateTransmitter<Response>(role, proto::OptionalMode::RTPS);
request_callback_ =
std::bind(&Service<Request, Response>::HandleRequest, this,
std::placeholders::_1, std::placeholders::_2);
/* .... */
request_receiver_ = transport->CreateReceiver<Request>(
role,
[=](const std::shared_ptr<Request>& request,
const transport::MessageInfo& message_info,
const proto::RoleAttributes& reader_attr) {
(void)reader_attr;
// 创建 task 函数
auto task = [this, request, message_info]() {
this->HandleRequest(request, message_info);
};
// 将其放入任务队列
Enqueue(std::move(task));
},
proto::OptionalMode::RTPS);
thread_ = std::thread(&Service<Request, Response>::Process, this);
return true;
}

仔细梳理一下,Init() 函数一共做了:

  1. 判断是否已经初始化,若没有,继续往下做,将 RoleAttribute 的值填充好,略去不说

  2. 获得 Transport 全局对象(该类我已经在这里讨论过了),并创建 Transmitter 对象

    1
    CreateTransmitter<Response>(role, proto::OptionalMode::RTPS);

    注意到,这边的通信渠道选择的是 RTPS,意味着在服务—客户通信方式中,底层使用的都是 Fast RTPS 协议进行通信。

  3. 创建请求回调函数。请求回调函数,与 HandleRequest 函数做了绑定,我猜测 Service 在接收到服务请求后,就会调用这个函数进行处理。

    1
    2
    std::bind(&Service<Request, Response>::HandleRequest, this,
    std::placeholders::_1, std::placeholders::_2);
  4. 最复杂的来了,创建 Receiver 对象,这部分我在这里也讨论过了。需要注意的是传入的 MessageListener ,即那个复杂的 Lambda 表达式,其内部又套了一个 Lambda 表达式。MessageListerner 这个回调函数会在 Receiver 接收到新消息后被调用。其动作是:建立一个函数 tasktask 函数调用 HandleRequest() 函数对消息进行处理,然后将 task 入队列。简单地说,就是每当一个新消息到来,就会创建一个针对该消息的处理函数,然后把它放入到任务队列中。

  5. 创建线程,开始运行 Process() 函数。

很自然地,你会好奇 Process() 函数是如何运作的,其实很简单,就是维护了一个先进先出的任务队列。程序中的条件变量会不断地检查 !inited_ || !this->tasks_.empty() 这个布尔值,一旦为真,那就 unblock 自己,从任务队列的最前端取出任务并运行。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
void Service<Request, Response>::Process() {
while (!cyber::IsShutdown()) {
std::unique_lock<std::mutex> ul(queue_mutex_);
condition_.wait(ul, [this]() { return !inited_ || !this->tasks_.empty(); });
if (!inited_)
break;
if (!tasks_.empty()) {
auto task = tasks_.front();
tasks_.pop_front();
ul.unlock();
task();
}
}
}

最后,HandleRequest() 函数要做什么呢?Service 是如何处理接收到的请求呢?:point_down:在下面的代码中:

  1. 创建一个 Response 对象,并执行 service_callback_ 函数,如前所述,该函数在构造时就已经传入
  2. 用 copy 构造函数创建一个 MessageInfo ,然后更新发送者的 id
  3. 调用 SendResponse() 函数
1
2
3
4
5
6
7
8
9
10
11
12
template <typename Request, typename Response>
void Service<Request, Response>::HandleRequest(
const std::shared_ptr<Request>& request,
const transport::MessageInfo& message_info) {
/* .... */
std::lock_guard<std::mutex> lk(service_handle_request_mutex_);
auto response = std::make_shared<Response>();
service_callback_(request, response);
transport::MessageInfo msg_info(message_info);
msg_info.set_sender_id(response_transmitter_->id());
SendResponse(msg_info, response);
}

SendResponse() 函数也是十分简单,它只是将 Response 对象通过 Transmit() 函数发送出去。这往后的技术细节我就不再讨论了,不过要注意的是,response_transmitter_ 的类型是 RtpsTransmitter ,如果你想往下研究的话,只需要看这个类的 Transmit() 函数就可以了。

1
2
3
4
5
6
7
template <typename Request, typename Response>
void Service<Request, Response>::SendResponse(
const transport::MessageInfo& message_info,
const std::shared_ptr<Response>& response) {
/* .... */
response_transmitter_->Transmit(response, message_info);
}

Client

Client 类的实现与 Service 类有很多相似的地方,但也有很多不同。Client 类继承了 ClientBase 类,与 ServiceBase 类似,ClientBase 也就是个接口。Client 类的成员与 Service 的有点镜像关系 :joy:,也有很多不同。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
template <typename Request, typename Response>
class Client : public ClientBase {
std::string node_name_; // 创建客户的节点名称
std::function<void(const std::shared_ptr<Response>&,
const transport::MessageInfo&)> response_callback_; // 应答回调函数

std::unordered_map<uint64_t,
std::tuple<SharedPromise, CallbackType, SharedFuture>>
pending_requests_; // 正在提交的请求
std::shared_ptr<transport::Transmitter<Request>> request_transmitter_; // 请求的写者
std::shared_ptr<transport::Receiver<Response>> response_receiver_; // 应答的读者
std::string request_channel_; // 请求“信道”
std::string response_channel_; // 应答“信道”
transport::Identity writer_id_; // 写者 id
uint64_t sequence_number_; // 序列号
};

Service 有明显不同的是,Client 多了 pending_requests_ 表,其类型也十分复杂:std::unordered_map<uint64_t, std::tuple<SharedPromise, CallbackType, SharedFuture>>,它的含义是正在提交的请求,该成员与发出请求和接收应答的过程密切相关。

现在让我们先来看一下 Client::Init() 函数,代码有点长,因而略去了一些不重要的东西:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
template <typename Request, typename Response>
bool Client<Request, Response>::Init() {
/* RoleAttribute 的赋值 初始化 */
auto transport = transport::Transport::Instance();
request_transmitter_ =
transport->CreateTransmitter<Request>(role, proto::OptionalMode::RTPS);
writer_id_ = request_transmitter_->id();

response_callback_ =
std::bind(&Client<Request, Response>::HandleResponse, this,
std::placeholders::_1, std::placeholders::_2);
/* .... */
response_receiver_ = transport->CreateReceiver<Response>(
role,
[=](const std::shared_ptr<Response>& response,
const transport::MessageInfo& message_info,
const proto::RoleAttributes& reader_attr) {
(void)message_info;
(void)reader_attr;
response_callback_(response, message_info);
},
proto::OptionalMode::RTPS);
return true;
}

Client::Init() 函数做的事情好像和 Service::Init() 的差不多,就简单说一下:

  1. 判断是否已经初始化,并将 RoleAttribute 的值填充好

  2. 通过 Transport 类创建 Transmitter 对象,用于发送请求

    1
    CreateTransmitter<Request>(role, proto::OptionalMode::RTPS);

    注意到,这边的通信渠道依然是 RTPS

  3. 创建应答回调函数 response_callback_,并与 HandleResponse() 函数做了绑定

  4. 创建 Receiver 对象,仍然要注意那个 Lambda 表达式,该函数直接调用了 response_callback_ 回调函数。回顾我在这里讨论过的 OnNewMessage() 函数,Receiver 会在接收到新的应答后立即调用 response_callback_,这就是证明我所言的最直接的证据

Init() 函数中,可以了解到,Client 在接收到 Response 后,会调用 HandleResponse() 对 Response 进行处理,让我们快速过一遍:

  1. 检查请求是否由该 Client 对象发出
  2. MessageInfo 中获得序列号。有趣的是,这个序列号实际上是 Client 自己给的,仔细看一下, MessageInfo 发送到 Service 后,Service 只改变了它的发送者 id,然后就立马将它塞入 Response 中传回
  3. 根据序列号,从 pending_request_ 获得消息,然后执行操作:将得到的应答消息放入 std::promise 中,然后调用回调函数,处理这个消息。如果你对 C++ 的多线程不太熟悉,建议猛戳此链接
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
void Client<Request, Response>::HandleResponse(
const std::shared_ptr<Response>& response,
const transport::MessageInfo& request_header) {
std::lock_guard<std::mutex> lock(pending_requests_mutex_);
// 检查请求是否是该client发出的
if (request_header.spare_id() != writer_id_)
return;
// 从请求 消息中获得序列号
uint64_t sequence_number = request_header.seq_num();
if (this->pending_requests_.count(sequence_number) == 0)
return;
auto tuple = this->pending_requests_[sequence_number];
auto call_promise = std::get<0>(tuple);
auto callback = std::get<1>(tuple);
auto future = std::get<2>(tuple);
this->pending_requests_.erase(sequence_number);
// 放入应答消息,并调用回调函数
call_promise->set_value(response);
callback(future);
}

看完了应答消息的处理,再来看一下 Client 如何发送请求。在 SendRequest 函数中,变量 future 在发送了一个异步请求后,会等待一定时间(默认为 5 秒),期待接收到一个应答。这其中涉及到 AsyncSendRequest 函数,它先创建了一个 MessageInfo,然后将它和 Request 通过 Transmitter 发送出去,之后就是建立一个 tuple,把在“未来”会接收到的消息提前布局在了成员变量 pending_request_ 中。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
template <typename Request, typename Response>
typename Client<Request, Response>::SharedResponse
Client<Request, Response>::SendRequest(SharedRequest request,
const std::chrono::seconds& timeout_s) {
/* .... */
auto future = AsyncSendRequest(request);
/* .... */
auto status = future.wait_for(timeout_s);
if (status == std::future_status::ready)
return future.get();
else
return nullptr;
}

template <typename Request, typename Response>
typename Client<Request, Response>::SharedFuture
Client<Request, Response>::AsyncSendRequest(SharedRequest request,
CallbackType&& cb) {
if (IsInit()) {
std::lock_guard<std::mutex> lock(pending_requests_mutex_);
sequence_number_++; // 序列号自加 创建 MessageInfo
transport::MessageInfo info(writer_id_, sequence_number_, writer_id_);
request_transmitter_->Transmit(request, info);
// 建立一个 tuple
SharedPromise call_promise = std::make_shared<Promise>();
SharedFuture f(call_promise->get_future());
pending_requests_[info.seq_num()] =
std::make_tuple(call_promise, std::forward<CallbackType>(cb), f);
return f;
} else
return std::shared_future<std::shared_ptr<Response>>();
}

// pending_request_ 类型
std::unordered_map<uint64_t,
std::tuple<SharedPromise, CallbackType, SharedFuture>> pending_requests_

也许上面的只言片语让你感到困惑, pending_request_ 到底何方神圣?:point_up_2:综合之前所说的发布请求和应答接收过程,pending_request_ 表记录下了正在发送的请求。表中用序列号作为主键,而后面的值是一个 tuple,放入了 std::promise ,它用于放入“未来”接收到的 Response (在 HandleResponse() 中一清二楚),而最后面的 std::future 用于获取它,中间的 CallbackType 则是处理这个 Response 的回调函数。兵马未动粮草先行,在将请求发出后,Client 就准备好了这一系列东西,因而 HandleResponse() 在处理应答时就显得游刃有余,简洁明了:slightly_smiling_face:。当然,因为这是记录正在发送的请求,收到后一定要将它删去。

1
2
3
4
5
6
7
auto tuple = this->pending_requests_[sequence_number];
auto call_promise = std::get<0>(tuple);
auto callback = std::get<1>(tuple);
auto future = std::get<2>(tuple);
this->pending_requests_.erase(sequence_number);
call_promise->set_value(response);
callback(future);

服务发现

让我们回到最初的地方,在 NodeServiceImpl 类创建 Service 完后,服务发现的拓扑管理类 service_discovery::TopologyManager 中的 Join() 函数,会将刚创建的 Service 加入到整个服务发现的拓扑结构中,而在早先时候,我们就遇到过,在 ReaderWriter 初始化的最后,会调用到 JoinTheTopology 函数。这些都和服务发现相关,接下来我们就重点聊一聊这部分内容。

先明确,服务发现不负责具体消息的传递(这些我们已经在之前讲的很清楚了),主要负责监测 cyber RT 中通信节点的情况,并且处理新节点加入或旧节点退出等操作,在其中起主要作用负责的就是 TopologyManager

网络拓扑结构是如何来的?事实上就是节点、信道、读者、写者、服务、客户之间的通信关系的抽象表达。在之前的博客里,我说过 Node 类包含了 ServerClientWriterReader 这四类,而它们可以看做是网络中的顶点,信道就是 WriterReader 的有向边(因为信息只能从 Writer 传到 Reader ),ServiceServerClient 的边。Cyber RT 中把 WriterServer 称作 Upstream,另外两个是 Downstream。

TopologyManager

TopologyManager 是个单例,这很明显,因为系统中只需要有一个“监管员”来负责监控网络拓扑就足够了。TopologyManager 有三个子管理器,并有共同的基类 Manager。它们分别为:

- NodeManager 用于管理网络拓扑中的节点
  - ChannelManager 用于管理信道以及 ReaderWriter
  - ServiceManager 用于管理 ServiceClient

除此之外,它还有其他的成员 :point_down:。

  • participant_ ,意为通信网络中使用 RTPS 的参与者,不要忘了虽然 TopologyManager 监管整个网络,但它也使用 RTPS ,也是其中的一员,该变量就是指 TopologyManager 自身。
  • participant_listener_ 会在之后介绍
  • change_signal_ 类似于 Qt 中的信号槽机制,通过调用 AddChangeListener() 函数, 连接(connect)了相应的监听器,一有风吹草动,就会立马调用它
  • participant_names_ 目前网络中的参与者 rtps::id 与参与者名称对应的表格
1
2
3
4
5
6
7
8
9
10
11
12
13
using PartNameContainer =
std::map<eprosima::fastrtps::rtps::GUID_t, std::string>;
class TopologyManager {
NodeManagerPtr node_manager_; // shared ptr of NodeManager
ChannelManagerPtr channel_manager_; // shared ptr of ChannelManager
ServiceManagerPtr service_manager_; // shared ptr of ServiceManager
// rtps participant to publish and subscribe
transport::ParticipantPtr participant_;
ParticipantListener* participant_listener_;
ChangeSignal change_signal_; // topology changing signal,
// < connect to `ChangeFunc`s
PartNameContainer participant_names_; // other participant in the topology
}

TopologyManager 的初始化非常简单,就是先创建参与者 Participant ,然后初始化 NodeManagerChannelManagerServiceManager。先说说 Participant 的创建:

1
2
3
4
5
6
7
8
9
10
bool TopologyManager::CreateParticipant() {
std::string participant_name =
common::GlobalData::Instance()->HostName() + '+' +
std::to_string(common::GlobalData::Instance()->ProcessId());
participant_listener_ = new ParticipantListener(std::bind(
&TopologyManager::OnParticipantChange, this, std::placeholders::_1));
participant_ = std::make_shared<transport::Participant>(
participant_name, 11511, participant_listener_);
return true;
}

看起来很复杂,其实很简单,Participant 的名字从全局变量中的 HostName()ProcessId() 获得。然后就是创建监听器 participant_listener_,它与 OnParticipantChange 绑定,在参与者发生变化时被调用,设置监听的端口为 11511,进而构造出 Participant 对象,该类牵扯的技术细节比较繁琐,我在此不进一步介绍。

我们关心的是,在参与者发生了变化后(加入、离去),TopologyManager 会做出何种反应?请允许我隆重介绍一下基于 Fast RTPS 的发现机制2。之所以这么起名,是因为这层拓扑监控主要是通过 Fast RTPS 提供的自动发现机制。若进程意外退出,则要将各管理类中相应信息进行更新。优点在于,如果进程出错或设备断开,该机制也可以工作,但粒度比较粗,且不是非常及时(比如断开时)。对于该机制的内容,都在 OnParticipantChange() 函数里了。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
void TopologyManager::OnParticipantChange(const PartInfo& info) {
ChangeMsg msg;
if (!Convert(info, &msg))
return;
/* .... */
if (msg.operate_type() == OperateType::OPT_LEAVE) {
auto& host_name = msg.role_attr().host_name();
int process_id = msg.role_attr().process_id();
node_manager_->OnTopoModuleLeave(host_name, process_id);
channel_manager_->OnTopoModuleLeave(host_name, process_id);
service_manager_->OnTopoModuleLeave(host_name, process_id);
}
change_signal_(msg);
}

先来看第一步:TopologyManager::Convert() 函数。就是一个数据类型的转换函数,它将 Fast RTPS 中的 ParticipantDiscoveryInfo 也就是参与者的变动信息,包装为 Cyber RT 中所需要的信息类型 ChangeMsg,同时根据消息的类型(加入还是离去),更新 participant_names_ 表格。也就是说,现在主要信息源从 info 转移到了 msg 中。

第二步,如果检查发现该参与者要离去,那么 OnParticipantChange() 会通知三个通信网络的管理器,即调用 OnTopoModuleLeave() 函数,让各自的管理者把将离去的参与者排除在外。而加入的时候就不需要考虑,因为这些参与者在创建时,就会调用相应的函数加入到这个网络中。

第三步,就是这句话 change_signal_(msg)change_signal_ 的本质就是若干个回调函数组成的列表。这句话就是在调用:point_down:这个函数(为简单,去掉了加锁),将所有的、调用过 AddChangeListener() 函数后与之 connect 的回调函数全部执行,相当于是监听器在监听到变化后,开始工作了。

1
2
3
4
5
6
7
8
9
10
11
12
void operator()(Args... args) {
SlotList local; // Slot 就相当于一个 std::function
for (auto& slot : slots_) {
local.emplace_back(slot);
}

if (!local.empty()) {
for (auto& slot : local)
(*slot)(args...);
}
ClearDisconnectedSlots();
}

好,TopologyManager 的功能基本就说完了,接下来让我们看看它的三个成员。

Manager

我承认这部分内容对了解整个 Apollo 系统有帮助,但这些内容也有点偏离我所在的课题组最初的任务目标,而且考虑到这里面细节繁多,想要逐一了解完全会花费许多精力,但缺少这部分内容又显得不太完整,因此我打算对这部分内容作一个简短的说明,部分解释会不到位甚至出现错误,也请理解。如果想进一步了解,可以参考一下文后的参考链接。

先来说说这三者的基类——ManagerManager 类的成员主要就是一个 Fast RTPS 的 publisher_ 和一个 subscriber_ ,订阅者对应的回调函数 listener_signal_ 信号槽。

1
2
3
4
5
6
7
8
9
10
11
class Manager {
/* .... */
ChangeType change_type_;
std::string host_name_; //
int process_id_;
std::string channel_name_;
eprosima::fastrtps::Publisher* publisher_;
eprosima::fastrtps::Subscriber* subscriber_;
SubscriberListener* listener_;
ChangeSignal signal_;
};

TopologyManager 初始化时,我说过 NodeManagerChannelManagerServiceManager 在这时被创建并初始化的。

CreateSubscriber() 创建订阅者时,listener_ 绑定了 Manager::OnRemoteChange() 函数。而 OnRemoteChange 函数会调用 Dispose() 来对这些远处传来的消息进行处理。每当通信网络中有参与者加入或离开拓扑结构时,程序会调用 Manager::Join()Manager::Leave(),这两个函数会通过 RTPS 底层库发布相应的消息,然后订阅者的回调函数 listener_ 就被调用了。
TopologyManager 类一样,有一类似于 Qt 的信号槽机制,signal_ ——有若干个注册过的回调函数。该信号在 Manager::Notify() 时就会调用,执行内部所有的回调函数(与 TopologyManager 那一节所描述的一样)。与你想的一样,注册槽的函数是 Manager::AddChangeListener()

NodeManager

相比基类,NodeManager 多了一个SingleValueWarehouse 类型的成员。SingleValueWarehouse 类,实质就是一个更高级的线程安全 std::unordered_map 表 。加入和离开拓扑,就是向表中 Add()Remove() 节点。而对于 NodeManager::Dispose() 函数,它会根据消息种类来向表中加入或删除节点,然后调用 Notify() 函数,通知(执行)等待该信号槽的回调函数。

我们知道,NodeManager 类管理 Node 网络拓扑。那么它具体体现在哪里呢?令我惊讶的是,他居然在 NodeChannelImpl 类中。当 Node 被创建或销毁的时候,进行Join()Leave() 操作。Node 网络主要是监察所有节点的存活,并提供查询接口。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
NodeChannelImpl(const std::string& node_name)
: is_reality_mode_(true), node_name_(node_name) {
/* 给 node_attr 赋值 */
uint64_t node_id = common::GlobalData::RegisterNode(node_name);
node_attr_.set_node_id(node_id);
is_reality_mode_ = common::GlobalData::Instance()->IsRealityMode();
if (is_reality_mode_) {
node_manager_ =
service_discovery::TopologyManager::Instance()->node_manager();
node_manager_->Join(node_attr_, RoleType::ROLE_NODE);
}
}

virtual ~NodeChannelImpl() {
if (is_reality_mode_) {
node_manager_->Leave(node_attr_, RoleType::ROLE_NODE);
node_manager_ = nullptr;
}
}

ChannelManager

相比基类,ChannelManager 多了四个MultiValueWarehouse 类型(其本质是线程安全的 std::unordered_multimap)的成员,还有一个 Graph 类。它们分别记录读写者间的关系及它们形成的网络图结构,并分别以 node id 和 channel id 为主键来保存读写者。其他过程与 NodeManager 类似,保存的信息和索引方式略有不同。

ChannelManager 类管理 ReaderWriter 的信道拓扑网络。ReaderWriter 对象在初始化时会调用 JoinTheTopology() 函数加入拓扑结构,而在它们被 Shutdown() 时调用 LeaveTheTopology() 函数离开拓扑。以 Reader 的情况为例:

  • JoinTheTopology() 函数会先把 Reader & Writer::OnChannelChange() 注册到ChannelManager 的信号槽中,这样当信道拓扑结构发生变化时,回调函数 OnChannelChange() 就会被执行,Reader 就会得知信道已经改变。
  • 获取信道中所有的 Writer ,并调整自己的 receiver_ ,让这些写者发出的信息能被 Reader 获取。Enable() 函数实际上也是让某一回调函数加入到信号槽中,真是一招鲜吃遍天 :man_shrugging:
  • 最后调用 join() 函数,把自己加入信道网络拓扑中。当然,一旦有新参与者加入,就会引起拓扑网络变化,会调用对应信道的节点的回调函数,最终让存在于该信道的 Writer 得知有新的 Reader 加入。
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
void Reader<MessageT>::JoinTheTopology() {
// add listener
change_conn_ = channel_manager_->AddChangeListener(std::bind(
&Reader<MessageT>::OnChannelChange, this, std::placeholders::_1));

// get peer writers
const std::string& channel_name = this->role_attr_.channel_name();
std::vector<proto::RoleAttributes> writers;
channel_manager_->GetWritersOfChannel(channel_name, &writers);
for (auto& writer : writers) {
receiver_->Enable(writer);
}
channel_manager_->Join(this->role_attr_, proto::RoleType::ROLE_READER,
message::HasSerializer<MessageT>::value);
}

void Reader<MessageT>::LeaveTheTopology() {
channel_manager_->RemoveChangeListener(change_conn_);
channel_manager_->Leave(this->role_attr_, proto::RoleType::ROLE_READER);
}

ServiceManage

ServiceManager 类相比其基类,多了一个 MultiValueWarehouseSingleValueWarehouse 类型,分别为 servers_clients_ 两个表。有新的 ServerClient 加入时,这两张表会更新,当然这些数据结构也提供了查询功能,判断某一个服务是否存在等。

总结

对于 TopologyManager ,其主要功能就是监视网络拓扑结构中是否有参与者加入或离开。主要的监听任务由 CreateParticipant() 函数中创建的 Participant 对象完成。它包含 host name 和 process id ,还有监听器 ParticipantListener ,本质是一个网络变化后就执行的回调函数。当网络拓扑发生变化时,从底层 Fast RTPS 传上来的信息 ,会先在 Convert() 函数中被转换成 Cyber RT 中的数据结构 ChangeMsg,然后,监听器会执行 OnParticipantChange() ,三个子管理器就开始更新网络信息。

参考

[1] 百度Apollo系统学习-Cyber RT 通信-服务发现

[2] 自动驾驶平台Apollo 5.5阅读手记:Cyber RT中的通信传输


Apollo Cyber RT 服务与服务发现
https://dingfen.github.io/2020/11/18/2020-11-18-CyberServiceDiscovery/
作者
Bill Ding
发布于
2020年11月18日
更新于
2024年4月9日
许可协议