DDS与FastRTPS(二)

简介: DDS与FastRTPS

对象与数据结构

下面是Fast-RTPS实现中的核心结构。

Publish-Subscriber模块

RTPS标准的高层类型。

  • Domain:用来创建,管理和销毁Participants。
  • Participant:包括Publisher和Subscriber,并管理它们的配置。
  • ParticipantAttributes:创建Participant的配置参数。
  • ParticipantListener:可以让开发者实现Participant的回调函数。
  • Publisher:在Topic上发布数据的对象。
  • PublisherAttributes:创建Publisher的配置参数。
  • PublisherListener:可以让开发者实现Publisher的回调函数。
  • Subscriber:在Topic上接受数据的对象。
  • SubscriberAttributes:创建Subscriber的配置参数。
  • SubscriberListener:可以让开发者实现Subscriber的回调函数。

RTPS模块

RTPS的底层模型。包含下面几个子模块:

  • RTPS Common
  • CacheChange_t:描述Topic上的变更,存储在历史Cache中。
  • Data:Cache变化的负载。
  • Message:RTPS消息。
  • Header:RTPS协议的头信息。
  • Sub-Message Header:标识RTPS的订阅消息。
  • MessageReceiver:反序列化和处理接受到的RTPS消息。
  • RTPSMessageCreator:构建RTPS消息。
  • RTPS Domain
  • RTPSDomain:用来创建,管理和销毁底层的RTPSParticipants。
  • RTPSParticipant:包括Writer和Reader。
  • RTPS Reader
  • RTPSReader:读者的基类。
  • ReaderAttributes:包含RTPS读者的配置参数。
  • ReaderHistory:存储Topic变化的历史数据。
  • ReaderListener:读者的回调类型。
  • RTPS Writer
  • RTPSWriter:写者的基类。
  • WriterAttributes:包含RTPS写者的配置参数。
  • WriterHistory:存储写者的历史数据。

配置Attributes

上面的数据结构中看到了许多Attributes后缀的类名。这些类包含了对协议或者对象的配置参数,很多特性都需要设置这些属性来完成。

这些类的定义基本都位于下面三个文件夹中:

  • include/fastrtps/attributes
  • include/fastrtps/rtps/attributes
  • include/fastdds/rtps/attributes

Fast RTPS支持非常多的配置参数,并且参数的结构常常是嵌套的。

通过代码去配置这些参数会产生很多啰嗦的代码,而且最大的问题在于:每次更改配置参数都需要重新编译。这个问题并非Fast RTPS才有,只要包含大量配置参数的软件都会这样的问题。通常的解决方法就是:提供文本格式的配置文件的方式来配置参数。因此对于Fast-RTPS来说,除了支持通过代码配置参数,它也支持通过XML文件的方式来进行配置。

有了配置文件之后,在代码中直接读取就好了,例如:

Participant*participant=Domain::createParticipant("participant_xml_profile");

在这之后,如果需要调整配置,只需要修改配置文件,不用在改动代码,自然也不用重新编译。这对于项目部署是很重要的。

Fast-RTPS支持的配置项,以及这些配置项说明和默认值都可以到这个链接中查看:XML profiles。

Domain

RTPS中的通信参数者之间,通过Domain进行隔离。

同一时刻可能有多个Domain同时存在,一个Domain中可以包含任意数目的消息发送者和接受者。

其结构如下图所示:

90f5f2e01a7eb35461d48b9e3cc19fab_640_wx_fmt=png&wxfrom=5&wx_lazy=1&wx_co=1.png

开发者可以通过domainId来指定参与者所属Domain。

如果没有指定,默认的domainId = 80

发现

作为DDS的实现,Fast-RTPS提供了Publisher和Subscriber自动发现和匹配的功能。在实现上,这分为两个步骤来完成:

  • Participant Discovery Phase (PDP):在这个阶段,参与者互相通知彼此的存在。为了达到这个目的,每个参与者需要定时发送公告消息。公告消息通过周知的多播地址和端口发送(根据domain计算得到)。
  • Endpoint Discovery Phase (EDP):在这个阶段,Publisher和Subscriber互相确认。为此,参与者使用在PDP期间建立的通信通道,彼此共享有关其发布者和订阅者的信息。 该信息包含了Topic和数据类型。为了使两个端点匹配,它们的Topic和数据类型必须一致。 一旦发布者和订阅者匹配,他们就发送/接收数据了。

