关于 Spring Integration 你知道多少,包含集成MQTT案例讲述及源码3

简介: 关于 Spring Integration 你知道多少,包含集成MQTT案例讲述及源码

关于 Spring Integration 你知道多少,包含集成MQTT案例讲述及源码2:https://developer.aliyun.com/article/1394928

4.4、入站适配器MqttInboundConfiguration

    @Slf4j
    @Configuration
    @IntegrationComponentScan
    public class MqttInboundConfiguration {
        @Autowired
        private MqttPahoClientFactory mqttClientFactory;
        @Resource(name = ChannelName.INBOUND)
        private MessageChannel inboundChannel;
        /**
         * Clients of inbound message channels.
         * @return
         */
        @Bean(name = "adapter")
        public MessageProducerSupport mqttInbound() {
            MqttClientOptions options = MqttConfiguration.getBasicClientOptions();
    // 此处在初始化的时候,初始化时,默认订阅了配置文件中的已经写定的 topic
    // 如果后期有需要再增加的订阅主题,调用 addTopic() 即可
            MqttPahoMessageDrivenChannelAdapter adapter = 
                      new MqttPahoMessageDrivenChannelAdapter(
                        options.getClientId() + "_consumer_" + System.currentTimeMillis(),
                        mqttClientFactory, options.getInboundTopic().split(","));
            DefaultPahoMessageConverter converter = new DefaultPahoMessageConverter();
            // use byte types uniformly
            converter.setPayloadAsBytes(true);
    // 设置消息转换器
            adapter.setConverter(converter);
            adapter.setQos(1);
          // 设置在接收已经订阅的主题信息后,发送给那个通道,具体的发送方法需要翻上层的抽象类
            adapter.setOutputChannel(inboundChannel);
            return adapter;
        }
        /**
         * Define a default channel to handle messages that have no effect.
         * @return
         */
        @Bean
        @ServiceActivator(inputChannel = ChannelName.DEFAULT)
        public MessageHandler defaultInboundHandler() {
            return message -> {
                log.info("The default channel does not handle messages." +
                        "\nTopic: " + message.getHeaders().get(MqttHeaders.RECEIVED_TOPIC) +
                        "\nPayload: " + message.getPayload());
            };
        }
    }

主要是两个地方:

1、@IntegrationComponentScan ,开启 Spring Integration 的注解扫描,扫描我们写的 @ServiceActivator(inputChannel = ChannelName.DEFAULT)、``@MessagingGateway(defaultRequestChannel = ChannelName.OUTBOUND)等等

2、MqttPahoMessageDrivenChannelAdapter 实现了 MessageProducerSupport 接口,同时也是最后的实现类。故此有较多的具体实现是在这个类中的。

4.5、Message Router

    @Component
    @Slf4j
    public class InboundMessageRouter extends AbstractMessageRouter {
        /**
         * All mqtt broker messages will arrive here before distributing them to different channels.
         * @param message message from mqtt broker
         * @return channel
         *
         * 全部节点的信息都会先从这里过,然后再查询TopicEnum中的方法,寻找到相应的通道(也就是代码中已经注册的Channel)
         * 举个例子:就比如我现在使用MQTTX向 mysys/envents_test (broker)发送一个消息
         * 首先会经过这里,然后我们根据 mysys/envents_test 在 TopicEnum.find(topic) 寻找,
         * 找到相应的通道为:STATE_ENVENTS(Pattern.compile("^"+MY_BASIC_PRE+ENVENTS_TEST+"$"), ChannelName.ENVENTS_INBOUND_TEST),
         * 即找到一个 ChannelName 为 ENVENTS_INBOUND_TEST 通道(这个通道我们已经注册在Spring 中啦)
         *
         * 找到这个通道后,我们会将消息投递到这个通道去
         * 接下来就是看是谁订阅了这个通道的消息,那么就会接着处理这个消息
         * 比如我们的案例中是由 EnventsTestRouter 这个二级路由订阅了消息通道,来进行消息的再次分发,
         * 在这里的时候,EnventsTestRouter 可以不再是根据节点的名称来进行处理,而是具体的消息来进行二次处理,比如指定要判断消息中的某一个字段是什么
         * 从而再交由谁处理(即再次投递到哪个 ChannelName 中去)
         *
         */
        @Override
        @Router(inputChannel = ChannelName.INBOUND)
        protected Collection<MessageChannel> determineTargetChannels(Message<?> message) {
            MessageHeaders headers = message.getHeaders();
            String topic = headers.get(MqttHeaders.RECEIVED_TOPIC).toString();
            byte[] payload = (byte[])message.getPayload();
            log.info("received topic :{} \t payload :{}", topic, new String(payload));
            TopicEnum topicEnum = TopicEnum.find(topic);
            MessageChannel bean = (MessageChannel) SpringBeanUtils.getBean(topicEnum.getBeanName());
            return Collections.singleton(bean);
        }
    }

