【消息中间件】异常和死信消息们的浪浪山 3

简介: 【消息中间件】异常和死信消息们的浪浪山

3.死信消息的浪浪山

3.1 ttl

7a94d05ed294401785a0bd00f2148909.png

举一个栗子,订单超时未支付则自动取消。

3.1.1 设置队列TTL

下面用代码实现下第一种方式吧。

生产者模块新增配置类TTLRabbitConfiguration.

@Configuration
public class TTLRabbitConfiguration {
    public static final String QUEUE_NAME = "ttl_queue_test";
    public static final String EXCHANGE_NAME = "ttl_exchange_test";
    @Bean("ttlExchange")
    public Exchange ttlExchange() {
        return ExchangeBuilder.directExchange(EXCHANGE_NAME).durable(true).build();
    }
    @Bean("ttlQueue")
    public Queue ttlQueue() {
        Map<String, Object> args = new HashMap<>();
        // 设置5s的过期时间
        args.put("x-message-ttl", 5000);
        return QueueBuilder.durable(QUEUE_NAME).withArguments(args).build();
    }
  @Bean
    public Binding bindTTLQueueExchange(@Qualifier("ttlQueue")Queue queue, @Qualifier("ttlExchange")Exchange exchange) {
        return BindingBuilder.bind(queue).to(exchange).with("ttl").noargs();
    }

测试类新增方法。

 @Test
    public void testTTLSend() {
        rabbitTemplate.convertAndSend(TTLRabbitConfiguration.EXCHANGE_NAME, "ttl", "ttl mq haha~~~~");
    }

启动运行,可以看到下图中ttl_queue_test会有标识TTL,有1条消息ready。

eedd43eb3c78487288225d96e89f762a.png

5s以后,ready的消息数会变成0条。

3.1.2 设置消息TTL

配置类

@Configuration
public class MSGTTLRabbitConfig {
    public static final String QUEUE_NAME = "msg_queue";
    public static final String EXCHANGE_NAME = "msg_exchange";
    @Bean("msgExchange")
    public Exchange msgExchange() {
        return ExchangeBuilder.topicExchange(EXCHANGE_NAME).durable(true).build();
    }
    @Bean("msgQueue")
    public Queue msgQueue() {
        return QueueBuilder.durable(QUEUE_NAME).build();
    }
    @Bean
    public Binding bindMSGQueueExchange(@Qualifier("msgQueue")Queue queue, @Qualifier("msgExchange")Exchange exchange) {
        return BindingBuilder.bind(queue).to(exchange).with("msg_ttl").noargs();
    }
}

测试方法。

