REDHAWK——连接(一)https://developer.aliyun.com/article/1474047
3、可附加流
可附加流提供了一种使用带外数据传输的方法。数据传输的机制由单独的网络协议定义,而 REDHAWK 提供连接管理。对于批量输入/输出(BulkIO),支持的两种可附加流是 SDDS 和 VITA49。BulkIO 通过使用 attach 方法将 StreamDefinition 对象传递给连接的端口。连接的输入端口为下游资源提供 StreamDefinition 对象,以创建到实际数据源的连接。传递的 StreamDefinition 对象被映射到底层的 BulkIO 数据类型。BulkIO SDDS 端口传递 SDDSStreamDefinition 对象,而 BulkIO VITA49 端口传递 VITA49StreamDefinition 对象。
①、SDDS 流定义
SDDS 流定义对象从网络接口定义了到数据源的连接。SDDS 流定义接口的方法不遵循常规的 BulkIO pushPacket()
约定;相反,该接口定义了 attach()
和 detach()
方法。attach()
和 detach()
方法在以下代码片段中提供。
/** * SDDS Stream Definition Interface */ /** * attach : request to an attachment to a specified network data source */ char *attach( BULKIO::SDDSStreamDefinition stream, const char * userid ); /** * detach: unlatch from a network data source */ void detach( const char* attachId );
下表描述 attach()
和 detach()
方法以及 SDDS 流定义成员。
4、pushPacket 数据流
强烈推荐对于C++和Python中的BulkIO数据,使用批量输入/输出(BulkIO)流 API,该 API 提供了一个高级接口来通过 BulkIO 端口发送和接收数据。
数据传输通过 REDHAWK 组件的端口对象的 pushPacket()
方法调用来实现。该方法将数据从使用方端口传输到相应连接的提供方端口。数据由中间件(omniORB)编组,并放置在一个队列中,等待接收组件处理。pushPacket()
方法的实现最大化了数据吞吐量的效率,同时提供了网络可访问的数据输入/输出,并尽量减少了实现的复杂性。
每个实现都维护着在接收任何数据传输之前提供一个信号相关信息(SRI)对象的必要行为。这是通过使用 SRI 对象调用端口的 pushSRI()
方法来实现的。在大多数情况下,组件接收来自输入端口的 ingest SRI 对象,根据需要进行任何必要的修改,并通过其输出端口将此对象下游传递。如果组件在其第一个 pushPacket()
之前没有提供 SRI 对象,端口将创建一个具有名义值的默认 SRI 对象以传出端口。
以下部分解释了组件传输支持的数据类型的不同方法。
对于当前的 omniORB 实现,/etc/omniORB.cfg 维护了由 giopMaxMsgSize 值定义的可配置最大传输大小。默认最大传输大小设置为 2097152(2MB)。对于每个
pushPacket()
,数据+头部必须小于此值;否则,中间件会抛出一个 MARSHAL 异常。这个最大值可以在运行时使用omniORB::giopMaxMsgSize()
函数调用或 bulkio::Const::MAX_TRANSFER_BYTES 值找到
①、矢量数据
组件通常在其服务函数中从端口摄取和输出数据。拥有提供端口(输入)的组件,使用 getPacket()
方法从端口抓取数据。此方法从输入端口的数据队列返回一个 dataTransfer 对象(在DataTransfer 成员描述中描述),如果队列为空,则返回 null/None 值。
以下代码片段是 getPacket()
方法的一个示例。
/** Grab data from the port's getPacket method */ bulkio::InFloatPort::dataTransfer *pkt; pkt = inFloatPort->getPacket( bulkio::Const::NON_BLOCKING ); // check if a valid packet was returned if ( pkt == NULL ) { return NOOP; } // check if any SRI changes occurred if ( pkt->sriChanged ) { outFloatPort->pushSRI( pkt->SRI ); } ... perform some algorithm on the data: pkt->dataBuffer ...
下表描述了 DataTransfer 成员。
当队列中的数据包数量超过队列深度时,会发生队列刷新条件。当发生刷新时,队列中的每个流都会被单个数据包替换。每个数据包包含最后的数据有效载荷和相应的时间戳,以及可能发生的任何 SRI 变更、队列刷新和 EOS 指示。如果在刷新发生时一个流只包含单个数据包,那么该流的 inputQueueFlushed 标志不会被设置,因为没有数据丢失。如果一个流标识符多次出现(流 ID 重用),每个这样的流都包含一个具有正确的数据有效载荷、时间戳、SRI 变更、队列刷新和 EOS 指示器的单个数据包。
以下代码片段是一个示例,演示了带有示例参数的向量数据的 pushPacket()
方法调用。
std::vector<short> data; ... perform algorithm and save results to data vector ... BULKIO::PrecisionUTCTime tstamp = bulkio::time::utils::now(); outShortPort->pushPacket( data, tstamp, false, "sample" );
下表描述向量数据的 pushPacket()
参数。
②、字符串数据/XML 文档
以下代码片段是使用 pushPacket()
示例参数调用字符串数据的方法的示例。
std::string data; ... generate some text data to transfer ... outStringPort->pushPacket( data.c_str(), false, "sample" );
下表描述了字符串数据的 pushPacket()
参数。
③、URL/文件数据
下面的代码片段是 pushPacket()
方法调用的示例,用于带有示例参数的文件传输。
std::string uri; uri = "file:///data/samples.8t"; ... open the file, fill with samples of data, close the file ... BULKIO::PrecisionUTCTime tstamp = bulkio::time::utils::now(); outURLPort->pushPacket( uri.c_str(), tstamp, false, "sample" );
下表描述了用于文件传输的 pushPacket()
参数。
数据文件可以通过批量输入/输出(BulkIO)dataFile 类型发送。使用 BulkIO dataFile 类型时,文件名会传递给 pushPacket()
方法。文件的位置由指向本地文件系统或软件定义无线电(SDR)文件系统的URI 指定。为了支持可移植性,推荐使用 SDR 文件系统。
④、URI 选项
下表描述了 URI 路径选项。
5、位数据
在 REDHAWK 2.2.0 及以上版本中,批量输入/输出(BulkIO)包括了一种打包位数据格式,即 BULKIO::dataBit。在处理阶段之间传输打包位数据已经标准化,包括传输非字节对齐位数的能力。
①、通用
REDHAWK 将位作为字节数组管理,每个字节包含最多8个连续位。位索引从最重要的位开始:位索引 0 是第一个字节的最重要位,位索引 1 是第二重要位,依此类推。
②、C++
在 C++中,有两个类,redhawk::shared_bitbuffer 和 redhawk::bitbuffer,它们提供对位数据的高级访问。这些类分别类似于 redhawk::shared_buffer 和 redhawk::buffer,但用于位数据。
BulkIO 打包位流设计为与 redhawk::shared_bitbuffer 一起工作。
③、只读
redhawk::shared_bitbuffer 类提供对存储在后备字节数组中的打包位数据的只读访问。
可以通过索引访问单个位:
int bit = buf[0];
位以整数值返回,始终为0或1。
可以使用 getint() 方法从给定位偏移中提取大小最多为 64 位的整数。以下示例在位 36 处提取一个 24 位整数值:
int value = buf.getint(36, 24);
返回的值是一个无符号的64位整数,提取的值在最低有效位中。
④、读/写
redhawk::bitbuffer 类添加了提供对 redhawk::shared_bitbuffer 类写访问的方法。
可以通过索引设置单个位:
buf[0] = 1;
任何非零值都设置位,而零清除位。
在 C++ 中,没有表示单个位的原始类型;索引赋值是用一个私有代理类实现的。取一个索引值的地址是一个编译错误。
可以使用 setint() 方法在给定位偏移处设置最多 64 位的整数值。以下示例在位 36 处设置一个 24 位整数值:
buf.setint(36, 0xABCDEF, 24);
值的最低有效24位被存储。
⑤、创建
要分配足够空间容纳 256 位的新 bitbuffer:
redhawk::bitbuffer data(256);
新 bitbuffer中 的位值未初始化。
要创建一个现有 shared_bitbuffer 的可写副本:
redhawk::bitbuffer data = shared.copy();
要解析字符串字面量,使用静态类方法 from_string()
:
redhawk::bitbuffer data = redhawk::bitbuffer::from_string("0101101010101");
from_string()
方法解析输入字符串,并返回一个拥有内存的新 bitbuffer。
要从最多 64 位的整数字面量创建 bitbuffer,使用静态类方法 from_int()
。以下示例从一个十六进制字面量创建一个 36 位的 bitbuffer:
redhawk::bitbuffer data = redhawk::bitbuffer::from_int(0x123456789, 36);
只有字面量的最低有效 36 位被取出;任何高于最低有效36位的位都被丢弃。
⑥、共享
创建新的位数据或转换现有位数据的算法应为每次迭代分配一个新的 bitbuffer。一旦 bitbuffer 被写入输出流,就不得修改。对 bitbuffer 内容的修改对共享相同数据的所有实例可见。然而,需要历史记录的算法可以保留一个对即将输出数据的只读引用,成本较低:
redhawk::shared_bitbuffer history = data;
⑦、位操作
shared_bitbuffer 类包括几个用于位级处理的有用功能:
distance(other)
返回两个 bitbuffers 之间的汉明距离。find(pattern, maxDistance)
在最大汉明popcount()
返回人口计数(1位的数量)。takeskip(M,N)
执行一个取/跳过操作,迭代地复制 M 位并跳过 N 位,直到数据的末尾。
6、处理复数数据
如果传入数据的 StreamSRI 模式字段设置为 1,则关联的输入数据是复数的(即,它由实部和虚部组成)。复数数据以交替的实数和虚数值发送。开发人员可以以任何方式处理这些数据;然而,本节提供了将数据转换为更可操作形式的常用方法。
①、在 C++ 中转换复杂数据
在 C++ 中,传入的 Bulk Input/Output(BulkIO)数据块提供了一个 complex()
方法来检查数据是否是复数的,以及一个 cxbuffer()
方法来将样本数据重新解释为 redhawk::shared_buffer 中的 std::complex 值。例如:
bulkio::ShortDataBlock block = stream.read(); if (block.complex()) { redhawk::shared_buffer<std::complex<short> > data = block.cxbuffer(); const size_t size = data.size(); }
7、时间戳
Bulk Input/Output(BulkIO)使用 BULKIO::PrecisionUTCTime 时间戳,表示自 1970 年 1 月 1 日午夜(Unix纪元)起的 UTC 时间。时间戳包含几个元素。在 BulkIO 中,时间戳对应于被推送的数据中第一个元素的出生日期。下表描述了构成 BULKIO::PrecisionUTCTime 结构的不同元素。
上表中描述的两个元素对应于预定义的值。tcstatus 只能取两个值,TCS_INVALID(0)和TCS_VALID(1),表示时间戳是否有效。无效的时间戳不包含有效的时间数据,应该被忽略。tcmode 是获取时间戳的方法,但这种用法已经被弃用,这个值被忽略。tcmode 的默认值是1。
以下代码片段提供了如何构造要在 pushPacket()
调用中发送的时间戳的示例。now()
方法返回当前的时间。
C++:
BULKIO::PrecisionUTCTime tstamp = bulkio::time::utils::now();
①、时间戳运算符 (C++)
在 C++中,BULKIO::PrecisionUTCTime 支持常见的算术、比较和流运算符。
向时间戳添加偏移量:
/** * Add 1/8th of a second to the current time */ BULKIO::PrecisionUTCTime time1 = bulkio::time::utils::now(); time1 += 0.125;
减去两个时间戳返回以秒为单位的差值:
/** * Check if time2 is less than a second after time1 */ if (time2 - time1 < 1.0) { ... }
比较两个时间戳:
/** * Check if the second time stamp occurs before the first */ if (time2 < time1) { ... }
流格式(输出格式为“YYYY:MM:DD::HH::MM::SS.SSSSSS”):
/** * Write the current time out to the console */ std::cout << bulkio::time::utils::now() << std::endl;
8、端口统计
所有 Bulk Input/Output(BulkIO)端口都包含一个名为 statistics 的只读属性。statistics 属性的类型是 BULKIO::PortStatistics,它包含了关于端口性能的信息。下表包含了一个统计结构的描述:
提供方端口包含单个 PortStatistics 结构。使用方端口包含一系列 PortStatistics 结构;每一个都与单个连接相关联。
一个有趣的练习是创建在 REDHAWK 支持的三种语言中生成和消费数据的组件。数据生成器和消费器尽可能快地生成/消费数据。统计数据结构可以提供有关数据传输速率、平均延迟和其他相关数据的指标。通过改变 pushPacket()
调用中序列的大小来调整传输长度,并观察其对连接性能的影响,也是很有教育意义的。
REDHAWK——连接(三)https://developer.aliyun.com/article/1474049