Spring cloud stream【入门介绍】

简介: 在实际开发过程中,服务与服务之间通信经常会使用到消息中间件,而以往使用了哪个中间件比如RabbitMQ,那么该中间件和系统的耦合性就会非常高,如果我们要替换为Kafka那么变动会比较大,这时我们可以使用SpringCloudStream来整合我们的消息中间件,来降低系统和中间件的耦合性。

案例代码:https://github.com/q279583842q/springcloud-e-book


 在实际开发过程中,服务与服务之间通信经常会使用到消息中间件,而以往使用了哪个中间件比如RabbitMQ,那么该中间件和系统的耦合性就会非常高,如果我们要替换为Kafka那么变动会比较大,这时我们可以使用SpringCloudStream来整合我们的消息中间件,来降低系统和中间件的耦合性。


一、什么是SpringCloudStream


 官方定义 Spring Cloud Stream 是一个构建消息驱动微服务的框架。

 应用程序通过 inputs 或者 outputs 来与 Spring Cloud Stream 中binder 交互,通过我们配置来 binding ,而 Spring Cloud Stream 的 binder 负责与消息中间件交互。所以,我们只需要搞清楚如何与 Spring Cloud Stream 交互就可以方便使用消息驱动的方式。

 通过使用Spring Integration来连接消息代理中间件以实现消息事件驱动。Spring Cloud Stream 为一些供应商的消息中间件产品提供了个性化的自动化配置实现,引用了发布-订阅、消费组、分区的三个核心概念。目前仅支持RabbitMQ、Kafka。


二、Stream 解决了什么问题?


 Stream解决了开发人员无感知的使用消息中间件的问题,因为Stream对消息中间件的进一步封装,可以做到代码层面对中间件的无感知,甚至于动态的切换中间件(rabbitmq切换为kafka),使得微服务开发的高度解耦,服务可以关注更多自己的业务流程


官网结构图

2.png

截屏2021-12-03 上午9.20.03.png


三、消息驱动入门案例


 我们通过一个入门案例来演示下通过stream来整合RabbitMQ来实现消息的异步通信的效果,所以首先要开启RabbitMQ服务,RabbitMQ不清楚的请参考此文:https://dpb-bobokaoya-sm.blog.csdn.net/article/details/90409404


1.创建消息发送者服务


1.1 创建项目


 创建一个SpringCloud项目



20190628170239341.png

1.2 pom文件

<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
  xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
  <modelVersion>4.0.0</modelVersion>
  <parent>
    <groupId>org.springframework.boot</groupId>
    <artifactId>spring-boot-starter-parent</artifactId>
    <version>1.5.13.RELEASE</version>
  </parent>
  <groupId>com.bobo</groupId>
  <artifactId>stream-sender</artifactId>
  <version>0.0.1-SNAPSHOT</version>
  <dependencyManagement>
    <dependencies>
      <dependency>
        <groupId>org.springframework.cloud</groupId>
        <artifactId>spring-cloud-dependencies</artifactId>
        <version>Dalston.SR5</version>
        <type>pom</type>
        <scope>import</scope>
      </dependency>
    </dependencies>
  </dependencyManagement>
  <dependencies>
    <dependency>
      <groupId>org.springframework.boot</groupId>
      <artifactId>spring-boot-starter-web</artifactId>
    </dependency>
    <dependency>
      <groupId>org.springframework.cloud</groupId>
      <artifactId>spring-cloud-starter-eureka</artifactId>
    </dependency>
    <dependency>
      <groupId>org.springframework.cloud</groupId>
      <artifactId>spring-cloud-starter-stream-rabbit</artifactId>
    </dependency>
    <dependency>
      <groupId>org.springframework.boot</groupId>
      <artifactId>spring-boot-starter-test</artifactId>
      <scope>test</scope>
    </dependency>
  </dependencies>
  <build>
    <plugins>
      <plugin>
        <groupId>org.springframework.boot</groupId>
        <artifactId>spring-boot-maven-plugin</artifactId>
      </plugin>
    </plugins>
  </build>
</project>

1.3 配置文件


 配置文件中除了必要的服务名称,端口和Eureka的信息外我们还要添加RabbitMQ的注册信息

spring.application.name=stream-sender
server.port=9060
#设置服务注册中心地址,指向另一个注册中心
eureka.client.serviceUrl.defaultZone=http://dpb:123456@eureka1:8761/eureka/,http://dpb:123456@eureka2:8761/eureka/
#rebbitmq 链接信息
spring.rabbitmq.host=192.168.88.150
spring.rabbitmq.port=5672
spring.rabbitmq.username=dpb
spring.rabbitmq.password=123
spring.rabbitmq.virtualHost=/


