1. Spring Cloud Stream是什么?
Spring Cloud Stream是一个框架,用于构建与共享消息系统连接的高度可扩展的事件驱动微服务。
官网:https://docs.spring.io/spring-cloud-stream/docs/current/reference/html/
该框架提供了一个灵活的编程模型,该模型基于已经建立和熟悉的Spring习惯用法和最佳实践,包括对持久pub/sub 语义、消费者组和有状态分区的支持。
简单的理解就是Spring Cloud Stream 通过在上层定义统一消息的编程模型,屏蔽了底层消息中间件的差异,降低了使用成本。下图展示了Spring Cloud Stream的处理架构
Spring Cloud Stream的核心构建块(编程模型)是:
- Destination Binders: 负责提供与外部消息传递系统集成的组件。Binders 可以生成Bindings。
- **Bindings: ** 外部消息系统和应用程序之间的桥梁,提供消息的生产者和消费者(由目标绑定器创建)。即用来绑定消息生产者和消息消费者。它有两种类型,INPUT和OUTPUT,INPUT对应消费者,OUTPUT对应生产者。
- Message: 生产者和消费者用于与目标绑定器(以及通过外部消息系统与其他应用程序)通信的规范的数据结构。
2. Spring Cloud Stream的执行流程
3. 注解代码实现
首先创建一个生产者项目 my-springcloud-rocketmq-producer 和一个消费者项目 my-springcloud-rocketmq-consumer。
本demo使用的 版本号是 cloud 2021.0.5.0 +springboot 2.6.13
在 my-springcloud-rocketmq-producer 上的操作
3.1. 引入依赖
<dependency> <groupId>com.alibaba.cloud</groupId> <artifactId>spring-cloud-starter-stream-rocketmq</artifactId> </dependency>
3.2 . 属性文件配置
spring: cloud: stream: bindings: output: destination: my-springcloud-stream-topic rocketmq: binder: name-server: 172.31.184.89:9876
3.3. 定义生产者
在MySpringcloudRocketmqProducerApplication 添加 @EnableBinding(Source.class) 注解。然后创建生产者。
@Component public class MyProducer { @Resource private Source source; public void sendMessage(String msg) { // 封装消息头 Map<String, Object> headers = new HashMap<>(); headers.put(MessageConst.PROPERTY_TAGS, "tagA"); // 创建消息对象 Message<String> message = MessageBuilder.createMessage(msg, new MessageHeaders(headers)); // 发送消息 source.output().send(message); } }
在 my-springcloud-rocketmq-consumer上的操作
3.4. 引入依赖同生产者
3.5. 配置文件修改
spring.cloud.stream.rocketmq.binder.name-server=172.31.184.89:9876 spring.cloud.stream.bindings.input.destination=my-springcloud-stream-topic spring.cloud.stream.bindings.input.group=my-springcloud-stream-consume-group
3.6. 定义消费者
在MySpringcloudRocketmqConsumerApplication 类上添加 @EnableBinding(Sink.class)注解。
@Component public class MyConsumer { @StreamListener(Sink.INPUT) public void processMessage(String message) { System.out.println("收到的消息=" + message); } }