SpringCloud Day08--消息驱动(SpringCloud Stream)(一)

简介: SpringCloud Day08--消息驱动(SpringCloud Stream)(一)

11. SpringCloud Stream 消息驱动


11.1 消息驱动概述


11.1.1 是什么?


官方定义 Spring Cloud Stream 是一个构建消息驱动微服务的框架。


应用程序通过 inputs 或者 outputs 来与 Spring Cloud Stream中binder对象交互。

通过我们配置来binding(绑定) ,而 Spring Cloud Stream 的 binder对象负责与消息中间件交互。

所以,我们只需要搞清楚如何与 Spring Cloud Stream 交互就可以方便使用消息驱动的方式。


通过使用Spring Integration来连接消息代理中间件以实现消息事件驱动。

Spring Cloud Stream 为一些供应商的消息中间件产品提供了个性化的自动化配置实现,引用了发布-订阅、消费组、分区的三个核心概念。


目前仅支持RabbitMQ、Kafka。


一句话:屏蔽底层消息中间件的差异,降低切换成本,统一消息的编程模型


官网参考文档:



Spring Cloud Stream是用于构建与共享消息传递系统连接的高度可伸缩的事件驱动微服务框架,该框架提供了一个灵活的编程模型,它建立在已经建立和熟悉的Spring熟语和最佳实践上,包括支持持久化的发布/订阅、消费组以及消息分区这三个核心概念


f903197c22236dd0eb1bacb8c04d6267.png


https://cloud.spring.io/spring-cloud-static/spring-cloud-stream/3.0.1.RELEASE/reference/html/

Spring Cloud Stream中文指导手册: https://m.wang1314.com/doc/webapp/topic/20971999.html


11.1.2 标准MQ


b4ef31dc139d6adb896477449a1e64c7.png

生产者/消费者之间靠消息媒介传递信息内容==>Message

消息必须走特定的通道==>消息通道MessageChannel

消息通道里的消息如何被消费呢,谁负责收发处理==>消息通道MessageChannel的子接口SubscribableChannel,由MessageHandler消息处理器所订阅


11.1.3 为什么用Cloud Stream?


比方说我们用到了RabbitMQ和Kafka,由于这两个消息中间件的架构上的不同,

像RabbitMQ有exchange,kafka有Topic和Partitions分区


8d0f98b6384a1784a53cb78e2b9e8351.jpg


这些中间件的差异性导致我们实际项目开发给我们造成了一定的困扰,我们如果用了两个消息队列的其中一种,后面的业务需求,我想往另外一种消息队列进行迁移,这时候无疑就是一个灾难性的,一大堆东西都要重新推倒重新做,因为它跟我们的系统耦合了,这时候springcloud Stream给我们提供了一种解耦合的方式。


1.stream凭什么可以统一底层差异?


在没有绑定器这个概念的情况下,我们的SpringBoot应用要直接与消息中间件进行信息交互的时候,

由于各消息中间件构建的初衷不同,它们的实现细节上会有较大的差异性

通过定义绑定器作为中间层,完美地实现了应用程序与消息中间件细节之间的隔离。

通过向应用程序暴露统一的Channel通道,使得应用程序不需要再考虑各种不同的消息中间件实现。


通过定义绑定器Binder作为中间层,实现了应用程序与消息中间件细节之间的隔离。



4c8c6854fc75a13a8b90e6294777dafb.png


2.Binder


在没有绑定器这个概念的情况下,我们的SpringBoot应用要直接与消息中间件进行信息交互的时候,由于各消息中间件构建的初衷不同,它们的实现细节上会有较大的差异性,通过定义绑定器作为中间层,完美地实现了应用程序与消息中间件细节之间的隔离。Stream对消息中间件的进一步封装,可以做到代码层面对中间件的无感知,甚至于动态的切换中间件(rabbitmq切换为kafka),使得微服务开发的高度解耦,服务可以关注更多自己的业务流程



9cec8c254f7de8527768dd9e76672f3d.png


定义绑定器Binder作为中间

Binder可以生成Binding,Binding用来绑定消息容器的生产者和消费者,它有两种类型,INPUT和OUTPUT,INPUT对应于消费者,OUTPUT对应于生产者。


