八、发布确认高级
correlated
adj. 有相互关系的
v. (使)相关联;(使)相互对照(correlate 的过去分词)
如何才能进行 RabbitMQ 的消息可靠投递呢? 特别是在这样比较极端的情况,RabbitMQ 集群不可用的时候,无法投递的消息该如何处理呢?
8.1 发布确认 springboot 版本
8.1.1确认机制方案
8.1.2 代码架构图
8.1.3 配置文件
在配置文件当中需要添加
spring.rabbitmq.publisher-confirm-type=correlated
- NONE
禁用发布确认模式,是默认值 - CORRELATED
发布消息成功到交换器后会触发回调方法 - SIMPLE
经测试有两种效果,其一效果和 CORRELATED 值一样会触发回调方法
其二在发布消息成功后使用 rabbitTemplate 调用 waitForConfirms 或 waitForConfirmsOrDie 方法等待 broker 节点返回发送结果,根据返回结果来判定下一步的逻辑,要注意的点是
waitForConfirmsOrDie 方法如果返回 false 则会关闭 channel,则接下来无法发送消息到 broker
spring: rabbitmq: host: 192.168.42.96 port: 5672 username: admin password: 123 # 开启了交换机回调 publisher-confirm-type: correlated # 开启回退消息 publisher-returns: true
8.1.4 配置类代码
package com.caq.config; import org.springframework.amqp.core.*; import org.springframework.beans.factory.annotation.Qualifier; import org.springframework.context.annotation.Bean; import org.springframework.context.annotation.Configuration; @Configuration public class ConfirmConfig { // 交换机 public static final String CONFIRM_EXCHANGE_NAME = "confirm_exchange_name"; // 队列 public static final String CONFIRM_QUEUE_NAME = "confirm_queue"; // routingKey public static final String CONFIRM_ROUTING_KEY = "key1"; //声明交换机 @Bean("confirmExchange") public DirectExchange confirmExchange(){ return new DirectExchange(CONFIRM_EXCHANGE_NAME); } //交换机创建通过new的形式,队列的创建通过对象的方法QueueBuilder.durable //声明队列 @Bean("confirmQueue") public Queue confirmQueue(){ return QueueBuilder.durable(CONFIRM_QUEUE_NAME).build(); } //绑定 @Bean public Binding confirmQueueBindingConfirmExchange(@Qualifier("confirmQueue") Queue confirmQueue, @Qualifier("confirmExchange") DirectExchange confirmExchange){ return BindingBuilder.bind(confirmQueue).to(confirmExchange).with(CONFIRM_ROUTING_KEY); } }
8.1.5 消息生产者
package com.caq.config; import org.springframework.amqp.core.*; import org.springframework.beans.factory.annotation.Qualifier; import org.springframework.context.annotation.Bean; import org.springframework.context.annotation.Configuration; @Configuration public class ConfirmConfig { // 交换机 public static final String CONFIRM_EXCHANGE_NAME = "confirm_exchange_name"; // 队列 public static final String CONFIRM_QUEUE_NAME = "confirm_queue"; // routingKey public static final String CONFIRM_ROUTING_KEY = "key1"; //声明交换机 @Bean("confirmExchange") public DirectExchange confirmExchange(){ return new DirectExchange(CONFIRM_EXCHANGE_NAME); } //交换机创建通过new的形式,队列的创建通过对象的方法QueueBuilder.durable //声明队列 @Bean("confirmQueue") public Queue confirmQueue(){ return QueueBuilder.durable(CONFIRM_QUEUE_NAME).build(); } //绑定 @Bean public Binding confirmQueueBindingConfirmExchange(@Qualifier("confirmQueue") Queue confirmQueue, @Qualifier("confirmExchange") DirectExchange confirmExchange){ return BindingBuilder.bind(confirmQueue).to(confirmExchange).with(CONFIRM_ROUTING_KEY); } }
8.1.6 回调接口
交换机确认回调方法 参数介绍:
1、发消息 交换机接收到了 回调
CorrelationData 保存回调消息的ID及相关信息
交换机收到消息 ack = true
cause null
2、发消息 交换机没有接收到 回调
CorrelationData 保存回调消息的ID及相关信息
交换机收到消息 ack = false
cause 失败的原因
correlation 相互关系
package com.caq.config; import lombok.extern.slf4j.Slf4j; import org.springframework.amqp.core.Message; import org.springframework.amqp.core.ReturnedMessage; import org.springframework.amqp.rabbit.connection.CorrelationData; import org.springframework.amqp.rabbit.core.RabbitTemplate; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.stereotype.Component; import javax.annotation.PostConstruct; @Slf4j @Component public class MycallBack implements RabbitTemplate.ConfirmCallback{ @Autowired private RabbitTemplate rabbitTemplate; @PostConstruct public void init(){ //注入 rabbitTemplate.setConfirmCallback(this); rabbitTemplate.setReturnCallback(this); } @Override public void confirm(CorrelationData correlationData, boolean ack, String cause) { //三元运算 String id = correlationData != null ? correlationData.getId() : ""; if (ack) { log.info("交换机已经收到了Id为:{}的信息", id); } else { log.info("交换机还未收到了Id为:{}的信息,由于原因:{}", id, cause); } } }
8.1.7 消息消费者
package com.caq.consumer; import com.caq.config.ConfirmConfig; import lombok.extern.slf4j.Slf4j; import org.springframework.amqp.core.Message; import org.springframework.amqp.rabbit.annotation.RabbitListener; import org.springframework.stereotype.Component; @Slf4j @Component public class Consumer { //接收消息 @RabbitListener(queues = ConfirmConfig.CONFIRM_QUEUE_NAME) public void receiveConfirmMessage(Message message) { String msg = new String(message.getBody()); log.info("接受到的队列confirm,queue消息{}",msg); } }
正常情况:发送消息到交换机
如果交换机收不到消息呢?
怎么回调呢?如果消息发不出去,那就给我返回过来然后保存下来
可以看到,发送了两条消息,第一条消息的 RoutingKey 为 "key1",第二条消息的 RoutingKey 为"key2",两条消息都成功被交换机接收,也收到了交换机的确认回调,但消费者只收到了一条消息,因为第二条消息的 RoutingKey 与队列的 BindingKey 不一致,也没有其它队列能接收这个消息,所有第二条消息被直接丢弃了。