唠一唠 消息可靠性保障&&消息幂等性处理 (RabbitMQ实际应用问题)

本文涉及的产品
云数据库 Redis 版,社区版 2GB
推荐场景:
搭建游戏排行榜
简介: 今天来唠一唠🐇消息可靠性保障和幂等性概念🐇及业界主流解决方案, 欢迎伙伴们学习💖💖💖

♨️本篇文章记录的为RabbitMQ知识中企业级项目消息可靠性保障相关内容,适合在学Java的小白,帮助新手快速上手,也适合复习中,面试中的大佬🙉🙉🙉。
♨️如果文章有什么需要改进的地方还请大佬不吝赐教❤️🧡💛

💖个人主页 : 阿千弟
💖点击这里👉👉👉: RabbitMQ专栏学习

今天我们来唠一唠RabbitMQ的相关应用问题, 这在我们设计消息投递方案以及在今后的面试中也是会经常遇到, 今天就对这消息可靠性保障消息幂等性处理两个问题进行分析, 对此疏漏以及不正确的部分也欢迎大佬们指出
@[TOC]

消息可靠性保障

提出需求:如何能够保证消息的 100% 发送成功?

首先大家要明确任何一个系统都不能保证消息的 100% 投递成功,我们是可以保证消息以最高最可靠的发送给目标方。

在RabbitMQ中采用 消息补充机制 来保证消息的可靠性

咱们先来看这张图

在这里插入图片描述
参与部分:消息生产者消息消费者数据库三个队列(Q1、Q2、Q3)、交换机回调检查服务定时检查服务

  1. 消息的生产者将业务数据存到数据库中
  2. 发送消息给 队列Q1
  3. 消息的生产者等待一定的时间后,在发送一个延迟消息给队列 Q3
  4. 消息的消费方监听 Q1 队列消息,成功接收后
  5. 消息的消费方会 发送 一条确认消息给 队列Q2
  6. 回调检查服务监听 队列Q2 发送的确认消息
  7. 回调检查服务接收到确认消息后,将消息写入到 消息的数据库表中
  8. 回调检查服务同时也会监听 队列Q3延迟消息, 如果接收到消息会和数据库比对消息的唯一标识
  9. 如果发现没有接收到确认消息,那么回调检查服务就会远程调用 消息生产者,重新发送消息
  10. 重新执行 2-7 步骤,保证消息的可靠性传输
  11. 如果发送消息和延迟消息都出现异常,定时检查服务会监控 消息库中的消息数据,如果发现不一致的消息然后远程调用消息的生产者重新发送消息。
    在这里插入图片描述

消息幂等性处理

幂等性指一次和多次请求某一个资源,对于资源本身应该具有同样的结果。也就是说,其任意多次执行对资源本身所产生的影响均与一次执行的影响相同。
在MQ中指,消费多条相同的消息,得到与消费该消息一次相同的结果。

场景消费端成功消费信息,但是在ack时发生网络抖动等原因,导致消息已被消费掉,然而还存在于队列当中。

1. 乐观锁机制 保证消息的幂等操作

2.

2. 唯一ID + 指纹码机制

大家肯定懂唯一 ID 的,就不多说了,为什么需要指纹码呢?这是为了应对用户在一瞬间的频繁操作,这个指纹码可能是我们的一些规则或者时间戳加别的服务给到的唯一信息码,它并不一定是我们系统生成的,基本都是由我们的业务规则拼接而来,但是一定要保证唯一性,然后就利用查询语句进行判断这个id是否存在数据库中。

好处就是实现简单,就一个拼接,然后查询判断是否重复。

坏处就是在高并发时,如果是单个数据库就会有写入性能瓶颈

解决方案根据 ID 进行分库分表,对 id 进行算法路由,落到一个具体的数据库,然后当这个 id 第二次来又会落到这个数据库,这时候就像我单库时的查重一样了。利用算法路由把单库的幂等变成多库的幂等,分摊数据流量压力,提高性能。

