Robot OS网络通信MQTT实战

简介: 最近开发的机器人操作系统ROS基于Android,在里面做一些深度定制,其中运动控制与Server的交互需要双向通道,经过权衡和讨论我们最终选用MQTT作为长连接通信方案。

image.png


1. 背景


最近开发的机器人操作系统ROS基于Android,在里面做一些深度定制,其中运动控制与Server的交互需要双向通道,经过权衡和讨论我们最终选用MQTT作为长连接通信方案。


2. MQTT介绍


image.png


MQTT 的全称为 Message Queue Telemetry Transport(消息队列遥测传输协议),是ISO 标准(ISO/IEC PRF 20922)下基于客户端-服务器的消息发布/订阅传输协议,目的是为低带宽和不稳定的网络环境中的物联网设备提供可靠的网络服务。目前在Iot应用广泛,主要有以下优点:


1.简单:MQTT是一种消息队列协议,使用发布/订阅消息模式,提供一对多的消息发布,解除应用程序耦合,相对于其他协议,开发更简单;


2.传输更加稳定:因为是工作在TCP/IP协议上,由TCP/IP协议提供稳定的网络连接;


3.轻量级:小型传输,开销很小(固定长度的头部是 2 字节),协议交换最小化,降低网络流量;适合低带宽,数据量较小的应用;


4.易于实现:MQTT协议的服务端程序已经非常成熟,PHP,JAVA,Python,C,C#等系统语言都可以实现MQTT服务,客户端也同样丰富,eclipse开源了主流编程语言的MQTT客户端实现;


5.开放性:因为基于ISO标准,所以MQTT只提供标准,我们自己可以基于协议实现这个标准,而且可以和别的系统进行对接,加上市面上大量开放源代码,也进一步推动了MQTT的发展,百度云、阿里云、中国移动onenet等几乎所有的开放性物联网平台都支持MQTT;


总结下来就是:简单易用开放


3. MQTT协议分析


3.1 MQTT设计特点


为了满足低电量消耗和低网络带宽的需求,MQTT 协议在设计之初就包含了以下一些特点:


  1. 实现简单
  2. 提供数据传输的 QoS
  3. 轻量、占用带宽低
  4. 可传输任意类型的数据
  5. 可保持的会话(session)


3.2 MQTT协议特点


  1. 基于 TCP 协议的应用层协议;
  2. 采用 C/S 架构;
  3. 使用订阅/发布模式,将消息的发送方和接受方解耦;
  4. 提供 3 种消息的 QoS(Quality of Service): 至多一次,最少一次,只有一次;
  5. 收发消息都是异步的,发送方不需要等待接收方应答。


3.3 MQTT通信模型


MQTT 的通信是通过发布/订阅的方式来实现的,订阅和发布又是基于主题(Topic)的。发布方和订阅方不直接进行连接,而是用到了一个中间方,它们是通过这种方式来进行解耦。两端通信的主要流程如下:


  1. 发布方(Publisher)连接到Broker;
  2. 订阅方(Subscriber)连接到Broker,并订阅主题Topic1;
  3. 发布方(Publisher)发送给Broker一条消息,主题为Topic1;
  4. Broker收到了发布方的消息,发现订阅方(Subscriber)订阅了Topic1,然后将消息转发给订阅方(Subscriber);
  5. 订阅方从Broker接收该消息;


MQTT还支持离线消息,发布方在发布消息时并不需要订阅方也连接到Broker,只要订阅方之前订阅过相应主题,那么它在连接到Broker之后就可以收到发布方在它离线期间发布的消息。


3.3 MQTT协议数据包


MQTT 协议数据包的消息格式为:固定头|可变头|消息体。


  • 固定头(Fixed header):存在于所有的MQTT数据包中,用于表示数据包类型及对应标志、数据包大小等;
  • 可变头(Variable header):存在于部分类型的MQTT数据包中,具体内容是由相应类型的数据包决定的;
  • 消息体(Payload):存在于部分的MQTT数据包中,存储消息的具体数据。


具体的结构说明可以参考官方文档,这里截图两个MQTT协议传输时数据内容:


image.png

image.png


在我们了解MQTT开源源码或者自己实现MQTT协议的库时再着重研究协议具体内容。


4. MQTT开发环境搭建


4.1 搭建MQTT服务


我们可以使用mosquitto快速搭建mqtt server。在ubuntu上直接执行sudo apt-get install mosquitto,在Mac上执行sudo brew install mosquitto即可。