11.1.4 Stream中的消息通信方式遵循了发布-订阅模式


Topic主题进行广播

  • 在RabbitMQ就是Exchange
  • 在Kakfa中就是Topic


11.1.5 Spring Cloud Stream标准流程套路


097f29c81c0cb0cd12766a7fbfbbee72.jpg


7963028fc89c774e34e5bd390a552426.png


Binder:很方便的连接中间件,屏蔽差异

Channel:通道,是队列Queue的一种抽象,在消息通讯系统中就是实现存储和转发的媒介,通过Channel对队列进行配置

Source和Sink:简单的可理解为参照对象是Spring Cloud Stream自身,从Stream发布消息就是输出,接受消息就是输入。


11.1.6 编码API和常用注解

e2be1390407a4fe9b9fc9496c711a077.png


d368a3d47901fe481606c00621677e9a.png


11.2 案例说明


RabbitMQ环境已经OK


工程中新建三个子模块


cloud-stream-rabbitmq-provider8801, 作为生产者进行发消息模块

cloud-stream-rabbitmq-consumer8802,作为消息接收模块

cloud-stream-rabbitmq-consumer8803 作为消息接收模块


11.3 消息驱动之生产者


  • 建Module—cloud-stream-rabbitmq-provider8801
  • POM


<dependencies>
    <dependency>
        <groupId>org.springframework.boot</groupId>
        <artifactId>spring-boot-starter-web</artifactId>
    </dependency>
    <dependency>
        <groupId>org.springframework.boot</groupId>
        <artifactId>spring-boot-starter-actuator</artifactId>
    </dependency>
    <dependency>
        <groupId>org.springframework.cloud</groupId>
        <artifactId>spring-cloud-starter-netflix-eureka-client</artifactId>
    </dependency>
    <dependency>
        <groupId>org.springframework.cloud</groupId>
        <artifactId>spring-cloud-starter-stream-rabbit</artifactId>
    </dependency>
    <!--基础配置-->
    <dependency>
        <groupId>org.springframework.boot</groupId>
        <artifactId>spring-boot-devtools</artifactId>
        <scope>runtime</scope>
        <optional>true</optional>
    </dependency>
    <dependency>
        <groupId>org.projectlombok</groupId>
        <artifactId>lombok</artifactId>
        <optional>true</optional>
    </dependency>
    <dependency>
        <groupId>org.springframework.boot</groupId>
        <artifactId>spring-boot-starter-test</artifactId>
        <scope>test</scope>
    </dependency>
</dependencies>
  • YML
server:
  port: 8801
spring:
  application:
    name: cloud-stream-provider
  cloud:
    stream:
      binders: # 在此处配置要绑定的rabbitmq的服务信息;
        defaultRabbit: # 表示定义的名称,用于于binding整合
          type: rabbit # 消息组件类型
          environment: # 设置rabbitmq的相关的环境配置
            spring:
              rabbitmq:
                host: 192.168.174.128
                port: 5672
                username: admin
                password: admin
      bindings: # 服务的整合处理
        output: # 这个名字是一个通道的名称
          destination: studyExchange # 表示要使用的Exchange名称定义
          content-type: application/json # 设置消息类型,本次为json,文本则设置“text/plain”
          binder: defaultRabbit #设置要绑定的消息服务的具体设置
eureka:
  client: # 客户端进行Eureka注册的配置
    service-url:
      defaultZone: http://eureka7001.com:7001/eureka,http://eureka7002.com:7002/eureka,http://eureka7004.com:7004/eureka  # 集群版
  instance:
    lease-renewal-interval-in-seconds: 2 # 设置心跳的时间间隔(默认是30秒)
    lease-expiration-duration-in-seconds: 5 # 如果现在超过了5秒的间隔(默认是90秒)
    instance-id: send-8801.com  # 在信息列表时显示主机名称
    prefer-ip-address: true     # 访问的路径变为IP地址
  • 主启动类


@SpringBootApplication
public class StreamMQMain8801
{
    public static void main(String[] args)
    {
        SpringApplication.run(StreamMQMain8801.class,args);
    }
}


  • 业务类

