RabbitMQ发布订阅实战-实现延时重试队列

简介: 本文将会讲解如何使用RabbitMQ实现延时重试和失败消息队列,实现可靠的消息消费,消费失败后,自动延时将消息重新投递,当达到一定的重试次数后,将消息投递到失败消息队列,等待人工介入处理。

RabbitMQ是一款使用Erlang开发的开源消息队列。本文假设读者对RabbitMQ是什么已经有了基本的了解,如果你还不知道它是什么以及可以用来做什么,建议先从官网的 RabbitMQ Tutorials 入门教程开始学习。

本文将会讲解如何使用RabbitMQ实现延时重试和失败消息队列,实现可靠的消息消费,消费失败后,自动延时将消息重新投递,当达到一定的重试次数后,将消息投递到失败消息队列,等待人工介入处理。在这里我会带领大家一步一步的实现一个带有失败重试功能的发布订阅组件,使用该组件后可以非常简单的实现消息的发布订阅,在进行业务开发的时候,业务开发人员可以将主要精力放在业务逻辑实现上,而不需要花费时间去理解RabbitMQ的一些复杂概念。

本文将会持续修正和更新,最新内容请参考我的 GITHUB 上的 程序猿成长计划 项目,欢迎 Star,更多精彩内容请 follow me

概要

我们将会实现如下功能

  • 结合RabbitMQ的Topic模式和Work Queue模式实现生产方产生消息,消费方按需订阅,消息投递到消费方的队列之后,多个worker同时对消息进行消费
  • 结合RabbitMQ的 Message TTLDead Letter Exchange 实现消息的延时重试功能
  • 消息达到最大重试次数之后,将其投递到失败队列,等待人工介入处理bug后,重新将其加入队列消费

具体流程见下图

xxx

  1. 生产者发布消息到主Exchange
  2. 主Exchange根据Routing Key将消息分发到对应的消息队列
  3. 多个消费者的worker进程同时对队列中的消息进行消费,因此它们之间采用“竞争”的方式来争取消息的消费
  4. 消息消费后,不管成功失败,都要返回ACK消费确认消息给队列,避免消息消费确认机制导致重复投递,同时,如果消息处理成功,则结束流程,否则进入重试阶段
  5. 如果重试次数小于设定的最大重试次数(3次),则将消息重新投递到Retry Exchange的重试队列
  6. 重试队列不需要消费者直接订阅,它会等待消息的有效时间过期之后,重新将消息投递给Dead Letter Exchange,我们在这里将其设置为主Exchange,实现延时后重新投递消息,这样消费者就可以重新消费消息
  7. 如果三次以上都是消费失败,则认为消息无法被处理,直接将消息投递给Failed Exchange的Failed Queue,这时候应用可以触发报警机制,以通知相关责任人处理
  8. 等待人工介入处理(解决bug)之后,重新将消息投递到主Exchange,这样就可以重新消费了

技术实现

Linus Torvalds 曾经说过

Talk is cheap. Show me the code

我分别用Java和PHP实现了本文所讲述的方案,读者可以通过参考代码以及本文中的基本步骤来更好的理解

创建Exchange

为了实现消息的延时重试和失败存储,我们需要创建三个Exchange来处理消息。

  • master 主Exchange,发布消息时发布到该Exchange
  • master.retry 重试Exchange,消息处理失败时(3次以内),将消息重新投递给该Exchange
  • master.failed 失败Exchange,超过三次重试失败后,消息投递到该Exchange

所有的Exchange声明(declare)必须使用以下参数

参数 说明
exchange - Exchange名称
type topic Exchange 类型
passive false 如果Exchange已经存在,则返回成功,不存在则创建
durable true 持久化存储Exchange,这里仅仅是Exchange本身持久化,消息和队列需要单独指定其持久化
no-wait false 该方法需要应答确认

Java代码

// 声明Exchange:主体,失败,重试
channel.exchangeDeclare("master", "topic", true);
channel.exchangeDeclare("master.retry", "topic", true);
channel.exchangeDeclare("master.failed", "topic", true);