这两个阶段对应了两个独立的协议:

  • Simple Participant Discovery Protocol:指定参与者如何在网络中发现彼此。
  • Simple Endpoint Discovery Protocol:定义了已经互相发现的参与者交换信息的协议。

Fast-RTPS提供了四种发现机制:

  • Simple:这是默认机制。它在PDP和EDP阶段均使用RTPS标准,因此可与任何其他DDS和RTPS实现兼容。
  • Static:此机制在PDP阶段使用Simple Participant Discovery Protocol。如果所有发布者和订阅者的地址以及端口和主题信息是事先知道的,则允许跳过EDP阶段。
  • Server-Client:这种发现机制使用集中式发现结构,由服务器充当发现机制的Hub。
  • Manual:此机制仅与RTPSDomain层兼容。它禁用了PDP阶段,使用户可以使用其选择的任何外部元信息通道手动匹配和取消匹配RTPS参与者,读者和写者。

不同的发现机制具有一些共同的配置:

名称 描述 默认值
Ignore Participant flags 在必要的时候,可以选择忽略一些参与者。
例如:另一台主机上的,另一个进程的或者同一个进程的。
NO_FILTER
Lease Duration 指定远程参与者在多少时间内认为本地参与者还活着。 20 s
Announcement Period 指定参与者的PDP公告消息的周期。 3s

关于发现机制的更多信息可以浏览这个链接:Discovery。

传输控制

Fast-RTPS实现了可插拔的传输架构,这意味着每一个参与者可以随时加入和退出。

在传输上,Fast-RTPS支持以下五种传输方式:

  • UDPv4
  • UDPv6
  • TCPv4
  • TCPv6
  • SHM(Shared Memory)

默认的,当Participant创建时,会自动的配置两个传输通道:

  • SHM:用来与同一个机器上的参与者通信。
  • UDPv4:同来与跨机器的参与者通信。

当然,开发者可以改变这个默认行为,通过C++接口或者XML配置文件都可以。

SHM要求所有参与者位于同一个系统上,它是借助了操作系统提供的共享内存机制实现。共享内存的好处是:支持大数据传输,减少了数据拷贝,并且也减少系统负载。因此通常情况下,使用SHM会获得更好的性能。使用SHM时,可以配置共享内存的大小。

网络通信包含了非常多的参数需要配置,例如:Buffer大小,端口号,超时时间等等。框架本身为参数设置了默认值,大部分情况下开发者不用调整它们。但是知道这些默认值是什么,在一些情况下可能会对分析问题有所帮助。关于这些配置的默认值,以及如果配置可以查看这个链接:Transport descriptors。

与UDP不同,TCP传输是面向连接的,因此,Fast-RTPS必须在发送RTPS消息之前建立TCP连接。TCP传输可以具有两种行为:充当TCP服务器或充当TCP客户端。服务器打开一个TCP端口以侦听传入的连接,然后客户端尝试连接到服务器。服务器和客户端的概念独立于RTPS概念,例如:Publisher,Subscriber,Reader或Writer。它们中的任何一个都可以用作TCP服务器或TCP客户端,因为这些实体仅用于建立TCP连接,而RTPS协议可以在该TCP连接上工作。

如果要使用TCP传输,开发者需要做更多的配置,关于这部分内容可以继续阅读官方文档,这里不再赘述。

FastRTPS代码示例

FastRTPS不仅有框架文档,API Reference,还有丰富的代码示例。

对于开发者来说,浏览这些代码可能是上手最快捷的方法。

你可以在这里浏览这些示例:Fast-RTPS/examples/C++/。

FASTRTPSGEN

FASTRTPSGEN是一个Java程序。用来为在Topic上传输的数据类型生成源码。

开发者通过接口描述语言(Interface Definition Language)定义需要传输的数据类型。然后通过FASTRTPSGEN生成C++编译需要的源文件。

可以通过下面的方法获取和编译FASTRTPSGEN。

git clone --recursive https://github.com/eProsima/Fast-RTPS-Gen.git
cd Fast-RTPS-Gen
gradle assemble

编译完成之后可执行文件位于./scripts/目录。如果需要,可以将该路径添加到$PATH变量中。

