关于 Spring Integration 你知道多少,包含集成MQTT案例讲述及源码3

简介: 关于 Spring Integration 你知道多少,包含集成MQTT案例讲述及源码

关于 Spring Integration 你知道多少,包含集成MQTT案例讲述及源码2:https://developer.aliyun.com/article/1394928

4.4、入站适配器MqttInboundConfiguration

    @Slf4j
    @Configuration
    @IntegrationComponentScan
    public class MqttInboundConfiguration {
        @Autowired
        private MqttPahoClientFactory mqttClientFactory;
        @Resource(name = ChannelName.INBOUND)
        private MessageChannel inboundChannel;
        /**
         * Clients of inbound message channels.
         * @return
         */
        @Bean(name = "adapter")
        public MessageProducerSupport mqttInbound() {
            MqttClientOptions options = MqttConfiguration.getBasicClientOptions();
    // 此处在初始化的时候,初始化时,默认订阅了配置文件中的已经写定的 topic
    // 如果后期有需要再增加的订阅主题,调用 addTopic() 即可
            MqttPahoMessageDrivenChannelAdapter adapter = 
                      new MqttPahoMessageDrivenChannelAdapter(
                        options.getClientId() + "_consumer_" + System.currentTimeMillis(),
                        mqttClientFactory, options.getInboundTopic().split(","));
            DefaultPahoMessageConverter converter = new DefaultPahoMessageConverter();
            // use byte types uniformly
            converter.setPayloadAsBytes(true);
    // 设置消息转换器
            adapter.setConverter(converter);
            adapter.setQos(1);
          // 设置在接收已经订阅的主题信息后,发送给那个通道,具体的发送方法需要翻上层的抽象类
            adapter.setOutputChannel(inboundChannel);
            return adapter;
        }
        /**
         * Define a default channel to handle messages that have no effect.
         * @return
         */
        @Bean
        @ServiceActivator(inputChannel = ChannelName.DEFAULT)
        public MessageHandler defaultInboundHandler() {
            return message -> {
                log.info("The default channel does not handle messages." +
                        "\nTopic: " + message.getHeaders().get(MqttHeaders.RECEIVED_TOPIC) +
                        "\nPayload: " + message.getPayload());
            };
        }
    }

主要是两个地方:

1、@IntegrationComponentScan ,开启 Spring Integration 的注解扫描,扫描我们写的 @ServiceActivator(inputChannel = ChannelName.DEFAULT)、``@MessagingGateway(defaultRequestChannel = ChannelName.OUTBOUND)等等

2、MqttPahoMessageDrivenChannelAdapter 实现了 MessageProducerSupport 接口,同时也是最后的实现类。故此有较多的具体实现是在这个类中的。

4.5、Message Router

    @Component
    @Slf4j
    public class InboundMessageRouter extends AbstractMessageRouter {
        /**
         * All mqtt broker messages will arrive here before distributing them to different channels.
         * @param message message from mqtt broker
         * @return channel
         *
         * 全部节点的信息都会先从这里过,然后再查询TopicEnum中的方法,寻找到相应的通道(也就是代码中已经注册的Channel)
         * 举个例子:就比如我现在使用MQTTX向 mysys/envents_test (broker)发送一个消息
         * 首先会经过这里,然后我们根据 mysys/envents_test 在 TopicEnum.find(topic) 寻找,
         * 找到相应的通道为:STATE_ENVENTS(Pattern.compile("^"+MY_BASIC_PRE+ENVENTS_TEST+"$"), ChannelName.ENVENTS_INBOUND_TEST),
         * 即找到一个 ChannelName 为 ENVENTS_INBOUND_TEST 通道(这个通道我们已经注册在Spring 中啦)
         *
         * 找到这个通道后,我们会将消息投递到这个通道去
         * 接下来就是看是谁订阅了这个通道的消息,那么就会接着处理这个消息
         * 比如我们的案例中是由 EnventsTestRouter 这个二级路由订阅了消息通道,来进行消息的再次分发,
         * 在这里的时候,EnventsTestRouter 可以不再是根据节点的名称来进行处理,而是具体的消息来进行二次处理,比如指定要判断消息中的某一个字段是什么
         * 从而再交由谁处理(即再次投递到哪个 ChannelName 中去)
         *
         */
        @Override
        @Router(inputChannel = ChannelName.INBOUND)
        protected Collection<MessageChannel> determineTargetChannels(Message<?> message) {
            MessageHeaders headers = message.getHeaders();
            String topic = headers.get(MqttHeaders.RECEIVED_TOPIC).toString();
            byte[] payload = (byte[])message.getPayload();
            log.info("received topic :{} \t payload :{}", topic, new String(payload));
            TopicEnum topicEnum = TopicEnum.find(topic);
            MessageChannel bean = (MessageChannel) SpringBeanUtils.getBean(topicEnum.getBeanName());
            return Collections.singleton(bean);
        }
    }

