【消息中间件】异常和死信消息们的浪浪山 2

简介: 【消息中间件】异常和死信消息们的浪浪山

(3)发送消息,实现ConfirmCallback回调

当消息甚至还没有到达交换机,通过ConfirmCallback来执行回调策略。这时不需要全局唯一的ConfrimCallback回调,可以每次发消息时指定不同的ConfirmCallback回调。因此代码放到单元测试类中即可。

image.png

代码如下。

@Slf4j
@RunWith(SpringRunner.class)
@SpringBootTest
public class SpringAmqpTest {
    @Autowired
    private RabbitTemplate rabbitTemplate;
    @Test
    public void testSendMessage2SimpleQueue() throws InterruptedException {
        // 1.准备消息
        String message = "hello, spring amqp!";
        // 2.准备CorrelationData
        // 2.1.消息ID
        CorrelationData correlationData = new CorrelationData(UUID.randomUUID().toString());
        // 2.2.准备ConfirmCallback
        correlationData.getFuture().addCallback(result -> {
            // 判断结果
            if (result.isAck()) {
                // ACK
                log.debug("消息成功投递到交换机!消息ID: {}", correlationData.getId());
            } else {
                // NACK
                log.error("消息投递到交换机失败!消息ID:{}", correlationData.getId());
                // 重发消息
            }
        }, ex -> {
            // 记录日志
            log.error("消息发送失败!", ex);
            // 重发消息
        });
        // 3.发送消息
        rabbitTemplate.convertAndSend("amq.topic", "simple.test", message, correlationData);
    }
}

上面代码中的交换机和queue及二者的绑定,可以手动的在管控台创建,配置(如果有不需要)。


99d1bb830991451cba627b4c861950c9.png


32fb532ad26948dba4090dcde1b0ed8e.png

跑以下这段测试代码。结果如下。



60ceea306f884d9eb0b788d14af66988.png

在管控台可以看到消息ready数为1。

c1266316b666456a93ce9f2dfeb1d75c.png

下面演示下消息根本没有到达交换机,没有返回值的失败情况。将代码中交互机修改成一个不存在的,如aamp

6dab3d6375bf4595845bef305da240a4.png

2.3 消息持久化

在创建队列与交换机时可以设置是否持久化,这样不会因为宕机而丢失消息。在管控台上傻瓜式,选择Durable即可。


bf7e5d2e58424a69b2b7c3a737ec4a41.png

在代码里实现也特别简单,指定参数即可。我们这里不演示了,直接把代码截图贴给大家。22eddd21908541ab910fe00280e85283.png


另外,在spring中队列、交换机和消息默认情况下其实都是持久的哦。

2.4 消费者消息确认

经过生产者消息确认机制和消息持久化,消息一定可以投递到消费者,但是是否消息一定可以被消费还不一定,如果投递时,消费者死了。那就GG了。

因此还需要消费者消息确认机制。


f0efbbf82e1148ffb8b9a8d21cceac7d.png

先将配置中acknowledge-mode设置成none测试下。

logging:
  pattern:
    dateformat: HH:mm:ss:SSS
  level:
    cn.itcast: debug
spring:
  rabbitmq:
    host: localhost # rabbitMQ的ip地址
    port: 5672 # 端口
    username: guest
    password: guest
    virtual-host: /
    listener:
      simple:
        prefetch: 1
        acknowledge-mode: none

编写下监听类,在代码中模拟下处理过程出现异常的情形。

@Slf4j
@Component
public class SpringRabbitListener {
   @RabbitListener(queues = "simple.queue")
    public void listenSimpleQueue(String msg) {
        log.debug("消费者接收到simple.queue的消息:【" + msg + "】");
        System.out.println(1 / 0);
        log.info("消费者处理消息成功!");
    }
}

如下图,打一个断点。

68c30b2ad1704b0eab8d44412384d1ea.png

在管控台确定下队列的情况,可以看到现在有一条消息。

f2917680c3bb4ff88ed9415bc2f19603.png

debug消费者。


ec96ef0859044cf5a6f49dee989150e4.png

执行到断点处,查看管控台,队列已经没有消息了。

b2c15495f955458187ea04adfc1f582a.png

这说明消息已经投递到消费者进行消费了。接着走,消费者就出异常了,消息丢失。

接着来,将 acknowledge-mode:设置为auto。使用生产者发送一条新消息,再用消费者debug。

管控台如下所示,发现unacked字段是1,说明此时消息已经被消费者获取,但是还没有返回值ack。

2be5eabf868e4525bdafc0331f6a1af0.png

如果放开断点直接跑,消费者会一直刷新获取消息。消息会一直重新尝试投递。

