唠一唠 消息可靠性保障&&消息幂等性处理 (RabbitMQ实际应用问题)

本文涉及的产品
云数据库 Tair(兼容Redis),内存型 2GB
Redis 开源版,标准版 2GB
推荐场景:
搭建游戏排行榜
简介: 今天来唠一唠🐇消息可靠性保障和幂等性概念🐇及业界主流解决方案, 欢迎伙伴们学习💖💖💖

♨️本篇文章记录的为RabbitMQ知识中企业级项目消息可靠性保障相关内容,适合在学Java的小白,帮助新手快速上手,也适合复习中,面试中的大佬🙉🙉🙉。
♨️如果文章有什么需要改进的地方还请大佬不吝赐教❤️🧡💛

💖个人主页 : 阿千弟
💖点击这里👉👉👉: RabbitMQ专栏学习

今天我们来唠一唠RabbitMQ的相关应用问题, 这在我们设计消息投递方案以及在今后的面试中也是会经常遇到, 今天就对这消息可靠性保障消息幂等性处理两个问题进行分析, 对此疏漏以及不正确的部分也欢迎大佬们指出
@[TOC]

消息可靠性保障

提出需求:如何能够保证消息的 100% 发送成功?

首先大家要明确任何一个系统都不能保证消息的 100% 投递成功,我们是可以保证消息以最高最可靠的发送给目标方。

在RabbitMQ中采用 消息补充机制 来保证消息的可靠性

咱们先来看这张图

在这里插入图片描述
参与部分:消息生产者消息消费者数据库三个队列(Q1、Q2、Q3)、交换机回调检查服务定时检查服务

  1. 消息的生产者将业务数据存到数据库中
  2. 发送消息给 队列Q1
  3. 消息的生产者等待一定的时间后,在发送一个延迟消息给队列 Q3
  4. 消息的消费方监听 Q1 队列消息,成功接收后
  5. 消息的消费方会 发送 一条确认消息给 队列Q2
  6. 回调检查服务监听 队列Q2 发送的确认消息
  7. 回调检查服务接收到确认消息后,将消息写入到 消息的数据库表中
  8. 回调检查服务同时也会监听 队列Q3延迟消息, 如果接收到消息会和数据库比对消息的唯一标识
  9. 如果发现没有接收到确认消息,那么回调检查服务就会远程调用 消息生产者,重新发送消息
  10. 重新执行 2-7 步骤,保证消息的可靠性传输
  11. 如果发送消息和延迟消息都出现异常,定时检查服务会监控 消息库中的消息数据,如果发现不一致的消息然后远程调用消息的生产者重新发送消息。
    在这里插入图片描述

消息幂等性处理

幂等性指一次和多次请求某一个资源,对于资源本身应该具有同样的结果。也就是说,其任意多次执行对资源本身所产生的影响均与一次执行的影响相同。
在MQ中指,消费多条相同的消息,得到与消费该消息一次相同的结果。

场景消费端成功消费信息,但是在ack时发生网络抖动等原因,导致消息已被消费掉,然而还存在于队列当中。

1. 乐观锁机制 保证消息的幂等操作

2.

2. 唯一ID + 指纹码机制

大家肯定懂唯一 ID 的,就不多说了,为什么需要指纹码呢?这是为了应对用户在一瞬间的频繁操作,这个指纹码可能是我们的一些规则或者时间戳加别的服务给到的唯一信息码,它并不一定是我们系统生成的,基本都是由我们的业务规则拼接而来,但是一定要保证唯一性,然后就利用查询语句进行判断这个id是否存在数据库中。

好处就是实现简单,就一个拼接,然后查询判断是否重复。

坏处就是在高并发时,如果是单个数据库就会有写入性能瓶颈

解决方案根据 ID 进行分库分表,对 id 进行算法路由,落到一个具体的数据库,然后当这个 id 第二次来又会落到这个数据库,这时候就像我单库时的查重一样了。利用算法路由把单库的幂等变成多库的幂等,分摊数据流量压力,提高性能。

@RabbitListener(queues = RabbitMQConfig.ORDER_CREATE_QUEUE)
    public void createOrderListener(OrderInfoIn orderInfoIn,Message message, Channel channel) throws CommonRuntimeException, IOException {
   
   
        //消息生产端加入雪花算法
        //消费端消费前先验证该消息是否被消费
        String messageId = (String)redisTemplate.opsForValue().get(orderInfoIn.getMessageId().toString());
        if(messageId == null){
   
   //保证冥等性
            try {
   
   
                //创建订单
                redisTemplate.opsForValue().set(orderInfoIn.getMessageId().toString(), "ack");
                //成功消费
                channel.basicAck(message.getMessageProperties().getDeliveryTag(),false);

            } catch (Exception e){
   
   
                logger.error("消费创建订单消息失败【】error:"+ message.getBody());
                logger.error("OrderConsumer  handleMessage {} , error:",message,e);
                //处理消息失败,将消息重新放回队列
                channel.basicNack(message.getMessageProperties().getDeliveryTag(), false,true);
            }
        }else{
   
   
            //已消费
            channel.basicAck(message.getMessageProperties().getDeliveryTag(),false);
        }

    }

