♨️本篇文章记录的为RabbitMQ知识中
企业级项目
中消息可靠性保障相关内容,适合在学Java的小白,帮助新手快速上手,也适合复习中,面试中的大佬🙉🙉🙉。
♨️如果文章有什么需要改进的地方还请大佬不吝赐教❤️🧡💛💖个人主页 : 阿千弟
💖点击这里👉👉👉: RabbitMQ专栏学习
今天我们来唠一唠RabbitMQ的相关应用问题, 这在我们设计消息投递方案
以及在今后的面试
中也是会经常遇到, 今天就对这消息可靠性保障
和消息幂等性处理
两个问题进行分析, 对此疏漏以及不正确的部分也欢迎大佬们指出
@[TOC]
消息可靠性保障
提出需求:如何能够保证消息的 100% 发送成功?
首先大家要明确任何一个系统都不能保证消息的 100% 投递成功,我们是可以保证消息以最高最可靠的发送给目标方。
在RabbitMQ中采用 消息补充机制 来保证消息的可靠性
咱们先来看这张图
参与部分:消息生产者
、消息消费者
、数据库
、三个队列
(Q1、Q2、Q3)、交换机
、回调检查服务
、定时检查服务
- 消息的生产者将业务数据存到数据库中
- 发送消息给 队列Q1
- 消息的生产者等待一定的时间后,在发送一个延迟消息给队列 Q3
- 消息的消费方监听 Q1 队列消息,成功接收后
- 消息的消费方会 发送 一条确认消息给 队列Q2
- 回调检查服务监听 队列Q2 发送的确认消息
- 回调检查服务接收到确认消息后,将消息写入到 消息的数据库表中
- 回调检查服务同时也会监听 队列Q3延迟消息, 如果接收到消息会和数据库比对消息的唯一标识
- 如果发现没有接收到确认消息,那么回调检查服务就会远程调用 消息生产者,重新发送消息
- 重新执行 2-7 步骤,保证消息的可靠性传输
- 如果发送消息和延迟消息都出现异常,定时检查服务会监控 消息库中的消息数据,如果发现不一致的消息然后远程调用消息的生产者重新发送消息。
消息幂等性处理
幂等性指一次和多次请求某一个资源,对于资源本身应该具有同样的结果。也就是说,其任意多次执行对资源本身所产生的影响均与一次执行的影响相同。
在MQ中指,消费多条相同的消息,得到与消费该消息一次相同的结果。
场景:消费端成功消费信息,但是在ack时发生网络抖动等原因,导致消息已被消费掉,然而还存在于队列当中。
1. 乐观锁机制 保证消息的幂等操作
2. 唯一ID + 指纹码机制
大家肯定懂唯一 ID 的,就不多说了,为什么需要指纹码呢?这是为了应对用户在一瞬间的频繁操作,这个指纹码可能是我们的一些规则或者时间戳加别的服务给到的唯一信息码,它并不一定是我们系统生成的,基本都是由我们的业务规则拼接而来,但是一定要保证唯一性,然后就利用查询语句进行判断这个id是否存在数据库中。
好处就是实现简单,就一个拼接,然后查询判断是否重复。
坏处就是在高并发时,如果是单个数据库就会有写入性能瓶颈
解决方案 :根据 ID 进行分库分表,对 id 进行算法路由,落到一个具体的数据库,然后当这个 id 第二次来又会落到这个数据库,这时候就像我单库时的查重一样了。利用算法路由把单库的幂等变成多库的幂等,分摊数据流量压力,提高性能。
@RabbitListener(queues = RabbitMQConfig.ORDER_CREATE_QUEUE)
public void createOrderListener(OrderInfoIn orderInfoIn,Message message, Channel channel) throws CommonRuntimeException, IOException {
//消息生产端加入雪花算法
//消费端消费前先验证该消息是否被消费
String messageId = (String)redisTemplate.opsForValue().get(orderInfoIn.getMessageId().toString());
if(messageId == null){
//保证冥等性
try {
//创建订单
redisTemplate.opsForValue().set(orderInfoIn.getMessageId().toString(), "ack");
//成功消费
channel.basicAck(message.getMessageProperties().getDeliveryTag(),false);
} catch (Exception e){
logger.error("消费创建订单消息失败【】error:"+ message.getBody());
logger.error("OrderConsumer handleMessage {} , error:",message,e);
//处理消息失败,将消息重新放回队列
channel.basicNack(message.getMessageProperties().getDeliveryTag(), false,true);
}
}else{
//已消费
channel.basicAck(message.getMessageProperties().getDeliveryTag(),false);
}
}
3. 利用Redis的原子性去实现 (Lua脚本)
一些刚接触java的同学可能对幂等性不太清楚。幂等性就是重复消费造成结果不一致。为了保证幂等性,因此消费者消费消息只能消费一次消息。我么可以是用全局的消息id来控制幂等性。当消息被消费了之后我们可以选择缓存保存这个消息id,然后当再次消费的时候,我们可以查询缓存,如果存在这个消息id,我们就不错处理直接return即可。先改造生产者代码,在消息中添加消息id
可以使用这两个命令
sadd
向集合添加一个或多个成员
用法SADD
key
value
sismember 判断member是否是set中的成员
用法 sismember
key
member
sadd key member [member...] : 添加set集合的member
127.0.0.1:6379> sadd set1 value1 value2 value3
(integer) 3
127.0.0.1:6379> sismember set1 value1
(integer) 1
127.0.0.1:6379> sismember set1 value4
(integer) 0
lua脚本中可以这样
local messageId = ARGV[1]
local messageKey = 'messageConfirm:' .. messageId
-- 1.判断消息是否被消费 SISMEMBER messageKey messageId
if(redis.call('sismember', messageKey, messageId) == 1) then
-- 3.3.存在,说明是重复消费,返回1
return 0
end
-- 2.重未被消费过(保存消息id)sadd messageKey messageId
redis.call('sadd', messageKey, messageId)
return 1
先大概说一说可能会有哪些重复消费的问题。
首先就是比如rabbitmq、rocketmq、kafka,都有可能会出现消费重复消费的问题,正常。因为这问题通常不是mq自己保证的,是给你保证的。然后我们挑一个kafka来举个例子,说说怎么重复消费吧。
kafka实际上有个offset的概念,就是每个消息写进去,都有一个offset,代表他的序号,然后consumer消费了数据之后,每隔一段时间,会把自己消费过的消息的offset提交一下,代表我已经消费过了,下次我要是重启啥的,你就让我继续从上次消费到的offset来继续消费吧。
但是凡事总有意外,比如我们之前生产经常遇到的,就是你有时候重启系统,看你怎么重启了,如果碰到点着急的,直接kill进程了,再重启。这会导致consumer有些消息处理了,但是没来得及提交offset,尴尬了。重启之后,少数消息会再次消费一次。
其实重复消费不可怕,可怕的是你没考虑到重复消费之后,怎么保证幂等性。
给你举个例子吧。假设你有个系统,消费一条往数据库里插入一条,要是你一个消息重复两次,你不就插入了两条,这数据不就错了?但是你要是消费到第二次的时候,自己判断一下已经消费过了,直接扔了,不就保留了一条数据?
一条数据重复出现两次,数据库里就只有一条数据,这就保证了系统的幂等性
幂等性,我通俗点说,就一个数据,或者一个请求,给你重复来多次,你得确保对应的数据是不会改变的,不能出错。
如果这篇【文章】有帮助到你💖,希望可以给我点个赞👍,创作不易,如果有对Java后端或者对redis感兴趣的朋友,请多多关注💖💖💖
💖个人主页:阿千弟
如果大家对redis相关知识感兴趣请点击这里👉👉👉redis专栏学习