补充:所有的入站信息,都会率先经过这里。determineTargetChannels的实际作用并非是分发,而是找到需要接收的Channel(信道),具体的调用是在上层的抽象类 AbstractMessageRouter.handleMessageInternal 方法内,具体的分发也是在这个方法的下半部分。

4.6、IntegrationFlow Java DSL

    @Bean
    public IntegrationFlow myTestMethodRouterFlow() {
        return IntegrationFlows
                .from(ChannelName.ENVENTS_INBOUND_TEST)
                .<byte[], CommonTopicReceiver>transform(payload -> {
                    try {
                        return mapper.readValue(payload, CommonTopicReceiver.class);
                    } catch (IOException e) {
                        e.printStackTrace();
                    }
                    return new CommonTopicReceiver();
                })
                .<CommonTopicReceiver, EnventsTestMethodEnum>route(
                        receiver -> EnventsTestMethodEnum.find(receiver.getMethod()),
                        mapping -> Arrays.stream(EnventsTestMethodEnum.values()).forEach(
                                methodEnum -> mapping.channelMapping(methodEnum, methodEnum.getChannelName())))
                .get();
    }

具体的API使用,我没有牵扯太多,简单的说一下方法:

1、form(),接收来自 ChannelName.ENVENTS_INBOUND_TEST 通道的消息

2、transform(),将接收的消息转换自己需要的类型,我这里是将 byte[]转换为 CommonTopicReceiver 类型

3、route(),这个方法,怎么说,坦白说我自己也想了蛮久的,源码也看了,网上资料也查了,大部分都没有解答我的疑惑。我先说作用:这是一个消息路由器。路由器根据输入消息的内容选择一个输出通道,这个选择是通过**.route**方法来完成的。

疑惑的点在哪里呢?

<CommonTopicReceiver, EnventsTestMethodEnum>route(
                    receiver -> EnventsTestMethodEnum.find(receiver.getMethod()),
                    mapping -> Arrays.stream(EnventsTestMethodEnum.values()).forEach(
                            methodEnum -> mapping.channelMapping(methodEnum, methodEnum.getChannelName())));

有没有人注意到route()方法的第二个参数 mapping 话说,这个 mapping 是怎么来的?我也没有定义。正确方法:问GPT(手头狗头)

第一遍解释:

.route(...) :这是整个代码片段中最复杂的部分,它定义了一个消息路由器。路由器根据输入消息的内容选择一个输出通道,这个选择是通过**.route**方法来完成的。

  • receiver -> EnventsTestMethodEnum.find(receiver.getMethod()) :这个表达式是一个Lambda表达式,它接受一个**CommonTopicReceiver对象作为输入,并根据该对象的方法(getMethod())返回一个EnventsTestMethodEnum**枚举值。它的作用是决定消息应该被路由到哪个通道。
  • mapping -> Arrays.stream(EnventsTestMethodEnum.values()).forEach(...) :这个表达式也是一个Lambda表达式,它接受一个**mapping参数,该参数用于定义路由规则。在这里,它遍历了所有的EnventsTestMethodEnum枚举值,然后通过.channelMapping()**方法将每个枚举值映射到相应的输出通道。

看完还是不理解,然后我又拆出来,单独询问了一遍:

image.png

看完这个就大致明白啦,这个参数是 Spring Integration 由框架自动传递给Lambda表达式的参数。