具体配置操作以ubuntu为例:


sudo service mosquitto status
sudo service mosquitto start
sudo service mosquitto stop


conf配置文件mosquitto.conf:


pid_file /var/run/mosquitto.pid
# 消息持久存储
persistence true
persistence_location /var/lib/mosquitto/
# 日志文件
log_dest file /var/log/mosquitto/mosquitto.log
# 其他配置
include_dir /etc/mosquitto/conf.d
# 禁止匿名访问
allow_anonymous false
# 认证配置
password_file /etc/mosquitto/pwfile
# 权限配置
acl_file /etc/mosquitto/aclfile


mosquitto -c /etc/mosquitto/mosquitto.conf -d


  • -c 指定配置文件
  • -d 表示后台启动


4.2 验证服务


安装mosquitto配套客户端文件:


sudo apt-get install mosquitto-clients


mosquitto-client包含pub和sub两个命令工具,对应参数说明:


mosquitto_pub命令参数说明:


  • -d   打印debug信息
  • -f    将指定文件的内容作为发送消息的内容
  • -h   指定要连接的域名  默认为localhost
  • -i    指定要给哪个clientId的用户发送消息
  • -I    指定给哪个clientId前缀的用户发送消息
  • -m  消息内容
  • -n   发送一个空(null)消息
  • -p   连接端口号
  • -q   指定QoS的值(0,1,2)
  • -t    指定topic
  • -u   指定broker访问用户
  • -P   指定broker访问密码
  • -V   指定MQTT协议版本
  • --will-payload   指定一个消息,该消息当客户端与broker意外断开连接时发出。该参数需要与--will-topic一起使用
  • --will-qos   Will的QoS值。该参数需要与--will-topic一起使用
  • --will-retain 指定Will消息被当做一个retain消息(即消息被广播后,该消息被保留起来)。该参数需要与--will-topic一起使用
  • --will-topic  用户发送Will消息的topic


mosquitto_sub命令参数说明:


  • -c  设定‘clean session’为无效状态,这样一直保持订阅状态,即便是已经失去连接,如果再次连接仍旧能够接收的断开期间发送的消息。
  • -d  打印debug信息
  • -h  指定要连接的域名  默认为localhost
  • -i   指定clientId
  • -I   指定clientId前缀
  • -k keepalive 每隔一段时间,发PING消息通知broker,仍处于连接状态。 默认为60秒。
  • -q   指定希望接收到QoS为什么的消息  默认QoS为0
  • -R   不显示陈旧的消息
  • -t    订阅topic
  • -v   打印消息
  • --will-payload  指定一个消息,该消息当客户端与broker意外断开连接时发出。该参数需要与--will-topic一起使用
  • --will-qos   Will的QoS值。该参数需要与--will-topic一起使用
  • --will-retain 指定Will消息被当做一个retain消息(即消息被广播后,该消息被保留起来)。该参数需要与--will-topic一起使用
  • --will-topic  用户发送Will消息的topic


示例:


mosquitto_pub -h localhost -p 1883 -t "demo/1" -m "test"
mosquitto_sub -h localhost -p 1883 -t "demo/1"


直接运行上面命令既可完成发布与订阅操作。


4.3 跑通Android平台客户端


上面是mosquitto提供的现成的PC命令工具,我们要在Android平台上跑通客户端需要用Java和C++实现一套,eclipse也提供了C++/C以及Java版本的库,我们直接编译到Android平台即可。已C库为例:


下载github.com/eclipse/pah…


