物联网消息队列客户端-MQTT-基本功能实现

简介: 物联网消息队列客户端-MQTT-基本功能实现

1. 主要实现功能

  • 自动配置
  • 消息自动解析
  • 消息分组共享订阅
  • 消息不分组共享订阅
  • 消息排它订阅
  • 延时发布
  • 多数据源

4. 快速开始

4.1 引入依赖

    <dependency>
            <groupId>org.eclipse.paho</groupId>
            <artifactId>org.eclipse.paho.client.mqttv3</artifactId>
            <version>1.2.5</version>
        </dependency>

4.2 配置

spring:
  mqtt:
    emq:
      client:
        # 多数据源客户端名称,默认default
        default:
          # broker地址
          host: 127.0.0.1
          # 端口
          port: 31883
          # 用户名
          username: admin
          # 密码
          password: 123456

更多配置如下

spring:
  mqtt:
    emq:
      client:
        # 多数据源客户端名称,默认default
        default:
          # broker地址
          host: 127.0.0.1
          # 端口
          port: 31883
          # 用户名
          username: admin
          # 密码
          password: 123456
          # 客户端标识,需保持全局唯一
          client-id: parking_server
          # 是否清除session
          clean-session: false
          # 连接超时时间,单位秒
          connection-timeout: 10
          # 心跳间隔时间,单位秒
          keep-alive-interval: 10
          # 全局消息质量
          global-qos: 1
          # 重新连接之间等待的最长时间
          maxReconnect-delay: 128000
          # 是否自动重新连接
          automatic-reconnect: true
          # 最大消息并发数量,超过此数量并发时可能会丢消息
          maxInflight: 1000

4.3 开启自动配置

在启动类上增加@EnableRabbitMqAutoConfiguration注解

import com.demo.mqttclient.anno.EnableEmqAutoConfiguration;
@SpringBootApplication
@EnableEmqAutoConfiguration
public class TestApplication {
    public static void main(String[] args) {
        SpringApplication.run(TestApplication.class, args);
    }
}

4.4 发布消息

在生产者的业务程序中,注入MQTTClient

import com.demo.mqttclient.MQTTClient;
@Resource
private MQTTClient defaultMQTTClient;

为了兼容第三方及优化内部使用逻辑,所以内置提供了两种消息发送方式。

4.4.1 第三方消息发送

public String publishHeartbeatReply() {
    HeartbeatReplyMessage heartbeatReplyMessage = new HeartbeatReplyMessage();
    heartbeatReplyMessage.setCmd(32896);
    heartbeatReplyMessage.setExpire(1605252875L);
    heartbeatReplyMessage.setDevid("095437323930030130523933");
    heartbeatReplyMessage.setServer_time("1605252875");
    defaultMQTTClient.publish2ThirdParty("npt/park/type1/dev/095437323930030130523933", 1, heartbeatReplyMessage);
    return "success";
}

4.4.2 内部消息发送

消息实体实现Message接口

package com.example.test.message;
import com.demo.mqttclient.MQTTMessage;
import lombok.Data;
import java.util.UUID;
@Data
public class demoMessage implements MQTTMessage {
    private String msgId = UUID.randomUUID().toString();
    private String name;
    private String gender;
    @Override
    public String getMsgId() {
        return this.msgId;
    }
}

然后直接调用该类的publish方法发送即可

@GetMapping("demo/publish")
public String demoPublish() {
    demoMessage demoMessage = new demoMessage();
    demoMessage.setName("点都");
    demoMessage.setGender("xx");
    defaultMQTTClient.publish("demo/topic", demoMessage);
    return "success";
}

其中存在多个重载的方法。

package com.demo.mqttclient;
import com.demo.mqttclient.enums.ShareModelEnum;
import com.demo.plugin.core.lang.json.JSONUtil;
import org.eclipse.paho.client.mqttv3.MqttMessage;
/**
 * mqtt客户端
 *
 * @author zhangliuyang
 * @date 2022/07/18
 * @since 1.0.0
 */
