springcloud 入门(8) springcloud Stream(上)

简介: springcloud 入门(8) springcloud Stream

文章目录

项目版本

1、jdk:1.8

2、springboot 2.1.6.RELEASE ,springcloud Greenwich.SR6

介绍

在系统开发里面难免用到消息队列,但各个的消息队列又有所区别,SpringCloudStream 的 作用就是屏蔽各种消息队列的区别,对消息队列的 API进行进一步的抽象,使得在springcloud 里面能更加方便的集成各种消息系统。通过使用springcloud Stream ,可以有效简化开发人员对消息中间件的使用复杂程度,让系统开发人员能够有更多精力去关注核心业务逻辑的处理。目前springcloud Stream只支持两大著名的消息中间件,rabbitmq 和 kafka。

Spring Cloud Stream 应用模型

Spring Cloud Stream 应用程序由一个中间件中立的核心组成。 应用程序通过在外部代理公开的目标和代码中的输入/输出参数之间建立绑定来与外部世界进行通信。 建立绑定所需的代理特定细节由特定于中间件的 Binder 实现处理。

以下图片来自官方

image.png

入门使用

消息生产者

这里使用rabbit作为消息中间件,自行安装rabbitmq

1、新建一个cloud-stream-provider,添加依赖

<dependency>
            <groupId>org.springframework.cloud</groupId>
            <artifactId>spring-cloud-stream</artifactId>
        </dependency>
        <dependency>
            <groupId>org.springframework.cloud</groupId>
            <artifactId>spring-cloud-starter-stream-rabbit</artifactId>
        </dependency>

2、创建消息消费者类

@EnableBinding(Sink.class)
public class SinkReceiver {
    @StreamListener(Sink.INPUT)
    public void receive(Object receivedMessage){
        System.out.println("receivedMessage="+receivedMessage);
    }
}

3、启动CloudStreamProviderApplication

启动之后登录rabbitmq后台http://localhost:15672/,使用默认账号密码guest登录

connections一栏能看到我们的连接

image.png

点击127.0.0.1:50433可以查看connection详情

image.png

再点击127.0.0.1:5672 (1)可以查看Channel详情

image.png

点击input.anonymous.leanfVQMRM-6dPfz1XIkCw 就能查看到具体的队列详情

image.png

下拉有个Publish message,在这里就可以发布数据

image.png

发布完之后就能在控制台看到发布的数据了

image.png

怎么知道这个队列input.anonymous.leanfVQMRM-6dPfz1XIkCw一定是启动的那个呢

其实在启动的时候默认就给我们分配了一个,查看控制台信息可以看到

image.png

Stream尝鲜完成了,接下来是个简单入门使用

1、创建一个消息生产者类

@EnableBinding(Source.class)
public class SourceProvider {
    @Resource
    @Qualifier("output")
    private MessageChannel messageChannel;
    public void send(Object sendMessage){
        messageChannel.send(MessageBuilder.withPayload(sendMessage).build());
    }
}

2、修改配置文件

server.port=8301
spring.application.name=cloud-stream-provider
#消息组件类型 rabbitmq1为自定义的rabbitmq实例名称,如果有多个消息队列实例的话可以参照下面这样
# type:消息中间件类型
spring.cloud.stream.binders.rabbitmq1.type=rabbit
spring.cloud.stream.binders.rabbitmq1.environment.spring.rabbitmq.host=localhost
spring.cloud.stream.binders.rabbitmq1.environment.spring.rabbitmq.username=guest
spring.cloud.stream.binders.rabbitmq1.environment.spring.rabbitmq.password=guest
spring.cloud.stream.binders.rabbitmq1.environment.spring.rabbitmq.port=5672
spring.cloud.stream.binders.rabbitmq1.environment.spring.rabbitmq.virtual-host=/
# 实例2
#spring.cloud.stream.binders.rabbitmq2.type=rabbit
#spring.cloud.stream.binders.rabbitmq2.environment.spring.rabbitmq.host=localhost
#spring.cloud.stream.binders.rabbitmq2.environment.spring.rabbitmq.username=guest
#spring.cloud.stream.binders.rabbitmq2.environment.spring.rabbitmq.password=guest
#spring.cloud.stream.binders.rabbitmq2.environment.spring.rabbitmq.port=5672
# 要使用的 Exchange 名称
spring.cloud.stream.bindings.output.destination=streamExchange
#设置消息类型
spring.cloud.stream.bindings.output.content-type=application/json
#要绑定的消息服务的实例名
spring.cloud.stream.bindings.output.binder=rabbitmq1

3、创建测试类用于发送消息

@RunWith(SpringRunner.class)
@SpringBootTest(classes = CloudStreamProviderApplication.class)
public class CloudStreamProviderTest{
    @Autowired
    private SourceProvider sourceProvider;
    @Test
    public void test(){
        sourceProvider.send("hello,this is first message");
    }
}

只有生产者没有消费者是没用的,接下来创建消费者

消息消费者

1、新建一个stream-consumer,添加依赖

<dependency>
            <groupId>org.springframework.cloud</groupId>
            <artifactId>spring-cloud-stream</artifactId>
        </dependency>
        <dependency>
            <groupId>org.springframework.cloud</groupId>
            <artifactId>spring-cloud-starter-stream-rabbit</artifactId>
        </dependency>

2、创建消息接收类SinkReceiver

@EnableBinding(Sink.class)
public class SinkReceiver {
    @StreamListener(Sink.INPUT)
    public void receive(Message<?> receivedMessage){
        System.out.println("receivedMessage="+receivedMessage.getPayload());
    }
}

3、修改配置文件

