DDS与FastRTPS(三)

简介: DDS与FastRTPS

读者-写者层

读者-写者层是相对于发布者-订阅者层更底层的API。

它提供了更多的控制,但也意味着使用起来会稍微麻烦一些。

两个层次在几个核心概念上存在一一对应的关系,如下表所示:

Publisher-Subscriber Layer Writer-Reader Layer
Domain RTPSDomain
Participant RTPSParticipant
Publisher RTPSWriter
Subscriber RTPSReader

如果你浏览Fast-RTPS的源码,你会发现其实发布者-订阅者层的实现就是依赖读者-写者层的。

想要很快的熟悉读者-写者层的使用可以浏览下面三个代码示例:

  • RTPSTest_registered
  • RTPSTest_persistent
  • RTPSTest_as_socket

RTPSParticipant,RTPSWriter和RTPSReader都通过RTPSDomain创建。

相对于发布者-订阅层不一样的是,这一层不支持通过XML的形式配置参数。开发者必须通过代码的形式配置所有的参数,例如:

//CREATE PARTICIPANT
RTPSParticipantAttributesPParam;
PParam.builtin.discovery_config.discoveryProtocol=eprosima::fastrtps::rtps::DiscoveryProtocol::SIMPLE;
PParam.builtin.use_WriterLivelinessProtocol=true;
mp_participant=RTPSDomain::createParticipant(PParam);
if(mp_participant==nullptr)
  returnfalse;
//CREATE WRITERHISTORY
HistoryAttributes hatt;
hatt.payloadMaxSize = 255;
hatt.maximumReservedCaches = 50;
mp_history = new WriterHistory(hatt);
//CREATE WRITER
WriterAttributeswatt;
watt.endpoint.reliabilityKind=BEST_EFFORT;
mp_writer=RTPSDomain::createRTPSWriter(mp_participant,watt,mp_history,&m_listener);


这里的逻辑主要就是设置参数和创建RTPSParticipant,RTPSWriter对象。并且,RTPSParticipant将被用来注册RTPSWriter:

TopicAttributesTatt;
Tatt.topicKind=NO_KEY;
Tatt.topicDataType="string";
Tatt.topicName="exampleTopic";
ReaderQosRqos;
returnmp_participant->registerReader(mp_reader,Tatt,Rqos);

在RTPS协议中,Reader和Writer将有关Topic的数据保存在其关联的历史记录中。每个数据段都由一个变更表示,对应的实现是CacheChange_t

更改通过历史记录管理。读者和写者的历史是两种类型:

  • eprosima::fastrtps::rtps::WriterHistory;
  • eprosima::fastrtps::rtps::ReaderHistory;

对于Writer来说,发送消息是往历史中添加变更:

//Request a change from the history
CacheChange_t*change=writer->new_change([]()->uint32_t{return255;},ALIVE);
//Write serialized data into the change
change->serializedPayload.length=sprintf((char*)change->serializedPayload.data,"My example string %d",2)+1;
//Insert change back into the history. The Writer takes care of the rest.
history->add_change(change);

而对于Reader来说,新消息会被放入到历史中,读取完了可以将其删除:

voidTestReaderRegistered::MyListener::onNewCacheChangeAdded(
        RTPSReader*reader,
        constCacheChange_t*constchange)
{
    printf("Received: %s\n",change->serializedPayload.data);
    reader->getHistory()->remove_change((CacheChange_t*)change);
    n_received++;
}

框架会根据消息触发Reader的回调。

持久化

默认情况下,Writer的历史在其生命周期以内可以被Reader访问。这意味着,一旦Writer退出,则其历史就没有了。但如果需要,你可以配置持久化,这使得即便Writer重启了,仍然可以维护早先的历史。

使用持久化功能可以保护端点的状态免受意外故障的影响,因为端点在重新启动后仍会继续通信,就像它们刚从网络断开连接一样。

你可以通过RTPSTest_persistent这个示例来了解如何使用这个功能。

要使用持久化功能,Writer和Reader需要进行以下设置:

  • durabilityKind设置为TRANSIENT
  • persistence_guid不能是全0
  • 为Writer,Reader或者RTPSParticipant设置持久化插件。目前内置的插件是SQLITE3。

下面是一段代码示例:

