关于 Spring Integration 你知道多少,包含集成MQTT案例讲述及源码2

简介: 关于 Spring Integration 你知道多少,包含集成MQTT案例讲述及源码

关于 Spring Integration 你知道多少,包含集成MQTT案例讲述及源码1:https://developer.aliyun.com/article/1394927

1.3、常规方式的优缺点

优点:

1、学习成本相对较低,代码理解难度低,上手快。

2、易封装,没有其他框架的限制,自定义化程度高。

缺点:

1、业务耦合性大。较少的主题下,可能还不会有什么感觉,如果后期topic慢慢多了起来,不同的业务有不同的处理方式,你这边都要进行相应处理的时候,就麻烦起来了。要是再出现,针对同一个主题的消息,根据消息体的不同,也要进行不同的处理,就….

2、没有框架,自由度大,相对也意味着代码量相对要大一些,一些没有封装的处理,都需要自己去进行处理。

二、Spring Integration 的基础概念

Spring Integration 5.5 版本文档

2.1、是什么

Spring Integration 提供了 Spring 编程模型的扩展,它支持基于 Spring 的应用程序内的轻量级消息传递,并支持通过声明性适配器与外部系统集成。这些适配器提供了比 Spring 对远程处理、消息传递和调度的支持更高级别的抽象。

Spring Integration 的主要目标是提供一个简单的模型来构建企业集成解决方案,同时保持关注点分离,这对于生成可维护、可测试的代码至关重要。

Spring Integration 支持消息驱动的体系结构,其中控制反转适用于运行时问题,例如何时应运行某些业务逻辑以及应将响应发送到何处。它支持消息的路由和转换,以便可以集成不同的传输和不同的数据格式,而不会影响可测试性。换句话说,消息传递和集成问题由框架处理。业务组件与基础设施进一步隔离,开发人员也摆脱了复杂的集成责任。


也许你此刻阅读完这里,仍然会很懵,但请相信我,在你看完整篇文章之后,你会完全理解上述文字的。

2.2、什么促使了 Spring Integration 的诞生

Spring Integration 的动机如下:

  • 提供用于实施复杂企业集成解决方案的简单模型。
  • 在基于 Spring 的应用程序中促进异步、消息驱动的行为。
  • 促进现有 Spring 用户直观、增量的采用。

Spring Integration 遵循以下原则:

  • 组件应该松散耦合以实现模块化和可测试性。
  • 该框架应该强制业务逻辑和集成逻辑之间的关注点分离。
  • 扩展点本质上应该是抽象的(但在明确定义的边界内),以促进重用和可移植性。

来自官方文档。

2.3、基础概念

Spring Integraion 有几个比较重要的基础概念,理解完之后,看代码将会变得十分简单,此处只是抽取了常用且本文已经使用到的概念,完整的还请阅读 Spring Integration 文档

1、Message 见名知意就知是我们需要发送或接收的消息。

Spring Integration 中,它由有效负载和标头组成。Payload(有效负载)可以是任何类型,Header(标头)包含常用的必需信息,例如 ID、时间戳、相关 ID 和返回地址。标头还用于在连接的传输之间传递值。

image.png

2、Message Channel 消息通道代表管道和过滤器架构中的“管道”。生产者将消息发送到通道,消费者从通道接收消息。

因此,消息通道解耦了消息传递组件,并且还为消息拦截和监视提供了便利的点。

image.png

实际框架中针对Channel 的实现有多种,后文案例中暂时只使用了点对点的 DirectChannel 通道。

更多Channel的实现,请查阅:Message Channel Implementations

3、Message Transformer 消息转换器负责转换消息的内容或结构并返回修改后的消息。最常见的转换器类型可能是将消息的有效负载从一种格式转换为另一种格式(例如从 XML 转换为java.lang.String 或者是 byte[] 转为Java对象)。

比如后面案例中的一段代码:

image.png

4、Message Router 消息路由器负责决定接下来应该接收该消息的一个或多个通道(如果有)。通常,消息路由(Router)可根据消息体类型(Payload Type Router)、消息头的值(Header Value Router)以及定义好的接收表(Recipient List Router)作为条件,来决定消息传递到的通道。

image.png

白话文就是我们可以根据信息中的某个字段,判断这条信息,到底要被我们投递到那个通道去

5、Service Activator 服务激活器是用于将服务实例连接到消息传递系统的通用端点。必须配置输入消息通道,如果要调用的服务方法有返回值,还可以提供输出消息通道。

服务激活器调用某个服务对象上的操作来处理请求消息,提取请求消息的有效负载并进行转换(如果该方法不需要消息类型参数)。每当服务对象的方法返回一个值时,如果需要,该返回值同样会转换为回复消息(如果它还不是消息类型)。该回复消息被发送到输出通道。

image.png

图 4.Service Activator

image.png实际上 Service Activator 在代码中是一个 @ServiceActivator()注解,如下案例:


