Spring Cloud Stream学习(五)入门

简介: Spring Cloud Stream学习(五)入门

概述:


Spring Cloud Stream是一个为微服务应用构建消息驱动能力的框架。它可以基于Spring Boot来创建独立的,可用于生产的Spring应用程序。它通过使用Spring Integration来连接消息代理中间件以实现消息时间驱动。Spring Cloud Stream 为一些供应商的消息中间件产品提供了个性化的自动化配置实现,并且引入了发布-订阅,消费组,以及分区这个3个概念(后文会详细介绍)。简单来说,Spring Cloud Stream本质上就是整合了Spring Boot 跟 Spring Integration。实现了一套轻量级的消息驱动的微服务框架。


入门案例:


我这里直接用官网的例子:

@SpringBootApplication
@EnableBinding(Sink.class)
public class LoggingConsumerApplication {
  public static void main(String[] args) {
    SpringApplication.run(LoggingConsumerApplication.class, args);
  }
  @StreamListener(Sink.INPUT)
  public void handle(Person person) {
    System.out.println("Received: " + person);
  }
  public static class Person {
    private String name;
    public String getName() {
      return name;
    }
    public void setName(String name) {
      this.name = name;
    }
    public String toString() {
      return this.name;
    }
  }
}

pom文件

<dependencies>
        <dependency>
            <groupId>org.springframework.cloud</groupId>
            <artifactId>spring-cloud-stream</artifactId>
        </dependency>
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-test</artifactId>
            <scope>test</scope>
        </dependency>
        <dependency>
            <groupId>org.springframework.cloud</groupId>
            <artifactId>spring-cloud-stream-test-support</artifactId>
            <scope>test</scope>
        </dependency>
        <dependency>
            <groupId>org.springframework.cloud</groupId>
            <artifactId>spring-cloud-stream-binder-rabbit</artifactId>
        </dependency>
        <dependency>
            <groupId>org.projectlombok</groupId>
            <artifactId>lombok</artifactId>
        </dependency>
    </dependencies>
    <dependencyManagement>
        <dependencies>
            <dependency>
                <groupId>org.springframework.cloud</groupId>
                <artifactId>spring-cloud-dependencies</artifactId>
                <version>${spring-cloud.version}</version>
                <type>pom</type>
                <scope>import</scope>
            </dependency>
        </dependencies>
    </dependencyManagement>

我们不需要做其他任务的配置,启动服务,可以在日志中看到如下的输出:

image.png

我们可以看到,红线框出的部分代表了,我们申明了一个队列,队列名称为input.anonymous.xxxx,并将其绑定到了名为input的交换机上

我们可以到RabbitMQ的管理后台查看以验证我们的结论:

image.png

我们查看队列也可以发现:

image.png

                                   image.png

我们启动程序后,相当于做为一个消费者一直在监听这个队列,我们现在可以尝试直接用MQ后台往这个队列中publish一条消息

image.png

可以看到程序输出如下:

receive: hello,spring cloud stream

在上面的例子中,我们用到了这几个注解

  1. @EnableBinding(Sink.class)
  2. @StreamListener(Sink.INPUT)

还有这个类:

public interface Sink {
  String INPUT = "input";
  @Input(Sink.INPUT)
  SubscribableChannel input();
}

我们现在一一介绍下:


@EnableBinding 注解,该注解用来指定一个或多个定义了@input,@output注解的接口,以此来实现对消息通道(Channel)的绑定。在上面的例子中,我们通过@EnableBinding(Sink.class),绑定了Sink接口,该接口是Spring Cloud Stream默认实现的对输入消息通道绑定的定义。


@StreamListener(Sink.INPUT),这个注解主要的作用是将被修饰的方法注册为消息中间件上数据流的事件监听器.注册中的属性值对应了监听的消息通道名。在上面的例子中,我们通过这个注解将receive方法注册为input消息通道的监听处理器,所以当我们在RabbitMQ的控制页面中发送消息时,receive方法会做出对应的响应动作


经过上面的入门案例后,我们对Spring Cloud Stream算是有了一些基础的了解,现在我们继续深入的学习下Spring Cloud Stream,官网中Spring Cloud Stream的应用模型如下(红字是我对这个模型图的理解):

