关于 Spring Integration 你知道多少,包含集成MQTT案例讲述及源码1

简介: 关于 Spring Integration 你知道多少,包含集成MQTT案例讲述及源码

适合的读者,略微了解SpringBoot、消息队列的朋友,想了解和尝试使用Spring Integration框架,想扩展知识边界。

前言

后文案例代码:GitHub 代码 github.com/ningzaichun…

MQTT我想大部分朋友应该都是知道的,即使没有使用过MQTT,肯定也使用过它的兄弟们,RabbitMQ、RocketMQ和Kafka等消息队列.

但这次的主角并非是MQTT,而是 Spring Integration ,如果是没有怎么注意Spring官网的朋友,可能甚至都没咋听过 Spring Integration 框架,它是针对类似信息流的一个上层抽象,不只是MQTT,比如AMQP、MAIL都支持,贴一张官网的截图,诸如下列是都支持的。

image.png

正如Spring的一贯风格,比如以前刚学Spring 的时候,肯定是学过Spring Data,知道它就是针对数据库的一系列抽象。Spring Integration 也是如此,不过抽象的对象换成了信息流罢啦。

本篇文章更多的是起一个抛转引玉的作用,虽将大致内容都涵盖在内了,但部分代码的细节,是有欠考虑的,写在前文中,还望各位见谅。

本文大纲如下:

image.png

一、SpringBoot常规方式集成MQTT

先抛开 Spring Integration 不管,我们先看看常规的集成方式是什么的,后面再讲一讲Spring Integration 有哪些优点。

1.1、Docker 安装 EMQX

为了快捷,我并没有做多余的设置,直接粘贴,即可在Docker环境下,运行一个 EMQX 服务器

docker run -d --name emqx -p 1883:1883 -p 8083:8083 -p 8084:8084 -p 8883:8883 -p 18083:18083 emqx/emqx:5.1.3

详细可参考 EMQX Docker 部署指南

1.2、常规方式集成 EMQX

可能一些刚使用 SpringBoot 集成 EMQX(MQTT协议的服务实现)的朋友,大部分都是使用下面所阐述的方式进行整合的(在网上冲浪找的)。

偷了个小懒,下文代码示例来源:spring boot + mqtt 物联网开发

