关于 Spring Integration 你知道多少,包含集成MQTT案例讲述及源码1

简介: 关于 Spring Integration 你知道多少,包含集成MQTT案例讲述及源码

适合的读者,略微了解SpringBoot、消息队列的朋友,想了解和尝试使用Spring Integration框架,想扩展知识边界。

前言

后文案例代码:GitHub 代码 github.com/ningzaichun…

MQTT我想大部分朋友应该都是知道的,即使没有使用过MQTT,肯定也使用过它的兄弟们,RabbitMQ、RocketMQ和Kafka等消息队列.

但这次的主角并非是MQTT,而是 Spring Integration ,如果是没有怎么注意Spring官网的朋友,可能甚至都没咋听过 Spring Integration 框架,它是针对类似信息流的一个上层抽象,不只是MQTT,比如AMQP、MAIL都支持,贴一张官网的截图,诸如下列是都支持的。

image.png

正如Spring的一贯风格,比如以前刚学Spring 的时候,肯定是学过Spring Data,知道它就是针对数据库的一系列抽象。Spring Integration 也是如此,不过抽象的对象换成了信息流罢啦。

本篇文章更多的是起一个抛转引玉的作用,虽将大致内容都涵盖在内了,但部分代码的细节,是有欠考虑的,写在前文中,还望各位见谅。

本文大纲如下:

image.png

一、SpringBoot常规方式集成MQTT

先抛开 Spring Integration 不管,我们先看看常规的集成方式是什么的,后面再讲一讲Spring Integration 有哪些优点。

1.1、Docker 安装 EMQX

为了快捷,我并没有做多余的设置,直接粘贴,即可在Docker环境下,运行一个 EMQX 服务器

docker run -d --name emqx -p 1883:1883 -p 8083:8083 -p 8084:8084 -p 8883:8883 -p 18083:18083 emqx/emqx:5.1.3

详细可参考 EMQX Docker 部署指南

1.2、常规方式集成 EMQX

可能一些刚使用 SpringBoot 集成 EMQX(MQTT协议的服务实现)的朋友,大部分都是使用下面所阐述的方式进行整合的(在网上冲浪找的)。

偷了个小懒,下文代码示例来源:spring boot + mqtt 物联网开发

