公众号merlinsea
消息的可靠性投递【生产者端保证】:
1、保证消息在生产者投递出去以后,生产者要知道消息投递成功/失败的状态。
2、rabbitmq中消息投递的过程:生产者 -> 交换机 ->队列 ->消费者。
3、生产者到交换机的可靠性保证:confirmCallback机制保证,即消息从生产者到交换机以后,broker会发送ack信息给到生产者,生产者可以根据ack状态来判断消息是成功发送还是失败发送,然后决定接下来的行为。
4、交换机到队列的可靠性保证: returnCallback机制保证,如果消息的交换机找不到对应的路由key和消息的key进行匹配,将会返回ReturnMessage给生 产者。
使用建议:
开启消息确认机制以后,保证了消息的准确送达,但由于频繁的确认交互, rabbitmq 整体效率变低,吞吐量下降严重,不是⾮常 重要的消息真⼼不建议⽤消息确认机制。
配置文件中开启消息的可靠性保证【默认是不开启】
#消息队列 spring: rabbitmq: host: 39.107.221.166 port: 5672 virtual-host: /dev password: password username: admin #开启消息二次确认,生产者到broker的交换机 publisher-confirm-type: correlated #开启消息二次确认,交换机到队列的可靠性投递 publisher-returns: true #为true,则交换机处理消息到路由失败,则会返回给生产者 template: mandatory: true
生产者端的代码【confirmCallback】:
void testConfirmCallback(){ //RabbitTemplate对象设置回调函数 template.setConfirmCallback(new RabbitTemplate.ConfirmCallback() { /** * @param correlationData 配置 * @param ack 交换机是否收到消息,true是成功,false是失败 * @param cause 失败的原因 */ @Override public void confirm(CorrelationData correlationData, boolean ack, String cause) { System.out.println("ConfirmCallback======>"); System.out.println("correlationData======>correlationData="+correlationData); System.out.println("ack======>ack="+ack); System.out.println("cause======>cause="+cause); if(ack){ System.out.println("发送成功"); //更新数据库 消息的状态为成功 TODO }else { System.out.println("发送失败,记录到日志或者数据库"); //更新数据库 消息的状态为失败 TODO } } }); //数据库新增一个消息记录,状态是发送 TODO //发送消息 //template.convertAndSend(RabbitMQConfig.EXCHANGE_NAME, "order.new","新订单"); //发送到一个不存在的exchange上 template.convertAndSend(RabbitMQConfig.EXCHANGE_NAME+"error", "order.new","新订单"); }
生产者端代码 【returnCallback】
/** * 交换机到队列可靠性投递测试,也是返回给发送者进行处理,但只有发送失败才会触发returnedMessage */ @Test void testReturnCallback(){ template.setReturnsCallback(new RabbitTemplate.ReturnsCallback() { //只有投递失败才会回调这个函数 @Override public void returnedMessage(ReturnedMessage returned) { int code = returned.getReplyCode(); System.out.println("code="+code); System.out.println("returned="+returned.toString()); } }); //投递成功 //template.convertAndSend(RabbitMQConfig.EXCHANGE_NAME, "order.new","新订单ReturnsCallback"); //模拟异常,投递的队列不存在 template.convertAndSend(RabbitMQConfig.EXCHANGE_NAME, "error.order.new","新订单ReturnsCallback"); }