简述RocketMQ消息拉取过程【二】

简介: 简述RocketMQ消息拉取过程【二】

前言

在上一篇文章中,我们讲述了DefaultMQPushConsumer拉消息的原理,它是通过重平衡触发pullRequest的创建,通过阻塞队列作为pullRequest的存储容器,另一端通过定时任务从阻塞队列中取出pullRequest来向Broker发送拉消息的请求,无论消息拉取成功还是失败,都会重新把pullRequest放回阻塞队列中,这样就能保证持续不断地向Broker拉消息了;今天这篇文章我们继续讲述DefaultLitePullConsumer是如何实现消息拉取的;

DefaultLitePullConsumer拉消息代码示例

我们在使用DefaultLitePullConsumer时都是主动去poll消息,并不是像DefaultMQPushConsumer那样设置一个消息监听器:

DefaultLitePullConsumer consumer = new DefaultLitePullConsumer(group);
consumer.setNamesrvAddr(nameSrv);
consumer.setConsumeFromWhere(ConsumeFromWhere.CONSUME_FROM_FIRST_OFFSET);
consumer.subscribe(topic, subExpression);
try {
    consumer.start();
} catch (Exception e) {
    e.printStackTrace();
}
try {
    while (true) {
        List<MessageExt> messageExts = consumer.poll(5000);
        // 处理业务逻辑
        System.out.println("消息数量:" + messageExts.size());
        System.out.println("消息内容:");
        for (MessageExt messageExt : messageExts) {
            System.out.println(new String(messageExt.getBody(), StandardCharsets.UTF_8));
        }
    }
} catch (Exception e) {
    e.printStackTrace();
}
复制代码

一般拿到消息后都会交给业务线程池去处理,上述代码我只简单地打印了一下消息内容;

消息消费

跟着poll()方法,我们最终定位到DefaultLitePullConsumerImpl.poll()这个方法:

public synchronized List<MessageExt> poll(long timeout) {
        try {
            this.checkServiceState();
            if (timeout < 0L) {
                throw new IllegalArgumentException("Timeout must not be negative");
            }
            if (this.defaultLitePullConsumer.isAutoCommit()) {
                this.maybeAutoCommit();
            }
            long endTime = System.currentTimeMillis() + timeout;
            // 从阻塞队列中取ConsumeRequest
            DefaultLitePullConsumerImpl.ConsumeRequest consumeRequest = (DefaultLitePullConsumerImpl.ConsumeRequest)this.consumeRequestCache.poll(endTime - System.currentTimeMillis(), TimeUnit.MILLISECONDS);
            if (endTime - System.currentTimeMillis() > 0L) {
                while(consumeRequest != null && consumeRequest.getProcessQueue().isDropped()) {
                    consumeRequest = (DefaultLitePullConsumerImpl.ConsumeRequest)this.consumeRequestCache.poll(endTime - System.currentTimeMillis(), TimeUnit.MILLISECONDS);
                    if (endTime - System.currentTimeMillis() <= 0L) {
                        break;
                    }
                }
            }
            if (consumeRequest != null && !consumeRequest.getProcessQueue().isDropped()) {
                List<MessageExt> messages = consumeRequest.getMessageExts();
                long offset = consumeRequest.getProcessQueue().removeMessage(messages);
              // 取到消息后直接更新消费点位
                this.assignedMessageQueue.updateConsumeOffset(consumeRequest.getMessageQueue(), offset);
                this.resetTopic(messages);
                // 下面是调用consumeMessageHook
                if (!this.consumeMessageHookList.isEmpty()) {
                    ConsumeMessageContext consumeMessageContext = new ConsumeMessageContext();
                    consumeMessageContext.setNamespace(this.defaultLitePullConsumer.getNamespace());
                    consumeMessageContext.setConsumerGroup(this.groupName());
                    consumeMessageContext.setMq(consumeRequest.getMessageQueue());
                    consumeMessageContext.setMsgList(messages);
                    consumeMessageContext.setSuccess(false);
                    this.executeHookBefore(consumeMessageContext);
                  // 默认是消费成功
                    consumeMessageContext.setStatus(ConsumeConcurrentlyStatus.CONSUME_SUCCESS.toString());
                    consumeMessageContext.setSuccess(true);
                    this.executeHookAfter(consumeMessageContext);
                }
                return messages;
            }
        } catch (InterruptedException var10) {
        }
        return Collections.emptyList();
    }
