上篇文章我们简单的介绍了stream的使用,发现使用还是蛮方便的,但是在上个案例中,如果有多个消息接收者,那么消息生产者发送的消息会被多个消费者都接收到,这种情况在某些实际场景下是有很大问题的,比如在如下场景中,订单系统我们做集群部署,都会从RabbitMQ中获取订单信息,那如果一个订单同时被两个服务获取到,那么就会造成数据错误,我们得避免这种情况。这时我们就可以使用Stream中的消息分组来解决了!
Stream消息分组
消息分组的作用我们已经介绍了。注意在Stream中处于同一个group中的多个消费者是竞争关系。就能够保证消息只会被其中一个应用消费一次。不同的组是可以消费的,同一个组内会发生竞争关系,只有其中一个可以消费。通过案例我们来演示看看,这里我们会创建3个服务,分别如下
服务 | 介绍 |
stream-group-sender | 消息发送者服务 |
stream-group-receiverA | 消息接收者服务 |
stream-group-receiverB | 消息接收者服务 |
1.1 创建项目
1.2 pom文件
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd"> <modelVersion>4.0.0</modelVersion> <parent> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-starter-parent</artifactId> <version>1.5.13.RELEASE</version> </parent> <groupId>com.bobo</groupId> <artifactId>stream-group-sender</artifactId> <version>0.0.1-SNAPSHOT</version> <dependencyManagement> <dependencies> <dependency> <groupId>org.springframework.cloud</groupId> <artifactId>spring-cloud-dependencies</artifactId> <version>Dalston.SR5</version> <type>pom</type> <scope>import</scope> </dependency> </dependencies> </dependencyManagement> <dependencies> <dependency> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-starter-web</artifactId> </dependency> <dependency> <groupId>org.springframework.cloud</groupId> <artifactId>spring-cloud-starter-eureka</artifactId> </dependency> <dependency> <groupId>org.springframework.cloud</groupId> <artifactId>spring-cloud-starter-stream-rabbit</artifactId> </dependency> <dependency> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-starter-test</artifactId> <scope>test</scope> </dependency> </dependencies> <build> <plugins> <plugin> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-maven-plugin</artifactId> </plugin> </plugins> </build> </project>
1.3 配置文件
配置中的“outputProduct”可以自定义,但是我们等会在消息接口中要使用到。
spring.application.name=stream-sender server.port=9060 #设置服务注册中心地址,指向另一个注册中心 eureka.client.serviceUrl.defaultZone=http://dpb:123456@eureka1:8761/eureka/,http://dpb:123456@eureka2:8761/eureka/ #rebbitmq 链接信息 spring.rabbitmq.host=192.168.88.150 spring.rabbitmq.port=5672 spring.rabbitmq.username=dpb spring.rabbitmq.password=123 spring.rabbitmq.virtualHost=/ # 对应 MQ 是 exchange outputProduct自定义的信息 spring.cloud.stream.bindings.outputProduct.destination=exchangeProduct
1.4 发送接口
/** * 发送消息的接口 * @author dengp * */ public interface ISendeService { String OUTPUT="outputProduct"; /** * 指定输出的交换器名称 * @return */ @Output(OUTPUT) SubscribableChannel send(); }
1.5 启动类
@SpringBootApplication @EnableEurekaClient // 绑定我们刚刚创建的发送消息的接口类型 @EnableBinding(value={ISendeService.class}) public class StreamSenderStart { public static void main(String[] args) { SpringApplication.run(StreamSenderStart.class, args); } }
1.6 创建pojo
在本案例中我们发送的消息是自定义的对象
package com.bobo.stream.pojo; import java.io.Serializable; public class Product implements Serializable{ private Integer id; private String name; public Integer getId() { return id; } public void setId(Integer id) { this.id = id; } public String getName() { return name; } public void setName(String name) { this.name = name; } public Product(Integer id, String name) { super(); this.id = id; this.name = name; } public Product() { super(); } @Override public String toString() { return "Product [id=" + id + ", name=" + name + "]"; } }
2.创建stream-group-receiverA服务
2.1 创建项目
2.2 pom文件
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd"> <modelVersion>4.0.0</modelVersion> <parent> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-starter-parent</artifactId> <version>1.5.13.RELEASE</version> </parent> <groupId>com.bobo</groupId> <artifactId>stream-group-receiverA</artifactId> <version>0.0.1-SNAPSHOT</version> <dependencyManagement> <dependencies> <dependency> <groupId>org.springframework.cloud</groupId> <artifactId>spring-cloud-dependencies</artifactId> <version>Dalston.SR5</version> <type>pom</type> <scope>import</scope> </dependency> </dependencies> </dependencyManagement> <dependencies> <dependency> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-starter-web</artifactId> </dependency> <dependency> <groupId>org.springframework.cloud</groupId> <artifactId>spring-cloud-starter-eureka</artifactId> </dependency> <dependency> <groupId>org.springframework.cloud</groupId> <artifactId>spring-cloud-starter-stream-rabbit</artifactId> </dependency> </dependencies> <build> <plugins> <plugin> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-maven-plugin</artifactId> </plugin> </plugins> </build> </project>
2.3 配置文件
配置文件中配置分组“groupProduct”
spring.application.name=stream-group-receiverA server.port=9070 #设置服务注册中心地址,指向另一个注册中心 eureka.client.serviceUrl.defaultZone=http://dpb:123456@eureka1:8761/eureka/,http://dpb:123456@eureka2:8761/eureka/ #rebbitmq 链接信息 spring.rabbitmq.host=192.168.88.150 spring.rabbitmq.port=5672 spring.rabbitmq.username=dpb spring.rabbitmq.password=123 spring.rabbitmq.virtualHost=/ # 对应 MQ 是 exchange 和消息发送者的 交换器是同一个 spring.cloud.stream.bindings.inputProduct.destination=exchangeProduct # 具体分组 对应 MQ 是 队列名称 并且持久化队列 inputProduct 自定义 spring.cloud.stream.bindings.inputProduct.group=groupProduct
2.4 接收消息的接口
/** * 接收消息的接口 * @author dengp * */ public interface IReceiverService { String INPUT = "inputProduct"; /** * 指定接收的交换器名称 * @return */ @Input(INPUT) SubscribableChannel receiver(); }
2.5 消息的具体处理类
/** * 具体接收消息的处理类 * @author dengp * */ @Service @EnableBinding(IReceiverService.class) public class ReceiverService { @StreamListener(IReceiverService.INPUT) public void onReceiver(Product p){ System.out.println("消费者A:"+p); } }
注意同样需要添加Product类
package com.bobo.stream.pojo; import java.io.Serializable; public class Product implements Serializable{ private Integer id; private String name; public Integer getId() { return id; } public void setId(Integer id) { this.id = id; } public String getName() { return name; } public void setName(String name) { this.name = name; } public Product(Integer id, String name) { super(); this.id = id; this.name = name; } public Product() { super(); } @Override public String toString() { return "Product [id=" + id + ", name=" + name + "]"; } }
2.6 启动类
@SpringBootApplication @EnableEurekaClient @EnableBinding(value={IReceiverService.class}) public class StreamReceiverStart { public static void main(String[] args) { SpringApplication.run(StreamReceiverStart.class, args); } }
3.创建stream-group-receiverB服务
此服务和stream-group-receiverA一样,复制一份只需修改application.properties中的服务名称,端口。我们先将group设置不一样,我们测试来看看
spring.application.name=stream-group-receiverB server.port=9071 #设置服务注册中心地址,指向另一个注册中心 eureka.client.serviceUrl.defaultZone=http://dpb:123456@eureka1:8761/eureka/,http://dpb:123456@eureka2:8761/eureka/ #rebbitmq 链接信息 spring.rabbitmq.host=192.168.88.150 spring.rabbitmq.port=5672 spring.rabbitmq.username=dpb spring.rabbitmq.password=123 spring.rabbitmq.virtualHost=/ # 对应 MQ 是 exchange 和消息发送者的 交换器是同一个 spring.cloud.stream.bindings.inputProduct.destination=exchangeProduct # 具体分组 对应 MQ 是 队列名称 并且持久化队列 inputProduct 自定义 spring.cloud.stream.bindings.inputProduct.group=groupProduct1
4.测试代码
@RunWith(SpringRunner.class) @SpringBootTest(classes=StreamSenderStart.class) public class StreamTest { @Autowired private ISendeService sendService; @Test public void testStream(){ Product p = new Product(666, "stream test ..."); // 将需要发送的消息封装为Message对象 Message message = MessageBuilder .withPayload(p) .build(); sendService.send().send(message ); } }
在stream-group-receiverA和stream-group-receiverB服务的group不一致的情况下
改为同组的情况下
启动服务,发送数据
通过结果可以看到只有其中一个受到消息。避免了消息重复消费。
案例代码github:https://github.com/q279583842q/springcloud-e-book