RocketMQ - 消费者进度保存机制

简介: RocketMQ - 消费者进度保存机制

RocketMQ设计了远程位点管理和本地位点管理两种位点管理方式。集群消费时,位点由客户端提交给Broker保存,具体实现代码在RemoteBrokerOffsetStore.java文件中;广播消费时,位点保存在消费者本地磁盘上,实现代码在LocalFileOffsetStore.java文件中

/**
 * Offset store interface
 */
public interface OffsetStore {
    /**
     * 加载位点信息
     */
    void load() throws MQClientException;
    /**
     * 更新缓存位点信息
     */
    void updateOffset(final MessageQueue mq, final long offset, final boolean increaseOnly);
    /**
     * 读取本地位点信息
     *
     * @return The fetched offset
     */
    long readOffset(final MessageQueue mq, final ReadOffsetType type);
    /**
     * 持久化全部队列的位点信息
     */
    void persistAll(final Set<MessageQueue> mqs);
    /**
     * 持久化某一个队列的位点信息
     */
    void persist(final MessageQueue mq);
    /**
     * 删除某一个队列的位点信息
     */
    void removeOffset(MessageQueue mq);
    /**
     * 复制一份缓存位点信息
     * @return The cloned offset table of given topic
     */
    Map<MessageQueue, Long> cloneOffsetTable(String topic);
    /**
     * 将本地消费位点持久化到Broker中
     * @param mq
     * @param offset
     * @param isOneway
     */
    void updateConsumeOffsetToBroker(MessageQueue mq, long offset, boolean isOneway) throws RemotingException,
        MQBrokerException, InterruptedException, MQClientException;
}

客户端消费进度保存也叫消费进度持久化,开源RocketMQ 4.2.0支持定时持久化和不定时持久化两种方式

定时持久化位点实现方法是org.apache.rocketmq.client.impl.factory.MQClientInstance.startScheduledTask()

定时持久化位点逻辑是通过定时任务来实现的,在启动程序10s后,会定时调用持久化方法MQClientInstance.this.persistAllConsumerOffset(),持久化每一个消费者消费的每一个MessageQueue的消费进度。

不定时持久化也叫Pull-And-Commit,也就是在执行Pull方法的同时,把队列最新消费位点信息发给Broker,具体实现代码在org.apache.rocketmq.client.impl.consumer.DefaultMQPushConsumerImpl.pullMessage()方法中

该方法中有两处持久化位点信息

第一处,在拉取完成后,如果拉取位点非法,则此时客户端会主动提交一次最新的消费位点信息给Broker,以便下次能使用正确的位点拉取消息,该处更新位点信息

第二处,在执行消息拉取动作时,如果是集群消费,并且本地位点值大于0,那么把最新的位点上传给Broker

代码中通过commitOffsetEnable、sysFlag两个字段表示是否可以上报消费位点给Broker。在执行Pull请求时,将sysFlag作为网络请求的消息头传递给Broker,Broker中处理该字段的逻辑在org.apache.rocketmq.broker.processor.PullMessageProcessor.processRequest()方法中

hasCommitOffsetFlag:Pull请求中的sysFlag参数,是决定Broker是否执行持久化消费位点的一个因素。

brokerAllowSuspend:Broker是否能挂起。如果Broker是挂起状态,将不能持久化位点。

storeOffsetEnable:True表示Broker需要持久化消费位点,False则不用持久化位点。

