RabbitMQ之发布确认高级

本文涉及的产品
Serverless 应用引擎免费试用套餐包,4320000 CU,有效期3个月
服务治理 MSE Sentinel/OpenSergo,Agent数量 不受限
云原生网关 MSE Higress,422元/月
简介: 【1月更文挑战第10天】在生产环境中由于一些不明原因,导致 rabbitmq 重启,在 RabbitMQ 重启期间生产者消息投递失败,导致消息丢失,需要手动处理和恢复。于是,我们开始思考,如何才能进行 RabbitMQ 的消息可靠投递呢?特别是在这样比较极端的情况,RabbitMQ 集群不可用的时候,无法投递的消息该如何处理呢:

文章目录

前言

一、发布确认 整合springboot

1、确认机制方案

2、代码架构图

3、配置文件

4、添加配置类

5、消息生产者

6、回调接口

7、消息消费者

8、结果分析

二、回退消息

1、Mandatory 参数

2、消息生产者代码

3、回调接口

4、结果分析

三、备份交换机

1、代码架构图

2、修改配置类

3、报警消费者

4、测试注意事项

5、结果分析

总结


前言

在生产环境中由于一些不明原因,导致 rabbitmq 重启,在 RabbitMQ 重启期间生产者消息投递失败,导致消息丢失,需要手动处理和恢复。于是,我们开始思考,如何才能进行 RabbitMQ 的消息可靠投递呢?特别是在这样比较极端的情况,RabbitMQ 集群不可用的时候,无法投递的消息该如何处理呢:

应 用 [xxx] 在 [08-1516:36:04] 发 生 [ 错误日志异常 ] , alertId=[xxx] 。 由
[org.springframework.amqp.rabbit.listener.BlockingQueueConsumer:start:620] 触发。 
应用 xxx 可能原因如下
服务名为: 
异常为: org.springframework.amqp.rabbit.listener.BlockingQueueConsumer:start:620, 
产 生 原 因 如 下 :1.org.springframework.amqp.rabbit.listener.QueuesNotAvailableException: 
Cannot prepare queue for listener. Either the queue doesn't exist or the broker will not 
allow us to use it.||Consumer received fatal=false exception on startup:

一、发布确认 整合springboot

1、确认机制方案

2、代码架构图

3、配置文件

在配置文件当中需要添加

spring.rabbitmq.publisher-confirm-type=correlated
  • NONE
    禁用发布确认模式,是默认值
  • CORRELATED
    发布消息成功到交换器后会触发回调方法
  • SIMPLE
    经测试有两种效果,其一效果和 CORRELATED 值一样会触发回调方法,
    其二在发布消息成功后使用 rabbitTemplate 调用 waitForConfirms 或 waitForConfirmsOrDie 方法
    等待 broker 节点返回发送结果,根据返回结果来判定下一步的逻辑,要注意的点是
    waitForConfirmsOrDie 方法如果返回 false 则会关闭 channel,则接下来无法发送消息到 broker
spring.rabbitmq.host=192.168.10.130
spring.rabbitmq.port=5672
spring.rabbitmq.username=guest
spring.rabbitmq.password=guest
spring.rabbitmq.publisher-confirm-type=correlated

4、添加配置类

@ConfigurationpublicclassConfirmConfig {
publicstaticfinalStringCONFIRM_EXCHANGE_NAME="confirm.exchange";
publicstaticfinalStringCONFIRM_QUEUE_NAME="confirm.queue";
//声明业务 Exchange@Bean("confirmExchange")
publicDirectExchangeconfirmExchange(){
returnnewDirectExchange(CONFIRM_EXCHANGE_NAME);
    }
// 声明确认队列@Bean("confirmQueue")
publicQueueconfirmQueue(){
returnQueueBuilder.durable(CONFIRM_QUEUE_NAME).build();
    }
// 声明确认队列绑定关系@BeanpublicBindingqueueBinding(@Qualifier("confirmQueue") Queuequeue,@Qualifier("confirmExchange") DirectExchangeexchange){
returnBindingBuilder.bind(queue).to(exchange).with("key1");
    }
}

5、消息生产者

@RestController@RequestMapping("/confirm")
@Slf4jpublicclassProducer {
publicstaticfinalStringCONFIRM_EXCHANGE_NAME="confirm.exchange";
@AutowiredprivateRabbitTemplaterabbitTemplate;
@AutowiredprivateMyCallBackmyCallBack;
//依赖注入 rabbitTemplate 之后再设置它的回调对象@PostConstructpublicvoidinit(){
rabbitTemplate.setConfirmCallback(myCallBack);
    }
