此处只从消费线程池内的消费流程讲起。
ConsumeMessageConcurrentlyService 是并发消息消费的主类,它通过 submitConsumeRequest 将消息提交至消费线程池 consumeExecutor。如下为 ConsumeRequest.run() 方法:
class ConsumeRequest implements Runnable { private final List<MessageExt> msgs; private final ProcessQueue processQueue; private final MessageQueue messageQueue; @Override public void run() { /**省略**/ //监听器消费消息 status = listener.consumeMessage(Collections.unmodifiableList(msgs), context); /**省略**/ if (!processQueue.isDropped()) { //根据消息消费结果更新本地OffsetStore ConsumeMessageConcurrentlyService.this.processConsumeResult(status, context, this); } else { log.warn("processQueue is dropped without process consume result. messageQueue={}, msgs={}", messageQueue, msgs); } } }
public void processConsumeResult( final ConsumeConcurrentlyStatus status, final ConsumeConcurrentlyContext context, final ConsumeRequest consumeRequest ) { int ackIndex = context.getAckIndex(); switch (status) { case CONSUME_SUCCESS: if (ackIndex >= consumeRequest.getMsgs().size()) { ackIndex = consumeRequest.getMsgs().size() - 1; } int ok = ackIndex + 1; int failed = consumeRequest.getMsgs().size() - ok; this.getConsumerStatsManager().incConsumeOKTPS(consumerGroup, consumeRequest.getMessageQueue().getTopic(), ok); this.getConsumerStatsManager().incConsumeFailedTPS(consumerGroup, consumeRequest.getMessageQueue().getTopic(), failed); break; case RECONSUME_LATER: /** 省略**/ default: break; } switch (this.defaultMQPushConsumer.getMessageModel()) { case BROADCASTING: /** 省略**/ break; case CLUSTERING: List<MessageExt> msgBackFailed = new ArrayList<MessageExt>(consumeRequest.getMsgs().size()); for (int i = ackIndex + 1; i < consumeRequest.getMsgs().size(); i++) { MessageExt msg = consumeRequest.getMsgs().get(i); boolean result = this.sendMessageBack(msg, context); if (!result) { msg.setReconsumeTimes(msg.getReconsumeTimes() + 1); msgBackFailed.add(msg); } } if (!msgBackFailed.isEmpty()) { consumeRequest.getMsgs().removeAll(msgBackFailed); this.submitConsumeRequestLater(msgBackFailed, consumeRequest.getProcessQueue(), consumeRequest.getMessageQueue()); } break; default: break; } //删除ProcessQueue 消息,并且获取队列头部偏移量,集群模式下OffsetStore 为RemoteBrokerOffsetStore long offset = consumeRequest.getProcessQueue().removeMessage(consumeRequest.getMsgs()); if (offset >= 0 && !consumeRequest.getProcessQueue().isDropped()) { this.defaultMQPushConsumerImpl.getOffsetStore().updateOffset(consumeRequest.getMessageQueue(), offset, true); } }
更新 RemoteBrokerOffsetStore 内对应 MessageQueue 的偏移量。
定时任务线程池会周期性地将 OffsetStore 中的消费位点同步至 broker:
MQClientInstance.java private void startScheduledTask() { /**省略**/ this.scheduledExecutorService.scheduleAtFixedRate(new Runnable() { @Override public void run() { try { //上报消费位点 同步消费进度 MQClientInstance.this.persistAllConsumerOffset(); } catch (Exception e) { log.error("ScheduledTask persistAllConsumerOffset exception", e); } } }, 1000 * 10, this.clientConfig.getPersistConsumerOffsetInterval(), TimeUnit.MILLISECONDS); }


