MQTT物联网通讯协议入门及Demo实现

简介: MQTT物联网通讯协议入门及Demo实现

一、MQTT协议概念

MQTT(Message Queuing Telemetry Transport,消息队列遥测传输协议),它是一个极其轻量级发布/订阅消息传输协议,轻量级指的是较少的代码和带宽。因为在物联网行业有类似充电桩、娃娃机、遥控飞行器等等这样的设备,它们的网络可能存在不稳定的情况并且只需要传输少量的数据,MQTT就应运而生专为受限设备和低带宽、高延迟或不可靠的网络而设计。

发布/订阅机制

发布/订阅模型将发送消息的客户端(发布者)与接收消息的客户端(订阅者)分离。发布者和订阅者从不直接联系。他们甚至不知道对方的存在,它们之间由一个第三方组件(代理)处理帮助筛选所有传入消息,并将其正确分发给订阅者。消息的发布者和订阅者都是客户端,消息代理是服务器,消息发布者可以同时是订阅者

这个机制最重要的是将发布者和订阅者进行解耦

  1. 发布者、订阅者不需要交换端口知道对方的主机,只需要知道代理的主机和端口
  2. 发布者、订阅者不需要同时都运行,哪怕一方下线
  3. 发布或接收期间,这两个组件上的操作都不需要中断

MQTT客户端

发布者和订阅者都是客户端,可以是设备也可以是服务器,简单来说就是网络连接到MQTT代理的任何设备

Broker代理(服务器)

代理负责接收所有消息、过滤消息、确定谁订阅了每条消息,并将消息发送到这些订阅的客户端。代理还保存具有持久会话的所有客户端的会话数据,包括订阅和丢失的消息。代理的另一个职责是客户端的身份验证和授权。通常,代理是可扩展的,这有助于自定义身份验证、授权和集成到后端系统中。

MQTT消息结构

MQTT消息包含三个部分:

  • 固定头(Fixed header)

  • 可变头(Variable header)

  • 消息体(payload)

二、MQTT协议实现原理

MQTT 客户端需要连接到代理后立即发布消息,然后订阅者从里面订阅数据,这里涉及到六个部分:CONNECTPublishSubscribeUnsubscribeSUBACKUnsuback

MQTT连接

客户端向代理发送CONNECT消息。代理响应一个CONNACK消息和一个状态码。连接建立后,代理将保持连接打开,直到客户端发送断开连接命令或连接断开

CONNECT消息主要包含以下内容:

  • ClientId:代理使用ClientId来标识客户端和客户端当前状态,对于每个客户端和代理ClientId是唯一的
  • Clean Session:标志告诉代理客户端是否想要建立一个持久会话。如果为false代理会存储客户端的所有订阅以及使用服务质量(QoS)级别1或2进行订阅的客户端的所有错过的消息。如果为true代理不为客户端存储任何内容,并清除以前任何持久会话中的所有信息
  • Username/Password:用户名和密码用于客户端身份验证和授权。强烈建议用户名和密码与安全传输使用SSL证书验证客户端,因此不需要用户名和密码
  • Will Message:遗嘱,当客户端断开连接时,此消息通知其他客户端
  • KeepAlive:客户端指定并在连接建立时与代理通信。这个间隔定义了代理和客户端在不发送消息的情况下可以忍受的最长时间
  • LWT字段:包含lastWillTopic、lastWillMessage、lastWillRetain、lastWillQos
    这个字段可以帮助了解客户端是正常断开连接(使用 MQTT 断开连接消息)还是不正常断开连接(没有断开连接消息),检测到客户端已不正常地断开连接。为了响应不正常的断开连接,代理将最后一个将消息发送到最后一个将消息主题的所有订阅客户端。如果客户端使用正确的断开连接消息正常断开连接,那么代理将丢弃存储的 LWT 消息

代理收到 CONNECT 消息时,返回连接确认标志

MQTT消息发布

每条消息都必须包含一个主题,代理可以使用该主题将消息转发给感兴趣的客户端

