RocketMQ源码(三)简单探索Producer和Consumer与Queue之间的负载均衡策略

本文涉及的产品
轻量应用服务器 2vCPU 4GiB,适用于网站搭建
轻量应用服务器 2vCPU 4GiB,适用于搭建容器环境
轻量应用服务器 2vCPU 4GiB,适用于搭建Web应用/小程序
简介: - Producer如何将消息负载均衡发送给queue?- Consumer如何通过负载均衡并发消费queue的消息?

RocketMQ源码(三)简单探索Producer和Consumer与Queue之间的负载均衡策略

RocketMQ架构中,我们都知道一个topic下可以创建多个queue,生产者通过负载均衡策略可以将消息均匀的分发在各个queue中,而这些queue
可以通过负载均衡给多个消费者订阅从而提升消费效率,本文将从以下两个方面从源码角度分析producer和consumer的负载均衡原理:

  • Producer如何将消息负载均衡发送给queue?
  • Consumer如何通过负载均衡并发消费queue的消息?

Tip以下是本人经过多年的工作经验集成的JavaWeb脚手架,封装了各种通用的starter可开箱即用,同时列举了互联网各种高性能场景的使用示例。

// Git代码
https://gitee.com/yeeevip/yeee-memo
https://github.com/yeeevip/yeee-memo
AI 代码解读

1 Producer下的负载均衡

在Producer下主要指的是通过负载均衡算法去选择一个queue去发送消息,这里默认使用的策略是轮询的方式按顺序选择Queue。

20231226-01.png

1.1 源码分析

  • 1 在没有指定Queue发送消息时,会调用到MQFaultStrategy.selectOneMessageQueue方法
  • 2 selectOneMessageQueue中调用tpInfo.selectOneMessageQueue()
  • 3 进而使用轮询算法从该Topic下的所有Queue里选择一个queue去发送消息

1.1.1 核心代码

public class MQFaultStrategy {
   
   
    public MessageQueue selectOneMessageQueue(final TopicPublishInfo tpInfo, final String lastBrokerName, final boolean resetIndex) {
   
   
        ...
        ...
        return tpInfo.selectOneMessageQueue();
    }
}
/*
    轮询算法:维护一个全局的index,每次都+1再与Queue的size求余
 */
public class TopicPublishInfo {
   
   
    public MessageQueue selectOneMessageQueue() {
   
   
        int index = this.sendWhichQueue.incrementAndGet();
        int pos = index % this.messageQueueList.size();
        return this.messageQueueList.get(pos);
    }
}
AI 代码解读

2 Consumer下的负载均衡

由于Consumer在广播模式下会把消息发给所有消费者,所以这里我们讨论的是集群模式下queue与消费者的负载均衡;

Rocketmq中规定一个Queue只能被一个消费者订阅,而一个消费者可以订阅多个Queue,这样自然会涉及到消费者与Queue分配协调策略。
当Broker扩容或缩容、Queue扩容等场景都可能导致消费者所订阅Topic的队列数量发生变化,也会再次重新分配。

2.1 分配策略算法

Rocketmq提供了多种Queue分配策略算法,如下

  • AllocateMessageQueueAveragely

    平均算法: 算出平均值,将连续的队列按平均值分配给每个消费者。
    如果能够整除,则按顺序将平均值个Queue分配,如果不能整除,则将多余出的Queue按照Consumer顺序逐个分配

20231226-02.png

  • AllocateMessageQueueAveragelyByCircle

    环形平均算法:将消费者按顺序形成一个环形,然后按照这个环形顺序逐个给消费者分配一个Queue

20231226-03.png

  • AllocateMessageQueueConsistentHash

    一致性hash算法:先将消费端的hash值放于环上,同时计算队列的hash值,以顺时针方向,分配给离队列hash值最近的一个消费者节点

20231226-04.png

2.2 源码分析

  • 1 创建DefaultMQPushConsumer时默认使用AllocateMessageQueueAveragely分配策略,即平均分配算法
  • 2 consumer调用start启动时会调用到RebalanceService.start()开启一个任务
  • 3 RebalanceService会调用到RebalanceImpl.rebalanceByTopic执行具体的平衡策略
  • 4 进而拿到所有的Consumer和Queue使用设置的AllocateMessageQueueStrategy算法去分配订阅关系

2.2.1 核心代码

/*
   1.初始化:默认使用AllocateMessageQueueAveragely算法分配Queue
 */