@GetMapping("sendMessage/{message}")
publicvoidsendMessage(@PathVariableStringmessage){
//指定消息 id 为 1CorrelationDatacorrelationData1=newCorrelationData("1");
StringroutingKey="key1";
rabbitTemplate.convertAndSend(CONFIRM_EXCHANGE_NAME,routingKey,message+routingKey,correlationData1);
CorrelationDatacorrelationData2=newCorrelationData("2");
routingKey="key2";
rabbitTemplate.convertAndSend(CONFIRM_EXCHANGE_NAME,routingKey,message+routingKey,correlationData2);
log.info("发送消息内容:{}",message);
    }
}

6、回调接口

@Component@Slf4jpublicclassMyCallBackimplementsRabbitTemplate.ConfirmCallback {
/*** 交换机不管是否收到消息的一个回调方法* CorrelationData* 消息相关数据* ack* 交换机是否收到消息*/@Overridepublicvoidconfirm(CorrelationDatacorrelationData, booleanack, Stringcause) {
Stringid=correlationData!=null?correlationData.getId():"";
if(ack){
log.info("交换机已经收到 id 为:{}的消息",id);
        }else{
log.info("交换机还未收到 id 为:{}消息,由于原因:{}",id,cause);
        }
    }
}

7、消息消费者

@Component@Slf4jpublicclassConfirmConsumer {
publicstaticfinalStringCONFIRM_QUEUE_NAME="confirm.queue";
@RabbitListener(queues=CONFIRM_QUEUE_NAME)
publicvoidreceiveMsg(Messagemessage){
Stringmsg=newString(message.getBody());
log.info("接受到队列 confirm.queue 消息:{}",msg);
    }
}

8、结果分析

可以看到,发送了两条消息,第一条消息的 RoutingKey 为 “key1”,第二条消息的 RoutingKey 为"key2",两条消息都成功被交换机接收,也收到了交换机的确认回调,但消费者只收到了一条消息,因为第二条消息的 RoutingKey 与队列的 BindingKey 不一致,也没有其它队列能接收这个消息,所有第二条消息被直接丢弃了。

二、回退消息

1、Mandatory 参数

在仅开启了生产者确认机制的情况下,交换机接收到消息后,会直接给消息生产者发送确认消息,如果发现该消息不可路由,那么消息会被直接丢弃,此时生产者是不知道消息被丢弃这个事件的。那么如何让无法被路由的消息帮我想办法处理一下?最起码通知我一声,我好自己处理啊。通过设置 mandatory 参数可以在当消息传递过程中不可达目的地时将消息返回给生产者。

2、消息生产者代码

@Slf4j@ComponentpublicclassMessageProducerimplementsRabbitTemplate.ConfirmCallback , RabbitTemplate.ReturnCallback {
@AutowiredprivateRabbitTemplaterabbitTemplate;
//rabbitTemplate 注入之后就设置该值@PostConstructprivatevoidinit() {
rabbitTemplate.setConfirmCallback(this);
/*** true:* 交换机无法将消息进行路由时,会将该消息返回给生产者* false:* 如果发现消息无法进行路由,则直接丢弃*/rabbitTemplate.setMandatory(true);
//设置回退消息交给谁处理rabbitTemplate.setReturnCallback(this);
    }
@GetMapping("sendMessage")
publicvoidsendMessage(Stringmessage){
//让消息绑定一个 id 值CorrelationDatacorrelationData1=newCorrelationData(UUID.randomUUID().toString());
rabbitTemplate.convertAndSend("confirm.exchange","key1",message+"key1",correlationData1);
log.info("发送消息 id 为:{}内容为{}",correlationData1.getId(),message+"key1");
CorrelationDatacorrelationData2=newCorrelationData(UUID.randomUUID().toString());
rabbitTemplate.convertAndSend("confirm.exchange","key2",message+"key2",correlationData2);
log.info("发送消息 id 为:{}内容为{}",correlationData2.getId(),message+"key2");
}
@Overridepublicvoidconfirm(CorrelationDatacorrelationData, booleanack, Stringcause) {
Stringid=correlationData!=null?correlationData.getId() : "";
if (ack) {
log.info("交换机收到消息确认成功, id:{}", id);
        } else {
log.error("消息 id:{}未成功投递到交换机,原因是:{}", id, cause);
        }
    }
