一、前言
在前两篇我们介绍了
这一篇我们来说一下消息消费是如何保证高可用的。
要想知道消息消费的高可用,那我们得知道消息是怎么消费的吧。我们知道 Consumer 集群中每个 Consumer 都有消费组,那一个消费组的多个消费者是如何对消息队列(一个主题对应多个消息队列),那消费者是如何做的负载均衡呢?一个消费者又是如何并发消费消息队列的消息呢(一个消费者可以对应多个消息队列,而一个消息队列只能被一个消费者消费)?这就是消费端消息负载均衡与重新分布机制,这个留给你自己去思考。
本文重点讲消息消费高可用的机制。我们的业务场景,确实无法避免消息消费失败的情况下,比如网络异常、业务逻辑本身异常、还有的自身业务没异常而调用的第三方异常等。但无论啥情况,作为一款消息中间件,你得保证我消息消费的时候消息不会丢失吧,要是重要的业务,数据丢了,那没人敢用这款产品了,所以消息消费高可用也异常的重要。即使消息一直消费失败,也不能丢失数据。那 RocketMQ 是如何保证消息消费不丢数据的呢?答案就是消费重试机制
和消息 ACK 机制
来保证。
二、消息重试机制
1、消息消费过程
PullMessageService 负责对消息队列进行消息拉取,从远端服务器拉取消息后将消息存储 ProcessQueue 消息队列处理队列中,然后调用 ConsumeMessageService#submitConsumeRequest 方法进行消息消费,使用线程池来消费消息,确保了消息拉取与消息消费的解耦。ConsumeMessageService 支持顺序消息和并发(无序)消息,核心类图如下:
我们这里暂时先分享
ConsumeMessageConcurrentlyService 无序消息。
不难发现代码入口
ConsumeMessageConcurrentlyService 的内部类 ConsumeRequest#run 方法。
处理消费结果继续跟进去:
public void processConsumeResult( final ConsumeConcurrentlyStatus status, final ConsumeConcurrentlyContext context, final ConsumeRequest consumeRequest ) { int ackIndex = context.getAckIndex(); if (consumeRequest.getMsgs().isEmpty()) return; switch (status) { case CONSUME_SUCCESS: if (ackIndex >= consumeRequest.getMsgs().size()) { ackIndex = consumeRequest.getMsgs().size() - 1; } int ok = ackIndex + 1; int failed = consumeRequest.getMsgs().size() - ok; this.getConsumerStatsManager().incConsumeOKTPS(consumerGroup, consumeRequest.getMessageQueue().getTopic(), ok); this.getConsumerStatsManager().incConsumeFailedTPS(consumerGroup, consumeRequest.getMessageQueue().getTopic(), failed); break; case RECONSUME_LATER: ackIndex = -1; this.getConsumerStatsManager().incConsumeFailedTPS(consumerGroup, consumeRequest.getMessageQueue().getTopic(), consumeRequest.getMsgs().size()); break; default: break; } switch (this.defaultMQPushConsumer.getMessageModel()) { case BROADCASTING: for (int i = ackIndex + 1; i < consumeRequest.getMsgs().size(); i++) { MessageExt msg = consumeRequest.getMsgs().get(i); log.warn("BROADCASTING, the message consume failed, drop it, {}", msg.toString()); } break; case CLUSTERING: List<MessageExt> msgBackFailed = new ArrayList<MessageExt>(consumeRequest.getMsgs().size()); for (int i = ackIndex + 1; i < consumeRequest.getMsgs().size(); i++) { MessageExt msg = consumeRequest.getMsgs().get(i); boolean result = this.sendMessageBack(msg, context); if (!result) { msg.setReconsumeTimes(msg.getReconsumeTimes() + 1); msgBackFailed.add(msg); } } if (!msgBackFailed.isEmpty()) { consumeRequest.getMsgs().removeAll(msgBackFailed); this.submitConsumeRequestLater(msgBackFailed, consumeRequest.getProcessQueue(), consumeRequest.getMessageQueue()); } break; default: break; } long offset = consumeRequest.getProcessQueue().removeMessage(consumeRequest.getMsgs()); if (offset >= 0 && !consumeRequest.getProcessQueue().isDropped()) { this.defaultMQPushConsumerImpl.getOffsetStore().updateOffset(consumeRequest.getMessageQueue(), offset, true); } }
从源码中可以看出,
如果返回状态 status 是 CONSUME_SUCCESS,此时ackIndex = consumeRequest.getMsgs().size() - 1;
再看下面消息模式是 CLUSTERING 的场景,for (int i = ackIndex + 1; i < consumeRequest.getMsgs().size(); i++)
,从这里可以看出如果消息成功,则无需发送 sendMsgBack 给 broker。
如果返回状态 status 是 RECONSUME_LATER,此时ackIndex = -1
,则这批消息都会发送给 Broker,也就是这一批消息都得重新消费。如果发送 ack 失败,则会延迟 5s 后重新在消费端重新消费。
if (!msgBackFailed.isEmpty()) { consumeRequest.getMsgs().removeAll(msgBackFailed); this.submitConsumeRequestLater(msgBackFailed, consumeRequest.getProcessQueue(), consumeRequest.getMessageQueue()); }
消费者向 Broker 发送 ACK 消息,如果发送成功,重试机制由 broker 处理,如果发送 ack 消息失败,则将该任务直接在消费者这边,再次在本地处理该批消息,默认延长 5s 后在消费者重新消费,其关键总结如下:
a. 根据消费返回的状态,得到 ackIndex 的值。
b. 如果消费成功,则无需发送 sendMsgBack 给 broker;如果消费失败,根据消费模式(集群消费还是广播消费),广播模式,直接丢弃,集群模式发送 sendMessageBack。
c. 更新消息消费进度,不管消费成功与否,上述这些消息消费成功,其实就是修改消费偏移量。(失败的,会进行重试,会创建新的消息)
if (offset >= 0 && !consumeRequest.getProcessQueue().isDropped()) { this.defaultMQPushConsumerImpl.getOffsetStore().updateOffset(consumeRequest.getMessageQueue(), offset, true); }
然后我们重点跟踪 sendMessageBack 方法:
DefaultMQPushConsumerImpl#sendMessageBack
其核心实现要点如下:
a. 首先根据 brokerName 得到 broker 地址信息,然后通过网络发送到指定的 Broker上。
b. 如果上述过程失败,则创建一条新的消息重新发送给 Broker,此时新消息的主题为重试主题:"%RETRY%" + ConsumeGroupName
,注意,这里的主题和原先的消息主题没任何关系而是和消费组相关。
我们再来看下 Broker 端怎么处理消费端发送过来的消息,代码在:
SendMessageProcessor#processRequest
Broker 端会处理 CONSUMER_SEND_MSG_BACK 命令。
三、Broker 端处理 CONSUMER_SEND_MSG_BACK 命令
1、获取消费组的订阅信息
代码入口:
SendMessageProcessor#consumerSendMsgBack
其核心实现要点如下:
a. groupName
消费组名称,RocketMQ 消息消费重试不是以主题,而是以消费组。
b. retryQueueNums
重试队列的数量,读队列,写队列个数(主题)。
c. retryMaxTimes
允许最大的重复次数。
2、根据重试主题创建
SendMessageProcessor#consumerSendMsgBack
TopicConfigManager#createTopicInSendMessageBackMethod
如果创建主题配置信息错误,会抛出系统异常,产生的效果是消费端发送 ACK 消息错误,会创建一条新的消息,消息内部 ID 为原消息 ID,然后重新发送给 Broker。
3、根据消息偏移量尝试从 commitlog 日志文件中获取消息内容
SendMessageProcessor#consumerSendMsgBack
4、延迟级别、消费次数处理
SendMessageProcessor#consumerSendMsgBack
如果消息次数或延迟级别小于 0,设置消息的主题为%DLQ% + 消费组名称
,如果消息的延迟级别为 0,则 3 + 消息重试的次数。
5、重新发送该消息到 commitlog 中
如果消息发送成功,则返回成功,否则返回错误,消费端会将这些消息直接在消费端延迟 5s 后重新消费。
现在成功将消息发送到 commitlog 中,主题为RETRY_TOPIC + 消费组名称
,也就是消息重试的消息主题是基于消费组。不是每一个主题都有一个重试主题。而是每一个消费组有一个重试主题。那这些主题的消息,又是如何在被消费者获取并进行消费的。
然后进行消费进度更新:
ConsumeMessageConcurrentlyService#processConsumeResult
消息现在是存储到 commitlog 文件里了,那怎么消费呢?
四、延迟消息机制
延时消息是消息发送到 Broker 后,并不立即被消费者消费而是要等到特定的时间后才能被消费, RocketMQ 并不支持任意的时间精度,如果要支持任意时间精度定时调度,不可避免地需要在 Broker 层做消息排序,再加上持久化方面的考量,将不可避免的带来巨大的性能消耗,所以 RocketMQ 只支持特 定级别的延迟消息。消息延迟级别在 Broker 端通过 messageDelayLevel 配置,默认为“1s 5s 10s 30s 1m 2m 3m 4m 5m 6m 7m 8m 9m 10m 20m 30m 1h 2h”
,delayLevel=1 表示延迟消息 1s,delayLevel=2 表示延迟5s,依次类推。
既然 delayLevel 表示延迟消息的级别,我们全局搜索一下 delayLevel,发现 CommitLog 类的 putMessage 中竟然也出现了 delayLevel 相关的处理,我们重点来看下该代码:org.apache.rocketmq.store.CommitLog#putMessage
在消息存入 commitlog 之前,如果发现延迟 level 大于 0,会将消息的主题设置为SCHEDULE_TOPIC = "SCHEDULE_TOPIC_XXXX"
,然后备份原主题名称。那就很清晰了,延迟消息统一由 ScheduleMessageService 来处理。
RocketMQ 延时消息实现类为 ScheduleMessageService,该类在 DefaultMessageStore 中创建。通过在 DefaultMessageStore 中调用 load 方法加载该类并调用 start 方法启动。
1、ScheduleMessageService#load
// 加载延迟消息消费进度的加载与delayLevelTable的构造。延迟消息的进度默认存储路径为/store/config/delayOffset.json public boolean load() { boolean result = super.load(); result = result && this.parseDelayLevel(); return result; }
2、ScheduleMessageService#start
// 遍历延迟队列创建定时任务,遍历延迟级别,根据延迟级别level从offsetTable中获取消费队列的消费进度。如果不存在,则使用0 public void start() { if (started.compareAndSet(false, true)) { this.timer = new Timer("ScheduleMessageTimerThread", true); for (Map.Entry<Integer, Long> entry : this.delayLevelTable.entrySet()) { Integer level = entry.getKey(); Long timeDelay = entry.getValue(); Long offset = this.offsetTable.get(level); if (null == offset) { offset = 0L; } if (timeDelay != null) { this.timer.schedule(new DeliverDelayedMessageTimerTask(level, offset), FIRST_DELAY_TIME); } } // 每隔10s持久化一次延迟队列的消息消费进度 this.timer.scheduleAtFixedRate(new TimerTask() { @Override public void run() { try { if (started.get()) ScheduleMessageService.this.persist(); } catch (Throwable e) { log.error("scheduleAtFixedRate flush exception", e); } } }, 10000, this.defaultMessageStore.getMessageStoreConfig().getFlushDelayOffsetInterval()); } }
3、调度机制
ScheduleMessageService 的 start 方法启动后,会为每一个延迟级别创建一个调度任务,每一个延迟级别对应 SCHEDULE_TOPIC_XXXX 主题下的一个消息消费队列。定时调度任务的实现类为 DeliverDelayedMessageTimerTask,核心实现方法为 executeOnTimeup。
ScheduleMessageService$DeliverDelayedMessageTimerTask#executeOnTimeup
到这里,我们终于搞清楚了消息重试与流转,但还是没有找到RETRY+消费组
(队列的订阅信息)。
那消费者是如何订阅RETRY+消费组名称
的消费队列的呢?
原来在消费者启动时,就默认会订阅该消费组的重试主题的队列。
org.apache.rocketmq.client.impl.consumer.DefaultMQPushConsumerImpl
这样跟着老周下来,相信你对消息消费高可用有了一定的了解了。
五、总结
我们最后还是要来回答前言的问题,那 RocketMQ 是如何保证消息消费不丢数据的呢?也就是如何保证消息消费高可用?答案就是消费重试机制
和消息 ACK 机制
来保证。
1、我们从消息消费过程入手
如果返回状态 status 是 CONSUME_SUCCESS,此时ackIndex = consumeRequest.getMsgs().size() - 1;
再看下面消息模式是 CLUSTERING 的场景,for (int i = ackIndex + 1; i < consumeRequest.getMsgs().size(); i++)
,从这里可以看出如果消息成功,则无需发送 sendMsgBack 给 broker。
如果返回状态 status 是 RECONSUME_LATER,此时ackIndex = -1
,则这批消息都会发送给 Broker,也就是这一批消息都得重新消费。如果发送 ack 失败,则会延迟 5s 后重新在消费端重新消费。
消费者向 Broker 发送 ACK 消息,如果发送成功,重试机制由 broker 处理,如果发送 ack 消息失败,则将该任务直接在消费者这边,再次在本地处理该批消息,默认延长 5s 后在消费者重新消费。
a. 根据消费返回的状态,得到 ackIndex 的值。
b. 如果消费成功,则无需发送 sendMsgBack 给 broker;如果消费失败,根据消费模式(集群消费还是广播消费),广播模式,直接丢弃,集群模式发送 sendMessageBack。
c. 更新消息消费进度,不管消费成功与否,上述这些消息消费成功,其实就是修改消费偏移量。(失败的,会进行重试,会创建新的消息)
2、Broker 端处理 CONSUMER_SEND_MSG_BACK 命令
a. 获取消费组的订阅信息
b. 根据重试主题创建
c. 根据消息偏移量尝试从 commitlog 日志文件中获取消息内容
d. 延迟级别、消费次数处理
e. 重新发送该消息到 commitlog 中,主题为 RETRY_TOPIC + 消费组名称,也就是消息重试的消息主题是基于消费组。
3、延迟消息机制
需要延迟执行的消息,在存入 commitlog 之前,如果发现延迟 level 大于 0,会将消息的主题设置为SCHEDULE_TOPIC = "SCHEDULE_TOPIC_XXXX"
,然后备份原主题名称(retry + 消费组名称)、与消费队列 ID,最后会被延迟任务 ScheduleMessageService 延迟拉取。
ScheduleMessageService 在执行过程中,会再次存入 commitlog 文件中放入之前,会清空延迟等级,并恢复主题与队列。这样,就能被消费者所消费,因为消费者在启动时就订阅了该消费组的重试主题。
重试消费过程中的间隔时间使用了延时消息,重试的消息数据并非直接写入重试队列,而是先写入延时消息队列,再通过定时消息的功能转发到重试队列。
广播模式的消费进度保存在客户端本地,集群模式的消费进度保存在 Broker 上。集群模式中 RocketMQ 采用 ACK 机制确保消息一定被消费。在消息投递过程中,不是消息从 Broker 发送到 Consumer 就算消费成功了,需要 Consumer 明确给 Broker 返回消费成功状态才算。如果从 Broker 发送到 Consumer 后,已经完成了业务处理,但在给 Broker 返回消费成功状态之前,Consumer 发生宕机或者断电、断网等情况,Broker 未收到返回,则不会保存消费进度。Consumer 重启之后,消息会重新投递,此时也会出现重复消费的场景,这时消息的幂等性需要业务自行保证。
好了,消息消费高可用机制就说到这了,相信你心里有了自己的答案与思考。
欢迎大家关注我的公众号【老周聊架构】,Java后端主流技术栈的原理、源码分析、架构以及各种互联网高并发、高性能、高可用的解决方案。
喜欢的话,点赞、再看、分享三连。