补充:所有的入站信息,都会率先经过这里。determineTargetChannels的实际作用并非是分发,而是找到需要接收的Channel(信道),具体的调用是在上层的抽象类 AbstractMessageRouter.handleMessageInternal 方法内,具体的分发也是在这个方法的下半部分。

4.6、IntegrationFlow Java DSL

    @Bean
    public IntegrationFlow myTestMethodRouterFlow() {
        return IntegrationFlows
                .from(ChannelName.ENVENTS_INBOUND_TEST)
                .<byte[], CommonTopicReceiver>transform(payload -> {
                    try {
                        return mapper.readValue(payload, CommonTopicReceiver.class);
                    } catch (IOException e) {
                        e.printStackTrace();
                    }
                    return new CommonTopicReceiver();
                })
                .<CommonTopicReceiver, EnventsTestMethodEnum>route(
                        receiver -> EnventsTestMethodEnum.find(receiver.getMethod()),
                        mapping -> Arrays.stream(EnventsTestMethodEnum.values()).forEach(
                                methodEnum -> mapping.channelMapping(methodEnum, methodEnum.getChannelName())))
                .get();
    }

具体的API使用,我没有牵扯太多,简单的说一下方法:

1、form(),接收来自 ChannelName.ENVENTS_INBOUND_TEST 通道的消息

2、transform(),将接收的消息转换自己需要的类型,我这里是将 byte[]转换为 CommonTopicReceiver 类型

3、route(),这个方法,怎么说,坦白说我自己也想了蛮久的,源码也看了,网上资料也查了,大部分都没有解答我的疑惑。我先说作用:这是一个消息路由器。路由器根据输入消息的内容选择一个输出通道,这个选择是通过**.route**方法来完成的。

疑惑的点在哪里呢?

<CommonTopicReceiver, EnventsTestMethodEnum>route(
                    receiver -> EnventsTestMethodEnum.find(receiver.getMethod()),
                    mapping -> Arrays.stream(EnventsTestMethodEnum.values()).forEach(
                            methodEnum -> mapping.channelMapping(methodEnum, methodEnum.getChannelName())));

有没有人注意到route()方法的第二个参数 mapping 话说,这个 mapping 是怎么来的?我也没有定义。正确方法:问GPT(手头狗头)

第一遍解释:

.route(...) :这是整个代码片段中最复杂的部分,它定义了一个消息路由器。路由器根据输入消息的内容选择一个输出通道,这个选择是通过**.route**方法来完成的。

  • receiver -> EnventsTestMethodEnum.find(receiver.getMethod()) :这个表达式是一个Lambda表达式,它接受一个**CommonTopicReceiver对象作为输入,并根据该对象的方法(getMethod())返回一个EnventsTestMethodEnum**枚举值。它的作用是决定消息应该被路由到哪个通道。
  • mapping -> Arrays.stream(EnventsTestMethodEnum.values()).forEach(...) :这个表达式也是一个Lambda表达式,它接受一个**mapping参数,该参数用于定义路由规则。在这里,它遍历了所有的EnventsTestMethodEnum枚举值,然后通过.channelMapping()**方法将每个枚举值映射到相应的输出通道。