public class DefaultMQPushConsumer extends ClientConfig implements MQPushConsumer {
   
   
    public DefaultMQPushConsumer(final String consumerGroup) {
   
   
        this(null, consumerGroup, null, new AllocateMessageQueueAveragely());
    }
}
/*
   2.开启一个RebalanceService任务执行分配策略
 */
public class MQClientInstance {
   
   
    public void start() throws MQClientException {
   
   
        synchronized (this) {
   
   
            switch (this.serviceState) {
   
   
                case CREATE_JUST:
                    ...
                    // Start rebalance service
                    this.rebalanceService.start();
                    ...
                default:
                    break;
            }
        }
    }
}
/*
   3.RebalanceImpl.rebalanceByTopic执行具体的分配逻辑
 */
public abstract class RebalanceImpl {
   
   
    private boolean rebalanceByTopic(final String topic, final boolean isOrder) {
   
   
        boolean balanced = true;
        switch (messageModel) {
   
   
            ...
            case CLUSTERING: {
   
   
                // 拿到所有的Queue
                Set<MessageQueue> mqSet = this.topicSubscribeInfoTable.get(topic);
                // 拿到所有的消费者ID
                List<String> cidAll = this.mQClientFactory.findConsumerIdList(topic, consumerGroup);
                ...
                if (mqSet != null && cidAll != null) {
   
   
                    List<MessageQueue> mqAll = new ArrayList<>();
                    mqAll.addAll(mqSet);

                    Collections.sort(mqAll);
                    Collections.sort(cidAll);

                    // 初始化设置的分配算法:即AllocateMessageQueueAveragely平均分配算法
                    AllocateMessageQueueStrategy strategy = this.allocateMessageQueueStrategy;

                    List<MessageQueue> allocateResult = null;
                    try {
   
   
                        // 调用分配算法的具体实现
                        allocateResult = strategy.allocate(...mqAll, cidAll);
                    } catch (Throwable e) {
   
   
                        log.error("allocate message queue exception. strategy name: {}, ex: {}", strategy.getName(), e);
                        return false;
                    }
                    ...
                }
                break;
            }
            default:
                break;
        }
        return balanced;
    }
}
/*
   4.执行AllocateMessageQueueAveragely平均分配算法的具体实现
 */
public class AllocateMessageQueueAveragely extends AbstractAllocateMessageQueueStrategy {
   
   
    @Override
    public List<MessageQueue> allocate(String consumerGroup, String currentCID, List<MessageQueue> mqAll, List<String> cidAll) {
   
   

        List<MessageQueue> result = new ArrayList<>();
        if (!check(consumerGroup, currentCID, mqAll, cidAll)) {
   
   
            return result;
        }

        int index = cidAll.indexOf(currentCID);
        int mod = mqAll.size() % cidAll.size();
        int averageSize =
                mqAll.size() <= cidAll.size() ? 1 : (mod > 0 && index < mod ? mqAll.size() / cidAll.size()
                        + 1 : mqAll.size() / cidAll.size());
        int startIndex = (mod > 0 && index < mod) ? index * averageSize : index * averageSize + mod;
        int range = Math.min(averageSize, mqAll.size() - startIndex);
        for (int i = 0; i < range; i++) {
   
   
            result.add(mqAll.get((startIndex + i) % mqAll.size()));
        }
        return result;
    }
}
AI 代码解读

最后

至此,通过源码探索我们就大致了解了Producer及Consumer的负载均衡策略,其中Producer主要是通过轮询方式依次把消息发送到Queue,而Consumer默认使用的
平均分配算法去解决消费者节点与Queue之间的分配订阅关系。

Tip以下是本人经过多年的工作经验集成的JavaWeb脚手架,封装了各种通用的starter可开箱即用,同时列举了互联网各种高性能场景的使用示例。

