(3)发送消息,实现ConfirmCallback回调
当消息甚至还没有到达交换机,通过ConfirmCallback来执行回调策略。这时不需要全局唯一的ConfrimCallback回调,可以每次发消息时指定不同的ConfirmCallback回调。因此代码放到单元测试类中即可。
代码如下。
@Slf4j @RunWith(SpringRunner.class) @SpringBootTest public class SpringAmqpTest { @Autowired private RabbitTemplate rabbitTemplate; @Test public void testSendMessage2SimpleQueue() throws InterruptedException { // 1.准备消息 String message = "hello, spring amqp!"; // 2.准备CorrelationData // 2.1.消息ID CorrelationData correlationData = new CorrelationData(UUID.randomUUID().toString()); // 2.2.准备ConfirmCallback correlationData.getFuture().addCallback(result -> { // 判断结果 if (result.isAck()) { // ACK log.debug("消息成功投递到交换机!消息ID: {}", correlationData.getId()); } else { // NACK log.error("消息投递到交换机失败!消息ID:{}", correlationData.getId()); // 重发消息 } }, ex -> { // 记录日志 log.error("消息发送失败!", ex); // 重发消息 }); // 3.发送消息 rabbitTemplate.convertAndSend("amq.topic", "simple.test", message, correlationData); } }
上面代码中的交换机和queue及二者的绑定,可以手动的在管控台创建,配置(如果有不需要)。
跑以下这段测试代码。结果如下。
在管控台可以看到消息ready数为1。
下面演示下消息根本没有到达交换机,没有返回值的失败情况。将代码中交互机修改成一个不存在的,如aamp
2.3 消息持久化
在创建队列与交换机时可以设置是否持久化,这样不会因为宕机而丢失消息。在管控台上傻瓜式,选择Durable即可。
在代码里实现也特别简单,指定参数即可。我们这里不演示了,直接把代码截图贴给大家。
另外,在spring中队列、交换机和消息默认情况下其实都是持久的哦。
2.4 消费者消息确认
经过生产者消息确认机制和消息持久化,消息一定可以投递到消费者,但是是否消息一定可以被消费还不一定,如果投递时,消费者死了。那就GG了。
因此还需要消费者消息确认机制。
先将配置中acknowledge-mode
设置成none测试下。
logging: pattern: dateformat: HH:mm:ss:SSS level: cn.itcast: debug spring: rabbitmq: host: localhost # rabbitMQ的ip地址 port: 5672 # 端口 username: guest password: guest virtual-host: / listener: simple: prefetch: 1 acknowledge-mode: none
编写下监听类,在代码中模拟下处理过程出现异常的情形。
@Slf4j @Component public class SpringRabbitListener { @RabbitListener(queues = "simple.queue") public void listenSimpleQueue(String msg) { log.debug("消费者接收到simple.queue的消息:【" + msg + "】"); System.out.println(1 / 0); log.info("消费者处理消息成功!"); } }
如下图,打一个断点。
在管控台确定下队列的情况,可以看到现在有一条消息。
debug消费者。
执行到断点处,查看管控台,队列已经没有消息了。
这说明消息已经投递到消费者进行消费了。接着走,消费者就出异常了,消息丢失。
接着来,将 acknowledge-mode:设置为auto。使用生产者发送一条新消息,再用消费者debug。
管控台如下所示,发现unacked字段是1,说明此时消息已经被消费者获取,但是还没有返回值ack。
如果放开断点直接跑,消费者会一直刷新获取消息。消息会一直重新尝试投递。
这样的方式比直接丢消息要好一点,但是捏,也不完美,如果消费者代码本身没有问题,消费者会最终将消息消费,如果代码本身有问题,就一直跑着。后面会学习更加升级的做法。
2.5 失败重试机制
上述问题,可以设置重试的上限。设置很简单,在消费者的配置文件里配配就好。
读者请自测。
2.6 消费者失败消息处理策略
上面的策略有一个问题,重试多次以后消息就丢了,普通消息无所谓,重要消息那就难受了。
实际上,可以指定消费者失败消息处理策略。
第三种策略显然是最完整的,生产中很推荐。其具体做法参考下图。
做一下,编写ErrorMessageConfig
@Configuration public class ErrorMessageConfig { @Bean public DirectExchange errorMessageExchange(){ return new DirectExchange("error.direct"); } @Bean public Queue errorQueue(){ return new Queue("error.queue"); } @Bean public Binding errorMessageBinding(){ return BindingBuilder.bind(errorQueue()).to(errorMessageExchange()).with("error"); } @Bean public MessageRecoverer republishMessageRecoverer(RabbitTemplate rabbitTemplate){ return new RepublishMessageRecoverer(rabbitTemplate, "error.direct", "error"); } }
测试下,重新跑下消费者。
在管控台可以检测下queue,exchange及其绑定关系。
直接通过管控台给simple.queue发送下消息。
效果是这样的。
在管控台的error.queue中可以看到消息,甚至可以看到具体的异常栈信息!牛啊!