@Slf4j
public class MqttPushClient {
    private static MqttClient client;
    public static MqttClient getClient() {
        return client;
    }
    public static void setClient(MqttClient client) {
        MqttPushClient.client = client;
    }
    private MqttConnectOptions getOption(String userName, String password, int outTime, int KeepAlive) {
        // MQTT连接设置
        MqttConnectOptions option = new MqttConnectOptions();
        // 设置是否清空session,false表示服务器会保留客户端的连接记录,true表示每次连接到服务器都以新的身份连接
        option.setCleanSession(false);
        // 设置连接的用户名
        option.setUserName(userName);
        // 设置连接的密码
        option.setPassword(password.toCharArray());
        // 设置超时时间 单位为秒
        option.setConnectionTimeout(outTime);
        // 设置会话心跳时间 单位为秒 服务器会每隔(1.5*keepTime)秒的时间向客户端发送个消息判断客户端是否在线,但这个方法并没有重连的机制
        option.setKeepAliveInterval(KeepAlive);
        // setWill方法,如果项目中需要知道客户端是否掉线可以调用该方法。设置最终端口的通知消息
        // option.setWill(topic, "close".getBytes(), 2, true);
        option.setMaxInflight(1000);
        log.info("================>>>MQTT连接认证成功<<======================");
        return option;
    }
    /**
     * 连接
     */
    public void connect(MqttConfig mqttConfig) {
        MqttClient client;
        try {
            String clientId = mqttConfig.getClientId();
            clientId += System.currentTimeMillis();
            client = new MqttClient(mqttConfig.getUrl(), clientId, new MemoryPersistence());
            MqttConnectOptions options = getOption(mqttConfig.getUsername(), mqttConfig.getPassword(),
                    mqttConfig.getTimeout(), mqttConfig.getKeepAlive());
            MqttPushClient.setClient(client);
            try {
                client.setCallback(new PushCallback<Object>(this, mqttConfig));
                if (!client.isConnected()) {
                    client.connect(options);
                    log.info("================>>>MQTT连接成功<<======================");
                     //订阅主题
                    subscribe(mqttConfig.getTopic(), mqttConfig.getQos());
                } else {// 这里的逻辑是如果连接不成功就重新连接
                    client.disconnect();
                    client.connect(options);
                    log.info("===================>>>MQTT断连成功<<<======================");
                }
            } catch (Exception e) {
                e.printStackTrace();
            }
        } catch (Exception e) {
            e.printStackTrace();
        }
    }
    /**
     * 断线重连
     *
     * @throws Exception
     */
    public Boolean reConnect() throws Exception {
        Boolean isConnected = false;
        if (null != client) {
            client.connect();
            if (client.isConnected()) {
                isConnected = true;
            }
        }
        return isConnected;
    }
    /**
     * 发布,默认qos为0,非持久化
     *
     * @param topic
     * @param pushMessage
     */
    public void publish(String topic, String pushMessage) {
        publish(0, false, topic, pushMessage);
    }
    /**
     * 发布
     *
     * @param qos
     * @param retained
     * @param topic
     * @param pushMessage
     */
    public void publish(int qos, boolean retained, String topic, String pushMessage) {
        MqttMessage message = new MqttMessage();
        message.setQos(qos);
        message.setRetained(retained);
        message.setPayload(pushMessage.getBytes());
        MqttTopic mTopic = MqttPushClient.getClient().getTopic(topic);
        if (null == mTopic) {
            log.error("===============>>>MQTT topic 不存在<<=======================");
        }
        MqttDeliveryToken token;
        try {
            token = mTopic.publish(message);
            token.waitForCompletion();
        } catch (MqttPersistenceException e) {
            e.printStackTrace();
        } catch (MqttException e) {
            e.printStackTrace();
        }
    }
    /**
     * 发布消息的服务质量(推荐为:2-确保消息到达一次。0-至多一次到达;1-至少一次到达,可能重复), retained
     * 默认:false-非持久化(是指一条消息消费完,就会被删除;持久化,消费完,还会保存在服务器中,当新的订阅者出现,继续给新订阅者消费)
     *
     * @param topic
     * @param pushMessage
     */
    public void publish(int qos, String topic, String pushMessage) {
        publish(qos, false, topic, pushMessage);
    }
    /**
     * 订阅某个主题,qos默认为0
     *
     * @param topic
     */
    public void subscribe(String[] topic) {
        subscribe(topic, null);
    }
    /**
     * 订阅某个主题
     *
     * @param topic
     * @param qos
     */
    public void subscribe(String[] topic, int[] qos) {
        try {
            MqttPushClient.getClient().unsubscribe(topic);
            MqttPushClient.getClient().subscribe(topic, qos);
        } catch (MqttException e) {
            e.printStackTrace();
        }
    }
}
@Component(value = "mqttSender")
@Slf4j
public class MqttSender {
    @Async
    public void send(String queueName, String msg) {
        log.debug("=====================>>>>发送主题:{},  msg:{}", queueName,msg);
        publish(2, queueName, msg);
    }
    /**
     * 发布,默认qos为0,非持久化
     *
     * @param topic
     * @param pushMessage
     */
    public void publish(String topic, String pushMessage) {
        publish(1, false, topic, pushMessage);
    }
    /**
     * 发布
     *
     * @param qos
     * @param retained
     * @param topic
     * @param pushMessage
     */
    public void publish(int qos, boolean retained, String topic, String pushMessage) {
        MqttMessage message = new MqttMessage();
        message.setQos(qos);
        message.setRetained(retained);
        message.setPayload(pushMessage.getBytes());
        MqttTopic mTopic = MqttPushClient.getClient().getTopic(topic);
        if (null == mTopic) {
            log.error("===================>>>MQTT topic 不存在<<=================");
        }
        MqttDeliveryToken token;
        try {
            token = mTopic.publish(message);
            token.waitForCompletion();
        } catch (MqttPersistenceException e) {
            log.error("============>>>publish fail", e);
            e.printStackTrace();
        } catch (MqttException e) {
            e.printStackTrace();
        }
    }
    /**
     * 发布消息的服务质量(推荐为:2-确保消息到达一次。0-至多一次到达;1-至少一次到达,可能重复), retained
     * 默认:false-非持久化(是指一条消息消费完,就会被删除;持久化,消费完,还会保存在服务器中,当新的订阅者出现,继续给新订阅者消费)
     *
     * @param topic
     * @param pushMessage
     */
    public void publish(int qos, String topic, String pushMessage) {
        publish(qos, false, topic, pushMessage);
    }
}
    @Slf4j
    @Component
    public class PushCallback<component> implements MqttCallback {
        private MqttPushClient client;
        private MqttConfig mqttConfiguration;
        @Resource
        MqttService mqttService;
        public PushCallback(MqttPushClient client, MqttConfig mqttConfiguration) {
            this.client = client;
            this.mqttConfiguration = mqttConfiguration;
        }
        @Override
        public void connectionLost(Throwable cause) {
            /** 连接丢失后,一般在这里面进行重连 **/
            if (client != null) {
                while (true) {
                    try {
                        log.info("==============》》》[MQTT] 连接丢失,尝试重连...");
                        MqttPushClient mqttPushClient = new MqttPushClient();
                        mqttPushClient.connect(mqttConfiguration);
                        if (MqttPushClient.getClient().isConnected()) {
                            log.info("=============>>重连成功");
                        }
                        break;
                    } catch (Exception e) {
                        log.error("=============>>>[MQTT] 连接断开,重连失败!<<=============");
                        continue;
                    }
                }
            }
            log.info(cause.getMessage());
        }
        @Override
        public void deliveryComplete(IMqttDeliveryToken token) {
            // publish后会执行到这里
            log.info("pushComplete==============>>>" + token.isComplete());
        }
        /**
         * 监听对应的主题消息
         *
         * @param topic
         * @param message
         * @throws Exception
         */
        @Override
        public void messageArrived(String topic, MqttMessage message) throws Exception {
            // subscribe后得到的消息会执行到这里面
            log.info("============》》接收消息主题 : " + topic);
            log.info("============》》接收消息Qos : " + message.getQos());
            log.info("============》》接收消息内容原始内容 : " + new String(message.getPayload()));
            log.info("============》》接收消息内容GB2312 : " + new String(message.getPayload(), "GB2312"));
            log.info("============》》接收消息内容UTF-8 : " + new String(message.getPayload(), "UTF-8"));
            try {
                if (topic.equals("datapoint")) {
                    MqttResponseBody mqttResponseBody = JSONUtils.jsonToBean(new String(message.getPayload(), "UTF-8"),
                            MqttResponseBody.class);
                    MqttService mqttService = SpringUtil.getBean(MqttServiceImpl.class);
                    mqttService.messageArrived(mqttResponseBody);
                } else if (topic.equals("heartbeat")) {
                    MqttResponseHeartbeat mqttResponseHeartbeat = JSONUtils
                            .jsonToBean(new String(message.getPayload(), "UTF-8"), MqttResponseHeartbeat.class);
                    MqttService mqttService = SpringUtil.getBean(MqttServiceImpl.class);
                    mqttService.messageHeartbeat(mqttResponseHeartbeat);
                }
            } catch (Exception e) {
                e.printStackTrace();
                log.info("============》》接收消息主题异常 : " + e.getMessage());
            }
        }
    }