image.png

我们之前讲了,Spring Cloud Stream引入了发布-订阅,消费者组,分区这个3个概念。我们现在就这三个概念一起探讨下:


1.发布-订阅


Spring Cloud Stream中的消息通信方式遵循了发布-订阅模式,当一条消息被投递到消息中间件后,它会通过共享的Topic主题进行广播,消息消费者在收到消息后,会对它进行自身的业务逻辑处理,这里的Topic其实就是对应了RabbitMQ中的交换机


2.消费组


设想这样一种情况,我们有3台服务器,并且都部署了我们应用,并且这3台服务器都是MQ的消费者。假设我们发送了一条用户确认支付的消息,现在服务器应该做的操作是扣减库存。但是问题来了,我们之前已经知道了发布订阅模式,按照现在这种情况,3台服务器都会收到我们的消息,这样的话,扣减库存这一操作就会被执行3次,这样显然是不合理的。可能有的同学会说,我们只将一台服务器注册为MQ的消费者不就行了吗?但是这样是否又违背了高可用的原则呢?对于这种情况,MQ为我们提供了一种更有效的方法,就是消费组。


我们可以通过spring.cloud.stream.bindings.通道名称.group=groupName(在上面的例子就是spring.cloud.stream.bindings.input.group=groupName)指定组名,在同一组的消费组,只会有一个实例收到消息


3.消息分区


了解了消费组的概念后,我们已经可以做到,在多实例的情况下,可以确保我们的消息只被消费一次。但是,现在我们无法保证消息到底是被哪个消费者消费了,对于同一条消息,它多次到达之后可能是由不同的消费者消费的。但是在有些特定的情况下,我们可能想要一些具有特定特征的消息每次都被同一个消费者消费。这个时候消息生产者可以为消息增加一个固有的特征ID来进行分区,使得拥有这些ID的消息每次能被同一个消费者消费。分区概念的引入就是为了解决这样的问题的:当生产者将消息数据发送到多个消费者实例时,保证拥有共同特征的消息数据始终是由同一个消费者示例接收和处理


