QUIC 多流桥接、新增 DDS 协议转换代理

简介: 即将发布的超轻量 MQTT Broker NanoMQ 0.16为用户提供了2个重要新功能:MQTT over QUIC的多流桥接和DDS协议转换代理,拓宽了其弱网桥接传输性能和在边缘端的使用场景。

图1.jpg

驽马十驾,功在不舍。新春之交,NanoMQ 继续保持稳步更新,最新的 0.16 版本将于三月初发布。NanoMQ 为用户提供了 2 个重要新功能:MQTT over QUIC 的多流桥接和 DDS 协议转换代理,拓宽了 NanoMQ 的弱网桥接传输性能和在边缘端的使用场景。同时 NanoMQ 项目也在不懈努力提高项目的鲁棒性和安全性,积极快速响应社区提出的 Issue 和使用问题,新增了模糊测试用例和自动化的代码覆盖测试脚本。另外还新增了绿色安装版的 Windows 平台安装包。

*关于 MQTT over QUIC 的技术解析可参考:MQTT over QUIC:物联网消息传输还有更多可能

QUIC 多流桥接

QUIC 协议相较于 TCP 的一大优势在于解决了队首阻塞的问题,但这是依赖于 QUIC 的单链接多 Stream 特性的。NanoMQ 之前发布的 MQTT over QUIC 桥接功能中暂时只支持单流模式,所有的 MQTT 包都在单一消息流(Stream)上面传输。单流传输当遇到网络拥塞或者网络抖动的情况时,会引起大量的消息重传,一样会阻塞后续到达的消息,从而造成逐步提高的消息延时,甚至是连接断开。 为了解决这一问题,NanoMQ 和 EMQX 5.0 一起设计和引入了 Mutli-stream QUIC 协议标准,以提供更好消息传输体验。

何为 MQTT over QUIC + Mutli-Stream?

Stream 是 QUIC 协议中传输层轻量级的有序字节流抽象,流可以是双工或半双工的。每个 QUIC 链接必须有大于等于一个的 Stream。客户端发起连接和创建 Stream 后,在 Stream 的基础上建立 MQTT 连接和进行报文交互。

QUIC Multi-Stream 图2.png

而在 0.16 版本中 NanoMQ 正式支持了多流桥接,当用户使用 MQTT over QUIC 桥接功能并开启多流选项时,NanoMQ 会根据用户配置的桥接上下行主题自动创建对应的 Topic-Stream 配对,每个主题之间的消息互相隔离。

目前多流桥接将 Stream 分为以下两种类型

  • 控制流: 对于每个 MQTT over QUIC 连接,首次建立时必须先建立此 Stream,所有 MQTT 控制信令如 CONNECT/PINGREQ/PINGRESP 都默认在此流上传输。连接以控制流作为探测当前网络环境和连接健康度的唯一指标,控制流断开将导致连接重连。但用户也可以选择在控制流上传输 PUBLISH 包。
  • 数据流: 桥接客户端每次进行 PUBLISH 和 SUBSCRIBE 操作都会根据使用的主题创建一个对应的数据流。此流由订阅或发布行为开启,服务端与客户端都会标识记录 PUBLISH 和 SUBSCRIBE 包中 Topic 和 此 Stream 的对应关系。所有发布到此 Topic 的数据都会被定向到此数据流。有别于控制流,数据流断开不会导致连接断开,而是下次自动重建。

NanoMQ 多流桥接 图3.png

如上图所示,单路 Stream 的情况下若 Topic 1 QoS 1 消息重传,会导致 Topic 2 和 Topic 3 的消息被阻塞。若使用多 Stream 桥接则可以在带宽未耗尽的情况下让多个主题的消息并行传输。当然相对应而言只能够保证在同一个主题内部的 QoS 消息的传输和到达顺序。

如何使用多流桥接?

目前使用多流桥接只需打开对应的配置选项:

旧配置文件格式:

## multi-stream: enable or disable the multi-stream bridging mode
## Value: true/false
## Default: false
bridge.mqtt.emqx.quic_multi_stream=false