@OverridepublicvoidreturnedMessage(Messagemessage, intreplyCode, StringreplyText, Stringexchange, StringroutingKey) {
log.info("消息:{}被服务器退回,退回原因:{}, 交换机是:{}, 路由 key:{}",newString(message.getBody()),replyText, exchange, routingKey);
    }
}

3、回调接口

@Component@Slf4jpublicclassMyCallBackimplementsRabbitTemplate.ConfirmCallback,RabbitTemplate.ReturnCallback {
/*** 交换机不管是否收到消息的一个回调方法* CorrelationData* 消息相关数据* ack* 交换机是否收到消息*/@Overridepublicvoidconfirm(CorrelationDatacorrelationData, booleanack, Stringcause) {
Stringid=correlationData!=null?correlationData.getId():"";
if(ack){
log.info("交换机已经收到 id 为:{}的消息",id);
        }else{
log.info("交换机还未收到 id 为:{}消息,由于原因:{}",id,cause);
        }
    }
//当消息无法路由的时候的回调方法@OverridepublicvoidreturnedMessage(Messagemessage, intreplyCode, StringreplyText, Stringexchange, StringroutingKey) {
log.error(" 消 息 {}, 被交换机 {} 退回,退回原因 :{}, 路 由 key:{}",newString(message.getBody()),exchange,replyText,routingKey);
    }
}

4、结果分析

三、备份交换机

有了 mandatory 参数和回退消息,我们获得了对无法投递消息的感知能力,有机会在生产者的消息无法被投递时发现并处理。但有时候,我们并不知道该如何处理这些无法路由的消息,最多打个日志,然后触发报警,再来手动处理。而通过日志来处理这些无法路由的消息是很不优雅的做法,特别是当生产者所在的服务有多台机器的时候,手动复制日志会更加麻烦而且容易出错。而且设置 mandatory 参数会增加生产者的复杂性,需要添加处理这些被退回的消息的逻辑。如果既不想丢失消息,又不想增加生产者的复杂性,该怎么做呢?前面在设置死信队列的文章中,我们提到,可以为队列设置死信交换机来存储那些处理失败的消息,可是这些不可路由消息根本没有机会进入到队列,因此无法使用死信队列来保存消息。在 RabbitMQ 中,有一种备份交换机的机制存在,可以很好的应对这个问题。什么是备份交换机呢?备份交换机可以理解为 RabbitMQ 中交换机的“备胎”,当我们为某一个交换机声明一个对应的备份交换机时,就是为它创建一个备胎,当交换机接收到一条不可路由消息时,将会把这条消息转发到备份交换机中,由备份交换机来进行转发和处理,通常备份交换机的类型为 Fanout ,这样就能把所有消息都投递到与其绑定的队列中,然后我们在备份交换机下绑定一个队列,这样所有那些原交换机无法被路由的消息,就会都进入这个队列了。当然,我们还可以建立一个报警队列,用独立的消费者来进行监测和报警。

1、代码架构图

2、修改配置类

@ConfigurationpublicclassConfirmConfig {
publicstaticfinalStringCONFIRM_EXCHANGE_NAME="confirm.exchange";
publicstaticfinalStringCONFIRM_QUEUE_NAME="confirm.queue";
publicstaticfinalStringBACKUP_EXCHANGE_NAME="backup.exchange";
publicstaticfinalStringBACKUP_QUEUE_NAME="backup.queue";
publicstaticfinalStringWARNING_QUEUE_NAME="warning.queue";
// 声明确认队列@Bean("confirmQueue")
publicQueueconfirmQueue(){
returnQueueBuilder.durable(CONFIRM_QUEUE_NAME).build();
    }
//声明确认队列绑定关系@BeanpublicBindingqueueBinding(@Qualifier("confirmQueue") Queuequeue,@Qualifier("confirmExchange") DirectExchangeexchange){
returnBindingBuilder.bind(queue).to(exchange).with("key1");
    }
//声明备份 Exchange@Bean("backupExchange")
publicFanoutExchangebackupExchange(){
returnnewFanoutExchange(BACKUP_EXCHANGE_NAME);
    }
//声明确认 Exchange 交换机的备份交换机@Bean("confirmExchange")
publicDirectExchangeconfirmExchange(){
ExchangeBuilderexchangeBuilder=ExchangeBuilder.directExchange(CONFIRM_EXCHANGE_NAME)
                            .durable(true)
//设置该交换机的备份交换机                            .withArgument("alternate-exchange", BACKUP_EXCHANGE_NAME);
return (DirectExchange)exchangeBuilder.build();
    }