相关实践学习
消息队列RocketMQ版:基础消息收发功能体验
本实验场景介绍消息队列RocketMQ版的基础消息收发功能,涵盖实例创建、Topic、Group资源创建以及消息收发体验等基础功能模块。
消息队列 MNS 入门课程
1、消息队列MNS简介 本节课介绍消息队列的MNS的基础概念 2、消息队列MNS特性 本节课介绍消息队列的MNS的主要特性 3、MNS的最佳实践及场景应用 本节课介绍消息队列的MNS的最佳实践及场景应用案例 4、手把手系列:消息队列MNS实操讲 本节课介绍消息队列的MNS的实际操作演示 5、动手实验:基于MNS,0基础轻松构建 Web Client 本节课带您一起基于MNS,0基础轻松构建 Web Client
相关文章
|
26天前
|
XML Java 测试技术
Spring5入门到实战------17、Spring5新功能 --Nullable注解和函数式注册对象。整合JUnit5单元测试框架
这篇文章介绍了Spring5框架的三个新特性:支持@Nullable注解以明确方法返回、参数和属性值可以为空;引入函数式风格的GenericApplicationContext进行对象注册和管理;以及如何整合JUnit5进行单元测试,同时讨论了JUnit4与JUnit5的整合方法,并提出了关于配置文件加载的疑问。
Spring5入门到实战------17、Spring5新功能 --Nullable注解和函数式注册对象。整合JUnit5单元测试框架
|
26天前
|
Java
Spring5入门到实战------9、AOP基本概念、底层原理、JDK动态代理实现
这篇文章是Spring5框架的实战教程,深入讲解了AOP的基本概念、如何利用动态代理实现AOP,特别是通过JDK动态代理机制在不修改源代码的情况下为业务逻辑添加新功能,降低代码耦合度,并通过具体代码示例演示了JDK动态代理的实现过程。
Spring5入门到实战------9、AOP基本概念、底层原理、JDK动态代理实现
|
12天前
|
小程序 前端开发 Java
SpringBoot+uniapp+uview打造H5+小程序+APP入门学习的聊天小项目
JavaDog Chat v1.0.0 是一款基于 SpringBoot、MybatisPlus 和 uniapp 的简易聊天软件,兼容 H5、小程序和 APP,提供丰富的注释和简洁代码,适合初学者。主要功能包括登录注册、消息发送、好友管理及群组交流。
33 0
SpringBoot+uniapp+uview打造H5+小程序+APP入门学习的聊天小项目
|
21天前
|
缓存 前端开发 JavaScript
前后端分离 SpringBoot+Vue商城买卖系统通杀版本。大家可以参考学习一下
这篇文章介绍了一个使用SpringBoot+Vue开发的前后端分离商城系统,包括技术架构、开发环境、实现的功能以及项目截图,并展示了普通用户和商家端的功能界面。
前后端分离 SpringBoot+Vue商城买卖系统通杀版本。大家可以参考学习一下
|
21天前
|
Java 数据库连接 Spring
后端框架入门超详细 三部曲 Spring 、SpringMVC、Mybatis、SSM框架整合案例 【爆肝整理五万字】
文章是关于Spring、SpringMVC、Mybatis三个后端框架的超详细入门教程,包括基础知识讲解、代码案例及SSM框架整合的实战应用,旨在帮助读者全面理解并掌握这些框架的使用。
后端框架入门超详细 三部曲 Spring 、SpringMVC、Mybatis、SSM框架整合案例 【爆肝整理五万字】
|
24天前
|
NoSQL Java Redis
Redis6入门到实战------ 八、Redis与Spring Boot整合
这篇文章详细介绍了如何在Spring Boot项目中整合Redis,包括在`pom.xml`中添加依赖、配置`application.properties`文件、创建配置类以及编写测试类来验证Redis的连接和基本操作。
Redis6入门到实战------ 八、Redis与Spring Boot整合
|
26天前
|
SQL 数据库
Spring5入门到实战------13、使用JdbcTemplate操作数据库(批量增删改)。具体代码+讲解 【下篇】
这篇文章是Spring5框架的实战教程,深入讲解了如何使用JdbcTemplate进行数据库的批量操作,包括批量添加、批量修改和批量删除的具体代码实现和测试过程,并通过完整的项目案例展示了如何在实际开发中应用这些技术。
Spring5入门到实战------13、使用JdbcTemplate操作数据库(批量增删改)。具体代码+讲解 【下篇】
|
26天前
|
XML Java 数据格式
Spring5入门到实战------11、使用XML方式实现AOP切面编程。具体代码+讲解
这篇文章是Spring5框架的AOP切面编程教程,通过XML配置方式,详细讲解了如何创建被增强类和增强类,如何在Spring配置文件中定义切入点和切面,以及如何将增强逻辑应用到具体方法上。文章通过具体的代码示例和测试结果,展示了使用XML配置实现AOP的过程,并强调了虽然注解开发更为便捷,但掌握XML配置也是非常重要的。
Spring5入门到实战------11、使用XML方式实现AOP切面编程。具体代码+讲解
|
26天前
|
XML Java Maven
Spring5入门到实战------16、Spring5新功能 --整合日志框架(Log4j2)
这篇文章是Spring5框架的入门到实战教程,介绍了Spring5的新功能——整合日志框架Log4j2,包括Spring5对日志框架的通用封装、如何在项目中引入Log4j2、编写Log4j2的XML配置文件,并通过测试类展示了如何使用Log4j2进行日志记录。
Spring5入门到实战------16、Spring5新功能 --整合日志框架(Log4j2)
|
26天前
|
XML Java 数据库
Spring5入门到实战------15、事务操作---概念--场景---声明式事务管理---事务参数--注解方式---xml方式
这篇文章是Spring5框架的实战教程,详细介绍了事务的概念、ACID特性、事务操作的场景,并通过实际的银行转账示例,演示了Spring框架中声明式事务管理的实现,包括使用注解和XML配置两种方式,以及如何配置事务参数来控制事务的行为。
Spring5入门到实战------15、事务操作---概念--场景---声明式事务管理---事务参数--注解方式---xml方式