1. 主要实现功能
- 自动配置
- 消息自动解析
- 消息分组共享订阅
- 消息不分组共享订阅
- 消息排它订阅
- 延时发布
- 多数据源
4. 快速开始
4.1 引入依赖
<dependency> <groupId>org.eclipse.paho</groupId> <artifactId>org.eclipse.paho.client.mqttv3</artifactId> <version>1.2.5</version> </dependency>
4.2 配置
spring: mqtt: emq: client: # 多数据源客户端名称,默认default default: # broker地址 host: 127.0.0.1 # 端口 port: 31883 # 用户名 username: admin # 密码 password: 123456
更多配置如下
spring: mqtt: emq: client: # 多数据源客户端名称,默认default default: # broker地址 host: 127.0.0.1 # 端口 port: 31883 # 用户名 username: admin # 密码 password: 123456 # 客户端标识,需保持全局唯一 client-id: parking_server # 是否清除session clean-session: false # 连接超时时间,单位秒 connection-timeout: 10 # 心跳间隔时间,单位秒 keep-alive-interval: 10 # 全局消息质量 global-qos: 1 # 重新连接之间等待的最长时间 maxReconnect-delay: 128000 # 是否自动重新连接 automatic-reconnect: true # 最大消息并发数量,超过此数量并发时可能会丢消息 maxInflight: 1000
4.3 开启自动配置
在启动类上增加@EnableRabbitMqAutoConfiguration
注解
import com.demo.mqttclient.anno.EnableEmqAutoConfiguration; @SpringBootApplication @EnableEmqAutoConfiguration public class TestApplication { public static void main(String[] args) { SpringApplication.run(TestApplication.class, args); } }
4.4 发布消息
在生产者的业务程序中,注入MQTTClient
import com.demo.mqttclient.MQTTClient; @Resource private MQTTClient defaultMQTTClient;
为了兼容第三方及优化内部使用逻辑,所以内置提供了两种消息发送方式。
4.4.1 第三方消息发送
public String publishHeartbeatReply() { HeartbeatReplyMessage heartbeatReplyMessage = new HeartbeatReplyMessage(); heartbeatReplyMessage.setCmd(32896); heartbeatReplyMessage.setExpire(1605252875L); heartbeatReplyMessage.setDevid("095437323930030130523933"); heartbeatReplyMessage.setServer_time("1605252875"); defaultMQTTClient.publish2ThirdParty("npt/park/type1/dev/095437323930030130523933", 1, heartbeatReplyMessage); return "success"; }
4.4.2 内部消息发送
消息实体实现Message
接口
package com.example.test.message; import com.demo.mqttclient.MQTTMessage; import lombok.Data; import java.util.UUID; @Data public class demoMessage implements MQTTMessage { private String msgId = UUID.randomUUID().toString(); private String name; private String gender; @Override public String getMsgId() { return this.msgId; } }
然后直接调用该类的publish
方法发送即可
@GetMapping("demo/publish") public String demoPublish() { demoMessage demoMessage = new demoMessage(); demoMessage.setName("点都"); demoMessage.setGender("xx"); defaultMQTTClient.publish("demo/topic", demoMessage); return "success"; }
其中存在多个重载的方法。
package com.demo.mqttclient; import com.demo.mqttclient.enums.ShareModelEnum; import com.demo.plugin.core.lang.json.JSONUtil; import org.eclipse.paho.client.mqttv3.MqttMessage; /** * mqtt客户端 * * @author zhangliuyang * @date 2022/07/18 * @since 1.0.0 */ public interface MQTTClient { /** * 启动客户端 */ void start(); /** * 关闭客户端 */ void close(); /** * 发布 * * @param topic 主题 * @param message 消息 */ default <T extends MQTTMessage> void publish(String topic, T message) { this.publish(topic, message, 1); } /** * 发布 * * @param topic 主题 * @param message 消息 * @param qos 消息质量 */ default <T extends MQTTMessage> void publish(String topic, T message, int qos) { this.publish(topic, message, qos, 0); } /** * 发布 * * @param topic 主题 * @param message 消息 * @param qos 消息质量 * @param delay 延迟时间[unit:s, max:4294967s, condition: > 0] */ default <T extends MQTTMessage> void publish(String topic, T message, int qos, long delay) { this.publish(topic, message, qos, delay, false); } /** * 发布 * * @param topic 主题 * @param message 消息 * @param qos 消息质量 * @param delay 延迟时间[unit:s, max:4294967s, condition: > 0] * @param retained 保留消息 */ default <T extends MQTTMessage> void publish(String topic, T message, int qos, long delay, boolean retained) { MQTTMessageContext mqttMessageContext = new MQTTMessageContext(); mqttMessageContext.setId(message.getMsgId()); mqttMessageContext.setPayload(JSONUtil.write(message)); mqttMessageContext.setQos(qos); mqttMessageContext.setDelay(delay); mqttMessageContext.setRetained(retained); mqttMessageContext.setTimestamp(System.currentTimeMillis()); this.publish(topic, mqttMessageContext); } /** * 发布 * * @param topic 主题 * @param messageContext 消息上下文 */ void publish(String topic, MQTTMessageContext messageContext); /** * 发送到第三方 * * @param topic 主题 * @param message 消息 */ default void publish2ThirdParty(String topic, Object message) { this.publish2ThirdParty(topic, 1, message); } /** * 发送到第三方 * * @param topic 主题 * @param qos 消息质量 * @param message 消息 */ default void publish2ThirdParty(String topic, int qos, Object message) { this.publish2ThirdParty(topic, qos, message, Constant.DEFAULT_CHARSET.name()); } /** * 发送到第三方 * * @param topic 主题 * @param qos 消息质量 * @param message 消息 * @param charsetName 字符集名称 */ default void publish2ThirdParty(String topic, int qos, Object message, String charsetName) { this.publish2ThirdParty(topic, qos, message, charsetName, 0); } /** * publish2第三方 * * @param topic 主题 * @param qos qos * @param message 消息 * @param charsetName 字符集名称 * @param delay 延迟时间 */ default void publish2ThirdParty(String topic, int qos, Object message, String charsetName, long delay) { this.publish2ThirdParty(topic, qos, message, charsetName, delay, false); } /** * publish2第三方 * * @param topic 主题 * @param qos qos * @param message 消息 * @param charsetName 字符集名称 * @param delay 延迟时间 * @param retained 是否保留消息 */ default void publish2ThirdParty(String topic, int qos, Object message, String charsetName, long delay, boolean retained) { this.publish2ThirdParty(topic, qos, JSONUtil.toString(message).getBytes(charsetName), delay, retained); } /** * 发送到第三方 * * @param topic 主题 * @param payload 有效载荷 */ default void publish2ThirdParty(String topic, byte[] payload) { this.publish2ThirdParty(topic, 1, payload, 0); } /** * 发送到第三方 * * @param topic 主题 * @param qos 消息质量 * @param payload 有效载荷 */ default void publish2ThirdParty(String topic, int qos, byte[] payload) { this.publish2ThirdParty(topic, qos, payload, 0); } /** * 发送到第三方 * * @param topic 主题 * @param qos 消息质量 * @param payload 有效载荷 * @param delay 延迟时间 */ default void publish2ThirdParty(String topic, int qos, byte[] payload, long delay) { this.publish2ThirdParty(topic, qos, payload, delay, false); } /** * 发送到第三方 * * @param topic 主题 * @param qos 消息质量 * @param payload 有效载荷 * @param delay 延迟时间 * @param retained 是否保留消息 */ default void publish2ThirdParty(String topic, int qos, byte[] payload, long delay, boolean retained) { MqttMessage mqttMessage = new MqttMessage(); mqttMessage.setPayload(payload); mqttMessage.setQos(qos); mqttMessage.setRetained(retained); this.publish2ThirdParty(topic, delay, mqttMessage); } /** * 发送到第三方 * * @param topic 主题 * @param mqttMessage mqtt消息 * @param delay 延迟时间 */ void publish2ThirdParty(String topic, long delay, MqttMessage mqttMessage); /** * 订阅 * * @param topic 主题 * @param qos 消息质量 * @param shareModel 共享模型 * @param groupName 分组名称 * @param exclusive 排它 * @param messageHandler 消息处理程序 */ void subscribe(String topic, int qos, ShareModelEnum shareModel, String groupName, boolean exclusive, MessageHandler messageHandler); }
4.5 接收消息
消费者需要在消息处理类上添加@MQTTSubscriber(topics = {"npt/park/type1/server/10010"})注解,指定要监听topics和客户端名称即可。如果没有显示的指定客户端名称,则使用defaultMQTTClient,使用qos执行订阅消息质量。
当消息处理类中有多个public方法时,需要@MQTTConsumerMethod标记具体消费方法
package com.example.test.handler; import com.demo.mqttclient.MessageHandler; import com.demo.mqttclient.anno.MQTTConsumerMethod; import com.demo.mqttclient.anno.MQTTSubscriber; import com.demo.mqttclient.enums.ShareModelEnum; import com.demo.plugin.core.lang.json.JSONUtil; import com.example.test.message.DeviceStartMessage; import com.example.test.message.PlateRecognitionReportMessage; import lombok.extern.slf4j.Slf4j; import org.eclipse.paho.client.mqttv3.MqttMessage; import java.util.Map; @Slf4j @MQTTSubscriber(topics = ParkingMessageHandler.TOPIC, qos = 1, share = ShareModelEnum.GROUP_SHARE) public class ParkingMessageHandler { public static final String TOPIC = "npt/park/type1/server/10010"; public static final String CMD_KEY = "cmd"; @MQTTConsumerMethod public void handle(Map<String, Object> message) { log.info("handle:{}", message); if (message.containsKey(CMD_KEY)) { Integer cmd = (Integer) message.get(CMD_KEY); switch (cmd) { case 129: handleDeviceStartMessage(message); break; case 140: handlePlateRecognitionReportMessage(message); break; default: log.warn("不支持此cmd:[{}]", cmd); break; } } else { log.warn("消息消费异常"); } } private void handleDeviceStartMessage(Map<String, Object> message) { DeviceStartMessage deviceStartMessage = JSONUtil.toObject(JSONUtil.toString(message), DeviceStartMessage.class); log.info("接收到设备启动消息:{}", deviceStartMessage); } private void handlePlateRecognitionReportMessage(Map<String, Object> message) { PlateRecognitionReportMessage plateRecognitionReportMessage = JSONUtil.toObject(JSONUtil.toString(message), PlateRecognitionReportMessage.class); log.info("接收到车牌上报识别消息:{}", plateRecognitionReportMessage); } }
4.6 发送延迟消息
要发送延迟消息,需要先开启emq
延迟发布配置。
发送延时消息的方式相比之前,仅仅增加一个延时时间。其中延时时长的单位为秒
,最大为4294967
秒
//发送一个延时时长为10s的消息 defaultMQTTClient.publish2ThirdParty("npt/park/type1/dev/095437323930030130523933", 1, heartbeatReplyMessage, 10);
4.7 多数据源
多数据源与单数据源配置属性相同,在配置文件中声明即可
spring: mqtt: emq: client: # 多数据源客户端名称,默认default default: # broker地址 host: 127.0.0.1 # 端口 port: 31883 # 用户名 username: admin # 密码 password: 123456 # 多数据源客户端名称 parking: # broker地址 host: 127.0.0.1 # 端口 port: 31883 # 用户名 username: admin # 密码 password: 123456
4.7.1 发布消息
首先注入MQTTClient
,与单数据源的唯一区别就是bean
的名称。默认向Spring
容器中添加的实现类名称为“${数据源名称}MQTTClient”
以上面的配置文件为例,默认的bean
名称为 defaultMQTTClient
和 billMQTTClient
import com.demo.mqttclient.MQTTClient; @Resource private MQTTClient defaultMQTTClient; @Resource private MQTTClient billMQTTClient;
其他操作同单数据源
4.7.2 接收消息
接收消息与单数据源基本一致,唯一的区别是在@MQTTSubscriber
中指定clientName
属性,指定当前从哪个数据源进行消费。
import com.demo.mqttclient.anno.MQTTSubscriber; @MQTTSubscriber(topics = {"npt/park/type1/server/10010"}, clientName = "parking") public class ParkingMessageHandler {}
4.8 分组共享订阅
系统默认使用spring.application.name
作为分组名称,用户可在消息消费类上指定@MQTTSubscriber
属性中groupName = "group_name"
即可
import com.demo.mqttclient.anno.MQTTSubscriber; @MQTTSubscriber(topics = ParkingMessageHandler.TOPIC, qos = 1, share = ShareModelEnum.GROUP_SHARE, groupName = "group_name") public class ParkingMessageHandler {}
4.9 不分组共享订阅
只需要在消息消费类上指定@MQTTSubscriber
属性中share = ShareModelEnum.UN_GROUP_SHARE
即可。
import com.demo.mqttclient.anno.MQTTSubscriber; @MQTTSubscriber(topics = ParkingMessageHandler.TOPIC, qos = 1, share = ShareModelEnum.UN_GROUP_SHARE) public class ParkingMessageHandler {}
4.10 排它订阅
只需要在消息消费类上指定@MQTTSubscriber
属性中exclusive = true
即可,开启排它订阅时,默认关闭共享订阅。
import com.demo.mqttclient.anno.MQTTSubscriber; @MQTTSubscriber(topics = ParkingMessageHandler.TOPIC, qos = 1, exclusive = true) public class ParkingMessageHandler {}