RocketMQ生产者负载均衡(轮询机制)核心原理

本文涉及的产品
应用型负载均衡 ALB,每月750个小时 15LCU
网络型负载均衡 NLB,每月750个小时 15LCU
传统型负载均衡 CLB,每月750个小时 15LCU
简介: 文章深入分析了RocketMQ生产者的负载均衡机制,特别是轮询机制的实现原理,揭示了如何通过`ThreadLocal`技术和消息队列的选播策略来确保消息在多个队列之间均衡发送,以及如何通过灵活的API支持自定义负载均衡策略。

前言

上文已经分析了RocketMQ消费者负载均衡核心原理,我们经常会讨论消费者负载均衡的原理,我们可能忽略了生产者这端其实也有负载均衡机制。本文将分析RocketMQ生产者是如何负载均衡的。

RocketMQ生产者为什么需要负载均衡?

在RocketMQ中,队列是消息发送的基本单位。每个Topic下可能存在多个队列,因此一个生产者实例可以向不同的队列发送消息。当生产者发送消息时,如果不能均衡的将消息发送到不同的队列,那么会导致队列里的消息分布不均衡,这样最终会导致消息性能下降,因此生产者负载均衡机制也是非常重要的。

image.png

RocketMQ生产者原理分析

既然生产者负载均衡如此重要,我们看下是如何实现的。

我们通常使用如下方法发送消息:

构建消息
Message msg = new Message("TopicTest",
    "TagA",
    "OrderID188",
    "Hello world".getBytes(RemotingHelper.DEFAULT_CHARSET));
//发送消息    
SendResult sendResult = producer.send(msg);
AI 代码解读

RocketMQ发送消息的核心逻辑在DefaultMQProducerImplsendDefaultImpl

image.png

在发送消息流程利里面有一行非常关键的逻辑,selectOneMessageQueue,看方法名称就可以知道其含义,选择一个消息队列。


public MessageQueue selectOneMessageQueue(final TopicPublishInfo tpInfo, final String lastBrokerName) {
   
   
        return this.mqFaultStrategy.selectOneMessageQueue(tpInfo, lastBrokerName);
    }
AI 代码解读

里面是通过策略类来实现的。

image.png

策略类最终通过org.apache.rocketmq.client.impl.producer.TopicPublishInfo#selectOneMessageQueue(java.lang.String) 实现。

public MessageQueue selectOneMessageQueue(final String lastBrokerName) {
   
   
        //生产者第一次发消息
        if (lastBrokerName == null) {
   
   
            return selectOneMessageQueue();
        } else {
   
   
            //非第一次,重试发消息的情况,
            for (int i = 0; i < this.messageQueueList.size(); i++) {
   
   
                int index = this.sendWhichQueue.incrementAndGet();
                int pos = Math.abs(index) % this.messageQueueList.size();
                if (pos < 0)
                    pos = 0;
                MessageQueue mq = this.messageQueueList.get(pos);
                //重试的情况,不取上一个broker的队列
                if (!mq.getBrokerName().equals(lastBrokerName)) {
   
   
                    return mq;
                }
            }
            return selectOneMessageQueue();
        }
    }
AI 代码解读

第一次发消息选择队列核心逻辑在 org.apache.rocketmq.client.impl.producer.TopicPublishInfo#selectOneMessageQueue()

//线程安全的index
private volatile ThreadLocalIndex sendWhichQueue = new ThreadLocalIndex();

public MessageQueue selectOneMessageQueue() {
   
   
        //获取一个基础索引,每次自增1 这个全局存在TopicPublishInfo 每一个topic
        int index = this.sendWhichQueue.getAndIncrement();
        // 基础索引和 消息写队列大小 进行取模 用来实现轮训的算法
        int pos = Math.abs(index) % this.messageQueueList.size();
        if (pos < 0)
            pos = 0;

        return this.messageQueueList.get(pos);
    }
AI 代码解读

哈哈,这里就是生产者负载均衡轮询机制的核心逻辑了,使用到了ThreadLocal技术,sendWhichQueue为每个生产者线程维护一个自己的下标索引。

基础索引计算器,使用ThreadLocal技术针对不同的生产者线程第一次随机,后面递增,可以更加负载均衡。

public class ThreadLocalIndex {
   
   
    //关键技术
    private final ThreadLocal<Integer> threadLocalIndex = new ThreadLocal<Integer>();
    private final Random random = new Random();

    public int getAndIncrement() {
   
   
        Integer index = this.threadLocalIndex.get();
        if (null == index) {
   
   
            //第一次随机
            index = Math.abs(random.nextInt());
            if (index < 0)
                index = 0;
            this.threadLocalIndex.set(index);
        }
        //第二次索引位置开始自增1
        index = Math.abs(index + 1);
        if (index < 0)
            index = 0;

        this.threadLocalIndex.set(index);
        return index;
    }
}
AI 代码解读

哈哈,有没有觉得这个实现非常巧妙了。不同的生产者线程都拥有自己的索引因子,分配队列更加均衡。

image.png

总结

本文分析了RocketMQ生产者底层的实现,设计地方有巧妙之处,值得我们学习,上面是发送消息消息选择队列的默认实现的场景, RocketMQ提供了更加灵活的api,我们作为使用者可以指定负载均衡策略,比如顺序消息场景

public SendResult send(Message msg, MessageQueueSelector selector, Object arg, long timeout)
    throws MQClientException, RemotingException, MQBrokerException, InterruptedException {
   
   
    return this.sendSelectImpl(msg, selector, arg, CommunicationMode.SYNC, null, timeout);
}
AI 代码解读

RocketMQ也提供了了几个负载均衡实现。支持hash,随机,当然我们也可以自己实现一个负载均衡策略,满足我们的业务需求。

image.png

如果文章内容对您有帮助,欢迎关注我,我们一起学习。

image.png

相关实践学习
5分钟轻松打造应对流量洪峰的稳定商城交易系统
本实验通过SAE极速部署一个微服务电商商城,同时结合RocketMQ异步解耦、削峰填谷的能力,带大家体验面对流量洪峰仍旧稳定可靠的商城交易系统!
消息队列 MNS 入门课程
1、消息队列MNS简介 本节课介绍消息队列的MNS的基础概念 2、消息队列MNS特性 本节课介绍消息队列的MNS的主要特性 3、MNS的最佳实践及场景应用 本节课介绍消息队列的MNS的最佳实践及场景应用案例 4、手把手系列:消息队列MNS实操讲 本节课介绍消息队列的MNS的实际操作演示 5、动手实验:基于MNS,0基础轻松构建 Web Client 本节课带您一起基于MNS,0基础轻松构建 Web Client
目录
打赏
0
3
3
0
42
分享
相关文章
RocketMQ原理—5.高可用+高并发+高性能架构
本文主要从高可用架构、高并发架构、高性能架构三个方面来介绍RocketMQ的原理。
437 21
RocketMQ原理—5.高可用+高并发+高性能架构
RocketMQ原理—4.消息读写的性能优化
本文详细解析了RocketMQ消息队列的核心原理与性能优化机制,涵盖Producer消息分发、Broker高并发写入、Consumer拉取消息流程等内容。重点探讨了基于队列的消息分发、Hash有序分发、CommitLog内存写入优化、ConsumeQueue物理存储设计等关键技术点。同时分析了数据丢失场景及解决方案,如同步刷盘与JVM OffHeap缓存分离策略,并总结了写入与读取流程的性能优化方法,为理解和优化分布式消息系统提供了全面指导。
RocketMQ原理—4.消息读写的性能优化
RocketMQ原理—3.源码设计简单分析下
本文介绍了Producer作为生产者是如何创建出来的、启动时是如何准备好相关资源的、如何从拉取Topic元数据的、如何选择MessageQueue的、与Broker是如何进行网络通信的,Broker收到一条消息后是如何存储的、如何实时更新索引文件的、如何实现同步刷盘以及异步刷盘的、如何清理存储较久的磁盘数据的,Consumer作为消费者是如何创建和启动的、消费者组的多个Consumer会如何分配消息、Consumer会如何从Broker拉取一批消息。
107 11
RocketMQ原理—3.源码设计简单分析下
RocketMQ原理—2.源码设计简单分析上
本文介绍了NameServer的启动脚本、启动时会解析哪些配置、如何初始化Netty网络服务器、如何启动Netty网络服务器,介绍了Broker启动时是如何初始化配置的、BrokerController的创建以及包含的组件、BrokerController的初始化、启动、Broker如何把自己注册到NameServer上、BrokerOuterAPI是如何发送注册请求的,介绍了NameServer如何处理Broker的注册请求、Broker如何发送定时心跳
RocketMQ原理—1.RocketMQ整体运行原理
本文详细解析了RocketMQ的整体运行原理,涵盖从生产者到消费者的全流程。首先介绍生产者发送消息的机制,包括Topic与MessageQueue的关系及写入策略;接着分析Broker如何通过CommitLog和ConsumeQueue实现消息持久化,并探讨同步与异步刷盘的优缺点。同时,讲解基于DLedger技术的主从同步原理,确保高可用性。消费者部分则重点讨论消费模式(集群 vs 广播)、拉取消息策略及负载均衡机制。网络通信层面,基于Netty的高性能架构通过多线程池分工协作提升并发能力。最后,揭示mmap与PageCache技术优化文件读写的细节,总结了RocketMQ的核心运行机制。
RocketMQ原理—1.RocketMQ整体运行原理
slb健康检查的基本原理
slb健康检查的基本原理
140 6
RocketMQ 工作原理图解,看这篇就够了!
本文详细解析了 RocketMQ 的核心架构、消息领域模型、关键特性和应用场景,帮助深入理解消息中间件的工作原理。关注【mikechen的互联网架构】,10年+BAT架构经验倾囊相授。
RocketMQ 工作原理图解,看这篇就够了!
5大负载均衡算法及原理,图解易懂!
本文详细介绍负载均衡的5大核心算法:轮询、加权轮询、随机、最少连接和源地址散列,帮助你深入理解分布式架构中的关键技术。关注【mikechen的互联网架构】,10年+BAT架构经验倾囊相授。
5大负载均衡算法及原理,图解易懂!
MQ 消息队列核心原理,12 条最全面总结!
本文总结了消息队列的12个核心原理,涵盖消息顺序性、ACK机制、持久化及高可用性等内容。关注【mikechen的互联网架构】,10年+BAT架构经验倾囊相授。
Tomcat负载均衡原理详解及配置Apache2.2.22+Tomcat7
Tomcat负载均衡原理详解及配置Apache2.2.22+Tomcat7
150 3
AI助理

你好,我是AI助理

可以解答问题、推荐解决方案等

登录插画

登录以查看您的控制台资源

管理云资源
状态一览
快捷访问