关于 Spring Integration 你知道多少,包含集成MQTT案例讲述及源码2

简介: 关于 Spring Integration 你知道多少,包含集成MQTT案例讲述及源码

关于 Spring Integration 你知道多少,包含集成MQTT案例讲述及源码1:https://developer.aliyun.com/article/1394927

1.3、常规方式的优缺点

优点:

1、学习成本相对较低,代码理解难度低,上手快。

2、易封装,没有其他框架的限制,自定义化程度高。

缺点:

1、业务耦合性大。较少的主题下,可能还不会有什么感觉,如果后期topic慢慢多了起来,不同的业务有不同的处理方式,你这边都要进行相应处理的时候,就麻烦起来了。要是再出现,针对同一个主题的消息,根据消息体的不同,也要进行不同的处理,就….

2、没有框架,自由度大,相对也意味着代码量相对要大一些,一些没有封装的处理,都需要自己去进行处理。

二、Spring Integration 的基础概念

Spring Integration 5.5 版本文档

2.1、是什么

Spring Integration 提供了 Spring 编程模型的扩展,它支持基于 Spring 的应用程序内的轻量级消息传递,并支持通过声明性适配器与外部系统集成。这些适配器提供了比 Spring 对远程处理、消息传递和调度的支持更高级别的抽象。

Spring Integration 的主要目标是提供一个简单的模型来构建企业集成解决方案,同时保持关注点分离,这对于生成可维护、可测试的代码至关重要。

Spring Integration 支持消息驱动的体系结构,其中控制反转适用于运行时问题,例如何时应运行某些业务逻辑以及应将响应发送到何处。它支持消息的路由和转换,以便可以集成不同的传输和不同的数据格式,而不会影响可测试性。换句话说,消息传递和集成问题由框架处理。业务组件与基础设施进一步隔离,开发人员也摆脱了复杂的集成责任。


也许你此刻阅读完这里,仍然会很懵,但请相信我,在你看完整篇文章之后,你会完全理解上述文字的。

2.2、什么促使了 Spring Integration 的诞生

Spring Integration 的动机如下:

  • 提供用于实施复杂企业集成解决方案的简单模型。
  • 在基于 Spring 的应用程序中促进异步、消息驱动的行为。
  • 促进现有 Spring 用户直观、增量的采用。

Spring Integration 遵循以下原则:

  • 组件应该松散耦合以实现模块化和可测试性。
  • 该框架应该强制业务逻辑和集成逻辑之间的关注点分离。
  • 扩展点本质上应该是抽象的(但在明确定义的边界内),以促进重用和可移植性。

来自官方文档。

2.3、基础概念

Spring Integraion 有几个比较重要的基础概念,理解完之后,看代码将会变得十分简单,此处只是抽取了常用且本文已经使用到的概念,完整的还请阅读 Spring Integration 文档

1、Message 见名知意就知是我们需要发送或接收的消息。

Spring Integration 中,它由有效负载和标头组成。Payload(有效负载)可以是任何类型,Header(标头)包含常用的必需信息,例如 ID、时间戳、相关 ID 和返回地址。标头还用于在连接的传输之间传递值。

image.png

2、Message Channel 消息通道代表管道和过滤器架构中的“管道”。生产者将消息发送到通道,消费者从通道接收消息。

因此,消息通道解耦了消息传递组件,并且还为消息拦截和监视提供了便利的点。

image.png

实际框架中针对Channel 的实现有多种,后文案例中暂时只使用了点对点的 DirectChannel 通道。

更多Channel的实现,请查阅:Message Channel Implementations

3、Message Transformer 消息转换器负责转换消息的内容或结构并返回修改后的消息。最常见的转换器类型可能是将消息的有效负载从一种格式转换为另一种格式(例如从 XML 转换为java.lang.String 或者是 byte[] 转为Java对象)。

比如后面案例中的一段代码:

image.png

4、Message Router 消息路由器负责决定接下来应该接收该消息的一个或多个通道(如果有)。通常,消息路由(Router)可根据消息体类型(Payload Type Router)、消息头的值(Header Value Router)以及定义好的接收表(Recipient List Router)作为条件,来决定消息传递到的通道。

