RocketMQ重平衡策略你用过几种?

简介: RocketMQ重平衡策略你用过几种?

前言

在上一篇文章中RocketMQ消费者如何实现重平衡,我们简单讲述了RocketMQ消费者是如何实现重平衡的,我们在源码中发现默认的重平衡策略是平均分配策略AllocateMessageQueueAveragely,另外我们还可以设置其他的重平衡策略,你知道有哪几种嘛?你用过其中的哪些分配策略呢?

AllocateMessageQueueAveragely【平均分配】

在上一篇文章中,我们简单说明了平均分配的结果,下面我们根据源码来看一下:

@Override
    public List<MessageQueue> allocate(String consumerGroup, String currentCID, List<MessageQueue> mqAll,
        List<String> cidAll) {
        List<MessageQueue> result = new ArrayList<MessageQueue>();
        // 检查参数,不合理的参数就不做分配了
        if (!check(consumerGroup, currentCID, mqAll, cidAll)) {
            return result;
        }
        // cidAll和mqAll已经排序好了,保证所有消费者实例拿到的顺序一致;
        // 获取当前消费者ID在cidAll中的索引位置
        int index = cidAll.indexOf(currentCID);
        // 计算取模结果
        int mod = mqAll.size() % cidAll.size();
        // 计算每隔消费者平均分配到的数量
        int averageSize =
            mqAll.size() <= cidAll.size() ? 1 : (mod > 0 && index < mod ? mqAll.size() / cidAll.size()
                + 1 : mqAll.size() / cidAll.size());
        // 计算起始索引值
        int startIndex = (mod > 0 && index < mod) ? index * averageSize : index * averageSize + mod;
        // 计算遍历此时
        int range = Math.min(averageSize, mqAll.size() - startIndex);
        for (int i = 0; i < range; i++) {
            // 计算目标MessageQueue,并添加到结果集中
            result.add(mqAll.get((startIndex + i) % mqAll.size()));
        }
        // 返回结果集
        return result;
    }
复制代码

举个例子说明一下:

假如有7个MessageQueue,3个消费者,分配结果如下:

1.消费者C1分配到的MessageQueue索引值为[0,1,2];

2.消费者C2分配到的MessageQueue索引值为[3,4];

3.消费者C3分配到的MessageQueue索引值为[5,6];

image.png


如果说MessageQueue数量小于消费者的数量,比如当前MessageQueue数量为2,消费者数量为3,那么第三个消费者是不会被分配MessageQueue的,只有前两个消费者各1个MessageQueue

AllocateMessageQueueAveragelyByCircle【平均交叉分配】

这个平均交叉的分配策略实现方式更简单:

@Override
    public List<MessageQueue> allocate(String consumerGroup, String currentCID, List<MessageQueue> mqAll,
        List<String> cidAll) {
        List<MessageQueue> result = new ArrayList<MessageQueue>();
        if (!check(consumerGroup, currentCID, mqAll, cidAll)) {
            return result;
        }
        // 计算当前消费者ID所在索引位置
        int index = cidAll.indexOf(currentCID);
        for (int i = index; i < mqAll.size(); i++) {
            // 每次间隔cidAll.size个messageQueue就放进结果集
            if (i % cidAll.size() == index) {
                result.add(mqAll.get(i));
            }
        }
        // 返回结果集
        return result;
    }
复制代码

依然还是之前的实例:

假如有7个MessageQueue,3个消费者,分配结果如下:

1.消费者C1分配到的MessageQueue索引值为[0,3,6];

2.消费者C2分配到的MessageQueue索引值为[1,4];

3.消费者C3分配到的MessageQueue索引值为[2,5];


image.png


AllocateMessageQueueConsistentHash【一致性哈希分配】

@Override
    public List<MessageQueue> allocate(String consumerGroup, String currentCID, List<MessageQueue> mqAll,
        List<String> cidAll) {
        List<MessageQueue> result = new ArrayList<MessageQueue>();
        if (!check(consumerGroup, currentCID, mqAll, cidAll)) {
            return result;
        }
        // 把每一个消费者ID放进集合,准备创建一致性哈希环
        Collection<ClientNode> cidNodes = new ArrayList<ClientNode>();
        for (String cid : cidAll) {
            cidNodes.add(new ClientNode(cid));
        }
        final ConsistentHashRouter<ClientNode> router; //for building hash ring
        // 创建一致性哈希环
        if (customHashFunction != null) {
            router = new ConsistentHashRouter<ClientNode>(cidNodes, virtualNodeCnt, customHashFunction);
        } else {
            router = new ConsistentHashRouter<ClientNode>(cidNodes, virtualNodeCnt);
        }
        List<MessageQueue> results = new ArrayList<MessageQueue>();
        for (MessageQueue mq : mqAll) {
            // 判断MessageQueue落在那一个虚拟节点上面
            ClientNode clientNode = router.routeNode(mq.toString());
            // 对比当前消费者ID与节点的key,如果一致,说明落在当前消费者ID节点上面
            if (clientNode != null && currentCID.equals(clientNode.getKey())) {
                // 放进结果集
                results.add(mq);
            }
        }
        // 返回结果集
        return results;
    }
复制代码

上述源码的算法可以大概概括为以下几点:

1.假如当前有3个消费者,那么把这三个消费者ID收集好,放在一个集合中;

2.创建一个哈希环,上面遍布了若干个虚拟节点,所有的虚拟节点均匀地与3个消费者关联;

3.遍历所有的MessageQueue,把每一个MessageQueue放到哈希环中,看它落在哪个虚拟节点上面,对应的虚拟节点的key如果与当前消费者ID一样,那么就把MessageQueue分配给当前消费者;

一致性哈希算法其实就是为了让分配结果更加均匀一点,这个算法的分配结果图示就不好画了;

AllocateMessageQueueByMachineRoom【机房编号分配】

看名称就知道这个分配策略是根据机房编号来执行的,所以里面需要我们准备好机房编号:

private Set<String> consumeridcs;
复制代码

consumeridcs里面是当前消费者需要拉取的Broker所在的机房编号集合,并且要求该Broker命名格式如下:机房编号@brokerName,一定要保持机房编号consumeridcs里面的元素一致;

@Override
    public List<MessageQueue> allocate(String consumerGroup, String currentCID, List<MessageQueue> mqAll,
        List<String> cidAll) {
        List<MessageQueue> result = new ArrayList<MessageQueue>();
        if (!check(consumerGroup, currentCID, mqAll, cidAll)) {
            return result;
        }
        // 获取当前消费者所在的索引值
        int currentIndex = cidAll.indexOf(currentCID);
        if (currentIndex < 0) {
            return result;
        }
        // 准备收集目标机房的MessageQueue
        List<MessageQueue> premqAll = new ArrayList<MessageQueue>();
        for (MessageQueue mq : mqAll) {
            // brokerName按照@符号分割
            String[] temp = mq.getBrokerName().split("@");
            // 取出前面的【机房编号】,检查consumeridcs是否包含【机房编号】
            if (temp.length == 2 && consumeridcs.contains(temp[0])) {
                // 符合要求的MessageQueue就收集起来
                premqAll.add(mq);
            }
        }
        // MessageQueue数量除消费者数量,7/3 = 2
        int mod = premqAll.size() / cidAll.size();
        // MessageQueue数量对消费者数量取模 7%3 = 1
        int rem = premqAll.size() % cidAll.size();
        // 计算起始位置索引 2*0 = 0
        int startIndex = mod * currentIndex;
        // 计算结束位置索引 0+2 = 2
        int endIndex = startIndex + mod;
        // 取出指定的MessageQueue放进结果集中  0 1 6
        for (int i = startIndex; i < endIndex; i++) {
            result.add(premqAll.get(i));
        }
        // 如果还有多的MessageQueue,那么就按顺序每个消费者分一个
        if (rem > currentIndex) {
            result.add(premqAll.get(currentIndex + mod * cidAll.size()));
        }
        return result;
    }
复制代码

举例来分析一下:

假如有两个机房,第一个机房3个MessageQueue,名称Shanghai-A@Broker-a,第二个机房4个MessageQueue,名称为Hangzhou-A@Broker-b,现在有3个消费者,每隔消费者中consumeridcs中的元素为Shanghai-AHangzhou-A,那么意味着这三个消费者来分配两个机房总共7个MessageQueue

我们在上一篇文章中讲过,mqAll是排序好的,分配的结果如下:


image.png


AllocateMachineRoomNearby【同机房优先分配】

这个分配策略需要传两个参数,第一个是AllocateMessageQueueStrategy分配策略,第二个是MachineRoomResolver;第二个参数用来解析MessageQueue所在机房编号和消费者所在机房编号,第一个参数用来执行分配任务;所以这个AllocateMachineRoomNearby其实是在更细粒度上执行的分配策略;

@Override
public List<MessageQueue> allocate(String consumerGroup, String currentCID, List<MessageQueue> mqAll,
        List<String> cidAll) {
    List<MessageQueue> result = new ArrayList<MessageQueue>();
    if (!check(consumerGroup, currentCID, mqAll, cidAll)) {
        return result;
    }
    //group mq by machine room
    Map<String/*machine room */, List<MessageQueue>> mr2Mq = new TreeMap<String, List<MessageQueue>>();
    for (MessageQueue mq : mqAll) {
        // 获取messageQueue所在的机房编号
        String brokerMachineRoom = machineRoomResolver.brokerDeployIn(mq);
        if (StringUtils.isNoneEmpty(brokerMachineRoom)) {
            if (mr2Mq.get(brokerMachineRoom) == null) {
                mr2Mq.put(brokerMachineRoom, new ArrayList<MessageQueue>());
            }
            // 按编号放进Map中,同编号的在一个ArrayList中
            mr2Mq.get(brokerMachineRoom).add(mq);
        } else {
            throw new IllegalArgumentException("Machine room is null for mq " + mq);
        }
    }
    //group consumer by machine room
    Map<String/*machine room */, List<String/*clientId*/>> mr2c = new TreeMap<String, List<String>>();
    for (String cid : cidAll) {
        // 获取消费者所在机房编号
        String consumerMachineRoom = machineRoomResolver.consumerDeployIn(cid);
        if (StringUtils.isNoneEmpty(consumerMachineRoom)) {
            if (mr2c.get(consumerMachineRoom) == null) {
                mr2c.put(consumerMachineRoom, new ArrayList<String>());
            }
            // 按编号放进Map中,同编号的在一个ArrayList中
            mr2c.get(consumerMachineRoom).add(cid);
        } else {
            throw new IllegalArgumentException("Machine room is null for consumer id " + cid);
        }
    }
    List<MessageQueue> allocateResults = new ArrayList<MessageQueue>();
    // 获取当前消费者所在机房编号
    String currentMachineRoom = machineRoomResolver.consumerDeployIn(currentCID);
    // 取出当前机房的所有MessageQueue
    List<MessageQueue> mqInThisMachineRoom = mr2Mq.remove(currentMachineRoom);
    // 取出当前机房的所有消费者
    List<String> consumerInThisMachineRoom = mr2c.get(currentMachineRoom);
    if (mqInThisMachineRoom != null && !mqInThisMachineRoom.isEmpty()) {
        // 按照设置的分配策略来分配同一机房内的MessageQueue
        allocateResults.addAll(allocateMessageQueueStrategy.allocate(consumerGroup, currentCID, mqInThisMachineRoom, consumerInThisMachineRoom));
    }
    // 如果MessageQueue部署的机房中没有消费者,那么所有的消费者共同分配这些MessageQueue
    for (Entry<String, List<MessageQueue>> machineRoomEntry : mr2Mq.entrySet()) {
        if (!mr2c.containsKey(machineRoomEntry.getKey())) { // no alive consumer in the corresponding machine room, so all consumers share these queues
            allocateResults.addAll(allocateMessageQueueStrategy.allocate(consumerGroup, currentCID, machineRoomEntry.getValue(), cidAll));
        }
    }
    // 返回结果集
    return allocateResults;
}
复制代码

这个分配策略也好理解,就是同一个机房的MessageQueue由当前机房的消费者自己分配,如果MessageQueue所在的机房没有消费者,那么就由所有消费者共同分配;

依然举例说明:

假如当前有三个机房Hangzhou-AShanghai-AShenzhen-AHangzhou-A机房中有4个MessageQueue,2个消费者,Shanghai-A机房有3个MessageQueue,2个消费者,Shenzhen-A机房有2个MessageQueue,没有消费者;我们采用采用AllocateMachineRoomNearby分配策略,并且子分配策略是AllocateMessageQueueAveragely平均分配,那么最终的分配结果如下所示:


image.png


AllocateMessageQueueByConfig【固定分配】

这个策略基本不用,因为它相当于用户自己写死了当前消费者消费哪一个MessageQueue,大家可以看一下代码:

public class AllocateMessageQueueByConfig extends AbstractAllocateMessageQueueStrategy {
    private List<MessageQueue> messageQueueList;
    @Override
    public List<MessageQueue> allocate(String consumerGroup, String currentCID, List<MessageQueue> mqAll,
        List<String> cidAll) {
        // 返回用户指定的MessageQueue列表
        return this.messageQueueList;
    }
    @Override
    public String getName() {
        return "CONFIG";
    }
    public List<MessageQueue> getMessageQueueList() {
        return messageQueueList;
    }
    public void setMessageQueueList(List<MessageQueue> messageQueueList) {
        this.messageQueueList = messageQueueList;
    }
}
复制代码

使用这个策略根本无法实现重平衡,看看就行;

自定义重平衡策略

我们也可以根据自己的实际情况来实现自定义的重平衡策略,只需要实现AllocateMessageQueueStrategy接口即可,或者继承AbstractAllocateMessageQueueStrategy也可以,然后在创建好消费者实例后把自定义的分配策略设置进去,如下:

DefaultLitePullConsumer consumer = new DefaultLitePullConsumer(group);
consumer.setNamesrvAddr(nameSrv);
consumer.setAllocateMessageQueueStrategy(new AllocateMessageQueueStrategy() {
        @Override
        public List<MessageQueue> allocate(String s, String s1, List<MessageQueue> list, List<String> list1) {
          // TODO
          return null;
        }
        @Override
        public String getName() {
          // TODO
          return null;
        }
      });
复制代码

小结

这篇文章带我们认识了以下几种重平衡分配策略,并通过图示了解了它们的分配原理:

1.AllocateMessageQueueAveragely【平均分配】;

2.AllocateMessageQueueAveragelyByCircle【平均交叉分配】;

3.AllocateMessageQueueConsistentHash【一致性哈希分配】;

4.AllocateMessageQueueByMachineRoom【机房编号分配】;

5.AllocateMachineRoomNearby【同机房优先分配】;

6.AllocateMessageQueueByConfig【固定分配】;

7.自定义分配策略;


相关实践学习
消息队列RocketMQ版:基础消息收发功能体验
本实验场景介绍消息队列RocketMQ版的基础消息收发功能,涵盖实例创建、Topic、Group资源创建以及消息收发体验等基础功能模块。
消息队列 MNS 入门课程
1、消息队列MNS简介 本节课介绍消息队列的MNS的基础概念 2、消息队列MNS特性 本节课介绍消息队列的MNS的主要特性 3、MNS的最佳实践及场景应用 本节课介绍消息队列的MNS的最佳实践及场景应用案例 4、手把手系列:消息队列MNS实操讲 本节课介绍消息队列的MNS的实际操作演示 5、动手实验:基于MNS,0基础轻松构建 Web Client 本节课带您一起基于MNS,0基础轻松构建 Web Client
相关文章
|
4月前
|
消息中间件 存储 容灾
RabbitMQ的故障恢复与容灾策略
【8月更文第28天】RabbitMQ是一个开源的消息代理软件,它支持多种消息协议,如AMQP(Advanced Message Queuing Protocol)。在实际应用中,为了保证服务的连续性,需要实施一系列的故障恢复与容灾策略。
232 2
|
5月前
|
消息中间件 负载均衡 算法
【RocketMQ系列十二】RocketMQ集群核心概念之主从复制&生产者负载均衡策略&消费者负载均衡策略
【RocketMQ系列十二】RocketMQ集群核心概念之主从复制&生产者负载均衡策略&消费者负载均衡策略
160 2
|
5月前
|
消息中间件 存储 Kubernetes
消息队列 MQ使用问题之支持哪些消息分配策略
消息队列(MQ)是一种用于异步通信和解耦的应用程序间消息传递的服务,广泛应用于分布式系统中。针对不同的MQ产品,如阿里云的RocketMQ、RabbitMQ等,它们在实现上述场景时可能会有不同的特性和优势,比如RocketMQ强调高吞吐量、低延迟和高可用性,适合大规模分布式系统;而RabbitMQ则以其灵活的路由规则和丰富的协议支持受到青睐。下面是一些常见的消息队列MQ产品的使用场景合集,这些场景涵盖了多种行业和业务需求。
|
5月前
|
消息中间件 存储 RocketMQ
消息队列 MQ使用问题之进行超过3天的延迟消息投递,采用多次投递的策略是否有风险
消息队列(MQ)是一种用于异步通信和解耦的应用程序间消息传递的服务,广泛应用于分布式系统中。针对不同的MQ产品,如阿里云的RocketMQ、RabbitMQ等,它们在实现上述场景时可能会有不同的特性和优势,比如RocketMQ强调高吞吐量、低延迟和高可用性,适合大规模分布式系统;而RabbitMQ则以其灵活的路由规则和丰富的协议支持受到青睐。下面是一些常见的消息队列MQ产品的使用场景合集,这些场景涵盖了多种行业和业务需求。
|
7月前
|
消息中间件 存储 运维
深入理解MQ消息队列的高可用与可靠性策略
深入理解MQ消息队列的高可用与可靠性策略
1351 3
|
7月前
|
消息中间件 负载均衡 算法
RocketMQ源码(三)简单探索Producer和Consumer与Queue之间的负载均衡策略
- Producer如何将消息负载均衡发送给queue? - Consumer如何通过负载均衡并发消费queue的消息?
585 0
|
消息中间件 RocketMQ 索引
RocketMQ消费者如何实现重平衡
RocketMQ消费者如何实现重平衡
538 0
|
消息中间件 存储 网络协议
|
消息中间件 缓存 监控
RocketMQ消息积压,异步方案,缓存策略解决方案
RocketMQ消息积压,异步方案,缓存策略解决方案
RocketMQ消息积压,异步方案,缓存策略解决方案
|
消息中间件 缓存 Kafka
关于RocketMQ消息拉取与重平衡的一些问题探讨
其实最好的学习方式就是互相交流,最近也有跟网友讨论了一些关于 RocketMQ 消息拉取与重平衡的问题,我姑且在这里写下我的一些总结。
481 0
关于RocketMQ消息拉取与重平衡的一些问题探讨