Publish消息包含以下内容:

  • packetID:数据包标识符在消息在客户端和代理之间流动时唯一标识消息。数据包标识符仅与大于零的 QoS 级别相关
  • topicName:主题名称,主题区分大小写主题格式就像URL:deviceName/1638791867
  1. +:表示任意匹配某一级主题,例如deviceName/+/weaved可以匹配deviceName/1638791867/weaved,但是无法匹配deviceName/1638791867/weaving
  2. 表示匹配多级,例如deviceName/可以匹配deviceName/1638791867/weaved
  3. $:是为 MQTT 代理的内部统计信息保留的,客户端无法向这些主题发布消息
  • QOS:服务级别质量,有3 个 QoS 级别
  1. 最多一次 (0)
    只会传输一次,不能保证对方一定会收到

  2. 至少一次 (1)常用
    至少保证对方能够收到一次消息,获得接收方发来的 PUBACK数据包,如果发送方在合理的时间内未收到 PUBACK 数据包,则发送方将重新发送 PUBLISH 数据包

  3. 正好一次 (2)
    QoS 2 是最安全、最慢的服务质量级别,由发送方和接收方之间的至少两个请求/响应流(四部分握手)提供。
    (1)、当接收方从发送方获取 QoS 2 PUBLISH 数据包时,它会相应地处理发布消息,并使用确认 PUBLISH 数据包的PUBREC 数据包回复发送方。如果发送方未从接收方获取 PUBREC 数据包,它将再次发送带有重复 (DUP) 标志的 PUBLISH 数据包,直到收到确认。
    (2)、接收方收到 PUBREC 数据包,发送方就可以安全地丢弃初始 PUBLISH 数据包。
    (3)、发送方存储来自接收方的 PUBREC 数据包,并使用PUBREL数据包进行响应
    (4)、接收方获得 PUBREL 数据包后,它可以丢弃所有存储的状态并使用PUBCOMP数据包进行应答

  • 如果数据包在此过程中丢失,发件人负责在合理的时间内重新传输消息
  • retainFlag:消息是否由代理保存为指定主题的最后一个已知正确值。当新客户端订阅某个主题时,它们会收到保留在该主题上的最后一条消息
    保留的消息可帮助新订阅的客户端在订阅主题后立即获取状态更新,而不需要等到客户端下一次推送消息。保留的消息消除了等待发布客户端发送下一个更新的时间
  • payload:消息的实际内容包含图像,任何编码的文本,加密数据以及二进制的数据
  • dupFlag:标志指示邮件是重复的,这个重复发送跟QoS大于0的时候有关

客户端将消息发送到 MQTT代理进行发布时,代理将读取消息,确认消息(根据 QoS 级别),并处理消息。代理的处理包括确定哪些客户端订阅了主题并向它们发送消息

MQTT订阅机制

MQTT客户端发送了消息。如果没人接收消息将毫无意义,所以也会有客户端来订阅消息,客户端会向 MQTT 代理发送一条 SUBSCRIBE消息

Subscribe消息包含以下内容:

  • packetID:数据包标识符在消息在客户端和代理之间流动时唯一标识消息。数据包标识符仅与大于零的 QoS 级别相关
  • 订阅列表:一个 SUBSCRIBE 消息可以包含一个客户端的多个订阅,每个订阅都由一个主题和一个 QoS 级别组成

MQTT订阅确认

为了确认每个订阅,代理向客户端发送 SUBACK确认消息

SUBACK消息包含以下内容:

  • packetID:数据包标识符在消息在客户端和代理之间流动时唯一标识消息
  • rerurnCode:每订阅一个主题发送一个返回代码
返回代码 返回代码响应
0 成功 - 最大 QoS 0
1 成功 - 最大 QoS 1
2 成功 - 最大 QoS 2
128 失败

客户端成功发送 SUBSCRIBE 消息并接收 SUBACK 消息后,它将获取与 SUBSCRIBE 消息包含的订阅中的主题匹配的每个已发布消息

MQTT取消订阅

消息可以订阅那么也可以取消订阅,会删除代理上客户端的现有预订

Unsubscribe消息包含以下内容:

  • packetID:数据包标识符在消息在客户端和代理之间流动时唯一标识消息
  • List of Topic(主题列表):主题列表可以包含多个客户要取消订阅的主题。只需发送主题

MQTT确认取消订阅

要确认取消订阅,代理会向客户端发送 Unsuback确认消息

