一文读懂RocketMQ的高可用机制——消息消费高可用

本文涉及的产品
容器服务 Serverless 版 ACK Serverless,317元额度 多规格
容器服务 Serverless 版 ACK Serverless,952元额度 多规格
简介: 一文读懂RocketMQ的高可用机制——消息消费高可用

一、前言

在前两篇我们介绍了

一文读懂RocketMQ的高可用机制——消息存储高可用

一文读懂RocketMQ的高可用机制——消息发送高可用

这一篇我们来说一下消息消费是如何保证高可用的。

要想知道消息消费的高可用,那我们得知道消息是怎么消费的吧。我们知道 Consumer 集群中每个 Consumer 都有消费组,那一个消费组的多个消费者是如何对消息队列(一个主题对应多个消息队列),那消费者是如何做的负载均衡呢?一个消费者又是如何并发消费消息队列的消息呢(一个消费者可以对应多个消息队列,而一个消息队列只能被一个消费者消费)?这就是消费端消息负载均衡与重新分布机制,这个留给你自己去思考。

本文重点讲消息消费高可用的机制。我们的业务场景,确实无法避免消息消费失败的情况下,比如网络异常、业务逻辑本身异常、还有的自身业务没异常而调用的第三方异常等。但无论啥情况,作为一款消息中间件,你得保证我消息消费的时候消息不会丢失吧,要是重要的业务,数据丢了,那没人敢用这款产品了,所以消息消费高可用也异常的重要。即使消息一直消费失败,也不能丢失数据。那 RocketMQ 是如何保证消息消费不丢数据的呢?答案就是消费重试机制消息 ACK 机制来保证。

二、消息重试机制

1、消息消费过程

PullMessageService 负责对消息队列进行消息拉取,从远端服务器拉取消息后将消息存储 ProcessQueue 消息队列处理队列中,然后调用 ConsumeMessageService#submitConsumeRequest 方法进行消息消费,使用线程池来消费消息,确保了消息拉取与消息消费的解耦。ConsumeMessageService 支持顺序消息和并发(无序)消息,核心类图如下:

我们这里暂时先分享

ConsumeMessageConcurrentlyService 无序消息。

不难发现代码入口

ConsumeMessageConcurrentlyService 的内部类 ConsumeRequest#run 方法。

处理消费结果继续跟进去:

public void processConsumeResult(
    final ConsumeConcurrentlyStatus status,
    final ConsumeConcurrentlyContext context,
    final ConsumeRequest consumeRequest
) {
    int ackIndex = context.getAckIndex();
    if (consumeRequest.getMsgs().isEmpty())
        return;
    switch (status) {
        case CONSUME_SUCCESS:
            if (ackIndex >= consumeRequest.getMsgs().size()) {
                ackIndex = consumeRequest.getMsgs().size() - 1;
            }
            int ok = ackIndex + 1;
            int failed = consumeRequest.getMsgs().size() - ok;
            this.getConsumerStatsManager().incConsumeOKTPS(consumerGroup, consumeRequest.getMessageQueue().getTopic(), ok);
            this.getConsumerStatsManager().incConsumeFailedTPS(consumerGroup, consumeRequest.getMessageQueue().getTopic(), failed);
            break;
        case RECONSUME_LATER:
            ackIndex = -1;
            this.getConsumerStatsManager().incConsumeFailedTPS(consumerGroup, consumeRequest.getMessageQueue().getTopic(),
                consumeRequest.getMsgs().size());
            break;
        default:
            break;
    }
    switch (this.defaultMQPushConsumer.getMessageModel()) {
        case BROADCASTING:
            for (int i = ackIndex + 1; i < consumeRequest.getMsgs().size(); i++) {
                MessageExt msg = consumeRequest.getMsgs().get(i);
                log.warn("BROADCASTING, the message consume failed, drop it, {}", msg.toString());
            }
            break;
        case CLUSTERING:
            List<MessageExt> msgBackFailed = new ArrayList<MessageExt>(consumeRequest.getMsgs().size());
            for (int i = ackIndex + 1; i < consumeRequest.getMsgs().size(); i++) {
                MessageExt msg = consumeRequest.getMsgs().get(i);
                boolean result = this.sendMessageBack(msg, context);
                if (!result) {
                    msg.setReconsumeTimes(msg.getReconsumeTimes() + 1);
                    msgBackFailed.add(msg);
                }
            }
            if (!msgBackFailed.isEmpty()) {
                consumeRequest.getMsgs().removeAll(msgBackFailed);
                this.submitConsumeRequestLater(msgBackFailed, consumeRequest.getProcessQueue(), consumeRequest.getMessageQueue());
            }
            break;
        default:
            break;
    }
    long offset = consumeRequest.getProcessQueue().removeMessage(consumeRequest.getMsgs());
    if (offset >= 0 && !consumeRequest.getProcessQueue().isDropped()) {
        this.defaultMQPushConsumerImpl.getOffsetStore().updateOffset(consumeRequest.getMessageQueue(), offset, true);
    }
}

