Spring cloud stream【消息分组】

简介: 上篇文章我们简单的介绍了stream的使用,发现使用还是蛮方便的,但是在上个案例中,如果有多个消息接收者,那么消息生产者发送的消息会被多个消费者都接收到,这种情况在某些实际场景下是有很大问题的,比如在如下场景中,订单系统我们做集群部署,都会从RabbitMQ中获取订单信息,那如果一个订单同时被两个服务获取到,那么就会造成数据错误,我们得避免这种情况。这时我们就可以使用Stream中的消息分组来解决了!

上篇文章我们简单的介绍了stream的使用,发现使用还是蛮方便的,但是在上个案例中,如果有多个消息接收者,那么消息生产者发送的消息会被多个消费者都接收到,这种情况在某些实际场景下是有很大问题的,比如在如下场景中,订单系统我们做集群部署,都会从RabbitMQ中获取订单信息,那如果一个订单同时被两个服务获取到,那么就会造成数据错误,我们得避免这种情况。这时我们就可以使用Stream中的消息分组来解决了!


20190628194832868.png


Stream消息分组


 消息分组的作用我们已经介绍了。注意在Stream中处于同一个group中的多个消费者是竞争关系。就能够保证消息只会被其中一个应用消费一次。不同的组是可以消费的,同一个组内会发生竞争关系,只有其中一个可以消费。通过案例我们来演示看看,这里我们会创建3个服务,分别如下

服务 介绍
stream-group-sender 消息发送者服务
stream-group-receiverA 消息接收者服务
stream-group-receiverB 消息接收者服务



1.1 创建项目


20190628195652470.png


1.2 pom文件

<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
  xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
  <modelVersion>4.0.0</modelVersion>
  <parent>
    <groupId>org.springframework.boot</groupId>
    <artifactId>spring-boot-starter-parent</artifactId>
    <version>1.5.13.RELEASE</version>
  </parent>
  <groupId>com.bobo</groupId>
  <artifactId>stream-group-sender</artifactId>
  <version>0.0.1-SNAPSHOT</version>
  <dependencyManagement>
    <dependencies>
      <dependency>
        <groupId>org.springframework.cloud</groupId>
        <artifactId>spring-cloud-dependencies</artifactId>
        <version>Dalston.SR5</version>
        <type>pom</type>
        <scope>import</scope>
      </dependency>
    </dependencies>
  </dependencyManagement>
  <dependencies>
    <dependency>
      <groupId>org.springframework.boot</groupId>
      <artifactId>spring-boot-starter-web</artifactId>
    </dependency>
    <dependency>
      <groupId>org.springframework.cloud</groupId>
      <artifactId>spring-cloud-starter-eureka</artifactId>
    </dependency>
    <dependency>
      <groupId>org.springframework.cloud</groupId>
      <artifactId>spring-cloud-starter-stream-rabbit</artifactId>
    </dependency>
    <dependency>
      <groupId>org.springframework.boot</groupId>
      <artifactId>spring-boot-starter-test</artifactId>
      <scope>test</scope>
    </dependency>
  </dependencies>
  <build>
    <plugins>
      <plugin>
        <groupId>org.springframework.boot</groupId>
        <artifactId>spring-boot-maven-plugin</artifactId>
      </plugin>
    </plugins>
  </build>
</project>


1.3 配置文件


 配置中的“outputProduct”可以自定义,但是我们等会在消息接口中要使用到。

spring.application.name=stream-sender
server.port=9060
#设置服务注册中心地址,指向另一个注册中心
eureka.client.serviceUrl.defaultZone=http://dpb:123456@eureka1:8761/eureka/,http://dpb:123456@eureka2:8761/eureka/
#rebbitmq 链接信息
spring.rabbitmq.host=192.168.88.150
spring.rabbitmq.port=5672
spring.rabbitmq.username=dpb
spring.rabbitmq.password=123
spring.rabbitmq.virtualHost=/
# 对应 MQ 是 exchange  outputProduct自定义的信息
spring.cloud.stream.bindings.outputProduct.destination=exchangeProduct


1.4 发送接口

/**
 * 发送消息的接口
 * @author dengp
 *
 */
public interface ISendeService {
  String OUTPUT="outputProduct";
  /**
   * 指定输出的交换器名称
   * @return
   */
  @Output(OUTPUT)
  SubscribableChannel send();
}


1.5 启动类

@SpringBootApplication
@EnableEurekaClient
// 绑定我们刚刚创建的发送消息的接口类型
@EnableBinding(value={ISendeService.class})
public class StreamSenderStart {
  public static void main(String[] args) {
    SpringApplication.run(StreamSenderStart.class, args);
  }
}


1.6 创建pojo


 在本案例中我们发送的消息是自定义的对象

package com.bobo.stream.pojo;
import java.io.Serializable;
public class Product implements Serializable{
  private Integer id;
  private String name;
  public Integer getId() {
    return id;
  }
  public void setId(Integer id) {
    this.id = id;
  }
  public String getName() {
    return name;
  }
  public void setName(String name) {
    this.name = name;
  }
  public Product(Integer id, String name) {
    super();
    this.id = id;
    this.name = name;
  }
  public Product() {
    super();
  }
  @Override
  public String toString() {
    return "Product [id=" + id + ", name=" + name + "]";
  }
}


2.创建stream-group-receiverA服务


2.1 创建项目


20190628200014977.png


2.2 pom文件

<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
  xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
  <modelVersion>4.0.0</modelVersion>
  <parent>
    <groupId>org.springframework.boot</groupId>
    <artifactId>spring-boot-starter-parent</artifactId>
    <version>1.5.13.RELEASE</version>
  </parent>
  <groupId>com.bobo</groupId>
  <artifactId>stream-group-receiverA</artifactId>
  <version>0.0.1-SNAPSHOT</version>
  <dependencyManagement>
    <dependencies>
      <dependency>
        <groupId>org.springframework.cloud</groupId>
        <artifactId>spring-cloud-dependencies</artifactId>
        <version>Dalston.SR5</version>
        <type>pom</type>
        <scope>import</scope>
      </dependency>
    </dependencies>
  </dependencyManagement>
  <dependencies>
    <dependency>
      <groupId>org.springframework.boot</groupId>
      <artifactId>spring-boot-starter-web</artifactId>
    </dependency>
    <dependency>
      <groupId>org.springframework.cloud</groupId>
      <artifactId>spring-cloud-starter-eureka</artifactId>
    </dependency>
    <dependency>
      <groupId>org.springframework.cloud</groupId>
      <artifactId>spring-cloud-starter-stream-rabbit</artifactId>
    </dependency>
  </dependencies>
  <build>
    <plugins>
      <plugin>
        <groupId>org.springframework.boot</groupId>
        <artifactId>spring-boot-maven-plugin</artifactId>
      </plugin>
    </plugins>
  </build>
</project>


2.3 配置文件


 配置文件中配置分组“groupProduct”

spring.application.name=stream-group-receiverA
server.port=9070
#设置服务注册中心地址,指向另一个注册中心
eureka.client.serviceUrl.defaultZone=http://dpb:123456@eureka1:8761/eureka/,http://dpb:123456@eureka2:8761/eureka/
#rebbitmq 链接信息
spring.rabbitmq.host=192.168.88.150
spring.rabbitmq.port=5672
spring.rabbitmq.username=dpb
spring.rabbitmq.password=123
spring.rabbitmq.virtualHost=/
# 对应 MQ 是 exchange  和消息发送者的 交换器是同一个
spring.cloud.stream.bindings.inputProduct.destination=exchangeProduct
# 具体分组 对应 MQ 是 队列名称 并且持久化队列  inputProduct 自定义
spring.cloud.stream.bindings.inputProduct.group=groupProduct


2.4 接收消息的接口

/**
 * 接收消息的接口
 * @author dengp
 *
 */
public interface IReceiverService {
  String INPUT = "inputProduct";
  /**
   * 指定接收的交换器名称
   * @return
   */
  @Input(INPUT)
  SubscribableChannel receiver();
}


2.5 消息的具体处理类

/**
 * 具体接收消息的处理类
 * @author dengp
 *
 */
@Service
@EnableBinding(IReceiverService.class)
public class ReceiverService {
  @StreamListener(IReceiverService.INPUT)
  public void onReceiver(Product p){
    System.out.println("消费者A:"+p);
  }
}


注意同样需要添加Product类

package com.bobo.stream.pojo;
import java.io.Serializable;
public class Product implements Serializable{
  private Integer id;
  private String name;
  public Integer getId() {
    return id;
  }
  public void setId(Integer id) {
    this.id = id;
  }
  public String getName() {
    return name;
  }
  public void setName(String name) {
    this.name = name;
  }
  public Product(Integer id, String name) {
    super();
    this.id = id;
    this.name = name;
  }
  public Product() {
    super();
  }
  @Override
  public String toString() {
    return "Product [id=" + id + ", name=" + name + "]";
  }
}


2.6 启动类

@SpringBootApplication
@EnableEurekaClient
@EnableBinding(value={IReceiverService.class})
public class StreamReceiverStart {
  public static void main(String[] args) {
    SpringApplication.run(StreamReceiverStart.class, args);
  }
}


3.创建stream-group-receiverB服务


 此服务和stream-group-receiverA一样,复制一份只需修改application.properties中的服务名称,端口。我们先将group设置不一样,我们测试来看看

