关于 Spring Integration 你知道多少,包含集成MQTT案例讲述及源码1

简介: 关于 Spring Integration 你知道多少,包含集成MQTT案例讲述及源码

适合的读者,略微了解SpringBoot、消息队列的朋友,想了解和尝试使用Spring Integration框架,想扩展知识边界。

前言

后文案例代码:GitHub 代码 github.com/ningzaichun…

MQTT我想大部分朋友应该都是知道的,即使没有使用过MQTT,肯定也使用过它的兄弟们,RabbitMQ、RocketMQ和Kafka等消息队列.

但这次的主角并非是MQTT,而是 Spring Integration ,如果是没有怎么注意Spring官网的朋友,可能甚至都没咋听过 Spring Integration 框架,它是针对类似信息流的一个上层抽象,不只是MQTT,比如AMQP、MAIL都支持,贴一张官网的截图,诸如下列是都支持的。

image.png

正如Spring的一贯风格,比如以前刚学Spring 的时候,肯定是学过Spring Data,知道它就是针对数据库的一系列抽象。Spring Integration 也是如此,不过抽象的对象换成了信息流罢啦。

本篇文章更多的是起一个抛转引玉的作用,虽将大致内容都涵盖在内了,但部分代码的细节,是有欠考虑的,写在前文中,还望各位见谅。

本文大纲如下:

image.png

一、SpringBoot常规方式集成MQTT

先抛开 Spring Integration 不管,我们先看看常规的集成方式是什么的,后面再讲一讲Spring Integration 有哪些优点。

1.1、Docker 安装 EMQX

为了快捷,我并没有做多余的设置,直接粘贴,即可在Docker环境下,运行一个 EMQX 服务器

docker run -d --name emqx -p 1883:1883 -p 8083:8083 -p 8084:8084 -p 8883:8883 -p 18083:18083 emqx/emqx:5.1.3

详细可参考 EMQX Docker 部署指南

1.2、常规方式集成 EMQX

可能一些刚使用 SpringBoot 集成 EMQX(MQTT协议的服务实现)的朋友,大部分都是使用下面所阐述的方式进行整合的(在网上冲浪找的)。

偷了个小懒,下文代码示例来源:spring boot + mqtt 物联网开发

