1. 引言
代码已提交至Github,有兴趣的同学可以下载来看看:https://github.com/ylw-github/SpringCloud-Stream-Demo
在上一篇博客《微服务技术系列教程(33) - SpringCloud-消息驱动简介&原理》,我们了解到了SpringCloud消息驱动概念以及绑定器原理。
本文使用默认的RabbitMQ来实现,加深理解。
2.消息驱动实现
2.1 生产者
1.新建Maven项目stream-producer
2.添加maven依赖
<parent> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-starter-parent</artifactId> <version>2.0.1.RELEASE</version> </parent> <dependencies> <!-- SpringBoot整合Web组件 --> <dependency> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-starter-web</artifactId> </dependency> <dependency> <groupId>org.springframework.cloud</groupId> <artifactId>spring-cloud-starter-stream-rabbit</artifactId> <version>2.0.1.RELEASE</version> </dependency> </dependencies>
3.application.yml信息
server: port: 9001 spring: application: name: spingcloud-stream-producer rabbitmq: host: 127.0.0.1 port: 5672 username: guest password: guest
4.创建管道
import org.springframework.cloud.stream.annotation.Output; import org.springframework.messaging.SubscribableChannel; public interface SendMessageInterface { // 创建一个输出管道,用于发送消息 @Output("my_msg") SubscribableChannel sendMsg(); }
5.Controller发送消息
@RestController public class SendMsgController { @Autowired private SendMessageInterface sendMessageInterface; @RequestMapping("/sendMsg") public String sendMsg() { String msg = UUID.randomUUID().toString(); System.out.println("生产者发送内容msg:" + msg); Message build = MessageBuilder.withPayload(msg.getBytes()).build(); sendMessageInterface.sendMsg().send(build); return "success"; } }
6.启动类绑定
@SpringBootApplication @EnableBinding(SendMessageInterface.class) // 开启绑定 public class AppProducer { public static void main(String[] args) { SpringApplication.run(AppProducer.class, args); } }
2.2 消费者
1.新建Maven项目stream-consumer
2.添加maven依赖
<parent> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-starter-parent</artifactId> <version>2.0.1.RELEASE</version> </parent> <dependencies> <!-- SpringBoot整合Web组件 --> <dependency> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-starter-web</artifactId> </dependency> <dependency> <groupId>org.springframework.cloud</groupId> <artifactId>spring-cloud-starter-stream-rabbit</artifactId> <version>2.0.1.RELEASE</version> </dependency> </dependencies>
3.application.yml
server: port: 9002 spring: application: name: spingcloud-stream-consumer rabbitmq: host: 127.0.0.1 port: 5672 username: guest password: guest
4.管道中绑定消息
public interface RedMsgInterface { // 从管道中获取消息 @Input("my_msg") SubscribableChannel redMsg(); }
5.消费者获取消息
@Component public class Consumer { @StreamListener("my_msg") public void listener(String msg) { System.out.println("消费者获取生产消息:" + msg); } }
6.启动类绑定消费监听接口
@SpringBootApplication @EnableBinding(RedMsgInterface.class) public class AppConsumer { public static void main(String[] args) { SpringApplication.run(AppConsumer.class, args); } }
2.3 启动测试
1.启动application.yml配置的RabbitMQ服务器
2.启动生产者和消费者,可以看到SpringCloud默认帮我们创建了队列以及交换机。
3.生产者生产消息,浏览器输入:http://localhost:9001/sendMsg,可以看到消费者控制台能接收到消息: