REDHAWK——连接(续)(一)

简介: REDHAWK——连接(续)(一)

前言

上文


一、突发 IO

对于那些需要小型且可能是非连续的数据块(或突发)以及频繁变化的元数据的应用程序,突发输入/输出(BurstIO)提供了满足这些要求的数据传输容器和接口。这个接口仅支持数据向量的传输:float, double, octet (int8/uint8), short (int16), ushort (uint16), long (int32), ulong (uint32), longlong (int64), 和 ulonglong(uint64)。与批量输入/输出(BulkIO)类似,BurstIO 提供了突发信号相关信息(SRI)和精确时间戳,但是它通过每个数据突发中的带内信息提供这些信息。由于元数据的增加的开销要求,通过将多个突发分组为单次传输,无论是通过编程还是通过可配置的策略设置,BurstIO 可以实现其最高吞吐量,以尝试最大化效率并限制延迟

1、数据传输

突发输入/输出(BurstIO)数据传输通过 REDHAWK 资源(组件或设备)的 BurstIO 端口对象的 pushBurst()pushBursts() 方法调用来实现。资源可以使用这些 push 方法将突发及其关联的元数据从一个资源传输到另一个资源,这一过程在资源的服务功能内进行。与批量输入/输出(BulkIO)相似,BurstIO 接口为突发的每个数据向量提供相同的 BULKIO::PrecisionUTCTime 时间戳。BurstIO 定义了一个新的 BurstSRI 信号相关信息(SRI)对象,使开发人员能够进一步描述信号环境和数据转换。这些字段在突发信号相关信息(SRI)中有进一步的描述。

①、输入

拥有提供端口(输入端口)的资源,使用 getBurst() 方法从端口抓取数据。此方法从输入端口的数据队列返回一个 PacketType 对象(在突发包访问器中描述),如果队列为空,则返回 null/None 值。

以下代码片段是 getBurst() 方法的一个示例。

/**
   Grab data from the port's getBurst method
 */
burstio::BurstShortIn::PacketType *pkt;
pkt = inShortPort->getBurst( bulkio::Const::NON_BLOCKING );
// check if a valid packet was returned
if ( pkt == NULL ) {
  return NOOP;
}
// check for EOS
if ( pkt->getEOS() ) {
  outShortPort->pushBurst(pkt->getSequence(), pkt->getSRI(), pkt->getEOS());
}
...  perform algorithm on the data: pkt->getData() ... or pkt->getSequence()

②、输出

由于 BurstIO 数据的异步性,该接口使开发者能够控制从资源中输出(输出)突发。将突发数据从一个资源推送到另一个资源的两个主要方法调用是:pushBursts()pushBurst()pushBursts() 允许直接将多个突发作为 BurstType 对象的序列直接下游发送而 pushBurst() 提供了一个接口来排队推送单个突发,但根据突发的数量、总队列大小和发送间隔遵循策略指令。这两种方法都使用指定的路由约束和连接过滤器路由突发数据,这些可以通过以下接口控制:

// this route streams with Stream ID == "data-stream-one" to a connection
// identified as "connection-one"
shortBurstPort->addConnectionFilter("data-stream-one", "connection-one");

或者:

// update connection filter using the Component's connection property
// "myConnectionTable"
shortBurstPort->updateConnectionFilter(myConnectionTable);
// this sets the stream filter to only route streams to specific connections
shortBurstPort->setRoutingMode(burstio::ROUTE_CONNECTION_STREAMS);

pushBurst()pushBursts() 方法之间的主要区别在于管理数据传输的方式和时间的能力。只有使用 pushBurst() 排队的突发流量受到策略约束的控制,而对 pushBursts() 的调用则直接发送到下游连接的资源。

// this method will limit the maximum number of bursts that
// can be queued  before they are sent
shortBurstPort->setMaxBursts(size_t count);
// this method will enable threshold monitoring for the amount of sample
// data that exceeds this limit before sending data downstream
shortBurstPort->setByteThreshold(size_t bytes);
// this method will enable the latency time between the sending of
// available data downstream
shortBurstPort->setLatencyThreshold( long usec );

以下代码片段是一个使用 pushBurst() 方法调用将向量数据样本排队到端口的示例。

std::vector< BurstShortOut::NativeType > data;
my_transform(data);
BURSTIO::BurstSRI  sri;
burstio::BurstShortOut::BurstType burst;
burst.SRI = sri;
burst.EOS = false;
burst.T = burstio::utils::now();
burst.data.length(data.size());
for(int i=0; i< data.size(); i++ ) burst.data[i] = data[i];
// this queues a single burst
shortBurstPort->pushBurst( burst );
// or
std::vector< BurstShortOut::NativeType > data;
my_transform(data);
// this queues a single burst
shortBurstPort->pushBurst( data, sri, burstio::utils::now() );

以下代码片段是一个使用 pushBursts() 方法调用的向量数据样本的示例。此调用中的突发直接传递到下游的连接资源。

std::vector< BurstShortOut::NativeType > data;
my_transform(data);
BurstShortOut::BurstSequenceType bursts;
bursts.length(1);
burstio::BurstShortOut::BurstType burst;
burst.SRI = sri;
burst.EOS = false;
burst.T = burstio::utils::now();
burst.data.length(data.size());
for(int i=0; i< data.size(); i++ ) burst.data[i] = data[i];
bursts[0] = burst;
// this pushes the burst directly downstream because
// it is a sequence of bursts
shortBurstPort->pushBursts(bursts);

2、突发信号相关信息 (SRI)

BurstSRI 对象随每个数据突发一起传送,并描述数据生产者的数据负载和处理状态。下表仅描述了在资源之间传递突发数据时数据结构的必需字段。

3、多输出端口

每个输出突发输入/输出(BurstIO)端口类型都提供了基于流 ID 和连接 ID 过滤来自资源的突发数据的能力。要使用端口的多出能力,资源必须包含类似于以下的代码:

<structsequence id="connectionTable">
    <struct id="connectionTable::connection_descriptor"
               name="connection_descriptor">
      <simple id="connectionTable::connection_id" name="connection_id"
                 type="string">
        <kind kindtype="configure"/>
      </simple>
      <simple id="connectionTable::stream_id" name="stream_id" type="string">
        <kind kindtype="configure"/>
      </simple>
      <simple id="connectionTable::port_name" name="port_name" type="string">
        <kind kindtype="configure"/>
      </simple>
    </struct>
    <configurationkind kindtype="configure"/>
</structsequence>

为了将特定的数据流引导到特定的连接,需要将 connectionTable 对象传递给端口的 updateConnectionFilter 方法。当路由模式设置为 ROUTE_CONNECTION_STREAMS 时,端口将会将过滤状态应用于通过资源的 BurstIO 端口传出的任何突发流量。要将突发传递给现有连接,端口的过滤器表中必须存在与下游资源的突发的流 ID 和连接 ID 匹配的项。

4、使用复数数据