@Slf4j
public class MqttPushClient {
    private static MqttClient client;
    public static MqttClient getClient() {
        return client;
    }
    public static void setClient(MqttClient client) {
        MqttPushClient.client = client;
    }
    private MqttConnectOptions getOption(String userName, String password, int outTime, int KeepAlive) {
        // MQTT连接设置
        MqttConnectOptions option = new MqttConnectOptions();
        // 设置是否清空session,false表示服务器会保留客户端的连接记录,true表示每次连接到服务器都以新的身份连接
        option.setCleanSession(false);
        // 设置连接的用户名
        option.setUserName(userName);
        // 设置连接的密码
        option.setPassword(password.toCharArray());
        // 设置超时时间 单位为秒
        option.setConnectionTimeout(outTime);
        // 设置会话心跳时间 单位为秒 服务器会每隔(1.5*keepTime)秒的时间向客户端发送个消息判断客户端是否在线,但这个方法并没有重连的机制
        option.setKeepAliveInterval(KeepAlive);
        // setWill方法,如果项目中需要知道客户端是否掉线可以调用该方法。设置最终端口的通知消息
        // option.setWill(topic, "close".getBytes(), 2, true);
        option.setMaxInflight(1000);
        log.info("================>>>MQTT连接认证成功<<======================");
        return option;
    }
    /**
     * 连接
     */
    public void connect(MqttConfig mqttConfig) {
        MqttClient client;
        try {
            String clientId = mqttConfig.getClientId();
            clientId += System.currentTimeMillis();
            client = new MqttClient(mqttConfig.getUrl(), clientId, new MemoryPersistence());
            MqttConnectOptions options = getOption(mqttConfig.getUsername(), mqttConfig.getPassword(),
                    mqttConfig.getTimeout(), mqttConfig.getKeepAlive());
            MqttPushClient.setClient(client);
            try {
                client.setCallback(new PushCallback<Object>(this, mqttConfig));
                if (!client.isConnected()) {
                    client.connect(options);
                    log.info("================>>>MQTT连接成功<<======================");
                     //订阅主题
                    subscribe(mqttConfig.getTopic(), mqttConfig.getQos());
                } else {// 这里的逻辑是如果连接不成功就重新连接
                    client.disconnect();
                    client.connect(options);
                    log.info("===================>>>MQTT断连成功<<<======================");
                }
            } catch (Exception e) {
                e.printStackTrace();
            }
        } catch (Exception e) {
            e.printStackTrace();
        }
    }
    /**
     * 断线重连
     *
     * @throws Exception
     */
    public Boolean reConnect() throws Exception {
        Boolean isConnected = false;
        if (null != client) {
            client.connect();
            if (client.isConnected()) {
                isConnected = true;
            }
        }
        return isConnected;
    }
    /**
     * 发布,默认qos为0,非持久化
     *
     * @param topic
     * @param pushMessage
     */
    public void publish(String topic, String pushMessage) {
        publish(0, false, topic, pushMessage);
    }
    /**
     * 发布
     *
     * @param qos
     * @param retained
     * @param topic
     * @param pushMessage
     */
    public void publish(int qos, boolean retained, String topic, String pushMessage) {
        MqttMessage message = new MqttMessage();
        message.setQos(qos);
        message.setRetained(retained);
        message.setPayload(pushMessage.getBytes());
        MqttTopic mTopic = MqttPushClient.getClient().getTopic(topic);
        if (null == mTopic) {
            log.error("===============>>>MQTT topic 不存在<<=======================");
        }
        MqttDeliveryToken token;
        try {
            token = mTopic.publish(message);
            token.waitForCompletion();
        } catch (MqttPersistenceException e) {
            e.printStackTrace();
        } catch (MqttException e) {
            e.printStackTrace();
        }
    }
    /**
     * 发布消息的服务质量(推荐为:2-确保消息到达一次。0-至多一次到达;1-至少一次到达,可能重复), retained
     * 默认:false-非持久化(是指一条消息消费完,就会被删除;持久化,消费完,还会保存在服务器中,当新的订阅者出现,继续给新订阅者消费)
     *
     * @param topic
     * @param pushMessage
     */
    public void publish(int qos, String topic, String pushMessage) {
        publish(qos, false, topic, pushMessage);
    }
    /**
     * 订阅某个主题,qos默认为0
     *
     * @param topic
     */
    public void subscribe(String[] topic) {
        subscribe(topic, null);
    }
    /**
     * 订阅某个主题
     *
     * @param topic
     * @param qos
     */
    public void subscribe(String[] topic, int[] qos) {
        try {
            MqttPushClient.getClient().unsubscribe(topic);
            MqttPushClient.getClient().subscribe(topic, qos);
        } catch (MqttException e) {
            e.printStackTrace();
        }
    }
}
@Component(value = "mqttSender")
@Slf4j
public class MqttSender {
    @Async
    public void send(String queueName, String msg) {
        log.debug("=====================>>>>发送主题:{},  msg:{}", queueName,msg);
        publish(2, queueName, msg);
    }
    /**
     * 发布,默认qos为0,非持久化
     *
     * @param topic
     * @param pushMessage
     */
    public void publish(String topic, String pushMessage) {
        publish(1, false, topic, pushMessage);
    }
    /**
     * 发布
     *
     * @param qos
     * @param retained
     * @param topic
     * @param pushMessage
     */
    public void publish(int qos, boolean retained, String topic, String pushMessage) {
        MqttMessage message = new MqttMessage();
        message.setQos(qos);
        message.setRetained(retained);
        message.setPayload(pushMessage.getBytes());
        MqttTopic mTopic = MqttPushClient.getClient().getTopic(topic);
        if (null == mTopic) {
            log.error("===================>>>MQTT topic 不存在<<=================");
        }
        MqttDeliveryToken token;
        try {
            token = mTopic.publish(message);
            token.waitForCompletion();
        } catch (MqttPersistenceException e) {
            log.error("============>>>publish fail", e);
            e.printStackTrace();
        } catch (MqttException e) {
            e.printStackTrace();
        }
    }
    /**
     * 发布消息的服务质量(推荐为:2-确保消息到达一次。0-至多一次到达;1-至少一次到达,可能重复), retained
     * 默认:false-非持久化(是指一条消息消费完,就会被删除;持久化,消费完,还会保存在服务器中,当新的订阅者出现,继续给新订阅者消费)
     *
     * @param topic
     * @param pushMessage
     */
    public void publish(int qos, String topic, String pushMessage) {
        publish(qos, false, topic, pushMessage);
    }
}
    @Slf4j
    @Component
    public class PushCallback<component> implements MqttCallback {
        private MqttPushClient client;
        private MqttConfig mqttConfiguration;
        @Resource
        MqttService mqttService;
        public PushCallback(MqttPushClient client, MqttConfig mqttConfiguration) {
            this.client = client;
            this.mqttConfiguration = mqttConfiguration;
        }
        @Override
        public void connectionLost(Throwable cause) {
            /** 连接丢失后,一般在这里面进行重连 **/
            if (client != null) {
                while (true) {
                    try {
                        log.info("==============》》》[MQTT] 连接丢失,尝试重连...");
                        MqttPushClient mqttPushClient = new MqttPushClient();
                        mqttPushClient.connect(mqttConfiguration);
                        if (MqttPushClient.getClient().isConnected()) {
                            log.info("=============>>重连成功");
                        }
                        break;
                    } catch (Exception e) {
                        log.error("=============>>>[MQTT] 连接断开,重连失败!<<=============");
                        continue;
                    }
                }
            }
            log.info(cause.getMessage());
        }
        @Override
        public void deliveryComplete(IMqttDeliveryToken token) {
            // publish后会执行到这里
            log.info("pushComplete==============>>>" + token.isComplete());
        }
        /**
         * 监听对应的主题消息
         *
         * @param topic
         * @param message
         * @throws Exception
         */
        @Override
        public void messageArrived(String topic, MqttMessage message) throws Exception {
            // subscribe后得到的消息会执行到这里面
            log.info("============》》接收消息主题 : " + topic);
            log.info("============》》接收消息Qos : " + message.getQos());
            log.info("============》》接收消息内容原始内容 : " + new String(message.getPayload()));
            log.info("============》》接收消息内容GB2312 : " + new String(message.getPayload(), "GB2312"));
            log.info("============》》接收消息内容UTF-8 : " + new String(message.getPayload(), "UTF-8"));
            try {
                if (topic.equals("datapoint")) {
                    MqttResponseBody mqttResponseBody = JSONUtils.jsonToBean(new String(message.getPayload(), "UTF-8"),
                            MqttResponseBody.class);
                    MqttService mqttService = SpringUtil.getBean(MqttServiceImpl.class);
                    mqttService.messageArrived(mqttResponseBody);
                } else if (topic.equals("heartbeat")) {
                    MqttResponseHeartbeat mqttResponseHeartbeat = JSONUtils
                            .jsonToBean(new String(message.getPayload(), "UTF-8"), MqttResponseHeartbeat.class);
                    MqttService mqttService = SpringUtil.getBean(MqttServiceImpl.class);
                    mqttService.messageHeartbeat(mqttResponseHeartbeat);
                }
            } catch (Exception e) {
                e.printStackTrace();
                log.info("============》》接收消息主题异常 : " + e.getMessage());
            }
        }
    }