Unsuback消息包含以下内容:

  • packetID:数据包标识符在消息在客户端和代理之间流动时唯一标识消息,这与取消订阅消息中的数据包标识符相同

三、MQTT基本功能

持久会话

客户端需要连接到代理并且订阅主题,但是客户端和代理之间如果连接在非持久会话中中断,那么主题会丢失,需要在重新连接时再次订阅。为了避免这个问题可以使用持久会话功能,它主要是在代理中存储了:

  • 客户端的会话以及订阅
  • QOS为1和2中没有确认的消息
  • 客户端在断联时候错过的消息
  • 客户端接收到的所有尚未完全确认的 QoS 2 消息

为了开启代理上的持久会话,在MQTT客户端连接到代理服务器的时候有个cleanSession字段设置为false表示开启持久会话,所有信息和消息都将保留,代理存储会话,直到客户端重新联机并收到消息,如果长时间不联机,那么会消耗内存

客户端上的持久会话,当客户端请求服务器保存会话数据时,客户端负责存储以下信息:

  • QoS 1 或 2 流中尚未由代理确认的所有消息
  • 从代理接收到的所有尚未完全确认的 QoS 2 消息

四、MQTT Demo

搭建MQTT服务器

官方文档:产品概览 | EMQX 文档

EMQX (Erlang/Enterprise/Elastic MQTT Broker) 是基于 Erlang/OTP 平台开发的开源物联网 MQTT 消息服务器。

Erlang/OTP是出色的软实时 (Soft-Realtime)、低延时 (Low-Latency)、分布式 (Distributed)的语言平台。

MQTT 是轻量的 (Lightweight)、发布订阅模式 (PubSub) 的物联网消息协议。

EMQX 设计目标是实现高可靠,并支持承载海量物联网终端的 MQTT 连接,支持在海量物联网设备间低延时消息路由:

  1. 稳定承载大规模的 MQTT 客户端连接,单服务器节点支持 200 万连接。
  2. 分布式节点集群,快速低延时的消息路由。
  3. 消息服务器内扩展,支持定制多种认证方式、高效存储消息到后端数据库。
  4. 完整物联网协议支持,MQTT、MQTT-SN、CoAP、LwM2M、WebSocket 或私有协议支持

使用Docker安装EMQX

1、获取Docker镜像

docker pull emqx/emqx:4.4.3

2、启动Docker

docker run -d --name emqx -p 1883:1883 -p 8081:8081 -p 8083:8083 -p 8084:8084 -p 8883:8883 -p 18083:18083 emqx/emqx:4.4.3

3、访问Web管理控制台

控制台地址: http://XXXXXX:18083,默认用户: admin,密码:public

各个服务端口说明:

1883:MQTT 协议端口

8883:MQTT/SSL 端口

8083:MQTT/WebSocket 端口

8080:HTTP API 端口

18083:Dashboard 管理控制台端口

搭建MQTT消息推送客户端

引入相关依赖包

<dependencies>
        <dependency>
            <groupId>org.eclipse.paho</groupId>
            <artifactId>org.eclipse.paho.client.mqttv3</artifactId>
            <version>1.2.5</version>
        </dependency>
        <dependency>
            <groupId>org.projectlombok</groupId>
            <artifactId>lombok</artifactId>
            <version>1.18.22</version>
            <optional>true</optional>
        </dependency>
    </dependencies>

MQTT客户端

import lombok.extern.slf4j.Slf4j;
import lombok.val;
import org.eclipse.paho.client.mqttv3.*;
import org.eclipse.paho.client.mqttv3.persist.MemoryPersistence;
import org.springframework.stereotype.Component;
/**
 * 消息推送客户端
 *
 * @author yanglingcong
 */