make_minimum_required(VERSION 3.4.1)
set(CMAKE_INSTALL_PREFIX "${CMAKE_BINARY_DIR}" CACHE PATH "Installation directory" FORCE)
message(STATUS "CMAKE_INSTALL_PREFIX=${CMAKE_INSTALL_PREFIX}")
project(mqtt)
SET(VERSION 0.0.1)
add_definitions(-w)
#file(READ version.major PAHO_VERSION_MAJOR)
#file(READ version.minor PAHO_VERSION_MINOR)
#file(READ version.patch PAHO_VERSION_PATCH)
SET(CLIENT_VERSION ${PAHO_VERSION_MAJOR}.${PAHO_VERSION_MINOR}.${PAHO_VERSION_PATCH})
STRING(TIMESTAMP BUILD_TIMESTAMP UTC)
MESSAGE(STATUS "Timestamp is ${BUILD_TIMESTAMP}")
## build options
SET(PAHO_WITH_SSL FALSE CACHE BOOL "Flag that defines whether to build ssl-enabled binaries too. ")
SET(PAHO_BUILD_SHARED TRUE CACHE BOOL "Build shared library")
SET(PAHO_BUILD_STATIC FALSE CACHE BOOL "Build static library")
SET(PAHO_BUILD_DOCUMENTATION FALSE CACHE BOOL "Create and install the HTML based API documentation (requires Doxygen)")
SET(PAHO_BUILD_SAMPLES FALSE CACHE BOOL "Build sample programs")
SET(PAHO_BUILD_DEB_PACKAGE FALSE CACHE BOOL "Build debian package")
SET(PAHO_ENABLE_TESTING TRUE CACHE BOOL "Build tests and run")
SET(PAHO_ENABLE_CPACK TRUE CACHE BOOL "Enable CPack")
SET(PAHO_HIGH_PERFORMANCE FALSE CACHE BOOL "Disable tracing and heap tracking")
SET(PAHO_USE_SELECT FALSE CACHE BOOL "Revert to select system call instead of poll")
IF (PAHO_HIGH_PERFORMANCE)
    ADD_DEFINITIONS(-DHIGH_PERFORMANCE=1)
ENDIF()
IF (PAHO_USE_SELECT)
    ADD_DEFINITIONS(-DUSE_SELECT=1)
ENDIF()
IF (NOT PAHO_BUILD_SHARED AND NOT PAHO_BUILD_STATIC)
    MESSAGE(FATAL_ERROR "You must set either PAHO_BUILD_SHARED, PAHO_BUILD_STATIC, or both")
ENDIF()
SET(ROOT ${CMAKE_SOURCE_DIR})
SET(BUILD_PATH "${CMAKE_BINARY_DIR}")
MESSAGE(STATUS "current project name is ${PROJECTNAME}")
SET(CMAKE_BUILD_TYPE "DEBUG")
#SET(CMAKE_BUILD_TYPE "RELEASE")
set(SOURCE_DIR ${CMAKE_CURRENT_SOURCE_DIR}/src/main/cpp/paho.mqtt.c/src)
set(JNI_SOURCE_DIR ${CMAKE_CURRENT_SOURCE_DIR}/src/main/cpp)
find_library(log-lib log)
include_directories(${SOURCE_DIR})
include_directories(${JNI_SOURCE_DIR})
set(common_src
        ${SOURCE_DIR}/MQTTTime.c
        ${SOURCE_DIR}/MQTTProtocolClient.c
        ${SOURCE_DIR}/Clients.c
        ${SOURCE_DIR}/utf-8.c
        ${SOURCE_DIR}/MQTTPacket.c
        ${SOURCE_DIR}/MQTTPacketOut.c
        ${SOURCE_DIR}/Messages.c
        ${SOURCE_DIR}/Tree.c
        ${SOURCE_DIR}/Socket.c
        ${SOURCE_DIR}/Log.c
        ${SOURCE_DIR}/MQTTPersistence.c
        ${SOURCE_DIR}/Thread.c
        ${SOURCE_DIR}/MQTTProtocolOut.c
        ${SOURCE_DIR}/MQTTPersistenceDefault.c
        ${SOURCE_DIR}/SocketBuffer.c
        ${SOURCE_DIR}/LinkedList.c
        ${SOURCE_DIR}/MQTTProperties.c
        ${SOURCE_DIR}/MQTTReasonCodes.c
        ${SOURCE_DIR}/Base64.c
        ${SOURCE_DIR}/SHA1.c
        ${SOURCE_DIR}/WebSocket.c
        ${SOURCE_DIR}/Proxy.c
        ${SOURCE_DIR}/MQTTClient.c
        ${JNI_SOURCE_DIR}/JNI_OnLoad.cc
        ${JNI_SOURCE_DIR}/jni_utils.cc
        ${JNI_SOURCE_DIR}/keutil.cc
        )
IF (NOT PAHO_HIGH_PERFORMANCE)
    SET(common_src ${common_src}
            ${SOURCE_DIR}/StackTrace.c
            ${SOURCE_DIR}/Heap.c
            )
ENDIF()
SET(LIBS_SYSTEM c dl)
set(SELF_LIB_NAME kemqtt)
add_library(${SELF_LIB_NAME} SHARED ${common_src})
target_link_libraries(${SELF_LIB_NAME}
        ${log-lib}
        )


封装JNI接口进行Pub:


static jlong
_start(JNIEnv *env, jclass cls) {
    LOGI("start...");
  MQTTClient client;
  MQTTClient_connectOptions conn_opts = MQTTClient_connectOptions_initializer;
  conn_opts.username = "A1_TEST_TOKEN";
  MQTTClient_message pubmsg = MQTTClient_message_initializer;
  MQTTClient_deliveryToken token;
  int rc;
  if ((rc = MQTTClient_create(&client, ADDRESS, CLIENTID,
                MQTTCLIENT_PERSISTENCE_NONE, NULL)) != MQTTCLIENT_SUCCESS)
  {
  LOGI("Failed to create client, return code %d\n", rc);
  return -1;
  }
  conn_opts.keepAliveInterval = 20;
  conn_opts.cleansession = 1;
  if ((rc = MQTTClient_connect(client, &conn_opts)) != MQTTCLIENT_SUCCESS)
  {
  LOGI("Failed to connect, return code %d\n", rc);
  return -2;
  }
  pubmsg.payload = (void *)PAYLOAD;
  pubmsg.payloadlen = (int)strlen(PAYLOAD);
  pubmsg.qos = QOS;
  pubmsg.retained = 0;
  if ((rc = MQTTClient_publishMessage(client, TOPIC, &pubmsg, &token)) != MQTTCLIENT_SUCCESS)
  {
  LOGI("Failed to publish message, return code %d\n", rc);
  return -3;
  }
  LOGI("Waiting for up to %d seconds for publication of %s\n"
     "on topic %s for client with ClientID: %s\n",
     (int)(TIMEOUT/1000), PAYLOAD, TOPIC, CLIENTID);
  rc = MQTTClient_waitForCompletion(client, token, TIMEOUT);
  LOGI("Message with delivery token %d delivered\n", token);
  if ((rc = MQTTClient_disconnect(client, 10000)) != MQTTCLIENT_SUCCESS)
  LOGI("Failed to disconnect, return code %d\n", rc);
  MQTTClient_destroy(&client);
    return (jlong) 0;
}


这里直接改写官方demo,最开始怎么也连不上,连接错误返回-1,最后发现是测试机好久没用,没有联网了,浪费了好长时间去查代码问题;网络修复后又遇到返回错误5的问题,是因为server开启了用户验证,所以需要配置username,配置好后成功完成了发布。


5. 参考资源

1.MQTT Client各语言实现大全

2.MQTT协议报文格式解析

6. 总结


本文介绍了MQTT和MQTT协议的数据包结构,并且介绍了MQTT开发环境的搭建,都比较粗浅,后续文章深入分析MQTT协议内容以及MQTT Java和C版本代码实现细节,从代码角度分析MQTT协议的优点,并完成实现MQTT的Android版本交互以及与Protbuf的结合等。

