关于 Spring Integration 你知道多少,包含集成MQTT案例讲述及源码2

简介: 关于 Spring Integration 你知道多少,包含集成MQTT案例讲述及源码

关于 Spring Integration 你知道多少,包含集成MQTT案例讲述及源码1:https://developer.aliyun.com/article/1394927

1.3、常规方式的优缺点

优点:

1、学习成本相对较低,代码理解难度低,上手快。

2、易封装,没有其他框架的限制,自定义化程度高。

缺点:

1、业务耦合性大。较少的主题下,可能还不会有什么感觉,如果后期topic慢慢多了起来,不同的业务有不同的处理方式,你这边都要进行相应处理的时候,就麻烦起来了。要是再出现,针对同一个主题的消息,根据消息体的不同,也要进行不同的处理,就….

2、没有框架,自由度大,相对也意味着代码量相对要大一些,一些没有封装的处理,都需要自己去进行处理。

二、Spring Integration 的基础概念

Spring Integration 5.5 版本文档

2.1、是什么

Spring Integration 提供了 Spring 编程模型的扩展,它支持基于 Spring 的应用程序内的轻量级消息传递,并支持通过声明性适配器与外部系统集成。这些适配器提供了比 Spring 对远程处理、消息传递和调度的支持更高级别的抽象。

Spring Integration 的主要目标是提供一个简单的模型来构建企业集成解决方案,同时保持关注点分离,这对于生成可维护、可测试的代码至关重要。

Spring Integration 支持消息驱动的体系结构,其中控制反转适用于运行时问题,例如何时应运行某些业务逻辑以及应将响应发送到何处。它支持消息的路由和转换,以便可以集成不同的传输和不同的数据格式,而不会影响可测试性。换句话说,消息传递和集成问题由框架处理。业务组件与基础设施进一步隔离,开发人员也摆脱了复杂的集成责任。


也许你此刻阅读完这里,仍然会很懵,但请相信我,在你看完整篇文章之后,你会完全理解上述文字的。

2.2、什么促使了 Spring Integration 的诞生

Spring Integration 的动机如下:

  • 提供用于实施复杂企业集成解决方案的简单模型。
  • 在基于 Spring 的应用程序中促进异步、消息驱动的行为。
  • 促进现有 Spring 用户直观、增量的采用。

Spring Integration 遵循以下原则:

  • 组件应该松散耦合以实现模块化和可测试性。
  • 该框架应该强制业务逻辑和集成逻辑之间的关注点分离。
  • 扩展点本质上应该是抽象的(但在明确定义的边界内),以促进重用和可移植性。

来自官方文档。

2.3、基础概念

Spring Integraion 有几个比较重要的基础概念,理解完之后,看代码将会变得十分简单,此处只是抽取了常用且本文已经使用到的概念,完整的还请阅读 Spring Integration 文档

1、Message 见名知意就知是我们需要发送或接收的消息。

Spring Integration 中,它由有效负载和标头组成。Payload(有效负载)可以是任何类型,Header(标头)包含常用的必需信息,例如 ID、时间戳、相关 ID 和返回地址。标头还用于在连接的传输之间传递值。

image.png

2、Message Channel 消息通道代表管道和过滤器架构中的“管道”。生产者将消息发送到通道,消费者从通道接收消息。

因此,消息通道解耦了消息传递组件,并且还为消息拦截和监视提供了便利的点。

image.png

实际框架中针对Channel 的实现有多种,后文案例中暂时只使用了点对点的 DirectChannel 通道。

更多Channel的实现,请查阅:Message Channel Implementations

3、Message Transformer 消息转换器负责转换消息的内容或结构并返回修改后的消息。最常见的转换器类型可能是将消息的有效负载从一种格式转换为另一种格式(例如从 XML 转换为java.lang.String 或者是 byte[] 转为Java对象)。

比如后面案例中的一段代码:

image.png

4、Message Router 消息路由器负责决定接下来应该接收该消息的一个或多个通道(如果有)。通常,消息路由(Router)可根据消息体类型(Payload Type Router)、消息头的值(Header Value Router)以及定义好的接收表(Recipient List Router)作为条件,来决定消息传递到的通道。

image.png

白话文就是我们可以根据信息中的某个字段,判断这条信息,到底要被我们投递到那个通道去

5、Service Activator 服务激活器是用于将服务实例连接到消息传递系统的通用端点。必须配置输入消息通道,如果要调用的服务方法有返回值,还可以提供输出消息通道。

服务激活器调用某个服务对象上的操作来处理请求消息,提取请求消息的有效负载并进行转换(如果该方法不需要消息类型参数)。每当服务对象的方法返回一个值时,如果需要,该返回值同样会转换为回复消息(如果它还不是消息类型)。该回复消息被发送到输出通道。

image.png

图 4.Service Activator

image.png实际上 Service Activator 在代码中是一个 @ServiceActivator()注解,如下案例:


6、Channel Adapter通道适配器是将消息通道连接到其他系统或传输的端点。通道适配器可以是入站适配器,也可以是出站适配器。通常,通道适配器在消息与从其他系统接收或发送到其他系统的任何对象或资源(文件、HTTP 请求、JMS 消息等)之间进行一些映射。根据传输方式,通道适配器还可以填充或提取消息标头值。

image.png

Channel Adapter 用来连接 MessageChannel 和具体的消息端口,例如通信的 topic。

写的时候,浅浅的翻阅了下源码,大致是这三个类,等看了后面的案例,然后再看下这几个类,流程还是很容易懂的。

image.png

连接MQTT的代码在MqttPahoMessageDrivenChannelAdapter.connectAndSubscribe() 中。

只是在官方文档中,挑选了部分概念拿出来简单的讲述了一下,有很多的文字还是直接copy 的官网文档,感兴趣的话,还是更建议你去拜读官方文档,祝你能有所收获。

三、图:Spring Integration 案例大致流程

在讲代码之前,画了一张图,简单讲述一下大致数据流转流程是什么样的,同时也便于理解后面的代码是如何的(见谅,不好改成竖图啦)

image.png

数据的大致流转过程就如上图这般,将这副图和上文中所谈及的概念,关联起来,应该能理解大部分啦。

具体的 Spring Integration 的流程图,其实远比这张图的流程要复杂(主要是牵扯到的上层抽象比较多),上图更多的是对后面的案例中的数据的一个数据流转图,让大家能更好的理解代码。

四、完整案例:使用 Spring Integration 整合 MQTT

代码主要借鉴于大疆官方开源项目 (大疆的上云API的一个DEMO项目),主体部分更是如此,可以说是弄了一个简化版,然后写下了这篇学习的博客

笔者DEMO项目地址: springboot-integration-mqtt-demo

4.1、项目结构

image.png

就常规项目结构,普通且简单~

相关依赖:

    <dependency>
        <groupId>org.springframework.boot</groupId>
        <artifactId>spring-boot-starter-integration</artifactId>
    </dependency>
    <dependency>
        <groupId>org.springframework.integration</groupId>
        <artifactId>spring-integration-mqtt</artifactId>
    </dependency>

4.2、配置文件和MqttConfiguration

yaml配置文件:

    server:
      port: 9876
    spring:
      application:
        name: spring-integration-mqtt-demo
    mqtt:
      # BASIC parameters are required.
      BASIC:
        protocol: MQTT
        host: 192.168.79.133
        port: 1883
        username:
        password:
        client-id: 123456
        # If the protocol is ws/wss, this value is required.
        path:
        # 在最初连接到mqtt时需要订阅的主题,多个主题用“,”分隔。
        inbound-topic: mysys/+/envents_test
      # 此部分是提供给后端生成token返回给前端,让前端使用websocket方式去和MQTT实现交互的,笔者此文的案例中并没有去实现
      DRC:
        protocol: WS
        host: 192.168.79.133
        port: 8083
        path: /mqtt
    logging:
      level:
        com.com.example.mqtt: debug
      file:
        name: logs/springboot-integration-mqtt-demo.log

具体的MQTT的连接参数是在红框标记的地方整合到 MqttConnectOptions 中的,但实际上它是采用MqttUseEnum 枚举的方式将yaml配置文件的参数映射到MqttClientOptions ,坦白说,用起来是真的舒服啊

image.png

主要是两个地方:

1、一个使用枚举类来映射ymal文件,可以学习学习

2、MqttConnectOptions 是基础的一些设置,比如配置认证参数、设置超时时间等连接Broker的连接参数,细节可以等到使用的时候再进一步观察。

不过DRC 那部分(主要用于websocket),不过没整合到这个案例中,下次吧,下次吧。

4.3、MessageChannel

写了这么多,都忘记说了说明 MessageChannel 啦,实际上,诸如@ServiceActivator(inputChannel = ChannelName.DEFAULT)都是提前注册在bean当中的,否则是没法使用的。

这一点,我在前文的编写中,忘记啦。

    @Configuration
    public class MqttMessageChannel {
        @Autowired
        private Executor threadPool;
        @Bean(name = ChannelName.INBOUND)
        public MessageChannel inboundChannel() {
            return new ExecutorChannel(threadPool);
        }
        @Bean(name = ChannelName.ENVENTS_INBOUND_TEST)
        public MessageChannel enventsInboundTest() {
            return new DirectChannel();
        }
        @Bean(name = ChannelName.INBOUND_TASK_TEST1)
        public MessageChannel inboundTaskTest1() {
            return new DirectChannel();
        }
        @Bean(name = ChannelName.INBOUND_TASK_TEST2)
        public MessageChannel inboundTaskTest2() {
            return new DirectChannel();
        }
        @Bean(name = ChannelName.INBOUND_TASK_TEST3)
        public MessageChannel inboundTaskTest3() {
            return new DirectChannel();
        }
    }

补充:DirectChannel 是其中的一种消息通道,是一个点对点的通道,它直接将消息分派给订阅者,同时也是最常用的通道。