相关实践学习
消息队列RocketMQ版:基础消息收发功能体验
本实验场景介绍消息队列RocketMQ版的基础消息收发功能,涵盖实例创建、Topic、Group资源创建以及消息收发体验等基础功能模块。
消息队列 MNS 入门课程
1、消息队列MNS简介 本节课介绍消息队列的MNS的基础概念 2、消息队列MNS特性 本节课介绍消息队列的MNS的主要特性 3、MNS的最佳实践及场景应用 本节课介绍消息队列的MNS的最佳实践及场景应用案例 4、手把手系列:消息队列MNS实操讲 本节课介绍消息队列的MNS的实际操作演示 5、动手实验:基于MNS,0基础轻松构建 Web Client 本节课带您一起基于MNS,0基础轻松构建 Web Client
目录
相关文章
|
3月前
|
消息中间件 Java 调度
消息队列 MQ使用问题之消费者自动掉线是什么导致的
消息队列(MQ)是一种用于异步通信和解耦的应用程序间消息传递的服务,广泛应用于分布式系统中。针对不同的MQ产品,如阿里云的RocketMQ、RabbitMQ等,它们在实现上述场景时可能会有不同的特性和优势,比如RocketMQ强调高吞吐量、低延迟和高可用性,适合大规模分布式系统;而RabbitMQ则以其灵活的路由规则和丰富的协议支持受到青睐。下面是一些常见的消息队列MQ产品的使用场景合集,这些场景涵盖了多种行业和业务需求。
|
23天前
|
消息中间件 存储 JSON
RocketMQ 消费进度持久化
本文介绍了RocketMQ中消费进度的持久化机制,包括普通消息和延迟消息的消费偏移量是如何存储的。普通消息的消费进度存储于`consumerOffset.json`文件,格式为`{Topic}@{ConsumerGroup}`,而延迟消息则存储于`delayOffset.json`文件,以`{delayLevel:offset}`的形式记录。文章详细分析了相关文件内容及代码实现,并指出Broker分别以5秒和10秒的间隔进行持久化操作。
|
2月前
|
消息中间件 存储 负载均衡
我服了,RocketMQ消费者负载均衡内核是这样设计的
文章为理解RocketMQ的负载均衡机制提供了深入的技术洞察,并对如何在实际应用中扩展和定制负载均衡策略提供了有价值的见解。
我服了,RocketMQ消费者负载均衡内核是这样设计的
|
2月前
|
消息中间件 存储 负载均衡
RocketMQ消费者消费消息核心原理(含长轮询机制)
这篇文章深入探讨了Apache RocketMQ消息队列中消费者消费消息的核心原理,特别是长轮询机制。文章从消费者和Broker的交互流程出发,详细分析了Push和Pull两种消费模式的内部实现,以及它们是如何通过长轮询机制来优化消息消费的效率。文章还对RocketMQ的消费者启动流程、消息拉取请求的发起、Broker端处理消息拉取请求的流程进行了深入的源码分析,并总结了RocketMQ在设计上的优点,如单一职责化和线程池的使用等。
RocketMQ消费者消费消息核心原理(含长轮询机制)
|
3月前
|
消息中间件 JavaScript RocketMQ
消息队列 MQ使用问题之过期删除机制的触发条件是什么
消息队列(MQ)是一种用于异步通信和解耦的应用程序间消息传递的服务,广泛应用于分布式系统中。针对不同的MQ产品,如阿里云的RocketMQ、RabbitMQ等,它们在实现上述场景时可能会有不同的特性和优势,比如RocketMQ强调高吞吐量、低延迟和高可用性,适合大规模分布式系统;而RabbitMQ则以其灵活的路由规则和丰富的协议支持受到青睐。下面是一些常见的消息队列MQ产品的使用场景合集,这些场景涵盖了多种行业和业务需求。
消息队列 MQ使用问题之过期删除机制的触发条件是什么
|
2月前
|
消息中间件 缓存 Java
RocketMQ - 消费者消费方式
RocketMQ - 消费者消费方式
42 0
|
2月前
|
消息中间件 RocketMQ
RocketMQ - 消费者Rebalance机制
RocketMQ - 消费者Rebalance机制
47 0
|
2月前
|
消息中间件 存储 缓存
RocketMQ - 消费者启动机制
RocketMQ - 消费者启动机制
41 0
|
2月前
|
消息中间件 存储 缓存
RocketMQ - 消费者概述
RocketMQ - 消费者概述
39 0
|
3月前
|
消息中间件 API RocketMQ
消息队列 MQ使用问题之消息在没有消费者的情况下丢失,该如何解决
消息队列(MQ)是一种用于异步通信和解耦的应用程序间消息传递的服务,广泛应用于分布式系统中。针对不同的MQ产品,如阿里云的RocketMQ、RabbitMQ等,它们在实现上述场景时可能会有不同的特性和优势,比如RocketMQ强调高吞吐量、低延迟和高可用性,适合大规模分布式系统;而RabbitMQ则以其灵活的路由规则和丰富的协议支持受到青睐。下面是一些常见的消息队列MQ产品的使用场景合集,这些场景涵盖了多种行业和业务需求。