Apollo Cyber RT 服务与服务发现
Apollo Cyber 服务与服务发现
前言
现在我们开始介绍 Cyber RT 通信部分的最后一块内容——服务发现。在 Cyber RT 通信(上)中,我说过,Cyber RT 支持两种通信方式:
- 发布—订阅通信方式(Publish-Subscribe),也叫基于信道的通信方式
- 服务—客户通信方式(Service-Client),也叫基于服务的通信方式
在 Cyber RT 通信(上)和 Cyber RT 通信(下)中,我已经详细地介绍了发布—订阅通信方式,之所以先对该发布—订阅方式进行介绍,是因为该方式使用的场景比较多。考虑到对系统认识的完整性以及后面的研究工作,我觉得还是有必要说一下服务—客户通信方式。
与发布—订阅方式最大的不同是,服务—客户通信方式需要两个节点之间完成请求(Request)和应答(Response)才可完成通信,常用于节点之间双向通信的场景。
回顾在Cyber RT 通信(上)讲过的底层通信方式,一共有如下三种
- 同一进程内。在同一个进程节点之间的相互通信,对于进程内的数据,直接传递消息对象的指针,避免消息数据复制的开销
- 同主机进程间。在不同进程之间的节点传播信息,可以利用共享内存传输,减少传输中的数据复制,显著提升传输效率,并满足一对多的传输场景
- 跨主机。跨主机的数据利用 socket 传输,跨主机通信采用了第三方的开源库 Fast RTPS(Real Time Publish Subscribe Protocol,实时发布订阅协议),是 DDS(Data Distribution Service)标准的一个非常流行的开源实现,支持 RTPS 协议版本的一个订阅发布消息组件,具有高性能,实时性高,多平台支持等优点
在服务—客户通信方式中,使用的是第三种通信渠道—— Fast RTPS ,之后会进行介绍。
从 NodeServiceImpl 说起
如果你看过我之前写的博客,就应该知道 Node
类通过 NodeServiceImpl
类来创建 Service
和 Client
,我们以 Service
类为例,其创建过程如下:
- 直接创建
Service
对象,并进行初始化Init()
- 将创建好的
Service
指针放入到数组serivce_list_
中,并注册名字获得 id - 调用服务发现的拓扑管理类
service_discovery::TopologyManager
中的Join()
函数,将其加入到整个服务发现的拓扑结构中
1 |
|
创建 Client
的过程与 Service
极为相似,这里不过多介绍。接下来,我们不妨从以上三步出发,仔细盘一盘 Service
和 Client
类的实现过程,进而理解一下服务发现的功能。
Service & Client
Service
先来看看它的构造函数以及它们的成员。Service
模板类继承了 ServiceBase
类,此类只包含了一个服务名称,算是一个实现 Service
类的接口。在构造函数中,Service
接收三个参数,一个是所在节点的名称,一个是通信服务的名称,最后一个是服务的回调函数,在接收到消息后执行。关于 Service
具体的成员,还请仔细看一下下面的代码:point_down:,为了简洁,我删去了不少多线程相关的变量。
1 |
|
啊哈,在上篇博客我说过,Reader
和 Writer
的底层实现是 Receiver
和 Transmitter
,从包含类的成员来看,Service
和 Client
的底层也是靠 Receiver
和 Transmitter
实现的。所以说,我之前博客里大书特书的订阅—发布通信模式,其底层的通信办法与服务—客户通信方式还是一致的。Reader
创建时用的信道,和这里 Service
中的“信道”,其实本质是一样的,它们都是字符串,在系统中充当消息传递的“桥梁”,当然为作区分以及避免误解,我在 Service
这边的“信道”打个引号。同时,Service
这边有线程也有任务队列,意味着消息会在 Service
内部得到处理,不需要理会调度器,也不需要使用那边的协程。
再来瞧一瞧 Service::Init()
函数,它在构造函数完成后,立刻被调用执行。因为代码有点长,我将不重要的部分略去,突出重点。
1 |
|
仔细梳理一下,Init()
函数一共做了:
-
判断是否已经初始化,若没有,继续往下做,将 RoleAttribute 的值填充好,略去不说
-
获得
Transport
全局对象(该类我已经在这里讨论过了),并创建Transmitter
对象1
CreateTransmitter<Response>(role, proto::OptionalMode::RTPS);
注意到,这边的通信渠道选择的是 RTPS,意味着在服务—客户通信方式中,底层使用的都是 Fast RTPS 协议进行通信。
-
创建请求回调函数。请求回调函数,与
HandleRequest
函数做了绑定,我猜测Service
在接收到服务请求后,就会调用这个函数进行处理。1
2std::bind(&Service<Request, Response>::HandleRequest, this,
std::placeholders::_1, std::placeholders::_2); -
最复杂的来了,创建
Receiver
对象,这部分我在这里也讨论过了。需要注意的是传入的MessageListener
,即那个复杂的 Lambda 表达式,其内部又套了一个 Lambda 表达式。MessageListerner
这个回调函数会在Receiver
接收到新消息后被调用。其动作是:建立一个函数task
,task
函数调用HandleRequest()
函数对消息进行处理,然后将task
入队列。简单地说,就是每当一个新消息到来,就会创建一个针对该消息的处理函数,然后把它放入到任务队列中。 -
创建线程,开始运行
Process()
函数。
很自然地,你会好奇 Process()
函数是如何运作的,其实很简单,就是维护了一个先进先出的任务队列。程序中的条件变量会不断地检查 !inited_ || !this->tasks_.empty()
这个布尔值,一旦为真,那就 unblock 自己,从任务队列的最前端取出任务并运行。
1 |
|
最后,HandleRequest()
函数要做什么呢?Service
是如何处理接收到的请求呢?:point_down:在下面的代码中:
- 创建一个
Response
对象,并执行service_callback_
函数,如前所述,该函数在构造时就已经传入 - 用 copy 构造函数创建一个
MessageInfo
,然后更新发送者的 id - 调用
SendResponse()
函数
1 |
|
SendResponse()
函数也是十分简单,它只是将 Response
对象通过 Transmit()
函数发送出去。这往后的技术细节我就不再讨论了,不过要注意的是,response_transmitter_
的类型是 RtpsTransmitter
,如果你想往下研究的话,只需要看这个类的 Transmit()
函数就可以了。
1 |
|
Client
Client
类的实现与 Service
类有很多相似的地方,但也有很多不同。Client
类继承了 ClientBase
类,与 ServiceBase
类似,ClientBase
也就是个接口。Client
类的成员与 Service
的有点镜像关系 :joy:,也有很多不同。
1 |
|
与 Service
有明显不同的是,Client
多了 pending_requests_
表,其类型也十分复杂:std::unordered_map<uint64_t, std::tuple<SharedPromise, CallbackType, SharedFuture>>
,它的含义是正在提交的请求,该成员与发出请求和接收应答的过程密切相关。
现在让我们先来看一下 Client::Init()
函数,代码有点长,因而略去了一些不重要的东西:
1 |
|
Client::Init()
函数做的事情好像和 Service::Init()
的差不多,就简单说一下:
-
判断是否已经初始化,并将 RoleAttribute 的值填充好
-
通过
Transport
类创建Transmitter
对象,用于发送请求1
CreateTransmitter<Request>(role, proto::OptionalMode::RTPS);
注意到,这边的通信渠道依然是 RTPS
-
创建应答回调函数
response_callback_
,并与HandleResponse()
函数做了绑定 -
创建
Receiver
对象,仍然要注意那个 Lambda 表达式,该函数直接调用了response_callback_
回调函数。回顾我在这里讨论过的OnNewMessage()
函数,Receiver
会在接收到新的应答后立即调用response_callback_
,这就是证明我所言的最直接的证据
从 Init()
函数中,可以了解到,Client
在接收到 Response 后,会调用 HandleResponse()
对 Response 进行处理,让我们快速过一遍:
- 检查请求是否由该
Client
对象发出 - 从
MessageInfo
中获得序列号。有趣的是,这个序列号实际上是Client
自己给的,仔细看一下,MessageInfo
发送到Service
后,Service
只改变了它的发送者 id,然后就立马将它塞入 Response 中传回 - 根据序列号,从
pending_request_
获得消息,然后执行操作:将得到的应答消息放入std::promise
中,然后调用回调函数,处理这个消息。如果你对 C++ 的多线程不太熟悉,建议猛戳此链接
1 |
|
看完了应答消息的处理,再来看一下 Client
如何发送请求。在 SendRequest
函数中,变量 future
在发送了一个异步请求后,会等待一定时间(默认为 5 秒),期待接收到一个应答。这其中涉及到 AsyncSendRequest
函数,它先创建了一个 MessageInfo
,然后将它和 Request 通过 Transmitter
发送出去,之后就是建立一个 tuple,把在“未来”会接收到的消息提前布局在了成员变量 pending_request_
中。
1 |
|
也许上面的只言片语让你感到困惑, pending_request_
到底何方神圣?:point_up_2:综合之前所说的发布请求和应答接收过程,pending_request_
表记录下了正在发送的请求。表中用序列号作为主键,而后面的值是一个 tuple,放入了 std::promise
,它用于放入“未来”接收到的 Response (在 HandleResponse()
中一清二楚),而最后面的 std::future
用于获取它,中间的 CallbackType
则是处理这个 Response 的回调函数。兵马未动粮草先行,在将请求发出后,Client
就准备好了这一系列东西,因而 HandleResponse()
在处理应答时就显得游刃有余,简洁明了:slightly_smiling_face:。当然,因为这是记录正在发送的请求,收到后一定要将它删去。
1 |
|
服务发现
让我们回到最初的地方,在 NodeServiceImpl
类创建 Service
完后,服务发现的拓扑管理类 service_discovery::TopologyManager
中的 Join()
函数,会将刚创建的 Service
加入到整个服务发现的拓扑结构中,而在早先时候,我们就遇到过,在 Reader
和 Writer
初始化的最后,会调用到 JoinTheTopology
函数。这些都和服务发现相关,接下来我们就重点聊一聊这部分内容。
先明确,服务发现不负责具体消息的传递(这些我们已经在之前讲的很清楚了),主要负责监测 cyber RT 中通信节点的情况,并且处理新节点加入或旧节点退出等操作,在其中起主要作用负责的就是 TopologyManager
。
网络拓扑结构是如何来的?事实上就是节点、信道、读者、写者、服务、客户之间的通信关系的抽象表达。在之前的博客里,我说过 Node
类包含了 Server
、Client
、Writer
、Reader
这四类,而它们可以看做是网络中的顶点,信道就是 Writer
到 Reader
的有向边(因为信息只能从 Writer
传到 Reader
),Service
是 Server
与 Client
的边。Cyber RT 中把 Writer
和 Server
称作 Upstream,另外两个是 Downstream。
TopologyManager
TopologyManager
是个单例,这很明显,因为系统中只需要有一个“监管员”来负责监控网络拓扑就足够了。TopologyManager
有三个子管理器,并有共同的基类 Manager
。它们分别为:
- NodeManager
用于管理网络拓扑中的节点
- ChannelManager
用于管理信道以及 Reader
和 Writer
- ServiceManager
用于管理 Service
和 Client
除此之外,它还有其他的成员 :point_down:。
participant_
,意为通信网络中使用 RTPS 的参与者,不要忘了虽然TopologyManager
监管整个网络,但它也使用 RTPS ,也是其中的一员,该变量就是指TopologyManager
自身。participant_listener_
会在之后介绍change_signal_
类似于 Qt 中的信号槽机制,通过调用AddChangeListener()
函数, 连接(connect)了相应的监听器,一有风吹草动,就会立马调用它participant_names_
目前网络中的参与者 rtps::id 与参与者名称对应的表格
1 |
|
TopologyManager
的初始化非常简单,就是先创建参与者 Participant
,然后初始化 NodeManager
、ChannelManager
和 ServiceManager
。先说说 Participant
的创建:
1 |
|
看起来很复杂,其实很简单,Participant
的名字从全局变量中的 HostName()
和 ProcessId()
获得。然后就是创建监听器 participant_listener_
,它与 OnParticipantChange
绑定,在参与者发生变化时被调用,设置监听的端口为 11511,进而构造出 Participant
对象,该类牵扯的技术细节比较繁琐,我在此不进一步介绍。
我们关心的是,在参与者发生了变化后(加入、离去),TopologyManager
会做出何种反应?请允许我隆重介绍一下基于 Fast RTPS 的发现机制2。之所以这么起名,是因为这层拓扑监控主要是通过 Fast RTPS 提供的自动发现机制。若进程意外退出,则要将各管理类中相应信息进行更新。优点在于,如果进程出错或设备断开,该机制也可以工作,但粒度比较粗,且不是非常及时(比如断开时)。对于该机制的内容,都在 OnParticipantChange()
函数里了。
1 |
|
先来看第一步:TopologyManager::Convert()
函数。就是一个数据类型的转换函数,它将 Fast RTPS 中的 ParticipantDiscoveryInfo
也就是参与者的变动信息,包装为 Cyber RT 中所需要的信息类型 ChangeMsg
,同时根据消息的类型(加入还是离去),更新 participant_names_
表格。也就是说,现在主要信息源从 info
转移到了 msg
中。
第二步,如果检查发现该参与者要离去,那么 OnParticipantChange()
会通知三个通信网络的管理器,即调用 OnTopoModuleLeave()
函数,让各自的管理者把将离去的参与者排除在外。而加入的时候就不需要考虑,因为这些参与者在创建时,就会调用相应的函数加入到这个网络中。
第三步,就是这句话 change_signal_(msg)
,change_signal_
的本质就是若干个回调函数组成的列表。这句话就是在调用:point_down:这个函数(为简单,去掉了加锁),将所有的、调用过 AddChangeListener()
函数后与之 connect 的回调函数全部执行,相当于是监听器在监听到变化后,开始工作了。
1 |
|
好,TopologyManager
的功能基本就说完了,接下来让我们看看它的三个成员。
Manager
我承认这部分内容对了解整个 Apollo 系统有帮助,但这些内容也有点偏离我所在的课题组最初的任务目标,而且考虑到这里面细节繁多,想要逐一了解完全会花费许多精力,但缺少这部分内容又显得不太完整,因此我打算对这部分内容作一个简短的说明,部分解释会不到位甚至出现错误,也请理解。如果想进一步了解,可以参考一下文后的参考链接。
先来说说这三者的基类——Manager
。Manager
类的成员主要就是一个 Fast RTPS 的 publisher_
和一个 subscriber_
,订阅者对应的回调函数 listener_
和 signal_
信号槽。
1 |
|
在 TopologyManager
初始化时,我说过 NodeManager
、ChannelManager
和 ServiceManager
在这时被创建并初始化的。
在 CreateSubscriber()
创建订阅者时,listener_
绑定了 Manager::OnRemoteChange()
函数。而 OnRemoteChange
函数会调用 Dispose()
来对这些远处传来的消息进行处理。每当通信网络中有参与者加入或离开拓扑结构时,程序会调用 Manager::Join()
或 Manager::Leave()
,这两个函数会通过 RTPS 底层库发布相应的消息,然后订阅者的回调函数 listener_
就被调用了。
与 TopologyManager
类一样,有一类似于 Qt 的信号槽机制,signal_
——有若干个注册过的回调函数。该信号在 Manager::Notify()
时就会调用,执行内部所有的回调函数(与 TopologyManager 那一节所描述的一样)。与你想的一样,注册槽的函数是 Manager::AddChangeListener()
。
NodeManager
相比基类,NodeManager
多了一个SingleValueWarehouse
类型的成员。SingleValueWarehouse
类,实质就是一个更高级的线程安全 std::unordered_map
表 。加入和离开拓扑,就是向表中 Add()
或 Remove()
节点。而对于 NodeManager::Dispose()
函数,它会根据消息种类来向表中加入或删除节点,然后调用 Notify()
函数,通知(执行)等待该信号槽的回调函数。
我们知道,NodeManager
类管理 Node
网络拓扑。那么它具体体现在哪里呢?令我惊讶的是,他居然在 NodeChannelImpl
类中。当 Node
被创建或销毁的时候,进行Join()
或 Leave()
操作。Node 网络主要是监察所有节点的存活,并提供查询接口。
1 |
|
ChannelManager
相比基类,ChannelManager
多了四个MultiValueWarehouse
类型(其本质是线程安全的 std::unordered_multimap
)的成员,还有一个 Graph
类。它们分别记录读写者间的关系及它们形成的网络图结构,并分别以 node id 和 channel id 为主键来保存读写者。其他过程与 NodeManager
类似,保存的信息和索引方式略有不同。
ChannelManager
类管理 Reader
和 Writer
的信道拓扑网络。Reader
和 Writer
对象在初始化时会调用 JoinTheTopology()
函数加入拓扑结构,而在它们被 Shutdown()
时调用 LeaveTheTopology()
函数离开拓扑。以 Reader
的情况为例:
JoinTheTopology()
函数会先把Reader & Writer::OnChannelChange()
注册到ChannelManager
的信号槽中,这样当信道拓扑结构发生变化时,回调函数OnChannelChange()
就会被执行,Reader
就会得知信道已经改变。- 获取信道中所有的
Writer
,并调整自己的receiver_
,让这些写者发出的信息能被Reader
获取。Enable()
函数实际上也是让某一回调函数加入到信号槽中,真是一招鲜吃遍天 :man_shrugging: - 最后调用
join()
函数,把自己加入信道网络拓扑中。当然,一旦有新参与者加入,就会引起拓扑网络变化,会调用对应信道的节点的回调函数,最终让存在于该信道的Writer
得知有新的Reader
加入。
1 |
|
ServiceManage
ServiceManager
类相比其基类,多了一个 MultiValueWarehouse
和 SingleValueWarehouse
类型,分别为 servers_
、clients_
两个表。有新的 Server
或 Client
加入时,这两张表会更新,当然这些数据结构也提供了查询功能,判断某一个服务是否存在等。
总结
对于 TopologyManager
,其主要功能就是监视网络拓扑结构中是否有参与者加入或离开。主要的监听任务由 CreateParticipant()
函数中创建的 Participant
对象完成。它包含 host name 和 process id ,还有监听器 ParticipantListener
,本质是一个网络变化后就执行的回调函数。当网络拓扑发生变化时,从底层 Fast RTPS 传上来的信息 ,会先在 Convert()
函数中被转换成 Cyber RT 中的数据结构 ChangeMsg
,然后,监听器会执行 OnParticipantChange()
,三个子管理器就开始更新网络信息。