public interface MQTTClient {
    /**
     * 启动客户端
     */
    void start();
    /**
     * 关闭客户端
     */
    void close();
    /**
     * 发布
     *
     * @param topic   主题
     * @param message 消息
     */
    default <T extends MQTTMessage> void publish(String topic, T message) {
        this.publish(topic, message, 1);
    }
    /**
     * 发布
     *
     * @param topic   主题
     * @param message 消息
     * @param qos     消息质量
     */
    default <T extends MQTTMessage> void publish(String topic, T message, int qos) {
        this.publish(topic, message, qos, 0);
    }
    /**
     * 发布
     *
     * @param topic   主题
     * @param message 消息
     * @param qos     消息质量
     * @param delay   延迟时间[unit:s, max:4294967s, condition: > 0]
     */
    default <T extends MQTTMessage> void publish(String topic, T message, int qos, long delay) {
        this.publish(topic, message, qos, delay, false);
    }
    /**
     * 发布
     *
     * @param topic    主题
     * @param message  消息
     * @param qos      消息质量
     * @param delay    延迟时间[unit:s, max:4294967s, condition: > 0]
     * @param retained 保留消息
     */
    default <T extends MQTTMessage> void publish(String topic, T message, int qos, long delay, boolean retained) {
        MQTTMessageContext mqttMessageContext = new MQTTMessageContext();
        mqttMessageContext.setId(message.getMsgId());
        mqttMessageContext.setPayload(JSONUtil.write(message));
        mqttMessageContext.setQos(qos);
        mqttMessageContext.setDelay(delay);
        mqttMessageContext.setRetained(retained);
        mqttMessageContext.setTimestamp(System.currentTimeMillis());
        this.publish(topic, mqttMessageContext);
    }
    /**
     * 发布
     *
     * @param topic          主题
     * @param messageContext 消息上下文
     */
    void publish(String topic, MQTTMessageContext messageContext);
    /**
     * 发送到第三方
     *
     * @param topic   主题
     * @param message 消息
     */
    default void publish2ThirdParty(String topic, Object message) {
        this.publish2ThirdParty(topic, 1, message);
    }
    /**
     * 发送到第三方
     *
     * @param topic   主题
     * @param qos     消息质量
     * @param message 消息
     */
    default void publish2ThirdParty(String topic, int qos, Object message) {
        this.publish2ThirdParty(topic, qos, message, Constant.DEFAULT_CHARSET.name());
    }
    /**
     * 发送到第三方
     *
     * @param topic       主题
     * @param qos         消息质量
     * @param message     消息
     * @param charsetName 字符集名称
     */
    default void publish2ThirdParty(String topic, int qos, Object message, String charsetName) {
        this.publish2ThirdParty(topic, qos, message, charsetName, 0);
    }
    /**
     * publish2第三方
     *
     * @param topic       主题
     * @param qos         qos
     * @param message     消息
     * @param charsetName 字符集名称
     * @param delay       延迟时间
     */
    default void publish2ThirdParty(String topic, int qos, Object message, String charsetName, long delay) {
        this.publish2ThirdParty(topic, qos, message, charsetName, delay, false);
    }
    /**
     * publish2第三方
     *
     * @param topic       主题
     * @param qos         qos
     * @param message     消息
     * @param charsetName 字符集名称
     * @param delay       延迟时间
     * @param retained    是否保留消息
     */
    default void publish2ThirdParty(String topic, int qos, Object message, String charsetName, long delay, boolean retained) {
        this.publish2ThirdParty(topic, qos, JSONUtil.toString(message).getBytes(charsetName), delay, retained);
    }
    /**
     * 发送到第三方
     *
     * @param topic   主题
     * @param payload 有效载荷
     */
    default void publish2ThirdParty(String topic, byte[] payload) {
        this.publish2ThirdParty(topic, 1, payload, 0);
    }
    /**
     * 发送到第三方
     *
     * @param topic   主题
     * @param qos     消息质量
     * @param payload 有效载荷
     */
    default void publish2ThirdParty(String topic, int qos, byte[] payload) {
        this.publish2ThirdParty(topic, qos, payload, 0);
    }
    /**
     * 发送到第三方
     *
     * @param topic   主题
     * @param qos     消息质量
     * @param payload 有效载荷
     * @param delay   延迟时间
     */
    default void publish2ThirdParty(String topic, int qos, byte[] payload, long delay) {
        this.publish2ThirdParty(topic, qos, payload, delay, false);
    }
    /**
     * 发送到第三方
     *
     * @param topic    主题
     * @param qos      消息质量
     * @param payload  有效载荷
     * @param delay    延迟时间
     * @param retained 是否保留消息
     */
    default void publish2ThirdParty(String topic, int qos, byte[] payload, long delay, boolean retained) {
        MqttMessage mqttMessage = new MqttMessage();
        mqttMessage.setPayload(payload);
        mqttMessage.setQos(qos);
        mqttMessage.setRetained(retained);
        this.publish2ThirdParty(topic, delay, mqttMessage);
    }
    /**
     * 发送到第三方
     *
     * @param topic       主题
     * @param mqttMessage mqtt消息
     * @param delay       延迟时间
     */
    void publish2ThirdParty(String topic, long delay, MqttMessage mqttMessage);
    /**
     * 订阅
     *
     * @param topic          主题
     * @param qos            消息质量
     * @param shareModel     共享模型
     * @param groupName      分组名称
     * @param exclusive      排它
     * @param messageHandler 消息处理程序
     */
    void subscribe(String topic, int qos, ShareModelEnum shareModel, String groupName, boolean exclusive, MessageHandler messageHandler);
}

