一、简介
按照现有rabbitMQ的相关知识,⽣产者会发送消息到达消息服务器。但是在实际⽣产环境下,消息⽣产者发送的消息很有可能当到达了消息服务器之后,由于消息服务器的问题导致消息丢失,如宕机。因为消息服务器默认会将消息存储在内存中。⼀旦消息服务器宕机,则消息会产⽣丢失。因此要保证⽣产者的消息不丢失,要开始持久化策略。
rabbitMQ持久化: 1. 交换机持久化 2. 队列持久化 3. 消息持久化
RabbitMQ数据保护机制:
事务机制
事务机制采⽤类数据库的事务机制进⾏数据保护,当消息到达消息服务器,⾸先会开启⼀个事务,接着进⾏数据磁盘持久化,只有持久化成功才会进⾏事务提交,向消息⽣产者返回成功通知,消息⽣产者⼀旦接收成功通知则不会再发送此条消息。当出现异常,则返回失败通知.消息⽣产者⼀旦接收失败通知,则继续发送该条消息。 事务机制虽然能够保证数据安全,但是此机制采⽤的是同步机制,会产⽣系统间消息阻塞,影响整个系统的消息吞吐量。从⽽导致整个系统的性能下降,因此不建议使⽤。
confirm机制
confirm模式需要基于channel进⾏设置, ⼀旦某条消息被投递到队列之后,消息队列就会发送⼀个确认信息给⽣产者,如果队列与消息是可持久化的, 那么确认消息会等到消息成功写⼊到磁盘之后发出。 confirm的性能⾼,主要得益于它是异步的.⽣产者在将第⼀条消息发出之后等待确认消息的同时也可以继续发送后续的消息.当确认消息到达之后,就可以通过回调⽅法处理这条确认消息. 如果MQ服务宕机了,则会返回nack消息. ⽣产者同样在回调⽅法中进⾏后续处理。
二、必达消息(confirm
)
1、原理
基于实现的ConfirmCallback
接口,假如RabbitMQ收到消息后,会回调实现这个接口的类。
@FunctionalInterface public interface ConfirmCallback { void confirm(@Nullable CorrelationData var1, boolean var2, @Nullable String var3); }
2、pom.xml
# 开启confirm机制 spring.rabbitmq.publisher-returns=true
3、配置类
@Configuration public class RabbitMQConfig { //声明队列,并开启持久化 @Bean public Queue queue() { /** * 第⼀个参数:队列名称 * 第⼆个参数:是否开启队列持久化 */ return new Queue("seckill_order", true); } }
4、业务实现
@Override public boolean add(Long id, String time, String username) { //发送消息(消息必达) customMessageSender.sendMessage("", "seckill_order", JSON.toJSONString(seckillOrder)); }
5、必达工具类
import com.alibaba.fastjson.JSON; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import org.springframework.amqp.rabbit.connection.CorrelationData; import org.springframework.amqp.rabbit.core.RabbitTemplate; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.data.redis.core.RedisTemplate; import org.springframework.stereotype.Component; import java.util.HashMap; import java.util.Map; import java.util.UUID; /** * ⾃定义消息发送类 * 增强RabbitTemplate */ @Component public class CustomMessageSender implements RabbitTemplate.ConfirmCallback { static final Logger log = LoggerFactory.getLogger(CustomMessageSender.class); private static final String MESSAGE_CONFIRM_ = "message_confirm_"; @Autowired private RabbitTemplate rabbitTemplate; @Autowired private RedisTemplate redisTemplate; /** * 构造⽅法 * * @param rabbitTemplate */ public CustomMessageSender(RabbitTemplate rabbitTemplate) { this.rabbitTemplate = rabbitTemplate; rabbitTemplate.setConfirmCallback(this); } /** * ⽣产者通知回调⽅法 * * @param correlationData 唯⼀标识 * @param ack 成功/失败 * @param cause */ @Override public void confirm(CorrelationData correlationData, boolean ack, String cause) { if (ack) { //返回成功通知 //删除redis中的相关数据 redisTemplate.delete(correlationData.getId()); redisTemplate.delete(MESSAGE_CONFIRM_ + correlationData.getId()); } else { //返回失败通知 Map<String, String> map = (Map<String, String>) redisTemplate.opsForHash().entries(MESSAGE_CONFIRM_ + correlationData.getId()); String exchange = map.get("exchange"); String routingKey = map.get("routingKey"); String sendMessage = map.get("sendMessage"); //重新发送 rabbitTemplate.convertAndSend(exchange, routingKey, JSON.toJSONString(sendMessage)); } } /** * ⾃定义发送⽅法 * * @param exchange 交换器 * @param routingKey 路由键 * @param message 消息内容 */ public void sendMessage(String exchange, String routingKey, String message) { //设置消息唯⼀标识并存⼊缓存 CorrelationData correlationData = new CorrelationData(UUID.randomUUID().toString()); redisTemplate.opsForValue().set(correlationData.getId(), message); //本次发送到相关元信息存⼊缓存 Map<String, String> map = new HashMap<>(); map.put("exchange", exchange); map.put("routingKey", routingKey); map.put("sendMessage", message); redisTemplate.opsForHash().putAll(MESSAGE_CONFIRM_ + correlationData.getId(), map); //携带唯⼀标识发送消息 rabbitTemplate.convertAndSend(exchange, routingKey, message, correlationData); } }
三、成功后回执
1、原理
⾃动应答机制: 消息消费者成功接收到消息后,会进⾏消费并⾃动通知消息服务器将该条消息删除。
手动应答机制: 只有在消息消费者将消息处理完,才会通知消息服务器将该条消息删除
消费者发起成功通知
- DeliveryTag: 消息的唯⼀标识 channel+消息编号
- 第⼆个参数:是否开启批量处理。
false
:不开启批量
channel.basicAck(message.getMessageProperties().getDeliveryTag(), false);
返回失败通知
- 第⼀个boolean:
true
所有消费者都会拒绝这个消息。false
代表只有当前消费者拒绝。 - 第⼆个boolean:
true
当前消息会进⼊到死信队列。false
重新回到原有队列中,默认回到头部。
channel.basicNack(message.getMessageProperties().getDeliveryTag(), false, false);
2、pom.xml
# 关闭自动提交 spring.rabbitmq.listener.simple.acknowledge-mode=manual
3、成功与失败处理机制
package com.lydms.demorabbitmq.client; import com.rabbitmq.client.Channel; import org.springframework.amqp.core.Message; import org.springframework.amqp.rabbit.annotation.RabbitListener; import org.springframework.stereotype.Component; import java.io.IOException; @Component public class SecKillOrderListener { @RabbitListener(queues = "cancel_order_queue") public void receiveSecKillOrderMessage(Channel channel, Message message) { boolean result = true; if (result) { /** * 更新数据库操作成功 * 消费者发起成功通知 * DeliveryTag: 消息的唯⼀标识 channel+消息编号 * 第⼆个参数:是否开启批量处理 false:不开启批量 */ try { channel.basicAck(message.getMessageProperties().getDeliveryTag(), false); } catch (IOException e) { e.printStackTrace(); } } else { /** * 返回失败通知 * 第⼀个boolean true所有消费者都会拒绝这个消息,false代表只有当前消费者拒绝 * 第⼆个boolean true当前消息会进⼊到死信队列,false重新回到原有队列中,默认回到头部 */ try { channel.basicNack(message.getMessageProperties().getDeliveryTag(), false, false); } catch (IOException e) { e.printStackTrace(); } } } }
四、流量削峰
在秒杀这种⾼并发的场景下,每秒都有可能产⽣⼏万甚⾄⼗⼏万条消息,如果没有对消息处理量进⾏任何限制的话,很有可能因为过多的消息堆积从⽽导致消费者宕机的情况。因此官⽹建议对每⼀个消息消费者都设置处理消息总数(消息抓取总数)。
消息抓取总数的值,设置过⼤或者过⼩都不好,过⼩的话,会导致整个系统消息吞吐能⼒下降,造成性能浪费。过⼤的话,则很有可能导致消息过多,导致整个系统OOM(out of memory)内存溢出。因此官⽹建议每⼀个消费者将该值设置在100-300之间。
@RabbitListener(queues = "cancel_order_queue") public void receiveSecKillOrderMessage(Channel channel, Message message) { // 预抓取总数 try { channel.basicQos(300); } catch (IOException e) { e.printStackTrace(); } }