从源码中可以看出,

如果返回状态 status 是 CONSUME_SUCCESS,此时ackIndex = consumeRequest.getMsgs().size() - 1;再看下面消息模式是 CLUSTERING 的场景,for (int i = ackIndex + 1; i < consumeRequest.getMsgs().size(); i++)从这里可以看出如果消息成功,则无需发送 sendMsgBack 给 broker。

如果返回状态 status 是 RECONSUME_LATER,此时ackIndex = -1则这批消息都会发送给 Broker,也就是这一批消息都得重新消费。如果发送 ack 失败,则会延迟 5s 后重新在消费端重新消费。

if (!msgBackFailed.isEmpty()) {
    consumeRequest.getMsgs().removeAll(msgBackFailed);
    this.submitConsumeRequestLater(msgBackFailed, consumeRequest.getProcessQueue(), consumeRequest.getMessageQueue());
}

消费者向 Broker 发送 ACK 消息,如果发送成功,重试机制由 broker 处理,如果发送 ack 消息失败,则将该任务直接在消费者这边,再次在本地处理该批消息,默认延长 5s 后在消费者重新消费,其关键总结如下:

a. 根据消费返回的状态,得到 ackIndex 的值。

b. 如果消费成功,则无需发送 sendMsgBack 给 broker;如果消费失败,根据消费模式(集群消费还是广播消费),广播模式,直接丢弃,集群模式发送 sendMessageBack。

c. 更新消息消费进度,不管消费成功与否,上述这些消息消费成功,其实就是修改消费偏移量。(失败的,会进行重试,会创建新的消息)




if (offset >= 0 && !consumeRequest.getProcessQueue().isDropped()) { 
    this.defaultMQPushConsumerImpl.getOffsetStore().updateOffset(consumeRequest.getMessageQueue(), offset, true);
}


然后我们重点跟踪 sendMessageBack 方法:

DefaultMQPushConsumerImpl#sendMessageBack

其核心实现要点如下:

a. 首先根据 brokerName 得到 broker 地址信息,然后通过网络发送到指定的 Broker上。

b. 如果上述过程失败,则创建一条新的消息重新发送给 Broker,此时新消息的主题为重试主题"%RETRY%" + ConsumeGroupName注意,这里的主题和原先的消息主题没任何关系而是和消费组相关。


我们再来看下 Broker 端怎么处理消费端发送过来的消息,代码在:

SendMessageProcessor#processRequest

Broker 端会处理 CONSUMER_SEND_MSG_BACK 命令。

三、Broker 端处理 CONSUMER_SEND_MSG_BACK 命令

1、获取消费组的订阅信息

代码入口:

SendMessageProcessor#consumerSendMsgBack

其核心实现要点如下:

a. groupName

消费组名称,RocketMQ 消息消费重试不是以主题,而是以消费组。

b. retryQueueNums

重试队列的数量,读队列,写队列个数(主题)。

c. retryMaxTimes

允许最大的重复次数。

2、根据重试主题创建

SendMessageProcessor#consumerSendMsgBack

TopicConfigManager#createTopicInSendMessageBackMethod

如果创建主题配置信息错误,会抛出系统异常,产生的效果是消费端发送 ACK 消息错误,会创建一条新的消息,消息内部 ID 为原消息 ID,然后重新发送给 Broker。

3、根据消息偏移量尝试从 commitlog 日志文件中获取消息内容

