关于 Spring Integration 你知道多少,包含集成MQTT案例讲述及源码3

简介: 关于 Spring Integration 你知道多少,包含集成MQTT案例讲述及源码

关于 Spring Integration 你知道多少,包含集成MQTT案例讲述及源码2:https://developer.aliyun.com/article/1394928

4.4、入站适配器MqttInboundConfiguration

    @Slf4j
    @Configuration
    @IntegrationComponentScan
    public class MqttInboundConfiguration {
        @Autowired
        private MqttPahoClientFactory mqttClientFactory;
        @Resource(name = ChannelName.INBOUND)
        private MessageChannel inboundChannel;
        /**
         * Clients of inbound message channels.
         * @return
         */
        @Bean(name = "adapter")
        public MessageProducerSupport mqttInbound() {
            MqttClientOptions options = MqttConfiguration.getBasicClientOptions();
    // 此处在初始化的时候,初始化时,默认订阅了配置文件中的已经写定的 topic
    // 如果后期有需要再增加的订阅主题,调用 addTopic() 即可
            MqttPahoMessageDrivenChannelAdapter adapter = 
                      new MqttPahoMessageDrivenChannelAdapter(
                        options.getClientId() + "_consumer_" + System.currentTimeMillis(),
                        mqttClientFactory, options.getInboundTopic().split(","));
            DefaultPahoMessageConverter converter = new DefaultPahoMessageConverter();
            // use byte types uniformly
            converter.setPayloadAsBytes(true);
    // 设置消息转换器
            adapter.setConverter(converter);
            adapter.setQos(1);
          // 设置在接收已经订阅的主题信息后,发送给那个通道,具体的发送方法需要翻上层的抽象类
            adapter.setOutputChannel(inboundChannel);
            return adapter;
        }
        /**
         * Define a default channel to handle messages that have no effect.
         * @return
         */
        @Bean
        @ServiceActivator(inputChannel = ChannelName.DEFAULT)
        public MessageHandler defaultInboundHandler() {
            return message -> {
                log.info("The default channel does not handle messages." +
                        "\nTopic: " + message.getHeaders().get(MqttHeaders.RECEIVED_TOPIC) +
                        "\nPayload: " + message.getPayload());
            };
        }
    }

主要是两个地方:

1、@IntegrationComponentScan ,开启 Spring Integration 的注解扫描,扫描我们写的 @ServiceActivator(inputChannel = ChannelName.DEFAULT)、``@MessagingGateway(defaultRequestChannel = ChannelName.OUTBOUND)等等

2、MqttPahoMessageDrivenChannelAdapter 实现了 MessageProducerSupport 接口,同时也是最后的实现类。故此有较多的具体实现是在这个类中的。

4.5、Message Router

    @Component
    @Slf4j
    public class InboundMessageRouter extends AbstractMessageRouter {
        /**
         * All mqtt broker messages will arrive here before distributing them to different channels.
         * @param message message from mqtt broker
         * @return channel
         *
         * 全部节点的信息都会先从这里过,然后再查询TopicEnum中的方法,寻找到相应的通道(也就是代码中已经注册的Channel)
         * 举个例子:就比如我现在使用MQTTX向 mysys/envents_test (broker)发送一个消息
         * 首先会经过这里,然后我们根据 mysys/envents_test 在 TopicEnum.find(topic) 寻找,
         * 找到相应的通道为:STATE_ENVENTS(Pattern.compile("^"+MY_BASIC_PRE+ENVENTS_TEST+"$"), ChannelName.ENVENTS_INBOUND_TEST),
         * 即找到一个 ChannelName 为 ENVENTS_INBOUND_TEST 通道(这个通道我们已经注册在Spring 中啦)
         *
         * 找到这个通道后,我们会将消息投递到这个通道去
         * 接下来就是看是谁订阅了这个通道的消息,那么就会接着处理这个消息
         * 比如我们的案例中是由 EnventsTestRouter 这个二级路由订阅了消息通道,来进行消息的再次分发,
         * 在这里的时候,EnventsTestRouter 可以不再是根据节点的名称来进行处理,而是具体的消息来进行二次处理,比如指定要判断消息中的某一个字段是什么
         * 从而再交由谁处理(即再次投递到哪个 ChannelName 中去)
         *
         */
        @Override
        @Router(inputChannel = ChannelName.INBOUND)
        protected Collection<MessageChannel> determineTargetChannels(Message<?> message) {
            MessageHeaders headers = message.getHeaders();
            String topic = headers.get(MqttHeaders.RECEIVED_TOPIC).toString();
            byte[] payload = (byte[])message.getPayload();
            log.info("received topic :{} \t payload :{}", topic, new String(payload));
            TopicEnum topicEnum = TopicEnum.find(topic);
            MessageChannel bean = (MessageChannel) SpringBeanUtils.getBean(topicEnum.getBeanName());
            return Collections.singleton(bean);
        }
    }

