RabbitMQ消息重复消费场景及解决方案

本文涉及的产品
云数据库 Redis 版,社区版 2GB
推荐场景:
搭建游戏排行榜
云原生内存数据库 Tair,内存型 2GB
云数据库 Redis 版,经济版 1GB 1个月
简介: RabbitMQ消息重复消费场景及解决方案

这里介绍一下RabbitMQ重复消费的场景,以及如何解决消息重复消费的问题。

注:本文只做粗略逻辑实现借鉴,实际业务场景需根据实际情况再做细化处理。

目录
消息重复消费

MQ的一条消息被消费者消费了多次

重复消费场景重现测试

如何解决消息重复消费的问题

编码

解决消息重复消费测试

消息重复消费
什么是消息重复消费?

首先我们来看一下消息的传输流程。消息生产者-->MQ-->消息消费者;消息生产者发送消息到MQ服务器,MQ服务器存储消息,消息消费者监听MQ的消息,发现有消息就消费消息。

所以消息重复也就出现在两个阶段:

1、生产者多发送了消息给MQ;

2、MQ的一条消息被消费者消费了多次。

第一种场景很好控制,只要保证消息生成者不重复发送消息给MQ即可。

我们着重来看一下第二个场景。

MQ的一条消息被消费者消费了多次
在保证MQ消息不重复的情况下,消费者消费消息成功后,在给MQ发送消息确认的时候出现了网络异常(或者是服务中断),MQ没有接收到确认,此时MQ不会将发送的消息删除,为了保证消息被消费,当消费者网络稳定后,MQ就会继续给消费者投递之前的消息。这时候消费者就接收到了两条一样的消息。

重复消费场景重现测试
1、消息发送者发送1万条消息给MQ

@GetMapping("/rabbitmq/sendToClient")
public String sendToClient() {

String message = "server message sendToClient";
for (int i = 0; i < 10000; i++) {
    amqpTemplate.convertAndSend("queueName3",message+": "+i);

}
return message;

}
启动消息发送服务,调用接口发送消息,mq成功收到1万条消息。

2、消费者监听消费消息

@RabbitListener(queues = "queueName3")//发送的队列名称 @RabbitListener注解到类和方法都可以
@RabbitHandler
public void receiveMessage(String message) {

System.out.println("接收者2--接收到queueName3队列的消息为:"+message);

}
启动消费者服务,然后中断消费服务,此时消费到了第7913个消息:

78c1aeb4f8047b12420c85d8f08afe26.png此时查看MQ的消息,现在MQ队列中应该还有2087个消息,但还有2088个消息,说明最后一个消息被消费了没有被MQ服务确认。

d7cdc7b59d93c73390c8688def86e616.png再次启动消费者服务,消息从第7913个消息开始消费,而不是第7914个消息

e6992fea7e7ba8deb163a33879f9c146.png
如何解决消息重复消费的问题
为了保证消息不被重复消费,首先要保证每个消息是唯一的,所以可以给每一个消息携带一个全局唯一的id,流程如下:

消费者监听到消息后获取id,先去查询这个id是否存中

如果不存在,则正常消费消息,并把消息的id存入 数据库或者redis中(下面的编码示例使用redis)

如果存在则丢弃此消息

编码
消息生产者服务:

/**

  • @Description: 发送消息 模拟消息重复消费
  • 消息重复消费情景:消息生产者已把消息发送到mq,消息消费者在消息消费的过程中突然因为网络原因或者其他原因导致消息消费中断
  • 消费者消费成功后,在给MQ确认的时候出现了网络波动,MQ没有接收到确认,
  • 为了保证消息被消费,MQ就会继续给消费者投递之前的消息。这时候消费者就接收到了两条一样的消息
  • @param:
  • @return: java.lang.String
  • @Author: chenping

*/
@GetMapping("/rabbitmq/sendMsgNoRepeat")
public String sendMsgNoRepeat() {

String message = "server message sendMsgNoRepeat";
for (int i = 0; i <10000 ; i++) {
    Message msg = MessageBuilder.withBody((message+"--"+i).getBytes()).setMessageId(UUID.randomUUID()+"").build();
    amqpTemplate.convertAndSend("queueName4",msg);
}
return message;

}

