2.2.1、不启用broker故障延迟
既然sendLatencyFaultEnable默认是false,那就先看当sendLatencyFaultEnable=false时候的逻辑
public MessageQueue selectOneMessageQueue(final String lastBrokerName) { // 第一次就是null,第二次(也就是重试的时候)就不是null了。 if (lastBrokerName == null) { // 第一次选择队列的逻辑 return selectOneMessageQueue(); } else { // 第一次选择队列发送消息失败了,第二次重试的时候选择队列的逻辑 int index = this.sendWhichQueue.getAndIncrement(); for (int i = 0; i < this.messageQueueList.size(); i++) { int pos = Math.abs(index++) % this.messageQueueList.size(); if (pos < 0) pos = 0; MessageQueue mq = this.messageQueueList.get(pos); // 过滤掉上次发送消息失败的队列 if (!mq.getBrokerName().equals(lastBrokerName)) { return mq; } } return selectOneMessageQueue(); } }
那就继续看第一次选择队列的逻辑:
public MessageQueue selectOneMessageQueue() { // 当前线程有个ThreadLocal变量,存放了一个随机数 {@link org.apache.rocketmq.client.common.ThreadLocalIndex#getAndIncrement} // 然后取出随机数根据队列长度取模且将随机数+1 int index = this.sendWhichQueue.getAndIncrement(); int pos = Math.abs(index) % this.messageQueueList.size(); if (pos < 0) { pos = 0; } return this.messageQueueList.get(pos); }
好吧,其实也有点随机一个的意思。但是亮点在于取出随机数根据队列长度取模且将随机数+1,这个+1亮了(getAndIncrement cas +1)。
当消息第一次发送失败时,lastBrokerName会存放当前选择失败的broker(mq = mqSelected),通过重试,此时lastBrokerName有值,代表上次选择的boker发送失败,则重新对sendWhichQueue本地线程变量+1,遍历选择消息队列,直到不是上次的broker,也就是为了规避上次发送失败的broker的逻辑所在。
举个例子:你这次随机数是1,队列长度是4,1%4=1,这时候失败了,进入重试,那么重试之前,也就是在上一步1%4之后,他把1进行了++操作,变成了2,那么你这次重试的时候就是2%4=2,直接过滤掉了刚才失败的broker。
那就继续看第二次重试选择队列的逻辑:
// +1 int index = this.sendWhichQueue.getAndIncrement(); for (int i = 0; i < this.messageQueueList.size(); i++) { // 取模 int pos = Math.abs(index++) % this.messageQueueList.size(); if (pos < 0) pos = 0; MessageQueue mq = this.messageQueueList.get(pos); // 过滤掉上次发送消息失败的队列 if (!mq.getBrokerName().equals(lastBrokerName)) { return mq; } } // 没找到能用的queue的话继续走默认的那个 return selectOneMessageQueue();
so easy,你上次不是失败了,进入我这里重试来了吗?我也很简单,我就还是取出随机数+1然后取模队列长度,我看这个broker是不是上次失败的那个,是他小子的话就过滤掉,继续遍历queue找下一个能用的。
2.2.2、启用broker故障延迟
也就是下面if里的逻辑
if (this.sendLatencyFaultEnable) { .... }
看上面的注释就行了,很清晰了,就是我先(随机数 +1) % queue.size()
,然后看你这个queue所属的broker是否可用,可用的话且不是重试进来的或失败重试的情况,如果和选择的队列是上次重试是一样的,那直接return你就完事了。那么怎么看broker是否可用的呢?
// {@link org.apache.rocketmq.client.latency.LatencyFaultToleranceImpl#isAvailable(String)} public boolean isAvailable(final String name) { final FaultItem faultItem = this.faultItemTable.get(name); if (faultItem != null) { return faultItem.isAvailable(); } return true; } // {@link org.apache.rocketmq.client.latency.LatencyFaultToleranceImpl.FaultItem#isAvailable()} public boolean isAvailable() { return (System.currentTimeMillis() - startTimestamp) >= 0; }
疑问:
- faultItemTable是什么时候放进去的?
- isAvailable() 为什么只是判断一个时间就可以知道Broker是否可用?
这就需要上面发送消息完成后所调用的这个方法了:
// {@link org.apache.rocketmq.client.impl.producer.DefaultMQProducerImpl#updateFaultItem} // 发送开始时间 beginTimestampPrev = System.currentTimeMillis(); // 进行发送 sendResult = this.sendKernelImpl(msg, mq, communicationMode, sendCallback, topicPublishInfo, timeout); // 发送结束时间 endTimestamp = System.currentTimeMillis(); // 更新broker的延迟情况 this.updateFaultItem(mq.getBrokerName(), endTimestamp - beginTimestampPrev, false);
细节逻辑如下:
// {@link org.apache.rocketmq.client.latency.MQFaultStrategy#updateFaultItem} public void updateFaultItem(final String brokerName, final long currentLatency, boolean isolation) { if (this.sendLatencyFaultEnable) { // 首次isolation传入的是false,currentLatency是发送消息所耗费的时间,如下 // this.updateFaultItem(mq.getBrokerName(), endTimestamp - beginTimestampPrev, false); long duration = computeNotAvailableDuration(isolation ? 30000 : currentLatency); this.latencyFaultTolerance.updateFaultItem(brokerName, currentLatency, duration); } } private long[] latencyMax = {50L, 100L, 550L, 1000L, 2000L, 3000L, 15000L}; private long[] notAvailableDuration = {0L, 0L, 30000L, 60000L, 120000L, 180000L, 600000L}; // 根据延迟时间对比MQFaultStrategy中的延迟级别数组latencyMax 不可用时长数组notAvailableDuration 来将该broker加进faultItemTable中。 private long computeNotAvailableDuration(final long currentLatency) { for (int i = latencyMax.length - 1; i >= 0; i--) { // 假设currentLatency花费了10ms,那么latencyMax里的数据显然不符合下面的所有判断,所以直接return 0; if (currentLatency >= latencyMax[i]) return this.notAvailableDuration[i]; } return 0; } // {@link org.apache.rocketmq.client.latency.LatencyFaultToleranceImpl#updateFaultItem()} @Override // 其实主要就是给startTimestamp赋值为当前时间+computeNotAvailableDuration(isolation ? 30000 : currentLatency);的结果,给isAvailable()所用 // 也就是说只有notAvailableDuration == 0的时候,isAvailable()才会返回true。 public void updateFaultItem(final String name, final long currentLatency, final long notAvailableDuration) { FaultItem old = this.faultItemTable.get(name); if (null == old) { final FaultItem faultItem = new FaultItem(name); faultItem.setCurrentLatency(currentLatency); // 给startTimestamp赋值为当前时间+computeNotAvailableDuration(isolation ? 30000 : currentLatency);的结果,给isAvailable()所用 faultItem.setStartTimestamp(System.currentTimeMillis() + notAvailableDuration); old = this.faultItemTable.putIfAbsent(name, faultItem); if (old != null) { old.setCurrentLatency(currentLatency); // 给startTimestamp赋值为当前时间+computeNotAvailableDuration(isolation ? 30000 : currentLatency);的结果,给isAvailable()所用 old.setStartTimestamp(System.currentTimeMillis() + notAvailableDuration); } } else { old.setCurrentLatency(currentLatency); // 给startTimestamp赋值为当前时间+computeNotAvailableDuration(isolation ? 30000 : currentLatency);的结果,给isAvailable()所用 old.setStartTimestamp(System.currentTimeMillis() + notAvailableDuration); } }
下面这两句代码详细解释下:
private long[] latencyMax = {50L, 100L, 550L, 1000L, 2000L, 3000L, 15000L}; private long[] notAvailableDuration = {0L, 0L, 30000L, 60000L, 120000L, 180000L, 600000L};
latencyMax | notAvailableDuration |
50L | 0L |
100L | 0L |
550L | 30000L |
1000L | 60000L |
2000L | 120000L |
3000L | 180000L |
15000L | 600000L |
即
- currentLatency大于等于50小于100,则notAvailableDuration为0
- currentLatency大于等于100小于550,则notAvailableDuration为0
- currentLatency大于等于550小于1000,则notAvailableDuration为300000
- …等等
再来举个例子:
假设isolation传入true,
long duration = computeNotAvailableDuration(isolation ? 30000 : currentLatency);
那么notAvailableDuration将传入600000L。结合isAvailable方法,大概流程如下:
RocketMQ为每个Broker预测了个可用时间(当前时间+notAvailableDuration),当当前时间大于该时间,才代表Broker可用,而notAvailableDuration有6个级别和latencyMax的区间一一对应,根据传入的currentLatency去预测该Broker在什么时候可用。
所以再来看这个
public boolean isAvailable() { return (System.currentTimeMillis() - startTimestamp) >= 0; }
根据执行时间来看落入哪个区间,在0~100的时间内notAvailableDuration都是0,都是可用的,大于该值后,可用的时间就会开始变大了,就认为不是最优解,直接舍弃。
2.3、调用链
org.apache.rocketmq.client.producer.DefaultMQProducer#send(org.apache.rocketmq.common.message.Message) -> org.apache.rocketmq.client.impl.producer.DefaultMQProducerImpl#send(org.apache.rocketmq.common.message.Message) -> org.apache.rocketmq.client.impl.producer.DefaultMQProducerImpl#send(org.apache.rocketmq.common.message.Message, long) -> org.apache.rocketmq.client.impl.producer.DefaultMQProducerImpl#sendDefaultImpl(xxx) -> MessageQueue mqSelected = this.selectOneMessageQueue(topicPublishInfo, lastBrokerName); -> org.apache.rocketmq.client.impl.producer.DefaultMQProducerImpl#selectOneMessageQueue(xxx) org.apache.rocketmq.client.latency.MQFaultStrategy#selectOneMessageQueue(final TopicPublishInfo tpInfo, final String lastBrokerName)
2.4、总结
- 在不开启容错的情况下,轮询队列进行发送,如果失败了,重试的时候过滤失败的Broker
- 如果开启了容错策略,会通过RocketMQ的预测机制来预测一个Broker是否可用
- 如果上次失败的Broker可用那么还是会选择该Broker的队列
- 如果上述情况失败,则随机选择一个进行发送
- 在发送消息的时候会记录一下调用的时间与是否报错,根据该时间去预测broker的可用时间
三、总结
1、疑问
他搞了两个重载send()方法,一个支持算法选择器,一个不支持算法选择,queue的算法选择是个典型的策略模式。为什么send(message)
方法内置的queue选择算法不抽出到单独的类中,然后此类实现
org.apache.rocketmq.client.producer.MessageQueueSelector
接口呢?比如叫:SelectMessageQueueByBest
,比如如下:
public class org.apache.rocketmq.client.producer.DefaultMQProducer { // 只发送消息,queue的选择由默认的算法来实现 @Override public SendResult send(Collection<Message> msgs) { this.send(msgs, new SelectMessageQueueByBest().select(xxx)); } // 自定义选择queue的算法进行发消息 @Override public SendResult send(Collection<Message> msgs, MessageQueue messageQueue) {} }
我猜测可能是这个算法过于复杂,与其它类的交互也过于多,参数也可能和内置的其他三个不同,所以没搞到一起,但是还是搞到一起规范呀,干的同一件事,只是算法不同,很典型的策略模式。
2、面试
问:发消息的时候选择queue的算法有哪些?
答:分为两种,一种是直接发消息,不能选择queue,这种的queue选择算法如下:
- 在不开启容错的情况下,轮询队列进行发送,如果失败了,重试的时候过滤失败的Broker
- 如果开启了容错策略,会通过RocketMQ的预测机制来预测一个Broker是否可用
- 如果上次失败的Broker可用那么还是会选择该Broker的队列
- 如果上述情况失败,则随机选择一个进行发送
- 在发送消息的时候会记录一下调用的时间与是否报错,根据该时间去预测broker的可用时间
另外一种是发消息的时候可以选择算法甚至还可以实现接口自定义算法:
SelectMessageQueueByRandom
:随机SelectMessageQueueByHash
:hashSelectMessageQueueByMachineRoom
- 实现接口自定义
END