关于 Spring Integration 你知道多少,包含集成MQTT案例讲述及源码2

简介: 关于 Spring Integration 你知道多少,包含集成MQTT案例讲述及源码

关于 Spring Integration 你知道多少,包含集成MQTT案例讲述及源码1:https://developer.aliyun.com/article/1394927

1.3、常规方式的优缺点

优点:

1、学习成本相对较低,代码理解难度低,上手快。

2、易封装,没有其他框架的限制,自定义化程度高。

缺点:

1、业务耦合性大。较少的主题下,可能还不会有什么感觉,如果后期topic慢慢多了起来,不同的业务有不同的处理方式,你这边都要进行相应处理的时候,就麻烦起来了。要是再出现,针对同一个主题的消息,根据消息体的不同,也要进行不同的处理,就….

2、没有框架,自由度大,相对也意味着代码量相对要大一些,一些没有封装的处理,都需要自己去进行处理。

二、Spring Integration 的基础概念

Spring Integration 5.5 版本文档

2.1、是什么

Spring Integration 提供了 Spring 编程模型的扩展,它支持基于 Spring 的应用程序内的轻量级消息传递,并支持通过声明性适配器与外部系统集成。这些适配器提供了比 Spring 对远程处理、消息传递和调度的支持更高级别的抽象。

Spring Integration 的主要目标是提供一个简单的模型来构建企业集成解决方案,同时保持关注点分离,这对于生成可维护、可测试的代码至关重要。

Spring Integration 支持消息驱动的体系结构,其中控制反转适用于运行时问题,例如何时应运行某些业务逻辑以及应将响应发送到何处。它支持消息的路由和转换,以便可以集成不同的传输和不同的数据格式,而不会影响可测试性。换句话说,消息传递和集成问题由框架处理。业务组件与基础设施进一步隔离,开发人员也摆脱了复杂的集成责任。


也许你此刻阅读完这里,仍然会很懵,但请相信我,在你看完整篇文章之后,你会完全理解上述文字的。

2.2、什么促使了 Spring Integration 的诞生

Spring Integration 的动机如下:

  • 提供用于实施复杂企业集成解决方案的简单模型。
  • 在基于 Spring 的应用程序中促进异步、消息驱动的行为。
  • 促进现有 Spring 用户直观、增量的采用。

Spring Integration 遵循以下原则:

  • 组件应该松散耦合以实现模块化和可测试性。
  • 该框架应该强制业务逻辑和集成逻辑之间的关注点分离。
  • 扩展点本质上应该是抽象的(但在明确定义的边界内),以促进重用和可移植性。

来自官方文档。

2.3、基础概念

Spring Integraion 有几个比较重要的基础概念,理解完之后,看代码将会变得十分简单,此处只是抽取了常用且本文已经使用到的概念,完整的还请阅读 Spring Integration 文档

1、Message 见名知意就知是我们需要发送或接收的消息。

Spring Integration 中,它由有效负载和标头组成。Payload(有效负载)可以是任何类型,Header(标头)包含常用的必需信息,例如 ID、时间戳、相关 ID 和返回地址。标头还用于在连接的传输之间传递值。

image.png

2、Message Channel 消息通道代表管道和过滤器架构中的“管道”。生产者将消息发送到通道,消费者从通道接收消息。

因此,消息通道解耦了消息传递组件,并且还为消息拦截和监视提供了便利的点。

image.png

实际框架中针对Channel 的实现有多种,后文案例中暂时只使用了点对点的 DirectChannel 通道。

更多Channel的实现,请查阅:Message Channel Implementations

3、Message Transformer 消息转换器负责转换消息的内容或结构并返回修改后的消息。最常见的转换器类型可能是将消息的有效负载从一种格式转换为另一种格式(例如从 XML 转换为java.lang.String 或者是 byte[] 转为Java对象)。

比如后面案例中的一段代码:

image.png

4、Message Router 消息路由器负责决定接下来应该接收该消息的一个或多个通道(如果有)。通常,消息路由(Router)可根据消息体类型(Payload Type Router)、消息头的值(Header Value Router)以及定义好的接收表(Recipient List Router)作为条件,来决定消息传递到的通道。

image.png

白话文就是我们可以根据信息中的某个字段,判断这条信息,到底要被我们投递到那个通道去

5、Service Activator 服务激活器是用于将服务实例连接到消息传递系统的通用端点。必须配置输入消息通道,如果要调用的服务方法有返回值,还可以提供输出消息通道。