PHP代码

// 普通交换机
$this->channel->exchange_declare('master', 'topic', false, true, false);
// 重试交换机
$this->channel->exchange_declare('master.retry', 'topic', false, true, false);
// 失败交换机
$this->channel->exchange_declare('master.failed', 'topic', false, true, false);

在RabbitMQ的管理界面中,我们可以看到创建的三个Exchange

-w539

消息发布

消息发布时,使用basic_publish方法,参数如下

参数 说明
message - 发布的消息对象
exchange master 消息发布到的Exchange
routing-key - 路由KEY,用于标识消息类型
mandatory false 是否强制路由,指定了该选项后,如果没有订阅该消息,则会返回路由不可达错误
immediate false 指定了当消息无法直接路由给消费者时如何处理

发布消息时,对于message对象,其内容建议使用json编码后的字符串,同时消息需要标识以下属性

'delivery_mode'=> 2 // 1为非持久化,2为持久化

Java代码

channel.basicPublish(
    "master", 
    routingKey, 
    MessageProperties.PERSISTENT_BASIC, // delivery_mode
    message.getBytes()
);

PHP代码

$msg = new AMQPMessage($message->serialize(), [
    'delivery_mode' => AMQPMessage::DELIVERY_MODE_PERSISTENT,
]);

$this->channel->basic_publish($msg, 'master', $routingKey);

消息订阅

消息订阅的实现相对复杂一些,需要完成队列的声明以及队列和Exchange的绑定。

Declare Queue

对于每一个订阅消息的服务,都必须创建一个该服务对应的队列,将该队列绑定到关注的路由规则,这样之后,消息生产者将消息投递给Exchange之后,就会按照路由规则将消息分发到对应的队列供消费者消费了。

消费服务需要declare三个队列

  • [queue_name] 队列名称,格式符合 [服务名称]@订阅服务标识
  • [queue_name]@retry 重试队列
  • [queue_name]@failed 失败队列

订阅服务标识是客户端自己对订阅的分类标识符,比如用户中心服务(服务名称ucenter),包含两个订阅:user和enterprise,这里两个订阅的队列名称就为 ucenter@userucenter@enterprise,其对应的重试队列为 ucenter@user@retryucenter@enterprise@retry

Declare队列时,参数规定规则如下

参数 说明
queue - 队列名称
passive false 队列不存在则创建,存在则直接成功
durable true 队列持久化
exclusive false 排他,指定该选项为true则队列只对当前连接有效,连接断开后自动删除
no-wait false 该方法需要应答确认
auto-delete false 当不再使用时,是否自动删除

对于@retry重试队列,需要指定额外参数

'x-dead-letter-exchange'    => 'master'
'x-dead-letter-routing-key' => [queue_name],
'x-message-ttl'              => 30 * 1000 // 重试时间设置为30s

这里的两个header字段的含义是,在队列中延迟30s后,将该消息重新投递到x-dead-letter-exchange对应的Exchange中,并且routing key指定为消费队列的名称,这样就可以实现消息只投递给原始出错时的队列,避免消息重新投递给所有关注当前routing key的消费者了。

Java代码

// 声明监听队列
channel.queueDeclare(
    queueName, // 队列名称
    true,      // durable
    false,     // exclusive
    false,     // autoDelete
    null       // arguments
);
channel.queueDeclare(queueName + "@failed", true, false, false, null);

Map<String, Object> arguments = new HashMap<String, Object>();
arguments.put("x-dead-letter-exchange", exchangeName());
arguments.put("x-message-ttl", 30 * 1000);
arguments.put("x-dead-letter-routing-key", queueName);
channel.queueDeclare(queueName + "@retry", true, false, false, arguments);

PHP代码