4.5 接收消息

消费者需要在消息处理类上添加@MQTTSubscriber(topics = {"npt/park/type1/server/10010"})注解,指定要监听topics和客户端名称即可。如果没有显示的指定客户端名称,则使用defaultMQTTClient,使用qos执行订阅消息质量。


当消息处理类中有多个public方法时,需要@MQTTConsumerMethod标记具体消费方法

package com.example.test.handler;
import com.demo.mqttclient.MessageHandler;
import com.demo.mqttclient.anno.MQTTConsumerMethod;
import com.demo.mqttclient.anno.MQTTSubscriber;
import com.demo.mqttclient.enums.ShareModelEnum;
import com.demo.plugin.core.lang.json.JSONUtil;
import com.example.test.message.DeviceStartMessage;
import com.example.test.message.PlateRecognitionReportMessage;
import lombok.extern.slf4j.Slf4j;
import org.eclipse.paho.client.mqttv3.MqttMessage;
import java.util.Map;
@Slf4j
@MQTTSubscriber(topics = ParkingMessageHandler.TOPIC, qos = 1, share = ShareModelEnum.GROUP_SHARE)
public class ParkingMessageHandler {
    public static final String TOPIC = "npt/park/type1/server/10010";
    public static final String CMD_KEY = "cmd";
    @MQTTConsumerMethod
    public void handle(Map<String, Object> message) {
        log.info("handle:{}", message);
        if (message.containsKey(CMD_KEY)) {
            Integer cmd = (Integer) message.get(CMD_KEY);
            switch (cmd) {
                case 129:
                    handleDeviceStartMessage(message);
                    break;
                case 140:
                    handlePlateRecognitionReportMessage(message);
                    break;
                default:
                    log.warn("不支持此cmd:[{}]", cmd);
                    break;
            }
        } else {
            log.warn("消息消费异常");
        }
    }
    private void handleDeviceStartMessage(Map<String, Object> message) {
        DeviceStartMessage deviceStartMessage = JSONUtil.toObject(JSONUtil.toString(message), DeviceStartMessage.class);
        log.info("接收到设备启动消息:{}", deviceStartMessage);
    }
    private void handlePlateRecognitionReportMessage(Map<String, Object> message) {
        PlateRecognitionReportMessage plateRecognitionReportMessage = JSONUtil.toObject(JSONUtil.toString(message), PlateRecognitionReportMessage.class);
        log.info("接收到车牌上报识别消息:{}", plateRecognitionReportMessage);
    }
}

4.6 发送延迟消息

要发送延迟消息,需要先开启emq延迟发布配置。

发送延时消息的方式相比之前,仅仅增加一个延时时间。其中延时时长的单位为,最大为4294967

//发送一个延时时长为10s的消息
defaultMQTTClient.publish2ThirdParty("npt/park/type1/dev/095437323930030130523933", 1, heartbeatReplyMessage, 10);

4.7 多数据源

多数据源与单数据源配置属性相同,在配置文件中声明即可

spring:
  mqtt:
    emq:
      client:
        # 多数据源客户端名称,默认default
        default:
          # broker地址
          host: 127.0.0.1
          # 端口
          port: 31883
          # 用户名
          username: admin
          # 密码
          password: 123456
        # 多数据源客户端名称
        parking:
          # broker地址
          host: 127.0.0.1
          # 端口
          port: 31883
          # 用户名
          username: admin
          # 密码
          password: 123456