服务激活器调用某个服务对象上的操作来处理请求消息,提取请求消息的有效负载并进行转换(如果该方法不需要消息类型参数)。每当服务对象的方法返回一个值时,如果需要,该返回值同样会转换为回复消息(如果它还不是消息类型)。该回复消息被发送到输出通道。

image.png

图 4.Service Activator

image.png实际上 Service Activator 在代码中是一个 @ServiceActivator()注解,如下案例:


6、Channel Adapter通道适配器是将消息通道连接到其他系统或传输的端点。通道适配器可以是入站适配器,也可以是出站适配器。通常,通道适配器在消息与从其他系统接收或发送到其他系统的任何对象或资源(文件、HTTP 请求、JMS 消息等)之间进行一些映射。根据传输方式,通道适配器还可以填充或提取消息标头值。

image.png

Channel Adapter 用来连接 MessageChannel 和具体的消息端口,例如通信的 topic。

写的时候,浅浅的翻阅了下源码,大致是这三个类,等看了后面的案例,然后再看下这几个类,流程还是很容易懂的。

image.png

连接MQTT的代码在MqttPahoMessageDrivenChannelAdapter.connectAndSubscribe() 中。

只是在官方文档中,挑选了部分概念拿出来简单的讲述了一下,有很多的文字还是直接copy 的官网文档,感兴趣的话,还是更建议你去拜读官方文档,祝你能有所收获。

三、图:Spring Integration 案例大致流程

在讲代码之前,画了一张图,简单讲述一下大致数据流转流程是什么样的,同时也便于理解后面的代码是如何的(见谅,不好改成竖图啦)

image.png

数据的大致流转过程就如上图这般,将这副图和上文中所谈及的概念,关联起来,应该能理解大部分啦。

具体的 Spring Integration 的流程图,其实远比这张图的流程要复杂(主要是牵扯到的上层抽象比较多),上图更多的是对后面的案例中的数据的一个数据流转图,让大家能更好的理解代码。

四、完整案例:使用 Spring Integration 整合 MQTT

代码主要借鉴于大疆官方开源项目 (大疆的上云API的一个DEMO项目),主体部分更是如此,可以说是弄了一个简化版,然后写下了这篇学习的博客

笔者DEMO项目地址: springboot-integration-mqtt-demo

4.1、项目结构

image.png

就常规项目结构,普通且简单~

相关依赖:

    <dependency>
        <groupId>org.springframework.boot</groupId>
        <artifactId>spring-boot-starter-integration</artifactId>
    </dependency>
    <dependency>
        <groupId>org.springframework.integration</groupId>
        <artifactId>spring-integration-mqtt</artifactId>
    </dependency>

4.2、配置文件和MqttConfiguration

yaml配置文件:

    server:
      port: 9876
    spring:
      application:
        name: spring-integration-mqtt-demo
    mqtt:
      # BASIC parameters are required.
      BASIC:
        protocol: MQTT
        host: 192.168.79.133
        port: 1883
        username:
        password:
        client-id: 123456
        # If the protocol is ws/wss, this value is required.
        path:
        # 在最初连接到mqtt时需要订阅的主题,多个主题用“,”分隔。
        inbound-topic: mysys/+/envents_test
      # 此部分是提供给后端生成token返回给前端,让前端使用websocket方式去和MQTT实现交互的,笔者此文的案例中并没有去实现
      DRC:
        protocol: WS
        host: 192.168.79.133
        port: 8083
        path: /mqtt
    logging:
      level:
        com.com.example.mqtt: debug
      file:
        name: logs/springboot-integration-mqtt-demo.log

具体的MQTT的连接参数是在红框标记的地方整合到 MqttConnectOptions 中的,但实际上它是采用MqttUseEnum 枚举的方式将yaml配置文件的参数映射到MqttClientOptions ,坦白说,用起来是真的舒服啊

image.png

主要是两个地方:

1、一个使用枚举类来映射ymal文件,可以学习学习

2、MqttConnectOptions 是基础的一些设置,比如配置认证参数、设置超时时间等连接Broker的连接参数,细节可以等到使用的时候再进一步观察。

不过DRC 那部分(主要用于websocket),不过没整合到这个案例中,下次吧,下次吧。

4.3、MessageChannel

写了这么多,都忘记说了说明 MessageChannel 啦,实际上,诸如@ServiceActivator(inputChannel = ChannelName.DEFAULT)都是提前注册在bean当中的,否则是没法使用的。