image.png

白话文就是我们可以根据信息中的某个字段,判断这条信息,到底要被我们投递到那个通道去

5、Service Activator 服务激活器是用于将服务实例连接到消息传递系统的通用端点。必须配置输入消息通道,如果要调用的服务方法有返回值,还可以提供输出消息通道。

服务激活器调用某个服务对象上的操作来处理请求消息,提取请求消息的有效负载并进行转换(如果该方法不需要消息类型参数)。每当服务对象的方法返回一个值时,如果需要,该返回值同样会转换为回复消息(如果它还不是消息类型)。该回复消息被发送到输出通道。

image.png

图 4.Service Activator

image.png实际上 Service Activator 在代码中是一个 @ServiceActivator()注解,如下案例:


6、Channel Adapter通道适配器是将消息通道连接到其他系统或传输的端点。通道适配器可以是入站适配器,也可以是出站适配器。通常,通道适配器在消息与从其他系统接收或发送到其他系统的任何对象或资源(文件、HTTP 请求、JMS 消息等)之间进行一些映射。根据传输方式,通道适配器还可以填充或提取消息标头值。

image.png

Channel Adapter 用来连接 MessageChannel 和具体的消息端口,例如通信的 topic。

写的时候,浅浅的翻阅了下源码,大致是这三个类,等看了后面的案例,然后再看下这几个类,流程还是很容易懂的。

image.png

连接MQTT的代码在MqttPahoMessageDrivenChannelAdapter.connectAndSubscribe() 中。

只是在官方文档中,挑选了部分概念拿出来简单的讲述了一下,有很多的文字还是直接copy 的官网文档,感兴趣的话,还是更建议你去拜读官方文档,祝你能有所收获。

三、图:Spring Integration 案例大致流程

在讲代码之前,画了一张图,简单讲述一下大致数据流转流程是什么样的,同时也便于理解后面的代码是如何的(见谅,不好改成竖图啦)

image.png

数据的大致流转过程就如上图这般,将这副图和上文中所谈及的概念,关联起来,应该能理解大部分啦。

具体的 Spring Integration 的流程图,其实远比这张图的流程要复杂(主要是牵扯到的上层抽象比较多),上图更多的是对后面的案例中的数据的一个数据流转图,让大家能更好的理解代码。

四、完整案例:使用 Spring Integration 整合 MQTT

代码主要借鉴于大疆官方开源项目 (大疆的上云API的一个DEMO项目),主体部分更是如此,可以说是弄了一个简化版,然后写下了这篇学习的博客

笔者DEMO项目地址: springboot-integration-mqtt-demo

4.1、项目结构

image.png

就常规项目结构,普通且简单~

相关依赖:

    <dependency>
        <groupId>org.springframework.boot</groupId>
        <artifactId>spring-boot-starter-integration</artifactId>
    </dependency>
    <dependency>
        <groupId>org.springframework.integration</groupId>
        <artifactId>spring-integration-mqtt</artifactId>
    </dependency>

4.2、配置文件和MqttConfiguration

yaml配置文件:

    server:
      port: 9876
    spring:
      application:
        name: spring-integration-mqtt-demo
    mqtt:
      # BASIC parameters are required.
      BASIC:
        protocol: MQTT
        host: 192.168.79.133
        port: 1883
        username:
        password:
        client-id: 123456
        # If the protocol is ws/wss, this value is required.
        path:
        # 在最初连接到mqtt时需要订阅的主题,多个主题用“,”分隔。
        inbound-topic: mysys/+/envents_test
      # 此部分是提供给后端生成token返回给前端,让前端使用websocket方式去和MQTT实现交互的,笔者此文的案例中并没有去实现
      DRC:
        protocol: WS
        host: 192.168.79.133
        port: 8083
        path: /mqtt
    logging:
      level:
        com.com.example.mqtt: debug
      file:
        name: logs/springboot-integration-mqtt-demo.log