SendMessageProcessor#consumerSendMsgBack

4、延迟级别、消费次数处理

SendMessageProcessor#consumerSendMsgBack

如果消息次数或延迟级别小于 0,设置消息的主题为%DLQ% + 消费组名称如果消息的延迟级别为 0,则 3 + 消息重试的次数。

5、重新发送该消息到 commitlog 中

如果消息发送成功,则返回成功,否则返回错误,消费端会将这些消息直接在消费端延迟 5s 后重新消费。

现在成功将消息发送到 commitlog 中,主题为RETRY_TOPIC + 消费组名称也就是消息重试的消息主题是基于消费组。不是每一个主题都有一个重试主题。而是每一个消费组有一个重试主题。那这些主题的消息,又是如何在被消费者获取并进行消费的。

然后进行消费进度更新:

ConsumeMessageConcurrentlyService#processConsumeResult

消息现在是存储到 commitlog 文件里了,那怎么消费呢?

四、延迟消息机制

延时消息是消息发送到 Broker 后,并不立即被消费者消费而是要等到特定的时间后才能被消费, RocketMQ 并不支持任意的时间精度,如果要支持任意时间精度定时调度,不可避免地需要在 Broker 层做消息排序,再加上持久化方面的考量,将不可避免的带来巨大的性能消耗,所以 RocketMQ 只支持特 定级别的延迟消息。消息延迟级别在 Broker 端通过 messageDelayLevel 配置,默认为“1s 5s 10s 30s 1m 2m 3m 4m 5m 6m 7m 8m 9m 10m 20m 30m 1h 2h”delayLevel=1 表示延迟消息 1s,delayLevel=2 表示延迟5s,依次类推。

既然 delayLevel 表示延迟消息的级别,我们全局搜索一下 delayLevel,发现 CommitLog 类的 putMessage 中竟然也出现了 delayLevel 相关的处理,我们重点来看下该代码org.apache.rocketmq.store.CommitLog#putMessage

在消息存入 commitlog 之前,如果发现延迟 level 大于 0,会将消息的主题设置为SCHEDULE_TOPIC = "SCHEDULE_TOPIC_XXXX"然后备份原主题名称。那就很清晰了,延迟消息统一由 ScheduleMessageService 来处理。

RocketMQ 延时消息实现类为 ScheduleMessageService,该类在 DefaultMessageStore 中创建。通过在 DefaultMessageStore 中调用 load 方法加载该类并调用 start 方法启动。

1、ScheduleMessageService#load

// 加载延迟消息消费进度的加载与delayLevelTable的构造。延迟消息的进度默认存储路径为/store/config/delayOffset.json 
public boolean load() {
    boolean result = super.load();
    result = result && this.parseDelayLevel();
    return result;
}

2、ScheduleMessageService#start

// 遍历延迟队列创建定时任务,遍历延迟级别,根据延迟级别level从offsetTable中获取消费队列的消费进度。如果不存在,则使用0
public void start() {
    if (started.compareAndSet(false, true)) {
        this.timer = new Timer("ScheduleMessageTimerThread", true);
        for (Map.Entry<Integer, Long> entry : this.delayLevelTable.entrySet()) {
            Integer level = entry.getKey();
            Long timeDelay = entry.getValue();
            Long offset = this.offsetTable.get(level);
            if (null == offset) {
                offset = 0L;
            }
            if (timeDelay != null) {
                this.timer.schedule(new DeliverDelayedMessageTimerTask(level, offset), FIRST_DELAY_TIME);
            }
        }
    // 每隔10s持久化一次延迟队列的消息消费进度
        this.timer.scheduleAtFixedRate(new TimerTask() {
            @Override
            public void run() {
                try {
                    if (started.get()) ScheduleMessageService.this.persist();
                } catch (Throwable e) {
                    log.error("scheduleAtFixedRate flush exception", e);
                }
            }
        }, 10000, this.defaultMessageStore.getMessageStoreConfig().getFlushDelayOffsetInterval());
    }
}

3、调度机制

ScheduleMessageService 的 start 方法启动后,会为每一个延迟级别创建一个调度任务,每一个延迟级别对应 SCHEDULE_TOPIC_XXXX 主题下的一个消息消费队列。定时调度任务的实现类为 DeliverDelayedMessageTimerTask,核心实现方法为 executeOnTimeup。