这一点,我在前文的编写中,忘记啦。

    @Configuration
    public class MqttMessageChannel {
        @Autowired
        private Executor threadPool;
        @Bean(name = ChannelName.INBOUND)
        public MessageChannel inboundChannel() {
            return new ExecutorChannel(threadPool);
        }
        @Bean(name = ChannelName.ENVENTS_INBOUND_TEST)
        public MessageChannel enventsInboundTest() {
            return new DirectChannel();
        }
        @Bean(name = ChannelName.INBOUND_TASK_TEST1)
        public MessageChannel inboundTaskTest1() {
            return new DirectChannel();
        }
        @Bean(name = ChannelName.INBOUND_TASK_TEST2)
        public MessageChannel inboundTaskTest2() {
            return new DirectChannel();
        }
        @Bean(name = ChannelName.INBOUND_TASK_TEST3)
        public MessageChannel inboundTaskTest3() {
            return new DirectChannel();
        }
    }

补充:DirectChannel 是其中的一种消息通道,是一个点对点的通道,它直接将消息分派给订阅者,同时也是最常用的通道。

Channel的具体的实现有多种,可参考官方文档:Message Channels

关于 Spring Integration 你知道多少,包含集成MQTT案例讲述及源码3:https://developer.aliyun.com/article/1394929

相关实践学习
RocketMQ一站式入门使用
从源码编译、部署broker、部署namesrv,使用java客户端首发消息等一站式入门RocketMQ。
消息队列 MNS 入门课程
1、消息队列MNS简介 本节课介绍消息队列的MNS的基础概念 2、消息队列MNS特性 本节课介绍消息队列的MNS的主要特性 3、MNS的最佳实践及场景应用 本节课介绍消息队列的MNS的最佳实践及场景应用案例 4、手把手系列:消息队列MNS实操讲 本节课介绍消息队列的MNS的实际操作演示 5、动手实验:基于MNS,0基础轻松构建 Web Client 本节课带您一起基于MNS,0基础轻松构建 Web Client
目录
相关文章
|
8天前
|
Java 应用服务中间件 Nacos
Spring Cloud 常用各个组件详解及实现原理(附加源码+实现逻辑图)
Spring Cloud 常用各个组件详解及实现原理(附加源码+实现逻辑图)
21 0
|
11天前
|
监控 数据可视化 安全
一套成熟的Spring Cloud智慧工地平台源码,自主版权,开箱即用
这是一套基于Spring Cloud的智慧工地管理平台源码,具备自主版权,易于使用。平台运用现代技术如物联网、大数据等改进工地管理,服务包括建设各方,提供人员、车辆、视频监控等七大维度的管理。特色在于可视化管理、智能报警、移动办公和分布计算存储。功能涵盖劳务实名制管理、智能考勤、视频监控AI识别、危大工程监控、环境监测、材料管理和进度管理等,实现工地安全、高效的智慧化管理。
|
1天前
|
设计模式 安全 Java
【初学者慎入】Spring源码中的16种设计模式实现
以上是威哥给大家整理了16种常见的设计模式在 Spring 源码中的运用,学习 Spring 源码成为了 Java 程序员的标配,你还知道Spring 中哪些源码中运用了设计模式,欢迎留言与威哥交流。
|
4天前
|
SQL Java 数据库连接
Spring脚手架集成分页插件
Spring脚手架集成分页插件
6 0
|
4天前
|
Java Spring
Spring Boot脚手架集成校验框架
Spring Boot脚手架集成校验框架
12 0
|
6天前
|
XML 人工智能 Java
Spring Bean名称生成规则(含源码解析、自定义Spring Bean名称方式)
Spring Bean名称生成规则(含源码解析、自定义Spring Bean名称方式)
|
12天前
|
Java Maven Nacos
Spring Cloud Eureka 服务注册和服务发现超详细(附加--源码实现案例--及实现逻辑图)
Spring Cloud Eureka 服务注册和服务发现超详细(附加--源码实现案例--及实现逻辑图)
24 0
|
13天前
|
Java 关系型数据库 MySQL
一套java+ spring boot与vue+ mysql技术开发的UWB高精度工厂人员定位全套系统源码有应用案例
UWB (ULTRA WIDE BAND, UWB) 技术是一种无线载波通讯技术,它不采用正弦载波,而是利用纳秒级的非正弦波窄脉冲传输数据,因此其所占的频谱范围很宽。一套UWB精确定位系统,最高定位精度可达10cm,具有高精度,高动态,高容量,低功耗的应用。
一套java+ spring boot与vue+ mysql技术开发的UWB高精度工厂人员定位全套系统源码有应用案例
|
2月前
|
消息中间件 存储 监控
RabbitMQ:分布式系统中的高效消息队列
RabbitMQ:分布式系统中的高效消息队列
|
2月前
|
消息中间件 Java
springboot整合消息队列——RabbitMQ
springboot整合消息队列——RabbitMQ
76 0