每个传入数据的 BurstPacket 提供了 getComplex()` 方法,以表示向量是否包含复数样本(它由实部和虚部组成)。复数数据以交替的实部和虚部值发送。开发者可以以任何方式处理这些数据;然而,本节描述了将数据转换为更易于处理形式的常用方法。

①、在 C++ 中转换复数数据

在 C++ 中,传入的突发输入/输出(BurstIO)数据向量可以被强制转换为复数值的 std::vector。例如:

BurstShortIn::BurstPacket *pkt = myShortPort->getPacket(bulkio::Const::BLOCKING);
if ( pkt->isComplex() ) {
  BurstShortIn::ComplexType cplx_data = pkt->getComplexData();
  // ... do some processing with cplx_data
}

5、时间戳

以下代码段提供了一个示例,展示如何构造一个要在突发信号相关信息(SRI)中发送的 BULKIO::PrecisionUTCTime 时间戳。

/**
 * To create a time stamp from the current time of day
 */
BULKIO::PrecisionUTCTime tstamp = burstio::utils::now();

6、端口统计

所有突发输入/输出(BurstIO)端口支持批量输入/输出(BulkIO)统计接口,并添加了跟踪特定于突发的指标的额外关键词。统计数据是在 10 次 pushBurst 调用的窗口中跟踪的。输入端口包含单个 PortStatistics 结构,而输出端口包含一系列 PortStatistics 结构;每个连接一个结构。有关 BULKIO::PortStatistics 的更多信息,请参见端口统计。输入和输出端口的额外 BurstIO 指标在以下表格中描述:

①、C++

以下示例说明了一个组件,该组件对传入的突发数据进行转换,并将结果向下游推送。

burstio::BurstShortIn::PacketType *pkt;
pkt = inShortPort->getPacket(bulkio::Const::NON_BLOCKING);
// check if a valid packet was returned
if ( pkt == NULL ) {
  return NOOP;
}
// check for EOS
if ( pkt->getEOS() ) {
  outShortPort->pushBurst(pkt->getSequence(), pkt->getSRI(), pkt->getEOS());
}
// do some processing.....to the burst contents
BurstShortOut::SequenceType  data =  do_some_magic(pkt->getSequence());
// we changed the data so calc new time stamp....
BULKIO::PrecisionUTCTime newTS = calc_timestamp(pkt->getTime());  
outShortPort->pushBurst(data, pkt->getSRI(), newTS, pkt->getEOS());

二、消息传递

消息传递依赖于 CORBA 的事件结构作为传输结构。在 CORBA 的事件 API 中,消息通过使用函数 push() 以 Any 类型传递。

虽然 CORBA 管理数据的编组和传递,但它并未提供任何固有于事件的机制来描述 Any 类型的内容。REDHAWK 决定利用现有的负载结构描述符来描述消息的负载,即属性接口描述语言(IDL)。选择此接口消除了创建描述消息的新 IDL 的需要。此外,已有一种 XML 结构映射到高效的二进制数据结构,允许使用 XML 来描述消息内容,同时消除了在消息传递机制中引入 XML 解析器的需要。

为了支持这项额外功能,REDHAWK 扩展了属性描述符,允许属性具有消息类型。唯一可以具有有效消息类型的属性是结构。

1、消息生产者

在创建新组件或编辑现有组件时,可以创建一个消息生产者。创建消息生产者后,您必须注册您的代码,以便从端口发送消息。以下程序解释了如何创建消息生产者并发送消息。

①、创建一个消息生产者

使用 REDHAWK IDE 向组件或设备添加消息生产者端口,请遵循以下步骤:

  • 1)从项目资源管理器视图中,双击组件的软件包描述符(SPD)文件。此时将显示组件编辑器。
  • 2)在组件编辑器中,选择“属性”标签。组件编辑器的属性标签页将被显示。

  • 3)要添加结构属性,请点击“添加结构”。此时将显示属性标签的结构属性部分。

  • 4)在结构属性部分,输入产生的消息的名称。ID 默认为您输入的名称。从“类型(Kind)”下拉菜单中,选择“消息(message)。


REDHAWK——连接(续)(二)https://developer.aliyun.com/article/1474068

目录
相关文章
|
7月前
|
IDE 测试技术 API
REDHAWK——连接(续)(三)
REDHAWK——连接(续)(三)
59 0
|
7月前
|
IDE Java 开发工具
REDHAWK——组件结构
REDHAWK——组件结构
90 0
|
7月前
|
Ubuntu Linux Shell
mc实现目录同步并封装成Linux服务形式
mc实现目录同步并封装成Linux服务形式
303 9
|
7月前
|
存储 算法 中间件
REDHAWK——连接(二)
REDHAWK——连接(二)
89 1
|
7月前
|
IDE 算法 中间件
初识REDHAWK
初识REDHAWK
242 2
|
7月前
|
C++
REDHAWK——连接(续)(二)
REDHAWK——连接(续)(二)
40 0
|
7月前
|
XML 区块链 C++
REDHAWK——连接(一)
REDHAWK——连接(一)
107 0
|
7月前
|
C++ Python
REDHAWK——连接(三)
REDHAWK——连接(三)
45 0
|
Python
【从零学习python 】74. UDP网络程序:端口问题与绑定信息详解
【从零学习python 】74. UDP网络程序:端口问题与绑定信息详解
220 0