ScheduleMessageService$DeliverDelayedMessageTimerTask#executeOnTimeup

到这里,我们终于搞清楚了消息重试与流转,但还是没有找到RETRY+消费组(队列的订阅信息)。

那消费者是如何订阅RETRY+消费组名称的消费队列的呢?

原来在消费者启动时,就默认会订阅该消费组的重试主题的队列。

org.apache.rocketmq.client.impl.consumer.DefaultMQPushConsumerImpl

这样跟着老周下来,相信你对消息消费高可用有了一定的了解了。

五、总结

我们最后还是要来回答前言的问题,那 RocketMQ 是如何保证消息消费不丢数据的呢?也就是如何保证消息消费高可用?答案就是消费重试机制消息 ACK 机制来保证。

1、我们从消息消费过程入手

如果返回状态 status 是 CONSUME_SUCCESS,此时ackIndex = consumeRequest.getMsgs().size() - 1;再看下面消息模式是 CLUSTERING 的场景for (int i = ackIndex + 1; i < consumeRequest.getMsgs().size(); i++)从这里可以看出如果消息成功,则无需发送 sendMsgBack 给 broker。

如果返回状态 status 是 RECONSUME_LATER,此时ackIndex = -1则这批消息都会发送给 Broker,也就是这一批消息都得重新消费。如果发送 ack 失败,则会延迟 5s 后重新在消费端重新消费。

消费者向 Broker 发送 ACK 消息,如果发送成功,重试机制由 broker 处理,如果发送 ack 消息失败,则将该任务直接在消费者这边,再次在本地处理该批消息,默认延长 5s 后在消费者重新消费。

a. 根据消费返回的状态,得到 ackIndex 的值。

b. 如果消费成功,则无需发送 sendMsgBack 给 broker;如果消费失败,根据消费模式(集群消费还是广播消费),广播模式,直接丢弃,集群模式发送 sendMessageBack。

c. 更新消息消费进度,不管消费成功与否,上述这些消息消费成功,其实就是修改消费偏移量。(失败的,会进行重试,会创建新的消息)


2、Broker 端处理 CONSUMER_SEND_MSG_BACK 命令

a. 获取消费组的订阅信息

b. 根据重试主题创建

c. 根据消息偏移量尝试从 commitlog 日志文件中获取消息内容

d. 延迟级别、消费次数处理

e. 重新发送该消息到 commitlog 中,主题为 RETRY_TOPIC + 消费组名称,也就是消息重试的消息主题是基于消费组。

3、延迟消息机制

需要延迟执行的消息,在存入 commitlog 之前,如果发现延迟 level 大于 0,会将消息的主题设置为SCHEDULE_TOPIC = "SCHEDULE_TOPIC_XXXX"然后备份原主题名称(retry + 消费组名称)、与消费队列 ID,最后会被延迟任务 ScheduleMessageService 延迟拉取。

ScheduleMessageService 在执行过程中,会再次存入 commitlog 文件中放入之前,会清空延迟等级,并恢复主题与队列。这样,就能被消费者所消费,因为消费者在启动时就订阅了该消费组的重试主题。


重试消费过程中的间隔时间使用了延时消息,重试的消息数据并非直接写入重试队列,而是先写入延时消息队列,再通过定时消息的功能转发到重试队列。

广播模式的消费进度保存在客户端本地,集群模式的消费进度保存在 Broker 上。集群模式中 RocketMQ 采用 ACK 机制确保消息一定被消费。在消息投递过程中,不是消息从 Broker 发送到 Consumer 就算消费成功了,需要 Consumer 明确给 Broker 返回消费成功状态才算。如果从 Broker 发送到 Consumer 后,已经完成了业务处理,但在给 Broker 返回消费成功状态之前,Consumer 发生宕机或者断电、断网等情况,Broker 未收到返回,则不会保存消费进度。Consumer 重启之后,消息会重新投递,此时也会出现重复消费的场景,这时消息的幂等性需要业务自行保证。

好了,消息消费高可用机制就说到这了,相信你心里有了自己的答案与思考。