@Slf4j
@Component
public class MyMqttClient {
    private final static int QOS_1 = 1;
    private final static String USER_NAME = "ylc";
    private final static int PASSWORLD = 123456;
    private final static int KEEP_ALIVE = 60;
    /**
     * 连接地址
     * */
    public static final String HOST = "tcp://XXXXX:1883";
    /**
    * 订阅主题
    * */
    public static final String TOPIC = "deviceName/";
    //客户端唯一ID
    private static final String clientid = "pubClient";
    public static void main(String[] args) {
        MqttClient mqtt = createMqtt();
        publishMessage("Hello", TOPIC, mqtt);
    }
    public static MqttClient createMqtt() {
        MqttClient client = null;
        MqttConnectOptions connectOptions = new MqttConnectOptions();
        //断开之后自动重联
        connectOptions.setAutomaticReconnect(true);
        //设置会话心跳时间 代理和客户端在不发送消息的情况下可以忍受的最长时间
        connectOptions.setKeepAliveInterval(KEEP_ALIVE);
        //不建立持久会话
        connectOptions.setCleanSession(true);
        //用户名
        connectOptions.setUserName(USER_NAME);
        //密码
        connectOptions.setPassword(String.valueOf(PASSWORLD).toCharArray());
        try {
            client = new MqttClient(HOST, clientid, new MemoryPersistence());
            //MQTT连接
            client.connect(connectOptions);
            //消息回调
            client.setCallback(new MqttCallBackHandle(client));
        } catch (MqttException e) {
            log.warn("MQTT消息异常{}", e);
        }
        return client;
    }
    /**
     * 消息推送
     *
     * @param message 消息内容
     * @param topic   发送的主题
     * @author yanglingcong
     * @date 2022/4/18 21:25
     */
    public static void publishMessage(String message, String topic, MqttClient mqttClient) {
        MqttMessage mqttMessage = new MqttMessage();
        mqttMessage.setQos(QOS_1);
        //保留在该主题上的最后一条消息
        //mqttMessage.setRetained(true);
        mqttMessage.setPayload(message.getBytes());
        try {
            mqttClient.publish(topic, mqttMessage);
            log.info("MQTT消息发送成功:{}", message);
        } catch (MqttException e) {
            log.warn("MQTT消息推送失败");
            e.printStackTrace();
        }
    }
}

MQTT回调接口

import lombok.extern.slf4j.Slf4j;
import org.eclipse.paho.client.mqttv3.*;
import org.eclipse.paho.client.mqttv3.MqttClient;
/**
 * MQTT消息回调方法
 */
@Slf4j
public class MqttCallBackHandle implements MqttCallbackExtended {
    private MqttClient client;
    public  MqttCallBackHandle(MqttClient client){
        this.client=client;
    }
    //订阅主题
    private final static String CMD_TOP_FORMAT = "deviceName/";
    /**
     * 连接成功后调用该方法
     * @param reconnect
     * @param serverURI
     */
    @Override
    public void connectComplete(boolean reconnect, String serverURI) {
        try {
            //重新订阅主题
            client.subscribe(CMD_TOP_FORMAT);
            log.info("=====MQTT重联成功=====");
        } catch (MqttException e) {
            e.printStackTrace();
        }
    }
    /** 
     * 断开连接后回调方法
     * @param throwable
     */
    @Override
    public void connectionLost(Throwable throwable) {
        log.info("=====MQTT连接断开=====");
    }
    /**
     * 接收订阅到的消息
     * @param topic
     * @param message
     * @throws Exception
     */
    @Override
    public void messageArrived(String topic, MqttMessage message) throws Exception {
        log.info("=====MQTT消息订阅成功=====");
        log.info("主题:{},内容:{}",topic,message);
    }
    /**
     * 发送完成
     * @param iMqttDeliveryToken
     */
    @Override
    public void deliveryComplete(IMqttDeliveryToken iMqttDeliveryToken) {
        log.info("=====MQTT消息发送完毕=====");
    }
}

搭建MQTT消息订阅客户端

import lombok.extern.slf4j.Slf4j;
import org.eclipse.paho.client.mqttv3.MqttClient;
import org.eclipse.paho.client.mqttv3.MqttConnectOptions;
import org.eclipse.paho.client.mqttv3.MqttException;
import org.eclipse.paho.client.mqttv3.persist.MemoryPersistence;
import org.springframework.stereotype.Component;
/**
 * 消息订阅客户端
 *
 * @author yanglingcong*/
