消费进度文件内容
消息消费完毕,如何保持消费进度呢?带着这个疑问,来看下 RocketMQ
的实现。
RocketMQ
保存消费偏移的文件位置在 ${user.home}/store/config
目录下。
consumerOffset.json
保存正常消费的消费进度,来看下,文件里面的内容。
json
代码解读
复制代码
{
"offsetTable":{
"%RETRY%TestConsumer@TestConsumer":{0:0},
"TopicTest@TestConsumer":{0:1,1:2,2:1,3:0}
}
}
${Topic}
@${ConsumerGroup}
格式是正常的消费者。
%Retry%${Topic}
@${ConsumerGroup}
格式是重试主题。
{1:2}
则表示 1 号队列已经消费完 2 条消息。
在讲解 ConsumerQueue
时,我提到过它的子条目是定长的,所以在存储消费偏移时,可以存储消费到第几个子条目。
delayOffset.json
保存延迟消息的消费进度,文件的内容如下
json
代码解读
复制代码
{
"offsetTable":{1:14,3:10,4:10,5:10,6:10,7:10,8:10,9:0}
}
{4:10}
表示:延迟等级为 4 的消费完了 10 个。
讲解完,消费偏移在文件上怎么存储的,我们看下 RocketMQ
在什么时候会消费进度持久化。
消费进度持久化代码实现
正常的普通消息
Broker
每隔 5s
持久化消费偏移。
代码位置:
BrokerController#initialize()
java
代码解读
复制代码
public boolean initialize() throws CloneNotSupportedException {
this.scheduledExecutorService.scheduleAtFixedRate(new Runnable() {
@Override
public void run() {
try {
BrokerController.this.consumerOffsetManager.persist();
} catch (Throwable e) {
log.error("schedule persist consumerOffset error.", e);
}
}
}, 1000 * 10, this.brokerConfig.getFlushConsumerOffsetInterval()/* 默认5000 */, TimeUnit.MILLISECONDS);
}
延迟消息
Broker
每隔 10s
就会将 延迟消息消费偏移 持久化。
代码位置:ScheduleMessageService#start()
java
代码解读
复制代码
public class ScheduleMessageService extends ConfigManager {
public void start() {
if (started.compareAndSet(false, true)) {
this.timer = new Timer("ScheduleMessageTimerThread", true);
// 获取配置的延迟等级
for (Map.Entry<Integer, Long> entry : this.delayLevelTable.entrySet()) {
Integer level = entry.getKey();
Long timeDelay = entry.getValue();
Long offset = this.offsetTable.get(level);
if (null == offset) {
offset = 0L;
}
// 初始化延迟调度任务
if (timeDelay != null) {
this.timer.schedule(new DeliverDelayedMessageTimerTask(level, offset), FIRST_DELAY_TIME);
}
}
// 默认每隔 10s,执行一次持久化
this.timer.scheduleAtFixedRate(new TimerTask() {
@Override
public void run() {
try {
if (started.get()) ScheduleMessageService.this.persist();
} catch (Throwable e) {
log.error("scheduleAtFixedRate flush exception", e);
}
}
}, 10000, this.defaultMessageStore.getMessageStoreConfig().getFlushDelayOffsetInterval()/*默认 10000 */);
}
}
}