关于如何通过IDL定义数据类型请参见这里:Defining a data type via IDL。

以下面这个示例文件为例:

struct TestData 
{
char char_type;
octet octet_type;
long long_type;
string string_type;
float float_array[4];
sequence<double> double_list;
};

我们将其保存到文件名为data_type.idl的文件中。然后通过下面这条命令生成C++文件:

~/Fast-RTPS-Gen/scripts/fastrtpsgen data_type.idl

最后会得到下面四个文件:

data_type.cxx
data_type.h
data_typePubSubTypes.cxx
data_typePubSubTypes.h

前两个文件定义的是实际存储数据的结构,后两个文件定义的类是eprosima::fastrtps::TopicDataType的子类。用来在参与者上注册类型:

/**
 * Register a type in a participant.
 * @param part Pointer to the Participant.
 * @param type Pointer to the Type.
 * @return True if correctly registered.
 */
RTPS_DllAPIstaticboolregisterType(
        Participant*part,
        fastdds::dds::TopicDataType*type);

每一套通信系统中通常都会包含一个或多个自定义的数据类型。

发布者-订阅者层

可以通过 HelloWorldExample 来熟悉发布者-订阅者层接口。

该目录下文件列表如下:

-rw-r--r--  1 paul  staff   1.8K  3 16 13:36 CMakeLists.txt
-rw-r--r--  1 paul  staff   2.8K  3 16 13:36 HelloWorld.cxx
-rw-r--r--  1 paul  staff   6.1K  3 16 13:36 HelloWorld.h
-rw-r--r--  1 paul  staff    62B  3 16 13:36 HelloWorld.idl
-rw-r--r--  1 paul  staff   4.4K  3 16 13:36 HelloWorldPubSubTypes.cxx
-rw-r--r--  1 paul  staff   1.7K  3 16 13:36 HelloWorldPubSubTypes.h
-rw-r--r--  1 paul  staff   4.6K  3 16 13:36 HelloWorldPublisher.cpp
-rw-r--r--  1 paul  staff   1.7K  3 16 13:36 HelloWorldPublisher.h
-rw-r--r--  1 paul  staff   3.8K  3 16 13:36 HelloWorldSubscriber.cpp
-rw-r--r--  1 paul  staff   1.8K  3 16 13:36 HelloWorldSubscriber.h
-rw-r--r--  1 paul  staff   2.0K  3 16 13:36 HelloWorld_main.cpp
-rw-r--r--  1 paul  staff   3.1K  3 16 13:36 Makefile
-rw-r--r--  1 paul  staff   203B  3 16 13:36 README.txt

这其中:

  • README.txt是工程说明
  • CMakeLists.txt与Makefile是编译文件
  • HelloWorld_main.cpp包含了生成可执行文件的main函数
  • HelloWorld.idl是待传输的数据结构定义
  • HelloWorld.h,HelloWorld.cxx,HelloWorldPubSubTypes.h和HelloWorldPubSubTypes.cxx是由HelloWorld.idl文件生成
  • HelloWorldPublisher.h和HelloWorldPublisher.cpp是发布者的实现
  • HelloWorldSubscriber.h和HelloWorldSubscriber.cpp是订阅者的实现

熟悉一个C++工程可以先从main入手,HelloWorld_main.cpp中的主要逻辑就是根据用户输入的参数是"publisher"还是"subscriber"来确定启动哪个模块。

switch(type)
{
    case1:
        {
            HelloWorldPublishermypub;
            if(mypub.init())
            {
                mypub.run(count,sleep);
            }
            break;
        }
    case2:
        {
            HelloWorldSubscribermysub;
            if(mysub.init())
            {
                mysub.run();
            }
            break;
        }
}

接下来我们直接看HelloWorldPublisher和HelloWorldSubscriber就好。

HelloWorldPublisher::init中主要是为Publisher的对象设置参数:

boolHelloWorldPublisher::init()
{
    m_Hello.index(0);
    m_Hello.message("HelloWorld");
    ParticipantAttributesPParam;
    PParam.rtps.builtin.discovery_config.discoveryProtocol=DiscoveryProtocol_t::SIMPLE;
    PParam.rtps.builtin.discovery_config.use_SIMPLE_EndpointDiscoveryProtocol=true;
    PParam.rtps.builtin.discovery_config.m_simpleEDP.use_PublicationReaderANDSubscriptionWriter=true;
    PParam.rtps.builtin.discovery_config.m_simpleEDP.use_PublicationWriterANDSubscriptionReader=true;
    PParam.rtps.builtin.domainId=0;
    PParam.rtps.builtin.discovery_config.leaseDuration=c_TimeInfinite;
    PParam.rtps.setName("Participant_pub");
    mp_participant=Domain::createParticipant(PParam);
<span class="k">if</span><span class="p">(</span><span class="n">mp_participant</span><span class="o">==</span><span class="nb">nullptr</span><span class="p">)</span>
    <span class="k">return</span> <span class="nb">false</span><span class="p">;</span>
<span class="c1">//REGISTER THE TYPE</span>
<span class="n">Domain</span><span class="o">::</span><span class="n">registerType</span><span class="p">(</span><span class="n">mp_participant</span><span class="p">,</span><span class="o">&amp;</span><span class="n">m_type</span><span class="p">);</span>
<span class="c1">//CREATE THE PUBLISHER</span>
<span class="n">PublisherAttributes</span> <span class="n">Wparam</span><span class="p">;</span>
<span class="n">Wparam</span><span class="p">.</span><span class="n">topic</span><span class="p">.</span><span class="n">topicKind</span> <span class="o">=</span> <span class="n">NO_KEY</span><span class="p">;</span>
<span class="n">Wparam</span><span class="p">.</span><span class="n">topic</span><span class="p">.</span><span class="n">topicDataType</span> <span class="o">=</span> <span class="s">"HelloWorld"</span><span class="p">;</span>
<span class="n">Wparam</span><span class="p">.</span><span class="n">topic</span><span class="p">.</span><span class="n">topicName</span> <span class="o">=</span> <span class="s">"HelloWorldTopic"</span><span class="p">;</span>
<span class="n">Wparam</span><span class="p">.</span><span class="n">topic</span><span class="p">.</span><span class="n">historyQos</span><span class="p">.</span><span class="n">kind</span> <span class="o">=</span> <span class="n">KEEP_LAST_HISTORY_QOS</span><span class="p">;</span>
<span class="n">Wparam</span><span class="p">.</span><span class="n">topic</span><span class="p">.</span><span class="n">historyQos</span><span class="p">.</span><span class="n">depth</span> <span class="o">=</span> <span class="mi">30</span><span class="p">;</span>
<span class="n">Wparam</span><span class="p">.</span><span class="n">topic</span><span class="p">.</span><span class="n">resourceLimitsQos</span><span class="p">.</span><span class="n">max_samples</span> <span class="o">=</span> <span class="mi">50</span><span class="p">;</span>
<span class="n">Wparam</span><span class="p">.</span><span class="n">topic</span><span class="p">.</span><span class="n">resourceLimitsQos</span><span class="p">.</span><span class="n">allocated_samples</span> <span class="o">=</span> <span class="mi">20</span><span class="p">;</span>
<span class="n">Wparam</span><span class="p">.</span><span class="n">times</span><span class="p">.</span><span class="n">heartbeatPeriod</span><span class="p">.</span><span class="n">seconds</span> <span class="o">=</span> <span class="mi">2</span><span class="p">;</span>
<span class="n">Wparam</span><span class="p">.</span><span class="n">times</span><span class="p">.</span><span class="n">heartbeatPeriod</span><span class="p">.</span><span class="n">nanosec</span> <span class="o">=</span> <span class="mi">200</span><span class="o">*</span><span class="mi">1000</span><span class="o">*</span><span class="mi">1000</span><span class="p">;</span>
<span class="n">Wparam</span><span class="p">.</span><span class="n">qos</span><span class="p">.</span><span class="n">m_reliability</span><span class="p">.</span><span class="n">kind</span> <span class="o">=</span> <span class="n">RELIABLE_RELIABILITY_QOS</span><span class="p">;</span>
<span class="n">mp_publisher</span> <span class="o">=</span> <span class="n">Domain</span><span class="o">::</span><span class="n">createPublisher</span><span class="p">(</span><span class="n">mp_participant</span><span class="p">,</span><span class="n">Wparam</span><span class="p">,(</span><span class="n">PublisherListener</span><span class="o">*</span><span class="p">)</span><span class="o">&amp;</span><span class="n">m_listener</span><span class="p">);</span>
<span class="k">if</span><span class="p">(</span><span class="n">mp_publisher</span> <span class="o">==</span> <span class="nb">nullptr</span><span class="p">)</span>
    <span class="k">return</span> <span class="nb">false</span><span class="p">;</span>
<span class="k">return</span> <span class="nb">true</span><span class="p">;</span>
}


