本文首先会介绍消费者的推模式,拉模式,接着会介绍 生产者负载均衡策略。
介绍之前我们首先需要拉取RocketMQ的源码,源码地址是:https://github.com/apache/rocketmq.git。
1. 消费者的消费模式
RocketMQ 同时支持消费者的推模式以及拉模式。推模式顾名思义就是broker将消息推送给消费者,拉模式则是消费者主动到队列中拉取消息。默认情况下,RocketMQ使用的是推模式。
在IDEA中导入RocketMQ源码之后,找到 example模块,然后在此模块中找到各种例子。
1.1.推模式
消费者推模式的例子就是 org.apache.rocketmq.example.simple.PushConsumer
。推模式的消费者的实现类是 DefaultMQPushConsumer 。之前的文章已经做了详细介绍,在此就不在赘述了。推模式适合于大部分正常消费的情况
public static final String TOPIC = "TopicTest"; public static final String CONSUMER_GROUP = "CID_JODIE_1"; public static final String NAMESRV_ADDR = "127.0.0.1:9876"; public static void main(String[] args) throws InterruptedException, MQClientException { DefaultMQPushConsumer consumer = new DefaultMQPushConsumer(CONSUMER_GROUP); // Uncomment the following line while debugging, namesrvAddr should be set to your local address // consumer.setNamesrvAddr(NAMESRV_ADDR); consumer.subscribe(TOPIC, "*"); consumer.setConsumeFromWhere(ConsumeFromWhere.CONSUME_FROM_FIRST_OFFSET); //wrong time format 2017_0422_221800 consumer.setConsumeTimestamp("20181109221800"); consumer.registerMessageListener(new MessageListenerConcurrently() { @Override public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> msgs, ConsumeConcurrentlyContext context) { System.out.printf("%s Receive New Messages: %s %n", Thread.currentThread().getName(), msgs); return ConsumeConcurrentlyStatus.CONSUME_SUCCESS; } }); consumer.start(); System.out.printf("Consumer Started.%n"); }
1.2. 拉模式
消费者拉模式的例子是:org.apache.rocketmq.example.simple.LitePullConsumerAssign
。拉模式主要适用于回溯消费消息。比如:某个消息你消费失败了,你现在想重新消费该消息的情况。我们知道RocketMQ中消息消费完之后不会里面会被删除,默认会在队列中保留48小时。通过broker配置文件中的fileReservedTime参数进行设置。
//1.创建DefaultLitePullConsumer实例 DefaultLitePullConsumer litePullConsumer = new DefaultLitePullConsumer("please_rename_unique_group_name"); litePullConsumer.setAutoCommit(false); //2.启动litePullConsumer实例 litePullConsumer.start(); //3.获取TopicTest主题下所有的队列 Collection<MessageQueue> mqSet = litePullConsumer.fetchMessageQueues("TopicTest"); List<MessageQueue> list = new ArrayList<>(mqSet); List<MessageQueue> assignList = new ArrayList<>(); for (int i = 0; i < list.size() / 2; i++) { assignList.add(list.get(i)); } //4.消费者需要拉取的队列的集合 litePullConsumer.assign(assignList); //5.消费者需要定位,哪个队列,多少偏移量的消息。 litePullConsumer.seek(assignList.get(0), 10); try { while (running) { List<MessageExt> messageExts = litePullConsumer.poll(); System.out.printf("%s %n", messageExts); litePullConsumer.commit(); } } finally { litePullConsumer.shutdown(); }
2. 生产者负载均衡策略
我们都知道一个主题下会有多个消息队列(MessageQueue),那么,生产者在发送消息的时候如何选择消息队列呢?
首先找到生产者的示例代码类:org.apache.rocketmq.example.simple.Producer
。在该类中找到发送消息的方法 producer.send(msg)
。
接着找到发送消息的默认实现方法 org.apache.rocketmq.client.impl.producer.DefaultMQProducerImpl#sendDefaultImpl
在此方法中可以找到 selectOneMessageQueue 方法,从方法名可以知道此方法就是用来选出一个MessageQueue的
//省略部分代码 MessageQueue mqSelected = this.selectOneMessageQueue(topicPublishInfo, lastBrokerName, resetIndex); //省略部分代码
在selectOneMessageQueue方法中通过调用 tpInfo.selectOneMessageQueue 方法来获取
public MessageQueue selectOneMessageQueue(final TopicPublishInfo tpInfo, final String lastBrokerName, final boolean resetIndex) { BrokerFilter brokerFilter = threadBrokerFilter.get(); brokerFilter.setLastBrokerName(lastBrokerName); if (this.sendLatencyFaultEnable) { if (resetIndex) { tpInfo.resetIndex(); } MessageQueue mq = tpInfo.selectOneMessageQueue(availableFilter, brokerFilter); if (mq != null) { return mq; } mq = tpInfo.selectOneMessageQueue(reachableFilter, brokerFilter); if (mq != null) { return mq; } return tpInfo.selectOneMessageQueue(); } MessageQueue mq = tpInfo.selectOneMessageQueue(brokerFilter); if (mq != null) { return mq; } return tpInfo.selectOneMessageQueue(); }
那么最终的实现逻辑就是在 org.apache.rocketmq.client.impl.producer.TopicPublishInfo#selectOneMessageQueue
方法中了。我们可以查看此方法。
private MessageQueue selectOneMessageQueue(List<MessageQueue> messageQueueList, ThreadLocalIndex sendQueue, QueueFilter ...filter) { //省略非核心代码 if (filter != null && filter.length != 0) { for (int i = 0; i < messageQueueList.size(); i++) { int index = Math.abs(sendQueue.incrementAndGet() % messageQueueList.size()); MessageQueue mq = messageQueueList.get(index); boolean filterResult = true; for (QueueFilter f: filter) { Preconditions.checkNotNull(f); filterResult &= f.filter(mq); } if (filterResult) { return mq; } } return null; }
这里的核心代码就是下面这句代码:
int index = Math.abs(sendQueue.incrementAndGet() % messageQueueList.size());
首先通过 sendQueue.incrementAndGet()
方法获取当前线程下index值。然后对该主题下所有的队列数进行求模取余。也就是说RocketMQ默认会采取轮询的方式选择消息队列 接着我们来看下该方法的实现。
private final ThreadLocal<Integer> threadLocalIndex = new ThreadLocal<>(); private final Random random = new Random(); public int incrementAndGet() { Integer index = this.threadLocalIndex.get(); if (null == index) { index = random.nextInt(); } this.threadLocalIndex.set(++index); return index & POSITIVE_MASK; }
首先从线程本地变量 threadLocalIndex 中获取索引值index,如果没有的话则随机取一个值。然后将取到index中进行加一操作放回threadLocalIndex中。