补充:所有的入站信息,都会率先经过这里。determineTargetChannels的实际作用并非是分发,而是找到需要接收的Channel(信道),具体的调用是在上层的抽象类 AbstractMessageRouter.handleMessageInternal 方法内,具体的分发也是在这个方法的下半部分。

4.6、IntegrationFlow Java DSL

    @Bean
    public IntegrationFlow myTestMethodRouterFlow() {
        return IntegrationFlows
                .from(ChannelName.ENVENTS_INBOUND_TEST)
                .<byte[], CommonTopicReceiver>transform(payload -> {
                    try {
                        return mapper.readValue(payload, CommonTopicReceiver.class);
                    } catch (IOException e) {
                        e.printStackTrace();
                    }
                    return new CommonTopicReceiver();
                })
                .<CommonTopicReceiver, EnventsTestMethodEnum>route(
                        receiver -> EnventsTestMethodEnum.find(receiver.getMethod()),
                        mapping -> Arrays.stream(EnventsTestMethodEnum.values()).forEach(
                                methodEnum -> mapping.channelMapping(methodEnum, methodEnum.getChannelName())))
                .get();
    }

具体的API使用,我没有牵扯太多,简单的说一下方法:

1、form(),接收来自 ChannelName.ENVENTS_INBOUND_TEST 通道的消息

2、transform(),将接收的消息转换自己需要的类型,我这里是将 byte[]转换为 CommonTopicReceiver 类型

3、route(),这个方法,怎么说,坦白说我自己也想了蛮久的,源码也看了,网上资料也查了,大部分都没有解答我的疑惑。我先说作用:这是一个消息路由器。路由器根据输入消息的内容选择一个输出通道,这个选择是通过**.route**方法来完成的。

疑惑的点在哪里呢?

<CommonTopicReceiver, EnventsTestMethodEnum>route(
                    receiver -> EnventsTestMethodEnum.find(receiver.getMethod()),
                    mapping -> Arrays.stream(EnventsTestMethodEnum.values()).forEach(
                            methodEnum -> mapping.channelMapping(methodEnum, methodEnum.getChannelName())));

有没有人注意到route()方法的第二个参数 mapping 话说,这个 mapping 是怎么来的?我也没有定义。正确方法:问GPT(手头狗头)

第一遍解释:

.route(...) :这是整个代码片段中最复杂的部分,它定义了一个消息路由器。路由器根据输入消息的内容选择一个输出通道,这个选择是通过**.route**方法来完成的。

  • receiver -> EnventsTestMethodEnum.find(receiver.getMethod()) :这个表达式是一个Lambda表达式,它接受一个**CommonTopicReceiver对象作为输入,并根据该对象的方法(getMethod())返回一个EnventsTestMethodEnum**枚举值。它的作用是决定消息应该被路由到哪个通道。
  • mapping -> Arrays.stream(EnventsTestMethodEnum.values()).forEach(...) :这个表达式也是一个Lambda表达式,它接受一个**mapping参数,该参数用于定义路由规则。在这里,它遍历了所有的EnventsTestMethodEnum枚举值,然后通过.channelMapping()**方法将每个枚举值映射到相应的输出通道。

看完还是不理解,然后我又拆出来,单独询问了一遍:

image.png