server.port=8401
spring.application.name=cloud-stream-consumer
#消息组件类型 rabbitmq1为自定义的rabbitmq实例名称,如果有多个消息队列实例的话可以参照下面这样
# type:消息中间件类型
spring.cloud.stream.binders.rabbitmq1.type=rabbit
spring.cloud.stream.binders.rabbitmq1.environment.spring.rabbitmq.host=localhost
spring.cloud.stream.binders.rabbitmq1.environment.spring.rabbitmq.username=guest
spring.cloud.stream.binders.rabbitmq1.environment.spring.rabbitmq.password=guest
spring.cloud.stream.binders.rabbitmq1.environment.spring.rabbitmq.port=5672
# 要使用的 Exchange 名称 ,input是rabbitmq的channel名称,后面可以自定义
spring.cloud.stream.bindings.input.destination=streamExchange
#设置消息类型
spring.cloud.stream.bindings.input.content-type=application/json
#要绑定的消息服务的实例名
spring.cloud.stream.bindings.input.binder=rabbitmq1

启动CloudStreamConsumerApplication,可以看到rabbitmq首页下方有一个消费者

image.png

调用CloudStreamProviderTest#test,查看stream-consumer控制台,会发现收到一条消息

receivedMessage=hello,this is first message
相关实践学习
消息队列RocketMQ版:基础消息收发功能体验
本实验场景介绍消息队列RocketMQ版的基础消息收发功能,涵盖实例创建、Topic、Group资源创建以及消息收发体验等基础功能模块。
消息队列 MNS 入门课程
1、消息队列MNS简介 本节课介绍消息队列的MNS的基础概念 2、消息队列MNS特性 本节课介绍消息队列的MNS的主要特性 3、MNS的最佳实践及场景应用 本节课介绍消息队列的MNS的最佳实践及场景应用案例 4、手把手系列:消息队列MNS实操讲 本节课介绍消息队列的MNS的实际操作演示 5、动手实验:基于MNS,0基础轻松构建 Web Client 本节课带您一起基于MNS,0基础轻松构建 Web Client
目录
相关文章
|
26天前
|
Java Spring
【Azure Spring Cloud】Spring Cloud Azure 4.0 调用Key Vault遇见认证错误 AADSTS90002: Tenant not found.
【Azure Spring Cloud】Spring Cloud Azure 4.0 调用Key Vault遇见认证错误 AADSTS90002: Tenant not found.
|
2月前
|
资源调度 Java 调度
Spring Cloud Alibaba 集成分布式定时任务调度功能
Spring Cloud Alibaba 发布了 Scheduling 任务调度模块 [#3732]提供了一套开源、轻量级、高可用的定时任务解决方案,帮助您快速开发微服务体系下的分布式定时任务。
14605 26
|
2月前
|
负载均衡 Java Spring
Spring cloud gateway 如何在路由时进行负载均衡
Spring cloud gateway 如何在路由时进行负载均衡
288 15
|
25天前
|
消息中间件 Java 开发工具
【Azure 事件中心】Spring Cloud Stream Event Hubs Binder 发送Event Hub消息遇见 Spec. Rule 1.3 - onSubscribe, onNext, onError and onComplete signaled to a Subscriber MUST be signaled serially 异常
【Azure 事件中心】Spring Cloud Stream Event Hubs Binder 发送Event Hub消息遇见 Spec. Rule 1.3 - onSubscribe, onNext, onError and onComplete signaled to a Subscriber MUST be signaled serially 异常
|
25天前
|
Java Spring 容器
【Azure Spring Cloud】在Azure Spring Apps上看见 App Memory Usage 和 jvm.menory.use 的指标的疑问及OOM
【Azure Spring Cloud】在Azure Spring Apps上看见 App Memory Usage 和 jvm.menory.use 的指标的疑问及OOM
|
26天前
|
存储 Java Spring
【Azure Spring Cloud】Azure Spring Cloud服务,如何获取应用程序日志文件呢?
【Azure Spring Cloud】Azure Spring Cloud服务,如何获取应用程序日志文件呢?
|
26天前
|
SQL Java 数据库连接
【Azure Spring Cloud】Azure Spring Cloud connect to SQL using MSI
【Azure Spring Cloud】Azure Spring Cloud connect to SQL using MSI
|
26天前
|
Java 开发工具 Spring
【Azure Spring Cloud】使用azure-spring-boot-starter-storage来上传文件报错: java.net.UnknownHostException: xxxxxxxx.blob.core.windows.net: Name or service not known
【Azure Spring Cloud】使用azure-spring-boot-starter-storage来上传文件报错: java.net.UnknownHostException: xxxxxxxx.blob.core.windows.net: Name or service not known
|
26天前
|
NoSQL Java Redis
【Azure Spring Cloud】Java Spring Cloud 应用部署到Azure上后,发现大量的 java.lang.NullPointerException: null at io.lettuce.core.protocol.CommandHandler.writeSingleCommand(CommandHandler.java:426) at ... 异常
【Azure Spring Cloud】Java Spring Cloud 应用部署到Azure上后,发现大量的 java.lang.NullPointerException: null at io.lettuce.core.protocol.CommandHandler.writeSingleCommand(CommandHandler.java:426) at ... 异常
|
26天前
|
Java Spring
【Azure 应用服务】记一次Azure Spring Cloud 的部署错误 (az spring-cloud app deploy -g dev -s testdemo -n demo -p ./hellospring-0.0.1-SNAPSHOT.jar --->>> Failed to wait for deployment instances to be ready)
【Azure 应用服务】记一次Azure Spring Cloud 的部署错误 (az spring-cloud app deploy -g dev -s testdemo -n demo -p ./hellospring-0.0.1-SNAPSHOT.jar --->>> Failed to wait for deployment instances to be ready)