从其他博主那找了一个简单案例:Spring Integration提供了一个IntegrationFlow来定义系统继承流程,而通过IntegrationFlowsIntegrationFlowBuilder来实现使用Fluent API来定义流程。在Fulent API里,分别提供了下面方法来映射Spring Integration的端点(EndPoint)。

    transform() -> Transformer
    filter() -> Filter
    handle() -> ServiceActivator、Adapter、Gateway
    split() -> Splitter
    aggregate() -> Aggregator
    route() -> Router
    bridge() -> Bridge

一个简单的流程定义如下:

    @Bean
    public IntegrationFlow demoFlow(){
        return IntegrationFlows.from("input")  //从Channel  input获取消息
          .<String,Integer>transform(Integer::parseint) //将消息转换成整数
          .get();  //获得集成流程并注册为Bean
    }

原文链接:blog.csdn.net/qq_40929047…

4.7、Message Handler

关于 Message Handler 我在入站适配器的配置类中,有配置过一个默认的消息处理器(通常用来兜底的)

    /**
     * Define a default channel to handle messages that have no effect.
     *
     * @return
     */
    @Bean
    @ServiceActivator(inputChannel = ChannelName.DEFAULT)
    public MessageHandler defaultInboundHandler() {
        return message -> {
            log.info("The default channel does not handle messages." +
                    "\nTopic: " + message.getHeaders().get(MqttHeaders.RECEIVED_TOPIC) +
                    "\nPayload: " + message.getPayload());
        };
    }

上面的@ServiceActivator(inputChannel = ChannelName.DEFAULT) 就是表明这是处理从 DEFAULT通道的消息处理方法。

但实际上能够处理消息的并非只有 MessageHandler 类,只要在 xxxxxServiceImpl (已经注册到bean)上标记@ServiceActivator(inputChannel = ChannelName.xxxx) 即可处理来自 xxxx 的消息,如果消息不再需要继续传递,那么到这里即是消息的终点啦

比如案例中:

@Service
@Slf4j
public class EnventsTestServiceImpl implements EnventsTestService {
    @Autowired
    private IMessageSenderService messageSenderService;
    @Autowired
    private ObjectMapper mapper;
    @Override
    @ServiceActivator(inputChannel = ChannelName.INBOUND_TASK_TEST2, outputChannel = ChannelName.OUTBOUND_TEST_REPLY)
    public CommonTopicReceiver handleInboundTest1Reply(CommonTopicReceiver receiver, MessageHeaders headers) {
        String dockSn  = receiver.getGateway();
        log.info("handleInboundTest1");
        log.info("dockSn:{}",dockSn);
        log.info("receiver:{}",receiver);
        log.info("headers:{}",headers);
        return receiver;
    }
    @ServiceActivator(inputChannel = ChannelName.OUTBOUND_TEST_REPLY,outputChannel = ChannelName.OUTBOUND)
    @Override
    public void handleOutboundTestReply(CommonTopicReceiver receiver, MessageHeaders headers) {
        log.info("handleOutboundTest");
        log.info("receiver:{}",receiver);
        log.info("headers:{}",headers);
        CommonTopicResponse<Object> build = CommonTopicResponse.builder()
                .tid("receiver.getTid()")
                .bid("receiver.getBid()")
                .method("reply")
                .timestamp(System.currentTimeMillis())
                .data(RequestsReply.success())
                .build();
        messageSenderService.publish("envents_test/response", build);
    }
}

4.8、订阅主题

不知道看到这里的小伙伴是否还记得基础概念的这张图:

image.png

与外界信息来源进行交互的ChannelAdapter(入站适配器)来做的,在谈到入站适配器的配置时,我们也看到了连接也是它来做的,包括初始化时,可以订阅配置文件中配置的主题。

image.png

那么自然添加新的主题,也是通过它来完成啦,以下为具体实现,调用则是在上层抽象类 AbstractMqttMessageDrivenChannelAdapter 中

image.png

image.png

案例中的应用:

image.png

4.9、向某个主题发送消息和@MessagingGateway注解