我只抽离了部分代码(主要是订阅、处理订阅消息和发送消息部分),详细的可以点进原文看看。

这种整合方式理解起来是非常简单的,都是直接编码的,没有什么抽象的操作,业务不大的情况下,也是可以正常玩的。

关于 Spring Integration 你知道多少,包含集成MQTT案例讲述及源码2:https://developer.aliyun.com/article/1394928


相关实践学习
消息队列RocketMQ版:基础消息收发功能体验
本实验场景介绍消息队列RocketMQ版的基础消息收发功能,涵盖实例创建、Topic、Group资源创建以及消息收发体验等基础功能模块。
消息队列 MNS 入门课程
1、消息队列MNS简介 本节课介绍消息队列的MNS的基础概念 2、消息队列MNS特性 本节课介绍消息队列的MNS的主要特性 3、MNS的最佳实践及场景应用 本节课介绍消息队列的MNS的最佳实践及场景应用案例 4、手把手系列:消息队列MNS实操讲 本节课介绍消息队列的MNS的实际操作演示 5、动手实验:基于MNS,0基础轻松构建 Web Client 本节课带您一起基于MNS,0基础轻松构建 Web Client
目录
相关文章
|
1月前
|
Java Maven Docker
gitlab-ci 集成 k3s 部署spring boot 应用
gitlab-ci 集成 k3s 部署spring boot 应用
|
3月前
|
消息中间件 弹性计算 Kubernetes
RabbitMQ与容器化技术的集成实践
【8月更文第28天】RabbitMQ 是一个开源消息代理和队列服务器,用于在分布式系统中存储、转发消息。随着微服务架构的普及,容器化技术(如 Docker 和 Kubernetes)成为了部署和管理应用程序的标准方式。本文将探讨如何使用 Docker 和 Kubernetes 在生产环境中部署和管理 RabbitMQ 服务,同时保证高可用性和弹性伸缩能力。
52 3
|
16天前
|
消息中间件 Java 数据库
新版 Seata 集成 RocketMQ事务消息,越来越 牛X 了!阿里的 Seata , yyds !
这里 借助 Seata 集成 RocketMQ 事务消息的 新功能,介绍一下一个新遇到的面试题:如果如何实现 **强弱一致性 结合**的分布式事务?
新版 Seata 集成 RocketMQ事务消息,越来越 牛X 了!阿里的 Seata , yyds !
|
3天前
|
存储 Java 调度
Sppring集成Quartz简单案例详解 包括(添加、停止、恢复、删除任务、获取下次执行时间等)
Sppring集成Quartz简单案例详解 包括(添加、停止、恢复、删除任务、获取下次执行时间等)
13 2
|
21天前
|
前端开发 Java 程序员
springboot 学习十五:Spring Boot 优雅的集成Swagger2、Knife4j
这篇文章是关于如何在Spring Boot项目中集成Swagger2和Knife4j来生成和美化API接口文档的详细教程。
45 1
|
1月前
|
存储 前端开发 Java
Spring Boot 集成 MinIO 与 KKFile 实现文件预览功能
本文详细介绍如何在Spring Boot项目中集成MinIO对象存储系统与KKFileView文件预览工具,实现文件上传及在线预览功能。首先搭建MinIO服务器,并在Spring Boot中配置MinIO SDK进行文件管理;接着通过KKFileView提供文件预览服务,最终实现文档管理系统的高效文件处理能力。
201 11
|
2月前
|
监控 关系型数据库 MySQL
zabbix agent集成percona监控MySQL的插件实战案例
这篇文章是关于如何使用Percona监控插件集成Zabbix agent来监控MySQL的实战案例。
46 2
zabbix agent集成percona监控MySQL的插件实战案例
|
21天前
|
Java Spring
springboot 学习十一:Spring Boot 优雅的集成 Lombok
这篇文章是关于如何在Spring Boot项目中集成Lombok,以简化JavaBean的编写,避免冗余代码,并提供了相关的配置步骤和常用注解的介绍。
73 0
|
24天前
|
传感器 数据可视化 网络协议
DIY可视化整合MQTT生成UniApp源码
DIY可视化整合MQTT生成UniApp源码
38 0
|
3月前
|
消息中间件 分布式计算 大数据
RabbitMQ与大数据平台的集成
【8月更文第28天】在现代的大数据处理架构中,消息队列作为数据传输的关键组件扮演着重要的角色。RabbitMQ 是一个开源的消息代理软件,它支持多种消息协议,能够为分布式系统提供可靠的消息传递服务。本篇文章将探讨如何使用 RabbitMQ 与 Hadoop 和 Spark 进行集成,以实现高效的数据处理和分析。
33 1