4.7.1 发布消息

首先注入MQTTClient,与单数据源的唯一区别就是bean的名称。默认向Spring容器中添加的实现类名称为“${数据源名称}MQTTClient”

以上面的配置文件为例,默认的bean名称为 defaultMQTTClientbillMQTTClient

import com.demo.mqttclient.MQTTClient;
@Resource
private MQTTClient defaultMQTTClient;
@Resource
private MQTTClient billMQTTClient;

其他操作同单数据源

4.7.2 接收消息

接收消息与单数据源基本一致,唯一的区别是在@MQTTSubscriber中指定clientName属性,指定当前从哪个数据源进行消费。

import com.demo.mqttclient.anno.MQTTSubscriber;
@MQTTSubscriber(topics = {"npt/park/type1/server/10010"}, clientName = "parking")
public class ParkingMessageHandler {}

4.8 分组共享订阅

系统默认使用spring.application.name作为分组名称,用户可在消息消费类上指定@MQTTSubscriber属性中groupName = "group_name"即可

import com.demo.mqttclient.anno.MQTTSubscriber;
@MQTTSubscriber(topics = ParkingMessageHandler.TOPIC, qos = 1, share = ShareModelEnum.GROUP_SHARE, groupName = "group_name")
public class ParkingMessageHandler {}

4.9 不分组共享订阅

只需要在消息消费类上指定@MQTTSubscriber属性中share = ShareModelEnum.UN_GROUP_SHARE即可。

import com.demo.mqttclient.anno.MQTTSubscriber;
@MQTTSubscriber(topics = ParkingMessageHandler.TOPIC, qos = 1, share = ShareModelEnum.UN_GROUP_SHARE)
public class ParkingMessageHandler {}

4.10 排它订阅

只需要在消息消费类上指定@MQTTSubscriber属性中exclusive = true即可,开启排它订阅时,默认关闭共享订阅。