$this->channel->queue_declare($queueName, false, true, false, false, false);
$this->channel->queue_declare($failedQueueName, false, true, false, false, false);
$this->channel->queue_declare(
    $retryQueueName, // 队列名称
    false,           // passive
    true,            // durable
    false,           // exclusive
    false,           // auto_delete
    false,           // nowait
    new AMQPTable([
        'x-dead-letter-exchange' => 'master',
        'x-dead-letter-routing-key' => $queueName,
        'x-message-ttl'          => 30 * 1000,
    ])
);

在RabbitMQ的管理界面中,Queues部分可以看到我们创建的三个队列

查看队列的详细信息,我们可以看到 queueName@retry 队列与其它两个队列的不同

-w486

Bind Exchange & Queue

创建完队列之后,需要将队列与Exchange绑定(bind),不同队列需要绑定到之前创建的对应的Exchange上面

Queue Exchange
[queue_name] master
[queue_name]@retry master.retry
[queue_name]@failed master.failed

绑定时,需要提供订阅的路由KEY,该路由KEY与消息发布时的路由KEY对应,区别是这里可以使用通配符同时订阅多种类型的消息。

参数 说明
queue - 绑定的队列
exchange - 绑定的Exchange
routing-key - 订阅的消息路由规则
no-wait false 该方法需要应答确认

Java代码

// 绑定监听队列到Exchange
channel.queueBind(queueName, "master", routingKey);
channel.queueBind(queueName, exchangeName(), queueName);
channel.queueBind(queueName + "@failed", "master.failed", queueName);
channel.queueBind(queueName + "@retry", "master.retry", queueName);

PHP代码

$this->channel->queue_bind($queueName, 'master', $routingKey);
$this->channel->queue_bind($queueName, 'master', $queueName);
$this->channel->queue_bind($retryQueueName, 'master.retry', $queueName);
$this->channel->queue_bind($failedQueueName, 'master.failed', $queueName);

在RabbitMQ的管理界面中,我们可以看到该队列与Exchange和routing-key的绑定关系

-w361

-w405

-w399

消息消费实现

使用 basic_consume 对消息进行消费的时候,需要注意下面参数

参数 说明
queue - 消费的队列名称
consumer-tag - 消费者标识,留空即可
no_local false 如果设置了该字段,服务器将不会发布消息到 发布它的客户端
no_ack false 需要消费确认应答
exclusive false 排他访问,设置后只允许当前消费者访问该队列
nowait false 该方法需要应答确认

消费端在消费消息时,需要从消息中获取消息被消费的次数,以此判断该消息处理失败时重试还是发送到失败队列。

Java代码

protected Long getRetryCount(AMQP.BasicProperties properties) {
    Long retryCount = 0L;
    try {
        Map<String, Object> headers = properties.getHeaders();
        if (headers != null) {
            if (headers.containsKey("x-death")) {
                List<Map<String, Object>> deaths = (List<Map<String, Object>>) headers.get("x-death");
                if (deaths.size() > 0) {
                    Map<String, Object> death = deaths.get(0);
                    retryCount = (Long) death.get("count");
                }
            }
        }
    } catch (Exception e) {}

    return retryCount;
}

PHP代码

protected function getRetryCount(AMQPMessage $msg): int
{
    $retry = 0;
    if ($msg->has('application_headers')) {
        $headers = $msg->get('application_headers')->getNativeData();
        if (isset($headers['x-death'][0]['count'])) {
            $retry = $headers['x-death'][0]['count'];
        }
    }

    return (int)$retry;
}

消息消费完成后,需要发送消费确认消息给服务端,使用basic_ack方法

ack(delivery-tag=消息的delivery-tag标识)

Java代码

// 消息消费处理
Consumer consumer = new DefaultConsumer(channel) {
    @Override
    public void handleDelivery(String consumerTag, Envelope envelope,
                               AMQP.BasicProperties properties, byte[] body) throws IOException {
        ...
        // 注意,由于使用了basicConsume的autoAck特性,因此这里就不需要手动执行
        // channel.basicAck(envelope.getDeliveryTag(), false);
    }
};
// 执行消息消费处理
channel.basicConsume(
    queueName, 
    true, // autoAck
    consumer
);