   @Test
    public void testMsgTTLSend() {
        MessagePostProcessor postProcessor= new MessagePostProcessor() {
            @Override
            public Message postProcessMessage(Message message) throws AmqpException {
                message.getMessageProperties().setExpiration("5000");
                message.getMessageProperties().setContentEncoding("UTF-8");
                return message;
            }
        };
        rabbitTemplate.convertAndSend(MSGTTLRabbitConfig.EXCHANGE_NAME, "msg_ttl", "msg_ttl mq haha~~~~",postProcessor);
    }

读者可以自测。

3.2 死信交换机

如果ttl到达,直接将消息删除,消息永久就消失了。实际上业务往往不会真的删除,而是将过期队列中过期的消息移入死信交换机。


698aa5b37f2e40ad8f35f4ed41708d95.png

0dfa69bec4ae4894a8e6e14975819f3c.png

注意与前面所学的消息失败的异常交换机进行对比。可以发现,异常消息是消费者将其投递到异常队列,而死信消费者可不会管事哦。

8e43d67b036244e6b50dd6d128e14a2d.png



死信交换机当然也可以做异常兜底,但是他还有其它的应用场景。建议异常兜底方案还是使用异常交换机来搞。

由于死信消息会直接由普通队列投递到死信队列,而不是通过consumer,因此,需要在投递时指定死信交换机和对应的路由key。


903be0f0457a4c928807d61af03f6d5e.png




be137e7343784158a5b472bf4afd7eb3.png


3.3 延迟队列


12decaebb2394d84beecbbd790acd556.png手工去实现延迟队列多少有点繁琐,可以使用官方插件来快速做。


a59e99b038a3408bafa8e99ce44e48a9.png


下面来安装下延迟队列插件。


官方的安装指南地址为:https://blog.rabbitmq.com/posts/2015/04/scheduling-messages-with-rabbitmq


下面我们会讲解基于Docker来安装RabbitMQ插件,如果您是通过其它方式安装的RabbitMQ,可以选择使用docker再装下或者自己查找对应的插件安装方式。


3.3.1下载插件

RabbitMQ有一个官方的插件社区,地址为:https://www.rabbitmq.com/community-plugins.html


其中包含各种各样的插件,包括我们要使用的DelayExchange插件:


ac829f6909c844d5a53f4cd5cc6414f0.png

3.3.2 上传插件

因为我们是基于Docker安装,所以需要先查看RabbitMQ的插件目录对应的数据卷。

我们之前设定的RabbitMQ的数据卷名称为mq-plugins,所以我们使用下面命令查看数据卷:

docker volume inspect mq-plugins

可以得到下面结果:

b50e903b594f430f955bec5540eddcff.png

接下来,将插件上传到这个目录即可:

1f58464c789e4265be8a5e8d70fea7c3.png


3.3.3 安装插件

最后就是安装了,需要进入MQ容器内部来执行安装。我的容器名为mq,所以执行下面命令:

docker exec -it mq bash

执行时,请将其中的 -it 后面的mq替换为你自己的容器名.

进入容器内部后,执行下面命令开启插件:

rabbitmq-plugins enable rabbitmq_delayed_message_exchange

结果如下:

10ffd8cf1ce94abaa8170e1b0b01291d.png

3.3.4 使用插件

在管控台。bdadc0b8f605455dbed3cc3b7fedba67.png

3a17dfe4c9484e9c8f5fe355e673bb35.png

或者也可以在代码中做上面同样的工作。

声明下死信交换机。


a60d4a0ded9b4dabb365f6406763d1ab.png


image.png

代码贴下。

 @RabbitListener(bindings = @QueueBinding(
            value = @Queue(name = "delay.queue", durable = "true"),
            exchange = @Exchange(name = "delay.direct", delayed = "true"),
            key = "delay"
    ))
    public void listenDelayExchange(String msg) {
        log.info("消费者接收到了delay.queue的延迟消息");
    }

发消息。

b905689cc44b489980625bc1cef85f4c.png

代码如下。