@RabbitListener(queues = RabbitMQConfig.ORDER_CREATE_QUEUE)
    public void createOrderListener(OrderInfoIn orderInfoIn,Message message, Channel channel) throws CommonRuntimeException, IOException {
   
   
        //消息生产端加入雪花算法
        //消费端消费前先验证该消息是否被消费
        String messageId = (String)redisTemplate.opsForValue().get(orderInfoIn.getMessageId().toString());
        if(messageId == null){
   
   //保证冥等性
            try {
   
   
                //创建订单
                redisTemplate.opsForValue().set(orderInfoIn.getMessageId().toString(), "ack");
                //成功消费
                channel.basicAck(message.getMessageProperties().getDeliveryTag(),false);

            } catch (Exception e){
   
   
                logger.error("消费创建订单消息失败【】error:"+ message.getBody());
                logger.error("OrderConsumer  handleMessage {} , error:",message,e);
                //处理消息失败,将消息重新放回队列
                channel.basicNack(message.getMessageProperties().getDeliveryTag(), false,true);
            }
        }else{
   
   
            //已消费
            channel.basicAck(message.getMessageProperties().getDeliveryTag(),false);
        }

    }

3. 利用Redis的原子性去实现 (Lua脚本)

一些刚接触java的同学可能对幂等性不太清楚。幂等性就是重复消费造成结果不一致。为了保证幂等性,因此消费者消费消息只能消费一次消息。我么可以是用全局的消息id来控制幂等性。当消息被消费了之后我们可以选择缓存保存这个消息id,然后当再次消费的时候,我们可以查询缓存,如果存在这个消息id,我们就不错处理直接return即可。先改造生产者代码,在消息中添加消息id

可以使用这两个命令

sadd 向集合添加一个或多个成员
用法 SADD key value

sismember 判断member是否是set中的成员
用法 sismember key member

sadd key member [member...] : 添加set集合的member
127.0.0.1:6379> sadd set1 value1 value2 value3
(integer) 3
127.0.0.1:6379> sismember set1 value1
(integer) 1
127.0.0.1:6379> sismember set1 value4
(integer) 0

lua脚本中可以这样

local messageId = ARGV[1]
local messageKey = 'messageConfirm:' .. messageId
-- 1.判断消息是否被消费 SISMEMBER messageKey messageId
if(redis.call('sismember', messageKey, messageId) == 1) then
    -- 3.3.存在,说明是重复消费,返回1
    return 0
end
-- 2.重未被消费过(保存消息id)sadd messageKey messageId
redis.call('sadd', messageKey, messageId)
    return 1

先大概说一说可能会有哪些重复消费的问题。

首先就是比如rabbitmq、rocketmq、kafka,都有可能会出现消费重复消费的问题,正常。因为这问题通常不是mq自己保证的,是给你保证的。然后我们挑一个kafka来举个例子,说说怎么重复消费吧。

kafka实际上有个offset的概念,就是每个消息写进去,都有一个offset,代表他的序号,然后consumer消费了数据之后,每隔一段时间,会把自己消费过的消息的offset提交一下,代表我已经消费过了,下次我要是重启啥的,你就让我继续从上次消费到的offset来继续消费吧。

但是凡事总有意外,比如我们之前生产经常遇到的,就是你有时候重启系统,看你怎么重启了,如果碰到点着急的,直接kill进程了,再重启。这会导致consumer有些消息处理了,但是没来得及提交offset,尴尬了。重启之后,少数消息会再次消费一次。

其实重复消费不可怕,可怕的是你没考虑到重复消费之后,怎么保证幂等性。

给你举个例子吧。假设你有个系统,消费一条往数据库里插入一条,要是你一个消息重复两次,你不就插入了两条,这数据不就错了?但是你要是消费到第二次的时候,自己判断一下已经消费过了,直接扔了,不就保留了一条数据?

一条数据重复出现两次,数据库里就只有一条数据,这就保证了系统的幂等性

幂等性,我通俗点说,就一个数据,或者一个请求,给你重复来多次,你得确保对应的数据是不会改变的,不能出错。

在这里插入图片描述

