RocketMQ主从读写分离机制

简介: 一般来说,选择主从备份实现高可用的架构中,都会具备读写分离机制,比如 MySql 读写分离,客户端可以向主从服务器读取数据,但客户写数据只能通过主服务器。RocketMQ 的读写分离机制又跟上述描写的不太一致,RocketMQ 有属于自己的一套读写分离逻辑,它会判断主服务器的消息堆积量来决定消费者是否向从服务器拉取消息消费。

640.jpg一般来说,选择主从备份实现高可用的架构中,都会具备读写分离机制,比如 MySql 读写分离,客户端可以向主从服务器读取数据,但客户写数据只能通过主服务器。


RocketMQ 的读写分离机制又跟上述描写的不太一致,RocketMQ 有属于自己的一套读写分离逻辑,它会判断主服务器的消息堆积量来决定消费者是否向从服务器拉取消息消费。


决定消费者是否向从服务器拉取消息消费的值存在 GetMessageResult 类中:

org.apache.rocketmq.store.GetMessageResult:


private boolean suggestPullingFromSlave = false;


其默认值为 false,即默认消费者不会消费从服务器,以下逻辑可以改变该值:


org.apache.rocketmq.store.DefaultMessageStore#getMessage:

long diff = maxOffsetPy - maxPhyOffsetPulling;
long memory = (long) (StoreUtil.TOTAL_PHYSICAL_MEMORY_SIZE
    * (this.messageStoreConfig.getAccessMessageInMemoryMaxRatio() / 100.0));
getResult.setSuggestPullingFromSlave(diff > memory);


其中 maxOffsetPy 为当前最大物理偏移量,maxPhyOffsetPulling 为本次消息拉取最大物理偏移量,他们的差即可表示消息堆积量,TOTAL_PHYSICAL_MEMORY_SIZE 表示当前系统物理内存,accessMessageInMemoryMaxRatio 的默认值为 40,以上逻辑即可算出当前消息堆积量是否大于物理内存的 40 %,如果大于则将 suggestPullingFromSlave 设置为 true。


接下来该参数值会在消息拉取逻辑里面产生作用:


org.apache.rocketmq.broker.processor.PullMessageProcessor#processRequest:

if (getMessageResult.isSuggestPullingFromSlave()) {
  responseHeader.setSuggestWhichBrokerId(subscriptionGroupConfig.getWhichBrokerWhenConsumeSlowly());
} else {
  responseHeader.setSuggestWhichBrokerId(MixAll.MASTER_ID);
}
switch (this.brokerController.getMessageStoreConfig().getBrokerRole()) {
  case ASYNC_MASTER:
  case SYNC_MASTER:
    break;
  case SLAVE:
    if (!this.brokerController.getBrokerConfig().isSlaveReadEnable()) {
      response.setCode(ResponseCode.PULL_RETRY_IMMEDIATELY);
      responseHeader.setSuggestWhichBrokerId(MixAll.MASTER_ID);
    }
    break;
}
if (this.brokerController.getBrokerConfig().isSlaveReadEnable()) {
  // consume too slow ,redirect to another machine
  if (getMessageResult.isSuggestPullingFromSlave()) {
    responseHeader.setSuggestWhichBrokerId(subscriptionGroupConfig.getWhichBrokerWhenConsumeSlowly());
  }
  // consume ok
  else {
    responseHeader.setSuggestWhichBrokerId(subscriptionGroupConfig.getBrokerId());
  }
} else {
  responseHeader.setSuggestWhichBrokerId(MixAll.MASTER_ID);
}


如果发现主服务器的消息堆积超过了物理内存的 40%,则会设置 suggestWhichBrokerId 为从服务器 broker ID。


这里还会有个 slaveReadEnable 值来决定是否可以从从服务器拉取消息:


1.如果 slaveReadEnable=true,并且堆积量已经超过物理内存 40%时,则建议从从服务器拉取消息,否则还是从主服务器拉取消息;2.如果 slaveReadEnable=false,则消息者只能从主服务器中拉取消息。


org.apache.rocketmq.client.impl.consumer.PullAPIWrapper#updatePullFromWhichNode:

public void updatePullFromWhichNode(final MessageQueue mq, final long brokerId) {
    AtomicLong suggest = this.pullFromWhichNodeTable.get(mq);
    if (null == suggest) {
        this.pullFromWhichNodeTable.put(mq, new AtomicLong(brokerId));
    } else {
        suggest.set(brokerId);
    }
}

当消费者收到拉取响应回来的数据后,会将下次建议拉取的 brokerID 缓存起来。下次拉取消息就会从 pullFromWhichNodeTable 中取出拉取 brokerId。


相关实践学习
消息队列RocketMQ版:基础消息收发功能体验
本实验场景介绍消息队列RocketMQ版的基础消息收发功能,涵盖实例创建、Topic、Group资源创建以及消息收发体验等基础功能模块。
消息队列 MNS 入门课程
1、消息队列MNS简介 本节课介绍消息队列的MNS的基础概念 2、消息队列MNS特性 本节课介绍消息队列的MNS的主要特性 3、MNS的最佳实践及场景应用 本节课介绍消息队列的MNS的最佳实践及场景应用案例 4、手把手系列:消息队列MNS实操讲 本节课介绍消息队列的MNS的实际操作演示 5、动手实验:基于MNS,0基础轻松构建 Web Client 本节课带您一起基于MNS,0基础轻松构建 Web Client
相关文章
|
9月前
|
消息中间件 存储 监控
|
9月前
|
消息中间件 存储 运维
|
9月前
|
消息中间件 负载均衡 Java
【深入浅出RocketMQ原理及实战】「底层原理挖掘系列」透彻剖析贯穿RocketMQ的消息消费长轮训机制体系的原理分析
【深入浅出RocketMQ原理及实战】「底层原理挖掘系列」透彻剖析贯穿RocketMQ的消息消费长轮训机制体系的原理分析
129 0
|
9月前
|
消息中间件 存储 安全
【深入浅出RocketMQ原理及实战】「底层原理挖掘系列」透彻剖析贯穿RocketMQ的消息顺序消费和并发消费机制体系的原理分析
【深入浅出RocketMQ原理及实战】「底层原理挖掘系列」透彻剖析贯穿RocketMQ的消息顺序消费和并发消费机制体系的原理分析
145 0
|
7月前
|
消息中间件 JavaScript RocketMQ
消息队列 MQ使用问题之过期删除机制的触发条件是什么
消息队列(MQ)是一种用于异步通信和解耦的应用程序间消息传递的服务,广泛应用于分布式系统中。针对不同的MQ产品,如阿里云的RocketMQ、RabbitMQ等,它们在实现上述场景时可能会有不同的特性和优势,比如RocketMQ强调高吞吐量、低延迟和高可用性,适合大规模分布式系统;而RabbitMQ则以其灵活的路由规则和丰富的协议支持受到青睐。下面是一些常见的消息队列MQ产品的使用场景合集,这些场景涵盖了多种行业和业务需求。
消息队列 MQ使用问题之过期删除机制的触发条件是什么
|
6月前
|
消息中间件 RocketMQ
RocketMQ - 消费者进度保存机制
RocketMQ - 消费者进度保存机制
94 0
|
6月前
|
消息中间件 RocketMQ
RocketMQ - 消费者Rebalance机制
RocketMQ - 消费者Rebalance机制
99 0
|
6月前
|
消息中间件 存储 缓存
RocketMQ - 消费者启动机制
RocketMQ - 消费者启动机制
81 0
|
8月前
|
消息中间件 Apache RocketMQ
消息队列 MQ产品使用合集之是否提供机制检测消费的状态
阿里云消息队列MQ(Message Queue)是一种高可用、高性能的消息中间件服务,它允许您在分布式应用的不同组件之间异步传递消息,从而实现系统解耦、流量削峰填谷以及提高系统的可扩展性和灵活性。以下是使用阿里云消息队列MQ产品的关键点和最佳实践合集。
|
9月前
|
消息中间件 存储 Cloud Native
【Spring云原生系列】Spring RabbitMQ:异步处理机制的基础--消息队列 原理讲解+使用教程
【Spring云原生系列】Spring RabbitMQ:异步处理机制的基础--消息队列 原理讲解+使用教程