看完这个就大致明白啦,这个参数是 Spring Integration 由框架自动传递给Lambda表达式的参数。

从其他博主那找了一个简单案例:Spring Integration提供了一个IntegrationFlow来定义系统继承流程,而通过IntegrationFlowsIntegrationFlowBuilder来实现使用Fluent API来定义流程。在Fulent API里,分别提供了下面方法来映射Spring Integration的端点(EndPoint)。

    transform() -> Transformer
    filter() -> Filter
    handle() -> ServiceActivator、Adapter、Gateway
    split() -> Splitter
    aggregate() -> Aggregator
    route() -> Router
    bridge() -> Bridge

一个简单的流程定义如下:

    @Bean
    public IntegrationFlow demoFlow(){
        return IntegrationFlows.from("input")  //从Channel  input获取消息
          .<String,Integer>transform(Integer::parseint) //将消息转换成整数
          .get();  //获得集成流程并注册为Bean
    }

原文链接:blog.csdn.net/qq_40929047…

4.7、Message Handler

关于 Message Handler 我在入站适配器的配置类中,有配置过一个默认的消息处理器(通常用来兜底的)

    /**
     * Define a default channel to handle messages that have no effect.
     *
     * @return
     */
    @Bean
    @ServiceActivator(inputChannel = ChannelName.DEFAULT)
    public MessageHandler defaultInboundHandler() {
        return message -> {
            log.info("The default channel does not handle messages." +
                    "\nTopic: " + message.getHeaders().get(MqttHeaders.RECEIVED_TOPIC) +
                    "\nPayload: " + message.getPayload());
        };
    }

上面的@ServiceActivator(inputChannel = ChannelName.DEFAULT) 就是表明这是处理从 DEFAULT通道的消息处理方法。

但实际上能够处理消息的并非只有 MessageHandler 类,只要在 xxxxxServiceImpl (已经注册到bean)上标记@ServiceActivator(inputChannel = ChannelName.xxxx) 即可处理来自 xxxx 的消息,如果消息不再需要继续传递,那么到这里即是消息的终点啦

比如案例中:

@Service
@Slf4j
public class EnventsTestServiceImpl implements EnventsTestService {
    @Autowired
    private IMessageSenderService messageSenderService;
    @Autowired
    private ObjectMapper mapper;
    @Override
    @ServiceActivator(inputChannel = ChannelName.INBOUND_TASK_TEST2, outputChannel = ChannelName.OUTBOUND_TEST_REPLY)
    public CommonTopicReceiver handleInboundTest1Reply(CommonTopicReceiver receiver, MessageHeaders headers) {
        String dockSn  = receiver.getGateway();
        log.info("handleInboundTest1");
        log.info("dockSn:{}",dockSn);
        log.info("receiver:{}",receiver);
        log.info("headers:{}",headers);
        return receiver;
    }
    @ServiceActivator(inputChannel = ChannelName.OUTBOUND_TEST_REPLY,outputChannel = ChannelName.OUTBOUND)
    @Override
    public void handleOutboundTestReply(CommonTopicReceiver receiver, MessageHeaders headers) {
        log.info("handleOutboundTest");
        log.info("receiver:{}",receiver);
        log.info("headers:{}",headers);
        CommonTopicResponse<Object> build = CommonTopicResponse.builder()
                .tid("receiver.getTid()")
                .bid("receiver.getBid()")
                .method("reply")
                .timestamp(System.currentTimeMillis())
                .data(RequestsReply.success())
                .build();
        messageSenderService.publish("envents_test/response", build);
    }
}

4.8、订阅主题

不知道看到这里的小伙伴是否还记得基础概念的这张图:

image.png

与外界信息来源进行交互的ChannelAdapter(入站适配器)来做的,在谈到入站适配器的配置时,我们也看到了连接也是它来做的,包括初始化时,可以订阅配置文件中配置的主题。

image.png

那么自然添加新的主题,也是通过它来完成啦,以下为具体实现,调用则是在上层抽象类 AbstractMqttMessageDrivenChannelAdapter 中

image.png

image.png

案例中的应用:

image.png

4.9、向某个主题发送消息和@MessagingGateway注解

