1、前言
RabbitMQ消息首先发送到交换机,然后通过路由键【routingKey】和【bindingKey】比较从而将消息发送到对应的队列【queue】上。在这个过程有两个地方消息可能会丢失:
- 消息发送到交换机的过程。
- 消息从交换机发送到队列的过程。
而RabbitMQ提供了类似于回调函数的机制来告诉发送方消息是否发送成功。这里针对上述的两种情况,RabbitMQ也是给出了以下的应对策略:
- publisher-confirm:消息到达交换机时会触发。
- publisher-return:到达交换机但是没有路由到队列,会返回ack以及失败原因。
2、publisher-confirm
在SpringBoot项目的properties文件中加上
spring.rabbitmq.publisher-confirm-type=correlated
该配置有三个值:
- none:是禁用发布确认模式,是默认值
- correlated:是发布消息成功到交换器后会触发回调方法
- simple:有两种效果,第一种和correlated值一样会触发回调方法;第二种在发布消息成功后使用rabbitTemplate调用waitForConfirms或waitForConfirmsOrDie方法等待broker节点返回发送结果,根据返回结果来判定下一步的逻辑,要注意的点是waitForConfirmsOrDie方法如果返回false则会关闭channel,则接下来无法发送消息到broker。
RabbitMQ的配置类实现ConfirmCallback
/*** @author LoneWalker* @date 2023/4/8* @description*/publicclassRabbitMqConfigimplementsRabbitTemplate.ConfirmCallback { publicRabbitTemplaterabbitTemplate(CachingConnectionFactoryconnectionFactory) { RabbitTemplaterabbitTemplate=newRabbitTemplate(connectionFactory); rabbitTemplate.setMessageConverter(jackson2JsonMessageConverter()); //设置给rabbitTemplaterabbitTemplate.setConfirmCallback(this); returnrabbitTemplate; } publicMessageConverterjackson2JsonMessageConverter() { returnnewJackson2JsonMessageConverter(); } publicDirectExchangegetExchange(){ returnnewDirectExchange("directExchange",false,false); } publicQueuegetQueue(){ returnnewQueue("publisher.addUser",true,false,false); } publicBindinggetBinding(DirectExchangeexchange,Queuequeue){ returnBindingBuilder.bind(queue).to(exchange).with("publisher.addUser"); } /*** 消息成功到达交换机会触发* @param correlationData* @param ack* @param cause*/publicvoidconfirm(CorrelationDatacorrelationData, booleanack, Stringcause) { if (ack) { log.info("交换机收到消息成功:"+correlationData.getId()); }else { log.error("交换机收到消息失败:"+correlationData.getId() +"原因:"+cause); } } }
而需要这个correlationData是因为确认机制发送消息时,需要给每个消息设置一个全局唯一id,以区分不同消息,避免ack冲突。所以我们改写一下发送消息的方法:
publicclassPublisherServiceImplimplementsPublisherService{ privatefinalRabbitTemplaterabbitTemplate; publicvoidaddUser(Useruser) { CorrelationDatacorrelationData=newCorrelationData(); correlationData.setId(UUID.randomUUID().toString()); rabbitTemplate.convertAndSend("directExchange","publisher.addUser",user,correlationData); } }
然后发送消息:
再模拟一下失败的情况——把交换机名称改成错的:
温馨提示:测试完把交换机名称改回去。
3、publisher-return
在SpringBoot项目的properties文件中添加:
spring.rabbitmq.publisher-returns=true
###消息在没有被队列接收时是否强行退回还是直接丢弃
spring.rabbitmq.template.mandatory=true
RabbitMQ的配置类再实现ReturnsCallback
publicclassRabbitMqConfigimplementsRabbitTemplate.ConfirmCallback, RabbitTemplate.ReturnsCallback { publicRabbitTemplaterabbitTemplate(CachingConnectionFactoryconnectionFactory) { RabbitTemplaterabbitTemplate=newRabbitTemplate(connectionFactory); rabbitTemplate.setMessageConverter(jackson2JsonMessageConverter()); //设置给rabbitTemplaterabbitTemplate.setConfirmCallback(this); rabbitTemplate.setReturnsCallback(this); rabbitTemplate.setMandatory(true); returnrabbitTemplate; } publicMessageConverterjackson2JsonMessageConverter() { returnnewJackson2JsonMessageConverter(); } publicDirectExchangegetExchange(){ returnnewDirectExchange("directExchange",false,false); } publicQueuegetQueue(){ returnnewQueue("publisher.addUser",true,false,false); } publicBindinggetBinding(DirectExchangeexchange,Queuequeue){ returnBindingBuilder.bind(queue).to(exchange).with("publisher.addUser"); } /*** 消息成功到达交换机会触发* @param correlationData* @param ack* @param cause*/publicvoidconfirm(CorrelationDatacorrelationData, booleanack, Stringcause) { if (ack) { log.info("交换机收到消息成功:"+correlationData.getId()); }else { log.error("交换机收到消息失败:"+correlationData.getId() +"原因:"+cause); } } /*** 消息未成功到达队列会触发* @param returnedMessage*/publicvoidreturnedMessage(ReturnedMessagereturnedMessage) { log.error("{}--消息未成功到达队列",returnedMessage.getMessage().getMessageProperties().getMessageId()); } }
把路由键改为错误的值:
正常来说消息到达交换机就一定可以到达队列,到不了队列基本上就是代码写错了。