消息消费者服务:

方案1:将id存入string中(单消费者场景):

这样一个队列,redis数据只有一条,每次消息过来都覆盖之前的消息,但是消费者多的情况不适用,可能会存在问题--一个消息被多个消费者消费

@RabbitListener(queues = "queueName4")//发送的队列名称 @RabbitListener注解到类和方法都可以
@RabbitHandler
public void receiveMessage(Message message) throws UnsupportedEncodingException {

String messageId = message.getMessageProperties().getMessageId();
String msg = new String(message.getBody(),"utf-8");

String messageRedisValue = redisUtil.get("queueName4","");
if (messageRedisValue.equals(messageId)) {
    return;
}
System.out.println("消息:"+msg+", id:"+messageId);

redisUtil.set("queueName4",messageId);//以队列为key,id为value

}
方案2:将id存入list中(多消费者场景)

这个方案可以解决多消费者的问题,但是随着mq的消息增加,redis数据越来越多,需要去清除redis数据。

@RabbitListener(queues = "queueName4")//发送的队列名称 @RabbitListener注解到类和方法都可以
@RabbitHandler
public void receiveMessage1(Message message) throws UnsupportedEncodingException {

String messageId = message.getMessageProperties().getMessageId();
String msg = new String(message.getBody(),"utf-8");

List<String> messageRedisValue = redisUtil.lrange("queueName4");
if (messageRedisValue.contains(messageId)) {
    return;
}
System.out.println("消息:"+msg+", id:"+messageId);

redisUtil.lpush("queueName4",messageId);//存入list

}
方案3:将id以key值增量存入string中并设置过期时间:

以消息id为key,消息内容为value存入string中,设置过期时间(可承受的redis服务器异常时间,比如设置过期时间为10分钟,如果redis服务器断了20分钟,那么未消费的数据都会丢了)

@RabbitListener(queues = "queueName4")//发送的队列名称 @RabbitListener注解到类和方法都可以
@RabbitHandler
public void receiveMessage2(Message message) throws UnsupportedEncodingException {

String messageId = message.getMessageProperties().getMessageId();
String msg = new String(message.getBody(),"utf-8");

String messageRedisValue = redisUtil.get(messageId,"");
if (msg.equals(messageRedisValue)) {
    return;
}
System.out.println("消息:"+msg+", id:"+messageId);

redisUtil.set(messageId,msg,10L);//以id为key,消息内容为value,过期时间10分钟

}
解决消息重复消费测试:
首先,启动消息生成服务,发送一万条消息:

2ed5b7b45f3fd1148758cb770b31a6d6.png启动消息消费服务,然后中断服务,消费了1934条消息:

4a432e8dc65095b12db9e6480d3616c8.png查看未被消费的消息条数为8067条,多了一条(10000-1934=8066 ):

9af2fb311bc6374f5c682911f312bdc7.png再次启动消费者服务,消费者舍弃了已被消费的第1934条消息

1d589a9d0344618094c67067837e6c47.png