看完还是不理解,然后我又拆出来,单独询问了一遍:

image.png

看完这个就大致明白啦,这个参数是 Spring Integration 由框架自动传递给Lambda表达式的参数。

从其他博主那找了一个简单案例:Spring Integration提供了一个IntegrationFlow来定义系统继承流程,而通过IntegrationFlowsIntegrationFlowBuilder来实现使用Fluent API来定义流程。在Fulent API里,分别提供了下面方法来映射Spring Integration的端点(EndPoint)。

    transform() -> Transformer
    filter() -> Filter
    handle() -> ServiceActivator、Adapter、Gateway
    split() -> Splitter
    aggregate() -> Aggregator
    route() -> Router
    bridge() -> Bridge

一个简单的流程定义如下:

    @Bean
    public IntegrationFlow demoFlow(){
        return IntegrationFlows.from("input")  //从Channel  input获取消息
          .<String,Integer>transform(Integer::parseint) //将消息转换成整数
          .get();  //获得集成流程并注册为Bean
    }

原文链接:blog.csdn.net/qq_40929047…

4.7、Message Handler

关于 Message Handler 我在入站适配器的配置类中,有配置过一个默认的消息处理器(通常用来兜底的)

    /**
     * Define a default channel to handle messages that have no effect.
     *
     * @return
     */
    @Bean
    @ServiceActivator(inputChannel = ChannelName.DEFAULT)
    public MessageHandler defaultInboundHandler() {
        return message -> {
            log.info("The default channel does not handle messages." +
                    "\nTopic: " + message.getHeaders().get(MqttHeaders.RECEIVED_TOPIC) +
                    "\nPayload: " + message.getPayload());
        };
    }

上面的@ServiceActivator(inputChannel = ChannelName.DEFAULT) 就是表明这是处理从 DEFAULT通道的消息处理方法。

但实际上能够处理消息的并非只有 MessageHandler 类,只要在 xxxxxServiceImpl (已经注册到bean)上标记@ServiceActivator(inputChannel = ChannelName.xxxx) 即可处理来自 xxxx 的消息,如果消息不再需要继续传递,那么到这里即是消息的终点啦

比如案例中:

@Service
@Slf4j
public class EnventsTestServiceImpl implements EnventsTestService {
    @Autowired
    private IMessageSenderService messageSenderService;
    @Autowired
    private ObjectMapper mapper;
    @Override
    @ServiceActivator(inputChannel = ChannelName.INBOUND_TASK_TEST2, outputChannel = ChannelName.OUTBOUND_TEST_REPLY)
    public CommonTopicReceiver handleInboundTest1Reply(CommonTopicReceiver receiver, MessageHeaders headers) {
        String dockSn  = receiver.getGateway();
        log.info("handleInboundTest1");
        log.info("dockSn:{}",dockSn);
        log.info("receiver:{}",receiver);
        log.info("headers:{}",headers);
        return receiver;
    }
    @ServiceActivator(inputChannel = ChannelName.OUTBOUND_TEST_REPLY,outputChannel = ChannelName.OUTBOUND)
    @Override
    public void handleOutboundTestReply(CommonTopicReceiver receiver, MessageHeaders headers) {
        log.info("handleOutboundTest");
        log.info("receiver:{}",receiver);
        log.info("headers:{}",headers);
        CommonTopicResponse<Object> build = CommonTopicResponse.builder()
                .tid("receiver.getTid()")
                .bid("receiver.getBid()")
                .method("reply")
                .timestamp(System.currentTimeMillis())
                .data(RequestsReply.success())
                .build();
        messageSenderService.publish("envents_test/response", build);
    }
}

4.8、订阅主题

不知道看到这里的小伙伴是否还记得基础概念的这张图:

image.png

与外界信息来源进行交互的ChannelAdapter(入站适配器)来做的,在谈到入站适配器的配置时,我们也看到了连接也是它来做的,包括初始化时,可以订阅配置文件中配置的主题。

image.png

那么自然添加新的主题,也是通过它来完成啦,以下为具体实现,调用则是在上层抽象类 AbstractMqttMessageDrivenChannelAdapter 中