坦白说,在我刚看下面这段代码的时候,我也是有些懵的,虽然意思很好猜,就是发送消息,但为啥是这样写,却是完全不懂啦。不过正是因为这些好奇,最后才组成了这篇文章吧

    @Component
    @MessagingGateway(defaultRequestChannel = ChannelName.OUTBOUND)
    public interface IMqttMessageGateway {
        /**
         * Publish a message to a specific topic.
         * @param topic target
         * @param payload   message
         */
        void publish(@Header(MqttHeaders.TOPIC) String topic, byte[] payload);
        /**
         * Use a specific qos to push messages to a specific topic.
         * @param topic     target
         * @param payload   message
         * @param qos   qos
         */
        void publish(@Header(MqttHeaders.TOPIC) String topic, byte[] payload, @Header(MqttHeaders.QOS) int qos);
    }

1、简要来说,Messaging Gateway 就是在项目中只定义消息端点的接口(使用 Xml 或者 java 注解标识这个接口),接口的具体实现由 spring 容器实现(具体是 GatewayProxyFactoryBean 来创建接口实现)。Messaging Gateway 产生的消息将根据消息头中的 request-channel 发送到对应的 channel,并由 reply-channel 中获取响应。

GatewayProxyFactoryBean 创建动态代理对象,拦截发送Mqtt消息的处理,委托给对应的MessageChannel(消息通道),此消息通道是通过@MessagingGateway注解的defaultRequestChannel属性来配置的。 后面再由订阅这个消息通道的出站适配器进行处理,从而发送到MQTT Broker

总的来说就是定义一个 @MessagingGateway 修饰的接口,用于消息的发送,@MessagingGatewaydefaultRequestChannel 参数用于绑定具体的 MessageChannel

2、对于接口方法中的参数,默认是以 Map 作为消息头而具体的类作为消息的负载(payload),也可以使用 @Header,@Payload 参数注解指定。

3、对于没有参数的方法,这意味着不需要调用者传入而是借由 Messaging Gateway 自动生成。

4、对于消息处理过程中的异常,默认情况下会层层的向上传递,为了捕获相应的异常,可以在接口的方法上添加 throws 关键字定义需要捕获的异常。除此之外,还可以通过指定一个 errorChannel 将错误由指定的消息消费者处理。

案例中的应用:

image.png

4.10、出站适配器

谈到这个,还是把上面的图扒拉下来:

image.png

不过这里的出站适配器是由MqttPahoMessageHandler实现的。

    @Configuration
    public class MqttOutboundConfiguration {
        @Autowired
        private MqttPahoClientFactory mqttClientFactory;
        /**
         * Clients of outbound message channels.
         * @return
         */
        @Bean
        @ServiceActivator(inputChannel = ChannelName.OUTBOUND)
        public MessageHandler mqttOutbound() {
            MqttPahoMessageHandler messageHandler = new MqttPahoMessageHandler(
                    MqttConfiguration.getBasicClientOptions().getClientId() + "_producer_" + System.currentTimeMillis(),
                    mqttClientFactory);
            DefaultPahoMessageConverter converter = new DefaultPahoMessageConverter();
            // use byte types uniformly
            converter.setPayloadAsBytes(true);
            messageHandler.setAsync(true);
            messageHandler.setDefaultQos(0);
            messageHandler.setConverter(converter);
            return messageHandler;
        }
    }

也可以使用 Java DSL 的方式配置出站适配器,如下示例:

    @Bean
    public IntegrationFlow mqttOutboundFlow() {
      return f -> f.handle(new MqttPahoMessageHandler("tcp://host1:1883", "someMqttClient"));
    }

4.11、测试效果

image.png

如果启动项目后,要订阅新的主题:

image.png

image.png