欢迎大家关注我的公众号【老周聊架构】,Java后端主流技术栈的原理、源码分析、架构以及各种互联网高并发、高性能、高可用的解决方案。


喜欢的话,点赞、再看、分享三连。


相关实践学习
消息队列RocketMQ版:基础消息收发功能体验
本实验场景介绍消息队列RocketMQ版的基础消息收发功能,涵盖实例创建、Topic、Group资源创建以及消息收发体验等基础功能模块。
消息队列 MNS 入门课程
1、消息队列MNS简介 本节课介绍消息队列的MNS的基础概念 2、消息队列MNS特性 本节课介绍消息队列的MNS的主要特性 3、MNS的最佳实践及场景应用 本节课介绍消息队列的MNS的最佳实践及场景应用案例 4、手把手系列:消息队列MNS实操讲 本节课介绍消息队列的MNS的实际操作演示 5、动手实验:基于MNS,0基础轻松构建 Web Client 本节课带您一起基于MNS,0基础轻松构建 Web Client
相关文章
|
6月前
|
消息中间件 存储 运维
|
6月前
|
消息中间件 负载均衡 Java
【深入浅出RocketMQ原理及实战】「底层原理挖掘系列」透彻剖析贯穿RocketMQ的消息消费长轮训机制体系的原理分析
【深入浅出RocketMQ原理及实战】「底层原理挖掘系列」透彻剖析贯穿RocketMQ的消息消费长轮训机制体系的原理分析
94 0
|
3月前
|
消息中间件 存储 算法
一文详解 RocketMQ 如何利用 Raft 进行高可用保障
本文介绍 RocketMQ 如何利用 Raft(一种简单有效的分布式一致性算法)进行高可用的保障,总结了 RocketMQ 与 Raft 的前世今生。可以说 Raft 的设计给 RocketMQ 的高可用注入了非常多的养分,RocketMQ 的共识算法与高可用设计在 2023 年也得到了学术界的认可,被 CCF-A 类学术会议 ASE 23' 录用。
405 11
|
6月前
|
消息中间件 存储 安全
【深入浅出RocketMQ原理及实战】「底层原理挖掘系列」透彻剖析贯穿RocketMQ的消息顺序消费和并发消费机制体系的原理分析
【深入浅出RocketMQ原理及实战】「底层原理挖掘系列」透彻剖析贯穿RocketMQ的消息顺序消费和并发消费机制体系的原理分析
86 0
|
4月前
|
消息中间件 JavaScript RocketMQ
消息队列 MQ使用问题之过期删除机制的触发条件是什么
消息队列(MQ)是一种用于异步通信和解耦的应用程序间消息传递的服务,广泛应用于分布式系统中。针对不同的MQ产品,如阿里云的RocketMQ、RabbitMQ等,它们在实现上述场景时可能会有不同的特性和优势,比如RocketMQ强调高吞吐量、低延迟和高可用性,适合大规模分布式系统;而RabbitMQ则以其灵活的路由规则和丰富的协议支持受到青睐。下面是一些常见的消息队列MQ产品的使用场景合集,这些场景涵盖了多种行业和业务需求。
消息队列 MQ使用问题之过期删除机制的触发条件是什么
|
3月前
|
消息中间件 RocketMQ
RocketMQ - 消费者进度保存机制
RocketMQ - 消费者进度保存机制
63 0
|
3月前
|
消息中间件 RocketMQ
RocketMQ - 消费者Rebalance机制
RocketMQ - 消费者Rebalance机制
56 0
|
3月前
|
消息中间件 存储 缓存
RocketMQ - 消费者启动机制
RocketMQ - 消费者启动机制
48 0
|
6月前
|
消息中间件 存储 运维
深入理解MQ消息队列的高可用与可靠性策略
深入理解MQ消息队列的高可用与可靠性策略
1334 3
|
5月前
|
消息中间件 Apache RocketMQ
消息队列 MQ产品使用合集之是否提供机制检测消费的状态
阿里云消息队列MQ(Message Queue)是一种高可用、高性能的消息中间件服务,它允许您在分布式应用的不同组件之间异步传递消息,从而实现系统解耦、流量削峰填谷以及提高系统的可扩展性和灵活性。以下是使用阿里云消息队列MQ产品的关键点和最佳实践合集。