PHP代码

$this->channel->basic_consume(
    $queueName,
    '',    // customer_tag
    false, // no_local
    false, // no_ack
    false, // exclusive
    false, // nowait
    function (AMQPMessage $msg) use ($queueName, $routingKey, $callback) {
        ...
        $msg->delivery_info['channel']->basic_ack($msg->delivery_info['delivery_tag']);
    }
);

如果消息处理中出现异常,应该将该消息重新投递到重试Exchange,等待下次重试

basic_publish(msg, 'master.retry', queueName)
ack(delivery-tag) // 不要忘记了应答消费成功消息

如果判断重试次数大于3次,仍然处理失败,则应该讲消息投递到失败Exchange,等待人工处理

basic_publish(msg, 'master.failed', queueName)
ack(delivery-tag) // 不要忘记了应答消费成功消息

一定不要忘记ack消息,因为重试、失败都是通过将消息重新投递到重试、失败Exchange来实现的,如果忘记ack,则该消息在超时或者连接断开后,会重新被重新投递给消费者,如果消费者依旧无法处理,则会造成死循环。

Java代码

try {
    String message = new String(body, "UTF-8");
    // 消息处理函数
    handler.handle(message, envelope.getRoutingKey());

} catch (Exception e) {
    long retryCount = getRetryCount(properties);
    if (retryCount > 3) {
        // 重试次数大于3次,则自动加入到失败队列
        Map<String, Object> headers = new HashMap<>();
        headers.put("x-orig-routing-key", getOrigRoutingKey(properties, envelope.getRoutingKey()));
        channel.basicPublish('master.failed', queueName, createOverrideProperties(properties, headers), body);
    } else {
        // 重试次数小于3,则加入到重试队列,30s后再重试
        Map<String, Object> headers = properties.getHeaders();
        if (headers == null) {
            headers = new HashMap<>();
        }

        headers.put("x-orig-routing-key", getOrigRoutingKey(properties, envelope.getRoutingKey()));
        channel.basicPublish('master.retry', queueName, createOverrideProperties(properties, headers), body);
    }
}

在消息发送到重试队列和失败队列时,我们在消息的headers中添加了一个名为x-orig-routing-key的字段,该字段是实现消息重试的关键字段,由于我们的消息需要在不同的Exchange,Queue之间流转,为了避免消息在重新投递到主Exchange时,被所有的消费者队列重新消费,在重试过程中,我们将消息的routing-key修改为队列名称,直接投递给原始消费消息的队列。x-orig-routing-key用于在之后能够重新获取到最开始的routing-key。

这里的重复消费是指 某个消息被两个消费方A和B消费了,其中A消费失败,B成功,这时候,消息由A消费者重新投递到主Exchange后,B消费队列也会获取到该消息,因此就会导致B消费者重复消费已经消费国的消息

失败任务重试

如果任务重试三次仍未成功,则会被投递到失败队列,这时候需要人工处理程序异常,处理完毕后,需要将消息重新投递到队列进行处理,这里唯一需要做的就是从失败队列订阅消息,然后获取到消息后,清空其application_headers头信息,然后重新投递到master这个Exchange即可。

Java代码

channel.basicPublish(
    'master', 
    envelope.getRoutingKey(),
    MessageProperties.PERSISTENT_BASIC,
    body
);

PHP代码

$msg->set('application_headers', new AMQPTable([]));
$this->channel->basic_publish(
    $msg,
    'master',
    $msg->get('routing_key')
);

怎么使用

队列和Exchange以及发布订阅的关系我们就说完了,那么使用起来是什么效果呢?这里我们以Java代码为例

// 发布消息
Publisher publisher = new Publisher(factory.newConnection(), 'master');
publisher.publish("{\"id\":121, \"name\":\"guanyiyao\"}", "user.create");