相关实践学习
消息队列RocketMQ版:基础消息收发功能体验
本实验场景介绍消息队列RocketMQ版的基础消息收发功能,涵盖实例创建、Topic、Group资源创建以及消息收发体验等基础功能模块。
消息队列 MNS 入门课程
1、消息队列MNS简介 本节课介绍消息队列的MNS的基础概念 2、消息队列MNS特性 本节课介绍消息队列的MNS的主要特性 3、MNS的最佳实践及场景应用 本节课介绍消息队列的MNS的最佳实践及场景应用案例 4、手把手系列:消息队列MNS实操讲 本节课介绍消息队列的MNS的实际操作演示 5、动手实验:基于MNS,0基础轻松构建 Web Client 本节课带您一起基于MNS,0基础轻松构建 Web Client
相关文章
|
2月前
|
消息中间件 存储 数据库
RocketMQ 流存储解析:面向流场景的关键特性与典型案例
RocketMQ 流存储解析:面向流场景的关键特性与典型案例
88459 0
|
8月前
|
消息中间件 弹性计算 Java
使用阿里云性能测试工具 JMeter 场景压测 RocketMQ 最佳实践
使用阿里云性能测试工具 JMeter 场景压测 RocketMQ 最佳实践
|
2月前
|
消息中间件 SQL 容灾
深度剖析 RocketMQ 5.0,消息进阶:如何支撑复杂业务消息场景?
本文主要学习 RocketMQ 的一致性特性,一致性对于交易、金融都是刚需。从大规模复杂业务出发,学习 RocketMQ 的 SQL 订阅、定时消息等特性。再从高可用的角度来看,这里更多的是大型公司对于高阶可用性的要求,如同城容灾、异地多活等。
108320 287
|
16天前
|
消息中间件 存储 运维
RocketMQ与Kafka深度对比:特性与适用场景解析
RocketMQ与Kafka深度对比:特性与适用场景解析
|
1月前
|
消息中间件 Serverless Windows
消息队列 MQ产品使用合集之MQTT协议是否可以应用于社交软件的系统通知场景
阿里云消息队列MQ(Message Queue)是一种高可用、高性能的消息中间件服务,它允许您在分布式应用的不同组件之间异步传递消息,从而实现系统解耦、流量削峰填谷以及提高系统的可扩展性和灵活性。以下是使用阿里云消息队列MQ产品的关键点和最佳实践合集。
|
2月前
|
消息中间件 Cloud Native 物联网
深度剖析 RocketMQ 5.0,消息基础:RocketMQ 在业务消息场景的基础优势是什么?
本文主要介绍业务消息的应用解耦场景,具体解耦什么? RocketMQ 在业务消息场景的基础特性。业界那么多消息队列能实现应用解耦,RocketMQ 在基础特性上有哪些增强?
125394 2
深度剖析 RocketMQ 5.0,消息基础:RocketMQ 在业务消息场景的基础优势是什么?
|
2月前
|
消息中间件 人工智能 Java
RocketMQ重复消费的症状以及解决方案
RocketMQ重复消费的症状以及解决方案
|
2月前
|
消息中间件 存储 Cloud Native
深度剖析 RocketMQ 5.0,架构解析:云原生架构如何支撑多元化场景?
了解 RocketMQ 5.0 的核心概念和架构概览;然后我们会从集群角度出发,从宏观视角学习 RocketMQ 的管控链路、数据链路、客户端和服务端如何交互;学习 RocketMQ 如何实现数据的存储,数据的高可用,如何利用云原生存储进一步提升竞争力。
140477 3
|
2月前
|
消息中间件 存储 Apache
精华推荐 | 【深入浅出RocketMQ原理及实战】「性能原理挖掘系列」透彻剖析贯穿RocketMQ的事务性消息的底层原理并在分析其实际开发场景
事务消息(Transactional Message)是指应用本地事务和发送消息操作可以被定义到全局事务中,要么同时成功,要么同时失败。RocketMQ的事务消息提供类似 X/Open XA 的分布事务功能,通过事务消息能达到分布式事务的最终一致。
515 2
精华推荐 | 【深入浅出RocketMQ原理及实战】「性能原理挖掘系列」透彻剖析贯穿RocketMQ的事务性消息的底层原理并在分析其实际开发场景
|
2月前
|
消息中间件 存储 数据库
深度剖析 RocketMQ 5.0,流存储:流场景的诉求是什么?
本文将从使用的角度出发,来更详细的展示一下流存储的场景,看看它和业务消息的场景有哪些区别。 RocketMQ 5.0 面向流存储的场景,提供了哪些特性。再结合两个数据集成的案例,来帮助大家了解流存储的用法。
3437 2