3. 利用Redis的原子性去实现 (Lua脚本)

一些刚接触java的同学可能对幂等性不太清楚。幂等性就是重复消费造成结果不一致。为了保证幂等性,因此消费者消费消息只能消费一次消息。我么可以是用全局的消息id来控制幂等性。当消息被消费了之后我们可以选择缓存保存这个消息id,然后当再次消费的时候,我们可以查询缓存,如果存在这个消息id,我们就不错处理直接return即可。先改造生产者代码,在消息中添加消息id

可以使用这两个命令

sadd 向集合添加一个或多个成员
用法 SADD key value

sismember 判断member是否是set中的成员
用法 sismember key member

sadd key member [member...] : 添加set集合的member
127.0.0.1:6379> sadd set1 value1 value2 value3
(integer) 3
127.0.0.1:6379> sismember set1 value1
(integer) 1
127.0.0.1:6379> sismember set1 value4
(integer) 0

lua脚本中可以这样

local messageId = ARGV[1]
local messageKey = 'messageConfirm:' .. messageId
-- 1.判断消息是否被消费 SISMEMBER messageKey messageId
if(redis.call('sismember', messageKey, messageId) == 1) then
    -- 3.3.存在,说明是重复消费,返回1
    return 0
end
-- 2.重未被消费过(保存消息id)sadd messageKey messageId
redis.call('sadd', messageKey, messageId)
    return 1

先大概说一说可能会有哪些重复消费的问题。

首先就是比如rabbitmq、rocketmq、kafka,都有可能会出现消费重复消费的问题,正常。因为这问题通常不是mq自己保证的,是给你保证的。然后我们挑一个kafka来举个例子,说说怎么重复消费吧。

kafka实际上有个offset的概念,就是每个消息写进去,都有一个offset,代表他的序号,然后consumer消费了数据之后,每隔一段时间,会把自己消费过的消息的offset提交一下,代表我已经消费过了,下次我要是重启啥的,你就让我继续从上次消费到的offset来继续消费吧。

但是凡事总有意外,比如我们之前生产经常遇到的,就是你有时候重启系统,看你怎么重启了,如果碰到点着急的,直接kill进程了,再重启。这会导致consumer有些消息处理了,但是没来得及提交offset,尴尬了。重启之后,少数消息会再次消费一次。

其实重复消费不可怕,可怕的是你没考虑到重复消费之后,怎么保证幂等性。

给你举个例子吧。假设你有个系统,消费一条往数据库里插入一条,要是你一个消息重复两次,你不就插入了两条,这数据不就错了?但是你要是消费到第二次的时候,自己判断一下已经消费过了,直接扔了,不就保留了一条数据?

一条数据重复出现两次,数据库里就只有一条数据,这就保证了系统的幂等性

幂等性,我通俗点说,就一个数据,或者一个请求,给你重复来多次,你得确保对应的数据是不会改变的,不能出错。

在这里插入图片描述

如果这篇【文章】有帮助到你💖,希望可以给我点个赞👍,创作不易,如果有对Java后端或者对redis感兴趣的朋友,请多多关注💖💖💖
💖个人主页:阿千弟
如果大家对redis相关知识感兴趣请点击这里👉👉👉redis专栏学习

