读者-写者层
读者-写者层是相对于发布者-订阅者层更底层的API。
它提供了更多的控制,但也意味着使用起来会稍微麻烦一些。
两个层次在几个核心概念上存在一一对应的关系,如下表所示:
Publisher-Subscriber Layer | Writer-Reader Layer |
Domain | RTPSDomain |
Participant | RTPSParticipant |
Publisher | RTPSWriter |
Subscriber | RTPSReader |
如果你浏览Fast-RTPS的源码,你会发现其实发布者-订阅者层的实现就是依赖读者-写者层的。
想要很快的熟悉读者-写者层的使用可以浏览下面三个代码示例:
- RTPSTest_registered
- RTPSTest_persistent
- RTPSTest_as_socket
RTPSParticipant,RTPSWriter和RTPSReader都通过RTPSDomain创建。
相对于发布者-订阅层不一样的是,这一层不支持通过XML的形式配置参数。开发者必须通过代码的形式配置所有的参数,例如:
//CREATE PARTICIPANT RTPSParticipantAttributesPParam; PParam.builtin.discovery_config.discoveryProtocol=eprosima::fastrtps::rtps::DiscoveryProtocol::SIMPLE; PParam.builtin.use_WriterLivelinessProtocol=true; mp_participant=RTPSDomain::createParticipant(PParam); if(mp_participant==nullptr) returnfalse; //CREATE WRITERHISTORY HistoryAttributes hatt; hatt.payloadMaxSize = 255; hatt.maximumReservedCaches = 50; mp_history = new WriterHistory(hatt); //CREATE WRITER WriterAttributeswatt; watt.endpoint.reliabilityKind=BEST_EFFORT; mp_writer=RTPSDomain::createRTPSWriter(mp_participant,watt,mp_history,&m_listener);
这里的逻辑主要就是设置参数和创建RTPSParticipant,RTPSWriter对象。并且,RTPSParticipant将被用来注册RTPSWriter:
TopicAttributesTatt; Tatt.topicKind=NO_KEY; Tatt.topicDataType="string"; Tatt.topicName="exampleTopic"; ReaderQosRqos; returnmp_participant->registerReader(mp_reader,Tatt,Rqos);
在RTPS协议中,Reader和Writer将有关Topic的数据保存在其关联的历史记录中。每个数据段都由一个变更表示,对应的实现是CacheChange_t
。
更改通过历史记录管理。读者和写者的历史是两种类型:
eprosima::fastrtps::rtps::WriterHistory
;eprosima::fastrtps::rtps::ReaderHistory
;
对于Writer来说,发送消息是往历史中添加变更:
//Request a change from the history CacheChange_t*change=writer->new_change([]()->uint32_t{return255;},ALIVE); //Write serialized data into the change change->serializedPayload.length=sprintf((char*)change->serializedPayload.data,"My example string %d",2)+1; //Insert change back into the history. The Writer takes care of the rest. history->add_change(change);
而对于Reader来说,新消息会被放入到历史中,读取完了可以将其删除:
voidTestReaderRegistered::MyListener::onNewCacheChangeAdded( RTPSReader*reader, constCacheChange_t*constchange) { printf("Received: %s\n",change->serializedPayload.data); reader->getHistory()->remove_change((CacheChange_t*)change); n_received++; }
框架会根据消息触发Reader的回调。
持久化
默认情况下,Writer的历史在其生命周期以内可以被Reader访问。这意味着,一旦Writer退出,则其历史就没有了。但如果需要,你可以配置持久化,这使得即便Writer重启了,仍然可以维护早先的历史。
使用持久化功能可以保护端点的状态免受意外故障的影响,因为端点在重新启动后仍会继续通信,就像它们刚从网络断开连接一样。
你可以通过RTPSTest_persistent这个示例来了解如何使用这个功能。
要使用持久化功能,Writer和Reader需要进行以下设置:
durabilityKind
设置为TRANSIENT
persistence_guid
不能是全0- 为Writer,Reader或者RTPSParticipant设置持久化插件。目前内置的插件是SQLITE3。
下面是一段代码示例:
PropertyPolicyproperty_policy; property_policy.properties().emplace_back("dds.persistence.plugin","builtin.SQLITE3"); property_policy.properties().emplace_back("dds.persistence.sqlite3.filename","test.db"); //CREATE WRITER WriterAttributes watt; watt.endpoint.reliabilityKind = BEST_EFFORT; watt.endpoint.durabilityKind = TRANSIENT; watt.endpoint.persistence_guid.guidPrefix.value[11] = 1; watt.endpoint.persistence_guid.entityId.value[3] = 1; watt.endpoint.properties = property_policy; mp_writer=RTPSDomain::createRTPSWriter(mp_participant,watt,mp_history,&m_listener);
durabilityKind
参数定义了Writer与新Reader匹配时对于已发送的数据的行为,该参数有三个选项:
- VOLATILE(默认值):丢掉所有已经发送的数据。
- TRANSIENT_LOCAL:保存最近发送的k条数据。
- TRANSIENT:与TRANSIENT_LOCAL类似,但是还将消息保存到持久化存储中。这就使得即便它的进程异常退出了,其数据不会丢失。
对于读者来说,其配置方法是类似的:
PropertyPolicyproperty_policy; property_policy.properties().emplace_back("dds.persistence.plugin","builtin.SQLITE3"); property_policy.properties().emplace_back("dds.persistence.sqlite3.filename","test.db"); //CREATE READER ReaderAttributesratt; Locator_tloc(22222); ratt.endpoint.unicastLocatorList.push_back(loc); ratt.endpoint.durabilityKind=TRANSIENT; ratt.endpoint.persistence_guid.guidPrefix.value[11]=2; ratt.endpoint.persistence_guid.entityId.value[3]=1; ratt.endpoint.properties=property_policy; mp_reader=RTPSDomain::createRTPSReader(mp_participant,ratt,mp_history,&m_listener);
QoS
使用Fast-RTPS,你有非常多的QoS策略可以配置。
它们主要可以分为下面几类:
durability
:定义了Writer与新Reader匹配时对于已发送的数据的行为,“持久化”一节已经提到过。liveliness
:定义Publisher的活跃程度。例如:多长时间发布一次公告消息。reliability
:定义消息的可靠性。它有两个选项:1、BEST_EFFORT
,发送消息时,接收者(订阅者)没有到达确认。速度快,但是消息可能会丢失。2、RELIABLE
,发送方(发布者)期望接收方(订阅者)进行到达确认。速度较慢,但可以防止数据丢失。partition
:可以在domain的物理分区上建立逻辑分区。deadline
:指定消息的更新频率,当新消息的频率降至某个阈值以下时,会发出警报。这对于需要定期更新数据的场景很有用。lifespan
:指定Publisher发布数据的最大有效期限。当使用寿命到期时,数据将从历史记录中删除。disablePositiveAcks
:指定是否需要取消确认消息。在不需要严格可靠的通信且带宽受限时,这么做可以减少网络流量。
在实现中,QoS包含了一系列的类,它们继承自QoSPolicy父类:
之所以提供如此多的选择,是因为不同的系统对于消息的质量有不同的要求。
在实际系统中,并非每个端点都需要本地存储所有数据。DDS在发送信息方面很聪明,如果消息不一定总是到达预期的目的地,则中间件将保证需要的可靠性。当系统发生更改时,Fast-RTPS会动态地找出将哪些数据发送到何处,并智能地将更改通知参与者。如果总数据量巨大,则DDS会智能过滤并仅发送每个端点真正需要的数据。当需要快速更新时,DDS发送多播消息以一次更新许多远程应用程序。当数据格式变更时,DDS会跟踪系统各个部分使用的版本并自动进行转换。对于安全性至关重要的应用程序,DDS控制访问,强制执行数据流路径并实时加密数据。
当系统要满足:以极高的速度,在动态,苛刻且不可预测的环境工作时,DDS的真正威力就会显现。
实操测试
文章的最后,我们通过实际运行程序来进行一些实验。虽然Fast-RTPS支持非常多的操作系统,但在Ubuntu系统上验证可能是最方便的。
Fast-RTPS是面向分布式系统的,这意味着在一个系统上验证它的功能意义不大。但另一方面我们大部分人并没有同时拥有多个设备。
在这种情况下,我们可以借助docker,它可以在同一个系统上运行多个独立的虚拟系统。然后我们就可以在这些独立的系统上进行测试了,这样就模拟了分布式的环境。
Fast-RTPS提供了包含依赖环境的Docker容器,我们只要下载和运行这些容器,就可以拥有多个独立的系统了。
不过,在这运行下面这些示例之前,你需要配置好docker环境。关于docker的基本使用已经超过了本文的范畴,你可以浏览这个链接:Install Docker Engine on Ubuntu。
Fast-RTPS需要的文件可以到官网下载:https://eprosima.com/index.php/downloads-all。
点击上面这个链接,然后输入个人信息就可以进入下载页面了。你可以选择最新版本的Docker和Fast-RTPS包进行下载:
考虑到国内的网络状况,下载的速度可能非常慢。我下载需要的文件耗费的好几个小时,为了节省你的时间,我已经将下载好的文件放在了这里:
- eProsima_FastRTPS-1.9.3-Linux.tgz:包含了FastRTPS的源码和编译命令,用来在Ubuntu系统上安装环境。
- ubuntu-fast-rtps v1.9.3.tar:已经预装了Fast-RTPS环境,可以在上面运行Fast-RTPS的程序,用来进行测试。
在Ubuntu系统中,先将eProsima_FastRTPS-1.9.3-Linux.tgz解压缩,为了编译它,还需要安装一些依赖,相关命令如下:
sudo apt install cmake g++ sudo apt install libasio-dev libtinyxml2-dev mkdir fast-rtps tar-xvf eProsima_FastRTPS-1.9.3-Linux.tgz -C fast-rtps/ cd fast-rtps/ chmod a+x install.sh sudo ./install.sh
在这之后你就可以转到/fast-rtps/src/fastrtps/examples/目录下编译示例了。不过这个目录下的CMakeList.txt似乎存在问题,我在这个文件的开头增加了下面一行才完成编译:
SET(CMAKE_CXX_FLAGS "${CMAKE_CXX_FLAGS} -std=c++11 -pthread")
编译完成之后,我们并非是在Ubuntu系统上运行程序,而是将这些可执行文件放到docker容器中,以分布式的环境来运行它们。
所以需要启动docker容器:
$ docker load -i ubuntu-fast-rtps.tar $ docker run -it ubuntu-fast-rtps:v1.9.3
你可以通过docker run -it ...
同时启动多个docker容器以进行测试(每个容器对应一个通信的参与者。当然,你需要同时打开多个shell窗口)。
例如我启动了两个docker容器:
CONTAINER ID IMAGE COMMAND CREATED STATUS PORTS NAMES 2125504ee62f ubuntu-fast-rtps:v1.9.3 "/bin/bash" 5 minutes ago Up 5 minutes mystifying_jennings b17517fefecd ubuntu-fast-rtps:v1.9.3 "/bin/bash" 23 minutes ago Up 23 minutes stoic_leavitt
运行docker run -it ...
之后会直接进入docker的shell中,你可以在根目录创建fastrtps
目录用来存放测试程序。
然后在Ubuntu系统上将编译出的示例程序拷贝到docker中:
sudo docker cp ./ b17517fefecd:/fastrtps sudo docker cp ./ 2125504ee62f:/fastrtps
在这之后就可以转到docker容器的shell中运行测试程序了。
例如,在两个docker上运行HelloWorld的示例:
- 下面是Publisher程序:
root@b17517fefecd:/fastrtps/HelloWorldExample# ./HelloWorldExample publisher Publisher running 10 samples. Publisher matched Message: HelloWorld with index: 1 SENT Message: HelloWorld with index: 2 SENT Message: HelloWorld with index: 3 SENT Message: HelloWorld with index: 4 SENT Message: HelloWorld with index: 5 SENT Message: HelloWorld with index: 6 SENT Message: HelloWorld with index: 7 SENT Message: HelloWorld with index: 8 SENT Message: HelloWorld with index: 9 SENT Message: HelloWorld with index: 10 SENT
- 下面是Subscriber程序:
root@2125504ee62f:/fastrtps/HelloWorldExample# ./HelloWorldExample subscriber Starting Subscriber running. Please press enter to stop the Subscriber Subscriber matched Message HelloWorld 1 RECEIVED Message HelloWorld 2 RECEIVED Message HelloWorld 3 RECEIVED Message HelloWorld 4 RECEIVED Message HelloWorld 5 RECEIVED Message HelloWorld 6 RECEIVED Message HelloWorld 7 RECEIVED Message HelloWorld 8 RECEIVED Message HelloWorld 9 RECEIVED Message HelloWorld 10 RECEIVED Subscriber unmatched
接下来是Benchmark程序:
- Benchmark subscriber端:
root@b17517fefecd:/fastrtps/Benchmark# ./Benchmark subscriber Subscriber running... Subscriber matched Publisher matched Subscriber unmatched Publisher unmatched
- Benchmark publisher端:
root@2125504ee62f:/fastrtps/Benchmark# ./Benchmark publisher Publisher running... Subscriber matched Publisher matched. Test starts... RESULTS after 10000 milliseconds: COUNT: 53951 SAMPLES: 0,771,668,548,582,716,700,706,408,440,592,636,738,698,648,574,706,776,690,584,638,556,750,740,640,584,572,542,526,560,552,528,608,504,630,478,598,708,620,528,660,718,578,646,702,528,652,528,450,508,566,544,516,616,652,584,532,434,542,678,752,696,412,544,654,766,736,612,496,470,662,580,566,634,674,568,532,546,528,552,552,528,490,508,598,620,672,506,468,654,
在运行的过程中,你可以感受到借助Fast-RTPS,不同系统上的参与者是多么快速的发现了对方并完成了通信的。 当然,你可以运行更多的用例,或者修改代码进行你想要的测试。
参考资料与推荐读物
- What is DDS?
- Where Can I Get DDS?
- PDF: What can DDS do for You?
- Data Distribution Services Performance Evaluation Framework
- Using DDS with TSN and Adaptive AUTOSAR
- Object Management Group: Data Distribution Service™
- DDS Interoperability Wire Protocol
- eProsima Fast RTPS Documentation
- RTPS Introduction
- eProsima Fast RTPS: PubSub Hello World
- Github: Fast-RTPS
- DDS in a Nutshell
- Data Distribution Service
- What’s the difference between DDS and SOME/IP?
</article>