我只抽离了部分代码(主要是订阅、处理订阅消息和发送消息部分),详细的可以点进原文看看。

这种整合方式理解起来是非常简单的,都是直接编码的,没有什么抽象的操作,业务不大的情况下,也是可以正常玩的。

关于 Spring Integration 你知道多少,包含集成MQTT案例讲述及源码2:https://developer.aliyun.com/article/1394928


相关实践学习
RocketMQ一站式入门使用
从源码编译、部署broker、部署namesrv,使用java客户端首发消息等一站式入门RocketMQ。
消息队列 MNS 入门课程
1、消息队列MNS简介 本节课介绍消息队列的MNS的基础概念 2、消息队列MNS特性 本节课介绍消息队列的MNS的主要特性 3、MNS的最佳实践及场景应用 本节课介绍消息队列的MNS的最佳实践及场景应用案例 4、手把手系列:消息队列MNS实操讲 本节课介绍消息队列的MNS的实际操作演示 5、动手实验:基于MNS,0基础轻松构建 Web Client 本节课带您一起基于MNS,0基础轻松构建 Web Client
目录
相关文章
|
1天前
|
安全 Java 测试技术
Spring Boot集成支付宝支付:概念与实战
【4月更文挑战第29天】在电子商务和在线业务应用中,集成有效且安全的支付解决方案是至关重要的。支付宝作为中国领先的支付服务提供商,其支付功能的集成可以显著提升用户体验。本篇博客将详细介绍如何在Spring Boot应用中集成支付宝支付功能,并提供一个实战示例。
37 2
|
1天前
|
NoSQL Java MongoDB
【MongoDB 专栏】MongoDB 与 Spring Boot 的集成实践
【5月更文挑战第11天】本文介绍了如何将非关系型数据库MongoDB与Spring Boot框架集成,以实现高效灵活的数据管理。Spring Boot简化了Spring应用的构建和部署,MongoDB则以其对灵活数据结构的处理能力受到青睐。集成步骤包括:添加MongoDB依赖、配置连接信息、创建数据访问对象(DAO)以及进行数据操作。通过这种方式,开发者可以充分利用两者优势,应对各种数据需求。在实际应用中,结合微服务架构等技术,可以构建高性能、可扩展的系统。掌握MongoDB与Spring Boot集成对于提升开发效率和项目质量至关重要,未来有望在更多领域得到广泛应用。
【MongoDB 专栏】MongoDB 与 Spring Boot 的集成实践
|
1天前
|
消息中间件 JSON Java
RabbitMQ的springboot项目集成使用-01
RabbitMQ的springboot项目集成使用-01
|
1天前
|
消息中间件 Java Spring
Springboot 集成Rabbitmq之延时队列
Springboot 集成Rabbitmq之延时队列
6 0
|
1天前
|
安全 Java 数据库连接
在IntelliJ IDEA中通过Spring Boot集成达梦数据库:从入门到精通
在IntelliJ IDEA中通过Spring Boot集成达梦数据库:从入门到精通
|
1天前
|
Android开发
Android 高通平台集成无源码apk示例
Android 高通平台集成无源码apk示例
16 0
|
1天前
|
数据采集 Web App开发 Java
Python 爬虫:Spring Boot 反爬虫的成功案例
Python 爬虫:Spring Boot 反爬虫的成功案例
|
1天前
|
Java Maven Nacos
Spring Cloud Eureka 服务注册和服务发现超详细(附加--源码实现案例--及实现逻辑图)
Spring Cloud Eureka 服务注册和服务发现超详细(附加--源码实现案例--及实现逻辑图)
31 0
|
1天前
|
缓存 Java Spring
单体项目中资源管理模块集成Spring Cache
该内容是关于将Spring Cache集成到资源管理模块以实现缓存同步的说明。主要策略包括:查询时添加到缓存,增删改时删除相关缓存。示例代码展示了@Service类中使用@Transactional和@Cacheable注解进行缓存操作,以及在RedisTemplate中处理缓存的示例。
24 5
|
1天前
|
Java 测试技术 Spring
Spring系列文章:Spring集成Log4j2⽇志框架、整合JUnit
Spring系列文章:Spring集成Log4j2⽇志框架、整合JUnit