具体的MQTT的连接参数是在红框标记的地方整合到 MqttConnectOptions 中的,但实际上它是采用MqttUseEnum 枚举的方式将yaml配置文件的参数映射到MqttClientOptions ,坦白说,用起来是真的舒服啊

image.png

主要是两个地方:

1、一个使用枚举类来映射ymal文件,可以学习学习

2、MqttConnectOptions 是基础的一些设置,比如配置认证参数、设置超时时间等连接Broker的连接参数,细节可以等到使用的时候再进一步观察。

不过DRC 那部分(主要用于websocket),不过没整合到这个案例中,下次吧,下次吧。

4.3、MessageChannel

写了这么多,都忘记说了说明 MessageChannel 啦,实际上,诸如@ServiceActivator(inputChannel = ChannelName.DEFAULT)都是提前注册在bean当中的,否则是没法使用的。

这一点,我在前文的编写中,忘记啦。

    @Configuration
    public class MqttMessageChannel {
        @Autowired
        private Executor threadPool;
        @Bean(name = ChannelName.INBOUND)
        public MessageChannel inboundChannel() {
            return new ExecutorChannel(threadPool);
        }
        @Bean(name = ChannelName.ENVENTS_INBOUND_TEST)
        public MessageChannel enventsInboundTest() {
            return new DirectChannel();
        }
        @Bean(name = ChannelName.INBOUND_TASK_TEST1)
        public MessageChannel inboundTaskTest1() {
            return new DirectChannel();
        }
        @Bean(name = ChannelName.INBOUND_TASK_TEST2)
        public MessageChannel inboundTaskTest2() {
            return new DirectChannel();
        }
        @Bean(name = ChannelName.INBOUND_TASK_TEST3)
        public MessageChannel inboundTaskTest3() {
            return new DirectChannel();
        }
    }

补充:DirectChannel 是其中的一种消息通道,是一个点对点的通道,它直接将消息分派给订阅者,同时也是最常用的通道。

Channel的具体的实现有多种,可参考官方文档:Message Channels

关于 Spring Integration 你知道多少,包含集成MQTT案例讲述及源码3:https://developer.aliyun.com/article/1394929