坦白说,在我刚看下面这段代码的时候,我也是有些懵的,虽然意思很好猜,就是发送消息,但为啥是这样写,却是完全不懂啦。不过正是因为这些好奇,最后才组成了这篇文章吧

    @Component
    @MessagingGateway(defaultRequestChannel = ChannelName.OUTBOUND)
    public interface IMqttMessageGateway {
        /**
         * Publish a message to a specific topic.
         * @param topic target
         * @param payload   message
         */
        void publish(@Header(MqttHeaders.TOPIC) String topic, byte[] payload);
        /**
         * Use a specific qos to push messages to a specific topic.
         * @param topic     target
         * @param payload   message
         * @param qos   qos
         */
        void publish(@Header(MqttHeaders.TOPIC) String topic, byte[] payload, @Header(MqttHeaders.QOS) int qos);
    }

1、简要来说,Messaging Gateway 就是在项目中只定义消息端点的接口(使用 Xml 或者 java 注解标识这个接口),接口的具体实现由 spring 容器实现(具体是 GatewayProxyFactoryBean 来创建接口实现)。Messaging Gateway 产生的消息将根据消息头中的 request-channel 发送到对应的 channel,并由 reply-channel 中获取响应。

GatewayProxyFactoryBean 创建动态代理对象,拦截发送Mqtt消息的处理,委托给对应的MessageChannel(消息通道),此消息通道是通过@MessagingGateway注解的defaultRequestChannel属性来配置的。 后面再由订阅这个消息通道的出站适配器进行处理,从而发送到MQTT Broker

总的来说就是定义一个 @MessagingGateway 修饰的接口,用于消息的发送,@MessagingGatewaydefaultRequestChannel 参数用于绑定具体的 MessageChannel

2、对于接口方法中的参数,默认是以 Map 作为消息头而具体的类作为消息的负载(payload),也可以使用 @Header,@Payload 参数注解指定。

3、对于没有参数的方法,这意味着不需要调用者传入而是借由 Messaging Gateway 自动生成。

4、对于消息处理过程中的异常,默认情况下会层层的向上传递,为了捕获相应的异常,可以在接口的方法上添加 throws 关键字定义需要捕获的异常。除此之外,还可以通过指定一个 errorChannel 将错误由指定的消息消费者处理。

案例中的应用:

image.png

4.10、出站适配器

谈到这个,还是把上面的图扒拉下来:

image.png

不过这里的出站适配器是由MqttPahoMessageHandler实现的。

    @Configuration
    public class MqttOutboundConfiguration {
        @Autowired
        private MqttPahoClientFactory mqttClientFactory;
        /**
         * Clients of outbound message channels.
         * @return
         */
        @Bean
        @ServiceActivator(inputChannel = ChannelName.OUTBOUND)
        public MessageHandler mqttOutbound() {
            MqttPahoMessageHandler messageHandler = new MqttPahoMessageHandler(
                    MqttConfiguration.getBasicClientOptions().getClientId() + "_producer_" + System.currentTimeMillis(),
                    mqttClientFactory);
            DefaultPahoMessageConverter converter = new DefaultPahoMessageConverter();
            // use byte types uniformly
            converter.setPayloadAsBytes(true);
            messageHandler.setAsync(true);
            messageHandler.setDefaultQos(0);
            messageHandler.setConverter(converter);
            return messageHandler;
        }
    }

也可以使用 Java DSL 的方式配置出站适配器,如下示例:

    @Bean
    public IntegrationFlow mqttOutboundFlow() {
      return f -> f.handle(new MqttPahoMessageHandler("tcp://host1:1883", "someMqttClient"));
    }

4.11、测试效果

image.png

如果启动项目后,要订阅新的主题:

image.png

image.png


具体代码:

    @RestController
    @RequestMapping("/topic")
    public class MqttTopicController {
        @Autowired
        private IMqttTopicService mqttTopicService;
        @Autowired
        private IMessageSenderService messageSenderService;
        @GetMapping("/add")
        public String add(String topic){
            mqttTopicService.subscribe(topic);
            return topic+"添加成功";
        }
        @GetMapping("/pulish")
        public String pulish(String topic){
            CommonTopicResponse<Object> build = CommonTopicResponse.builder()
                    .tid("receiver.getTid()")
                    .bid("receiver.getBid()")
                    .method("reply")
                    .timestamp(System.currentTimeMillis())
                    .data(RequestsReply.success())
                    .build();
            messageSenderService.publish(topic, build);
            return "向"+topic+"发送消息";
        }
        @GetMapping("/reply")
        public CommonTopicResponse reply(){
            CommonTopicResponse<Object> build = CommonTopicResponse.builder()
                    .tid("receiver.getTid()")
                    .bid("receiver.getBid()")
                    .method("reply")
                    .timestamp(System.currentTimeMillis())
                    .data(RequestsReply.success())
                    .build();
            messageSenderService.publish("test/9876", build);
            return build;
        }
    }

