rocketmq整合SpringCloudStream
发送消息
引入包:
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-web</artifactId>
</dependency>
<dependency>
<groupId>com.alibaba.cloud</groupId>
<artifactId>spring-cloud-starter-stream-rocketmq</artifactId>
</dependency>
添加配置文件
spring:
cloud:
stream:
rocketmq:
binder:
name-server: 127.0.0.1:9876
bindings:
output:
destination: TopicTest
group: PRODUCER_GROUP_TOPIC_TEST
代码配置:
@SpringBootApplication
@EnableBinding({ Source.class })
public class Application {
public static void main(String[] args) {
SpringApplication.run(Application.class, args);
}
}
@Component
public class ProduceController {
@Autowired
private Source source;
@PostConstruct
private void init() throws InterruptedException {
MessageBuilder builder = MessageBuilder.withPayload("init...");
Message message = builder.build();
source.output().send(message);
System.out.println("init...");
}
}
@EnableBinding({ Source.class })表示绑定配置文件中名称为output的消息通道Binding,Source类中定义的消息通道名称为output。
消费消息:
配置文件:
spring:
cloud:
stream:
rocketmq:
binder:
name-server: 127.0.0.1:9876
bindings:
input:
destination: TopicTest2
group: CONSUER_GROUP_DEMO_1
name-server是RocketMq的NameServer地址,destination指定Topic名称,指定名称为input的Binding接收TopicTest的消息
消息监听:
@EnableBinding({ Sink.class})
@SpringBootApplication
public class Application {
@StreamListener(value = InputChannel.ORDER_INPUT)
public void receive(String receiveMsg) {
System.out.println("receive: " + receiveMsg);
}
public static void main(String[] args) {
SpringApplication.run(Application.class, args);
}
}
@EnableBinding({ Sink.class})表示绑定配置文件名称为input的消息通道Binding,Sink类中定义的消息通道名称为input,@StreamListener表示定义一个消息监听器,接收RocketMQ中的消息。
Spring Cloud Stream
Spring Cloud Stream是构建与共享消息传递系统连接的高度可伸缩的事件驱动微服务,目的是简化消息业务在Spring Cloud应用程序中的开发。
通过Spring Cloud Stream注入的输入通道inputs和输出通道outputs与消息中间件Middleware通信,消息通道通过特定中间件绑定器Binder实现连接到外部代理。
Spring Cloud Stream实现基于发布/订阅机制,核心四个部分组成:Spring Framework中的Spring Messaging和Spring Integration,Spring Cloud Stream中的Binders和Bindings。
Spring Messaging:Spring Framework中的统一消息编程模型
- Message:消息对象,包含消息头Header和消息体Payload
- MessageChannel:消息通道接口,用于接收消息,提供send方法将消息发送至消息通道。
- MessageHandler:消息处理器接口,用于处理消息逻辑。
Spring Integration:支持企业集成的扩展机制,提供简单的模型来构建企业集成解决方案,对Spring Messaging进行扩展。
- MessageDispatcher:消息分发接口,用于分发消息和添加删除消息处理器
- MessageRouter:消息路由接口,定义默认的输出消息通道。
- Filter:消息过滤注解,用于配置消息过滤表达式
- Aggregator:消息的聚合注解,用于将一条消息拆分成多条。
- Splitter:消息分割,用于将一条消息拆分成多条。
Binders:目标绑定器,负责与外部消息中间件系统集成的组件。
- doBindProducer:绑定消息中间件客户端发送消息模块。
- doBindConsumer:绑定消息中间件客户端接收消息模块。
Bindings:外部消息中间件系统与应用程序提供的消息生产者和消费者之间的桥梁。
Spring Cloud Alibaba RocketMQ架构图
- MessageChannel(output):消息通道,用于发送消息,Spring Cloud Stream的标准接口
- MessageChannel(input):消息通道,用于订阅消息,Spring Cloud Stream的标准接口
- Binder bindProducer:目标绑定器,将发送通道发过来的消息发送到RocketMQ消息服务器
- Binder bindConsumer:目标绑定器,将接收到RocketMQ消息服务器的消息推送给订阅通道
spring-cloud-stream官网: