消息队列-死信队列

简介: 消息队列-死信队列

一、业务场景。

1.1 消息队列中的消息因各种原因未能正常消费的情况。

       在消息队列中,执行异步任务时,通常是将消息生产者发布的消息存储在队列中,由消费者从队列中获取并处理这些消息。但是,在某些情况下,消息可能无法正常地被处理和消耗,例如:格式错误、设备故障等,这些未成功处理的消息就被称为“死信”。

       这些消息应该怎么去处理呢?

1.2 订单支付失败处理方案。

       假设你的电商网站有一个订单处理系统,订单从用户提交到最终交付可能需要多个步骤,比如支付、库存检查、物流安排等。在这个过程中,可能会出现各种异常情况,比如用户支付超时、库存不足、物流问题等,导致订单处理失败。

       对这些失败的订单想发起重试并在一定时间内如果还是失败需要记录日志、发送短信、取消订单等操作,应该如何去处理呢?

二、什么是死信队列?

       死信队列(Dead Letter Queue,DLQ)是消息队列(Message Queue)中的一种特殊队列,用于存储无法被消费者成功处理的消息。当消息无法被正常消费时(比如处理超时、处理失败等情况),这些消息会被移动到死信队列中,以便进一步处理或者进行错误处理。

       死信队列通常用于处理异常情况,帮助系统应对消息消费失败、处理超时或出现其他错误的情况。通过将失败的消息移动到死信队列,可以方便地进行后续的重试、错误日志记录、通知相关人员等处理,从而提高系统的可靠性和容错性。

三、SpringBoot中实现死信队列(RabbitMQ)。

业务场景描述

在电子商务平台中,用户在下单后需要一定时间完成支付。如果用户在指定时间内未完成支付,订单应自动取消。为了实现这一功能,可以使用延时队列和死信队列。

  1. 订单支付超时处理:当用户下单后,系统会将一个延迟任务放入延时队列。这个任务会在用户支付超时后触发,取消订单。
  2. 取消订单操作:延时队列的任务执行时,会取消订单,并将订单信息放入死信队列。
  3. 订单异常处理:如果取消订单操作失败,订单信息会继续留在死信队列中,以便后续处理
3.1 延时队列(Delayed Queue)

首先,我们需要一个延时队列来处理支付超时任务。

import org.springframework.amqp.core.Binding;
import org.springframework.amqp.core.BindingBuilder;
import org.springframework.amqp.core.DirectExchange;
import org.springframework.amqp.core.Queue;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
 
@Configuration
public class RabbitMQConfig {
 
    public static final String DELAYED_EXCHANGE_NAME = "delayed.exchange";
    public static final String DELAYED_QUEUE_NAME = "delayed.queue";
    public static final String DELAYED_ROUTING_KEY = "delayed.routing.key";
 
    @Bean
    public Queue delayedQueue() {
        return new Queue(DELAYED_QUEUE_NAME, true);
    }
 
    @Bean
    public DirectExchange delayedExchange() {
        return new DirectExchange(DELAYED_EXCHANGE_NAME);
    }
 
    @Bean
    public Binding delayedBinding() {
        return BindingBuilder.bind(delayedQueue()).to(delayedExchange()).with(DELAYED_ROUTING_KEY);
    }
}
3.2 死信队列(Dead Letter Exchange)

接下来,我们需要一个死信队列来处理无法取消的订单。

import org.springframework.amqp.core.Binding;
import org.springframework.amqp.core.BindingBuilder;
import org.springframework.amqp.core.DirectExchange;
import org.springframework.amqp.core.Queue;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
 
@Configuration
public class RabbitMQConfig {
    // ... 其他配置
 
    public static final String DEAD_LETTER_EXCHANGE_NAME = "dead.letter.exchange";
    public static final String DEAD_LETTER_QUEUE_NAME = "dead.letter.queue";
    public static final String DEAD_LETTER_ROUTING_KEY = "dead.letter.routing.key";
 
    @Bean
    public Queue deadLetterQueue() {
        return new Queue(DEAD_LETTER_QUEUE_NAME, true);
    }
 
    @Bean
    public DirectExchange deadLetterExchange() {
        return new DirectExchange(DEAD_LETTER_EXCHANGE_NAME);
    }
 
    @Bean
    public Binding deadLetterBinding() {
        return BindingBuilder.bind(deadLetterQueue()).to(deadLetterExchange()).with(DEAD_LETTER_ROUTING_KEY);
    }
}
3.3 生产者(Order Service)

最后,我们需要一个生产者来发送延迟任务和处理死信队列。

import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.amqp.rabbit.connection.ConnectionFactory;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.amqp.rabbit.connection.ConnectionFactory;
import org.springframework.amqp.rabbit.core.RabbitAdmin;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.context.annotation.Primary;
 
import javax.annotation.PostConstruct;
import java.util.concurrent.ExecutorService;
 
@Configuration
public class RabbitMQConfig {
    // ... 其他配置
 
    @Bean
    public RabbitTemplate rabbitTemplate(ConnectionFactory connectionFactory) {
        return new RabbitTemplate(connectionFactory);
    }
 
    @Bean
    public RabbitAdmin rabbitAdmin(ConnectionFactory connectionFactory) {
        return new RabbitAdmin(connectionFactory);
    }
 
    // 假设有一个订单服务的ExecutorService,用于处理订单取消操作
    @Bean
    public ExecutorService orderServiceExecutor(RabbitTemplate rabbitTemplate) {
        // 创建订单服务的ExecutorService
        return Executors.newFixedThreadPool(10);
    }
 
    // ... 其他配置
}

接下来,我们需要在订单服务中发送延迟任务和处理死信队列。

import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.amqp.rabbit.core.RabbitAdmin;
import org.springframework.amqp.rabbit.core.RabbitMessagingTemplate;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Service;
 
import java.util.Date;
import java.util.concurrent.ExecutorService;
 
@Service
public class OrderService {
 
    @Autowired
    private RabbitMessagingTemplate rabbitMessagingTemplate;
 
    @Autowired
    private ExecutorService orderServiceExecutor;
 
    @Autowired
    private RabbitAdmin rabbitAdmin;
 
    // 假设有一个支付超时时间,单位为秒
    private static final int PAYMENT_TIMEOUT = 600;
 
    @PostConstruct
    public void init() {
        // 创建一个延时队列,队列中的消息将在指定时间后触发
        rabbitAdmin.declareQueue(new Queue("delayed.queue", true));
        rabbitAdmin.declareQueue(new Queue("dead.letter.queue", true));
    }
 
    public void createOrder(Order order) {
        // 创建一个延迟消息,消息将在指定时间后触发
        rabbitMessagingTemplate.convertAndSend("delayed.exchange", "delayed.routing.key", order, message -> {
            message.getMessageProperties().setHeader("x-delay", PAYMENT_TIMEOUT * 1000);
            message.getMessageProperties().setExpiration(PAYMENT_TIMEOUT * 1000);
            return message;
        });
    }
 
    public void handleDeadLetter(Order order) {
        // 处理死信队列中的订单
        orderServiceExecutor.submit(() -> {
            // 取消订单逻辑
            // ...
            // 将订单信息发送到死信队列
            rabbitMessagingTemplate.convertAndSend("dead.letter.exchange", "dead.letter.routing.key", order);
        });
    }
}

       在这个例子中,我们创建了一个订单服务,该服务负责处理订单支付超时的情况。我们使用RabbitMQ的延时队列来处理支付超时任务,并使用死信队列来处理无法取消的订单。当订单支付超时时,订单服务会创建一个延迟消息,并将其发送到延时队列。当延时队列中的消息被触发时,订单服务会取消订单,并将订单信息发送到死信队列。如果取消订单操作失败,订单信息会继续留在死信队列中,以便后续处理。

四、SpringBoot中实现死信队列(RocketMQ)。

4.1 RocketMQ中的延时队列。

       在 RocketMQ 中,你可以通过设定消息的 delayTimeLevel 属性来创建延时队列。delayTimeLevel 是一个整数,它代表了一个延迟级别。RocketMQ 为这个属性预设了一些固定的值,这些值代表了不同的延迟时间。你可以根据需要选择适合的延迟级别。

       

Level 1: 1s
Level 2: 5s
Level 3: 10s
Level 4: 30s
Level 5: 1m
Level 6: 2m
Level 7: 3m
Level 8: 4m
Level 9: 5m
Level 10: 6m
Level 11: 7m
Level 12: 8m
Level 13: 9m
Level 14: 10m
...

       用户可以根据具体业务需求选择合适的延时级别。例如,如果业务要求延时 1 分钟,则可以将消息的延时级别设置为 5(Level 5 对应 1 分钟)。

4.1.1 发送延迟消息.

        当你发送消息时,你可以指定消息的延迟时间。例如,你可以设置消息在发送后10秒后被消费。

producer.send(new Message("TopicTest", "TagA", "Hello RocketMQ".getBytes(), 1000 * 10));
4.1.2 消费延迟消息

       消费者需要设置消费组(Consumer Group)来消费延迟消息。RocketMQ会根据延迟时间将消息发送到对应的延迟队列中。

consumer.subscribe("TopicTest", "TagA");
consumer.registerMessageListener((msgs, context) -> {
    for (MessageExt msg : msgs) {
        // 处理延迟消息
        System.out.printf("Receive message [%s] at %s%n", new String(msg.getBody()), new Date());
    }
    return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
});
consumer.start();
4.1.3 设置消费组

       为了消费延迟消息,消费者需要设置一个消费组。RocketMQ会根据消费组将延迟消息发送到对应的消费者。

consumer.subscribe("TopicTest", "TagA");

4.2 死信队列

consumer.DLQTagFilterThreshold=10000
consumer.DLQTagFilterEnable=true
consumer.DLQName=DLQ_TOPIC
4.2.1 创建死信队列.

       在RocketMQ的配置文件中,你可以创建一个死信队列,用于存储无法正常消费的消息。

