前言
在上一篇文章中RocketMQ消费者如何实现重平衡,我们简单讲述了RocketMQ
消费者是如何实现重平衡的,我们在源码中发现默认的重平衡策略是平均分配策略AllocateMessageQueueAveragely
,另外我们还可以设置其他的重平衡策略,你知道有哪几种嘛?你用过其中的哪些分配策略呢?
AllocateMessageQueueAveragely【平均分配】
在上一篇文章中,我们简单说明了平均分配
的结果,下面我们根据源码来看一下:
@Override public List<MessageQueue> allocate(String consumerGroup, String currentCID, List<MessageQueue> mqAll, List<String> cidAll) { List<MessageQueue> result = new ArrayList<MessageQueue>(); // 检查参数,不合理的参数就不做分配了 if (!check(consumerGroup, currentCID, mqAll, cidAll)) { return result; } // cidAll和mqAll已经排序好了,保证所有消费者实例拿到的顺序一致; // 获取当前消费者ID在cidAll中的索引位置 int index = cidAll.indexOf(currentCID); // 计算取模结果 int mod = mqAll.size() % cidAll.size(); // 计算每隔消费者平均分配到的数量 int averageSize = mqAll.size() <= cidAll.size() ? 1 : (mod > 0 && index < mod ? mqAll.size() / cidAll.size() + 1 : mqAll.size() / cidAll.size()); // 计算起始索引值 int startIndex = (mod > 0 && index < mod) ? index * averageSize : index * averageSize + mod; // 计算遍历此时 int range = Math.min(averageSize, mqAll.size() - startIndex); for (int i = 0; i < range; i++) { // 计算目标MessageQueue,并添加到结果集中 result.add(mqAll.get((startIndex + i) % mqAll.size())); } // 返回结果集 return result; } 复制代码
举个例子说明一下:
假如有7个MessageQueue
,3个消费者,分配结果如下:
1.消费者C1分配到的MessageQueue索引值为[0,1,2];
2.消费者C2分配到的MessageQueue索引值为[3,4];
3.消费者C3分配到的MessageQueue索引值为[5,6];
如果说MessageQueue
数量小于消费者的数量,比如当前MessageQueue
数量为2,消费者数量为3,那么第三个消费者是不会被分配MessageQueue
的,只有前两个消费者各1个MessageQueue
;
AllocateMessageQueueAveragelyByCircle【平均交叉分配】
这个平均交叉
的分配策略实现方式更简单:
@Override public List<MessageQueue> allocate(String consumerGroup, String currentCID, List<MessageQueue> mqAll, List<String> cidAll) { List<MessageQueue> result = new ArrayList<MessageQueue>(); if (!check(consumerGroup, currentCID, mqAll, cidAll)) { return result; } // 计算当前消费者ID所在索引位置 int index = cidAll.indexOf(currentCID); for (int i = index; i < mqAll.size(); i++) { // 每次间隔cidAll.size个messageQueue就放进结果集 if (i % cidAll.size() == index) { result.add(mqAll.get(i)); } } // 返回结果集 return result; } 复制代码
依然还是之前的实例:
假如有7个MessageQueue
,3个消费者,分配结果如下:
1.消费者C1分配到的MessageQueue索引值为[0,3,6];
2.消费者C2分配到的MessageQueue索引值为[1,4];
3.消费者C3分配到的MessageQueue索引值为[2,5];
AllocateMessageQueueConsistentHash【一致性哈希分配】
@Override public List<MessageQueue> allocate(String consumerGroup, String currentCID, List<MessageQueue> mqAll, List<String> cidAll) { List<MessageQueue> result = new ArrayList<MessageQueue>(); if (!check(consumerGroup, currentCID, mqAll, cidAll)) { return result; } // 把每一个消费者ID放进集合,准备创建一致性哈希环 Collection<ClientNode> cidNodes = new ArrayList<ClientNode>(); for (String cid : cidAll) { cidNodes.add(new ClientNode(cid)); } final ConsistentHashRouter<ClientNode> router; //for building hash ring // 创建一致性哈希环 if (customHashFunction != null) { router = new ConsistentHashRouter<ClientNode>(cidNodes, virtualNodeCnt, customHashFunction); } else { router = new ConsistentHashRouter<ClientNode>(cidNodes, virtualNodeCnt); } List<MessageQueue> results = new ArrayList<MessageQueue>(); for (MessageQueue mq : mqAll) { // 判断MessageQueue落在那一个虚拟节点上面 ClientNode clientNode = router.routeNode(mq.toString()); // 对比当前消费者ID与节点的key,如果一致,说明落在当前消费者ID节点上面 if (clientNode != null && currentCID.equals(clientNode.getKey())) { // 放进结果集 results.add(mq); } } // 返回结果集 return results; } 复制代码
上述源码的算法可以大概概括为以下几点:
1.假如当前有3个消费者,那么把这三个消费者ID收集好,放在一个集合中;
2.创建一个哈希环,上面遍布了若干个虚拟节点,所有的虚拟节点均匀地与3个消费者关联;
3.遍历所有的
MessageQueue
,把每一个MessageQueue
放到哈希环中,看它落在哪个虚拟节点上面,对应的虚拟节点的key
如果与当前消费者ID一样,那么就把MessageQueue
分配给当前消费者;
一致性哈希算法其实就是为了让分配结果更加均匀一点,这个算法的分配结果图示就不好画了;
AllocateMessageQueueByMachineRoom【机房编号分配】
看名称就知道这个分配策略是根据机房编号来执行的,所以里面需要我们准备好机房编号:
private Set<String> consumeridcs; 复制代码
consumeridcs
里面是当前消费者需要拉取的Broker
所在的机房编号集合,并且要求该Broker
命名格式如下:机房编号@brokerName
,一定要保持机房编号
与consumeridcs
里面的元素一致;
@Override public List<MessageQueue> allocate(String consumerGroup, String currentCID, List<MessageQueue> mqAll, List<String> cidAll) { List<MessageQueue> result = new ArrayList<MessageQueue>(); if (!check(consumerGroup, currentCID, mqAll, cidAll)) { return result; } // 获取当前消费者所在的索引值 int currentIndex = cidAll.indexOf(currentCID); if (currentIndex < 0) { return result; } // 准备收集目标机房的MessageQueue List<MessageQueue> premqAll = new ArrayList<MessageQueue>(); for (MessageQueue mq : mqAll) { // brokerName按照@符号分割 String[] temp = mq.getBrokerName().split("@"); // 取出前面的【机房编号】,检查consumeridcs是否包含【机房编号】 if (temp.length == 2 && consumeridcs.contains(temp[0])) { // 符合要求的MessageQueue就收集起来 premqAll.add(mq); } } // MessageQueue数量除消费者数量,7/3 = 2 int mod = premqAll.size() / cidAll.size(); // MessageQueue数量对消费者数量取模 7%3 = 1 int rem = premqAll.size() % cidAll.size(); // 计算起始位置索引 2*0 = 0 int startIndex = mod * currentIndex; // 计算结束位置索引 0+2 = 2 int endIndex = startIndex + mod; // 取出指定的MessageQueue放进结果集中 0 1 6 for (int i = startIndex; i < endIndex; i++) { result.add(premqAll.get(i)); } // 如果还有多的MessageQueue,那么就按顺序每个消费者分一个 if (rem > currentIndex) { result.add(premqAll.get(currentIndex + mod * cidAll.size())); } return result; } 复制代码
举例来分析一下:
假如有两个机房,第一个机房3个MessageQueue
,名称Shanghai-A@Broker-a
,第二个机房4个MessageQueue
,名称为Hangzhou-A@Broker-b
,现在有3个消费者,每隔消费者中consumeridcs
中的元素为Shanghai-A
、Hangzhou-A
,那么意味着这三个消费者来分配两个机房总共7个MessageQueue
;
我们在上一篇文章中讲过,mqAll
是排序好的,分配的结果如下:
AllocateMachineRoomNearby【同机房优先分配】
这个分配策略需要传两个参数,第一个是AllocateMessageQueueStrategy
分配策略,第二个是MachineRoomResolver
;第二个参数用来解析MessageQueue
所在机房编号和消费者所在机房编号,第一个参数用来执行分配任务;所以这个AllocateMachineRoomNearby
其实是在更细粒度上执行的分配策略;
@Override public List<MessageQueue> allocate(String consumerGroup, String currentCID, List<MessageQueue> mqAll, List<String> cidAll) { List<MessageQueue> result = new ArrayList<MessageQueue>(); if (!check(consumerGroup, currentCID, mqAll, cidAll)) { return result; } //group mq by machine room Map<String/*machine room */, List<MessageQueue>> mr2Mq = new TreeMap<String, List<MessageQueue>>(); for (MessageQueue mq : mqAll) { // 获取messageQueue所在的机房编号 String brokerMachineRoom = machineRoomResolver.brokerDeployIn(mq); if (StringUtils.isNoneEmpty(brokerMachineRoom)) { if (mr2Mq.get(brokerMachineRoom) == null) { mr2Mq.put(brokerMachineRoom, new ArrayList<MessageQueue>()); } // 按编号放进Map中,同编号的在一个ArrayList中 mr2Mq.get(brokerMachineRoom).add(mq); } else { throw new IllegalArgumentException("Machine room is null for mq " + mq); } } //group consumer by machine room Map<String/*machine room */, List<String/*clientId*/>> mr2c = new TreeMap<String, List<String>>(); for (String cid : cidAll) { // 获取消费者所在机房编号 String consumerMachineRoom = machineRoomResolver.consumerDeployIn(cid); if (StringUtils.isNoneEmpty(consumerMachineRoom)) { if (mr2c.get(consumerMachineRoom) == null) { mr2c.put(consumerMachineRoom, new ArrayList<String>()); } // 按编号放进Map中,同编号的在一个ArrayList中 mr2c.get(consumerMachineRoom).add(cid); } else { throw new IllegalArgumentException("Machine room is null for consumer id " + cid); } } List<MessageQueue> allocateResults = new ArrayList<MessageQueue>(); // 获取当前消费者所在机房编号 String currentMachineRoom = machineRoomResolver.consumerDeployIn(currentCID); // 取出当前机房的所有MessageQueue List<MessageQueue> mqInThisMachineRoom = mr2Mq.remove(currentMachineRoom); // 取出当前机房的所有消费者 List<String> consumerInThisMachineRoom = mr2c.get(currentMachineRoom); if (mqInThisMachineRoom != null && !mqInThisMachineRoom.isEmpty()) { // 按照设置的分配策略来分配同一机房内的MessageQueue allocateResults.addAll(allocateMessageQueueStrategy.allocate(consumerGroup, currentCID, mqInThisMachineRoom, consumerInThisMachineRoom)); } // 如果MessageQueue部署的机房中没有消费者,那么所有的消费者共同分配这些MessageQueue for (Entry<String, List<MessageQueue>> machineRoomEntry : mr2Mq.entrySet()) { if (!mr2c.containsKey(machineRoomEntry.getKey())) { // no alive consumer in the corresponding machine room, so all consumers share these queues allocateResults.addAll(allocateMessageQueueStrategy.allocate(consumerGroup, currentCID, machineRoomEntry.getValue(), cidAll)); } } // 返回结果集 return allocateResults; } 复制代码
这个分配策略也好理解,就是同一个机房的MessageQueue
由当前机房的消费者自己分配,如果MessageQueue
所在的机房没有消费者,那么就由所有消费者共同分配;
依然举例说明:
假如当前有三个机房Hangzhou-A
、Shanghai-A
、Shenzhen-A
,Hangzhou-A
机房中有4个MessageQueue
,2个消费者,Shanghai-A
机房有3个MessageQueue
,2个消费者,Shenzhen-A
机房有2个MessageQueue
,没有消费者;我们采用采用AllocateMachineRoomNearby
分配策略,并且子分配策略是AllocateMessageQueueAveragely
平均分配,那么最终的分配结果如下所示:
AllocateMessageQueueByConfig【固定分配】
这个策略基本不用,因为它相当于用户自己写死了当前消费者消费哪一个MessageQueue
,大家可以看一下代码:
public class AllocateMessageQueueByConfig extends AbstractAllocateMessageQueueStrategy { private List<MessageQueue> messageQueueList; @Override public List<MessageQueue> allocate(String consumerGroup, String currentCID, List<MessageQueue> mqAll, List<String> cidAll) { // 返回用户指定的MessageQueue列表 return this.messageQueueList; } @Override public String getName() { return "CONFIG"; } public List<MessageQueue> getMessageQueueList() { return messageQueueList; } public void setMessageQueueList(List<MessageQueue> messageQueueList) { this.messageQueueList = messageQueueList; } } 复制代码
使用这个策略根本无法实现重平衡,看看就行;
自定义重平衡策略
我们也可以根据自己的实际情况来实现自定义的重平衡策略,只需要实现AllocateMessageQueueStrategy
接口即可,或者继承AbstractAllocateMessageQueueStrategy
也可以,然后在创建好消费者实例后把自定义的分配策略设置进去,如下:
DefaultLitePullConsumer consumer = new DefaultLitePullConsumer(group); consumer.setNamesrvAddr(nameSrv); consumer.setAllocateMessageQueueStrategy(new AllocateMessageQueueStrategy() { @Override public List<MessageQueue> allocate(String s, String s1, List<MessageQueue> list, List<String> list1) { // TODO return null; } @Override public String getName() { // TODO return null; } }); 复制代码
小结
这篇文章带我们认识了以下几种重平衡分配策略,并通过图示了解了它们的分配原理:
1.AllocateMessageQueueAveragely【平均分配】;
2.AllocateMessageQueueAveragelyByCircle【平均交叉分配】;
3.AllocateMessageQueueConsistentHash【一致性哈希分配】;
4.AllocateMessageQueueByMachineRoom【机房编号分配】;
5.AllocateMachineRoomNearby【同机房优先分配】;
6.AllocateMessageQueueByConfig【固定分配】;
7.自定义分配策略;