关于 Spring Integration 你知道多少,包含集成MQTT案例讲述及源码1:https://developer.aliyun.com/article/1394927
1.3、常规方式的优缺点
优点:
1、学习成本相对较低,代码理解难度低,上手快。
2、易封装,没有其他框架的限制,自定义化程度高。
缺点:
1、业务耦合性大。较少的主题下,可能还不会有什么感觉,如果后期topic慢慢多了起来,不同的业务有不同的处理方式,你这边都要进行相应处理的时候,就麻烦起来了。要是再出现,针对同一个主题的消息,根据消息体的不同,也要进行不同的处理,就….
2、没有框架,自由度大,相对也意味着代码量相对要大一些,一些没有封装的处理,都需要自己去进行处理。
二、Spring Integration 的基础概念
2.1、是什么
Spring Integration 提供了 Spring 编程模型的扩展,它支持基于 Spring 的应用程序内的轻量级消息传递,并支持通过声明性适配器与外部系统集成。这些适配器提供了比 Spring 对远程处理、消息传递和调度的支持更高级别的抽象。
Spring Integration 的主要目标是提供一个简单的模型来构建企业集成解决方案,同时保持关注点分离,这对于生成可维护、可测试的代码至关重要。
Spring Integration 支持消息驱动的体系结构,其中控制反转适用于运行时问题,例如何时应运行某些业务逻辑以及应将响应发送到何处。它支持消息的路由和转换,以便可以集成不同的传输和不同的数据格式,而不会影响可测试性。换句话说,消息传递和集成问题由框架处理。业务组件与基础设施进一步隔离,开发人员也摆脱了复杂的集成责任。
也许你此刻阅读完这里,仍然会很懵,但请相信我,在你看完整篇文章之后,你会完全理解上述文字的。
2.2、什么促使了 Spring Integration 的诞生
Spring Integration 的动机如下:
- 提供用于实施复杂企业集成解决方案的简单模型。
- 在基于 Spring 的应用程序中促进异步、消息驱动的行为。
- 促进现有 Spring 用户直观、增量的采用。
Spring Integration 遵循以下原则:
- 组件应该松散耦合以实现模块化和可测试性。
- 该框架应该强制业务逻辑和集成逻辑之间的关注点分离。
- 扩展点本质上应该是抽象的(但在明确定义的边界内),以促进重用和可移植性。
来自官方文档。
2.3、基础概念
Spring Integraion 有几个比较重要的基础概念,理解完之后,看代码将会变得十分简单,此处只是抽取了常用且本文已经使用到的概念,完整的还请阅读 Spring Integration 文档
1、Message
见名知意就知是我们需要发送或接收的消息。
在 Spring Integration
中,它由有效负载和标头组成。Payload(有效负载)可以是任何类型,Header(标头)包含常用的必需信息,例如 ID、时间戳、相关 ID 和返回地址。标头还用于在连接的传输之间传递值。
2、Message Channel
消息通道代表管道和过滤器架构中的“管道”。生产者将消息发送到通道,消费者从通道接收消息。
因此,消息通道解耦了消息传递组件,并且还为消息拦截和监视提供了便利的点。
实际框架中针对Channel 的实现有多种,后文案例中暂时只使用了点对点的 DirectChannel
通道。
更多Channel的实现,请查阅:Message Channel Implementations
3、Message Transformer
消息转换器负责转换消息的内容或结构并返回修改后的消息。最常见的转换器类型可能是将消息的有效负载从一种格式转换为另一种格式(例如从 XML 转换为java.lang.String
或者是 byte[]
转为Java对象)。
比如后面案例中的一段代码:
4、Message Router
消息路由器负责决定接下来应该接收该消息的一个或多个通道(如果有)。通常,消息路由(Router)可根据消息体类型(Payload Type Router)、消息头的值(Header Value Router)以及定义好的接收表(Recipient List Router)作为条件,来决定消息传递到的通道。
白话文就是我们可以根据信息中的某个字段,判断这条信息,到底要被我们投递到那个通道去。
5、Service Activator
服务激活器是用于将服务实例连接到消息传递系统的通用端点。必须配置输入消息通道,如果要调用的服务方法有返回值,还可以提供输出消息通道。
服务激活器调用某个服务对象上的操作来处理请求消息,提取请求消息的有效负载并进行转换(如果该方法不需要消息类型参数)。每当服务对象的方法返回一个值时,如果需要,该返回值同样会转换为回复消息(如果它还不是消息类型)。该回复消息被发送到输出通道。
图 4.Service Activator
实际上 Service Activator
在代码中是一个 @ServiceActivator()
注解,如下案例:
6、Channel Adapter
通道适配器是将消息通道连接到其他系统或传输的端点。通道适配器可以是入站适配器,也可以是出站适配器。通常,通道适配器在消息与从其他系统接收或发送到其他系统的任何对象或资源(文件、HTTP 请求、JMS 消息等)之间进行一些映射。根据传输方式,通道适配器还可以填充或提取消息标头值。
Channel Adapter 用来连接 MessageChannel 和具体的消息端口,例如通信的 topic。
写的时候,浅浅的翻阅了下源码,大致是这三个类,等看了后面的案例,然后再看下这几个类,流程还是很容易懂的。
连接MQTT的代码在MqttPahoMessageDrivenChannelAdapter.connectAndSubscribe()
中。
只是在官方文档中,挑选了部分概念拿出来简单的讲述了一下,有很多的文字还是直接copy 的官网文档,感兴趣的话,还是更建议你去拜读官方文档,祝你能有所收获。
三、图:Spring Integration 案例大致流程
在讲代码之前,画了一张图,简单讲述一下大致数据流转流程是什么样的,同时也便于理解后面的代码是如何的(见谅,不好改成竖图啦)
数据的大致流转过程就如上图这般,将这副图和上文中所谈及的概念,关联起来,应该能理解大部分啦。
具体的 Spring Integration 的流程图,其实远比这张图的流程要复杂(主要是牵扯到的上层抽象比较多),上图更多的是对后面的案例中的数据的一个数据流转图,让大家能更好的理解代码。
四、完整案例:使用 Spring Integration 整合 MQTT
代码主要借鉴于大疆官方开源项目 (大疆的上云API的一个DEMO项目),主体部分更是如此,可以说是弄了一个简化版,然后写下了这篇学习的博客
笔者DEMO项目地址: springboot-integration-mqtt-demo
4.1、项目结构
就常规项目结构,普通且简单~
相关依赖:
<dependency> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-starter-integration</artifactId> </dependency> <dependency> <groupId>org.springframework.integration</groupId> <artifactId>spring-integration-mqtt</artifactId> </dependency>
4.2、配置文件和MqttConfiguration
yaml配置文件:
server: port: 9876 spring: application: name: spring-integration-mqtt-demo mqtt: # BASIC parameters are required. BASIC: protocol: MQTT host: 192.168.79.133 port: 1883 username: password: client-id: 123456 # If the protocol is ws/wss, this value is required. path: # 在最初连接到mqtt时需要订阅的主题,多个主题用“,”分隔。 inbound-topic: mysys/+/envents_test # 此部分是提供给后端生成token返回给前端,让前端使用websocket方式去和MQTT实现交互的,笔者此文的案例中并没有去实现 DRC: protocol: WS host: 192.168.79.133 port: 8083 path: /mqtt logging: level: com.com.example.mqtt: debug file: name: logs/springboot-integration-mqtt-demo.log
具体的MQTT的连接参数是在红框标记的地方整合到 MqttConnectOptions
中的,但实际上它是采用MqttUseEnum
枚举的方式将yaml配置文件的参数映射到MqttClientOptions
,坦白说,用起来是真的舒服啊
主要是两个地方:
1、一个使用枚举类来映射ymal文件,可以学习学习
2、MqttConnectOptions
是基础的一些设置,比如配置认证参数、设置超时时间等连接Broker的连接参数,细节可以等到使用的时候再进一步观察。
不过DRC 那部分(主要用于websocket),不过没整合到这个案例中,下次吧,下次吧。
4.3、MessageChannel
写了这么多,都忘记说了说明 MessageChannel 啦,实际上,诸如@ServiceActivator(inputChannel = ChannelName.DEFAULT)
都是提前注册在bean当中的,否则是没法使用的。
这一点,我在前文的编写中,忘记啦。
@Configuration public class MqttMessageChannel { @Autowired private Executor threadPool; @Bean(name = ChannelName.INBOUND) public MessageChannel inboundChannel() { return new ExecutorChannel(threadPool); } @Bean(name = ChannelName.ENVENTS_INBOUND_TEST) public MessageChannel enventsInboundTest() { return new DirectChannel(); } @Bean(name = ChannelName.INBOUND_TASK_TEST1) public MessageChannel inboundTaskTest1() { return new DirectChannel(); } @Bean(name = ChannelName.INBOUND_TASK_TEST2) public MessageChannel inboundTaskTest2() { return new DirectChannel(); } @Bean(name = ChannelName.INBOUND_TASK_TEST3) public MessageChannel inboundTaskTest3() { return new DirectChannel(); } }
补充:DirectChannel
是其中的一种消息通道,是一个点对点的通道,它直接将消息分派给订阅者,同时也是最常用的通道。
Channel的具体的实现有多种,可参考官方文档:Message Channels
关于 Spring Integration 你知道多少,包含集成MQTT案例讲述及源码3:https://developer.aliyun.com/article/1394929