当大量的客户访问请求打到后端,去访问数据库等,瞬间会爆炸的。
经过前端或者其他的方案进行限流外。
还是有大量的请求,这个时候需要削峰了
1. 削峰示例1
application.yaml
rabbitmq: listener: simple: acknowledge-mode: manual #消息手动确认 #削峰限流 prefetch: 1 #消费者每次从队列中取几个消息 concurrency: 1 #消费者数量 max-concurrency: 1 #启动消费者最大数量
生产者:
@RestController public class DirectProducer { @Autowired private RabbitTemplate rabbitTemplate; @PostMapping("direct") public String sendMessage(){ String message = "消息"; for(int i =0;i<10;i++) { rabbitTemplate.convertAndSend("myExchange", "myKey", message+":"+i); System.out.println("生产者发送消息:" + message+":"+i); } return "send message successfully"; } }
消费者:
先设置小一点,然后循环往队列里面放消息,消费的时候延迟2
秒
@Component public class DirectConsumer { // 监听并接收队列中的消息 @RabbitHandler @RabbitListener(queues = "myQueue") public void getMessage(String msg, Channel channel, Message message) throws InterruptedException { Thread.sleep(2000L); System.out.println("消费者接收到了消息:" + msg); try { channel.basicAck(message.getMessageProperties().getDeliveryTag(),false); } catch (IOException e) { e.printStackTrace(); } } }
效果:此时消息会全部放到列队,但是会一条一条消费。简单的实现了削峰处理
2. 削峰示例2
调整消费者的数量:
rabbitmq: listener: simple: acknowledge-mode: manual #消息手动确认 #削峰限流 prefetch: 1 #消费者每次从队列中取几个消息 concurrency: 3 #消费者数量 max-concurrency: 10 #启动消费者最大数量
生产者发出50条message到MQ中
@PostMapping("direct") public String sendMessage(){ String message = "消息"; for(int i =0;i<50;i++) { rabbitTemplate.convertAndSend("myExchange", "myKey", message+":"+i); System.out.println("生产者发送消息:" + message+":"+i); } return "send message successfully"; }
此时就会有3个消费者同时去消费队列中的消息。所以这个消费者数量需要根据实际的情况去设置所能承受的一个值,也就是峰值
3. 重试策略
如果说消费者在消费的过程中失败了,那么会一直消费,一直到成功为止。
但是也可以添加重试策略,比如失败三次就不在消费了。
rabbitmq: listener: simple: acknowledge-mode: manual #消息手动确认 #削峰限流 prefetch: 1 #消费者每次从队列中取几个消息 concurrency: 3 #消费者数量 max-concurrency: 10 #启动消费者最大数量 default-requeue-rejected: true #消息消费失败后,重新进入消费队列中 #重试策略 retry: initial-interval: 1000 #1秒后重试 enabled: true #启用发布重试 max-attempts: 3 #传递消息的最大尝试次数 max-interval: 10000 #尝试的最大时间间隔 multiplier: 1.0 #应用于先前传递重试时间间隔的乘数
消费者处理有异常:
@RabbitHandler @RabbitListener(queues = "myQueue") public void getMessage(String msg, Channel channel, Message message) throws InterruptedException { Thread.sleep(2000L); System.out.println("消费者接收到了消息:" + msg); int a = 10/0; // 有异常 try { channel.basicAck(message.getMessageProperties().getDeliveryTag(),false); } catch (IOException e) { e.printStackTrace(); } } }
运行效果:
每次接收有异常,会将消息重新放到队列中
重试3次后,还有异常,则不再重试,直接报异常
4. 如何保证rabbitmq消息不丢失
丢失数据场景:
- 生产者没有生产成功,即生产者丢失
rabbitmq
丢失了- 消费端丢失,即消费端没消费成功。
开启confirm
回调,启动手动确定消息消费。
rabbitmq: host: localhost port: 5672 username: guest password: guest publisher-confirm-type: correlated # 生产者发送消息到交换机 消息确认 publisher-returns: true # 交换机绑定到队列 消息确认 listener: simple: acknowledge-mode: manual #消息手动确认 #削峰限流 prefetch: 1 #消费者每次从队列中取几个消息 concurrency: 3 #消费者数量 max-concurrency: 10 #启动消费者最大数量 default-requeue-rejected: true #消息消费失败后,重新进入消费队列中 #重试策略 retry: initial-interval: 1000 #1秒后重试 enabled: true #启用发布重试 max-attempts: 3 #传递消息的最大尝试次数 max-interval: 10000 #尝试的最大时间间隔 multiplier: 1.0 #应用于先前传递重试时间间隔的乘数 template: mandatory: true #在消息没有被路由到合适队列情况下会将消息返还给消息发布者 #当mandatory标志位设置为true时,如果exchange根据自身类型和消息routingKey无法找到一个合适的queue存储消息, # 那么broker会调用basic.return方法将消息返还给生产者;当mandatory设置为false时, # 出现上述情况broker会直接将消息丢弃;通俗的讲,mandatory标志告诉broker代理服务器至少将消息route到一个队列中, 否则就将消息return给发送者; #: true # 启用强制信息