@Slf4j
public class MqttPushClient {
    private static MqttClient client;
    public static MqttClient getClient() {
        return client;
    }
    public static void setClient(MqttClient client) {
        MqttPushClient.client = client;
    }
    private MqttConnectOptions getOption(String userName, String password, int outTime, int KeepAlive) {
        // MQTT连接设置
        MqttConnectOptions option = new MqttConnectOptions();
        // 设置是否清空session,false表示服务器会保留客户端的连接记录,true表示每次连接到服务器都以新的身份连接
        option.setCleanSession(false);
        // 设置连接的用户名
        option.setUserName(userName);
        // 设置连接的密码
        option.setPassword(password.toCharArray());
        // 设置超时时间 单位为秒
        option.setConnectionTimeout(outTime);
        // 设置会话心跳时间 单位为秒 服务器会每隔(1.5*keepTime)秒的时间向客户端发送个消息判断客户端是否在线,但这个方法并没有重连的机制
        option.setKeepAliveInterval(KeepAlive);
        // setWill方法,如果项目中需要知道客户端是否掉线可以调用该方法。设置最终端口的通知消息
        // option.setWill(topic, "close".getBytes(), 2, true);
        option.setMaxInflight(1000);
        log.info("================>>>MQTT连接认证成功<<======================");
        return option;
    }
    /**
     * 连接
     */
    public void connect(MqttConfig mqttConfig) {
        MqttClient client;
        try {
            String clientId = mqttConfig.getClientId();
            clientId += System.currentTimeMillis();
            client = new MqttClient(mqttConfig.getUrl(), clientId, new MemoryPersistence());
            MqttConnectOptions options = getOption(mqttConfig.getUsername(), mqttConfig.getPassword(),
                    mqttConfig.getTimeout(), mqttConfig.getKeepAlive());
            MqttPushClient.setClient(client);
            try {
                client.setCallback(new PushCallback<Object>(this, mqttConfig));
                if (!client.isConnected()) {
                    client.connect(options);
                    log.info("================>>>MQTT连接成功<<======================");
                     //订阅主题
                    subscribe(mqttConfig.getTopic(), mqttConfig.getQos());
                } else {// 这里的逻辑是如果连接不成功就重新连接
                    client.disconnect();
                    client.connect(options);
                    log.info("===================>>>MQTT断连成功<<<======================");
                }
            } catch (Exception e) {
                e.printStackTrace();
            }
        } catch (Exception e) {
            e.printStackTrace();
        }
    }
    /**
     * 断线重连
     *
     * @throws Exception
     */
    public Boolean reConnect() throws Exception {
        Boolean isConnected = false;
        if (null != client) {
            client.connect();
            if (client.isConnected()) {
                isConnected = true;
            }
        }
        return isConnected;
    }
    /**
     * 发布,默认qos为0,非持久化
     *
     * @param topic
     * @param pushMessage
     */
    public void publish(String topic, String pushMessage) {
        publish(0, false, topic, pushMessage);
    }
    /**
     * 发布
     *
     * @param qos
     * @param retained
     * @param topic
     * @param pushMessage
     */
    public void publish(int qos, boolean retained, String topic, String pushMessage) {
        MqttMessage message = new MqttMessage();
        message.setQos(qos);
        message.setRetained(retained);
        message.setPayload(pushMessage.getBytes());
        MqttTopic mTopic = MqttPushClient.getClient().getTopic(topic);
        if (null == mTopic) {
            log.error("===============>>>MQTT topic 不存在<<=======================");
        }
        MqttDeliveryToken token;
        try {
            token = mTopic.publish(message);
            token.waitForCompletion();
        } catch (MqttPersistenceException e) {
            e.printStackTrace();
        } catch (MqttException e) {
            e.printStackTrace();
        }
    }
    /**
     * 发布消息的服务质量(推荐为:2-确保消息到达一次。0-至多一次到达;1-至少一次到达,可能重复), retained
     * 默认:false-非持久化(是指一条消息消费完,就会被删除;持久化,消费完,还会保存在服务器中,当新的订阅者出现,继续给新订阅者消费)
     *
     * @param topic
     * @param pushMessage
     */
    public void publish(int qos, String topic, String pushMessage) {
        publish(qos, false, topic, pushMessage);
    }
    /**
     * 订阅某个主题,qos默认为0
     *
     * @param topic
     */
    public void subscribe(String[] topic) {
        subscribe(topic, null);
    }
    /**
     * 订阅某个主题
     *
     * @param topic
     * @param qos
     */
    public void subscribe(String[] topic, int[] qos) {
        try {
            MqttPushClient.getClient().unsubscribe(topic);
            MqttPushClient.getClient().subscribe(topic, qos);
        } catch (MqttException e) {
            e.printStackTrace();
        }
    }
}
@Component(value = "mqttSender")
@Slf4j
public class MqttSender {
    @Async
    public void send(String queueName, String msg) {
        log.debug("=====================>>>>发送主题:{},  msg:{}", queueName,msg);
        publish(2, queueName, msg);
    }
    /**
     * 发布,默认qos为0,非持久化
     *
     * @param topic
     * @param pushMessage
     */
    public void publish(String topic, String pushMessage) {
        publish(1, false, topic, pushMessage);
    }
    /**
     * 发布
     *
     * @param qos
     * @param retained
     * @param topic
     * @param pushMessage
     */
    public void publish(int qos, boolean retained, String topic, String pushMessage) {
        MqttMessage message = new MqttMessage();
        message.setQos(qos);
        message.setRetained(retained);
        message.setPayload(pushMessage.getBytes());
        MqttTopic mTopic = MqttPushClient.getClient().getTopic(topic);
        if (null == mTopic) {
            log.error("===================>>>MQTT topic 不存在<<=================");
        }
        MqttDeliveryToken token;
        try {
            token = mTopic.publish(message);
            token.waitForCompletion();
        } catch (MqttPersistenceException e) {
            log.error("============>>>publish fail", e);
            e.printStackTrace();
        } catch (MqttException e) {
            e.printStackTrace();
        }
    }
    /**
     * 发布消息的服务质量(推荐为:2-确保消息到达一次。0-至多一次到达;1-至少一次到达,可能重复), retained
     * 默认:false-非持久化(是指一条消息消费完,就会被删除;持久化,消费完,还会保存在服务器中,当新的订阅者出现,继续给新订阅者消费)
     *
     * @param topic
     * @param pushMessage
     */
    public void publish(int qos, String topic, String pushMessage) {
        publish(qos, false, topic, pushMessage);
    }
}
    @Slf4j
    @Component
    public class PushCallback<component> implements MqttCallback {
        private MqttPushClient client;
        private MqttConfig mqttConfiguration;
        @Resource
        MqttService mqttService;
        public PushCallback(MqttPushClient client, MqttConfig mqttConfiguration) {
            this.client = client;
            this.mqttConfiguration = mqttConfiguration;
        }
        @Override
        public void connectionLost(Throwable cause) {
            /** 连接丢失后,一般在这里面进行重连 **/
            if (client != null) {
                while (true) {
                    try {
                        log.info("==============》》》[MQTT] 连接丢失,尝试重连...");
                        MqttPushClient mqttPushClient = new MqttPushClient();
                        mqttPushClient.connect(mqttConfiguration);
                        if (MqttPushClient.getClient().isConnected()) {
                            log.info("=============>>重连成功");
                        }
                        break;
                    } catch (Exception e) {
                        log.error("=============>>>[MQTT] 连接断开,重连失败!<<=============");
                        continue;
                    }
                }
            }
            log.info(cause.getMessage());
        }
        @Override
        public void deliveryComplete(IMqttDeliveryToken token) {
            // publish后会执行到这里
            log.info("pushComplete==============>>>" + token.isComplete());
        }
        /**
         * 监听对应的主题消息
         *
         * @param topic
         * @param message
         * @throws Exception
         */
        @Override
        public void messageArrived(String topic, MqttMessage message) throws Exception {
            // subscribe后得到的消息会执行到这里面
            log.info("============》》接收消息主题 : " + topic);
            log.info("============》》接收消息Qos : " + message.getQos());
            log.info("============》》接收消息内容原始内容 : " + new String(message.getPayload()));
            log.info("============》》接收消息内容GB2312 : " + new String(message.getPayload(), "GB2312"));
            log.info("============》》接收消息内容UTF-8 : " + new String(message.getPayload(), "UTF-8"));
            try {
                if (topic.equals("datapoint")) {
                    MqttResponseBody mqttResponseBody = JSONUtils.jsonToBean(new String(message.getPayload(), "UTF-8"),
                            MqttResponseBody.class);
                    MqttService mqttService = SpringUtil.getBean(MqttServiceImpl.class);
                    mqttService.messageArrived(mqttResponseBody);
                } else if (topic.equals("heartbeat")) {
                    MqttResponseHeartbeat mqttResponseHeartbeat = JSONUtils
                            .jsonToBean(new String(message.getPayload(), "UTF-8"), MqttResponseHeartbeat.class);
                    MqttService mqttService = SpringUtil.getBean(MqttServiceImpl.class);
                    mqttService.messageHeartbeat(mqttResponseHeartbeat);
                }
            } catch (Exception e) {
                e.printStackTrace();
                log.info("============》》接收消息主题异常 : " + e.getMessage());
            }
        }
    }