## 在流中是否赋予Qos消息高传输优先级
## 针对每个流单独生效,非主题优先级
## Value: true/false
## Default: true
bridge.mqtt.emqx.quic_qos_priority=true

HOCON 格式:

quic_multi_stream = false
quic_qos_priority=true

之后根据用户 Pub/Sub 的具体主题会建立对应的 Stream,可以在 log 中检查功能是否生效,如订阅 nanomq/1 主题就会自动创建一个 data stream:

quic_ack_cb: Quic bridge client subscribe to topic (QoS 1)nanomq/1.
mqtt_sub_stream: topic nanomq/1 qos 1
bridge client is connected!
quic_pipe_open: [strm][0x618000020080] Starting...
quic_pipe_open: [strm][0x618000020080] Done...
quic_strm_cb: quic_strm_cb triggered! 0
decode_pub_message: topic: [$SYS/brokers/connected], qos: 0
mqtt_sub_stream: create new pipe 0x61c000020080 for topic nanomq/1
quic_strm_cb: QUIC_STREAM_EVENT_START_COMPLETE [0x618000020080] ID: 4 Status: 0

之后 NanoMQ 就会自动根据 Topic 将数据包导流至不同的 Stream 发送。经过内部测试,在使用模拟 2s 延迟和 40% 丢包的弱网环境时,能够得到 stream 数量倍数的延时降低。

Sub 包优先传输

在上一个版本中,NanoMQ 支持了 MQTT over QUIC 桥接中的 QoS 消息的优先传输。从 0.16 版本开始,Subscribe/UnSubscribe 也将优先得到传输,来保证弱网下业务的正常运行。

DDS 协议代理

DDS 即 Data Distribution Service 数据分发服务,是以数据为中心的分布式实时通信中间件协议,采用发布/订阅体系架构,强调以数据为中心,提供丰富的 QoS 服务质量策略,以保障数据进行实时、高效、灵活地分发,可满足各种去中心化的实时通信应用需求。

DDS 多用于点对点形式将数据在进程间通信。DDS 虽然也可以允许发布者发布数据,订阅者订阅数据,以及发布者和订阅者之间的双向通信,但仅局限在同一个域内,难以进行跨域通信。而在云边一体化的消息场景中,结合 MQTT + DDS 两种协议可以完美融合 broker + brokerless 两种消息模式。

NanoMQ 0.16 发布了 DDS Proxy 插件,其基于 Cyclone DDS (基于 OMG(Object Management Group)DDS 规范的开源 DDS 实现)开发,能够完成将 DDS 消息转换为 MQTT 消息并桥接上云的功能,以支持用户将 DDS 的数据通过 NanoMQ 来完成跨域传输并通过 MQTT 和云端互通。

DDS 协议代理 图4.png

DDS Proxy 使用方法

NanoMQ 的 DDS 协议转换插件通过 nanomq_cli 启动。目前该功能尚未集成到 Release 安装包中,如需使用还需要编译安装。

NanoMQ 并未直接引入 DDS 库,所以需要在运行环境中安装 CycloneDDS。在下一版本中,会在常见的平台版本的官方发行安装包里内置 CycloneDDS 的静态库,但如需交叉编译还需按照以下流程:

安装 Iceoryx

iceoryx 是 CycloneDDS 通过共享内存通信所需的依赖,如不需共享内存 IPC 可跳过本步骤
$ git clone https://github.com/eclipse-iceoryx/iceoryx.git
$ cd iceoryx
$ git checkout release_2.0
$ mkdir build && cd build
$ cmake -G Ninja -DCMAKE_INSTALL_PREFIX={USER_LIBRARY_PATH} ../iceoryx_meta
$ ninja
$ sudo ninja install

安装 CycloneDDS

$ git clone https://github.com/eclipse-cyclonedds/cyclonedds.git
$ cd cyclonedds
$ mkdir build && cd build
$ cmake -G Ninja -DCMAKE_INSTALL_PREFIX={USER_LIBRARY_PATH} -DCMAKE_PREFIX_PATH={YOUR_LIBRARY_PATH} -DBUILD_EXAMPLES=ON ..
$ ninja
$ sudo ninja install