consumer.DLQName=DLQ_TOPIC
4.2.2 消费死信队列中的消息
consumer.subscribe("DLQ_TOPIC", "DLQ_TAG");
consumer.registerMessageListener((msgs, context) -> {
    for (MessageExt msg : msgs) {
        // 处理死信队列中的消息
        System.out.printf("Receive dead letter message [%s] at %s%n", new String(msg.getBody()), new Date());
    }
    return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
});
consumer.start();

       在这个例子中,我们创建了一个消费者实例,并订阅了死信队列DLQ_TOPICDLQ_TAG。当死信队列中有消息时,消费者会消费这些消息。

4.2.3 设置死信队列的消费策略.

       在RocketMQ的配置文件中,你可以设置死信队列的消费策略,如消费线程数、消息重试次数等。

consumer.ConsumeFromWhere=CONSUME_FROM_LAST_OFFSET
consumer.ConsumeThreadMin=1
consumer.ConsumeThreadMax=10
consumer.PullBatchSize=32

       通过以上步骤,你可以在RocketMQ中处理死信队列中的消息。这有助于提高系统的健壮性和消息的可靠性。

相关实践学习
消息队列RocketMQ版:基础消息收发功能体验
本实验场景介绍消息队列RocketMQ版的基础消息收发功能,涵盖实例创建、Topic、Group资源创建以及消息收发体验等基础功能模块。
消息队列 MNS 入门课程
1、消息队列MNS简介 本节课介绍消息队列的MNS的基础概念 2、消息队列MNS特性 本节课介绍消息队列的MNS的主要特性 3、MNS的最佳实践及场景应用 本节课介绍消息队列的MNS的最佳实践及场景应用案例 4、手把手系列:消息队列MNS实操讲 本节课介绍消息队列的MNS的实际操作演示 5、动手实验:基于MNS,0基础轻松构建 Web Client 本节课带您一起基于MNS,0基础轻松构建 Web Client
目录
相关文章
|
9月前
|
消息中间件 存储 负载均衡
拆解一下消息队列、任务队列、任务调度系统
拆解一下消息队列、任务队列、任务调度系统
240 0
|
2月前
|
消息中间件 NoSQL Java
别再用 Redis List 实现消息队列了,Stream 专为队列而生
别再用 Redis List 实现消息队列了,Stream 专为队列而生
107 0
|
15天前
|
消息中间件 Arthas 监控
消息队列 MQ产品使用合集之每次重置reconsumeTimes就无法达到死信阈值,重试次数是否就要应用方控制
消息队列(MQ)是一种用于异步通信和解耦的应用程序间消息传递的服务,广泛应用于分布式系统中。针对不同的MQ产品,如阿里云的RocketMQ、RabbitMQ等,它们在实现上述场景时可能会有不同的特性和优势,比如RocketMQ强调高吞吐量、低延迟和高可用性,适合大规模分布式系统;而RabbitMQ则以其灵活的路由规则和丰富的协议支持受到青睐。下面是一些常见的消息队列MQ产品的使用场景合集,这些场景涵盖了多种行业和业务需求。
消息队列 MQ产品使用合集之每次重置reconsumeTimes就无法达到死信阈值,重试次数是否就要应用方控制
|
1月前
|
消息中间件
【消息队列开发】 虚拟主机设计——放送消息到队列/交换机中
【消息队列开发】 虚拟主机设计——放送消息到队列/交换机中
|
1月前
|
消息中间件 安全
【消息队列开发】 虚拟主机设计——操作队列
【消息队列开发】 虚拟主机设计——操作队列
|
1月前
|
消息中间件 存储 负载均衡
消息队列 MQ产品使用合集之如何排查是哪个队列导致的异常TPS增加
阿里云消息队列MQ(Message Queue)是一种高可用、高性能的消息中间件服务,它允许您在分布式应用的不同组件之间异步传递消息,从而实现系统解耦、流量削峰填谷以及提高系统的可扩展性和灵活性。以下是使用阿里云消息队列MQ产品的关键点和最佳实践合集。
|
11月前
|
消息中间件 存储 监控
消息队列与任务队列的区别
消息队列和任务队列是我们在软件系统中常常遇到的概念。尽管它们的名字相似,但实际上它们有不同的用途和工作原理。本文将介绍消息队列和任务队列之间的区别。
430 0
|
消息中间件
消息队列:第四章:延迟检查队列
消息队列:第四章:延迟检查队列
135 0
消息队列:第四章:延迟检查队列
|
消息中间件 数据安全/隐私保护
它让你1小时精通RabbitMQ消息队列(新增死信处理)
它让你1小时精通RabbitMQ消息队列(新增死信处理)
315 0
它让你1小时精通RabbitMQ消息队列(新增死信处理)
|
消息中间件 NoSQL Java
别再用 Redis List 实现消息队列了,Stream 专为队列而生
Stream 是 Redis 5.0 引入的一种专门为消息队列设计的数据类型,Stream 是一个包含 0 个或者多个元素的有序队列,这些元素根据 ID 的大小进行有序排列。
639 0
别再用 Redis List 实现消息队列了,Stream 专为队列而生

热门文章

最新文章