只是进行了非常的简单的测试,更多需要使用的,还是需要自己亲自去测试更佳。

五、Spring Integration 的优缺点

看到这里,不知道你有感受到了哪些关于 Spring Integration 的优缺点呢

优点

  1. 解耦。借助官网的这句话“业务逻辑和集成逻辑之间的关注点分离“,业务逻辑是我们处理消息的部分,集成逻辑是消息传递的部分,在使用Spring Integration 后,消息生产者和消息消费者不再具有强藕性。
  2. 模块化: Spring Integration 使用模块化的设计,你可以根据需要选择性地添加不同的模块,例如消息通道、消息路由、消息转换等,以满足你的集成需求。
  3. 多种通信协议支持: Spring Integration 支持多种通信协议,包括HTTP、FTP、JMS、AMQP、SMTP等,使得你可以轻松地与不同系统进行通信。
  4. 与Spring生态集成度高,因为本身就出自于Spring家族,从集成度而言,比其他第三方框架要好的多。
  5. 应对复杂场景更轻松。可以根据自己的需求定制消息处理器、消息通道和路由规则,以满足复杂的集成场景。

缺点

  • 学习成本。如果是没接触过的朋友,Spring Integration 还是有一定的学习曲线的。
  • 过度 。Spring Integration 确实优点不少,但是如果你的项目并不是十分复杂,那么使用它其实有可能是繁琐和复杂的。
  • 复杂。虽然能更好的应对复杂场景,但是复杂场景下,它的配置也会变得复杂,它的维护和管理也会逐渐变得困难。这其实是一个系统发展不可避免的一个问题,当你遇上此问题时,那么也是你该进行知识输入的时候啦。
  • 适用于特定场景

总的来说,Spring Integration 是一个强大的企业集成框架,可以帮助你解决复杂的集成问题。但在选择使用它之前,你需要考虑你的具体需求、团队的经验和项目的复杂性。如果你的集成需求相对简单,可能有更轻量级的替代方案可供选择。

补充:此处多参考于ChatGPT。

参考

  1. EMQX Docker 部署指南
  2. Spring Integration 文档
  3. Spring 5 实战
  4. spring MessagingGateway 简介
  5. spring boot + mqtt 物联网开发
  6. ChatGPT

最后

这篇更多的是一个学习过程中的记录,代码的实现也是Demo,规范以及实用性仍然是有不足之处,更多的是提供参考,而非直接使用的。如果你有更好的方式,那么不妨在评论区中写下你的想法,还望朋友们不吝赐教。


