RocketMQ 消费进度持久化

简介: 本文介绍了RocketMQ中消费进度的持久化机制,包括普通消息和延迟消息的消费偏移量是如何存储的。普通消息的消费进度存储于`consumerOffset.json`文件,格式为`{Topic}@{ConsumerGroup}`,而延迟消息则存储于`delayOffset.json`文件,以`{delayLevel:offset}`的形式记录。文章详细分析了相关文件内容及代码实现,并指出Broker分别以5秒和10秒的间隔进行持久化操作。

消费进度文件内容

消息消费完毕,如何保持消费进度呢?带着这个疑问,来看下 RocketMQ 的实现。

RocketMQ 保存消费偏移的文件位置在 ${user.home}/store/config 目录下。

consumerOffset.json

保存正常消费的消费进度,来看下,文件里面的内容。

json

代码解读

复制代码

{
    "offsetTable":{

        "%RETRY%TestConsumer@TestConsumer":{0:0},
        
        "TopicTest@TestConsumer":{0:1,1:2,2:1,3:0}
    }
}

${Topic}@${ConsumerGroup} 格式是正常的消费者。

%Retry%${Topic}@${ConsumerGroup} 格式是重试主题。

{1:2} 则表示 1 号队列已经消费完 2 条消息。

在讲解 ConsumerQueue 时,我提到过它的子条目是定长的,所以在存储消费偏移时,可以存储消费到第几个子条目。

delayOffset.json

保存延迟消息的消费进度,文件的内容如下

json

代码解读

复制代码

{
"offsetTable":{1:14,3:10,4:10,5:10,6:10,7:10,8:10,9:0}
}

{4:10} 表示:延迟等级为 4 的消费完了 10 个。

讲解完,消费偏移在文件上怎么存储的,我们看下 RocketMQ 在什么时候会消费进度持久化。

消费进度持久化代码实现

正常的普通消息

Broker 每隔 5s 持久化消费偏移。

代码位置:

BrokerController#initialize()

java

代码解读

复制代码

public boolean initialize() throws CloneNotSupportedException {
    this.scheduledExecutorService.scheduleAtFixedRate(new Runnable() {
        @Override
        public void run() {
            try {
                BrokerController.this.consumerOffsetManager.persist();
            } catch (Throwable e) {
                log.error("schedule persist consumerOffset error.", e);
            }
        }
    }, 1000 * 10, this.brokerConfig.getFlushConsumerOffsetInterval()/* 默认5000 */, TimeUnit.MILLISECONDS);
}

延迟消息

Broker 每隔 10s 就会将 延迟消息消费偏移 持久化。

代码位置:ScheduleMessageService#start()

java

代码解读

复制代码

public class ScheduleMessageService extends ConfigManager {
    public void start() {
        if (started.compareAndSet(false, true)) {
            this.timer = new Timer("ScheduleMessageTimerThread", true);
            // 获取配置的延迟等级
            for (Map.Entry<Integer, Long> entry : this.delayLevelTable.entrySet()) {
                Integer level = entry.getKey();
                Long timeDelay = entry.getValue();
                Long offset = this.offsetTable.get(level);
                if (null == offset) {
                    offset = 0L;
                }

                // 初始化延迟调度任务
                if (timeDelay != null) {
                    this.timer.schedule(new DeliverDelayedMessageTimerTask(level, offset), FIRST_DELAY_TIME);
                }
            }

            // 默认每隔 10s,执行一次持久化
            this.timer.scheduleAtFixedRate(new TimerTask() {

                @Override
                public void run() {
                    try {
                        if (started.get()) ScheduleMessageService.this.persist();
                    } catch (Throwable e) {
                        log.error("scheduleAtFixedRate flush exception", e);
                    }
                }
            }, 10000, this.defaultMessageStore.getMessageStoreConfig().getFlushDelayOffsetInterval()/*默认 10000 */);
        }
    }
}


转载来源:https://juejin.cn/post/6993870092572295181

相关实践学习
消息队列RocketMQ版:基础消息收发功能体验
本实验场景介绍消息队列RocketMQ版的基础消息收发功能,涵盖实例创建、Topic、Group资源创建以及消息收发体验等基础功能模块。
消息队列 MNS 入门课程
1、消息队列MNS简介 本节课介绍消息队列的MNS的基础概念 2、消息队列MNS特性 本节课介绍消息队列的MNS的主要特性 3、MNS的最佳实践及场景应用 本节课介绍消息队列的MNS的最佳实践及场景应用案例 4、手把手系列:消息队列MNS实操讲 本节课介绍消息队列的MNS的实际操作演示 5、动手实验:基于MNS,0基础轻松构建 Web Client 本节课带您一起基于MNS,0基础轻松构建 Web Client
相关文章
|
7月前
|
消息中间件 存储 缓存
RabbitMQ之消息应答和持久化
【1月更文挑战第11天】 一、消息应答 1.概念 2.自动应答 3.消息应答方法 4.Multiple 的解释 5.消息自动重新入队 6.消息手动应答代码 7.手动应答效果演示 二、RabbitMQ持久化 1.概念 2.队列如何实现持久化 3.消息实现持久化 4.不公平分发 5.预取值
330 8
|
消息中间件 存储 Kubernetes
k8s1.20版本部署RabbitMQ集群(持久化)——2023.05
k8s1.20版本部署RabbitMQ集群(持久化)——2023.05
753 1
|
4月前
|
消息中间件 RocketMQ
RocketMQ - 消费者进度保存机制
RocketMQ - 消费者进度保存机制
82 0
|
7月前
|
消息中间件 Shell 数据处理
rocket mq 查看消费进度,消息堆积,清除堆积数据命令
该内容是关于RocketMQ的消费进度管理和堆积数据处理的指导。首先,需进入RocketMQ的bin目录,然后使用`mqadmin consumerProgress`命令查看消费者或生产者的消费进度。`broker offset`和`consumer offset`的差值表示未消费消息。通过`resetOffsetByTime`命令可重置消费位点来清除堆积数据,未消费消息默认3天后会被丢弃。此外,`CONSUME_FROM WHERE`枚举类定义了消费起点选项,包括从最后、最开始或指定时间点消费。
1777 3
|
消息中间件 Cloud Native
我们一起来学RabbitMQ 三:RabbiMQ 死信队列,延迟队列,持久化等知识点
咱们今天再来进一步学习一下 RabbitMQ 的知识点,整理了如下相关知识点
135 0
|
消息中间件
RabbitMQ 的持久化防丢失
RabbitMQ 的持久化防丢失
111 0
|
7月前
|
消息中间件 存储 Java
RabbitMQ中的消息持久化是如何实现的?
RabbitMQ中的消息持久化是如何实现的?
131 0
|
消息中间件
我们一起来学RabbitMQ 三:RabbiMQ 死信队列,延迟队列,持久化等知识点
我们一起来学RabbitMQ 三:RabbiMQ 死信队列,延迟队列,持久化等知识点
|
消息中间件 存储 缓存
RabbitMQ (HelloWord 消息应答 持久化 不公平分发 预取值)2
RabbitMQ (HelloWord 消息应答 持久化 不公平分发 预取值)2
74 0
|
消息中间件
RabbitMQ (HelloWord 消息应答 持久化 不公平分发 预取值)1
RabbitMQ (HelloWord 消息应答 持久化 不公平分发 预取值)1
75 0
下一篇
DataWorks