这里的参数配置请参阅API说明:

  • RTPSParticipantAttributes
  • BuiltinAttributes
  • PublisherAttributes
  • TopicAttributes
  • WriterQos
  • WriterTimes
  • WriterQos

Publisher发送消息的逻辑很简单:

boolHelloWorldPublisher::publish(boolwaitForListener)
{
    if(m_listener.firstConnected||!waitForListener||m_listener.n_matched>0)
    {
        m_Hello.index(m_Hello.index()+1);
        mp_publisher->write((void*)&m_Hello);
        returntrue;
    }
    returnfalse;
}

注意,这里write的对象是通过idl文件生成的类型。

Subscriber的初始化和Publisher是类似的:

boolHelloWorldSubscriber::init()
{
    ParticipantAttributesPParam;
    PParam.rtps.builtin.discovery_config.discoveryProtocol=DiscoveryProtocol_t::SIMPLE;
    PParam.rtps.builtin.discovery_config.use_SIMPLE_EndpointDiscoveryProtocol=true;
    PParam.rtps.builtin.discovery_config.m_simpleEDP.use_PublicationReaderANDSubscriptionWriter=true;
    PParam.rtps.builtin.discovery_config.m_simpleEDP.use_PublicationWriterANDSubscriptionReader=true;
    PParam.rtps.builtin.domainId=0;
    PParam.rtps.builtin.discovery_config.leaseDuration=c_TimeInfinite;
    PParam.rtps.setName("Participant_sub");
    mp_participant=Domain::createParticipant(PParam);
    if(mp_participant==nullptr)
        returnfalse;
<span class="c1">//REGISTER THE TYPE</span>
<span class="n">Domain</span><span class="o">::</span><span class="n">registerType</span><span class="p">(</span><span class="n">mp_participant</span><span class="p">,</span><span class="o">&amp;</span><span class="n">m_type</span><span class="p">);</span>
<span class="c1">//CREATE THE SUBSCRIBER</span>
<span class="n">SubscriberAttributes</span> <span class="n">Rparam</span><span class="p">;</span>
<span class="n">Rparam</span><span class="p">.</span><span class="n">topic</span><span class="p">.</span><span class="n">topicKind</span> <span class="o">=</span> <span class="n">NO_KEY</span><span class="p">;</span>
<span class="n">Rparam</span><span class="p">.</span><span class="n">topic</span><span class="p">.</span><span class="n">topicDataType</span> <span class="o">=</span> <span class="s">"HelloWorld"</span><span class="p">;</span>
<span class="n">Rparam</span><span class="p">.</span><span class="n">topic</span><span class="p">.</span><span class="n">topicName</span> <span class="o">=</span> <span class="s">"HelloWorldTopic"</span><span class="p">;</span>
<span class="n">Rparam</span><span class="p">.</span><span class="n">topic</span><span class="p">.</span><span class="n">historyQos</span><span class="p">.</span><span class="n">kind</span> <span class="o">=</span> <span class="n">KEEP_LAST_HISTORY_QOS</span><span class="p">;</span>
<span class="n">Rparam</span><span class="p">.</span><span class="n">topic</span><span class="p">.</span><span class="n">historyQos</span><span class="p">.</span><span class="n">depth</span> <span class="o">=</span> <span class="mi">30</span><span class="p">;</span>
<span class="n">Rparam</span><span class="p">.</span><span class="n">topic</span><span class="p">.</span><span class="n">resourceLimitsQos</span><span class="p">.</span><span class="n">max_samples</span> <span class="o">=</span> <span class="mi">50</span><span class="p">;</span>
<span class="n">Rparam</span><span class="p">.</span><span class="n">topic</span><span class="p">.</span><span class="n">resourceLimitsQos</span><span class="p">.</span><span class="n">allocated_samples</span> <span class="o">=</span> <span class="mi">20</span><span class="p">;</span>
<span class="n">Rparam</span><span class="p">.</span><span class="n">qos</span><span class="p">.</span><span class="n">m_reliability</span><span class="p">.</span><span class="n">kind</span> <span class="o">=</span> <span class="n">RELIABLE_RELIABILITY_QOS</span><span class="p">;</span>
<span class="n">Rparam</span><span class="p">.</span><span class="n">qos</span><span class="p">.</span><span class="n">m_durability</span><span class="p">.</span><span class="n">kind</span> <span class="o">=</span> <span class="n">TRANSIENT_LOCAL_DURABILITY_QOS</span><span class="p">;</span>
<span class="n">mp_subscriber</span> <span class="o">=</span> <span class="n">Domain</span><span class="o">::</span><span class="n">createSubscriber</span><span class="p">(</span><span class="n">mp_participant</span><span class="p">,</span><span class="n">Rparam</span><span class="p">,(</span><span class="n">SubscriberListener</span><span class="o">*</span><span class="p">)</span><span class="o">&amp;</span><span class="n">m_listener</span><span class="p">);</span>
<span class="k">if</span><span class="p">(</span><span class="n">mp_subscriber</span> <span class="o">==</span> <span class="nb">nullptr</span><span class="p">)</span>
    <span class="k">return</span> <span class="nb">false</span><span class="p">;</span>
<span class="k">return</span> <span class="nb">true</span><span class="p">;</span>
}