Channel的具体的实现有多种,可参考官方文档:Message Channels

关于 Spring Integration 你知道多少,包含集成MQTT案例讲述及源码3:https://developer.aliyun.com/article/1394929

相关实践学习
消息队列RocketMQ版:基础消息收发功能体验
本实验场景介绍消息队列RocketMQ版的基础消息收发功能,涵盖实例创建、Topic、Group资源创建以及消息收发体验等基础功能模块。
消息队列 MNS 入门课程
1、消息队列MNS简介 本节课介绍消息队列的MNS的基础概念 2、消息队列MNS特性 本节课介绍消息队列的MNS的主要特性 3、MNS的最佳实践及场景应用 本节课介绍消息队列的MNS的最佳实践及场景应用案例 4、手把手系列:消息队列MNS实操讲 本节课介绍消息队列的MNS的实际操作演示 5、动手实验:基于MNS,0基础轻松构建 Web Client 本节课带您一起基于MNS,0基础轻松构建 Web Client
目录
相关文章
|
7天前
|
数据采集 监控 前端开发
二级公立医院绩效考核系统源码,B/S架构,前后端分别基于Spring Boot和Avue框架
医院绩效管理系统通过与HIS系统的无缝对接,实现数据网络化采集、评价结果透明化管理及奖金分配自动化生成。系统涵盖科室和个人绩效考核、医疗质量考核、数据采集、绩效工资核算、收支核算、工作量统计、单项奖惩等功能,提升绩效评估的全面性、准确性和公正性。技术栈采用B/S架构,前后端分别基于Spring Boot和Avue框架。
|
17天前
|
消息中间件 Java 数据库
新版 Seata 集成 RocketMQ事务消息,越来越 牛X 了!阿里的 Seata , yyds !
这里 借助 Seata 集成 RocketMQ 事务消息的 新功能,介绍一下一个新遇到的面试题:如果如何实现 **强弱一致性 结合**的分布式事务?
新版 Seata 集成 RocketMQ事务消息,越来越 牛X 了!阿里的 Seata , yyds !
|
2月前
|
缓存 Java 开发工具
Spring是如何解决循环依赖的?从底层源码入手,详细解读Spring框架的三级缓存
三级缓存是Spring框架里,一个经典的技术点,它很好地解决了循环依赖的问题,也是很多面试中会被问到的问题,本文从源码入手,详细剖析Spring三级缓存的来龙去脉。
164 24
Spring是如何解决循环依赖的?从底层源码入手,详细解读Spring框架的三级缓存
|
2月前
|
缓存 安全 Java
Spring框架中Bean是如何加载的?从底层源码入手,详细解读Bean的创建流程
从底层源码入手,通过代码示例,追踪AnnotationConfigApplicationContext加载配置类、启动Spring容器的整个流程,并对IOC、BeanDefinition、PostProcesser等相关概念进行解释
160 24
Spring框架中Bean是如何加载的?从底层源码入手,详细解读Bean的创建流程
|
23天前
|
Java Spring
Spring底层架构源码解析(三)
Spring底层架构源码解析(三)
|
23天前
|
XML Java 数据格式
Spring底层架构源码解析(二)
Spring底层架构源码解析(二)
|
25天前
|
存储 数据可视化 JavaScript
可视化集成API接口请求+变量绑定+源码输出
可视化集成API接口请求+变量绑定+源码输出
37 4
|
1月前
|
Java Spring 容器
Spring IOC、AOP与事务管理底层原理及源码解析
【10月更文挑战第1天】Spring框架以其强大的控制反转(IOC)和面向切面编程(AOP)功能,成为Java企业级开发中的首选框架。本文将深入探讨Spring IOC和AOP的底层原理,并通过源码解析来揭示其实现机制。同时,我们还将探讨Spring事务管理的核心原理,并给出相应的源码示例。
117 9
|
29天前
|
设计模式 JavaScript Java
Spring 事件监听机制源码
Spring 提供了事件发布订阅机制,广泛应用于项目中。本文介绍了如何通过自定义事件类、订阅类和发布类实现这一机制,并展示了如何监听 SpringBoot 启动过程中的多个事件(如 `ApplicationStartingEvent`、`ApplicationEnvironmentPreparedEvent` 等)。通过掌握这些事件,可以更好地理解 SpringBoot 的启动流程。示例代码展示了从事件发布到接收的完整过程。
|
29天前
|
缓存 Java Spring
源码解读:Spring如何解决构造器注入的循环依赖?
本文详细探讨了Spring框架中的循环依赖问题,包括构造器注入和字段注入两种情况,并重点分析了构造器注入循环依赖的解决方案。文章通过具体示例展示了循环依赖的错误信息及常见场景,提出了三种解决方法:重构代码、使用字段依赖注入以及使用`@Lazy`注解。其中,`@Lazy`注解通过延迟初始化和动态代理机制有效解决了循环依赖问题。作者建议优先使用`@Lazy`注解,并提供了详细的源码解析和调试截图,帮助读者深入理解其实现机制。
20 1