配置 DDS 的 IDL

DDS 相较于 MQTT 对于 payload 的定义方式不同,MQTT 协议并不关心消息的 Payload 内容, 而 DDS 通过用户编写的 IDL 文件来定义 DDS 消息的数据格式和类型。IDL(Interface Description Language)是一种通用的描述语言,用于在不同的编程语言之间定义数据类型,以保证不同节点之间以正确的格式通信。

CycloneDDS 库中通过 CMake 工具来自动根据用户内置的 IDL 文件来生成消息解析代码,其函数名和变量名会根据 IDL 中的定义改变。NanoMQ 保留了这一流程,每次编译时会根据 nanomq_cli/dds2mqtt/dds_type.idl 这一 IDL 文件来生成消息序列化和反序列化的代码。目前还需要用户自行将生成的代码 dds_type.hdds_type.c 替换 dds_mqtt_type_conversion.h/dds_mqtt_type_conversion.c/dds_client.c 文件中的函数名和引用。

在后续的 NanoMQ 版本规划中,将提供一个自动化的代码生成工具,能够根据 IDL 来自动替换源文件完成这部分工作,不再需要用户手动修改源码适配 DDS 结构体定义。

编译安装 NanoMQ + DDS Proxy

$ git clone https://github.com/emqx/nanomq.git
$ cd nanomq
$ mkdir build && cd build
$ cmake -G Ninja -DCMAKE_PREFIX_PATH={USER_LIBRARY_PATH} -DBUILD_DDS_PROXY=ON ..
$ ninja 
$ sudo ninja install

配置打开 DDS Proxy

配置 /etc/nanomq_dds_gateway.conf 来设置需要进行桥接和转发的 MQTT 和 DDS 主题。

## 转发规则配置
forward_rules = {
    ## DDS to MQTT 主题
    dds_to_mqtt = {
        from_dds = "MQTTCMD/topic1"
        to_mqtt = "DDS/topic1"
    }
    ## MQTT to DDS 主题
    mqtt_to_dds = {
        from_mqtt = "DDSCMD/topic1"
        to_dds = "MQTT/topic1"
    }
}

## DDS 配置参数
dds {
    idl_type = topic1Type
    domain_id = 0

    shared_memory = {
        enable = false
        log_level = info
    }
}

## MQTT 配置参数
mqtt {
    connector {
        server = "mqtt-tcp://127.0.0.1:1883"
        proto_ver = 4
        # clientid="bridge_client"
        keepalive = 60s
        clean_start = false
        username = username
        password = passwd

        ssl {
            enable = false
            key_password = "yourpass"
            keyfile = "/etc/certs/key.pem"
            certfile = "/etc/certs/cert.pem"
            cacertfile = "/etc/certs/cacert.pem"
        }
    }
}

启用 iceoryx 传输可参考文档:点击查看

启动和测试

此处以 DDS - MQTT 双向消息桥接为例。

  • 启动 MQTT Broker
$ nanomq start

$ emqx start
  • 启动 DDS Proxy 桥接 MQTT 的DDSCMD/topic1 主题到 DDS 的 MQTT/topic1 主题。
$ ./nanomq_cli ddsproxy proxy --conf nanomq_dds_gateway.conf
  • 启动 MQTT 客户端订阅主题 DDS/topic1 以验证消息是否桥接成功。
$ ./nanomq_cli sub --url "mqtt-tcp://127.0.0.1:1883" -t "DDS/topic1"

现在我们发布一些消息到 DDS 主题来验证桥接是否成功。

  • 启动 DDS 客户端订阅 DDS 主题 MQTT/topic1
$ ./nanomq_cli ddsproxy sub -t "MQTT/topic1"
  • 启动 MQTT 客户端发布消息到 MQTT 主题 DDSCMD/topic1
