适合的读者,略微了解SpringBoot、消息队列的朋友,想了解和尝试使用Spring Integration框架,想扩展知识边界。
前言
后文案例代码:GitHub 代码 github.com/ningzaichun…
MQTT我想大部分朋友应该都是知道的,即使没有使用过MQTT,肯定也使用过它的兄弟们,RabbitMQ、RocketMQ和Kafka等消息队列.
但这次的主角并非是MQTT,而是 Spring Integration
,如果是没有怎么注意Spring官网的朋友,可能甚至都没咋听过 Spring Integration
框架,它是针对类似信息流的一个上层抽象,不只是MQTT,比如AMQP、MAIL都支持,贴一张官网的截图,诸如下列是都支持的。
正如Spring的一贯风格,比如以前刚学Spring 的时候,肯定是学过Spring Data,知道它就是针对数据库的一系列抽象。Spring Integration 也是如此,不过抽象的对象换成了信息流罢啦。
本篇文章更多的是起一个抛转引玉的作用,虽将大致内容都涵盖在内了,但部分代码的细节,是有欠考虑的,写在前文中,还望各位见谅。
本文大纲如下:
一、SpringBoot常规方式集成MQTT
先抛开 Spring Integration 不管,我们先看看常规的集成方式是什么的,后面再讲一讲Spring Integration 有哪些优点。
1.1、Docker 安装 EMQX
为了快捷,我并没有做多余的设置,直接粘贴,即可在Docker环境下,运行一个 EMQX 服务器
docker run -d --name emqx -p 1883:1883 -p 8083:8083 -p 8084:8084 -p 8883:8883 -p 18083:18083 emqx/emqx:5.1.3
详细可参考 EMQX Docker 部署指南
1.2、常规方式集成 EMQX
可能一些刚使用 SpringBoot 集成 EMQX(MQTT协议的服务实现)的朋友,大部分都是使用下面所阐述的方式进行整合的(在网上冲浪找的)。
偷了个小懒,下文代码示例来源:spring boot + mqtt 物联网开发
@Slf4j public class MqttPushClient { private static MqttClient client; public static MqttClient getClient() { return client; } public static void setClient(MqttClient client) { MqttPushClient.client = client; } private MqttConnectOptions getOption(String userName, String password, int outTime, int KeepAlive) { // MQTT连接设置 MqttConnectOptions option = new MqttConnectOptions(); // 设置是否清空session,false表示服务器会保留客户端的连接记录,true表示每次连接到服务器都以新的身份连接 option.setCleanSession(false); // 设置连接的用户名 option.setUserName(userName); // 设置连接的密码 option.setPassword(password.toCharArray()); // 设置超时时间 单位为秒 option.setConnectionTimeout(outTime); // 设置会话心跳时间 单位为秒 服务器会每隔(1.5*keepTime)秒的时间向客户端发送个消息判断客户端是否在线,但这个方法并没有重连的机制 option.setKeepAliveInterval(KeepAlive); // setWill方法,如果项目中需要知道客户端是否掉线可以调用该方法。设置最终端口的通知消息 // option.setWill(topic, "close".getBytes(), 2, true); option.setMaxInflight(1000); log.info("================>>>MQTT连接认证成功<<======================"); return option; } /** * 连接 */ public void connect(MqttConfig mqttConfig) { MqttClient client; try { String clientId = mqttConfig.getClientId(); clientId += System.currentTimeMillis(); client = new MqttClient(mqttConfig.getUrl(), clientId, new MemoryPersistence()); MqttConnectOptions options = getOption(mqttConfig.getUsername(), mqttConfig.getPassword(), mqttConfig.getTimeout(), mqttConfig.getKeepAlive()); MqttPushClient.setClient(client); try { client.setCallback(new PushCallback<Object>(this, mqttConfig)); if (!client.isConnected()) { client.connect(options); log.info("================>>>MQTT连接成功<<======================"); //订阅主题 subscribe(mqttConfig.getTopic(), mqttConfig.getQos()); } else {// 这里的逻辑是如果连接不成功就重新连接 client.disconnect(); client.connect(options); log.info("===================>>>MQTT断连成功<<<======================"); } } catch (Exception e) { e.printStackTrace(); } } catch (Exception e) { e.printStackTrace(); } } /** * 断线重连 * * @throws Exception */ public Boolean reConnect() throws Exception { Boolean isConnected = false; if (null != client) { client.connect(); if (client.isConnected()) { isConnected = true; } } return isConnected; } /** * 发布,默认qos为0,非持久化 * * @param topic * @param pushMessage */ public void publish(String topic, String pushMessage) { publish(0, false, topic, pushMessage); } /** * 发布 * * @param qos * @param retained * @param topic * @param pushMessage */ public void publish(int qos, boolean retained, String topic, String pushMessage) { MqttMessage message = new MqttMessage(); message.setQos(qos); message.setRetained(retained); message.setPayload(pushMessage.getBytes()); MqttTopic mTopic = MqttPushClient.getClient().getTopic(topic); if (null == mTopic) { log.error("===============>>>MQTT topic 不存在<<======================="); } MqttDeliveryToken token; try { token = mTopic.publish(message); token.waitForCompletion(); } catch (MqttPersistenceException e) { e.printStackTrace(); } catch (MqttException e) { e.printStackTrace(); } } /** * 发布消息的服务质量(推荐为:2-确保消息到达一次。0-至多一次到达;1-至少一次到达,可能重复), retained * 默认:false-非持久化(是指一条消息消费完,就会被删除;持久化,消费完,还会保存在服务器中,当新的订阅者出现,继续给新订阅者消费) * * @param topic * @param pushMessage */ public void publish(int qos, String topic, String pushMessage) { publish(qos, false, topic, pushMessage); } /** * 订阅某个主题,qos默认为0 * * @param topic */ public void subscribe(String[] topic) { subscribe(topic, null); } /** * 订阅某个主题 * * @param topic * @param qos */ public void subscribe(String[] topic, int[] qos) { try { MqttPushClient.getClient().unsubscribe(topic); MqttPushClient.getClient().subscribe(topic, qos); } catch (MqttException e) { e.printStackTrace(); } } }
@Component(value = "mqttSender") @Slf4j public class MqttSender { @Async public void send(String queueName, String msg) { log.debug("=====================>>>>发送主题:{}, msg:{}", queueName,msg); publish(2, queueName, msg); } /** * 发布,默认qos为0,非持久化 * * @param topic * @param pushMessage */ public void publish(String topic, String pushMessage) { publish(1, false, topic, pushMessage); } /** * 发布 * * @param qos * @param retained * @param topic * @param pushMessage */ public void publish(int qos, boolean retained, String topic, String pushMessage) { MqttMessage message = new MqttMessage(); message.setQos(qos); message.setRetained(retained); message.setPayload(pushMessage.getBytes()); MqttTopic mTopic = MqttPushClient.getClient().getTopic(topic); if (null == mTopic) { log.error("===================>>>MQTT topic 不存在<<================="); } MqttDeliveryToken token; try { token = mTopic.publish(message); token.waitForCompletion(); } catch (MqttPersistenceException e) { log.error("============>>>publish fail", e); e.printStackTrace(); } catch (MqttException e) { e.printStackTrace(); } } /** * 发布消息的服务质量(推荐为:2-确保消息到达一次。0-至多一次到达;1-至少一次到达,可能重复), retained * 默认:false-非持久化(是指一条消息消费完,就会被删除;持久化,消费完,还会保存在服务器中,当新的订阅者出现,继续给新订阅者消费) * * @param topic * @param pushMessage */ public void publish(int qos, String topic, String pushMessage) { publish(qos, false, topic, pushMessage); } }
@Slf4j @Component public class PushCallback<component> implements MqttCallback { private MqttPushClient client; private MqttConfig mqttConfiguration; @Resource MqttService mqttService; public PushCallback(MqttPushClient client, MqttConfig mqttConfiguration) { this.client = client; this.mqttConfiguration = mqttConfiguration; } @Override public void connectionLost(Throwable cause) { /** 连接丢失后,一般在这里面进行重连 **/ if (client != null) { while (true) { try { log.info("==============》》》[MQTT] 连接丢失,尝试重连..."); MqttPushClient mqttPushClient = new MqttPushClient(); mqttPushClient.connect(mqttConfiguration); if (MqttPushClient.getClient().isConnected()) { log.info("=============>>重连成功"); } break; } catch (Exception e) { log.error("=============>>>[MQTT] 连接断开,重连失败!<<============="); continue; } } } log.info(cause.getMessage()); } @Override public void deliveryComplete(IMqttDeliveryToken token) { // publish后会执行到这里 log.info("pushComplete==============>>>" + token.isComplete()); } /** * 监听对应的主题消息 * * @param topic * @param message * @throws Exception */ @Override public void messageArrived(String topic, MqttMessage message) throws Exception { // subscribe后得到的消息会执行到这里面 log.info("============》》接收消息主题 : " + topic); log.info("============》》接收消息Qos : " + message.getQos()); log.info("============》》接收消息内容原始内容 : " + new String(message.getPayload())); log.info("============》》接收消息内容GB2312 : " + new String(message.getPayload(), "GB2312")); log.info("============》》接收消息内容UTF-8 : " + new String(message.getPayload(), "UTF-8")); try { if (topic.equals("datapoint")) { MqttResponseBody mqttResponseBody = JSONUtils.jsonToBean(new String(message.getPayload(), "UTF-8"), MqttResponseBody.class); MqttService mqttService = SpringUtil.getBean(MqttServiceImpl.class); mqttService.messageArrived(mqttResponseBody); } else if (topic.equals("heartbeat")) { MqttResponseHeartbeat mqttResponseHeartbeat = JSONUtils .jsonToBean(new String(message.getPayload(), "UTF-8"), MqttResponseHeartbeat.class); MqttService mqttService = SpringUtil.getBean(MqttServiceImpl.class); mqttService.messageHeartbeat(mqttResponseHeartbeat); } } catch (Exception e) { e.printStackTrace(); log.info("============》》接收消息主题异常 : " + e.getMessage()); } } }
我只抽离了部分代码(主要是订阅、处理订阅消息和发送消息部分),详细的可以点进原文看看。
这种整合方式理解起来是非常简单的,都是直接编码的,没有什么抽象的操作,业务不大的情况下,也是可以正常玩的。
关于 Spring Integration 你知道多少,包含集成MQTT案例讲述及源码2:https://developer.aliyun.com/article/1394928