相关实践学习
消息队列RocketMQ版:基础消息收发功能体验
本实验场景介绍消息队列RocketMQ版的基础消息收发功能,涵盖实例创建、Topic、Group资源创建以及消息收发体验等基础功能模块。
消息队列 MNS 入门课程
1、消息队列MNS简介 本节课介绍消息队列的MNS的基础概念 2、消息队列MNS特性 本节课介绍消息队列的MNS的主要特性 3、MNS的最佳实践及场景应用 本节课介绍消息队列的MNS的最佳实践及场景应用案例 4、手把手系列:消息队列MNS实操讲 本节课介绍消息队列的MNS的实际操作演示 5、动手实验:基于MNS,0基础轻松构建 Web Client 本节课带您一起基于MNS,0基础轻松构建 Web Client
目录
相关文章
|
5月前
|
传感器 监控 物联网
golang开源的可嵌入应用程序高性能的MQTT服务
golang开源的可嵌入应用程序高性能的MQTT服务
299 3
|
5月前
|
消息中间件 存储 监控
搭建消息时光机:深入探究RabbitMQ_recent_history_exchange在Spring Boot中的应用【RabbitMQ实战 二】
搭建消息时光机:深入探究RabbitMQ_recent_history_exchange在Spring Boot中的应用【RabbitMQ实战 二】
61 1
|
5月前
|
消息中间件 分布式计算 监控
Python面试:消息队列(RabbitMQ、Kafka)基础知识与应用
【4月更文挑战第18天】本文探讨了Python面试中RabbitMQ与Kafka的常见问题和易错点,包括两者的基础概念、特性对比、Python客户端使用、消息队列应用场景及消息可靠性保证。重点讲解了消息丢失与重复的避免策略,并提供了实战代码示例,帮助读者提升在分布式系统中使用消息队列的能力。
148 2
|
4月前
|
消息中间件 Java RocketMQ
消息队列 MQ产品使用合集之当SpringBoot应用因网络不通而启动失败时,该如何解决
消息队列(MQ)是一种用于异步通信和解耦的应用程序间消息传递的服务,广泛应用于分布式系统中。针对不同的MQ产品,如阿里云的RocketMQ、RabbitMQ等,它们在实现上述场景时可能会有不同的特性和优势,比如RocketMQ强调高吞吐量、低延迟和高可用性,适合大规模分布式系统;而RabbitMQ则以其灵活的路由规则和丰富的协议支持受到青睐。下面是一些常见的消息队列MQ产品的使用场景合集,这些场景涵盖了多种行业和业务需求。
|
5月前
|
网络协议 Go 数据安全/隐私保护
golang开源的可嵌入应用程序高性能的MQTT服务
golang开源的可嵌入应用程序高性能的MQTT服务
392 2
|
5月前
|
消息中间件 前端开发 数据库
RocketMQ实战教程之MQ简介与应用场景
RocketMQ实战教程介绍了MQ的基本概念和应用场景。MQ(消息队列)是生产者和消费者模型,用于异步传输数据,实现系统解耦。消息中间件在生产者发送消息和消费者接收消息之间起到邮箱作用,简化通信。主要应用场景包括:1)应用解耦,如订单系统与库存系统的非直接交互;2)异步处理,如用户注册后的邮件和短信发送延迟处理,提高响应速度;3)流量削峰,如秒杀活动限制并发流量,防止系统崩溃。
|
2月前
|
消息中间件 开发工具
【Azure Event Hub】原生应用中使用RabbitMQ,是否可以不改动代码的情况下直接转换为使用Event Hub呢?
【Azure Event Hub】原生应用中使用RabbitMQ,是否可以不改动代码的情况下直接转换为使用Event Hub呢?
|
4月前
|
消息中间件 Arthas 监控
消息队列 MQ产品使用合集之每次重置reconsumeTimes就无法达到死信阈值,重试次数是否就要应用方控制
消息队列(MQ)是一种用于异步通信和解耦的应用程序间消息传递的服务,广泛应用于分布式系统中。针对不同的MQ产品,如阿里云的RocketMQ、RabbitMQ等,它们在实现上述场景时可能会有不同的特性和优势,比如RocketMQ强调高吞吐量、低延迟和高可用性,适合大规模分布式系统;而RabbitMQ则以其灵活的路由规则和丰富的协议支持受到青睐。下面是一些常见的消息队列MQ产品的使用场景合集,这些场景涵盖了多种行业和业务需求。
消息队列 MQ产品使用合集之每次重置reconsumeTimes就无法达到死信阈值,重试次数是否就要应用方控制
|
4月前
|
数据采集 监控 物联网
MQTT协议在智能制造中的应用案例与效益分析
【6月更文挑战第8天】MQTT协议在智能制造中的应用案例与效益分析
105 1
|
4月前
|
消息中间件 监控 数据安全/隐私保护
RabbitMQ 技术详解与应用指南
**RabbitMQ** 是一个开源消息代理,基于 AMQP 实现,用于应用程序间轻量、可靠的消息传递。本文档详细介绍了 RabbitMQ 的基础,包括**消息、队列、交换机、绑定、路由键和消费者**等概念,以及其**高可靠性、高性能、灵活性、可扩展性和易用性**等特性。RabbitMQ 使用生产者-消费者模型,消息通过交换机路由到队列,消费者接收并处理。文中还涵盖了安装配置的基本步骤和常见应用场景,如**异步处理、消息推送、系统解耦、流量削峰和日志收集**。
233 2
下一篇
无影云桌面