// 声明警告队列@Bean("warningQueue")
publicQueuewarningQueue(){
returnQueueBuilder.durable(WARNING_QUEUE_NAME).build();
    }
// 声明报警队列绑定关系@BeanpublicBindingwarningBinding(@Qualifier("warningQueue") Queuequeue,@Qualifier("backupExchange") FanoutExchangebackupExchange){
returnBindingBuilder.bind(queue).to(backupExchange);
    }
// 声明备份队列@Bean("backQueue")
publicQueuebackQueue(){
returnQueueBuilder.durable(BACKUP_QUEUE_NAME).build();
    }
// 声明备份队列绑定关系@BeanpublicBindingbackupBinding(@Qualifier("backQueue") Queuequeue,@Qualifier("backupExchange") FanoutExchangebackupExchange){
returnBindingBuilder.bind(queue).to(backupExchange);
    }
}

3、报警消费者

@Component@Slf4jpublicclassWarningConsumer {
publicstaticfinalStringWARNING_QUEUE_NAME="warning.queue";
@RabbitListener(queues=WARNING_QUEUE_NAME)
publicvoidreceiveWarningMsg(Messagemessage) {
Stringmsg=newString(message.getBody());
log.error("报警发现不可路由消息:{}", msg);
    }
}

4、测试注意事项

重新启动项目的时候需要把原来的 confirm.exchange 删除因为我们修改了其绑定属性,不然报以下错:

5、结果分析

mandatory 参数与备份交换机可以一起使用的时候,如果两者同时开启,消息究竟何去何从?谁优先级高,经过上面结果显示答案是备份交换机优先级高


总结

以上就是RabbitMQ之发布确认高级的相关知识,希望对你有所帮助。

相关实践学习
消息队列RocketMQ版:基础消息收发功能体验
本实验场景介绍消息队列RocketMQ版的基础消息收发功能,涵盖实例创建、Topic、Group资源创建以及消息收发体验等基础功能模块。
消息队列 MNS 入门课程
1、消息队列MNS简介 本节课介绍消息队列的MNS的基础概念 2、消息队列MNS特性 本节课介绍消息队列的MNS的主要特性 3、MNS的最佳实践及场景应用 本节课介绍消息队列的MNS的最佳实践及场景应用案例 4、手把手系列:消息队列MNS实操讲 本节课介绍消息队列的MNS的实际操作演示 5、动手实验:基于MNS,0基础轻松构建 Web Client 本节课带您一起基于MNS,0基础轻松构建 Web Client
相关文章
|
7月前
|
消息中间件 安全 Java
【RabbitMQ高级篇】消息可靠性问题
【RabbitMQ高级篇】消息可靠性问题
175 0
|
7月前
|
消息中间件 存储 Java
RabbitMQ——高级篇
RabbitMQ——高级篇
60 0
|
7月前
|
消息中间件 存储 Java
RabbitMQ高级
RabbitMQ高级
48 0
|
7月前
|
消息中间件 存储 Java
【RabbitMQ教程】第七章 —— RabbitMQ - 发布确认高级
【RabbitMQ教程】第七章 —— RabbitMQ - 发布确认高级
|
消息中间件 存储 负载均衡
工作八年?是高级开发?竟然答不出:如何保证RabbitMQ的高可用?
一个8年工作经验的小伙伴,被问到这样一个问题,说如何保证RabbitMQ的高可用。关于这个问题呢,这位小伙伴倒是有个实操经验,就是不知道如何组织语言。所以,当时面试结果不太理想。今天,我给大家分享一下我的理解。
157 0
|
消息中间件 Java 测试技术
【RabbitMQ高级篇】消息可靠性问题(1)(下)
【RabbitMQ高级篇】消息可靠性问题(1)(下)
126 0
|
消息中间件 安全 Java
【RabbitMQ高级篇】消息可靠性问题(1)(上)
【RabbitMQ高级篇】消息可靠性问题(1)
107 0
|
存储 消息中间件
十二、RabbitMQ高级 - 惰性队列
十二、RabbitMQ高级 - 惰性队列
|
消息中间件
十一、RabbitMQ高级 - 延迟队列
十一、RabbitMQ高级 - 延迟队列
|
消息中间件
十、RabbitMQ高级 - 死信交换机
十、RabbitMQ高级 - 死信交换机

相关产品

  • 云消息队列 MQ