相关实践学习
快速体验阿里云云消息队列RocketMQ版
本实验将带您快速体验使用云消息队列RocketMQ版Serverless系列实例进行获取接入点、创建Topic、创建订阅组、收发消息、查看消息轨迹和仪表盘。
消息队列 MNS 入门课程
1、消息队列MNS简介 本节课介绍消息队列的MNS的基础概念 2、消息队列MNS特性 本节课介绍消息队列的MNS的主要特性 3、MNS的最佳实践及场景应用 本节课介绍消息队列的MNS的最佳实践及场景应用案例 4、手把手系列:消息队列MNS实操讲 本节课介绍消息队列的MNS的实际操作演示 5、动手实验:基于MNS,0基础轻松构建 Web Client 本节课带您一起基于MNS,0基础轻松构建 Web Client
目录
相关文章
|
27天前
|
消息中间件 Java Kafka
消息队列比较:Spring 微服务中的 Kafka 与 RabbitMQ
本文深入解析了 Kafka 和 RabbitMQ 两大主流消息队列在 Spring 微服务中的应用与对比。内容涵盖消息队列的基本原理、Kafka 与 RabbitMQ 的核心概念、各自优势及典型用例,并结合 Spring 生态的集成方式,帮助开发者根据实际需求选择合适的消息中间件,提升系统解耦、可扩展性与可靠性。
消息队列比较:Spring 微服务中的 Kafka 与 RabbitMQ
|
27天前
|
消息中间件 存储 Java
RabbitMQ 和 Spring Cloud Stream 实现异步通信
本文介绍了在微服务架构中,如何利用 RabbitMQ 作为消息代理,并结合 Spring Cloud Stream 实现高效的异步通信。内容涵盖异步通信的优势、RabbitMQ 的核心概念与特性、Spring Cloud Stream 的功能及其与 RabbitMQ 的集成方式。通过这种组合,开发者可以构建出具备高可用性、可扩展性和弹性的分布式系统,满足现代应用对快速响应和可靠消息传递的需求。
RabbitMQ 和 Spring Cloud Stream 实现异步通信
|
28天前
|
监控 Cloud Native Java
Spring Integration 企业集成模式技术详解与实践指南
本文档全面介绍 Spring Integration 框架的核心概念、架构设计和实际应用。作为 Spring 生态系统中的企业集成解决方案,Spring Integration 基于著名的 Enterprise Integration Patterns(EIP)提供了轻量级的消息驱动架构。本文将深入探讨其消息通道、端点、过滤器、转换器等核心组件,以及如何构建可靠的企业集成解决方案。
93 0
|
2月前
|
设计模式 Java 开发者
如何快速上手【Spring AOP】?从动态代理到源码剖析(下篇)
Spring AOP的实现本质上依赖于代理模式这一经典设计模式。代理模式通过引入代理对象作为目标对象的中间层,实现了对目标对象访问的控制与增强,其核心价值在于解耦核心业务逻辑与横切关注点。在框架设计中,这种模式广泛用于实现功能扩展(如远程调用、延迟加载)、行为拦截(如权限校验、异常处理)等场景,为系统提供了更高的灵活性和可维护性。
|
3月前
|
物联网 Linux 开发者
快速部署自己私有MQTT-Broker-下载安装到运行不到一分钟,快速简单且易于集成到自己项目中
本文给物联网开发的朋友推荐的是GMQT,让物联网开发者快速拥有合适自己的MQTT-Broker,本文从下载程序到安装部署手把手教大家安装用上私有化MQTT服务器。
892 5
|
6月前
|
前端开发 Java 物联网
智慧班牌源码,采用Java + Spring Boot后端框架,搭配Vue2前端技术,支持SaaS云部署
智慧班牌系统是一款基于信息化与物联网技术的校园管理工具,集成电子屏显示、人脸识别及数据交互功能,实现班级信息展示、智能考勤与家校互通。系统采用Java + Spring Boot后端框架,搭配Vue2前端技术,支持SaaS云部署与私有化定制。核心功能涵盖信息发布、考勤管理、教务处理及数据分析,助力校园文化建设与教学优化。其综合性和可扩展性有效打破数据孤岛,提升交互体验并降低管理成本,适用于日常教学、考试管理和应急场景,为智慧校园建设提供全面解决方案。
400 70
|
5月前
|
消息中间件 缓存 NoSQL
基于Spring Data Redis与RabbitMQ实现字符串缓存和计数功能(数据同步)
总的来说,借助Spring Data Redis和RabbitMQ,我们可以轻松实现字符串缓存和计数的功能。而关键的部分不过是一些"厨房的套路",一旦你掌握了这些套路,那么你就像厨师一样可以准备出一道道饕餮美食了。通过这种方式促进数据处理效率无疑将大大提高我们的生产力。
194 32
|
7月前
|
存储 监控 数据可视化
SaaS云计算技术的智慧工地源码,基于Java+Spring Cloud框架开发
智慧工地源码基于微服务+Java+Spring Cloud +UniApp +MySql架构,利用传感器、监控摄像头、AI、大数据等技术,实现施工现场的实时监测、数据分析与智能决策。平台涵盖人员、车辆、视频监控、施工质量、设备、环境和能耗管理七大维度,提供可视化管理、智能化报警、移动智能办公及分布计算存储等功能,全面提升工地的安全性、效率和质量。
129 0