PropertyPolicyproperty_policy;
property_policy.properties().emplace_back("dds.persistence.plugin","builtin.SQLITE3");
property_policy.properties().emplace_back("dds.persistence.sqlite3.filename","test.db");
//CREATE WRITER
WriterAttributes watt;
watt.endpoint.reliabilityKind = BEST_EFFORT;
watt.endpoint.durabilityKind = TRANSIENT;
watt.endpoint.persistence_guid.guidPrefix.value[11] = 1;
watt.endpoint.persistence_guid.entityId.value[3] = 1;
watt.endpoint.properties = property_policy;
mp_writer=RTPSDomain::createRTPSWriter(mp_participant,watt,mp_history,&m_listener);


durabilityKind参数定义了Writer与新Reader匹配时对于已发送的数据的行为,该参数有三个选项:

  • VOLATILE(默认值):丢掉所有已经发送的数据。
  • TRANSIENT_LOCAL:保存最近发送的k条数据。
  • TRANSIENT:与TRANSIENT_LOCAL类似,但是还将消息保存到持久化存储中。这就使得即便它的进程异常退出了,其数据不会丢失。

对于读者来说,其配置方法是类似的:

PropertyPolicyproperty_policy;
property_policy.properties().emplace_back("dds.persistence.plugin","builtin.SQLITE3");
property_policy.properties().emplace_back("dds.persistence.sqlite3.filename","test.db");
//CREATE READER
ReaderAttributesratt;
Locator_tloc(22222);
ratt.endpoint.unicastLocatorList.push_back(loc);
ratt.endpoint.durabilityKind=TRANSIENT;
ratt.endpoint.persistence_guid.guidPrefix.value[11]=2;
ratt.endpoint.persistence_guid.entityId.value[3]=1;
ratt.endpoint.properties=property_policy;
mp_reader=RTPSDomain::createRTPSReader(mp_participant,ratt,mp_history,&m_listener);


QoS

使用Fast-RTPS,你有非常多的QoS策略可以配置。

fa56d2eb98bacb445e765279d0fd9b41_640_wx_fmt=png&wxfrom=5&wx_lazy=1&wx_co=1.png

它们主要可以分为下面几类:

  • durability:定义了Writer与新Reader匹配时对于已发送的数据的行为,“持久化”一节已经提到过。
  • liveliness:定义Publisher的活跃程度。例如:多长时间发布一次公告消息。
  • reliability:定义消息的可靠性。它有两个选项:1、BEST_EFFORT,发送消息时,接收者(订阅者)没有到达确认。速度快,但是消息可能会丢失。2、RELIABLE,发送方(发布者)期望接收方(订阅者)进行到达确认。速度较慢,但可以防止数据丢失。
  • partition:可以在domain的物理分区上建立逻辑分区。
  • deadline:指定消息的更新频率,当新消息的频率降至某个阈值以下时,会发出警报。这对于需要定期更新数据的场景很有用。
  • lifespan:指定Publisher发布数据的最大有效期限。当使用寿命到期时,数据将从历史记录中删除。
  • disablePositiveAcks:指定是否需要取消确认消息。在不需要严格可靠的通信且带宽受限时,这么做可以减少网络流量。

在实现中,QoS包含了一系列的类,它们继承自QoSPolicy父类:

9534b7afe81645e4e5f9bb5a65df587f_640_wx_fmt=png&wxfrom=5&wx_lazy=1&wx_co=1.png

之所以提供如此多的选择,是因为不同的系统对于消息的质量有不同的要求。

在实际系统中,并非每个端点都需要本地存储所有数据。DDS在发送信息方面很聪明,如果消息不一定总是到达预期的目的地,则中间件将保证需要的可靠性。当系统发生更改时,Fast-RTPS会动态地找出将哪些数据发送到何处,并智能地将更改通知参与者。如果总数据量巨大,则DDS会智能过滤并仅发送每个端点真正需要的数据。当需要快速更新时,DDS发送多播消息以一次更新许多远程应用程序。当数据格式变更时,DDS会跟踪系统各个部分使用的版本并自动进行转换。对于安全性至关重要的应用程序,DDS控制访问,强制执行数据流路径并实时加密数据。

当系统要满足:以极高的速度,在动态,苛刻且不可预测的环境工作时,DDS的真正威力就会显现。

实操测试