1.4 创建消费发送者接口


 创建一个发送消息的接口。具体如下:方法名称自定义,返回类型必须是SubscribableChannel,在Output注解中指定交换器名称。

/**
 * 发送消息的接口
 * @author dengp
 *
 */
public interface ISendeService {
  /**
   * 指定输出的交换器名称
   * @return
   */
  @Output("dpb-exchange")
  SubscribableChannel send();
}


1.5 启动类


 在启动类中通过@EnableBinding注解绑定我们创建的接口类。

@SpringBootApplication
@EnableEurekaClient
// 绑定我们刚刚创建的发送消息的接口类型
@EnableBinding(value={ISendeService.class})
public class StreamSenderStart {
  public static void main(String[] args) {
    SpringApplication.run(StreamSenderStart.class, args);
  }
}


2.创建消息消费者服务


2.1 创建项目


20190628171139741.png


2.2 pom文件


 添加的依赖和发送消息的服务是一致的

<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
  xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
  <modelVersion>4.0.0</modelVersion>
  <parent>
    <groupId>org.springframework.boot</groupId>
    <artifactId>spring-boot-starter-parent</artifactId>
    <version>1.5.13.RELEASE</version>
  </parent>
  <groupId>com.bobo</groupId>
  <artifactId>stream-receiver</artifactId>
  <version>0.0.1-SNAPSHOT</version>
  <dependencyManagement>
    <dependencies>
      <dependency>
        <groupId>org.springframework.cloud</groupId>
        <artifactId>spring-cloud-dependencies</artifactId>
        <version>Dalston.SR5</version>
        <type>pom</type>
        <scope>import</scope>
      </dependency>
    </dependencies>
  </dependencyManagement>
  <dependencies>
    <dependency>
      <groupId>org.springframework.boot</groupId>
      <artifactId>spring-boot-starter-web</artifactId>
    </dependency>
    <dependency>
      <groupId>org.springframework.cloud</groupId>
      <artifactId>spring-cloud-starter-eureka</artifactId>
    </dependency>
    <dependency>
      <groupId>org.springframework.cloud</groupId>
      <artifactId>spring-cloud-starter-stream-rabbit</artifactId>
    </dependency>
  </dependencies>
  <build>
    <plugins>
      <plugin>
        <groupId>org.springframework.boot</groupId>
        <artifactId>spring-boot-maven-plugin</artifactId>
      </plugin>
    </plugins>
  </build>
</project>


2.3 配置文件


 注意修改服务名称和端口

spring.application.name=stream-receiver
server.port=9061
#设置服务注册中心地址,指向另一个注册中心
eureka.client.serviceUrl.defaultZone=http://dpb:123456@eureka1:8761/eureka/,http://dpb:123456@eureka2:8761/eureka/
#rebbitmq 链接信息
spring.rabbitmq.host=192.168.88.150
spring.rabbitmq.port=5672
spring.rabbitmq.username=dpb
spring.rabbitmq.password=123
spring.rabbitmq.virtualHost=/


2.4 创建接收消息的接口


 此接口和发送消息的接口相似,注意使用的是@Input注解。

/**
 * 接收消息的接口
 * @author dengp
 *
 */
public interface IReceiverService {
  /**
   * 指定接收的交换器名称
   * @return
   */
  @Input("dpb-exchange")
  SubscribableChannel receiver();
}


2.5 创建处理消息的处理类


 注意此类并不是实现上面创建的接口。而是通过@EnableBinding来绑定我们创建的接口,同时通过@StreamListener注解来监听dpb-exchange对应的消息服务

/**
 * 具体接收消息的处理类
 * @author dengp
 *
 */
@Service
@EnableBinding(IReceiverService.class)
public class ReceiverService {
  @StreamListener("dpb-exchange")
  public void onReceiver(byte[] msg){
    System.out.println("消费者:"+new String(msg));
  }
}


2.6 启动类


 同样要添加@EnableBinding注解

@SpringBootApplication
@EnableEurekaClient
@EnableBinding(value={IReceiverService.class})
public class StreamReceiverStart {
  public static void main(String[] args) {
    SpringApplication.run(StreamReceiverStart.class, args);
  }
}


3.编写测试代码


 通过单元测试来测试服务。

import org.junit.Test;
import org.junit.runner.RunWith;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.test.context.SpringBootTest;
import org.springframework.messaging.Message;
import org.springframework.messaging.support.MessageBuilder;
import org.springframework.test.context.junit4.SpringRunner;
import com.bobo.stream.StreamSenderStart;
import com.bobo.stream.sender.ISendeService;
@RunWith(SpringRunner.class)
@SpringBootTest(classes=StreamSenderStart.class)
public class StreamTest {
  @Autowired
  private ISendeService sendService;
  @Test
  public void testStream(){
    String msg = "hello stream ...";
    // 将需要发送的消息封装为Message对象
    Message message = MessageBuilder
                .withPayload(msg.getBytes())
                .build();
    sendService.send().send(message );
  }
}