image.png

image.png

案例中的应用:

image.png

4.9、向某个主题发送消息和@MessagingGateway注解

坦白说,在我刚看下面这段代码的时候,我也是有些懵的,虽然意思很好猜,就是发送消息,但为啥是这样写,却是完全不懂啦。不过正是因为这些好奇,最后才组成了这篇文章吧

    @Component
    @MessagingGateway(defaultRequestChannel = ChannelName.OUTBOUND)
    public interface IMqttMessageGateway {
        /**
         * Publish a message to a specific topic.
         * @param topic target
         * @param payload   message
         */
        void publish(@Header(MqttHeaders.TOPIC) String topic, byte[] payload);
        /**
         * Use a specific qos to push messages to a specific topic.
         * @param topic     target
         * @param payload   message
         * @param qos   qos
         */
        void publish(@Header(MqttHeaders.TOPIC) String topic, byte[] payload, @Header(MqttHeaders.QOS) int qos);
    }

1、简要来说,Messaging Gateway 就是在项目中只定义消息端点的接口(使用 Xml 或者 java 注解标识这个接口),接口的具体实现由 spring 容器实现(具体是 GatewayProxyFactoryBean 来创建接口实现)。Messaging Gateway 产生的消息将根据消息头中的 request-channel 发送到对应的 channel,并由 reply-channel 中获取响应。

GatewayProxyFactoryBean 创建动态代理对象,拦截发送Mqtt消息的处理,委托给对应的MessageChannel(消息通道),此消息通道是通过@MessagingGateway注解的defaultRequestChannel属性来配置的。 后面再由订阅这个消息通道的出站适配器进行处理,从而发送到MQTT Broker

总的来说就是定义一个 @MessagingGateway 修饰的接口,用于消息的发送,@MessagingGatewaydefaultRequestChannel 参数用于绑定具体的 MessageChannel

2、对于接口方法中的参数,默认是以 Map 作为消息头而具体的类作为消息的负载(payload),也可以使用 @Header,@Payload 参数注解指定。

3、对于没有参数的方法,这意味着不需要调用者传入而是借由 Messaging Gateway 自动生成。

4、对于消息处理过程中的异常,默认情况下会层层的向上传递,为了捕获相应的异常,可以在接口的方法上添加 throws 关键字定义需要捕获的异常。除此之外,还可以通过指定一个 errorChannel 将错误由指定的消息消费者处理。

案例中的应用:

image.png

4.10、出站适配器

谈到这个,还是把上面的图扒拉下来:

image.png

不过这里的出站适配器是由MqttPahoMessageHandler实现的。

    @Configuration
    public class MqttOutboundConfiguration {
        @Autowired
        private MqttPahoClientFactory mqttClientFactory;
        /**
         * Clients of outbound message channels.
         * @return
         */
        @Bean
        @ServiceActivator(inputChannel = ChannelName.OUTBOUND)
        public MessageHandler mqttOutbound() {
            MqttPahoMessageHandler messageHandler = new MqttPahoMessageHandler(
                    MqttConfiguration.getBasicClientOptions().getClientId() + "_producer_" + System.currentTimeMillis(),
                    mqttClientFactory);
            DefaultPahoMessageConverter converter = new DefaultPahoMessageConverter();
            // use byte types uniformly
            converter.setPayloadAsBytes(true);
            messageHandler.setAsync(true);
            messageHandler.setDefaultQos(0);
            messageHandler.setConverter(converter);
            return messageHandler;
        }
    }

也可以使用 Java DSL 的方式配置出站适配器,如下示例:

    @Bean
    public IntegrationFlow mqttOutboundFlow() {
      return f -> f.handle(new MqttPahoMessageHandler("tcp://host1:1883", "someMqttClient"));
    }

4.11、测试效果

image.png

如果启动项目后,要订阅新的主题:

image.png

image.png