$ ./nanomq_cli pub --url "mqtt-tcp://127.0.0.1:1883" -t "DDSCMD/topic1" -m '{
  "int8_test":    1,
  "uint8_test":   50,
  "int16_test":   27381,
  "uint16_test":  1,
  "int32_test":   0,
  "uint32_test":  32,
  "int64_test":   6820785120,
  "uint64_test":  25855901936,
  "message":      "aaabbbddd",
  "example_enum": 0,
  "example_stru": {
    "message":      "abc"
    }
}'

就能在订阅 MQTT/topic1 的 DDS 客户端处收到对应消息。如此就完成了将 MQTT 消息转化为 DDS 消息的过程,反之亦可。

其他优化

增加覆盖率测试和模糊测试

为了继续提高 NanoMQ 的工程质量和安全性,我们从 0.15.1 版本开始引入了 CodeCOV 来完成自动化代码覆盖率测试。

在 2 月中,NanoMQ 收到了许多有价值的模糊测试数据集和 Issue,在用户的帮助下找到了许多潜在安全隐患,在此对 Nereuxofficial 和 realsung 两位用户致以真挚的感谢。所有 Issue 都已经在一天内完全修复,所有他们提供的数据集都已纳入项目的自动模糊测试。

推出 Windows 绿色版

为了更好的支持 Windows 平台的用户,我们基于之前发布的 msi 安装包基础上,推出了绿色版的安装包,解压即可用。目前 Windows 版本只支持基础的 MQTT broker 桥接、断网续传等功能。

问题修复

  1. 修复 0.15.1 版本中多路桥接中消息回环触发异步 IO 越界的问题。
  2. 修复若干由模糊测试发现的安全隐患和内存泄漏,提高鲁棒性。
  3. 修复 nanomq_cli 的 quic工具使用。
  4. 修复 nanomq_cli 的无法将输出重定向至文件的问题。
  5. 修复 TSAN 发现的 MQTT over QUIC 传输层的线程竞争问题。
  6. 修复 NanoMQ 0.15 版本在 Windows 平台的兼容问题。
  7. 增加了可以在 MQTT 5 桥接连接中定义连接属性。

即将到来

NanoMQ 项目进入了第三个年头,感谢大家对项目的使用和支持。在下一个版本,NanoMQ 项目计划引入 SOME/IP 协议的桥接转换。同时还会继续改进 DDS Proxy 功能的易用性,推出包含此功能的安装包,并为其加入 IDL 自动代码生成功能。此外,在 MQTT over QUIC 桥接部分还会增加双向认证和证书配置功能。

版权声明: 本文为 EMQ 原创,转载请注明出处。

原文链接:https://www.emqx.com/zh/blog/nanomq-newsletter-202302