// 订阅消息
new Subscriber(factory.newConnection(), Main.EXCHANGE_NAME)
    .init("user-monitor", "user.*")
    .subscribe((message, routingKey) -> {
        // TODO 业务逻辑
        System.out.printf("    <%s> message consumed: %s\n", routingKey, message);
    }
);

总结

使用RabbitMQ时,实现延时重试和失败队列的方式并不仅仅局限于本文中描述的方法,如果读者有更好的实现方案,欢迎拍砖,在这里我也只是抛砖引玉了。本文中讲述的方法还有很多优化空间,读者也可以试着去改进其实现方案,比如本文中使用了三个Exchagne,是否只使用一个Exchange也能实现本文中所讲述的功能。

本文将会持续修正和更新,最新内容请参考我的 GITHUB 上的 程序猿成长计划 项目,欢迎 Star,更多精彩内容请 follow me

相关实践学习
快速体验阿里云云消息队列RocketMQ版
本实验将带您快速体验使用云消息队列RocketMQ版Serverless系列实例进行获取接入点、创建Topic、创建订阅组、收发消息、查看消息轨迹和仪表盘。
消息队列 MNS 入门课程
1、消息队列MNS简介 本节课介绍消息队列的MNS的基础概念 2、消息队列MNS特性 本节课介绍消息队列的MNS的主要特性 3、MNS的最佳实践及场景应用 本节课介绍消息队列的MNS的最佳实践及场景应用案例 4、手把手系列:消息队列MNS实操讲 本节课介绍消息队列的MNS的实际操作演示 5、动手实验:基于MNS,0基础轻松构建 Web Client 本节课带您一起基于MNS,0基础轻松构建 Web Client
目录
相关文章
|
8月前
|
消息中间件 大数据 关系型数据库
RocketMQ实战—3.基于RocketMQ升级订单系统架构
本文主要介绍了基于MQ实现订单系统核心流程的异步化改造、基于MQ实现订单系统和第三方系统的解耦、基于MQ实现将订单数据同步给大数据团队、秒杀系统的技术难点以及秒杀商详页的架构设计和基于MQ实现秒杀系统的异步化架构。
573 64
RocketMQ实战—3.基于RocketMQ升级订单系统架构
|
8月前
|
消息中间件 Java 数据库
RocketMQ实战—9.营销系统代码初版
本文主要介绍了实现营销系统四大促销场景的代码初版:全量用户推送促销活动、全量用户发放优惠券、特定用户推送领取优惠券消息、热门商品定时推送。
RocketMQ实战—9.营销系统代码初版
|
8月前
|
消息中间件 搜索推荐 调度
RocketMQ实战—8.营销系统业务和方案介绍
本文详细介绍了电商营销系统的业务流程、技术架构及挑战解决方案。涵盖核心交易与支付后履约流程,优惠券和促销活动的发券、领券、用券、销券机制,以及会员与推送的数据库设计。技术架构基于Nacos服务注册中心、Dubbo RPC框架、RocketMQ消息中间件和XXLJob分布式调度工具,实现系统间高效通信与任务管理。针对千万级用户量下的推送和发券场景,提出异步化、分片处理与惰性发券等优化方案,解决高并发压力。同时,通过RocketMQ实现系统解耦,提升扩展性,并利用XXLJob完成爆款商品推荐的分布式调度推送。整体设计确保系统在大规模用户场景下的性能与稳定性。
RocketMQ实战—8.营销系统业务和方案介绍
|
3月前
|
消息中间件 Ubuntu Java
SpringBoot整合MQTT实战:基于EMQX实现双向设备通信
本教程指导在Ubuntu上部署EMQX 5.9.0并集成Spring Boot实现MQTT双向通信,涵盖服务器搭建、客户端配置及生产实践,助您快速构建企业级物联网消息系统。
1307 1
|
8月前
|
消息中间件 存储 NoSQL
RocketMQ实战—6.生产优化及运维方案
本文围绕RocketMQ集群的使用与优化,详细探讨了六个关键问题。首先,介绍了如何通过ACL配置实现RocketMQ集群的权限控制,防止不同团队间误用Topic。其次,讲解了消息轨迹功能的开启与追踪流程,帮助定位和排查问题。接着,分析了百万消息积压的处理方法,包括直接丢弃、扩容消费者或通过新Topic间接扩容等策略。此外,提出了针对RocketMQ集群崩溃的金融级高可用方案,确保消息不丢失。同时,讨论了为RocketMQ增加限流功能的重要性及实现方式,以提升系统稳定性。最后,分享了从Kafka迁移到RocketMQ的双写双读方案,确保数据一致性与平稳过渡。
|
6月前
|
消息中间件 存储 Kafka
一文带你从入门到实战全面掌握RocketMQ核心概念、架构部署、实践应用和高级特性
本文详细介绍了分布式消息中间件RocketMQ的核心概念、部署方式及使用方法。RocketMQ由阿里研发并开源,具有高性能、高可靠性和分布式特性,广泛应用于金融、互联网等领域。文章从环境搭建到消息类型的实战(普通消息、延迟消息、顺序消息和事务消息)进行了全面解析,并对比了三种消费者类型(PushConsumer、SimpleConsumer和PullConsumer)的特点与适用场景。最后总结了使用RocketMQ时的关键注意事项,如Topic和Tag的设计、监控告警的重要性以及性能与可靠性的平衡。通过学习本文,读者可掌握RocketMQ的使用精髓并灵活应用于实际项目中。
4645 9
 一文带你从入门到实战全面掌握RocketMQ核心概念、架构部署、实践应用和高级特性