具体代码:

    @RestController
    @RequestMapping("/topic")
    public class MqttTopicController {
        @Autowired
        private IMqttTopicService mqttTopicService;
        @Autowired
        private IMessageSenderService messageSenderService;
        @GetMapping("/add")
        public String add(String topic){
            mqttTopicService.subscribe(topic);
            return topic+"添加成功";
        }
        @GetMapping("/pulish")
        public String pulish(String topic){
            CommonTopicResponse<Object> build = CommonTopicResponse.builder()
                    .tid("receiver.getTid()")
                    .bid("receiver.getBid()")
                    .method("reply")
                    .timestamp(System.currentTimeMillis())
                    .data(RequestsReply.success())
                    .build();
            messageSenderService.publish(topic, build);
            return "向"+topic+"发送消息";
        }
        @GetMapping("/reply")
        public CommonTopicResponse reply(){
            CommonTopicResponse<Object> build = CommonTopicResponse.builder()
                    .tid("receiver.getTid()")
                    .bid("receiver.getBid()")
                    .method("reply")
                    .timestamp(System.currentTimeMillis())
                    .data(RequestsReply.success())
                    .build();
            messageSenderService.publish("test/9876", build);
            return build;
        }
    }

只是进行了非常的简单的测试,更多需要使用的,还是需要自己亲自去测试更佳。

五、Spring Integration 的优缺点

看到这里,不知道你有感受到了哪些关于 Spring Integration 的优缺点呢

优点

  1. 解耦。借助官网的这句话“业务逻辑和集成逻辑之间的关注点分离“,业务逻辑是我们处理消息的部分,集成逻辑是消息传递的部分,在使用Spring Integration 后,消息生产者和消息消费者不再具有强藕性。
  2. 模块化: Spring Integration 使用模块化的设计,你可以根据需要选择性地添加不同的模块,例如消息通道、消息路由、消息转换等,以满足你的集成需求。
  3. 多种通信协议支持: Spring Integration 支持多种通信协议,包括HTTP、FTP、JMS、AMQP、SMTP等,使得你可以轻松地与不同系统进行通信。
  4. 与Spring生态集成度高,因为本身就出自于Spring家族,从集成度而言,比其他第三方框架要好的多。
  5. 应对复杂场景更轻松。可以根据自己的需求定制消息处理器、消息通道和路由规则,以满足复杂的集成场景。

缺点

  • 学习成本。如果是没接触过的朋友,Spring Integration 还是有一定的学习曲线的。
  • 过度 。Spring Integration 确实优点不少,但是如果你的项目并不是十分复杂,那么使用它其实有可能是繁琐和复杂的。
  • 复杂。虽然能更好的应对复杂场景,但是复杂场景下,它的配置也会变得复杂,它的维护和管理也会逐渐变得困难。这其实是一个系统发展不可避免的一个问题,当你遇上此问题时,那么也是你该进行知识输入的时候啦。
  • 适用于特定场景

总的来说,Spring Integration 是一个强大的企业集成框架,可以帮助你解决复杂的集成问题。但在选择使用它之前,你需要考虑你的具体需求、团队的经验和项目的复杂性。如果你的集成需求相对简单,可能有更轻量级的替代方案可供选择。

补充:此处多参考于ChatGPT。

参考

  1. EMQX Docker 部署指南
  2. Spring Integration 文档
  3. Spring 5 实战
  4. spring MessagingGateway 简介
  5. spring boot + mqtt 物联网开发
  6. ChatGPT

最后

这篇更多的是一个学习过程中的记录,代码的实现也是Demo,规范以及实用性仍然是有不足之处,更多的是提供参考,而非直接使用的。如果你有更好的方式,那么不妨在评论区中写下你的想法,还望朋友们不吝赐教。