相关实践学习
消息队列RocketMQ版:基础消息收发功能体验
本实验场景介绍消息队列RocketMQ版的基础消息收发功能,涵盖实例创建、Topic、Group资源创建以及消息收发体验等基础功能模块。
消息队列 MNS 入门课程
1、消息队列MNS简介 本节课介绍消息队列的MNS的基础概念 2、消息队列MNS特性 本节课介绍消息队列的MNS的主要特性 3、MNS的最佳实践及场景应用 本节课介绍消息队列的MNS的最佳实践及场景应用案例 4、手把手系列:消息队列MNS实操讲 本节课介绍消息队列的MNS的实际操作演示 5、动手实验:基于MNS,0基础轻松构建 Web Client 本节课带您一起基于MNS,0基础轻松构建 Web Client
目录
相关文章
|
1月前
|
机器学习/深度学习 PyTorch 算法框架/工具
目标检测实战(一):CIFAR10结合神经网络加载、训练、测试完整步骤
这篇文章介绍了如何使用PyTorch框架,结合CIFAR-10数据集,通过定义神经网络、损失函数和优化器,进行模型的训练和测试。
92 2
目标检测实战(一):CIFAR10结合神经网络加载、训练、测试完整步骤
|
17天前
|
数据采集 存储 JSON
Python网络爬虫:Scrapy框架的实战应用与技巧分享
【10月更文挑战第27天】本文介绍了Python网络爬虫Scrapy框架的实战应用与技巧。首先讲解了如何创建Scrapy项目、定义爬虫、处理JSON响应、设置User-Agent和代理,以及存储爬取的数据。通过具体示例,帮助读者掌握Scrapy的核心功能和使用方法,提升数据采集效率。
60 6
|
1月前
|
机器学习/深度学习 数据可视化 测试技术
YOLO11实战:新颖的多尺度卷积注意力(MSCA)加在网络不同位置的涨点情况 | 创新点如何在自己数据集上高效涨点,解决不涨点掉点等问题
本文探讨了创新点在自定义数据集上表现不稳定的问题,分析了不同数据集和网络位置对创新效果的影响。通过在YOLO11的不同位置引入MSCAAttention模块,展示了三种不同的改进方案及其效果。实验结果显示,改进方案在mAP50指标上分别提升了至0.788、0.792和0.775。建议多尝试不同配置,找到最适合特定数据集的解决方案。
294 0
|
18天前
|
数据采集 前端开发 中间件
Python网络爬虫:Scrapy框架的实战应用与技巧分享
【10月更文挑战第26天】Python是一种强大的编程语言,在数据抓取和网络爬虫领域应用广泛。Scrapy作为高效灵活的爬虫框架,为开发者提供了强大的工具集。本文通过实战案例,详细解析Scrapy框架的应用与技巧,并附上示例代码。文章介绍了Scrapy的基本概念、创建项目、编写简单爬虫、高级特性和技巧等内容。
41 4
|
18天前
|
网络协议 物联网 API
Python网络编程:Twisted框架的异步IO处理与实战
【10月更文挑战第26天】Python 是一门功能强大且易于学习的编程语言,Twisted 框架以其事件驱动和异步IO处理能力,在网络编程领域独树一帜。本文深入探讨 Twisted 的异步IO机制,并通过实战示例展示其强大功能。示例包括创建简单HTTP服务器,展示如何高效处理大量并发连接。
39 1
|
19天前
|
网络协议 安全 NoSQL
网络空间安全之一个WH的超前沿全栈技术深入学习之路(8-2):scapy 定制 ARP 协议 、使用 nmap 进行僵尸扫描-实战演练、就怕你学成黑客啦!
scapy 定制 ARP 协议 、使用 nmap 进行僵尸扫描-实战演练等具体操作详解步骤;精典图示举例说明、注意点及常见报错问题所对应的解决方法IKUN和I原们你这要是学不会我直接退出江湖;好吧!!!
网络空间安全之一个WH的超前沿全栈技术深入学习之路(8-2):scapy 定制 ARP 协议 、使用 nmap 进行僵尸扫描-实战演练、就怕你学成黑客啦!
|
19天前
|
网络协议 安全 算法
网络空间安全之一个WH的超前沿全栈技术深入学习之路(9):WireShark 简介和抓包原理及实战过程一条龙全线分析——就怕你学成黑客啦!
实战:WireShark 抓包及快速定位数据包技巧、使用 WireShark 对常用协议抓包并分析原理 、WireShark 抓包解决服务器被黑上不了网等具体操作详解步骤;精典图示举例说明、注意点及常见报错问题所对应的解决方法IKUN和I原们你这要是学不会我直接退出江湖;好吧!!!
网络空间安全之一个WH的超前沿全栈技术深入学习之路(9):WireShark 简介和抓包原理及实战过程一条龙全线分析——就怕你学成黑客啦!
|
17天前
|
网络协议 调度 开发者
Python网络编程:Twisted框架的异步IO处理与实战
【10月更文挑战第27天】本文介绍了Python网络编程中的Twisted框架,重点讲解了其异步IO处理机制。通过反应器模式,Twisted能够在单线程中高效处理多个网络连接。文章提供了两个实战示例:一个简单的Echo服务器和一个HTTP服务器,展示了Twisted的强大功能和灵活性。
28 0
|
19天前
|
网络协议 安全 算法
网络空间安全之一个WH的超前沿全栈技术深入学习之路(9-2):WireShark 简介和抓包原理及实战过程一条龙全线分析——就怕你学成黑客啦!
实战:WireShark 抓包及快速定位数据包技巧、使用 WireShark 对常用协议抓包并分析原理 、WireShark 抓包解决服务器被黑上不了网等具体操作详解步骤;精典图示举例说明、注意点及常见报错问题所对应的解决方法IKUN和I原们你这要是学不会我直接退出江湖;好吧!!!
|
1月前
|
SQL 安全 算法
网络安全的盾牌与剑:漏洞防御与加密技术的实战应用
【9月更文挑战第30天】在数字时代的浪潮中,网络安全成为守护信息资产的关键防线。本文深入浅出地探讨了网络安全中的两大核心议题——安全漏洞与加密技术,并辅以实例和代码演示,旨在提升公众的安全意识和技术防护能力。