6、Channel Adapter通道适配器是将消息通道连接到其他系统或传输的端点。通道适配器可以是入站适配器,也可以是出站适配器。通常,通道适配器在消息与从其他系统接收或发送到其他系统的任何对象或资源(文件、HTTP 请求、JMS 消息等)之间进行一些映射。根据传输方式,通道适配器还可以填充或提取消息标头值。

image.png

Channel Adapter 用来连接 MessageChannel 和具体的消息端口,例如通信的 topic。

写的时候,浅浅的翻阅了下源码,大致是这三个类,等看了后面的案例,然后再看下这几个类,流程还是很容易懂的。

image.png

连接MQTT的代码在MqttPahoMessageDrivenChannelAdapter.connectAndSubscribe() 中。

只是在官方文档中,挑选了部分概念拿出来简单的讲述了一下,有很多的文字还是直接copy 的官网文档,感兴趣的话,还是更建议你去拜读官方文档,祝你能有所收获。

三、图:Spring Integration 案例大致流程

在讲代码之前,画了一张图,简单讲述一下大致数据流转流程是什么样的,同时也便于理解后面的代码是如何的(见谅,不好改成竖图啦)

image.png

数据的大致流转过程就如上图这般,将这副图和上文中所谈及的概念,关联起来,应该能理解大部分啦。

具体的 Spring Integration 的流程图,其实远比这张图的流程要复杂(主要是牵扯到的上层抽象比较多),上图更多的是对后面的案例中的数据的一个数据流转图,让大家能更好的理解代码。

四、完整案例:使用 Spring Integration 整合 MQTT

代码主要借鉴于大疆官方开源项目 (大疆的上云API的一个DEMO项目),主体部分更是如此,可以说是弄了一个简化版,然后写下了这篇学习的博客

笔者DEMO项目地址: springboot-integration-mqtt-demo

4.1、项目结构

image.png

就常规项目结构,普通且简单~

相关依赖:

    <dependency>
        <groupId>org.springframework.boot</groupId>
        <artifactId>spring-boot-starter-integration</artifactId>
    </dependency>
    <dependency>
        <groupId>org.springframework.integration</groupId>
        <artifactId>spring-integration-mqtt</artifactId>
    </dependency>

4.2、配置文件和MqttConfiguration

yaml配置文件:

    server:
      port: 9876
    spring:
      application:
        name: spring-integration-mqtt-demo
    mqtt:
      # BASIC parameters are required.
      BASIC:
        protocol: MQTT
        host: 192.168.79.133
        port: 1883
        username:
        password:
        client-id: 123456
        # If the protocol is ws/wss, this value is required.
        path:
        # 在最初连接到mqtt时需要订阅的主题,多个主题用“,”分隔。
        inbound-topic: mysys/+/envents_test
      # 此部分是提供给后端生成token返回给前端,让前端使用websocket方式去和MQTT实现交互的,笔者此文的案例中并没有去实现
      DRC:
        protocol: WS
        host: 192.168.79.133
        port: 8083
        path: /mqtt
    logging:
      level:
        com.com.example.mqtt: debug
      file:
        name: logs/springboot-integration-mqtt-demo.log

具体的MQTT的连接参数是在红框标记的地方整合到 MqttConnectOptions 中的,但实际上它是采用MqttUseEnum 枚举的方式将yaml配置文件的参数映射到MqttClientOptions ,坦白说,用起来是真的舒服啊

image.png

主要是两个地方:

1、一个使用枚举类来映射ymal文件,可以学习学习

2、MqttConnectOptions 是基础的一些设置,比如配置认证参数、设置超时时间等连接Broker的连接参数,细节可以等到使用的时候再进一步观察。

不过DRC 那部分(主要用于websocket),不过没整合到这个案例中,下次吧,下次吧。

4.3、MessageChannel

写了这么多,都忘记说了说明 MessageChannel 啦,实际上,诸如@ServiceActivator(inputChannel = ChannelName.DEFAULT)都是提前注册在bean当中的,否则是没法使用的。

这一点,我在前文的编写中,忘记啦。

    @Configuration
    public class MqttMessageChannel {
        @Autowired
        private Executor threadPool;
        @Bean(name = ChannelName.INBOUND)
        public MessageChannel inboundChannel() {
            return new ExecutorChannel(threadPool);
        }
        @Bean(name = ChannelName.ENVENTS_INBOUND_TEST)
        public MessageChannel enventsInboundTest() {
            return new DirectChannel();
        }
        @Bean(name = ChannelName.INBOUND_TASK_TEST1)
        public MessageChannel inboundTaskTest1() {
            return new DirectChannel();
        }
        @Bean(name = ChannelName.INBOUND_TASK_TEST2)
        public MessageChannel inboundTaskTest2() {
            return new DirectChannel();
        }
        @Bean(name = ChannelName.INBOUND_TASK_TEST3)
        public MessageChannel inboundTaskTest3() {
            return new DirectChannel();
        }
    }

补充:DirectChannel 是其中的一种消息通道,是一个点对点的通道,它直接将消息分派给订阅者,同时也是最常用的通道。