@Component
@Slf4j
public class MyMqttSubClient {
    private final static int QOS_1 = 1;
    private final static String USER_NAME = "ylc";
    private final static int PASSWORLD = 123456;
    private final static int KEEP_ALIVE = 60;
    //连接地址
    public static final String HOST = "tcp://xxxx:1883";
    // 订阅主题
    public static final String TOPIC = "deviceName/";
    //客户端唯一ID
    private static final String clientid = "subClient";
    public static void main(String[] args) {
        subscribe();
    }
    public MyMqttSubClient() throws MqttException {
        //订阅
        subscribe();
    }
    public  static void subscribe()  {
        MqttClient client=null;
        MqttConnectOptions connectOptions=new MqttConnectOptions();
        //断开之后自动重联
        connectOptions.setAutomaticReconnect(true);
        //设置会话心跳时间 代理和客户端在不发送消息的情况下可以忍受的最长时间
        connectOptions.setKeepAliveInterval(KEEP_ALIVE);
        //不建立持久会话
        connectOptions.setCleanSession(true);
        //用户名
        connectOptions.setUserName(USER_NAME);
        //密码
        connectOptions.setPassword(String.valueOf(PASSWORLD).toCharArray());
        try {
            client=new MqttClient(HOST,clientid, new MemoryPersistence());
            //MQTT连接
            client.connect(connectOptions);
        } catch (MqttException e) {
            e.printStackTrace();
        }
        //消息回调
        client.setCallback(new MqttCallBackHandle(client));
        try {
            client.subscribe(TOPIC,QOS_1);
        } catch (MqttException e) {
            log.warn("MQTT消息订阅异常{}",e);
            e.printStackTrace();
        }
    }
}

环境测试

1、MQTT客户端pubClient向服务器推送消息

2、MQTT客户端subClient从服务器订阅消息

3、踢除客户端,会自动重联,因为设置了MQTT断开自动重联

五、MQTT常见问题

MQTT消息持久化

如果 cleanSession 设为true,一旦掉线客户端不会存储任何内容,并清除以前任何持久会话中的所有信息

如果 cleanSession 设为false,重连后可以接收之前订阅主题的消息,还有离线时期未接收的消息

MQTT订阅恢复机制

MQTT掉线设置自动重联之后,无法再进行订阅。MqttCallbackExtended接口有一个connectComplete方法用于重新订阅主题

MQTT和消息队列的区别

  • 消息队列可以存储消息,直到被消费为止
  • 消息队列只能被消费处理一次,不像MQTT订阅的人都可以收到消息
  • 消息队列需要先创建队列,MQTT可以使用时候创建
  • MQTT是一种通信协议,MQ是消息通道
  • MQTT面向海量设备连接、MQ是面向海量数据