复制代码

1.直接从阻塞队列consumeRequestCache中取出消息对象ConsumeRequest,这里面就包含了消息内容;

2.取出来后直接更新消费点位,默认为此次消息消费成功;

这里跟DefaultMQPushConsumer不同的是,DefaultLitePullConsumerImpl.poll()默认的是消息消费一定成功,如果消费失败的话,需要开发人员自己处理,消费失败的消息不会再次发送给消费者;

那么咱们的疑问就出来了,poll()方法光顾着从consumeRequestCache中取消息,那消息是啥时候放进去的呢?

消息拉取入口

我们可以重新了解一下消费者重平衡过程,在MessageQueue分配完毕后,会对比被分配的MessageQueue是否和分配前的不一致,大部分情况下是会发生改变的,那么就会触发messageQueueChanged()方法的调用:

@Override
    public void messageQueueChanged(String topic, Set<MessageQueue> mqAll, Set<MessageQueue> mqDivided) {
        // 取出所有的MessageQueueListener
        MessageQueueListener messageQueueListener = this.litePullConsumerImpl.getDefaultLitePullConsumer().getMessageQueueListener();
        if (messageQueueListener != null) {
            try {
                // 依次调用messageQueueChanged方法
                messageQueueListener.messageQueueChanged(topic, mqAll, mqDivided);
            } catch (Throwable e) {
                log.error("messageQueueChanged exception", e);
            }
        }
    }
复制代码

MessageQueueListener是什么时候被放进去的呢?可以看一下subscribe()方法:

public synchronized void subscribe(String topic, String subExpression) throws MQClientException {
        try {
            if (topic == null || "".equals(topic)) {
                throw new IllegalArgumentException("Topic can not be null or empty.");
            }
            setSubscriptionType(SubscriptionType.SUBSCRIBE);
            SubscriptionData subscriptionData = FilterAPI.buildSubscriptionData(topic, subExpression);
            this.rebalanceImpl.getSubscriptionInner().put(topic, subscriptionData);
            // 每个subscribe()都会设置MessageQueueListener
            this.defaultLitePullConsumer.setMessageQueueListener(new MessageQueueListenerImpl());
            assignedMessageQueue.setRebalanceImpl(this.rebalanceImpl);
            if (serviceState == ServiceState.RUNNING) {
                this.mQClientFactory.sendHeartbeatToAllBrokerWithLock();
                updateTopicSubscribeInfoWhenSubscriptionChanged();
            }
        } catch (Exception e) {
            throw new MQClientException("subscribe exception", e);
        }
    }
复制代码

可以发现,每个subscribe()都会设置MessageQueueListenerMessageQueueListenerImpl里面只干了一件事情:更新MessageQueue并且创建pullTask

class MessageQueueListenerImpl implements MessageQueueListener {
        @Override
        public void messageQueueChanged(String topic, Set<MessageQueue> mqAll, Set<MessageQueue> mqDivided) {
            updateAssignQueueAndStartPullTask(topic, mqAll, mqDivided);
        }
    }
    public void updateAssignQueueAndStartPullTask(String topic, Set<MessageQueue> mqAll, Set<MessageQueue> mqDivided) {
        MessageModel messageModel = defaultLitePullConsumer.getMessageModel();
        switch (messageModel) {
            case BROADCASTING:
                updateAssignedMessageQueue(topic, mqAll);
                // 更新拉消息任务
                updatePullTask(topic, mqAll);
                break;
            case CLUSTERING:
                updateAssignedMessageQueue(topic, mqDivided);
                // 更新拉消息任务
                updatePullTask(topic, mqDivided);
                break;
            default:
                break;
        }
    }
复制代码

现在终于快找到这个消息拉取的入口了:

private void startPullTask(Collection<MessageQueue> mqSet) {
        for (MessageQueue messageQueue : mqSet) {
            if (!this.taskTable.containsKey(messageQueue)) {
                // 创建消息拉取任务
                PullTaskImpl pullTask = new PullTaskImpl(messageQueue);
                this.taskTable.put(messageQueue, pullTask);
                // 这个就是任务执行的入口
                this.scheduledThreadPoolExecutor.schedule(pullTask, 0, TimeUnit.MILLISECONDS);
            }
        }
    }
复制代码

消息拉取的入口寻找起来还是有点困难的,但是主要思路还是从【重平衡】开始,另外就是触发了MessageQueueListener,此时才会创建pullTask

