RocketMQ主从如何同步消息消费进度?

简介: 前面我也跟大家讲述了 RocketMQ 读写分离的规则,但是你可能会问,主从服务器之间的消费进度是如何保持同步的?下面我来给大家解答一下。

640.jpg

前面我也跟大家讲述了 RocketMQ 读写分离的规则,但是你可能会问,主从服务器之间的消费进度是如何保持同步的?下面我来给大家解答一下。


如果消费者消费模式不同,也会有不同的保存方式,消费者端的消息消费进度保存到 OffsetStore 中,他有两个实现类:


org.apache.rocketmq.client.consumer.store.LocalFileOffsetStore // 本地消费进度保存实现
org.apache.rocketmq.client.consumer.store.RemoteBrokerOffsetStore // 远程消费进度保存实现


其中,如果是广播模式消费,消息的消费进度是保存到本地,如果是集群消费模式,消息的消费进度则是保存到 Broker,但无论是保存到本地,还是保存到 Broker,消费者都会在本地留一份缓存,我们暂且看看集群消费模式下,消息消费进度的缓存是如何保存的:


org.apache.rocketmq.client.consumer.store.RemoteBrokerOffsetStore#updateOffset:

public void updateOffset(MessageQueue mq, long offset, boolean increaseOnly) {
  if (mq != null) {
    AtomicLong offsetOld = this.offsetTable.get(mq);
    if (null == offsetOld) {
      offsetOld = this.offsetTable.putIfAbsent(mq, new AtomicLong(offset));
    }
    if (null != offsetOld) {
      if (increaseOnly) {
        MixAll.compareAndIncreaseOnly(offsetOld, offset);
      } else {
        offsetOld.set(offset);
      }
    }
  }
}


消息者在消费完消息后,会调用以上方法,将消费进度放入 offsetTable 缓存中,当 Rebalance 负载重新分配生成 PullRequest 对象时,会调用 RemoteBrokerOffsetStore.readOffset 方法从 offsetTable 缓存中取出对应的消费进度缓存值,再将该值放进 PullRequest 对象中,接下来消息拉取时就会将消息消费进度缓存发送到 Broker 端,所以我们继续看 Broker 端的处理逻辑。


之前整理 Broker 启动流程时,发现 Broker 启动时会开启一个定时任务:


org.apache.rocketmq.broker.BrokerController#initialize:

this.scheduledExecutorService.scheduleAtFixedRate(new Runnable() {
    @Override
    public void run() {
        try {
            BrokerController.this.slaveSynchronize.syncAll();
        } catch (Throwable e) {
            log.error("ScheduledTask syncAll slave exception", e);
        }
    }
}, 1000 * 10, 1000 * 60, TimeUnit.MILLISECONDS);


如果 Broker 是从服务器,则会开启以上定时任务。


org.apache.rocketmq.broker.slave.SlaveSynchronize#syncAll:

public void syncAll() {
  this.syncTopicConfig();
  this.syncConsumerOffset();
  this.syncDelayOffset();
  this.syncSubscriptionGroupConfig();
}

在主服务器没有宕机的情况下,从服务器会定时从主服务器中同步消息消费进度等信息,那现在问题来了,由于这个同步是单方面同步,即只会从服务器同步主服务器,那如果主服务器宕机了之后,消费者切换成从服务器拉取消息进行消费,如果之后主服务器启动了,从服务器在把已经消费过的偏移量同步过来,那岂不是造成同步消费了?


其实消费者在拉取消息的时候,如果消费者的缓存中存在消费进度,也会向 Broker 更新消息消费进度,所以即使是主服务器挂了,在它重新启动之后,消费者的消费进度没有丢失,依然会更新主服务器的消息消费进度,这样一来,消费端与主服务器只挂了其中一个,并不会导致消息重新被消费,具体代码逻辑如下:


org.apache.rocketmq.broker.processor.PullMessageProcessor#processRequest:

boolean storeOffsetEnable = brokerAllowSuspend;
storeOffsetEnable = storeOffsetEnable && hasCommitOffsetFlag;
storeOffsetEnable = storeOffsetEnable
    && this.brokerController.getMessageStoreConfig().getBrokerRole() != BrokerRole.SLAVE;
if (storeOffsetEnable) {
 this.brokerController.getConsumerOffsetManager().commitOffset(RemotingHelper.parseChannelRemoteAddr(channel), requestHeader.getConsumerGroup(), requestHeader.getTopic(), requestHeader.getQueueId(), requestHeader.getCommitOffset());
}


其中 brokerAllowSuspend 表示 broker 是否允许挂起,该值默认为 true,hasCommitOffsetFlag 表示消费者在内存中是否缓存了消息消费进度,从代码逻辑可看出,如果 Broker 为主服务器,并且 brokerAllowSuspend 和 hasCommitOffsetFlag 都为true,那么就会将消费者消费进度更新到本地。