具体代码:

    @RestController
    @RequestMapping("/topic")
    public class MqttTopicController {
        @Autowired
        private IMqttTopicService mqttTopicService;
        @Autowired
        private IMessageSenderService messageSenderService;
        @GetMapping("/add")
        public String add(String topic){
            mqttTopicService.subscribe(topic);
            return topic+"添加成功";
        }
        @GetMapping("/pulish")
        public String pulish(String topic){
            CommonTopicResponse<Object> build = CommonTopicResponse.builder()
                    .tid("receiver.getTid()")
                    .bid("receiver.getBid()")
                    .method("reply")
                    .timestamp(System.currentTimeMillis())
                    .data(RequestsReply.success())
                    .build();
            messageSenderService.publish(topic, build);
            return "向"+topic+"发送消息";
        }
        @GetMapping("/reply")
        public CommonTopicResponse reply(){
            CommonTopicResponse<Object> build = CommonTopicResponse.builder()
                    .tid("receiver.getTid()")
                    .bid("receiver.getBid()")
                    .method("reply")
                    .timestamp(System.currentTimeMillis())
                    .data(RequestsReply.success())
                    .build();
            messageSenderService.publish("test/9876", build);
            return build;
        }
    }

只是进行了非常的简单的测试,更多需要使用的,还是需要自己亲自去测试更佳。

五、Spring Integration 的优缺点

看到这里,不知道你有感受到了哪些关于 Spring Integration 的优缺点呢

优点

  1. 解耦。借助官网的这句话“业务逻辑和集成逻辑之间的关注点分离“,业务逻辑是我们处理消息的部分,集成逻辑是消息传递的部分,在使用Spring Integration 后,消息生产者和消息消费者不再具有强藕性。
  2. 模块化: Spring Integration 使用模块化的设计,你可以根据需要选择性地添加不同的模块,例如消息通道、消息路由、消息转换等,以满足你的集成需求。
  3. 多种通信协议支持: Spring Integration 支持多种通信协议,包括HTTP、FTP、JMS、AMQP、SMTP等,使得你可以轻松地与不同系统进行通信。
  4. 与Spring生态集成度高,因为本身就出自于Spring家族,从集成度而言,比其他第三方框架要好的多。
  5. 应对复杂场景更轻松。可以根据自己的需求定制消息处理器、消息通道和路由规则,以满足复杂的集成场景。

缺点

  • 学习成本。如果是没接触过的朋友,Spring Integration 还是有一定的学习曲线的。
  • 过度 。Spring Integration 确实优点不少,但是如果你的项目并不是十分复杂,那么使用它其实有可能是繁琐和复杂的。
  • 复杂。虽然能更好的应对复杂场景,但是复杂场景下,它的配置也会变得复杂,它的维护和管理也会逐渐变得困难。这其实是一个系统发展不可避免的一个问题,当你遇上此问题时,那么也是你该进行知识输入的时候啦。
  • 适用于特定场景

总的来说,Spring Integration 是一个强大的企业集成框架,可以帮助你解决复杂的集成问题。但在选择使用它之前,你需要考虑你的具体需求、团队的经验和项目的复杂性。如果你的集成需求相对简单,可能有更轻量级的替代方案可供选择。

补充:此处多参考于ChatGPT。

参考

  1. EMQX Docker 部署指南
  2. Spring Integration 文档
  3. Spring 5 实战
  4. spring MessagingGateway 简介
  5. spring boot + mqtt 物联网开发
  6. ChatGPT

最后

这篇更多的是一个学习过程中的记录,代码的实现也是Demo,规范以及实用性仍然是有不足之处,更多的是提供参考,而非直接使用的。如果你有更好的方式,那么不妨在评论区中写下你的想法,还望朋友们不吝赐教。


