1. 简介
事件驱动架构(Event-driven 架构,简称 EDA)是软件设计领域内的一套程序设计模型。这套模型的意义是所有的操作通过事件的发送/接收来完成。举个例子,比如一个订单的创建在传统软件设计中服务端通过接口暴露创建订单的动作,然后客户端访问创建订单。在事件驱动设计里,订单的创建通过接收订单事件来完成,这个过程中有事件发送者和事件接受者这两个模块,事件发送者的作用是发送订单事件,事件接受者的作用的接收订单事件。Spring Cloud Stream 是一套基于消息的事件驱动开发框架,它提供了一套全新的消息编程模型,此模型屏蔽了底层具体消息中间件的使用方式。开发者们使用这套模型可以完成基于消息的事件驱动应用开发。
2. 学习目标
- 掌握 Spring 对消息的编程模型封装
- 掌握 RocketMQ 整合 Spring Cloud Stream 完成消息的发送和接收
- 掌握 RocketMQ 整合 Spring Cloud Bus 完成远程事件的发送和接收
3. 详细内容
- 概念理解:指导读者理解 Spring 的消息编程模型
- 消息发送/接收:实战 Spring Cloud Steam RocketMQ Binder
- 事件发送/接收: 实战 Spring Cloud Bus RocketMQ
4. 理解 Spring 消息编程模型
首先我们来看这个场景,不同的消息中间件发送消息的代码:
每个消息中间件都有自己的消息模型编程,他们的代码编写方式都不一致。同样地,在消息的订阅方面,也是不同的代码。这个时候如果某天想把 Kafka 切换到 RocketMQ,必须得修改大量代码。
Spring 生态里有两个消息相关的模块和项目,分别是 spring-messaging 模块和 Spring Integration 项目,它们对消息的编程模型进行了统一,不论是 Apache RocketMQ 的 Message,或者是 Apache Kafka 的 ProducerRecord,都被统一称为 org.springframework.messaging.Message 接口。
Message 接口有两个方法,分别是 getPayload 以及 getHeaders 用于获取消息体以及消息头。如图所示,这也意味着一个消息 Message 由 Header 和 Payload 组成:
Payload 是一个泛型,意味是消息体可以放任意数据类型。Header 是一个 MessageHeaders 类型的消息头。
有了消息之后,这个消息被发送到哪里呢?Spring 提供了消息通道 MessageChannel 的概念。消息可以被发送到消息通道里,然后再通过消息处理器 MessageHandler 去处理消息通道里的消息:
消息处理这里又会遇到一个问题。如果消息通道里只有 1 个消息,但是消息处理器有 N 个,这个时候要被哪个消息处理器处理呢?这里又涉及一个消息分发器的问题。UnicastingDispatcher 表示单播的处理方式,消息会通过负载均衡被分发到某一个消息处理器上,BroadcastingDispatcher 表示广播的方式,消息会被所有的消息处理器处理。
5. Spring Cloud Stream
Spring Cloud Stream 是一套基于消息的事件驱动开发框架。
Spring Cloud Stream 在 Spring Integration 项目的基础上再进行了一些封装,提出一些新的概念,让开发者能够更简单地使用这套消息编程模型。如图所示,这是三者之间的关系:
如下图所示,这是 Spring Cloud Stream 的编程模型。通过 RabbitMQ Binder 构建 input Binding 用于读取 RabbitMQ 上的消息,将 payload 内容转成大写再通过 Kafka Binder 构建的 output Binding 写入到 Kafka 中。图上中间的 4行非常简单的代码就可以完成从 RabbitMQ 读取消息再写入到 Kafka 的动作。
以下代码是使用 Spring Cloud Stream 以最简单的方式完成消息的发送和接收:
@SpringBootApplication@EnableBinding({Source.class, Sink.class}) // ① public class SCSApplication { public static void main(String[] args) { new SpringApplicationBuilder().sources(SCSApplication.class) .web(WebApplicationType.NONE).run(args); } @Autowired Source source; // ② @Bean public CommandLineRunner runner() { return (args) -> { source.output().send(MessageBuilder.withPayload("custom payload").setHeader("k1", "v1").build()); // ③ }; } @StreamListener(Sink.INPUT) // ④ @SendTo(Source.OUTPUT) // ⑤ public String receive(String msg) { return msg.toUpperCase(); } }
- 使用 @EnableBinding 注解,注解里面有两个参数 Source 和 Sink,它们都是接口。Source 接口内部有个 MessageChannel 类型返回值的 output 方法,被 @Output 注解修饰表示这是一个 Output Binding;Sink 接口内部有个 SubscribableChannel 类型返回值的 intput 方法,被 @Input 注解修饰表示这是一个 Input Binding。@EnableBinding 注解会针对这两个接口生成动态代理。
- 注入 @EnableBinding 注解对于 Source 接口生成的动态代理。
- 使用 @EnableBinding 注解对于 Source 接口生成的动态代理内部的 MessageChannel 发送一条消息。最终消息会被发送到消息中间件对应的 topic 里。
- @StreamListener 注解订阅 @EnableBinding 注解对于 Sink 接口生成的动态代理内部的 SubscribableChannel 中的消息,这里会订阅到消息中间件对应的topic 和 group。
- 消息处理结果发送到@EnableBinding 注解对于 Source 接口生成的动态代理内部的 MessageChannel。最终消息会被发送到消息中间件对应的topic 里。
上述代码需要配置信息:
spring.cloud.stream.bindings.input.destination=test-input spring.cloud.stream.bindings.input.group=test-input-binder spring.cloud.stream.bindings.input.binder=kafka spring.cloud.stream.bindings.output.destination=test-output spring.cloud.stream.bindings.output.binder=rocketmq
这里的 Input Binding 对应的 topic 是 test-input,group 是 test-input-binder,对应的 MQ 是 Kafka,Output Binding 对应的 topic 是 test-output,对应的 MQ 是 RocketMQ。
所以这段代码的意思是以 test-input-binder 这个 group 去 Kafka 上读取 test-input 这个 topic 下的消息,把消息的内容转换成大写再发送给 RocketMQ 的 test-output topic 上。