发送消息接口:


public interface IMessageProvider
{
    public String send() ;
}


相关文章
|
18天前
|
Java Spring
【Azure Spring Cloud】Spring Cloud Azure 4.0 调用Key Vault遇见认证错误 AADSTS90002: Tenant not found.
【Azure Spring Cloud】Spring Cloud Azure 4.0 调用Key Vault遇见认证错误 AADSTS90002: Tenant not found.
|
14天前
|
消息中间件 Java RocketMQ
微服务架构师的福音:深度解析Spring Cloud RocketMQ,打造高可靠消息驱动系统的不二之选!
【8月更文挑战第29天】Spring Cloud RocketMQ结合了Spring Cloud生态与RocketMQ消息中间件的优势,简化了RocketMQ在微服务中的集成,使开发者能更专注业务逻辑。通过配置依赖和连接信息,可轻松搭建消息生产和消费流程,支持消息过滤、转换及分布式事务等功能,确保微服务间解耦的同时,提升了系统的稳定性和效率。掌握其应用,有助于构建复杂分布式系统。
31 0
|
18天前
|
消息中间件 Java 开发工具
【Azure 事件中心】Spring Cloud Stream Event Hubs Binder 发送Event Hub消息遇见 Spec. Rule 1.3 - onSubscribe, onNext, onError and onComplete signaled to a Subscriber MUST be signaled serially 异常
【Azure 事件中心】Spring Cloud Stream Event Hubs Binder 发送Event Hub消息遇见 Spec. Rule 1.3 - onSubscribe, onNext, onError and onComplete signaled to a Subscriber MUST be signaled serially 异常
|
18天前
|
Java Spring 容器
【Azure Spring Cloud】在Azure Spring Apps上看见 App Memory Usage 和 jvm.menory.use 的指标的疑问及OOM
【Azure Spring Cloud】在Azure Spring Apps上看见 App Memory Usage 和 jvm.menory.use 的指标的疑问及OOM
|
18天前
|
存储 Java Spring
【Azure Spring Cloud】Azure Spring Cloud服务,如何获取应用程序日志文件呢?
【Azure Spring Cloud】Azure Spring Cloud服务,如何获取应用程序日志文件呢?
|
18天前
|
SQL Java 数据库连接
【Azure Spring Cloud】Azure Spring Cloud connect to SQL using MSI
【Azure Spring Cloud】Azure Spring Cloud connect to SQL using MSI
|
18天前
|
Java 开发工具 Spring
【Azure Spring Cloud】使用azure-spring-boot-starter-storage来上传文件报错: java.net.UnknownHostException: xxxxxxxx.blob.core.windows.net: Name or service not known
【Azure Spring Cloud】使用azure-spring-boot-starter-storage来上传文件报错: java.net.UnknownHostException: xxxxxxxx.blob.core.windows.net: Name or service not known
|
18天前
|
NoSQL Java Redis
【Azure Spring Cloud】Java Spring Cloud 应用部署到Azure上后,发现大量的 java.lang.NullPointerException: null at io.lettuce.core.protocol.CommandHandler.writeSingleCommand(CommandHandler.java:426) at ... 异常
【Azure Spring Cloud】Java Spring Cloud 应用部署到Azure上后,发现大量的 java.lang.NullPointerException: null at io.lettuce.core.protocol.CommandHandler.writeSingleCommand(CommandHandler.java:426) at ... 异常
|
18天前
|
Java Spring
【Azure 应用服务】记一次Azure Spring Cloud 的部署错误 (az spring-cloud app deploy -g dev -s testdemo -n demo -p ./hellospring-0.0.1-SNAPSHOT.jar --->>> Failed to wait for deployment instances to be ready)
【Azure 应用服务】记一次Azure Spring Cloud 的部署错误 (az spring-cloud app deploy -g dev -s testdemo -n demo -p ./hellospring-0.0.1-SNAPSHOT.jar --->>> Failed to wait for deployment instances to be ready)
|
18天前
|
Java Maven Python
【Azure Spring Cloud】部署Azure spring cloud 失败
【Azure Spring Cloud】部署Azure spring cloud 失败