消息消费负载和重新分布机制|学习笔记

本文涉及的产品
网络型负载均衡 NLB,每月750个小时 15LCU
应用型负载均衡 ALB,每月750个小时 15LCU
传统型负载均衡 CLB,每月750个小时 15LCU
简介: 快速学习消息消费负载和重新分布机制

开发者学堂课程【RocketMQ 知识精讲与项目实战(第三阶段)消息消费负载和重新分布机制】学习笔记,与课程紧密联系,让用户快速学习知识。

课程地址:https://developer.aliyun.com/learning/course/704/detail/12502


消息消费负载和重新分布机制


负载均衡的过程与算法介绍

RocketMQ 消息队列重新分配是由 RebalanceService 线程负责的。线程的启动会随着MQClientinstance的启动而启动。

MQClientinstance  其中会有这么一行代码:

this.rebalanceService.start();

也就是start会启动 rebalanceService 线程。

这个线程在启动之后,就会去调用客户端的  dorebalance  方法,下面是这个方法的代码:

public void doRebalance(){

for (Map.Entry entry : this.consumerTable.entryset()){

MQConsumerInner impl = entry .getValue();

if ( impl !=null){

try {  impl.doRebalance();

catch ( Throwable e) {

log.error( "doRebalance exception", e);}

它会进行负载均衡,重新分布的时候,以上代码部分会去遍历每个主题的订阅的队列,然后重新进行一个负载啊,所以整个地方是根据主题的一个订阅信息。然后会去遍历,针对每一个消费方去进行一个负载。

image.png进入  doRebalance  之后,发现有两种方式,一种是推送方式(DefaultMQPushConsumerImpl),一种是拉取方式(DefaultMQPullConsumerImpl),

推送方式。代码如下:

public void doRebalance(final boolean isOrder) {

Map subTable = this.getSubscriptionInner();

if ( subTable != null){

for (final Map.Entry entry : subTable.entrySet()){

final String topic = entry.getKey();

try {

this.rebalancByTopic(topic, isOrder);

}catch (Throwable e) {

if(!topic.startsWith(MixAll.RETRY_GROUP_TOPIC_PREFIX)){

Log.warn("rebalanceByTopic Exception", e);}

真正的负载会根据主题(Topic)去进行负载,这个地方传了一个topic, 所以进入 rebalanceByTopic,

负载均衡对于广播模式(messageModel)来讲其实没有太大的意义,因为广播模式的每一个客户端都要去消费当前主题下所有的队列,最多做更新的处理。

case CLUSTERING:{

Set mqSet = this.topicSubscribeInfoTable.get(topic);

List cidAll =

this.mQClientFactory.findConsumerIdList(topic,consumerGroup);

if(null == mqSet){

if (!topic.startsWith(MixAll.RETRY_GROUP_TOPIC_PREFIX)){

Log.warn("doRebalance,{},but the topic[{}] not exist.",

consumerGroup,topic);}

}

if(null == cidAll){

log.warn("doRebalance, {} {},get consumer id list failed", consumerGroup,topic);}

而真正的负载均衡是发生在集群模式下。它首先是拿到当前主题的这个队列(topic),再去拿到 cidAll ,也就是当前的所有主题客户端,

如下两个进行排序:

Collections.sort(mqAll)  Collections.sort(cidAll);

排完序之后,根据负载均衡的策略去进行一个重新的一个分布。

负载均衡的策略有五种。比较常用的有两种,一种叫做AllocateMessageQueueAveragely,另外一种就是AllocateMessageQueueAveragelyByCircle。

RocketMQ 默认提供5中负载均衡分配算法,下面是常见的两种:

AllocateMessageQueueAveragely : 平均分配

举例:8个队列q1,q2,q3,q4, q5 ,q6 ,q7,q8 ,消费者3个:c1,c2,c3分配如下:

c1:q1,q2 ,q3; c2:q4, q5 ,q6;c3:q7,q8

AllocateMessageQueueAveragelyByCircle:平均轮询分配

举例:8个队列q1,q2,q3,q4, q5 ,q6,q7 ,q8 ,消费者3个:c1,c2,c3分配如下:

c1:q1,q4 ,q7;c2:q2,q5,q8;c3:q3, q6

这两种的区别是:举一个例子:比如现在有八个队列,三个消费者,如果是第一种方式,真正负载的情况是:就是 C1 客户端负责消费一23,C2 负责消费456,C3 负责消费78;如果是第二种方式,在这种情况下,它的这三个消费者的负载是这样的: C1负责消费 Q1,Q4,Q7。C2 负责消费258, C3 负责消费36,它是一个循环的过程。这是两种比较常用的负载均衡的方式,在 MQ 当中这两种方式都可以使用。

注意事项:消息队列的分配遵循一个消费者可以分配到多个队列,但是同一个消息队列只能分配给一个消费者。所以如果出现消费者的个数大于队列的个数,那有些消费者就无法消费消息。以上就是关于负载均衡的过程以及具体负载均衡的算法。

相关实践学习
快速体验阿里云云消息队列RocketMQ版
本实验将带您快速体验使用云消息队列RocketMQ版Serverless系列实例进行获取接入点、创建Topic、创建订阅组、收发消息、查看消息轨迹和仪表盘。
消息队列 MNS 入门课程
1、消息队列MNS简介 本节课介绍消息队列的MNS的基础概念 2、消息队列MNS特性 本节课介绍消息队列的MNS的主要特性 3、MNS的最佳实践及场景应用 本节课介绍消息队列的MNS的最佳实践及场景应用案例 4、手把手系列:消息队列MNS实操讲 本节课介绍消息队列的MNS的实际操作演示 5、动手实验:基于MNS,0基础轻松构建 Web Client 本节课带您一起基于MNS,0基础轻松构建 Web Client
相关文章
|
消息中间件 存储 监控
自顶向下学习 RocketMQ(十):消息重投和消息重试
生产者在发送消息时,同步消息失败会重投,异步消息有重试,oneway 没有任何保证。消息重投保证消息尽可能发送成功、不丢失,但可能会造成消息重复,消息重复在 RocketMQ 中是无法避免的问题。消息重复在一般情况下不会发生,当出现消息量大、网络抖动,消息重复就会是大概率事件。另外,生产者主动重发、consumer 负载变化也会导致重复消息。
自顶向下学习 RocketMQ(十):消息重投和消息重试
|
10月前
|
Java 开发者 微服务
Spring Boot 入门:简化 Java Web 开发的强大工具
Spring Boot 是一个开源的 Java 基础框架,用于创建独立、生产级别的基于Spring框架的应用程序。它旨在简化Spring应用的初始搭建以及开发过程。
366 7
Spring Boot 入门:简化 Java Web 开发的强大工具
|
10月前
|
开发者
阿里云开发者社区入选 2024 中国技术品牌影响力企业榜
阿里云开发者社区入选 2024 中国技术品牌影响力企业榜。
|
11月前
|
存储 NoSQL Redis
Redis的数据过期策略有哪些 ?
Redis 采用两种过期键删除策略:惰性删除和定期删除。惰性删除在读取键时检查是否过期并删除,对 CPU 友好但可能积压大量过期键。定期删除则定时抽样检查并删除过期键,对内存更友好。默认每秒扫描 10 次,每次检查 20 个键,若超过 25% 过期则继续检查,单次最大执行时间 25ms。两者结合使用以平衡性能和资源占用。
169 11
|
JavaScript 前端开发 安全
js逆向实战之烯牛数据请求参数加密和返回数据解密
【9月更文挑战第20天】在JavaScript逆向工程中,处理烯牛数据的请求参数加密和返回数据解密颇具挑战。本文详细分析了这一过程,包括网络请求监测、代码分析、加密算法推测及解密逻辑研究,并提供了实战步骤,如确定加密入口点、逆向分析算法及模拟加密解密过程。此外,还强调了法律合规性和安全性的重要性,帮助读者合法且安全地进行逆向工程。
332 11
|
11月前
|
数据采集 Java Python
爬取小说资源的Python实践:从单线程到多线程的效率飞跃
本文介绍了一种使用Python从笔趣阁网站爬取小说内容的方法,并通过引入多线程技术大幅提高了下载效率。文章首先概述了环境准备,包括所需安装的库,然后详细描述了爬虫程序的设计与实现过程,包括发送HTTP请求、解析HTML文档、提取章节链接及多线程下载等步骤。最后,强调了性能优化的重要性,并提醒读者遵守相关法律法规。
307 0
|
Java
"揭秘Java IO三大模式:BIO、NIO、AIO背后的秘密!为何AIO成为高并发时代的宠儿,你的选择对了吗?"
【8月更文挑战第19天】在Java的IO编程中,BIO、NIO与AIO代表了三种不同的IO处理机制。BIO采用同步阻塞模型,每个连接需单独线程处理,适用于连接少且稳定的场景。NIO引入了非阻塞性质,利用Channel、Buffer与Selector实现多路复用,提升了效率与吞吐量。AIO则是真正的异步IO,在JDK 7中引入,通过回调或Future机制在IO操作完成后通知应用,适合高并发场景。选择合适的模型对构建高效网络应用至关重要。
326 2
|
机器学习/深度学习 算法 数据处理
《零基础实践深度学习》波士顿房价预测任务1.3.3.4训练过程
这篇文章详细阐述了如何使用线性回归对波士顿房价进行预测,包括构建神经网络模型、数据处理、模型设计、训练过程、梯度下降法以及随机梯度下降法(SGD)的应用,并提供了完整的Python代码实现。
|
消息中间件 存储 监控
消息中间件第八讲:消息队列 RocketMQ 版实战、集群及原理
消息中间件第八讲:消息队列 RocketMQ 版实战、集群及原理
696 0
|
消息中间件 RocketMQ
RocketMQ - 消费者Rebalance机制
RocketMQ - 消费者Rebalance机制
267 0