Channel的具体的实现有多种,可参考官方文档:Message Channels

关于 Spring Integration 你知道多少,包含集成MQTT案例讲述及源码3:https://developer.aliyun.com/article/1394929

相关实践学习
消息队列RocketMQ版:基础消息收发功能体验
本实验场景介绍消息队列RocketMQ版的基础消息收发功能,涵盖实例创建、Topic、Group资源创建以及消息收发体验等基础功能模块。
消息队列 MNS 入门课程
1、消息队列MNS简介 本节课介绍消息队列的MNS的基础概念 2、消息队列MNS特性 本节课介绍消息队列的MNS的主要特性 3、MNS的最佳实践及场景应用 本节课介绍消息队列的MNS的最佳实践及场景应用案例 4、手把手系列:消息队列MNS实操讲 本节课介绍消息队列的MNS的实际操作演示 5、动手实验:基于MNS,0基础轻松构建 Web Client 本节课带您一起基于MNS,0基础轻松构建 Web Client
目录
相关文章
|
3月前
|
数据可视化 Java API
Spring Boot与Swagger的集成
Spring Boot与Swagger的集成
|
3月前
|
Java API 开发者
在Spring Boot中集成Swagger API文档
在Spring Boot中集成Swagger API文档
|
1月前
|
监控 关系型数据库 MySQL
zabbix agent集成percona监控MySQL的插件实战案例
这篇文章是关于如何使用Percona监控插件集成Zabbix agent来监控MySQL的实战案例。
31 2
zabbix agent集成percona监控MySQL的插件实战案例
|
2月前
|
Java 数据库连接 Spring
后端框架入门超详细 三部曲 Spring 、SpringMVC、Mybatis、SSM框架整合案例 【爆肝整理五万字】
文章是关于Spring、SpringMVC、Mybatis三个后端框架的超详细入门教程,包括基础知识讲解、代码案例及SSM框架整合的实战应用,旨在帮助读者全面理解并掌握这些框架的使用。
后端框架入门超详细 三部曲 Spring 、SpringMVC、Mybatis、SSM框架整合案例 【爆肝整理五万字】
|
3月前
|
资源调度 Java 调度
Spring Cloud Alibaba 集成分布式定时任务调度功能
Spring Cloud Alibaba 发布了 Scheduling 任务调度模块 [#3732]提供了一套开源、轻量级、高可用的定时任务解决方案,帮助您快速开发微服务体系下的分布式定时任务。
14657 24
|
2月前
|
NoSQL 关系型数据库 MySQL
SpringBoot 集成 SpringSecurity + MySQL + JWT 附源码,废话不多直接盘
SpringBoot 集成 SpringSecurity + MySQL + JWT 附源码,废话不多直接盘
93 2
|
2月前
|
JSON 数据管理 关系型数据库
【Dataphin V3.9】颠覆你的数据管理体验!API数据源接入与集成优化,如何让企业轻松驾驭海量异构数据,实现数据价值最大化?全面解析、实战案例、专业指导,带你解锁数据整合新技能!
【8月更文挑战第15天】随着大数据技术的发展,企业对数据处理的需求不断增长。Dataphin V3.9 版本提供更灵活的数据源接入和高效 API 集成能力,支持 MySQL、Oracle、Hive 等多种数据源,增强 RESTful 和 SOAP API 支持,简化外部数据服务集成。例如,可轻松从 RESTful API 获取销售数据并存储分析。此外,Dataphin V3.9 还提供数据同步工具和丰富的数据治理功能,确保数据质量和一致性,助力企业最大化数据价值。
114 1
|
2月前
|
人工智能 Java API
JeecgBoot 低代码平台快速集成 Spring AI
Spring 通过 Spring AI 项目正式启用了 AI(人工智能)生成提示功能。本文将带你了解如何在 Jeecg Boot 应用中集成生成式 AI,以及 Spring AI 如何与模型互动,包含 RAG 功能。
100 3
|
2月前
|
机器学习/深度学习 存储 搜索推荐
Elasticsearch与深度学习框架的集成案例研究
Elasticsearch 是一个强大的搜索引擎和分析引擎,广泛应用于实时数据处理和全文搜索。深度学习框架如 TensorFlow 和 PyTorch 则被用来构建复杂的机器学习模型。本文将探讨如何将 Elasticsearch 与这些深度学习框架集成,以实现高级的数据分析和预测任务。
26 0
|
2月前
|
消息中间件 Arthas Java
RocketMQ—一次连接namesvr失败的案例分析
项目组在使用RocketMQ时遇到Consumer连接Name Server失败的问题,异常显示连接特定地址失败。通过Arthas工具逐步分析代码执行路径,定位到创建Channel返回空值导致异常。进一步跟踪发现,问题源于Netty组件在初始化`ByteBufAllocator`时出现错误。分析依赖后确认存在Netty版本冲突。解决方法为排除冲突的Netty包,仅保留兼容版本。
144 0
RocketMQ—一次连接namesvr失败的案例分析

热门文章

最新文章

下一篇
无影云桌面