相关实践学习
快速体验阿里云云消息队列RocketMQ版
本实验将带您快速体验使用云消息队列RocketMQ版Serverless系列实例进行获取接入点、创建Topic、创建订阅组、收发消息、查看消息轨迹和仪表盘。
消息队列 MNS 入门课程
1、消息队列MNS简介 本节课介绍消息队列的MNS的基础概念 2、消息队列MNS特性 本节课介绍消息队列的MNS的主要特性 3、MNS的最佳实践及场景应用 本节课介绍消息队列的MNS的最佳实践及场景应用案例 4、手把手系列:消息队列MNS实操讲 本节课介绍消息队列的MNS的实际操作演示 5、动手实验:基于MNS,0基础轻松构建 Web Client 本节课带您一起基于MNS,0基础轻松构建 Web Client
目录
相关文章
|
C语言 C++
boost库asio编译及配置
boost库asio编译及配置
811 0
|
8月前
|
机器学习/深度学习 数据可视化 搜索推荐
基于python的汽车数据可视化、推荐及预测系统
本研究围绕汽车数据可视化、推荐及预测系统展开,结合大数据与人工智能技术,旨在提升用户体验与市场竞争力。内容涵盖研究背景、意义、相关技术如 Python、ECharts、协同过滤及随机森林回归等,探讨如何挖掘汽车数据价值,实现个性化推荐与智能预测,为汽车行业智能化发展提供支持。
|
5月前
|
运维 监控 安全
工厂人员定位工卡从部署实施、典型应用、成本与ROI分析等详解(二)
本文详解工厂人员定位工卡的部署实施、典型应用与成本ROI。涵盖UWB/蓝牙/RFID融合技术的安装流程、化工等高危场景应用案例,实现精准定位、电子围栏报警、快速救援,提升安全与管理效率,并提供硬件、软件及文档交付清单,助力企业高效落地。如果您想进一步了解工厂人员定位工卡的技术和案例,欢迎搜索维构lbs智能定位~
|
数据可视化 Ubuntu 机器人
WebViz可视化工具的应用
WebViz可视化 Webviz是一个基于Web的可视化工具,意味着您可以通过浏览器/APP访问它,而不需要安装额外的软件。这对于远程访问和团队协作非常方便。 Foxglove是一个开源的工具包,包括线上和线下版。旨在简化机器人系统的开发和调试。它提供了一系列用于构建机器人应用程序的功能。 本节将介绍如何使用Foxglove进行数据查看,以及话题通信。 为了实现OriginBot与Foxglove的连接,我们需要在OriginBot上搭建ROS环境。请确保您的机器人是OriginBot(视觉版/导航版),并且您的PC运行的是Ubuntu(≥20.04)或Windows(>=10)。
427 6
|
传感器 人工智能 安全
人工智能与物联网:智能家居的新时代
【10月更文挑战第31天】随着科技的发展,人工智能(AI)和物联网(IoT)的融合正引领我们进入全新的智能家居时代。本文探讨了这一技术趋势如何改变生活方式,提升家居的便捷性、高效性和安全性,并展望了未来的挑战和前景。
|
前端开发 Java 数据库
💡Android开发者必看!掌握这5大框架,轻松打造爆款应用不是梦!🏆
在Android开发领域,框架犹如指路明灯,助力开发者加速应用开发并提升品质。本文将介绍五大必备框架:Retrofit简化网络请求,Room优化数据库访问,MVVM架构提高代码可维护性,Dagger 2管理依赖注入,Jetpack Compose革新UI开发。掌握这些框架,助你在竞争激烈的市场中脱颖而出,打造爆款应用。
1612 3
|
程序员 Linux 开发工具
老程序员分享:OpenCPN介绍及编译
老程序员分享:OpenCPN介绍及编译
1065 4
|
数据挖掘 C# 开发工具
医院体检信息系统
体检系统,是专为体检中心/医院体检科等体检机构,专门开发的全流程管理系统,通过软件实现检测仪器数据自动提取,内置多级医生工作台,细化工作将体检检查结果汇总,生成体检报告登记到计算机系统中。通过软件系统进行数据分析统计与评判以及建立体检相关的体检档案。从而实现体检流程的信息化,提高工作效率,减少手动结果录入的一些常犯错误。 在实际应用中,医院体检系统能够解决传统体检中手工操作带来的问题,如工作量大、效率低下、易漏检、重检或错检等。通过与医院信息系统(如HIS、LIS、PACS等)的连接,系统能够满足体检中心的日常工作流程,提供更好的管理、统计和查询分析功能。同时,基于网络基础的系统可以在网上传输
375 1
医院体检信息系统
|
人工智能 搜索推荐 测试技术
基于人工智能的代码分析与 Bug 检测实战
在人工智能(AI)尚未普及之时,检测程序错误主要依赖单元测试、代码扫描工具如SonarQube和FindBugs,以及人工集成测试。如今,AI技术显著提升了这一过程的效率,不仅能辅助开发者编写更高质量的代码,还能在单元测试与集成测试阶段提供支持,如通过Copilot+Codex优化单元测试,或利用ChatGPT等工具自动化生成测试脚本。本章将介绍如何运用AI工具识别三种常见错误:代码执行异常、未满足需求及变量命名不当,并通过实例演示Copilot如何高效定位并修正这些问题。