一、说明
分为两种,一种是直接发消息,client内部有选择queue的算法,不允许外界改变。还有一种是可以自定义queue的选择算法(内置了三种算法,不喜欢的话可以自定义算法实现)。
public class org.apache.rocketmq.client.producer.DefaultMQProducer { // 只发送消息,queue的选择由默认的算法来实现 @Override public SendResult send(Collection<Message> msgs) {} // 自定义选择queue的算法进行发消息 @Override public SendResult send(Collection<Message> msgs, MessageQueue messageQueue) {} }
二、源码
1、send(msg, mq)
1.1、使用场景
有时候我们不希望默认的queue选择算法,而是需要自定义,一般最常用的场景在顺序消息,顺序消息的发送一般都会指定某组特征的消息都发当同一个queue里,这样才能保证顺序,因为单queue是有序的。
对顺序消息不明白的请看我之前的顺序消息文章。
1.2、原理剖析
内置了三种算法,三种算法都实现了一个共同的接口:
org.apache.rocketmq.client.producer.MessageQueueSelector
SelectMessageQueueByRandom
SelectMessageQueueByHash
SelectMessageQueueByMachineRoom
- 要想自定义逻辑的话,直接实现接口重写select方法即可。
很典型的策略模式,不同算法不同实现类,有个顶层接口。
1.2.1、SelectMessageQueueByRandom
public class SelectMessageQueueByRandom implements MessageQueueSelector { private Random random = new Random(System.currentTimeMillis()); @Override public MessageQueue select(List<MessageQueue> mqs, Message msg, Object arg) { // mqs.size():队列的个数。假设队列个数是4,那么这个value就是0-3之间随机。 int value = random.nextInt(mqs.size()); return mqs.get(value); } }
so easy,就是纯随机。
mqs.size():队列的个数。假设队列个数是4,那么这个value就是0-3之间随机。
1.2.2、SelectMessageQueueByHash
public class SelectMessageQueueByHash implements MessageQueueSelector { @Override public MessageQueue select(List<MessageQueue> mqs, Message msg, Object arg) { int value = arg.hashCode(); // 防止出现负数,取个绝对值,这也是我们平时开发中需要注意到的点 if (value < 0) { value = Math.abs(value); } // 直接取余队列个数。 value = value % mqs.size(); return mqs.get(value); } }
so easy,就是纯取余。
mqs.size():队列的个数。假设队列个数是4,且value的hashcode是3,那么3 % 4 = 3,那么就是最后一个队列,也就是四号队列,因为下标从0开始。
1.2.3、SelectMessageQueueByMachineRoom
public class SelectMessageQueueByMachineRoom implements MessageQueueSelector { private Set<String> consumeridcs; @Override public MessageQueue select(List<MessageQueue> mqs, Message msg, Object arg) { return null; } public Set<String> getConsumeridcs() { return consumeridcs; } public void setConsumeridcs(Set<String> consumeridcs) { this.consumeridcs = consumeridcs; } }
没看懂有啥鸟用,直接return null; 所以如果有自定义需求的话直接自定义就好了,这玩意没看出有啥卵用。
1.2.4、自定义算法
public class MySelectMessageQueue implements MessageQueueSelector { @Override public MessageQueue select(List<MessageQueue> mqs, Message msg, Object arg) { return mqs.get(0); } }
永远都选择0号队列,也就是第一个队列。只是举个例子,实际看你业务需求。
1.3、调用链
org.apache.rocketmq.client.producer.DefaultMQProducer#send(Message msg, MessageQueueSelector selector, Object arg) -> org.apache.rocketmq.client.producer.DefaultMQProducer#send(Message msg, MessageQueueSelector selector, Object arg) -> org.apache.rocketmq.client.producer.DefaultMQProducer#send(Message msg, MessageQueueSelector selector, Object arg, long timeout) -> org.apache.rocketmq.client.impl.producer.DefaultMQProducerImpl#sendSelectImpl(xxx) -> mq = mQClientFactory.getClientConfig().queueWithNamespace(selector.select(messageQueueList, userMessage, arg)); -> selector.select(messageQueueList, userMessage, arg) -> org.apache.rocketmq.client.producer.MessageQueueSelector#select(final List<MessageQueue> mqs, final Message msg, final Object arg)
2、send(msg)
2.1、使用场景
一般没特殊需求的场景都用这个。因为他默认的queue选择算法很不错,各种优化场景都替我们想到了。
2.2、原理剖析
// {@link org.apache.rocketmq.client.impl.producer.DefaultMQProducerImpl#sendDefaultImpl} // 这是发送消息核心原理,不清楚的看我之前发消息源码分析的文章 // 选择消息要发送的队列 MessageQueue mq = null; for (int times = 0; times < 3; times++) { // 首次肯定是null String lastBrokerName = null == mq ? null : mq.getBrokerName(); // 调用下面的方法进行选择queue MessageQueue mqSelected = this.selectOneMessageQueue(topicPublishInfo, lastBrokerName); if (mqSelected != null) { // 给mq赋值,如果首次失败了,那么下次重试的时候(也就是下次for的时候),mq就有值了。 mq = mqSelected; ...... // 很关键,能解答下面会提到的两个问题: // 1.faultItemTable是什么时候放进去的? // 2.isAvailable() 为什么只是判断一个时间就可以知道Broker是否可用? this.updateFaultItem(mq.getBrokerName(), endTimestamp - beginTimestampPrev, false); } }
选择queue的主入口
public MessageQueue selectOneMessageQueue(final TopicPublishInfo tpInfo, final String lastBrokerName) { // 默认为false,代表不启用broker故障延迟 if (this.sendLatencyFaultEnable) { try { // 随机数且+1 int index = tpInfo.getSendWhichQueue().getAndIncrement(); // 遍历 for (int i = 0; i < tpInfo.getMessageQueueList().size(); i++) { // 先(随机数 +1) % queue.size() int pos = Math.abs(index++) % tpInfo.getMessageQueueList().size(); if (pos < 0) { pos = 0; } MessageQueue mq = tpInfo.getMessageQueueList().get(pos); // 看找到的这个queue所属的broker是不是可用的 if (latencyFaultTolerance.isAvailable(mq.getBrokerName())) { // 非失败重试,直接返回到的队列 // 失败重试的情况,如果和选择的队列是上次重试是一样的,则返回 // 也就是说如果你这个queue所在的broker可用, // 且不是重试进来的或失败重试的情况,如果和选择的队列是上次重试是一样的,那你就是天选之子了。 if (null == lastBrokerName || mq.getBrokerName().equals(lastBrokerName)) { return mq; } } } // 如果所有队列都不可用,那么选择一个相对好的broker,不考虑可用性的消息队列 final String notBestBroker = latencyFaultTolerance.pickOneAtLeast(); int writeQueueNums = tpInfo.getQueueIdByBroker(notBestBroker); if (writeQueueNums > 0) { final MessageQueue mq = tpInfo.selectOneMessageQueue(); if (notBestBroker != null) { mq.setBrokerName(notBestBroker); mq.setQueueId(tpInfo.getSendWhichQueue().getAndIncrement() % writeQueueNums); } return mq; } else { latencyFaultTolerance.remove(notBestBroker); } } catch (Exception e) { log.error("Error occurred when selecting message queue", e); } // 随机选择一个queue return tpInfo.selectOneMessageQueue(); } // 当sendLatencyFaultEnable=false的时候选择queue的方法,默认就是false。 return tpInfo.selectOneMessageQueue(lastBrokerName); }