Apollo Cyber RT 服务与服务发现
Apollo Cyber 服务与服务发现
前言
现在我们开始介绍 Cyber RT 通信部分的最后一块内容——服务发现。在 Cyber RT 通信(上)中,我说过,Cyber RT 支持两种通信方式:
- 发布—订阅通信方式(Publish-Subscribe),也叫基于信道的通信方式
- 服务—客户通信方式(Service-Client),也叫基于服务的通信方式
在 Cyber RT 通信(上)和 Cyber RT 通信(下)中,我已经详细地介绍了发布—订阅通信方式,之所以先对该发布—订阅方式进行介绍,是因为该方式使用的场景比较多。考虑到对系统认识的完整性以及后面的研究工作,我觉得还是有必要说一下服务—客户通信方式。
与发布—订阅方式最大的不同是,服务—客户通信方式需要两个节点之间完成请求(Request)和应答(Response)才可完成通信,常用于节点之间双向通信的场景。
回顾在Cyber RT 通信(上)讲过的底层通信方式,一共有如下三种
- 同一进程内。在同一个进程节点之间的相互通信,对于进程内的数据,直接传递消息对象的指针,避免消息数据复制的开销
- 同主机进程间。在不同进程之间的节点传播信息,可以利用共享内存传输,减少传输中的数据复制,显著提升传输效率,并满足一对多的传输场景
- 跨主机。跨主机的数据利用 socket 传输,跨主机通信采用了第三方的开源库 Fast RTPS(Real Time Publish Subscribe Protocol,实时发布订阅协议),是 DDS(Data Distribution Service)标准的一个非常流行的开源实现,支持 RTPS 协议版本的一个订阅发布消息组件,具有高性能,实时性高,多平台支持等优点
在服务—客户通信方式中,使用的是第三种通信渠道—— Fast RTPS ,之后会进行介绍。
从 NodeServiceImpl 说起
如果你看过我之前写的博客,就应该知道 Node 类通过 NodeServiceImpl 类来创建 Service 和 Client ,我们以 Service 类为例,其创建过程如下:
- 直接创建
Service对象,并进行初始化Init() - 将创建好的
Service指针放入到数组serivce_list_中,并注册名字获得 id - 调用服务发现的拓扑管理类
service_discovery::TopologyManager中的Join()函数,将其加入到整个服务发现的拓扑结构中
1 | |
创建 Client 的过程与 Service 极为相似,这里不过多介绍。接下来,我们不妨从以上三步出发,仔细盘一盘 Service 和 Client 类的实现过程,进而理解一下服务发现的功能。
Service & Client
Service
先来看看它的构造函数以及它们的成员。Service 模板类继承了 ServiceBase 类,此类只包含了一个服务名称,算是一个实现 Service 类的接口。在构造函数中,Service 接收三个参数,一个是所在节点的名称,一个是通信服务的名称,最后一个是服务的回调函数,在接收到消息后执行。关于 Service 具体的成员,还请仔细看一下下面的代码:point_down:,为了简洁,我删去了不少多线程相关的变量。
1 | |
啊哈,在上篇博客我说过,Reader 和 Writer 的底层实现是 Receiver 和 Transmitter ,从包含类的成员来看,Service 和 Client 的底层也是靠 Receiver 和 Transmitter 实现的。所以说,我之前博客里大书特书的订阅—发布通信模式,其底层的通信办法与服务—客户通信方式还是一致的。Reader 创建时用的信道,和这里 Service 中的“信道”,其实本质是一样的,它们都是字符串,在系统中充当消息传递的“桥梁”,当然为作区分以及避免误解,我在 Service 这边的“信道”打个引号。同时,Service 这边有线程也有任务队列,意味着消息会在 Service 内部得到处理,不需要理会调度器,也不需要使用那边的协程。
再来瞧一瞧 Service::Init() 函数,它在构造函数完成后,立刻被调用执行。因为代码有点长,我将不重要的部分略去,突出重点。
1 | |
仔细梳理一下,Init() 函数一共做了:
-
判断是否已经初始化,若没有,继续往下做,将 RoleAttribute 的值填充好,略去不说
-
获得
Transport全局对象(该类我已经在这里讨论过了),并创建Transmitter对象1
CreateTransmitter<Response>(role, proto::OptionalMode::RTPS);注意到,这边的通信渠道选择的是 RTPS,意味着在服务—客户通信方式中,底层使用的都是 Fast RTPS 协议进行通信。
-
创建请求回调函数。请求回调函数,与
HandleRequest函数做了绑定,我猜测Service在接收到服务请求后,就会调用这个函数进行处理。1
2std::bind(&Service<Request, Response>::HandleRequest, this,
std::placeholders::_1, std::placeholders::_2); -
最复杂的来了,创建
Receiver对象,这部分我在这里也讨论过了。需要注意的是传入的MessageListener,即那个复杂的 Lambda 表达式,其内部又套了一个 Lambda 表达式。MessageListerner这个回调函数会在Receiver接收到新消息后被调用。其动作是:建立一个函数task,task函数调用HandleRequest()函数对消息进行处理,然后将task入队列。简单地说,就是每当一个新消息到来,就会创建一个针对该消息的处理函数,然后把它放入到任务队列中。 -
创建线程,开始运行
Process()函数。
很自然地,你会好奇 Process() 函数是如何运作的,其实很简单,就是维护了一个先进先出的任务队列。程序中的条件变量会不断地检查 !inited_ || !this->tasks_.empty() 这个布尔值,一旦为真,那就 unblock 自己,从任务队列的最前端取出任务并运行。
1 | |
最后,HandleRequest() 函数要做什么呢?Service 是如何处理接收到的请求呢?:point_down:在下面的代码中:
- 创建一个
Response对象,并执行service_callback_函数,如前所述,该函数在构造时就已经传入 - 用 copy 构造函数创建一个
MessageInfo,然后更新发送者的 id - 调用
SendResponse()函数
1 | |
SendResponse() 函数也是十分简单,它只是将 Response 对象通过 Transmit() 函数发送出去。这往后的技术细节我就不再讨论了,不过要注意的是,response_transmitter_ 的类型是 RtpsTransmitter ,如果你想往下研究的话,只需要看这个类的 Transmit() 函数就可以了。
1 | |
Client
Client 类的实现与 Service 类有很多相似的地方,但也有很多不同。Client 类继承了 ClientBase 类,与 ServiceBase 类似,ClientBase 也就是个接口。Client 类的成员与 Service 的有点镜像关系 :joy:,也有很多不同。
1 | |
与 Service 有明显不同的是,Client 多了 pending_requests_ 表,其类型也十分复杂:std::unordered_map<uint64_t, std::tuple<SharedPromise, CallbackType, SharedFuture>>,它的含义是正在提交的请求,该成员与发出请求和接收应答的过程密切相关。
现在让我们先来看一下 Client::Init() 函数,代码有点长,因而略去了一些不重要的东西:
1 | |
Client::Init() 函数做的事情好像和 Service::Init() 的差不多,就简单说一下:
-
判断是否已经初始化,并将 RoleAttribute 的值填充好
-
通过
Transport类创建Transmitter对象,用于发送请求1
CreateTransmitter<Request>(role, proto::OptionalMode::RTPS);注意到,这边的通信渠道依然是 RTPS
-
创建应答回调函数
response_callback_,并与HandleResponse()函数做了绑定 -
创建
Receiver对象,仍然要注意那个 Lambda 表达式,该函数直接调用了response_callback_回调函数。回顾我在这里讨论过的OnNewMessage()函数,Receiver会在接收到新的应答后立即调用response_callback_,这就是证明我所言的最直接的证据
从 Init() 函数中,可以了解到,Client 在接收到 Response 后,会调用 HandleResponse() 对 Response 进行处理,让我们快速过一遍:
- 检查请求是否由该
Client对象发出 - 从
MessageInfo中获得序列号。有趣的是,这个序列号实际上是Client自己给的,仔细看一下,MessageInfo发送到Service后,Service只改变了它的发送者 id,然后就立马将它塞入 Response 中传回 - 根据序列号,从
pending_request_获得消息,然后执行操作:将得到的应答消息放入std::promise中,然后调用回调函数,处理这个消息。如果你对 C++ 的多线程不太熟悉,建议猛戳此链接
1 | |
看完了应答消息的处理,再来看一下 Client 如何发送请求。在 SendRequest 函数中,变量 future 在发送了一个异步请求后,会等待一定时间(默认为 5 秒),期待接收到一个应答。这其中涉及到 AsyncSendRequest 函数,它先创建了一个 MessageInfo,然后将它和 Request 通过 Transmitter 发送出去,之后就是建立一个 tuple,把在“未来”会接收到的消息提前布局在了成员变量 pending_request_ 中。
1 | |
也许上面的只言片语让你感到困惑, pending_request_ 到底何方神圣?:point_up_2:综合之前所说的发布请求和应答接收过程,pending_request_ 表记录下了正在发送的请求。表中用序列号作为主键,而后面的值是一个 tuple,放入了 std::promise ,它用于放入“未来”接收到的 Response (在 HandleResponse() 中一清二楚),而最后面的 std::future 用于获取它,中间的 CallbackType 则是处理这个 Response 的回调函数。兵马未动粮草先行,在将请求发出后,Client 就准备好了这一系列东西,因而 HandleResponse() 在处理应答时就显得游刃有余,简洁明了:slightly_smiling_face:。当然,因为这是记录正在发送的请求,收到后一定要将它删去。
1 | |
服务发现
让我们回到最初的地方,在 NodeServiceImpl 类创建 Service 完后,服务发现的拓扑管理类 service_discovery::TopologyManager 中的 Join() 函数,会将刚创建的 Service 加入到整个服务发现的拓扑结构中,而在早先时候,我们就遇到过,在 Reader 和 Writer 初始化的最后,会调用到 JoinTheTopology 函数。这些都和服务发现相关,接下来我们就重点聊一聊这部分内容。
先明确,服务发现不负责具体消息的传递(这些我们已经在之前讲的很清楚了),主要负责监测 cyber RT 中通信节点的情况,并且处理新节点加入或旧节点退出等操作,在其中起主要作用负责的就是 TopologyManager 。
网络拓扑结构是如何来的?事实上就是节点、信道、读者、写者、服务、客户之间的通信关系的抽象表达。在之前的博客里,我说过 Node 类包含了 Server、Client、Writer、Reader 这四类,而它们可以看做是网络中的顶点,信道就是 Writer 到 Reader 的有向边(因为信息只能从 Writer 传到 Reader ),Service 是 Server 与 Client 的边。Cyber RT 中把 Writer 和 Server 称作 Upstream,另外两个是 Downstream。
TopologyManager
TopologyManager 是个单例,这很明显,因为系统中只需要有一个“监管员”来负责监控网络拓扑就足够了。TopologyManager 有三个子管理器,并有共同的基类 Manager。它们分别为:
- NodeManager 用于管理网络拓扑中的节点
- ChannelManager 用于管理信道以及 Reader 和 Writer
- ServiceManager 用于管理 Service 和 Client
除此之外,它还有其他的成员 :point_down:。
participant_,意为通信网络中使用 RTPS 的参与者,不要忘了虽然TopologyManager监管整个网络,但它也使用 RTPS ,也是其中的一员,该变量就是指TopologyManager自身。participant_listener_会在之后介绍change_signal_类似于 Qt 中的信号槽机制,通过调用AddChangeListener()函数, 连接(connect)了相应的监听器,一有风吹草动,就会立马调用它participant_names_目前网络中的参与者 rtps::id 与参与者名称对应的表格
1 | |
TopologyManager 的初始化非常简单,就是先创建参与者 Participant ,然后初始化 NodeManager、ChannelManager 和 ServiceManager。先说说 Participant 的创建:
1 | |
看起来很复杂,其实很简单,Participant 的名字从全局变量中的 HostName() 和 ProcessId() 获得。然后就是创建监听器 participant_listener_,它与 OnParticipantChange 绑定,在参与者发生变化时被调用,设置监听的端口为 11511,进而构造出 Participant 对象,该类牵扯的技术细节比较繁琐,我在此不进一步介绍。
我们关心的是,在参与者发生了变化后(加入、离去),TopologyManager 会做出何种反应?请允许我隆重介绍一下基于 Fast RTPS 的发现机制2。之所以这么起名,是因为这层拓扑监控主要是通过 Fast RTPS 提供的自动发现机制。若进程意外退出,则要将各管理类中相应信息进行更新。优点在于,如果进程出错或设备断开,该机制也可以工作,但粒度比较粗,且不是非常及时(比如断开时)。对于该机制的内容,都在 OnParticipantChange() 函数里了。
1 | |
先来看第一步:TopologyManager::Convert() 函数。就是一个数据类型的转换函数,它将 Fast RTPS 中的 ParticipantDiscoveryInfo 也就是参与者的变动信息,包装为 Cyber RT 中所需要的信息类型 ChangeMsg,同时根据消息的类型(加入还是离去),更新 participant_names_ 表格。也就是说,现在主要信息源从 info 转移到了 msg 中。
第二步,如果检查发现该参与者要离去,那么 OnParticipantChange() 会通知三个通信网络的管理器,即调用 OnTopoModuleLeave() 函数,让各自的管理者把将离去的参与者排除在外。而加入的时候就不需要考虑,因为这些参与者在创建时,就会调用相应的函数加入到这个网络中。
第三步,就是这句话 change_signal_(msg),change_signal_ 的本质就是若干个回调函数组成的列表。这句话就是在调用:point_down:这个函数(为简单,去掉了加锁),将所有的、调用过 AddChangeListener() 函数后与之 connect 的回调函数全部执行,相当于是监听器在监听到变化后,开始工作了。
1 | |
好,TopologyManager 的功能基本就说完了,接下来让我们看看它的三个成员。
Manager
我承认这部分内容对了解整个 Apollo 系统有帮助,但这些内容也有点偏离我所在的课题组最初的任务目标,而且考虑到这里面细节繁多,想要逐一了解完全会花费许多精力,但缺少这部分内容又显得不太完整,因此我打算对这部分内容作一个简短的说明,部分解释会不到位甚至出现错误,也请理解。如果想进一步了解,可以参考一下文后的参考链接。
先来说说这三者的基类——Manager 。Manager 类的成员主要就是一个 Fast RTPS 的 publisher_ 和一个 subscriber_ ,订阅者对应的回调函数 listener_ 和 signal_ 信号槽。
1 | |
在 TopologyManager 初始化时,我说过 NodeManager 、ChannelManager 和 ServiceManager 在这时被创建并初始化的。
在 CreateSubscriber() 创建订阅者时,listener_ 绑定了 Manager::OnRemoteChange() 函数。而 OnRemoteChange 函数会调用 Dispose() 来对这些远处传来的消息进行处理。每当通信网络中有参与者加入或离开拓扑结构时,程序会调用 Manager::Join() 或 Manager::Leave(),这两个函数会通过 RTPS 底层库发布相应的消息,然后订阅者的回调函数 listener_ 就被调用了。
与 TopologyManager 类一样,有一类似于 Qt 的信号槽机制,signal_ ——有若干个注册过的回调函数。该信号在 Manager::Notify() 时就会调用,执行内部所有的回调函数(与 TopologyManager 那一节所描述的一样)。与你想的一样,注册槽的函数是 Manager::AddChangeListener()。
NodeManager
相比基类,NodeManager 多了一个SingleValueWarehouse 类型的成员。SingleValueWarehouse 类,实质就是一个更高级的线程安全 std::unordered_map 表 。加入和离开拓扑,就是向表中 Add() 或 Remove() 节点。而对于 NodeManager::Dispose() 函数,它会根据消息种类来向表中加入或删除节点,然后调用 Notify() 函数,通知(执行)等待该信号槽的回调函数。
我们知道,NodeManager 类管理 Node 网络拓扑。那么它具体体现在哪里呢?令我惊讶的是,他居然在 NodeChannelImpl 类中。当 Node 被创建或销毁的时候,进行Join() 或 Leave() 操作。Node 网络主要是监察所有节点的存活,并提供查询接口。
1 | |
ChannelManager
相比基类,ChannelManager 多了四个MultiValueWarehouse 类型(其本质是线程安全的 std::unordered_multimap)的成员,还有一个 Graph 类。它们分别记录读写者间的关系及它们形成的网络图结构,并分别以 node id 和 channel id 为主键来保存读写者。其他过程与 NodeManager 类似,保存的信息和索引方式略有不同。
ChannelManager 类管理 Reader 和 Writer 的信道拓扑网络。Reader 和 Writer 对象在初始化时会调用 JoinTheTopology() 函数加入拓扑结构,而在它们被 Shutdown() 时调用 LeaveTheTopology() 函数离开拓扑。以 Reader 的情况为例:
JoinTheTopology()函数会先把Reader & Writer::OnChannelChange()注册到ChannelManager的信号槽中,这样当信道拓扑结构发生变化时,回调函数OnChannelChange()就会被执行,Reader就会得知信道已经改变。- 获取信道中所有的
Writer,并调整自己的receiver_,让这些写者发出的信息能被Reader获取。Enable()函数实际上也是让某一回调函数加入到信号槽中,真是一招鲜吃遍天 :man_shrugging: - 最后调用
join()函数,把自己加入信道网络拓扑中。当然,一旦有新参与者加入,就会引起拓扑网络变化,会调用对应信道的节点的回调函数,最终让存在于该信道的Writer得知有新的Reader加入。
1 | |
ServiceManage
ServiceManager 类相比其基类,多了一个 MultiValueWarehouse 和 SingleValueWarehouse 类型,分别为 servers_、clients_ 两个表。有新的 Server 或 Client 加入时,这两张表会更新,当然这些数据结构也提供了查询功能,判断某一个服务是否存在等。
总结
对于 TopologyManager ,其主要功能就是监视网络拓扑结构中是否有参与者加入或离开。主要的监听任务由 CreateParticipant() 函数中创建的 Participant 对象完成。它包含 host name 和 process id ,还有监听器 ParticipantListener ,本质是一个网络变化后就执行的回调函数。当网络拓扑发生变化时,从底层 Fast RTPS 传上来的信息 ,会先在 Convert() 函数中被转换成 Cyber RT 中的数据结构 ChangeMsg,然后,监听器会执行 OnParticipantChange() ,三个子管理器就开始更新网络信息。