spring.application.name=stream-group-receiverB
server.port=9071
#设置服务注册中心地址,指向另一个注册中心
eureka.client.serviceUrl.defaultZone=http://dpb:123456@eureka1:8761/eureka/,http://dpb:123456@eureka2:8761/eureka/
#rebbitmq 链接信息
spring.rabbitmq.host=192.168.88.150
spring.rabbitmq.port=5672
spring.rabbitmq.username=dpb
spring.rabbitmq.password=123
spring.rabbitmq.virtualHost=/
# 对应 MQ 是 exchange  和消息发送者的 交换器是同一个
spring.cloud.stream.bindings.inputProduct.destination=exchangeProduct
# 具体分组 对应 MQ 是 队列名称 并且持久化队列  inputProduct 自定义
spring.cloud.stream.bindings.inputProduct.group=groupProduct1


4.测试代码

@RunWith(SpringRunner.class)
@SpringBootTest(classes=StreamSenderStart.class)
public class StreamTest {
  @Autowired
  private ISendeService sendService;
  @Test
  public void testStream(){
    Product p = new Product(666, "stream test ...");
    // 将需要发送的消息封装为Message对象
    Message message = MessageBuilder
                .withPayload(p)
                .build();
    sendService.send().send(message );
  }
}


在stream-group-receiverA和stream-group-receiverB服务的group不一致的情况下


20190628201243304.png20190628201256344.png20190628201333968.png






改为同组的情况下


20190628201417171.png20190628201439444.png




启动服务,发送数据



20190628201856207.png2019062820193335.png



通过结果可以看到只有其中一个受到消息。避免了消息重复消费。


20190628202504108.png


案例代码github:https://github.com/q279583842q/springcloud-e-book

20190628202229960.png

相关实践学习
消息队列RocketMQ版:基础消息收发功能体验
本实验场景介绍消息队列RocketMQ版的基础消息收发功能,涵盖实例创建、Topic、Group资源创建以及消息收发体验等基础功能模块。
消息队列 MNS 入门课程
1、消息队列MNS简介 本节课介绍消息队列的MNS的基础概念 2、消息队列MNS特性 本节课介绍消息队列的MNS的主要特性 3、MNS的最佳实践及场景应用 本节课介绍消息队列的MNS的最佳实践及场景应用案例 4、手把手系列:消息队列MNS实操讲 本节课介绍消息队列的MNS的实际操作演示 5、动手实验:基于MNS,0基础轻松构建 Web Client 本节课带您一起基于MNS,0基础轻松构建 Web Client
相关文章
|
6月前
|
监控 负载均衡 Java
深入理解Spring Cloud中的服务网关
深入理解Spring Cloud中的服务网关
|
1月前
|
消息中间件 监控 Java
如何将Spring Boot + RabbitMQ应用程序部署到Pivotal Cloud Foundry (PCF)
如何将Spring Boot + RabbitMQ应用程序部署到Pivotal Cloud Foundry (PCF)
36 6
|
1月前
|
Java 关系型数据库 MySQL
如何将Spring Boot + MySQL应用程序部署到Pivotal Cloud Foundry (PCF)
如何将Spring Boot + MySQL应用程序部署到Pivotal Cloud Foundry (PCF)
58 5
|
1月前
|
缓存 监控 Java
如何将Spring Boot应用程序部署到Pivotal Cloud Foundry (PCF)
如何将Spring Boot应用程序部署到Pivotal Cloud Foundry (PCF)
41 5
|
2月前
|
存储 运维 安全
Spring运维之boot项目多环境(yaml 多文件 proerties)及分组管理与开发控制
通过以上措施,可以保证Spring Boot项目的配置管理在专业水准上,并且易于维护和管理,符合搜索引擎收录标准。
52 2
|
6月前
|
设计模式 监控 Java
解析Spring Cloud中的断路器模式原理
解析Spring Cloud中的断路器模式原理
|
6月前
|
资源调度 Java 调度
Spring Cloud Alibaba 集成分布式定时任务调度功能
Spring Cloud Alibaba 发布了 Scheduling 任务调度模块 [#3732]提供了一套开源、轻量级、高可用的定时任务解决方案,帮助您快速开发微服务体系下的分布式定时任务。
15092 34
|
6月前
|
负载均衡 Java Spring
Spring cloud gateway 如何在路由时进行负载均衡
Spring cloud gateway 如何在路由时进行负载均衡
632 15
|
6月前
|
Java Spring
spring cloud gateway在使用 zookeeper 注册中心时,配置https 进行服务转发
spring cloud gateway在使用 zookeeper 注册中心时,配置https 进行服务转发
136 3
|
5月前
|
消息中间件 Java 开发工具
【Azure 事件中心】Spring Cloud Stream Event Hubs Binder 发送Event Hub消息遇见 Spec. Rule 1.3 - onSubscribe, onNext, onError and onComplete signaled to a Subscriber MUST be signaled serially 异常
【Azure 事件中心】Spring Cloud Stream Event Hubs Binder 发送Event Hub消息遇见 Spec. Rule 1.3 - onSubscribe, onNext, onError and onComplete signaled to a Subscriber MUST be signaled serially 异常