相关实践学习
消息队列RocketMQ版:基础消息收发功能体验
本实验场景介绍消息队列RocketMQ版的基础消息收发功能,涵盖实例创建、Topic、Group资源创建以及消息收发体验等基础功能模块。
消息队列 MNS 入门课程
1、消息队列MNS简介 本节课介绍消息队列的MNS的基础概念 2、消息队列MNS特性 本节课介绍消息队列的MNS的主要特性 3、MNS的最佳实践及场景应用 本节课介绍消息队列的MNS的最佳实践及场景应用案例 4、手把手系列:消息队列MNS实操讲 本节课介绍消息队列的MNS的实际操作演示 5、动手实验:基于MNS,0基础轻松构建 Web Client 本节课带您一起基于MNS,0基础轻松构建 Web Client
相关文章
|
3月前
|
消息中间件 监控 Java
RocketMQ 同步发送、异步发送和单向发送,如何选择?
本文详细分析了 RocketMQ 中同步发送、异步发送和单向发送三种消息发送方式的原理、优缺点及适用场景。同步发送可靠性高但延迟较大,适合订单系统等场景;异步发送非阻塞且延迟低,适用于实时数据处理等场景;单向发送高效但可靠性低,适用于日志收集等场景。文章还提供了示例代码和核心源码分析,帮助读者更好地理解每种发送方式的特点。
340 4
|
3月前
|
存储 消息中间件 安全
JUC组件实战:实现RRPC(Java与硬件通过MQTT的同步通信)
【10月更文挑战第9天】本文介绍了如何利用JUC组件实现Java服务与硬件通过MQTT的同步通信(RRPC)。通过模拟MQTT通信流程,使用`LinkedBlockingQueue`作为消息队列,详细讲解了消息发送、接收及响应的同步处理机制,包括任务超时处理和内存泄漏的预防措施。文中还提供了具体的类设计和方法实现,帮助理解同步通信的内部工作原理。
JUC组件实战:实现RRPC(Java与硬件通过MQTT的同步通信)
|
4月前
|
消息中间件 存储 JSON
RocketMQ 消费进度持久化
本文介绍了RocketMQ中消费进度的持久化机制,包括普通消息和延迟消息的消费偏移量是如何存储的。普通消息的消费进度存储于`consumerOffset.json`文件,格式为`{Topic}@{ConsumerGroup}`,而延迟消息则存储于`delayOffset.json`文件,以`{delayLevel:offset}`的形式记录。文章详细分析了相关文件内容及代码实现,并指出Broker分别以5秒和10秒的间隔进行持久化操作。
|
6月前
|
消息中间件 传感器 负载均衡
消息队列 MQ使用问题之如何配置一主一从的同步复制模式
消息队列(MQ)是一种用于异步通信和解耦的应用程序间消息传递的服务,广泛应用于分布式系统中。针对不同的MQ产品,如阿里云的RocketMQ、RabbitMQ等,它们在实现上述场景时可能会有不同的特性和优势,比如RocketMQ强调高吞吐量、低延迟和高可用性,适合大规模分布式系统;而RabbitMQ则以其灵活的路由规则和丰富的协议支持受到青睐。下面是一些常见的消息队列MQ产品的使用场景合集,这些场景涵盖了多种行业和业务需求。
消息队列 MQ使用问题之如何配置一主一从的同步复制模式
|
6月前
|
消息中间件 运维 RocketMQ
MetaQ/RocketMQ 原理问题之slave broker是从master同步信息的问题如何解决
MetaQ/RocketMQ 原理问题之slave broker是从master同步信息的问题如何解决
|
5月前
|
消息中间件 RocketMQ
RocketMQ - 消费者进度保存机制
RocketMQ - 消费者进度保存机制
86 0
|
8月前
|
消息中间件 Shell 数据处理
rocket mq 查看消费进度,消息堆积,清除堆积数据命令
该内容是关于RocketMQ的消费进度管理和堆积数据处理的指导。首先,需进入RocketMQ的bin目录,然后使用`mqadmin consumerProgress`命令查看消费者或生产者的消费进度。`broker offset`和`consumer offset`的差值表示未消费消息。通过`resetOffsetByTime`命令可重置消费位点来清除堆积数据,未消费消息默认3天后会被丢弃。此外,`CONSUME_FROM WHERE`枚举类定义了消费起点选项,包括从最后、最开始或指定时间点消费。
1853 3
|
7月前
|
消息中间件 存储 Java
后端开发Spring框架之消息介绍 同步异步 JMS AMQP MQTT Kafka介绍
后端开发Spring框架之消息介绍 同步异步 JMS AMQP MQTT Kafka介绍
48 0
|
数据库
淘东电商项目(43) -MQ与Logstash实现数据库同步到ES的区别
淘东电商项目(43) -MQ与Logstash实现数据库同步到ES的区别
121 0
|
NoSQL 关系型数据库 MySQL
同步 MySQL 数据至 ES/Redis/MQ 等的五种方式
同步 MySQL 数据至 ES/Redis/MQ 等的五种方式
634 0