|
8月前
|
消息中间件 Java 中间件
RocketMQ实战—2.RocketMQ集群生产部署
本文主要介绍了大纲什么是消息中间件、消息中间件的技术选型、RocketMQ的架构原理和使用方式、消息中间件路由中心的架构原理、Broker的主从架构原理、高可用的消息中间件生产部署架构、部署一个小规模的RocketMQ集群进行压测、如何对RocketMQ集群进行可视化的监控和管理、进行OS内核参数和JVM参数的调整、如何对小规模RocketMQ集群进行压测、消息中间件集群生产部署规划梳理。
RocketMQ实战—2.RocketMQ集群生产部署
|
8月前
|
消息中间件 NoSQL 大数据
RocketMQ实战—5.消息重复+乱序+延迟的处理
本文围绕RocketMQ的使用与优化展开,分析了优惠券重复发放的原因及解决方案。首先,通过案例说明了优惠券系统因消息重复、数据库宕机或消费失败等原因导致重复发券的问题,并提出引入幂等性机制(如业务判断法、Redis状态判断法)来保证数据唯一性。其次,探讨了死信队列在处理消费失败时的作用,以及如何通过重试和死信队列解决消息处理异常。接着,分析了订单库同步中消息乱序的原因,提出了基于顺序消息机制的代码实现方案,确保消息按序处理。此外,介绍了利用Tag和属性过滤数据提升效率的方法,以及延迟消息机制优化定时退款扫描的功能。最后,总结了RocketMQ生产实践中的经验.
RocketMQ实战—5.消息重复+乱序+延迟的处理
|
8月前
|
消息中间件 Java 测试技术
RocketMQ实战—7.生产集群部署和生产参数
本文详细介绍了RocketMQ生产集群的部署与调优过程,包括集群规划、环境搭建、参数配置和优化策略。
RocketMQ实战—7.生产集群部署和生产参数
ly~
|
消息中间件 存储 监控
如何查看 RocketMQ 消息的重试次数和时间间隔?
RocketMQ消息重试次数和时间间隔可通过查看消费者和Broker日志、使用管理控制台的监控页面和消息查询功能,或通过分析消费者代码和RocketMQ客户端库代码等方式获取。日志中常有消费失败重试的明确记录,控制台可监控消费情况推断重试状态,代码分析则适合技术用户深入了解。
ly~
1084 3