对象与数据结构
下面是Fast-RTPS实现中的核心结构。
Publish-Subscriber模块
RTPS标准的高层类型。
- Domain:用来创建,管理和销毁Participants。
- Participant:包括Publisher和Subscriber,并管理它们的配置。
- ParticipantAttributes:创建Participant的配置参数。
- ParticipantListener:可以让开发者实现Participant的回调函数。
- Publisher:在Topic上发布数据的对象。
- PublisherAttributes:创建Publisher的配置参数。
- PublisherListener:可以让开发者实现Publisher的回调函数。
- Subscriber:在Topic上接受数据的对象。
- SubscriberAttributes:创建Subscriber的配置参数。
- SubscriberListener:可以让开发者实现Subscriber的回调函数。
RTPS模块
RTPS的底层模型。包含下面几个子模块:
- RTPS Common
- CacheChange_t:描述Topic上的变更,存储在历史Cache中。
- Data:Cache变化的负载。
- Message:RTPS消息。
- Header:RTPS协议的头信息。
- Sub-Message Header:标识RTPS的订阅消息。
- MessageReceiver:反序列化和处理接受到的RTPS消息。
- RTPSMessageCreator:构建RTPS消息。
- RTPS Domain
- RTPSDomain:用来创建,管理和销毁底层的RTPSParticipants。
- RTPSParticipant:包括Writer和Reader。
- RTPS Reader
- RTPSReader:读者的基类。
- ReaderAttributes:包含RTPS读者的配置参数。
- ReaderHistory:存储Topic变化的历史数据。
- ReaderListener:读者的回调类型。
- RTPS Writer
- RTPSWriter:写者的基类。
- WriterAttributes:包含RTPS写者的配置参数。
- WriterHistory:存储写者的历史数据。
配置Attributes
上面的数据结构中看到了许多Attributes
后缀的类名。这些类包含了对协议或者对象的配置参数,很多特性都需要设置这些属性来完成。
这些类的定义基本都位于下面三个文件夹中:
- include/fastrtps/attributes
- include/fastrtps/rtps/attributes
- include/fastdds/rtps/attributes
Fast RTPS支持非常多的配置参数,并且参数的结构常常是嵌套的。
通过代码去配置这些参数会产生很多啰嗦的代码,而且最大的问题在于:每次更改配置参数都需要重新编译。这个问题并非Fast RTPS才有,只要包含大量配置参数的软件都会这样的问题。通常的解决方法就是:提供文本格式的配置文件的方式来配置参数。因此对于Fast-RTPS来说,除了支持通过代码配置参数,它也支持通过XML文件的方式来进行配置。
有了配置文件之后,在代码中直接读取就好了,例如:
Participant*participant=Domain::createParticipant("participant_xml_profile");
在这之后,如果需要调整配置,只需要修改配置文件,不用在改动代码,自然也不用重新编译。这对于项目部署是很重要的。
Fast-RTPS支持的配置项,以及这些配置项说明和默认值都可以到这个链接中查看:XML profiles。
Domain
RTPS中的通信参数者之间,通过Domain进行隔离。
同一时刻可能有多个Domain同时存在,一个Domain中可以包含任意数目的消息发送者和接受者。
其结构如下图所示:
开发者可以通过domainId来指定参与者所属Domain。
如果没有指定,默认的domainId = 80
。
发现
作为DDS的实现,Fast-RTPS提供了Publisher和Subscriber自动发现和匹配的功能。在实现上,这分为两个步骤来完成:
- Participant Discovery Phase (PDP):在这个阶段,参与者互相通知彼此的存在。为了达到这个目的,每个参与者需要定时发送公告消息。公告消息通过周知的多播地址和端口发送(根据domain计算得到)。
- Endpoint Discovery Phase (EDP):在这个阶段,Publisher和Subscriber互相确认。为此,参与者使用在PDP期间建立的通信通道,彼此共享有关其发布者和订阅者的信息。 该信息包含了Topic和数据类型。为了使两个端点匹配,它们的Topic和数据类型必须一致。 一旦发布者和订阅者匹配,他们就发送/接收数据了。
这两个阶段对应了两个独立的协议:
- Simple Participant Discovery Protocol:指定参与者如何在网络中发现彼此。
- Simple Endpoint Discovery Protocol:定义了已经互相发现的参与者交换信息的协议。
Fast-RTPS提供了四种发现机制:
- Simple:这是默认机制。它在PDP和EDP阶段均使用RTPS标准,因此可与任何其他DDS和RTPS实现兼容。
- Static:此机制在PDP阶段使用Simple Participant Discovery Protocol。如果所有发布者和订阅者的地址以及端口和主题信息是事先知道的,则允许跳过EDP阶段。
- Server-Client:这种发现机制使用集中式发现结构,由服务器充当发现机制的Hub。
- Manual:此机制仅与RTPSDomain层兼容。它禁用了PDP阶段,使用户可以使用其选择的任何外部元信息通道手动匹配和取消匹配RTPS参与者,读者和写者。
不同的发现机制具有一些共同的配置:
名称 | 描述 | 默认值 |
Ignore Participant flags | 在必要的时候,可以选择忽略一些参与者。 例如:另一台主机上的,另一个进程的或者同一个进程的。 |
NO_FILTER |
Lease Duration | 指定远程参与者在多少时间内认为本地参与者还活着。 | 20 s |
Announcement Period | 指定参与者的PDP公告消息的周期。 | 3s |
关于发现机制的更多信息可以浏览这个链接:Discovery。
传输控制
Fast-RTPS实现了可插拔的传输架构,这意味着每一个参与者可以随时加入和退出。
在传输上,Fast-RTPS支持以下五种传输方式:
- UDPv4
- UDPv6
- TCPv4
- TCPv6
- SHM(Shared Memory)
默认的,当Participant
创建时,会自动的配置两个传输通道:
- SHM:用来与同一个机器上的参与者通信。
- UDPv4:同来与跨机器的参与者通信。
当然,开发者可以改变这个默认行为,通过C++接口或者XML配置文件都可以。
SHM要求所有参与者位于同一个系统上,它是借助了操作系统提供的共享内存机制实现。共享内存的好处是:支持大数据传输,减少了数据拷贝,并且也减少系统负载。因此通常情况下,使用SHM会获得更好的性能。使用SHM时,可以配置共享内存的大小。
网络通信包含了非常多的参数需要配置,例如:Buffer大小,端口号,超时时间等等。框架本身为参数设置了默认值,大部分情况下开发者不用调整它们。但是知道这些默认值是什么,在一些情况下可能会对分析问题有所帮助。关于这些配置的默认值,以及如果配置可以查看这个链接:Transport descriptors。
与UDP不同,TCP传输是面向连接的,因此,Fast-RTPS必须在发送RTPS消息之前建立TCP连接。TCP传输可以具有两种行为:充当TCP服务器或充当TCP客户端。服务器打开一个TCP端口以侦听传入的连接,然后客户端尝试连接到服务器。服务器和客户端的概念独立于RTPS概念,例如:Publisher,Subscriber,Reader或Writer。它们中的任何一个都可以用作TCP服务器或TCP客户端,因为这些实体仅用于建立TCP连接,而RTPS协议可以在该TCP连接上工作。
如果要使用TCP传输,开发者需要做更多的配置,关于这部分内容可以继续阅读官方文档,这里不再赘述。
FastRTPS代码示例
FastRTPS不仅有框架文档,API Reference,还有丰富的代码示例。
对于开发者来说,浏览这些代码可能是上手最快捷的方法。
你可以在这里浏览这些示例:Fast-RTPS/examples/C++/。
FASTRTPSGEN
FASTRTPSGEN是一个Java程序。用来为在Topic上传输的数据类型生成源码。
开发者通过接口描述语言(Interface Definition Language)定义需要传输的数据类型。然后通过FASTRTPSGEN生成C++编译需要的源文件。
可以通过下面的方法获取和编译FASTRTPSGEN。
git clone --recursive https://github.com/eProsima/Fast-RTPS-Gen.git cd Fast-RTPS-Gen gradle assemble
编译完成之后可执行文件位于./scripts/
目录。如果需要,可以将该路径添加到$PATH
变量中。
关于如何通过IDL定义数据类型请参见这里:Defining a data type via IDL。
以下面这个示例文件为例:
struct TestData { char char_type; octet octet_type; long long_type; string string_type; float float_array[4]; sequence<double> double_list; };
我们将其保存到文件名为data_type.idl
的文件中。然后通过下面这条命令生成C++文件:
~/Fast-RTPS-Gen/scripts/fastrtpsgen data_type.idl
最后会得到下面四个文件:
data_type.cxx data_type.h data_typePubSubTypes.cxx data_typePubSubTypes.h
前两个文件定义的是实际存储数据的结构,后两个文件定义的类是eprosima::fastrtps::TopicDataType
的子类。用来在参与者上注册类型:
/** * Register a type in a participant. * @param part Pointer to the Participant. * @param type Pointer to the Type. * @return True if correctly registered. */ RTPS_DllAPIstaticboolregisterType( Participant*part, fastdds::dds::TopicDataType*type);
每一套通信系统中通常都会包含一个或多个自定义的数据类型。
发布者-订阅者层
可以通过 HelloWorldExample 来熟悉发布者-订阅者层接口。
该目录下文件列表如下:
-rw-r--r-- 1 paul staff 1.8K 3 16 13:36 CMakeLists.txt -rw-r--r-- 1 paul staff 2.8K 3 16 13:36 HelloWorld.cxx -rw-r--r-- 1 paul staff 6.1K 3 16 13:36 HelloWorld.h -rw-r--r-- 1 paul staff 62B 3 16 13:36 HelloWorld.idl -rw-r--r-- 1 paul staff 4.4K 3 16 13:36 HelloWorldPubSubTypes.cxx -rw-r--r-- 1 paul staff 1.7K 3 16 13:36 HelloWorldPubSubTypes.h -rw-r--r-- 1 paul staff 4.6K 3 16 13:36 HelloWorldPublisher.cpp -rw-r--r-- 1 paul staff 1.7K 3 16 13:36 HelloWorldPublisher.h -rw-r--r-- 1 paul staff 3.8K 3 16 13:36 HelloWorldSubscriber.cpp -rw-r--r-- 1 paul staff 1.8K 3 16 13:36 HelloWorldSubscriber.h -rw-r--r-- 1 paul staff 2.0K 3 16 13:36 HelloWorld_main.cpp -rw-r--r-- 1 paul staff 3.1K 3 16 13:36 Makefile -rw-r--r-- 1 paul staff 203B 3 16 13:36 README.txt
这其中:
- README.txt是工程说明
- CMakeLists.txt与Makefile是编译文件
- HelloWorld_main.cpp包含了生成可执行文件的
main
函数 - HelloWorld.idl是待传输的数据结构定义
- HelloWorld.h,HelloWorld.cxx,HelloWorldPubSubTypes.h和HelloWorldPubSubTypes.cxx是由HelloWorld.idl文件生成
- HelloWorldPublisher.h和HelloWorldPublisher.cpp是发布者的实现
- HelloWorldSubscriber.h和HelloWorldSubscriber.cpp是订阅者的实现
熟悉一个C++工程可以先从main
入手,HelloWorld_main.cpp中的主要逻辑就是根据用户输入的参数是"publisher"
还是"subscriber"
来确定启动哪个模块。
switch(type) { case1: { HelloWorldPublishermypub; if(mypub.init()) { mypub.run(count,sleep); } break; } case2: { HelloWorldSubscribermysub; if(mysub.init()) { mysub.run(); } break; } }
接下来我们直接看HelloWorldPublisher和HelloWorldSubscriber就好。
HelloWorldPublisher::init
中主要是为Publisher的对象设置参数:
boolHelloWorldPublisher::init() { m_Hello.index(0); m_Hello.message("HelloWorld"); ParticipantAttributesPParam; PParam.rtps.builtin.discovery_config.discoveryProtocol=DiscoveryProtocol_t::SIMPLE; PParam.rtps.builtin.discovery_config.use_SIMPLE_EndpointDiscoveryProtocol=true; PParam.rtps.builtin.discovery_config.m_simpleEDP.use_PublicationReaderANDSubscriptionWriter=true; PParam.rtps.builtin.discovery_config.m_simpleEDP.use_PublicationWriterANDSubscriptionReader=true; PParam.rtps.builtin.domainId=0; PParam.rtps.builtin.discovery_config.leaseDuration=c_TimeInfinite; PParam.rtps.setName("Participant_pub"); mp_participant=Domain::createParticipant(PParam); <span class="k">if</span><span class="p">(</span><span class="n">mp_participant</span><span class="o">==</span><span class="nb">nullptr</span><span class="p">)</span> <span class="k">return</span> <span class="nb">false</span><span class="p">;</span> <span class="c1">//REGISTER THE TYPE</span> <span class="n">Domain</span><span class="o">::</span><span class="n">registerType</span><span class="p">(</span><span class="n">mp_participant</span><span class="p">,</span><span class="o">&</span><span class="n">m_type</span><span class="p">);</span> <span class="c1">//CREATE THE PUBLISHER</span> <span class="n">PublisherAttributes</span> <span class="n">Wparam</span><span class="p">;</span> <span class="n">Wparam</span><span class="p">.</span><span class="n">topic</span><span class="p">.</span><span class="n">topicKind</span> <span class="o">=</span> <span class="n">NO_KEY</span><span class="p">;</span> <span class="n">Wparam</span><span class="p">.</span><span class="n">topic</span><span class="p">.</span><span class="n">topicDataType</span> <span class="o">=</span> <span class="s">"HelloWorld"</span><span class="p">;</span> <span class="n">Wparam</span><span class="p">.</span><span class="n">topic</span><span class="p">.</span><span class="n">topicName</span> <span class="o">=</span> <span class="s">"HelloWorldTopic"</span><span class="p">;</span> <span class="n">Wparam</span><span class="p">.</span><span class="n">topic</span><span class="p">.</span><span class="n">historyQos</span><span class="p">.</span><span class="n">kind</span> <span class="o">=</span> <span class="n">KEEP_LAST_HISTORY_QOS</span><span class="p">;</span> <span class="n">Wparam</span><span class="p">.</span><span class="n">topic</span><span class="p">.</span><span class="n">historyQos</span><span class="p">.</span><span class="n">depth</span> <span class="o">=</span> <span class="mi">30</span><span class="p">;</span> <span class="n">Wparam</span><span class="p">.</span><span class="n">topic</span><span class="p">.</span><span class="n">resourceLimitsQos</span><span class="p">.</span><span class="n">max_samples</span> <span class="o">=</span> <span class="mi">50</span><span class="p">;</span> <span class="n">Wparam</span><span class="p">.</span><span class="n">topic</span><span class="p">.</span><span class="n">resourceLimitsQos</span><span class="p">.</span><span class="n">allocated_samples</span> <span class="o">=</span> <span class="mi">20</span><span class="p">;</span> <span class="n">Wparam</span><span class="p">.</span><span class="n">times</span><span class="p">.</span><span class="n">heartbeatPeriod</span><span class="p">.</span><span class="n">seconds</span> <span class="o">=</span> <span class="mi">2</span><span class="p">;</span> <span class="n">Wparam</span><span class="p">.</span><span class="n">times</span><span class="p">.</span><span class="n">heartbeatPeriod</span><span class="p">.</span><span class="n">nanosec</span> <span class="o">=</span> <span class="mi">200</span><span class="o">*</span><span class="mi">1000</span><span class="o">*</span><span class="mi">1000</span><span class="p">;</span> <span class="n">Wparam</span><span class="p">.</span><span class="n">qos</span><span class="p">.</span><span class="n">m_reliability</span><span class="p">.</span><span class="n">kind</span> <span class="o">=</span> <span class="n">RELIABLE_RELIABILITY_QOS</span><span class="p">;</span> <span class="n">mp_publisher</span> <span class="o">=</span> <span class="n">Domain</span><span class="o">::</span><span class="n">createPublisher</span><span class="p">(</span><span class="n">mp_participant</span><span class="p">,</span><span class="n">Wparam</span><span class="p">,(</span><span class="n">PublisherListener</span><span class="o">*</span><span class="p">)</span><span class="o">&</span><span class="n">m_listener</span><span class="p">);</span> <span class="k">if</span><span class="p">(</span><span class="n">mp_publisher</span> <span class="o">==</span> <span class="nb">nullptr</span><span class="p">)</span> <span class="k">return</span> <span class="nb">false</span><span class="p">;</span> <span class="k">return</span> <span class="nb">true</span><span class="p">;</span> }
这里的参数配置请参阅API说明:
- RTPSParticipantAttributes
- BuiltinAttributes
- PublisherAttributes
- TopicAttributes
- WriterQos
- WriterTimes
- WriterQos
Publisher发送消息的逻辑很简单:
boolHelloWorldPublisher::publish(boolwaitForListener) { if(m_listener.firstConnected||!waitForListener||m_listener.n_matched>0) { m_Hello.index(m_Hello.index()+1); mp_publisher->write((void*)&m_Hello); returntrue; } returnfalse; }
注意,这里write
的对象是通过idl文件生成的类型。
Subscriber的初始化和Publisher是类似的:
boolHelloWorldSubscriber::init() { ParticipantAttributesPParam; PParam.rtps.builtin.discovery_config.discoveryProtocol=DiscoveryProtocol_t::SIMPLE; PParam.rtps.builtin.discovery_config.use_SIMPLE_EndpointDiscoveryProtocol=true; PParam.rtps.builtin.discovery_config.m_simpleEDP.use_PublicationReaderANDSubscriptionWriter=true; PParam.rtps.builtin.discovery_config.m_simpleEDP.use_PublicationWriterANDSubscriptionReader=true; PParam.rtps.builtin.domainId=0; PParam.rtps.builtin.discovery_config.leaseDuration=c_TimeInfinite; PParam.rtps.setName("Participant_sub"); mp_participant=Domain::createParticipant(PParam); if(mp_participant==nullptr) returnfalse; <span class="c1">//REGISTER THE TYPE</span> <span class="n">Domain</span><span class="o">::</span><span class="n">registerType</span><span class="p">(</span><span class="n">mp_participant</span><span class="p">,</span><span class="o">&</span><span class="n">m_type</span><span class="p">);</span> <span class="c1">//CREATE THE SUBSCRIBER</span> <span class="n">SubscriberAttributes</span> <span class="n">Rparam</span><span class="p">;</span> <span class="n">Rparam</span><span class="p">.</span><span class="n">topic</span><span class="p">.</span><span class="n">topicKind</span> <span class="o">=</span> <span class="n">NO_KEY</span><span class="p">;</span> <span class="n">Rparam</span><span class="p">.</span><span class="n">topic</span><span class="p">.</span><span class="n">topicDataType</span> <span class="o">=</span> <span class="s">"HelloWorld"</span><span class="p">;</span> <span class="n">Rparam</span><span class="p">.</span><span class="n">topic</span><span class="p">.</span><span class="n">topicName</span> <span class="o">=</span> <span class="s">"HelloWorldTopic"</span><span class="p">;</span> <span class="n">Rparam</span><span class="p">.</span><span class="n">topic</span><span class="p">.</span><span class="n">historyQos</span><span class="p">.</span><span class="n">kind</span> <span class="o">=</span> <span class="n">KEEP_LAST_HISTORY_QOS</span><span class="p">;</span> <span class="n">Rparam</span><span class="p">.</span><span class="n">topic</span><span class="p">.</span><span class="n">historyQos</span><span class="p">.</span><span class="n">depth</span> <span class="o">=</span> <span class="mi">30</span><span class="p">;</span> <span class="n">Rparam</span><span class="p">.</span><span class="n">topic</span><span class="p">.</span><span class="n">resourceLimitsQos</span><span class="p">.</span><span class="n">max_samples</span> <span class="o">=</span> <span class="mi">50</span><span class="p">;</span> <span class="n">Rparam</span><span class="p">.</span><span class="n">topic</span><span class="p">.</span><span class="n">resourceLimitsQos</span><span class="p">.</span><span class="n">allocated_samples</span> <span class="o">=</span> <span class="mi">20</span><span class="p">;</span> <span class="n">Rparam</span><span class="p">.</span><span class="n">qos</span><span class="p">.</span><span class="n">m_reliability</span><span class="p">.</span><span class="n">kind</span> <span class="o">=</span> <span class="n">RELIABLE_RELIABILITY_QOS</span><span class="p">;</span> <span class="n">Rparam</span><span class="p">.</span><span class="n">qos</span><span class="p">.</span><span class="n">m_durability</span><span class="p">.</span><span class="n">kind</span> <span class="o">=</span> <span class="n">TRANSIENT_LOCAL_DURABILITY_QOS</span><span class="p">;</span> <span class="n">mp_subscriber</span> <span class="o">=</span> <span class="n">Domain</span><span class="o">::</span><span class="n">createSubscriber</span><span class="p">(</span><span class="n">mp_participant</span><span class="p">,</span><span class="n">Rparam</span><span class="p">,(</span><span class="n">SubscriberListener</span><span class="o">*</span><span class="p">)</span><span class="o">&</span><span class="n">m_listener</span><span class="p">);</span> <span class="k">if</span><span class="p">(</span><span class="n">mp_subscriber</span> <span class="o">==</span> <span class="nb">nullptr</span><span class="p">)</span> <span class="k">return</span> <span class="nb">false</span><span class="p">;</span> <span class="k">return</span> <span class="nb">true</span><span class="p">;</span> }
当然,Subscriber有自己的配置参数类型:
- SubscriberAttributes
- ReaderQos
- ReaderTimes
需要注意的是,Subscriber与Publisher的通信是建立在Topic上的,因此对于Topic标识的配置要保持一致:
Wparam.topic.topicDataType="HelloWorld"; Wparam.topic.topicName="HelloWorldTopic";
有了Topic的这个抽象概念,使得Subscriber与Publisher不用物理地址上有任何关联,也屏蔽了硬件和操作系统的差异:同样的代码,其编译产物可以一个跑在x86的Mac系统上,一个跑在ARM架构的Android设备上。
Subscriber通过void HelloWorldSubscriber::SubListener::onNewDataMessage(Subscriber* sub)
方法来处理接受到的数据。在示例的实现中,就是将消息体打印出来:
voidHelloWorldSubscriber::SubListener::onNewDataMessage(Subscriber*sub) { if(sub->takeNextData((void*)&m_Hello,&m_info)) { if(m_info.sampleKind==ALIVE) { this->n_samples++; // Print your structure data here. std::cout<<"Message "<<m_Hello.message()<<" "<<m_Hello.index()<<" RECEIVED"<<std::endl; } } }
Publisher与Subscriber各自有一些回调,开发者可以利用它们来进行需要的处理:
回调 | Publisher | Subscriber |
onNewDataMessage | - | √ |
onSubscriptionMatched | - | √ |
on_requested_deadline_missed | - | √ |
on_liveliness_changed | - | √ |
onPublicationMatched | √ | - |
on_offered_deadline_missed | √ | - |
on_liveliness_lost | √ | - |