相关实践学习
RocketMQ一站式入门使用
从源码编译、部署broker、部署namesrv,使用java客户端首发消息等一站式入门RocketMQ。
消息队列 MNS 入门课程
1、消息队列MNS简介 本节课介绍消息队列的MNS的基础概念 2、消息队列MNS特性 本节课介绍消息队列的MNS的主要特性 3、MNS的最佳实践及场景应用 本节课介绍消息队列的MNS的最佳实践及场景应用案例 4、手把手系列:消息队列MNS实操讲 本节课介绍消息队列的MNS的实际操作演示 5、动手实验:基于MNS,0基础轻松构建 Web Client 本节课带您一起基于MNS,0基础轻松构建 Web Client
目录
相关文章
|
6天前
|
Java 应用服务中间件 Nacos
Spring Cloud 常用各个组件详解及实现原理(附加源码+实现逻辑图)
Spring Cloud 常用各个组件详解及实现原理(附加源码+实现逻辑图)
39 0
|
6天前
|
监控 数据可视化 安全
一套成熟的Spring Cloud智慧工地平台源码,自主版权,开箱即用
这是一套基于Spring Cloud的智慧工地管理平台源码,具备自主版权,易于使用。平台运用现代技术如物联网、大数据等改进工地管理,服务包括建设各方,提供人员、车辆、视频监控等七大维度的管理。特色在于可视化管理、智能报警、移动办公和分布计算存储。功能涵盖劳务实名制管理、智能考勤、视频监控AI识别、危大工程监控、环境监测、材料管理和进度管理等,实现工地安全、高效的智慧化管理。
|
6天前
|
监控 Java 应用服务中间件
Spring Boot 源码面试知识点
【5月更文挑战第12天】Spring Boot 是一个强大且广泛使用的框架,旨在简化 Spring 应用程序的开发过程。深入了解 Spring Boot 的源码,有助于开发者更好地使用和定制这个框架。以下是一些关键的知识点:
25 6
|
6天前
|
Java 应用服务中间件 测试技术
深入探索Spring Boot Web应用源码及实战应用
【5月更文挑战第11天】本文将详细解析Spring Boot Web应用的源码架构,并通过一个实际案例,展示如何构建一个基于Spring Boot的Web应用。本文旨在帮助读者更好地理解Spring Boot的内部工作机制,以及如何利用这些机制优化自己的Web应用开发。
32 3
|
6天前
|
安全 Java 测试技术
Spring Boot集成支付宝支付:概念与实战
【4月更文挑战第29天】在电子商务和在线业务应用中,集成有效且安全的支付解决方案是至关重要的。支付宝作为中国领先的支付服务提供商,其支付功能的集成可以显著提升用户体验。本篇博客将详细介绍如何在Spring Boot应用中集成支付宝支付功能,并提供一个实战示例。
43 2
|
4天前
|
消息中间件 Java 数据安全/隐私保护
Spring Cloud 项目中实现推送消息到 RabbitMQ 消息中间件
Spring Cloud 项目中实现推送消息到 RabbitMQ 消息中间件
|
6天前
|
NoSQL Java MongoDB
【MongoDB 专栏】MongoDB 与 Spring Boot 的集成实践
【5月更文挑战第11天】本文介绍了如何将非关系型数据库MongoDB与Spring Boot框架集成,以实现高效灵活的数据管理。Spring Boot简化了Spring应用的构建和部署,MongoDB则以其对灵活数据结构的处理能力受到青睐。集成步骤包括:添加MongoDB依赖、配置连接信息、创建数据访问对象(DAO)以及进行数据操作。通过这种方式,开发者可以充分利用两者优势,应对各种数据需求。在实际应用中,结合微服务架构等技术,可以构建高性能、可扩展的系统。掌握MongoDB与Spring Boot集成对于提升开发效率和项目质量至关重要,未来有望在更多领域得到广泛应用。
【MongoDB 专栏】MongoDB 与 Spring Boot 的集成实践
|
6天前
|
消息中间件 JSON Java
RabbitMQ的springboot项目集成使用-01
RabbitMQ的springboot项目集成使用-01
|
6天前
|
消息中间件 Java Spring
Springboot 集成Rabbitmq之延时队列
Springboot 集成Rabbitmq之延时队列
7 0
|
6天前
|
安全 Java 数据库连接
在IntelliJ IDEA中通过Spring Boot集成达梦数据库:从入门到精通
在IntelliJ IDEA中通过Spring Boot集成达梦数据库:从入门到精通