文章的最后,我们通过实际运行程序来进行一些实验。虽然Fast-RTPS支持非常多的操作系统,但在Ubuntu系统上验证可能是最方便的。

Fast-RTPS是面向分布式系统的,这意味着在一个系统上验证它的功能意义不大。但另一方面我们大部分人并没有同时拥有多个设备。

在这种情况下,我们可以借助docker,它可以在同一个系统上运行多个独立的虚拟系统。然后我们就可以在这些独立的系统上进行测试了,这样就模拟了分布式的环境。

Fast-RTPS提供了包含依赖环境的Docker容器,我们只要下载和运行这些容器,就可以拥有多个独立的系统了。

不过,在这运行下面这些示例之前,你需要配置好docker环境。关于docker的基本使用已经超过了本文的范畴,你可以浏览这个链接:Install Docker Engine on Ubuntu。

Fast-RTPS需要的文件可以到官网下载:https://eprosima.com/index.php/downloads-all

点击上面这个链接,然后输入个人信息就可以进入下载页面了。你可以选择最新版本的Docker和Fast-RTPS包进行下载:

6fc85cf668936071de1ca9d411739859_640_wx_fmt=png&wxfrom=5&wx_lazy=1&wx_co=1.png

考虑到国内的网络状况,下载的速度可能非常慢。我下载需要的文件耗费的好几个小时,为了节省你的时间,我已经将下载好的文件放在了这里:

  • eProsima_FastRTPS-1.9.3-Linux.tgz:包含了FastRTPS的源码和编译命令,用来在Ubuntu系统上安装环境。
  • ubuntu-fast-rtps v1.9.3.tar:已经预装了Fast-RTPS环境,可以在上面运行Fast-RTPS的程序,用来进行测试。

在Ubuntu系统中,先将eProsima_FastRTPS-1.9.3-Linux.tgz解压缩,为了编译它,还需要安装一些依赖,相关命令如下:

sudo apt install cmake g++
sudo apt install libasio-dev libtinyxml2-dev
mkdir fast-rtps
tar-xvf eProsima_FastRTPS-1.9.3-Linux.tgz -C fast-rtps/
cd fast-rtps/
chmod a+x install.sh
sudo ./install.sh

在这之后你就可以转到/fast-rtps/src/fastrtps/examples/目录下编译示例了。不过这个目录下的CMakeList.txt似乎存在问题,我在这个文件的开头增加了下面一行才完成编译:

SET(CMAKE_CXX_FLAGS "${CMAKE_CXX_FLAGS} -std=c++11 -pthread")

编译完成之后,我们并非是在Ubuntu系统上运行程序,而是将这些可执行文件放到docker容器中,以分布式的环境来运行它们。

所以需要启动docker容器:

$ docker load -i ubuntu-fast-rtps.tar
$ docker run -it ubuntu-fast-rtps:v1.9.3

你可以通过docker run -it ...同时启动多个docker容器以进行测试(每个容器对应一个通信的参与者。当然,你需要同时打开多个shell窗口)。

例如我启动了两个docker容器:

CONTAINER ID        IMAGE                     COMMAND             CREATED             STATUS              PORTS               NAMES
2125504ee62f        ubuntu-fast-rtps:v1.9.3   "/bin/bash"         5 minutes ago       Up 5 minutes                            mystifying_jennings
b17517fefecd        ubuntu-fast-rtps:v1.9.3   "/bin/bash"         23 minutes ago      Up 23 minutes                           stoic_leavitt

运行docker run -it ...之后会直接进入docker的shell中,你可以在根目录创建fastrtps目录用来存放测试程序。

然后在Ubuntu系统上将编译出的示例程序拷贝到docker中:

sudo docker cp ./ b17517fefecd:/fastrtps
sudo docker cp ./ 2125504ee62f:/fastrtps

在这之后就可以转到docker容器的shell中运行测试程序了。

例如,在两个docker上运行HelloWorld的示例:

  • 下面是Publisher程序:
root@b17517fefecd:/fastrtps/HelloWorldExample# ./HelloWorldExample publisher
Publisher running 10 samples.
Publisher matched
Message: HelloWorld with index: 1 SENT
Message: HelloWorld with index: 2 SENT
Message: HelloWorld with index: 3 SENT
Message: HelloWorld with index: 4 SENT
Message: HelloWorld with index: 5 SENT
Message: HelloWorld with index: 6 SENT
Message: HelloWorld with index: 7 SENT
Message: HelloWorld with index: 8 SENT
Message: HelloWorld with index: 9 SENT
Message: HelloWorld with index: 10 SENT
  • 下面是Subscriber程序:
root@2125504ee62f:/fastrtps/HelloWorldExample# ./HelloWorldExample subscriber
Starting 
Subscriber running. Please press enter to stop the Subscriber
Subscriber matched
Message HelloWorld 1 RECEIVED
Message HelloWorld 2 RECEIVED
Message HelloWorld 3 RECEIVED
Message HelloWorld 4 RECEIVED
Message HelloWorld 5 RECEIVED
Message HelloWorld 6 RECEIVED
Message HelloWorld 7 RECEIVED
Message HelloWorld 8 RECEIVED
Message HelloWorld 9 RECEIVED
Message HelloWorld 10 RECEIVED
Subscriber unmatched

接下来是Benchmark程序:

  • Benchmark subscriber端:
root@b17517fefecd:/fastrtps/Benchmark# ./Benchmark subscriber
Subscriber running...
Subscriber matched
Publisher matched
Subscriber unmatched
Publisher unmatched
  • Benchmark publisher端:
root@2125504ee62f:/fastrtps/Benchmark# ./Benchmark publisher
Publisher running...
Subscriber matched
Publisher matched. Test starts...
RESULTS after 10000 milliseconds:
COUNT: 53951
SAMPLES: 0,771,668,548,582,716,700,706,408,440,592,636,738,698,648,574,706,776,690,584,638,556,750,740,640,584,572,542,526,560,552,528,608,504,630,478,598,708,620,528,660,718,578,646,702,528,652,528,450,508,566,544,516,616,652,584,532,434,542,678,752,696,412,544,654,766,736,612,496,470,662,580,566,634,674,568,532,546,528,552,552,528,490,508,598,620,672,506,468,654,

在运行的过程中,你可以感受到借助Fast-RTPS,不同系统上的参与者是多么快速的发现了对方并完成了通信的。 当然,你可以运行更多的用例,或者修改代码进行你想要的测试。

参考资料与推荐读物

  • What is DDS?
  • Where Can I Get DDS?
  • PDF: What can DDS do for You?
  • Data Distribution Services Performance Evaluation Framework
  • Using DDS with TSN and Adaptive AUTOSAR
  • Object Management Group: Data Distribution Service™
  • DDS Interoperability Wire Protocol
  • eProsima Fast RTPS Documentation
  • RTPS Introduction
  • eProsima Fast RTPS: PubSub Hello World
  • Github: Fast-RTPS
  • DDS in a Nutshell
  • Data Distribution Service
  • What’s the difference between DDS and SOME/IP?
</article>
相关文章
|
2月前
|
Java API Android开发
DDS、FastDDS、OpenDDS扫盲
DDS、FastDDS、OpenDDS扫盲
486 0
|
2月前
|
运维 监控 负载均衡
ACS
阿里云容器计算服务ACS(Alibaba Cloud Container Compute Service,ACS)是一种基于容器技术的云计算服务,它可以帮助用户快速构建、部署和管理容器化应用程序。ACS提供了容器镜像、容器编排、负载均衡、日志监控等功能,使得用户可以专注于应用程序的开发和迭代,而无需关注底层的基础设施和运维。
148 3
|
11月前
|
存储 网络协议 中间件
DDS数据分发服务
DDS数据分发服务
400 0
|
存储 XML 网络协议
|
安全 调度 数据安全/隐私保护
PCIe访问控制服务(ACS)
PCIe访问控制服务(ACS)
3768 0
PCIe访问控制服务(ACS)
|
12月前
|
12月前
GPDB6和GPDB7直连primary命令的不同
GPDB6和GPDB7直连primary命令的不同
95 0
|
算法 异构计算
基于FPGA的DDS设计,并通过DDS实现ASK,FSK,PSK三种调制
基于FPGA的DDS设计,并通过DDS实现ASK,FSK,PSK三种调制
116 0
|
消息中间件 安全 网络协议
|
存储 安全 芯片
基于DDS的信号源设计
基于DDS的信号源设计
102 0
基于DDS的信号源设计