相关实践学习
5分钟轻松打造应对流量洪峰的稳定商城交易系统
本实验通过SAE极速部署一个微服务电商商城,同时结合RocketMQ异步解耦、削峰填谷的能力,带大家体验面对流量洪峰仍旧稳定可靠的商城交易系统!
消息队列 MNS 入门课程
1、消息队列MNS简介 本节课介绍消息队列的MNS的基础概念 2、消息队列MNS特性 本节课介绍消息队列的MNS的主要特性 3、MNS的最佳实践及场景应用 本节课介绍消息队列的MNS的最佳实践及场景应用案例 4、手把手系列:消息队列MNS实操讲 本节课介绍消息队列的MNS的实际操作演示 5、动手实验:基于MNS,0基础轻松构建 Web Client 本节课带您一起基于MNS,0基础轻松构建 Web Client
目录
相关文章
|
2月前
|
前端开发 Java 物联网
智慧班牌源码,采用Java + Spring Boot后端框架,搭配Vue2前端技术,支持SaaS云部署
智慧班牌系统是一款基于信息化与物联网技术的校园管理工具,集成电子屏显示、人脸识别及数据交互功能,实现班级信息展示、智能考勤与家校互通。系统采用Java + Spring Boot后端框架,搭配Vue2前端技术,支持SaaS云部署与私有化定制。核心功能涵盖信息发布、考勤管理、教务处理及数据分析,助力校园文化建设与教学优化。其综合性和可扩展性有效打破数据孤岛,提升交互体验并降低管理成本,适用于日常教学、考试管理和应急场景,为智慧校园建设提供全面解决方案。
273 70
|
4月前
|
人工智能 Cloud Native 安全
DeepSeek + Higress AI 网关/Spring AI Alibaba 案例征集
诚挚地感谢每一位持续关注并使用 Higress 和 Spring AI Alibaba 的朋友,DeepSeek + Higress AI 网关/Spring AI Alibaba 案例征集中。
474 51
|
7月前
|
监控 Java 应用服务中间件
高级java面试---spring.factories文件的解析源码API机制
【11月更文挑战第20天】Spring Boot是一个用于快速构建基于Spring框架的应用程序的开源框架。它通过自动配置、起步依赖和内嵌服务器等特性,极大地简化了Spring应用的开发和部署过程。本文将深入探讨Spring Boot的背景历史、业务场景、功能点以及底层原理,并通过Java代码手写模拟Spring Boot的启动过程,特别是spring.factories文件的解析源码API机制。
203 2
|
3月前
|
存储 监控 数据可视化
SaaS云计算技术的智慧工地源码,基于Java+Spring Cloud框架开发
智慧工地源码基于微服务+Java+Spring Cloud +UniApp +MySql架构,利用传感器、监控摄像头、AI、大数据等技术,实现施工现场的实时监测、数据分析与智能决策。平台涵盖人员、车辆、视频监控、施工质量、设备、环境和能耗管理七大维度,提供可视化管理、智能化报警、移动智能办公及分布计算存储等功能,全面提升工地的安全性、效率和质量。
|
5月前
|
监控 JavaScript 数据可视化
建筑施工一体化信息管理平台源码,支持微服务架构,采用Java、Spring Cloud、Vue等技术开发。
智慧工地云平台是专为建筑施工领域打造的一体化信息管理平台,利用大数据、云计算、物联网等技术,实现施工区域各系统数据汇总与可视化管理。平台涵盖人员、设备、物料、环境等关键因素的实时监控与数据分析,提供远程指挥、决策支持等功能,提升工作效率,促进产业信息化发展。系统由PC端、APP移动端及项目、监管、数据屏三大平台组成,支持微服务架构,采用Java、Spring Cloud、Vue等技术开发。
210 7
|
6月前
|
存储 缓存 Java
Spring面试必问:手写Spring IoC 循环依赖底层源码剖析
在Spring框架中,IoC(Inversion of Control,控制反转)是一个核心概念,它允许容器管理对象的生命周期和依赖关系。然而,在实际应用中,我们可能会遇到对象间的循环依赖问题。本文将深入探讨Spring如何解决IoC中的循环依赖问题,并通过手写源码的方式,让你对其底层原理有一个全新的认识。
147 2
|
7月前
|
消息中间件 存储 监控
深度写作:深入源码理解MQ长轮询优化机制
【11月更文挑战第22天】在分布式系统中,消息队列(Message Queue, MQ)扮演着至关重要的角色。MQ不仅实现了应用间的解耦,还提供了异步消息处理、流量削峰等功能。而在MQ的众多特性中,长轮询(Long Polling)机制因其能有效提升消息处理的实时性和效率,备受关注。
227 12
|
7月前
|
消息中间件 存储 Java
深入源码理解MQ长轮询优化机制
【11月更文挑战第22天】在分布式系统中,消息队列(MQ)作为一种重要的中间件,广泛应用于解耦、异步处理、流量削峰等场景。其中,延时消息和定时消息作为MQ的高级功能,能够进一步满足复杂的业务需求。为了实现这些功能,MQ系统需要进行一系列优化,长轮询机制便是其中的关键一环。本文将深入探讨MQ如何设计延时消息和定时消息的优化机制,特别是长轮询机制的实现原理及其在Java中的模拟实现。
126 2
|
7月前
|
Java Maven Spring
Spring 小案例体验创建对象的快感
本文介绍了如何在IDEA中创建一个Spring项目,包括项目创建、配置pom.xml文件以引入必要的依赖、编写实体类HelloSpring及其配置文件applicationContext.xml,最后通过测试类TestHelloSpring展示如何使用Spring的bean创建对象并调用方法。
62 0
|
8月前
|
消息中间件 JSON Java
开发者如何使用轻量消息队列MNS
【10月更文挑战第19天】开发者如何使用轻量消息队列MNS
636 53

热门文章

最新文章