当然,Subscriber有自己的配置参数类型:

  • SubscriberAttributes
  • ReaderQos
  • ReaderTimes

需要注意的是,Subscriber与Publisher的通信是建立在Topic上的,因此对于Topic标识的配置要保持一致:

Wparam.topic.topicDataType="HelloWorld";
Wparam.topic.topicName="HelloWorldTopic";

有了Topic的这个抽象概念,使得Subscriber与Publisher不用物理地址上有任何关联,也屏蔽了硬件和操作系统的差异:同样的代码,其编译产物可以一个跑在x86的Mac系统上,一个跑在ARM架构的Android设备上。

Subscriber通过void HelloWorldSubscriber::SubListener::onNewDataMessage(Subscriber* sub)方法来处理接受到的数据。在示例的实现中,就是将消息体打印出来:

voidHelloWorldSubscriber::SubListener::onNewDataMessage(Subscriber*sub)
{
    if(sub->takeNextData((void*)&m_Hello,&m_info))
    {
        if(m_info.sampleKind==ALIVE)
        {
            this->n_samples++;
            // Print your structure data here.
            std::cout<<"Message "<<m_Hello.message()<<" "<<m_Hello.index()<<" RECEIVED"<<std::endl;
        }
    }
}


Publisher与Subscriber各自有一些回调,开发者可以利用它们来进行需要的处理:

回调 Publisher Subscriber
onNewDataMessage -
onSubscriptionMatched -
on_requested_deadline_missed -
on_liveliness_changed -
onPublicationMatched -
on_offered_deadline_missed -
on_liveliness_lost -
相关文章
|
8月前
|
Java API Android开发
DDS、FastDDS、OpenDDS扫盲
DDS、FastDDS、OpenDDS扫盲
1513 0
|
8月前
|
运维 监控 负载均衡
ACS
阿里云容器计算服务ACS(Alibaba Cloud Container Compute Service,ACS)是一种基于容器技术的云计算服务,它可以帮助用户快速构建、部署和管理容器化应用程序。ACS提供了容器镜像、容器编排、负载均衡、日志监控等功能,使得用户可以专注于应用程序的开发和迭代,而无需关注底层的基础设施和运维。
323 3
|
存储 网络协议 中间件
DDS数据分发服务
DDS数据分发服务
581 0
|
5月前
|
SQL 监控 关系型数据库
drds学习
【8月更文挑战第6天】
154 1
|
8月前
|
Oracle 关系型数据库
|
8月前
|
SQL 运维 监控
drds相关的问题
drds相关的问题
82 5
GPDB6和GPDB7直连primary命令的不同
GPDB6和GPDB7直连primary命令的不同
144 0
|
消息中间件 安全 网络协议
|
算法 异构计算
基于FPGA的DDS设计,并通过DDS实现ASK,FSK,PSK三种调制
基于FPGA的DDS设计,并通过DDS实现ASK,FSK,PSK三种调制
185 0