如果这篇【文章】有帮助到你💖,希望可以给我点个赞👍,创作不易,如果有对Java后端或者对redis感兴趣的朋友,请多多关注💖💖💖
💖个人主页:阿千弟
如果大家对redis相关知识感兴趣请点击这里👉👉👉redis专栏学习

相关实践学习
RocketMQ一站式入门使用
从源码编译、部署broker、部署namesrv,使用java客户端首发消息等一站式入门RocketMQ。
消息队列 MNS 入门课程
1、消息队列MNS简介 本节课介绍消息队列的MNS的基础概念 2、消息队列MNS特性 本节课介绍消息队列的MNS的主要特性 3、MNS的最佳实践及场景应用 本节课介绍消息队列的MNS的最佳实践及场景应用案例 4、手把手系列:消息队列MNS实操讲 本节课介绍消息队列的MNS的实际操作演示 5、动手实验:基于MNS,0基础轻松构建 Web Client 本节课带您一起基于MNS,0基础轻松构建 Web Client
目录
相关文章
|
3月前
|
传感器 监控 物联网
golang开源的可嵌入应用程序高性能的MQTT服务
golang开源的可嵌入应用程序高性能的MQTT服务
209 3
|
3月前
|
消息中间件 存储 监控
搭建消息时光机:深入探究RabbitMQ_recent_history_exchange在Spring Boot中的应用【RabbitMQ实战 二】
搭建消息时光机:深入探究RabbitMQ_recent_history_exchange在Spring Boot中的应用【RabbitMQ实战 二】
36 1
|
6月前
|
消息中间件 存储 网络协议
我们一起来学RabbitMQ 二:RabbiMQ 的 6 种模式的基本应用
我们一起来学RabbitMQ 二:RabbiMQ 的 6 种模式的基本应用
|
3月前
|
网络协议 Go 数据安全/隐私保护
golang开源的可嵌入应用程序高性能的MQTT服务
golang开源的可嵌入应用程序高性能的MQTT服务
267 2
|
8月前
|
消息中间件 运维 Java
RocketMQ的常规运维实践应用
RocketMQ的常规运维实践应用
490 1
|
4月前
|
消息中间件 监控 负载均衡
Kafka高级应用:如何配置处理MQ百万级消息队列?
在大数据时代,Apache Kafka作为一款高性能的分布式消息队列系统,广泛应用于处理大规模数据流。本文将深入探讨在Kafka环境中处理百万级消息队列的高级应用技巧。
184 0
|
5月前
|
消息中间件 缓存 NoSQL
[中间件] 秒杀系统秒杀率提高300%?教你如何利用redis和rabbitmq 优化应用!
[中间件] 秒杀系统秒杀率提高300%?教你如何利用redis和rabbitmq 优化应用!
174 0
|
5月前
|
消息中间件 Java
RabbitMQ【应用 01】SpringBoot集成RabbitMQ及设置RabbitMQ启动总开关
RabbitMQ【应用 01】SpringBoot集成RabbitMQ及设置RabbitMQ启动总开关
103 0
|
6月前
|
消息中间件 网络协议 物联网
Golang微服务框架Kratos应用MQTT消息队列
MQTT 协议 是由`IBM`的`Andy Stanford-Clark博士`和`Arcom`(已更名为Eurotech)的`Arlen Nipper博士`于 1999 年发明,用于石油和天然气行业。工程师需要一种协议来实现最小带宽和最小电池损耗,以通过卫星监控石油管道。最初,该协议被称为消息队列遥测传输,得名于首先支持其初始阶段的 IBM 产品 MQ 系列。2010 年,IBM 发布了 MQTT 3.1 作为任何人都可以实施的免费开放协议,然后于 2013 年将其提交给结构化信息标准促进组织 (OASIS) 规范机构进行维护。2019 年,OASIS 发布了升级的 MQTT 版本 5。
51 0
|
6月前
|
消息中间件 存储 中间件
Golang微服务框架Kratos应用RabbitMQ消息队列
RabbitMQ是一套开源(MPL)的消息队列服务软件,是由 LShift 提供的一个 Advanced Message Queuing Protocol (AMQP) 的开源实现,由以高性能、健壮以及可伸缩性出名的 Erlang 写成。
90 1