关于 Spring Integration 你知道多少,包含集成MQTT案例讲述及源码2:https://developer.aliyun.com/article/1394928
4.4、入站适配器MqttInboundConfiguration
@Slf4j @Configuration @IntegrationComponentScan public class MqttInboundConfiguration { @Autowired private MqttPahoClientFactory mqttClientFactory; @Resource(name = ChannelName.INBOUND) private MessageChannel inboundChannel; /** * Clients of inbound message channels. * @return */ @Bean(name = "adapter") public MessageProducerSupport mqttInbound() { MqttClientOptions options = MqttConfiguration.getBasicClientOptions(); // 此处在初始化的时候,初始化时,默认订阅了配置文件中的已经写定的 topic // 如果后期有需要再增加的订阅主题,调用 addTopic() 即可 MqttPahoMessageDrivenChannelAdapter adapter = new MqttPahoMessageDrivenChannelAdapter( options.getClientId() + "_consumer_" + System.currentTimeMillis(), mqttClientFactory, options.getInboundTopic().split(",")); DefaultPahoMessageConverter converter = new DefaultPahoMessageConverter(); // use byte types uniformly converter.setPayloadAsBytes(true); // 设置消息转换器 adapter.setConverter(converter); adapter.setQos(1); // 设置在接收已经订阅的主题信息后,发送给那个通道,具体的发送方法需要翻上层的抽象类 adapter.setOutputChannel(inboundChannel); return adapter; } /** * Define a default channel to handle messages that have no effect. * @return */ @Bean @ServiceActivator(inputChannel = ChannelName.DEFAULT) public MessageHandler defaultInboundHandler() { return message -> { log.info("The default channel does not handle messages." + "\nTopic: " + message.getHeaders().get(MqttHeaders.RECEIVED_TOPIC) + "\nPayload: " + message.getPayload()); }; } }
主要是两个地方:
1、@IntegrationComponentScan
,开启 Spring Integration 的注解扫描,扫描我们写的 @ServiceActivator(inputChannel = ChannelName.DEFAULT)、``@MessagingGateway(defaultRequestChannel = ChannelName.OUTBOUND)等等
2、MqttPahoMessageDrivenChannelAdapter
实现了 MessageProducerSupport
接口,同时也是最后的实现类。故此有较多的具体实现是在这个类中的。
4.5、Message Router
@Component @Slf4j public class InboundMessageRouter extends AbstractMessageRouter { /** * All mqtt broker messages will arrive here before distributing them to different channels. * @param message message from mqtt broker * @return channel * * 全部节点的信息都会先从这里过,然后再查询TopicEnum中的方法,寻找到相应的通道(也就是代码中已经注册的Channel) * 举个例子:就比如我现在使用MQTTX向 mysys/envents_test (broker)发送一个消息 * 首先会经过这里,然后我们根据 mysys/envents_test 在 TopicEnum.find(topic) 寻找, * 找到相应的通道为:STATE_ENVENTS(Pattern.compile("^"+MY_BASIC_PRE+ENVENTS_TEST+"$"), ChannelName.ENVENTS_INBOUND_TEST), * 即找到一个 ChannelName 为 ENVENTS_INBOUND_TEST 通道(这个通道我们已经注册在Spring 中啦) * * 找到这个通道后,我们会将消息投递到这个通道去 * 接下来就是看是谁订阅了这个通道的消息,那么就会接着处理这个消息 * 比如我们的案例中是由 EnventsTestRouter 这个二级路由订阅了消息通道,来进行消息的再次分发, * 在这里的时候,EnventsTestRouter 可以不再是根据节点的名称来进行处理,而是具体的消息来进行二次处理,比如指定要判断消息中的某一个字段是什么 * 从而再交由谁处理(即再次投递到哪个 ChannelName 中去) * */ @Override @Router(inputChannel = ChannelName.INBOUND) protected Collection<MessageChannel> determineTargetChannels(Message<?> message) { MessageHeaders headers = message.getHeaders(); String topic = headers.get(MqttHeaders.RECEIVED_TOPIC).toString(); byte[] payload = (byte[])message.getPayload(); log.info("received topic :{} \t payload :{}", topic, new String(payload)); TopicEnum topicEnum = TopicEnum.find(topic); MessageChannel bean = (MessageChannel) SpringBeanUtils.getBean(topicEnum.getBeanName()); return Collections.singleton(bean); } }
补充:所有的入站信息,都会率先经过这里。determineTargetChannels的实际作用并非是分发,而是找到需要接收的Channel(信道),具体的调用是在上层的抽象类 AbstractMessageRouter.handleMessageInternal
方法内,具体的分发也是在这个方法的下半部分。
4.6、IntegrationFlow Java DSL
@Bean public IntegrationFlow myTestMethodRouterFlow() { return IntegrationFlows .from(ChannelName.ENVENTS_INBOUND_TEST) .<byte[], CommonTopicReceiver>transform(payload -> { try { return mapper.readValue(payload, CommonTopicReceiver.class); } catch (IOException e) { e.printStackTrace(); } return new CommonTopicReceiver(); }) .<CommonTopicReceiver, EnventsTestMethodEnum>route( receiver -> EnventsTestMethodEnum.find(receiver.getMethod()), mapping -> Arrays.stream(EnventsTestMethodEnum.values()).forEach( methodEnum -> mapping.channelMapping(methodEnum, methodEnum.getChannelName()))) .get(); }
具体的API使用,我没有牵扯太多,简单的说一下方法:
1、form(),接收来自 ChannelName.ENVENTS_INBOUND_TEST 通道的消息
2、transform(),将接收的消息转换自己需要的类型,我这里是将 byte[]转换为 CommonTopicReceiver 类型
3、route(),这个方法,怎么说,坦白说我自己也想了蛮久的,源码也看了,网上资料也查了,大部分都没有解答我的疑惑。我先说作用:这是一个消息路由器。路由器根据输入消息的内容选择一个输出通道,这个选择是通过**.route
**方法来完成的。
疑惑的点在哪里呢?
<CommonTopicReceiver, EnventsTestMethodEnum>route( receiver -> EnventsTestMethodEnum.find(receiver.getMethod()), mapping -> Arrays.stream(EnventsTestMethodEnum.values()).forEach( methodEnum -> mapping.channelMapping(methodEnum, methodEnum.getChannelName())));
有没有人注意到route()
方法的第二个参数 mapping
话说,这个 mapping
是怎么来的?我也没有定义。正确方法:问GPT(手头狗头)
第一遍解释:
.route(...)
:这是整个代码片段中最复杂的部分,它定义了一个消息路由器。路由器根据输入消息的内容选择一个输出通道,这个选择是通过**.route
**方法来完成的。
receiver -> EnventsTestMethodEnum.find(receiver.getMethod())
:这个表达式是一个Lambda表达式,它接受一个**CommonTopicReceiver
对象作为输入,并根据该对象的方法(getMethod()
)返回一个EnventsTestMethodEnum
**枚举值。它的作用是决定消息应该被路由到哪个通道。mapping -> Arrays.stream(EnventsTestMethodEnum.values()).forEach(...)
:这个表达式也是一个Lambda表达式,它接受一个**mapping
参数,该参数用于定义路由规则。在这里,它遍历了所有的EnventsTestMethodEnum
枚举值,然后通过.channelMapping()
**方法将每个枚举值映射到相应的输出通道。
看完还是不理解,然后我又拆出来,单独询问了一遍:
看完这个就大致明白啦,这个参数是 Spring Integration
由框架自动传递给Lambda表达式的参数。
从其他博主那找了一个简单案例:Spring Integration
提供了一个IntegrationFlow
来定义系统继承流程,而通过IntegrationFlows
和IntegrationFlowBuilder
来实现使用Fluent API来定义流程。在Fulent API
里,分别提供了下面方法来映射Spring Integration
的端点(EndPoint)。
transform() -> Transformer filter() -> Filter handle() -> ServiceActivator、Adapter、Gateway split() -> Splitter aggregate() -> Aggregator route() -> Router bridge() -> Bridge
一个简单的流程定义如下:
@Bean public IntegrationFlow demoFlow(){ return IntegrationFlows.from("input") //从Channel input获取消息 .<String,Integer>transform(Integer::parseint) //将消息转换成整数 .get(); //获得集成流程并注册为Bean }
原文链接:blog.csdn.net/qq_40929047…
4.7、Message Handler
关于 Message Handler
我在入站适配器的配置类中,有配置过一个默认的消息处理器(通常用来兜底的)
/** * Define a default channel to handle messages that have no effect. * * @return */ @Bean @ServiceActivator(inputChannel = ChannelName.DEFAULT) public MessageHandler defaultInboundHandler() { return message -> { log.info("The default channel does not handle messages." + "\nTopic: " + message.getHeaders().get(MqttHeaders.RECEIVED_TOPIC) + "\nPayload: " + message.getPayload()); }; }
上面的@ServiceActivator(inputChannel = ChannelName.DEFAULT)
就是表明这是处理从 DEFAULT通道的消息处理方法。
但实际上能够处理消息的并非只有 MessageHandler 类,只要在 xxxxxServiceImpl (已经注册到bean)上标记@ServiceActivator(inputChannel = ChannelName.xxxx)
即可处理来自 xxxx 的消息,如果消息不再需要继续传递,那么到这里即是消息的终点啦
比如案例中:
@Service @Slf4j public class EnventsTestServiceImpl implements EnventsTestService { @Autowired private IMessageSenderService messageSenderService; @Autowired private ObjectMapper mapper; @Override @ServiceActivator(inputChannel = ChannelName.INBOUND_TASK_TEST2, outputChannel = ChannelName.OUTBOUND_TEST_REPLY) public CommonTopicReceiver handleInboundTest1Reply(CommonTopicReceiver receiver, MessageHeaders headers) { String dockSn = receiver.getGateway(); log.info("handleInboundTest1"); log.info("dockSn:{}",dockSn); log.info("receiver:{}",receiver); log.info("headers:{}",headers); return receiver; } @ServiceActivator(inputChannel = ChannelName.OUTBOUND_TEST_REPLY,outputChannel = ChannelName.OUTBOUND) @Override public void handleOutboundTestReply(CommonTopicReceiver receiver, MessageHeaders headers) { log.info("handleOutboundTest"); log.info("receiver:{}",receiver); log.info("headers:{}",headers); CommonTopicResponse<Object> build = CommonTopicResponse.builder() .tid("receiver.getTid()") .bid("receiver.getBid()") .method("reply") .timestamp(System.currentTimeMillis()) .data(RequestsReply.success()) .build(); messageSenderService.publish("envents_test/response", build); } }
4.8、订阅主题
不知道看到这里的小伙伴是否还记得基础概念的这张图:
与外界信息来源进行交互的ChannelAdapter(入站适配器)来做的,在谈到入站适配器的配置时,我们也看到了连接也是它来做的,包括初始化时,可以订阅配置文件中配置的主题。
那么自然添加新的主题,也是通过它来完成啦,以下为具体实现,调用则是在上层抽象类 AbstractMqttMessageDrivenChannelAdapter 中
案例中的应用:
4.9、向某个主题发送消息和@MessagingGateway
注解
坦白说,在我刚看下面这段代码的时候,我也是有些懵的,虽然意思很好猜,就是发送消息,但为啥是这样写,却是完全不懂啦。不过正是因为这些好奇,最后才组成了这篇文章吧。
@Component @MessagingGateway(defaultRequestChannel = ChannelName.OUTBOUND) public interface IMqttMessageGateway { /** * Publish a message to a specific topic. * @param topic target * @param payload message */ void publish(@Header(MqttHeaders.TOPIC) String topic, byte[] payload); /** * Use a specific qos to push messages to a specific topic. * @param topic target * @param payload message * @param qos qos */ void publish(@Header(MqttHeaders.TOPIC) String topic, byte[] payload, @Header(MqttHeaders.QOS) int qos); }
1、简要来说,Messaging Gateway
就是在项目中只定义消息端点的接口(使用 Xml 或者 java 注解标识这个接口),接口的具体实现由 spring 容器实现(具体是 GatewayProxyFactoryBean
来创建接口实现)。Messaging Gateway 产生的消息将根据消息头中的 request-channel 发送到对应的 channel,并由 reply-channel 中获取响应。
即 GatewayProxyFactoryBean
创建动态代理对象,拦截发送Mqtt消息的处理,委托给对应的MessageChannel(消息通道),此消息通道是通过@MessagingGateway
注解的defaultRequestChannel
属性来配置的。 后面再由订阅这个消息通道的出站适配器进行处理,从而发送到MQTT Broker。
总的来说就是定义一个 @MessagingGateway
修饰的接口,用于消息的发送,@MessagingGateway
的 defaultRequestChannel
参数用于绑定具体的 MessageChannel
。
2、对于接口方法中的参数,默认是以 Map 作为消息头而具体的类作为消息的负载(payload),也可以使用 @Header,@Payload 参数注解指定。
3、对于没有参数的方法,这意味着不需要调用者传入而是借由 Messaging Gateway 自动生成。
4、对于消息处理过程中的异常,默认情况下会层层的向上传递,为了捕获相应的异常,可以在接口的方法上添加 throws 关键字定义需要捕获的异常。除此之外,还可以通过指定一个 errorChannel 将错误由指定的消息消费者处理。
案例中的应用:
4.10、出站适配器
谈到这个,还是把上面的图扒拉下来:
不过这里的出站适配器是由MqttPahoMessageHandler
实现的。
@Configuration public class MqttOutboundConfiguration { @Autowired private MqttPahoClientFactory mqttClientFactory; /** * Clients of outbound message channels. * @return */ @Bean @ServiceActivator(inputChannel = ChannelName.OUTBOUND) public MessageHandler mqttOutbound() { MqttPahoMessageHandler messageHandler = new MqttPahoMessageHandler( MqttConfiguration.getBasicClientOptions().getClientId() + "_producer_" + System.currentTimeMillis(), mqttClientFactory); DefaultPahoMessageConverter converter = new DefaultPahoMessageConverter(); // use byte types uniformly converter.setPayloadAsBytes(true); messageHandler.setAsync(true); messageHandler.setDefaultQos(0); messageHandler.setConverter(converter); return messageHandler; } }
也可以使用 Java DSL 的方式配置出站适配器,如下示例:
@Bean public IntegrationFlow mqttOutboundFlow() { return f -> f.handle(new MqttPahoMessageHandler("tcp://host1:1883", "someMqttClient")); }
4.11、测试效果
如果启动项目后,要订阅新的主题:
具体代码:
@RestController @RequestMapping("/topic") public class MqttTopicController { @Autowired private IMqttTopicService mqttTopicService; @Autowired private IMessageSenderService messageSenderService; @GetMapping("/add") public String add(String topic){ mqttTopicService.subscribe(topic); return topic+"添加成功"; } @GetMapping("/pulish") public String pulish(String topic){ CommonTopicResponse<Object> build = CommonTopicResponse.builder() .tid("receiver.getTid()") .bid("receiver.getBid()") .method("reply") .timestamp(System.currentTimeMillis()) .data(RequestsReply.success()) .build(); messageSenderService.publish(topic, build); return "向"+topic+"发送消息"; } @GetMapping("/reply") public CommonTopicResponse reply(){ CommonTopicResponse<Object> build = CommonTopicResponse.builder() .tid("receiver.getTid()") .bid("receiver.getBid()") .method("reply") .timestamp(System.currentTimeMillis()) .data(RequestsReply.success()) .build(); messageSenderService.publish("test/9876", build); return build; } }
只是进行了非常的简单的测试,更多需要使用的,还是需要自己亲自去测试更佳。
五、Spring Integration 的优缺点
看到这里,不知道你有感受到了哪些关于 Spring Integration
的优缺点呢
优点:
- 解耦。借助官网的这句话“业务逻辑和集成逻辑之间的关注点分离“,业务逻辑是我们处理消息的部分,集成逻辑是消息传递的部分,在使用Spring Integration 后,消息生产者和消息消费者不再具有强藕性。
- 模块化: Spring Integration 使用模块化的设计,你可以根据需要选择性地添加不同的模块,例如消息通道、消息路由、消息转换等,以满足你的集成需求。
- 多种通信协议支持: Spring Integration 支持多种通信协议,包括HTTP、FTP、JMS、AMQP、SMTP等,使得你可以轻松地与不同系统进行通信。
- 与Spring生态集成度高,因为本身就出自于Spring家族,从集成度而言,比其他第三方框架要好的多。
- 应对复杂场景更轻松。可以根据自己的需求定制消息处理器、消息通道和路由规则,以满足复杂的集成场景。
缺点:
- 学习成本。如果是没接触过的朋友,Spring Integration 还是有一定的学习曲线的。
- 过度 。Spring Integration 确实优点不少,但是如果你的项目并不是十分复杂,那么使用它其实有可能是繁琐和复杂的。
- 复杂。虽然能更好的应对复杂场景,但是复杂场景下,它的配置也会变得复杂,它的维护和管理也会逐渐变得困难。这其实是一个系统发展不可避免的一个问题,当你遇上此问题时,那么也是你该进行知识输入的时候啦。
- 适用于特定场景。
总的来说,Spring Integration 是一个强大的企业集成框架,可以帮助你解决复杂的集成问题。但在选择使用它之前,你需要考虑你的具体需求、团队的经验和项目的复杂性。如果你的集成需求相对简单,可能有更轻量级的替代方案可供选择。
补充:此处多参考于ChatGPT。
参考
- EMQX Docker 部署指南
- Spring Integration 文档
- Spring 5 实战
- spring MessagingGateway 简介
- spring boot + mqtt 物联网开发
- ChatGPT
最后
这篇更多的是一个学习过程中的记录,代码的实现也是Demo,规范以及实用性仍然是有不足之处,更多的是提供参考,而非直接使用的。如果你有更好的方式,那么不妨在评论区中写下你的想法,还望朋友们不吝赐教。