虽然DefaultMQPushConsumer也是【重平衡】触发pullRequest的创建,但是它是将pullRequest放进阻塞队列,另一端由消息拉取任务去取pullRequestBroker发送请求;而DefaultLitePullConsumer是直接创建pullTask去拉消息;

PullTaskImpl拉消息

很显然,PullTaskImpl就是一个Runnable,那么最重要的就是它的run()方法,这个方法就是负责从Broker拉消息并放进consumeRequestCache阻塞队列中,这样poll()方法才能从consumeRequestCache阻塞队列中取到消息;

  • messageQueue暂停
if (DefaultLitePullConsumerImpl.this.assignedMessageQueue.isPaused(this.messageQueue)) {
    DefaultLitePullConsumerImpl.this.scheduledThreadPoolExecutor.schedule(this, 1000L, TimeUnit.MILLISECONDS);
    DefaultLitePullConsumerImpl.this.log.debug("Message Queue: {} has been paused!", this.messageQueue);
                    return;
}
复制代码

如果messageQueue处于暂停状态,那么延迟1秒重新执行这个任务;

  • ProcessQueue被移除
ProcessQueue processQueue = DefaultLitePullConsumerImpl.this.assignedMessageQueue.getProcessQueue(this.messageQueue);
if (null == processQueue || processQueue.isDropped()) {
    DefaultLitePullConsumerImpl.this.log.info("The message queue not be able to poll, because it's dropped. group={}, messageQueue={}", DefaultLitePullConsumerImpl.this.defaultLitePullConsumer.getConsumerGroup(), this.messageQueue);
    return;
}
复制代码

如果processQueue不存在或者已经被移除了,那么这个任务也不用执行了;

  • 流量控制
if ((long)DefaultLitePullConsumerImpl.this.consumeRequestCache.size() * (long)DefaultLitePullConsumerImpl.this.defaultLitePullConsumer.getPullBatchSize() > DefaultLitePullConsumerImpl.this.defaultLitePullConsumer.getPullThresholdForAll()) {
    DefaultLitePullConsumerImpl.this.scheduledThreadPoolExecutor.schedule(this, 50L, TimeUnit.MILLISECONDS);
    if (DefaultLitePullConsumerImpl.this.consumeRequestFlowControlTimes++ % 1000L == 0L) {
        DefaultLitePullConsumerImpl.this.log.warn("The consume request count exceeds threshold {}, so do flow control, consume request count={}, flowControlTimes={}", DefaultLitePullConsumerImpl.this.consumeRequestCache.size(), DefaultLitePullConsumerImpl.this.consumeRequestFlowControlTimes);
    }
    return;
}
复制代码

如果consumeRequestCache中的消息数量超过了PullThresholdForAll阈值,那么触发限流机制,当前任务将不会继续拉消息,并且50毫秒后才会重新执行该任务;

long cachedMessageCount = processQueue.getMsgCount().get();
long cachedMessageSizeInMiB = processQueue.getMsgSize().get() / 1048576L;
// 单个processQueue上面消息数量限制
if (cachedMessageCount > (long)DefaultLitePullConsumerImpl.this.defaultLitePullConsumer.getPullThresholdForQueue()) {
    DefaultLitePullConsumerImpl.this.scheduledThreadPoolExecutor.schedule(this, 50L, TimeUnit.MILLISECONDS);
    if (DefaultLitePullConsumerImpl.this.queueFlowControlTimes++ % 1000L == 0L) {
        DefaultLitePullConsumerImpl.this.log.warn("The cached message count exceeds the threshold {}, so do flow control, minOffset={}, maxOffset={}, count={}, size={} MiB, flowControlTimes={}", new Object[]{DefaultLitePullConsumerImpl.this.defaultLitePullConsumer.getPullThresholdForQueue(), processQueue.getMsgTreeMap().firstKey(), processQueue.getMsgTreeMap().lastKey(), cachedMessageCount, cachedMessageSizeInMiB, DefaultLitePullConsumerImpl.this.queueFlowControlTimes});
    }
    return;
}
// 单个processQueue中消息总大小限制
if (cachedMessageSizeInMiB > (long)DefaultLitePullConsumerImpl.this.defaultLitePullConsumer.getPullThresholdSizeForQueue()) {
    DefaultLitePullConsumerImpl.this.scheduledThreadPoolExecutor.schedule(this, 50L, TimeUnit.MILLISECONDS);
    if (DefaultLitePullConsumerImpl.this.queueFlowControlTimes++ % 1000L == 0L) {
        DefaultLitePullConsumerImpl.this.log.warn("The cached message size exceeds the threshold {} MiB, so do flow control, minOffset={}, maxOffset={}, count={}, size={} MiB, flowControlTimes={}", new Object[]{DefaultLitePullConsumerImpl.this.defaultLitePullConsumer.getPullThresholdSizeForQueue(), processQueue.getMsgTreeMap().firstKey(), processQueue.getMsgTreeMap().lastKey(), cachedMessageCount, cachedMessageSizeInMiB, DefaultLitePullConsumerImpl.this.queueFlowControlTimes});
    }
    return;
}
复制代码
  1. 如果当前processQueue中消息的数量大于PullThresholdForQueue阈值,也同样触发限流机制,当前任务不再执行,50毫秒后重新执行该任务;
  2. 如果当前processQueue中消息的总大小超过PullThresholdSizeForQueue(单位:MB)阈值,将触发限流机制,当前任务不再执行,50毫秒后重新执行该任务;