   @Test
    public void testSendDelayMessage() throws InterruptedException {
        // 1.准备消息
        Message message = MessageBuilder
                .withBody("hello, ttl messsage".getBytes(StandardCharsets.UTF_8))
                .setDeliveryMode(MessageDeliveryMode.PERSISTENT)
                .setHeader("x-delay", 5000)
                .build();
        // 2.准备CorrelationData
        CorrelationData correlationData = new CorrelationData(UUID.randomUUID().toString());
        // 3.发送消息
        rabbitTemplate.convertAndSend("delay.direct", "delay", message, correlationData);
        log.info("发送消息成功");
    }

跑一下,会发现一个问题

5a7401ce31ab4c8c8118bf03f2b0cfe1.png

实际上消息只是延迟了,但是异常队列处理了它。因此我们需要对之前的异常策略进行下增强。将生产者的config进行下增强,判断下是否是延迟消息。

@Slf4j
@Configuration
public class CommonConfig implements ApplicationContextAware {
    @Override
    public void setApplicationContext(ApplicationContext applicationContext) throws BeansException {
        // 获取RabbitTemplate对象
        RabbitTemplate rabbitTemplate = applicationContext.getBean(RabbitTemplate.class);
        // 配置ReturnCallback
        rabbitTemplate.setReturnCallback((message, replyCode, replyText, exchange, routingKey) -> {
            // 判断是否是延迟消息
            Integer receivedDelay = message.getMessageProperties().getReceivedDelay();
            if (receivedDelay != null && receivedDelay > 0) {
                // 是一个延迟消息,忽略这个错误提示
                return;
            }
            // 记录日志
            log.error("消息发送到队列失败,响应码:{}, 失败原因:{}, 交换机: {}, 路由key:{}, 消息: {}",
                     replyCode, replyText, exchange, routingKey, message.toString());
            // 如果有需要的话,重发消息
        });
    }
}

202cdcf9df9d4236a0051209f80fe144.png

相关实践学习
消息队列RocketMQ版:基础消息收发功能体验
本实验场景介绍消息队列RocketMQ版的基础消息收发功能,涵盖实例创建、Topic、Group资源创建以及消息收发体验等基础功能模块。
消息队列 MNS 入门课程
1、消息队列MNS简介 本节课介绍消息队列的MNS的基础概念 2、消息队列MNS特性 本节课介绍消息队列的MNS的主要特性 3、MNS的最佳实践及场景应用 本节课介绍消息队列的MNS的最佳实践及场景应用案例 4、手把手系列:消息队列MNS实操讲 本节课介绍消息队列的MNS的实际操作演示 5、动手实验:基于MNS,0基础轻松构建 Web Client 本节课带您一起基于MNS,0基础轻松构建 Web Client
相关文章
|
消息中间件 存储 NoSQL
【2021年遇到最头疼的Bug】【Alibaba中间件技术系列】「RocketMQ技术专题」Broker配置介绍及发送流程、异常(XX Busy)问题分析总结
【2021年遇到最头疼的Bug】【Alibaba中间件技术系列】「RocketMQ技术专题」Broker配置介绍及发送流程、异常(XX Busy)问题分析总结
667 7
【2021年遇到最头疼的Bug】【Alibaba中间件技术系列】「RocketMQ技术专题」Broker配置介绍及发送流程、异常(XX Busy)问题分析总结
|
开发框架 Java 中间件
java程序设计与j2ee中间件技术/软件开发技术(I)-实验三-接口、开闭原则和异常
java程序设计与j2ee中间件技术/软件开发技术(I)-实验三-接口、开闭原则和异常
223 2
java程序设计与j2ee中间件技术/软件开发技术(I)-实验三-接口、开闭原则和异常
|
消息中间件 Java 测试技术
【消息中间件】异常和死信消息们的浪浪山 2
【消息中间件】异常和死信消息们的浪浪山
|
消息中间件 Java Spring
【消息中间件】异常和死信消息们的浪浪山 1
【消息中间件】异常和死信消息们的浪浪山
|
6月前
|
消息中间件 存储 负载均衡
消息中间件的选择:RabbitMQ是一个明智的选择
消息中间件的选择:RabbitMQ是一个明智的选择
110 0
|
5月前
|
消息中间件 存储 中间件
【消息中间件】详解三大MQ:RabbitMQ、RocketMQ、Kafka
【消息中间件】详解三大MQ:RabbitMQ、RocketMQ、Kafka
1367 0
|
4月前
|
消息中间件 编解码 Docker
Docker部署RabbitMQ消息中间件
【7月更文挑战第4天】Docker部署RabbitMQ消息中间件
275 3
|
1月前
|
消息中间件 编解码 Docker
【Docker项目实战】Docker部署RabbitMQ消息中间件
【10月更文挑战第8天】Docker部署RabbitMQ消息中间件
83 1
【Docker项目实战】Docker部署RabbitMQ消息中间件
|
3月前
|
消息中间件 Java 测试技术
消息中间件RabbitMQ---SpringBoot整合RabbitMQ【三】
这篇文章是关于如何在SpringBoot应用中整合RabbitMQ的消息中间件。内容包括了在SpringBoot项目中添加RabbitMQ的依赖、配置文件设置、启动类注解,以及如何通过单元测试来创建交换器、队列、绑定,并发送和接收消息。文章还介绍了如何配置消息转换器以支持对象的序列化和反序列化,以及如何使用注解`@RabbitListener`来接收消息。
消息中间件RabbitMQ---SpringBoot整合RabbitMQ【三】