// Git代码
https://gitee.com/yeeevip/yeee-memo
https://github.com/yeeevip/yeee-memo
AI 代码解读
相关实践学习
5分钟轻松打造应对流量洪峰的稳定商城交易系统
本实验通过SAE极速部署一个微服务电商商城,同时结合RocketMQ异步解耦、削峰填谷的能力,带大家体验面对流量洪峰仍旧稳定可靠的商城交易系统!
消息队列 MNS 入门课程
1、消息队列MNS简介 本节课介绍消息队列的MNS的基础概念 2、消息队列MNS特性 本节课介绍消息队列的MNS的主要特性 3、MNS的最佳实践及场景应用 本节课介绍消息队列的MNS的最佳实践及场景应用案例 4、手把手系列:消息队列MNS实操讲 本节课介绍消息队列的MNS的实际操作演示 5、动手实验:基于MNS,0基础轻松构建 Web Client 本节课带您一起基于MNS,0基础轻松构建 Web Client
目录
打赏
0
0
0
0
8
分享
相关文章
电商API接口性能优化技术揭秘:缓存策略与负载均衡详解
电商API接口性能优化是提升系统稳定性和用户体验的关键。本文聚焦缓存策略与负载均衡两大核心,详解其在电商业务中的实践。缓存策略涵盖本地、分布式及CDN缓存,通过全量或部分缓存设计和一致性维护,减少后端压力;负载均衡则利用反向代理、DNS轮询等技术,结合动态调整与冗余部署,提高吞吐量与可用性。文中引用大型及跨境电商平台案例,展示优化效果,强调持续监控与迭代的重要性,为电商企业提供了切实可行的性能优化路径。
云原生之负载均衡策略
ai必学之负载均衡 @[TOC]轮询处理;weight权重;ip_hash
架构学习:7种负载均衡算法策略
四层负载均衡包括数据链路层、网络层和应用层负载均衡。数据链路层通过修改MAC地址转发帧;网络层通过改变IP地址实现数据包转发;应用层有多种策略,如轮循、权重轮循、随机、权重随机、一致性哈希、响应速度和最少连接数均衡,确保请求合理分配到服务器,提升性能与稳定性。
1150 11
架构学习:7种负载均衡算法策略
SpringBoot整合XXL-JOB【04】- 以GLUE模式运行与执行器负载均衡策略
在本节中,我们将介绍XXL-JOB的GLUE模式和集群模式下的路由策略。GLUE模式允许直接在线上改造方法为定时任务,无需重新部署。通过一个测试方法,展示了如何在调度中心配置并使用GLUE模式执行定时任务。接着,我们探讨了多实例环境下的负载均衡策略,确保任务不会重复执行,并可通过修改路由策略(如轮训)实现任务在多个实例间的均衡分配。最后,总结了GLUE模式和负载均衡策略的应用,帮助读者更深入理解XXL-JOB的使用。
239 9
SpringBoot整合XXL-JOB【04】-  以GLUE模式运行与执行器负载均衡策略
常见的Ribbon/Spring LoadBalancer的负载均衡策略
自SpringCloud 2020版起,Ribbon被弃用,转而使用Spring Cloud LoadBalancer。Ribbon支持轮询、随机、加权响应时间和重试等负载均衡策略;而Spring Cloud LoadBalancer则提供轮询、随机及Nacos负载均衡策略,基于Reactor实现,更高效灵活。
386 0
Nginx的6大负载均衡策略及权重轮询手写配置
【10月更文挑战第9天】 Nginx是一款高性能的HTTP服务器和反向代理服务器,它在处理大量并发请求时表现出色。Nginx的负载均衡功能可以将请求分发到多个服务器,提高网站的吞吐量和可靠性。以下是Nginx支持的6大负载均衡策略:
846 7
腾讯面试:说说6大Nginx负载均衡?手写一下权重轮询策略?
尼恩,一位资深架构师,分享了关于负载均衡及其策略的深入解析,特别是基于权重的负载均衡策略。文章不仅介绍了Nginx的五大负载均衡策略,如轮询、加权轮询、IP哈希、最少连接数等,还提供了手写加权轮询算法的Java实现示例。通过这些内容,尼恩帮助读者系统化理解负载均衡技术,提升面试竞争力,实现技术上的“肌肉展示”。此外,他还提供了丰富的技术资料和面试指导,助力求职者在大厂面试中脱颖而出。
腾讯面试:说说6大Nginx负载均衡?手写一下权重轮询策略?
聊聊 RocketMQ中 Topic,Queue,Consumer,Consumer Group的关系
本文详细解析了RocketMQ中Topic、Queue、Consumer及Consumer Group之间的关系。文中通过图表展示了Topic可包含多个Queue,Queue分布在不同Broker上;Consumer组内多个消费者共享消息;并深入探讨了集群消费与广播消费模式下Queue与Consumer的关系,以及Rebalancing机制在实例增减时如何确保负载均衡。理解这些关系有助于更好地掌握RocketMQ的工作原理,提升系统运维效率。
1894 2
负载均衡策略:Spring Cloud与Netflix OSS的最佳实践
负载均衡策略:Spring Cloud与Netflix OSS的最佳实践
132 2
负载均衡原理分析与源码解读
负载均衡原理分析与源码解读

热门文章

最新文章

AI助理

你好,我是AI助理

可以解答问题、推荐解决方案等

登录插画

登录以查看您的控制台资源

管理云资源
状态一览
快捷访问