if (processQueue.getMaxSpan() > (long)DefaultLitePullConsumerImpl.this.defaultLitePullConsumer.getConsumeMaxSpan()) {
    DefaultLitePullConsumerImpl.this.scheduledThreadPoolExecutor.schedule(this, 50L, TimeUnit.MILLISECONDS);
    if (DefaultLitePullConsumerImpl.this.queueMaxSpanFlowControlTimes++ % 1000L == 0L) {
        DefaultLitePullConsumerImpl.this.log.warn("The queue's messages, span too long, so do flow control, minOffset={}, maxOffset={}, maxSpan={}, flowControlTimes={}", new Object[]{processQueue.getMsgTreeMap().firstKey(), processQueue.getMsgTreeMap().lastKey(), processQueue.getMaxSpan(), DefaultLitePullConsumerImpl.this.queueMaxSpanFlowControlTimes});
    }
    return;
}
复制代码

如果processQueue中的maxSpan大于消费者的ConsumeMaxSpan,也就是第一个消息与最后一个消息的点位偏差大于ConsumeMaxSpan(默认是2000),将触发限流机制,当前任务不执行,50毫秒后重新执行该任务;

  • 计算拉取点位
long offset = 0L;
try {
    offset = DefaultLitePullConsumerImpl.this.nextPullOffset(this.messageQueue);
} catch (Exception var17) {
    DefaultLitePullConsumerImpl.this.log.error("Failed to get next pull offset", var17);
    DefaultLitePullConsumerImpl.this.scheduledThreadPoolExecutor.schedule(this, 3000L, TimeUnit.MILLISECONDS);
    return;
}
复制代码

计算消息拉取的点位,如果产生异常,那么简隔3秒后再来重新开始任务;

  • 拉消息
PullResult pullResult = DefaultLitePullConsumerImpl.this.pull(this.messageQueue, subscriptionData, offset, DefaultLitePullConsumerImpl.this.defaultLitePullConsumer.getPullBatchSize());
复制代码

这个就是发请求给Broker拉消息;

  • 放进消息缓存区