启动消息消费者后,执行测试代码。结果如下:


20190628171938899.png


消息接收者获取到了发送者发送的消息,同时我们在RabbitMQ的web界面也可以看到相关的信息


20190628172100738.png


总结


 我们同stream实现了消息中间件的使用,我们发现只有在两处地址和RabbitMQ有耦合,第一处是pom文件中的依赖,第二处是application.properties中的RabbitMQ的配置信息,而在具体的业务处理中并没有出现任何RabbitMQ相关的代码,这时如果我们要替换为Kafka的话我们只需要将这两处换掉即可,即实现了中间件和服务的高度解耦。


相关文章
|
26天前
|
存储 安全 Java
Spring Security 入门
Spring Security 是 Spring 框架中的安全模块,提供强大的认证和授权功能,支持防止常见攻击(如 CSRF 和会话固定攻击)。它通过过滤器链拦截请求,核心概念包括认证、授权和自定义过滤器。配置方面,涉及密码加密、用户信息服务、认证提供者及过滤器链设置。示例代码展示了如何配置登录、注销、CSRF防护等。常见问题包括循环重定向、静态资源被拦截和登录失败未返回错误信息,解决方法需确保路径正确和添加错误提示逻辑。
Spring Security 入门
|
27天前
|
SpringCloudAlibaba Dubbo Java
【SpringCloud Alibaba系列】Dubbo基础入门篇
Dubbo是一款高性能、轻量级的开源Java RPC框架,提供面向接口代理的高性能RPC调用、智能负载均衡、服务自动注册和发现、运行期流量调度、可视化服务治理和运维等功能。
【SpringCloud Alibaba系列】Dubbo基础入门篇
|
14天前
|
人工智能 自然语言处理 Java
Spring Cloud Alibaba AI 入门与实践
本文将介绍 Spring Cloud Alibaba AI 的基本概念、主要特性和功能,并演示如何完成一个在线聊天和在线画图的 AI 应用。
189 7
|
1月前
|
Java 开发者 微服务
Spring Boot 入门:简化 Java Web 开发的强大工具
Spring Boot 是一个开源的 Java 基础框架,用于创建独立、生产级别的基于Spring框架的应用程序。它旨在简化Spring应用的初始搭建以及开发过程。
80 6
Spring Boot 入门:简化 Java Web 开发的强大工具
|
5月前
|
XML Java 测试技术
Spring5入门到实战------17、Spring5新功能 --Nullable注解和函数式注册对象。整合JUnit5单元测试框架
这篇文章介绍了Spring5框架的三个新特性:支持@Nullable注解以明确方法返回、参数和属性值可以为空;引入函数式风格的GenericApplicationContext进行对象注册和管理;以及如何整合JUnit5进行单元测试,同时讨论了JUnit4与JUnit5的整合方法,并提出了关于配置文件加载的疑问。
Spring5入门到实战------17、Spring5新功能 --Nullable注解和函数式注册对象。整合JUnit5单元测试框架
|
1月前
|
消息中间件 监控 Java
如何将Spring Boot + RabbitMQ应用程序部署到Pivotal Cloud Foundry (PCF)
如何将Spring Boot + RabbitMQ应用程序部署到Pivotal Cloud Foundry (PCF)
42 6
|
1月前
|
Java 关系型数据库 MySQL
如何将Spring Boot + MySQL应用程序部署到Pivotal Cloud Foundry (PCF)
如何将Spring Boot + MySQL应用程序部署到Pivotal Cloud Foundry (PCF)
68 5
|
1月前
|
缓存 监控 Java
如何将Spring Boot应用程序部署到Pivotal Cloud Foundry (PCF)
如何将Spring Boot应用程序部署到Pivotal Cloud Foundry (PCF)
49 5
|
1月前
|
Java 数据库连接 数据库
从入门到精通---深入剖析Spring DAO
在Java企业级开发中,Spring框架以其强大的功能和灵活性,成为众多开发者的首选。Spring DAO(Data Access Object)作为Spring框架中处理数据访问的重要模块,对JDBC进行了抽象封装,极大地简化了数据访问异常的处理,并能统一管理JDBC事务。本文将从概述、功能点、背景、业务点、底层原理等多个方面深入剖析Spring DAO,并通过多个Java示例展示其应用实践,同时指出对应实践的优缺点。
28 1
|
2月前
|
监控 Java 数据安全/隐私保护
如何用Spring Boot实现拦截器:从入门到实践
如何用Spring Boot实现拦截器:从入门到实践
60 5