相关实践学习
消息队列RocketMQ版:基础消息收发功能体验
本实验场景介绍消息队列RocketMQ版的基础消息收发功能,涵盖实例创建、Topic、Group资源创建以及消息收发体验等基础功能模块。
消息队列 MNS 入门课程
1、消息队列MNS简介 本节课介绍消息队列的MNS的基础概念 2、消息队列MNS特性 本节课介绍消息队列的MNS的主要特性 3、MNS的最佳实践及场景应用 本节课介绍消息队列的MNS的最佳实践及场景应用案例 4、手把手系列:消息队列MNS实操讲 本节课介绍消息队列的MNS的实际操作演示 5、动手实验:基于MNS,0基础轻松构建 Web Client 本节课带您一起基于MNS,0基础轻松构建 Web Client
目录
相关文章
|
4天前
|
监控 Java 应用服务中间件
高级java面试---spring.factories文件的解析源码API机制
【11月更文挑战第20天】Spring Boot是一个用于快速构建基于Spring框架的应用程序的开源框架。它通过自动配置、起步依赖和内嵌服务器等特性,极大地简化了Spring应用的开发和部署过程。本文将深入探讨Spring Boot的背景历史、业务场景、功能点以及底层原理,并通过Java代码手写模拟Spring Boot的启动过程,特别是spring.factories文件的解析源码API机制。
16 2
|
20天前
|
数据采集 监控 前端开发
二级公立医院绩效考核系统源码,B/S架构,前后端分别基于Spring Boot和Avue框架
医院绩效管理系统通过与HIS系统的无缝对接,实现数据网络化采集、评价结果透明化管理及奖金分配自动化生成。系统涵盖科室和个人绩效考核、医疗质量考核、数据采集、绩效工资核算、收支核算、工作量统计、单项奖惩等功能,提升绩效评估的全面性、准确性和公正性。技术栈采用B/S架构,前后端分别基于Spring Boot和Avue框架。
|
10天前
|
前端开发 Java 开发者
Spring生态学习路径与源码深度探讨
【11月更文挑战第13天】Spring框架作为Java企业级开发中的核心框架,其丰富的生态系统和强大的功能吸引了无数开发者的关注。学习Spring生态不仅仅是掌握Spring Framework本身,更需要深入理解其周边组件和工具,以及源码的底层实现逻辑。本文将从Spring生态的学习路径入手,详细探讨如何系统地学习Spring,并深入解析各个重点的底层实现逻辑。
36 9
|
30天前
|
消息中间件 Java 数据库
新版 Seata 集成 RocketMQ事务消息,越来越 牛X 了!阿里的 Seata , yyds !
这里 借助 Seata 集成 RocketMQ 事务消息的 新功能,介绍一下一个新遇到的面试题:如果如何实现 **强弱一致性 结合**的分布式事务?
新版 Seata 集成 RocketMQ事务消息,越来越 牛X 了!阿里的 Seata , yyds !
|
17天前
|
存储 Java 调度
Sppring集成Quartz简单案例详解 包括(添加、停止、恢复、删除任务、获取下次执行时间等)
Sppring集成Quartz简单案例详解 包括(添加、停止、恢复、删除任务、获取下次执行时间等)
19 2
|
1月前
|
Java Spring
Spring底层架构源码解析(三)
Spring底层架构源码解析(三)
110 5
|
1月前
|
XML Java 数据格式
Spring底层架构源码解析(二)
Spring底层架构源码解析(二)
|
1月前
|
存储 数据可视化 JavaScript
可视化集成API接口请求+变量绑定+源码输出
可视化集成API接口请求+变量绑定+源码输出
45 4
|
1月前
|
设计模式 JavaScript Java
Spring 事件监听机制源码
Spring 提供了事件发布订阅机制,广泛应用于项目中。本文介绍了如何通过自定义事件类、订阅类和发布类实现这一机制,并展示了如何监听 SpringBoot 启动过程中的多个事件(如 `ApplicationStartingEvent`、`ApplicationEnvironmentPreparedEvent` 等)。通过掌握这些事件,可以更好地理解 SpringBoot 的启动流程。示例代码展示了从事件发布到接收的完整过程。
|
1月前
|
缓存 Java Spring
源码解读:Spring如何解决构造器注入的循环依赖?
本文详细探讨了Spring框架中的循环依赖问题,包括构造器注入和字段注入两种情况,并重点分析了构造器注入循环依赖的解决方案。文章通过具体示例展示了循环依赖的错误信息及常见场景,提出了三种解决方法:重构代码、使用字段依赖注入以及使用`@Lazy`注解。其中,`@Lazy`注解通过延迟初始化和动态代理机制有效解决了循环依赖问题。作者建议优先使用`@Lazy`注解,并提供了详细的源码解析和调试截图,帮助读者深入理解其实现机制。
30 1