相关实践学习
钉钉群中如何接收IoT温控器数据告警通知
本实验主要介绍如何将温控器设备以MQTT协议接入IoT物联网平台,通过云产品流转到函数计算FC,调用钉钉群机器人API,实时推送温湿度消息到钉钉群。
阿里云AIoT物联网开发实战
本课程将由物联网专家带你熟悉阿里云AIoT物联网领域全套云产品,7天轻松搭建基于Arduino的端到端物联网场景应用。 开始学习前,请先开通下方两个云产品,让学习更流畅: IoT物联网平台:https://iot.console.aliyun.com/ LinkWAN物联网络管理平台:https://linkwan.console.aliyun.com/service-open
相关文章
|
3月前
|
机器学习/深度学习 自然语言处理 物联网
深度学习入门:从理论到实践新技术趋势与应用:探讨新兴技术如区块链、物联网、虚拟现实等的发展趋势和应用场景
【8月更文挑战第30天】本文将介绍深度学习的基本原理和实践应用。我们将从深度学习的定义、历史和发展开始,然后深入探讨其工作原理和关键技术。接着,我们将通过一个简单的代码示例来展示如何实现深度学习模型。最后,我们将讨论深度学习在现实世界中的应用和挑战。无论你是初学者还是有经验的开发者,这篇文章都将为你提供深度学习的全面理解。
|
28天前
|
网络协议 物联网 网络性能优化
物联网协议比较 MQTT CoAP RESTful/HTTP XMPP
【10月更文挑战第18天】本文介绍了物联网领域中四种主要的通信协议:MQTT、CoAP、RESTful/HTTP和XMPP,分别从其特点、应用场景及优缺点进行了详细对比,并提供了简单的示例代码。适合开发者根据具体需求选择合适的协议。
51 5
|
1月前
|
消息中间件 Java Kafka
RabbitMQ 入门
RabbitMQ 入门
|
2月前
|
网络协议 物联网 网络性能优化
物联网江湖风云变幻!MQTT CoAP RESTful/HTTP XMPP四大门派谁主沉浮?
【9月更文挑战第3天】物联网(IoT)的兴起催生了多种通信协议,如MQTT、CoAP、RESTful/HTTP和XMPP,各自适用于不同场景。本文将对比这些协议的特点、优缺点,并提供示例代码。MQTT轻量级且支持QoS,适合大规模部署;CoAP基于UDP,适用于低功耗网络;RESTful/HTTP易于集成但不适合资源受限设备;XMPP支持双向通信,适合复杂交互应用。通过本文,开发者可更好地选择合适的物联网通信协议。
40 2
|
3月前
|
消息中间件 存储 Java
分享一下rocketmq入门小知识
分享一下rocketmq入门小知识
51 0
分享一下rocketmq入门小知识
|
3月前
|
网络协议 物联网 网络性能优化
物联网江湖风云变幻!MQTT CoAP RESTful/HTTP XMPP四大门派谁主沉浮?
【8月更文挑战第14天】本文概览了MQTT、CoAP、RESTful/HTTP及XMPP四种物联网通信协议。MQTT采用发布/订阅模式,轻量高效;CoAP针对资源受限设备,基于UDP,低延迟;RESTful/HTTP易于集成现有Web基础设施;XMPP支持双向通信,扩展性强。每种协议均附有示例代码,助您根据不同场景和设备特性作出最佳选择。
39 5
|
3月前
|
网络协议 物联网 测试技术
App Inventor 2 MQTT拓展入门(保姆级教程)
本文演示的是App和一个测试客户端进行消息交互的案例,实际应用中,我们的测试客户端可以看着是任意的、支持MQTT协议的硬件,通过订阅及发布消息,联网硬件与我们的App进行双向数据通信,以实现万物互联的智能控制效果。
200 2
|
3月前
|
物联网 C# 智能硬件
智能家居新篇章:WPF与物联网的智慧碰撞——通过MQTT协议连接与控制智能设备,打造现代科技生活的完美体验
【8月更文挑战第31天】物联网(IoT)技术的发展使智能家居设备成为现代家庭的一部分。通过物联网,家用电器和传感器可以互联互通,实现远程控制和状态监测等功能。本文将探讨如何在Windows Presentation Foundation(WPF)应用中集成物联网技术,通过具体示例代码展示其实现过程。文章首先介绍了MQTT协议及其在智能家居中的应用,并详细描述了使用Wi-Fi连接方式的原因。随后,通过安装Paho MQTT客户端库并创建MQTT客户端实例,演示了如何编写一个简单的WPF应用程序来控制智能灯泡。
118 0
|
3月前
|
物联网 网络性能优化 Python
"掌握MQTT协议,开启物联网通信新篇章——揭秘轻量级消息传输背后的力量!"
【8月更文挑战第21天】MQTT是一种轻量级的消息传输协议,以其低功耗、低带宽的特点在物联网和移动应用领域广泛应用。基于发布/订阅模型,MQTT支持三种服务质量级别,非常适合受限网络环境。本文详细阐述了MQTT的工作原理及特点,并提供了使用Python `paho-mqtt`库实现的发布与订阅示例代码,帮助读者快速掌握MQTT的应用技巧。
88 0
|
4月前
|
消息中间件 物联网 API
消息队列 MQ使用问题之如何在物联网项目中搭配使用 MQTT、AMQP 与 RabbitMQ
消息队列(MQ)是一种用于异步通信和解耦的应用程序间消息传递的服务,广泛应用于分布式系统中。针对不同的MQ产品,如阿里云的RocketMQ、RabbitMQ等,它们在实现上述场景时可能会有不同的特性和优势,比如RocketMQ强调高吞吐量、低延迟和高可用性,适合大规模分布式系统;而RabbitMQ则以其灵活的路由规则和丰富的协议支持受到青睐。下面是一些常见的消息队列MQ产品的使用场景合集,这些场景涵盖了多种行业和业务需求。

相关产品

  • 物联网平台