我只抽离了部分代码(主要是订阅、处理订阅消息和发送消息部分),详细的可以点进原文看看。

这种整合方式理解起来是非常简单的,都是直接编码的,没有什么抽象的操作,业务不大的情况下,也是可以正常玩的。

关于 Spring Integration 你知道多少,包含集成MQTT案例讲述及源码2:https://developer.aliyun.com/article/1394928


相关实践学习
消息队列RocketMQ版:基础消息收发功能体验
本实验场景介绍消息队列RocketMQ版的基础消息收发功能,涵盖实例创建、Topic、Group资源创建以及消息收发体验等基础功能模块。
消息队列 MNS 入门课程
1、消息队列MNS简介 本节课介绍消息队列的MNS的基础概念 2、消息队列MNS特性 本节课介绍消息队列的MNS的主要特性 3、MNS的最佳实践及场景应用 本节课介绍消息队列的MNS的最佳实践及场景应用案例 4、手把手系列:消息队列MNS实操讲 本节课介绍消息队列的MNS的实际操作演示 5、动手实验:基于MNS,0基础轻松构建 Web Client 本节课带您一起基于MNS,0基础轻松构建 Web Client
目录
相关文章
|
2月前
|
监控 Java 应用服务中间件
高级java面试---spring.factories文件的解析源码API机制
【11月更文挑战第20天】Spring Boot是一个用于快速构建基于Spring框架的应用程序的开源框架。它通过自动配置、起步依赖和内嵌服务器等特性,极大地简化了Spring应用的开发和部署过程。本文将深入探讨Spring Boot的背景历史、业务场景、功能点以及底层原理,并通过Java代码手写模拟Spring Boot的启动过程,特别是spring.factories文件的解析源码API机制。
115 2
|
2月前
|
数据采集 监控 前端开发
二级公立医院绩效考核系统源码,B/S架构,前后端分别基于Spring Boot和Avue框架
医院绩效管理系统通过与HIS系统的无缝对接,实现数据网络化采集、评价结果透明化管理及奖金分配自动化生成。系统涵盖科室和个人绩效考核、医疗质量考核、数据采集、绩效工资核算、收支核算、工作量统计、单项奖惩等功能,提升绩效评估的全面性、准确性和公正性。技术栈采用B/S架构,前后端分别基于Spring Boot和Avue框架。
117 5
|
11天前
|
缓存 安全 Java
Spring Boot 3 集成 Spring Security + JWT
本文详细介绍了如何使用Spring Boot 3和Spring Security集成JWT,实现前后端分离的安全认证概述了从入门到引入数据库,再到使用JWT的完整流程。列举了项目中用到的关键依赖,如MyBatis-Plus、Hutool等。简要提及了系统配置表、部门表、字典表等表结构。使用Hutool-jwt工具类进行JWT校验。配置忽略路径、禁用CSRF、添加JWT校验过滤器等。实现登录接口,返回token等信息。
175 12
|
17天前
|
监控 JavaScript 数据可视化
建筑施工一体化信息管理平台源码,支持微服务架构,采用Java、Spring Cloud、Vue等技术开发。
智慧工地云平台是专为建筑施工领域打造的一体化信息管理平台,利用大数据、云计算、物联网等技术,实现施工区域各系统数据汇总与可视化管理。平台涵盖人员、设备、物料、环境等关键因素的实时监控与数据分析,提供远程指挥、决策支持等功能,提升工作效率,促进产业信息化发展。系统由PC端、APP移动端及项目、监管、数据屏三大平台组成,支持微服务架构,采用Java、Spring Cloud、Vue等技术开发。
|
17天前
|
存储 安全 Java
Spring Boot 3 集成Spring AOP实现系统日志记录
本文介绍了如何在Spring Boot 3中集成Spring AOP实现系统日志记录功能。通过定义`SysLog`注解和配置相应的AOP切面,可以在方法执行前后自动记录日志信息,包括操作的开始时间、结束时间、请求参数、返回结果、异常信息等,并将这些信息保存到数据库中。此外,还使用了`ThreadLocal`变量来存储每个线程独立的日志数据,确保线程安全。文中还展示了项目实战中的部分代码片段,以及基于Spring Boot 3 + Vue 3构建的快速开发框架的简介与内置功能列表。此框架结合了当前主流技术栈,提供了用户管理、权限控制、接口文档自动生成等多项实用特性。
69 8
|
1月前
|
存储 缓存 Java
Spring面试必问:手写Spring IoC 循环依赖底层源码剖析
在Spring框架中,IoC(Inversion of Control,控制反转)是一个核心概念,它允许容器管理对象的生命周期和依赖关系。然而,在实际应用中,我们可能会遇到对象间的循环依赖问题。本文将深入探讨Spring如何解决IoC中的循环依赖问题,并通过手写源码的方式,让你对其底层原理有一个全新的认识。
66 2
|
2月前
|
前端开发 Java 开发者
Spring生态学习路径与源码深度探讨
【11月更文挑战第13天】Spring框架作为Java企业级开发中的核心框架,其丰富的生态系统和强大的功能吸引了无数开发者的关注。学习Spring生态不仅仅是掌握Spring Framework本身,更需要深入理解其周边组件和工具,以及源码的底层实现逻辑。本文将从Spring生态的学习路径入手,详细探讨如何系统地学习Spring,并深入解析各个重点的底层实现逻辑。
81 9
|
2月前
|
存储 Java 调度
Sppring集成Quartz简单案例详解 包括(添加、停止、恢复、删除任务、获取下次执行时间等)
Sppring集成Quartz简单案例详解 包括(添加、停止、恢复、删除任务、获取下次执行时间等)
39 2
|
2月前
|
消息中间件 监控 Java
您是否已集成 Spring Boot 与 ActiveMQ?
您是否已集成 Spring Boot 与 ActiveMQ?
67 0
|
3月前
|
Java Spring
Spring底层架构源码解析(三)
Spring底层架构源码解析(三)
219 5

热门文章

最新文章