文章目录
项目版本
1、jdk:1.8
2、springboot 2.1.6.RELEASE ,springcloud Greenwich.SR6
介绍
在系统开发里面难免用到消息队列,但各个的消息队列又有所区别,SpringCloudStream 的 作用就是屏蔽各种消息队列的区别,对消息队列的 API进行进一步的抽象,使得在springcloud 里面能更加方便的集成各种消息系统。通过使用springcloud Stream ,可以有效简化开发人员对消息中间件的使用复杂程度,让系统开发人员能够有更多精力去关注核心业务逻辑的处理。目前springcloud Stream只支持两大著名的消息中间件,rabbitmq 和 kafka。
Spring Cloud Stream 应用模型
Spring Cloud Stream 应用程序由一个中间件中立的核心组成。 应用程序通过在外部代理公开的目标和代码中的输入/输出参数之间建立绑定来与外部世界进行通信。 建立绑定所需的代理特定细节由特定于中间件的 Binder 实现处理。
以下图片来自官方
入门使用
消息生产者
这里使用rabbit作为消息中间件,自行安装rabbitmq
1、新建一个cloud-stream-provider,添加依赖
<dependency> <groupId>org.springframework.cloud</groupId> <artifactId>spring-cloud-stream</artifactId> </dependency> <dependency> <groupId>org.springframework.cloud</groupId> <artifactId>spring-cloud-starter-stream-rabbit</artifactId> </dependency>
2、创建消息消费者类
@EnableBinding(Sink.class) public class SinkReceiver { @StreamListener(Sink.INPUT) public void receive(Object receivedMessage){ System.out.println("receivedMessage="+receivedMessage); } }
3、启动CloudStreamProviderApplication
启动之后登录rabbitmq后台http://localhost:15672/,使用默认账号密码guest登录
connections一栏能看到我们的连接
点击127.0.0.1:50433可以查看connection详情
再点击127.0.0.1:5672 (1)可以查看Channel详情
点击input.anonymous.leanfVQMRM-6dPfz1XIkCw 就能查看到具体的队列详情
下拉有个Publish message,在这里就可以发布数据
发布完之后就能在控制台看到发布的数据了
怎么知道这个队列input.anonymous.leanfVQMRM-6dPfz1XIkCw一定是启动的那个呢
其实在启动的时候默认就给我们分配了一个,查看控制台信息可以看到
Stream尝鲜完成了,接下来是个简单入门使用
1、创建一个消息生产者类
@EnableBinding(Source.class) public class SourceProvider { @Resource @Qualifier("output") private MessageChannel messageChannel; public void send(Object sendMessage){ messageChannel.send(MessageBuilder.withPayload(sendMessage).build()); } }
2、修改配置文件
server.port=8301 spring.application.name=cloud-stream-provider #消息组件类型 rabbitmq1为自定义的rabbitmq实例名称,如果有多个消息队列实例的话可以参照下面这样 # type:消息中间件类型 spring.cloud.stream.binders.rabbitmq1.type=rabbit spring.cloud.stream.binders.rabbitmq1.environment.spring.rabbitmq.host=localhost spring.cloud.stream.binders.rabbitmq1.environment.spring.rabbitmq.username=guest spring.cloud.stream.binders.rabbitmq1.environment.spring.rabbitmq.password=guest spring.cloud.stream.binders.rabbitmq1.environment.spring.rabbitmq.port=5672 spring.cloud.stream.binders.rabbitmq1.environment.spring.rabbitmq.virtual-host=/ # 实例2 #spring.cloud.stream.binders.rabbitmq2.type=rabbit #spring.cloud.stream.binders.rabbitmq2.environment.spring.rabbitmq.host=localhost #spring.cloud.stream.binders.rabbitmq2.environment.spring.rabbitmq.username=guest #spring.cloud.stream.binders.rabbitmq2.environment.spring.rabbitmq.password=guest #spring.cloud.stream.binders.rabbitmq2.environment.spring.rabbitmq.port=5672 # 要使用的 Exchange 名称 spring.cloud.stream.bindings.output.destination=streamExchange #设置消息类型 spring.cloud.stream.bindings.output.content-type=application/json #要绑定的消息服务的实例名 spring.cloud.stream.bindings.output.binder=rabbitmq1
3、创建测试类用于发送消息
@RunWith(SpringRunner.class) @SpringBootTest(classes = CloudStreamProviderApplication.class) public class CloudStreamProviderTest{ @Autowired private SourceProvider sourceProvider; @Test public void test(){ sourceProvider.send("hello,this is first message"); } }
只有生产者没有消费者是没用的,接下来创建消费者
消息消费者
1、新建一个stream-consumer,添加依赖
<dependency> <groupId>org.springframework.cloud</groupId> <artifactId>spring-cloud-stream</artifactId> </dependency> <dependency> <groupId>org.springframework.cloud</groupId> <artifactId>spring-cloud-starter-stream-rabbit</artifactId> </dependency>
2、创建消息接收类SinkReceiver
@EnableBinding(Sink.class) public class SinkReceiver { @StreamListener(Sink.INPUT) public void receive(Message<?> receivedMessage){ System.out.println("receivedMessage="+receivedMessage.getPayload()); } }
3、修改配置文件
server.port=8401 spring.application.name=cloud-stream-consumer #消息组件类型 rabbitmq1为自定义的rabbitmq实例名称,如果有多个消息队列实例的话可以参照下面这样 # type:消息中间件类型 spring.cloud.stream.binders.rabbitmq1.type=rabbit spring.cloud.stream.binders.rabbitmq1.environment.spring.rabbitmq.host=localhost spring.cloud.stream.binders.rabbitmq1.environment.spring.rabbitmq.username=guest spring.cloud.stream.binders.rabbitmq1.environment.spring.rabbitmq.password=guest spring.cloud.stream.binders.rabbitmq1.environment.spring.rabbitmq.port=5672 # 要使用的 Exchange 名称 ,input是rabbitmq的channel名称,后面可以自定义 spring.cloud.stream.bindings.input.destination=streamExchange #设置消息类型 spring.cloud.stream.bindings.input.content-type=application/json #要绑定的消息服务的实例名 spring.cloud.stream.bindings.input.binder=rabbitmq1
启动CloudStreamConsumerApplication,可以看到rabbitmq首页下方有一个消费者
调用CloudStreamProviderTest#test,查看stream-consumer控制台,会发现收到一条消息
receivedMessage=hello,this is first message