switch(pullResult.getPullStatus()) {
    case FOUND:
        Object objLock = DefaultLitePullConsumerImpl.this.messageQueueLock.fetchLockObject(this.messageQueue);
        synchronized(objLock) {
            if (pullResult.getMsgFoundList() != null && !pullResult.getMsgFoundList().isEmpty() && DefaultLitePullConsumerImpl.this.assignedMessageQueue.getSeekOffset(this.messageQueue) == -1L) {
                processQueue.putMessage(pullResult.getMsgFoundList());
                  DefaultLitePullConsumerImpl.this.submitConsumeRequest(DefaultLitePullConsumerImpl.this.new ConsumeRequest(pullResult.getMsgFoundList(), this.messageQueue, processQueue));
     }
     break;
}
复制代码

找到消息的情况下,将调用submitConsumeRequest()方法把消息放进阻塞队列中,等待poll()方法来消费;

  • 重新开启拉取任务
if (!this.isCancelled()) {
    DefaultLitePullConsumerImpl.this.scheduledThreadPoolExecutor.schedule(this, pullDelayTimeMills, TimeUnit.MILLISECONDS);
} else {
    DefaultLitePullConsumerImpl.this.log.warn("The Pull Task is cancelled after doPullTask, {}", this.messageQueue);
}
复制代码

如果当前任务还没有被取消的话,那么重新开启下一个轮回,准备下一次消息拉取;


相关实践学习
消息队列RocketMQ版:基础消息收发功能体验
本实验场景介绍消息队列RocketMQ版的基础消息收发功能,涵盖实例创建、Topic、Group资源创建以及消息收发体验等基础功能模块。
消息队列 MNS 入门课程
1、消息队列MNS简介 本节课介绍消息队列的MNS的基础概念 2、消息队列MNS特性 本节课介绍消息队列的MNS的主要特性 3、MNS的最佳实践及场景应用 本节课介绍消息队列的MNS的最佳实践及场景应用案例 4、手把手系列:消息队列MNS实操讲 本节课介绍消息队列的MNS的实际操作演示 5、动手实验:基于MNS,0基础轻松构建 Web Client 本节课带您一起基于MNS,0基础轻松构建 Web Client
相关文章
|
消息中间件 算法 Java
弥补延时消息的不足,RocketMQ 基于时间轮算法实现了定时消息!
弥补延时消息的不足,RocketMQ 基于时间轮算法实现了定时消息!
794 1
弥补延时消息的不足,RocketMQ 基于时间轮算法实现了定时消息!
|
消息中间件 存储 数据可视化
【RocketMq-生产者】消息发送者参数详解
首先注意本次讨论的RokcetMq源码版本为 4.9.4,距离5.0发布 的没有多久。 这一节针对RocketMq的生产者请求发送的部分细节进行阐述,主要包含了下面的内容:DefaultMQProducer 为生产者默认对象,这个对象继承自 ClientConfig,里面包含了请求者的通用配置,所以可以拆分为两个部分进行理解,第一部分为ClientConfig,第二部分为DefaultMQProducer。
706 0
|
消息中间件 Java RocketMQ
【消息中间件】默认RocketMQ消息发送者是如何启动的?
上一篇文章,主要介绍了RocketMQ消息发送-请求与响应,了解了消息发送的请求参数和响应结果,今天我们主要来学习默认消息发送者的源码,看看消息发送主要是做了那哪些事情。
|
消息中间件 负载均衡 算法
【消息中间件】默认的RocketMQ消息消费者是如何启动的?(上)
在当下的分布式服务中,消息队列中间件是一个解决服务之间耦合的利器,今天我们来瞧一瞧开源的RocketMQ消息中间件,他的消费端是如何启动的,以及在使用他的过程中有哪些配置。
|
消息中间件 存储 负载均衡
【消息中间件】默认的RocketMQ消息消费者是如何启动的?(下)
在当下的分布式服务中,消息队列中间件是一个解决服务之间耦合的利器,今天我们来瞧一瞧开源的RocketMQ消息中间件,他的消费端是如何启动的,以及在使用他的过程中有哪些配置。
|
消息中间件 uml RocketMQ
3 张图带你彻底理解 RocketMQ 事务消息
3 张图带你彻底理解 RocketMQ 事务消息
67803 2
3 张图带你彻底理解 RocketMQ 事务消息
|
消息中间件 NoSQL 关系型数据库
实战:如何防止mq消费方消息重复消费、rocketmq理论概述、rocketmq组成、普通消息的发送
实战:如何防止mq消费方消息重复消费 如果因为网络延迟等原因,mq无法及时接收到消费方的应答,导致mq重试。(计算机网络)。在重试过程中造成重复消费的问题
2784 1
实战:如何防止mq消费方消息重复消费、rocketmq理论概述、rocketmq组成、普通消息的发送
|
消息中间件 Java uml
5张图带你理解 RocketMQ 顺序消息实现机制
5张图带你理解 RocketMQ 顺序消息实现机制
726 1
5张图带你理解 RocketMQ 顺序消息实现机制
|
消息中间件 缓存 算法
阿里二面:RocketMQ 消息积压了,增加消费者有用吗?
阿里二面:RocketMQ 消息积压了,增加消费者有用吗?
271 0
阿里二面:RocketMQ 消息积压了,增加消费者有用吗?
|
消息中间件 缓存 数据库
4 张图,9 个维度告诉你怎么做能确保 RocketMQ 不丢失消息
4 张图,9 个维度告诉你怎么做能确保 RocketMQ 不丢失消息
435 0
4 张图,9 个维度告诉你怎么做能确保 RocketMQ 不丢失消息