这样的方式比直接丢消息要好一点,但是捏,也不完美,如果消费者代码本身没有问题,消费者会最终将消息消费,如果代码本身有问题,就一直跑着。后面会学习更加升级的做法。

2.5 失败重试机制

上述问题,可以设置重试的上限。设置很简单,在消费者的配置文件里配配就好。


image.png

读者请自测。

2.6 消费者失败消息处理策略

上面的策略有一个问题,重试多次以后消息就丢了,普通消息无所谓,重要消息那就难受了。

实际上,可以指定消费者失败消息处理策略。

image.png

第三种策略显然是最完整的,生产中很推荐。其具体做法参考下图。


image.png

做一下,编写ErrorMessageConfig


@Configuration
public class ErrorMessageConfig {
    @Bean
    public DirectExchange errorMessageExchange(){
        return new DirectExchange("error.direct");
    }
    @Bean
    public Queue errorQueue(){
        return new Queue("error.queue");
    }
    @Bean
    public Binding errorMessageBinding(){
        return BindingBuilder.bind(errorQueue()).to(errorMessageExchange()).with("error");
    }
    @Bean
    public MessageRecoverer republishMessageRecoverer(RabbitTemplate rabbitTemplate){
        return new RepublishMessageRecoverer(rabbitTemplate, "error.direct", "error");
    }
}

测试下,重新跑下消费者。

在管控台可以检测下queue,exchange及其绑定关系。


65aae6d21ac644948daeab2ad87a015e.png

直接通过管控台给simple.queue发送下消息。




1e34eb82e1f541b6aebbf4b7cf233011.png效果是这样的。

4e7f366adc354931aaed24fea99096f2.png

在管控台的error.queue中可以看到消息,甚至可以看到具体的异常栈信息!牛啊!



ba5148de371947078df22e045bcad010.png


c265ab91f7894bf4ad4d2d181a70495c.png


相关文章
|
消息中间件 存储 NoSQL
【2021年遇到最头疼的Bug】【Alibaba中间件技术系列】「RocketMQ技术专题」Broker配置介绍及发送流程、异常(XX Busy)问题分析总结
【2021年遇到最头疼的Bug】【Alibaba中间件技术系列】「RocketMQ技术专题」Broker配置介绍及发送流程、异常(XX Busy)问题分析总结
697 13
【2021年遇到最头疼的Bug】【Alibaba中间件技术系列】「RocketMQ技术专题」Broker配置介绍及发送流程、异常(XX Busy)问题分析总结
|
开发框架 Java 中间件
java程序设计与j2ee中间件技术/软件开发技术(I)-实验三-接口、开闭原则和异常
java程序设计与j2ee中间件技术/软件开发技术(I)-实验三-接口、开闭原则和异常
235 4
java程序设计与j2ee中间件技术/软件开发技术(I)-实验三-接口、开闭原则和异常
|
消息中间件 Docker 容器
【消息中间件】异常和死信消息们的浪浪山 3
【消息中间件】异常和死信消息们的浪浪山
|
消息中间件 Java Spring
【消息中间件】异常和死信消息们的浪浪山 1
【消息中间件】异常和死信消息们的浪浪山
|
8月前
|
消息中间件 存储 负载均衡
消息中间件的选择:RabbitMQ是一个明智的选择
消息中间件的选择:RabbitMQ是一个明智的选择
120 0
|
7月前
|
消息中间件 存储 中间件
【消息中间件】详解三大MQ:RabbitMQ、RocketMQ、Kafka
【消息中间件】详解三大MQ:RabbitMQ、RocketMQ、Kafka
1827 0
|
6月前
|
消息中间件 编解码 Docker
Docker部署RabbitMQ消息中间件
【7月更文挑战第4天】Docker部署RabbitMQ消息中间件
286 3
|
3月前
|
消息中间件 编解码 Docker
【Docker项目实战】Docker部署RabbitMQ消息中间件
【10月更文挑战第8天】Docker部署RabbitMQ消息中间件
132 1
【Docker项目实战】Docker部署RabbitMQ消息中间件
|
5月前
|
消息中间件 Java 测试技术
消息中间件RabbitMQ---SpringBoot整合RabbitMQ【三】
这篇文章是关于如何在SpringBoot应用中整合RabbitMQ的消息中间件。内容包括了在SpringBoot项目中添加RabbitMQ的依赖、配置文件设置、启动类注解,以及如何通过单元测试来创建交换器、队列、绑定,并发送和接收消息。文章还介绍了如何配置消息转换器以支持对象的序列化和反序列化,以及如何使用注解`@RabbitListener`来接收消息。
消息中间件RabbitMQ---SpringBoot整合RabbitMQ【三】