import com.demo.mqttclient.anno.MQTTSubscriber;
@MQTTSubscriber(topics = ParkingMessageHandler.TOPIC, qos = 1, exclusive = true)
public class ParkingMessageHandler {}
相关实践学习
阿里云AIoT物联网开发实战
本课程将由物联网专家带你熟悉阿里云AIoT物联网领域全套云产品,7天轻松搭建基于Arduino的端到端物联网场景应用。 开始学习前,请先开通下方两个云产品,让学习更流畅: IoT物联网平台:https://iot.console.aliyun.com/ LinkWAN物联网络管理平台:https://linkwan.console.aliyun.com/service-open
目录
相关文章
|
7月前
|
消息中间件 数据管理 Serverless
阿里云消息队列 Apache RocketMQ 创新论文入选顶会 ACM FSE 2025
阿里云消息团队基于 Apache RocketMQ 构建 Serverless 消息系统,适配多种主流消息协议(如 RabbitMQ、MQTT 和 Kafka),成功解决了传统中间件在可伸缩性、成本及元数据管理等方面的难题,并据此实现 ApsaraMQ 全系列产品 Serverless 化,助力企业提效降本。
|
5月前
|
消息中间件 安全 物联网
海量接入、毫秒响应:易易互联基于 Apache RocketMQ + MQTT 构筑高可用物联网消息中枢
易易互联科技有限公司是吉利集团旗下专注于换电生态的全资子公司,致力于打造安全、便捷、便宜的智能换电网络。公司依托吉利GBRC换电平台,基于电池共享与车辆全生命周期运营,已布局超470座换电站,覆盖40多个城市,计划2027年达2000座。面对海量设备高并发连接、高实时性要求及数据洪峰挑战,易易互联采用阿里云MQTT与RocketMQ构建高效物联网通信架构,实现稳定接入、低延迟通信与弹性处理,全面支撑其全国换电网络规模化运营与智能化升级。
370 1
海量接入、毫秒响应:易易互联基于 Apache RocketMQ + MQTT 构筑高可用物联网消息中枢
|
5月前
|
消息中间件 Java Kafka
消息队列比较:Spring 微服务中的 Kafka 与 RabbitMQ
本文深入解析了 Kafka 和 RabbitMQ 两大主流消息队列在 Spring 微服务中的应用与对比。内容涵盖消息队列的基本原理、Kafka 与 RabbitMQ 的核心概念、各自优势及典型用例,并结合 Spring 生态的集成方式,帮助开发者根据实际需求选择合适的消息中间件,提升系统解耦、可扩展性与可靠性。
367 1
消息队列比较:Spring 微服务中的 Kafka 与 RabbitMQ
|
11月前
|
消息中间件 存储 数据采集
4步实现状态机驱动的MQTT客户端,快速接入OneNet (1)
本文介绍了基于状态机驱动的MQTT客户端快速接入OneNet平台的实现方法,通过4步完成模块设计。文章以开源项目`Sparrow`为基础,引入`OneNetMqtt`业务模块,采用事件驱动模型和双层状态机设计,实现设备状态管理、消息处理及定时任务等功能。模块分为三层:`OneNetManager`负责核心逻辑,`OneNetDevice`管理设备信息,`OneNetDriver`处理Socket与MQTT通信。验证结果显示设备连接、数据上报及下线功能正常,稳定性良好。该设计简化了复杂条件判断,增强了系统灵活性与可扩展性,适用于实际项目参考。文末提供源码获取方式,助力读者实践与学习。
663 104
|
9月前
|
物联网
(手把手)在华为云、阿里云搭建自己的物联网MQTT消息服务器,免费IOT平台
本文介绍如何在阿里云搭建自己的物联网MQTT消息服务器,并使用 “MQTT客户端调试工具”模拟MQTT设备,接入平台进行消息收发。
2978 42
|
9月前
|
物联网
如何在腾讯云等平台搭建自己的物联网MQTT服务器Broker
物联网技术及MQTT协议被广泛应用于各种场景。本文介绍物联网MQTT服务助手下载,如何搭建自己的物联网平台,并使用 “MQTT客户端调试工具”模拟MQTT设备,接入平台进行消息收发。
697 37
|
9月前
|
消息中间件 缓存 NoSQL
基于Spring Data Redis与RabbitMQ实现字符串缓存和计数功能(数据同步)
总的来说,借助Spring Data Redis和RabbitMQ,我们可以轻松实现字符串缓存和计数的功能。而关键的部分不过是一些"厨房的套路",一旦你掌握了这些套路,那么你就像厨师一样可以准备出一道道饕餮美食了。通过这种方式促进数据处理效率无疑将大大提高我们的生产力。
302 32
|
11月前
|
监控 物联网 网络性能优化
【杂谈】-MQTT与HTTP在物联网中的比较:为什么MQTT是更好的选择
通过上述分析,可以看出MQTT在物联网应用中的确是更好的选择。其高效的通信模型、低带宽消耗、稳定的连接保持机制以及可靠的消息质量保证,使其在各种物联网场景中都能表现出色。开发者在设计和实现物联网系统时,应优先考虑采用MQTT协议,以充分发挥其在资源受限环境下的优势,提升系统的整体性能和可靠性。
2224 26
|
消息中间件 C语言 RocketMQ
消息队列 MQ操作报错合集之出现"Connection reset by peer"的错误,该如何处理
消息队列(MQ)是一种用于异步通信和解耦的应用程序间消息传递的服务,广泛应用于分布式系统中。针对不同的MQ产品,如阿里云的RocketMQ、RabbitMQ等,它们在实现上述场景时可能会有不同的特性和优势,比如RocketMQ强调高吞吐量、低延迟和高可用性,适合大规模分布式系统;而RabbitMQ则以其灵活的路由规则和丰富的协议支持受到青睐。下面是一些常见的消息队列MQ产品的使用场景合集,这些场景涵盖了多种行业和业务需求。
|
消息中间件 Java C语言
消息队列 MQ使用问题之在使用C++客户端和GBase的ESQL进行编译时出现core dump,该怎么办
消息队列(MQ)是一种用于异步通信和解耦的应用程序间消息传递的服务,广泛应用于分布式系统中。针对不同的MQ产品,如阿里云的RocketMQ、RabbitMQ等,它们在实现上述场景时可能会有不同的特性和优势,比如RocketMQ强调高吞吐量、低延迟和高可用性,适合大规模分布式系统;而RabbitMQ则以其灵活的路由规则和丰富的协议支持受到青睐。下面是一些常见的消息队列MQ产品的使用场景合集,这些场景涵盖了多种行业和业务需求。

相关产品

  • 云消息队列 MQ