阿里二面:RocketMQ 消费失败了,怎么处理?

本文涉及的产品
日志服务 SLS,月写入数据量 50GB 1个月
简介: 阿里二面:RocketMQ 消费失败了,怎么处理?

大家好,我是君哥。今天来聊一聊 RocketMQ 客户端消息消费失败,怎么办?

下面是 RocketMQ 推模式的一段代码:

public static void main(String[] args) throws InterruptedException, MQClientException {
 Tracer tracer = initTracer();
 DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("CID_JODIE_1");
 consumer.getDefaultMQPushConsumerImpl().registerConsumeMessageHook(new ConsumeMessageOpenTracingHookImpl(tracer));
 consumer.subscribe("TopicTest", "*");
 consumer.setConsumeFromWhere(ConsumeFromWhere.CONSUME_FROM_FIRST_OFFSET);
 consumer.setConsumeTimestamp("20181109221800");
 consumer.registerMessageListener(new MessageListenerConcurrently() {
  @Override
  public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> msgs, ConsumeConcurrentlyContext context) {
   try{
                System.out.printf("%s Receive New Messages: %s %n", Thread.currentThread().getName(), msgs);
   }catch (Exception e){
    return ConsumeConcurrentlyStatus.RECONSUME_LATER;
   }
   return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
  }
 });
 consumer.start();
}

从这段代码可以看出,消费者消费消息后会返回一个消费状态,那消费状态有哪些呢?参见类 ConsumeConcurrentlyStatus 中定义:

  • 消费成功,返回 CONSUME_SUCCESS;
  • 消费失败,返回 RECONSUME_LATER。

下面代码就是返回上面两个状态的逻辑,对于消费状态,如果返回 null,会给它赋值 RECONSUME_LATER,处理逻辑如下:

//ConsumeRequest 类
public void run() {
 MessageListenerConcurrently listener = ConsumeMessageConcurrentlyService.this.messageListener;
 //省略部分逻辑
 long beginTimestamp = System.currentTimeMillis();
 ConsumeReturnType returnType = ConsumeReturnType.SUCCESS;
 try {
  //省略部分逻辑
  status = listener.consumeMessage(Collections.unmodifiableList(msgs), context);
 } catch (Throwable e) {}
    //省略部分逻辑
 if (null == status) {
  //省略日志
  status = ConsumeConcurrentlyStatus.RECONSUME_LATER;
 }
    //省略部分逻辑
 if (!processQueue.isDropped()) {
  ConsumeMessageConcurrentlyService.this.processConsumeResult(status, context, this);
 } else {}
}

这部分代码的 UML 类图如下:

微信图片_20221213113805.png

上面代码中的 processConsumeResult 方法就是消费失败后客户端的处理逻辑:

public void processConsumeResult(
 final ConsumeConcurrentlyStatus status,
 final ConsumeConcurrentlyContext context,
 final ConsumeRequest consumeRequest
) {
    //ackIndex 初始值是 Integer.MAX_VALUE;
 int ackIndex = context.getAckIndex();
 switch (status) {
  case CONSUME_SUCCESS:
   if (ackIndex >= consumeRequest.getMsgs().size()) {
    ackIndex = consumeRequest.getMsgs().size() - 1;
   }
   //省略部分逻辑
   break;
  case RECONSUME_LATER:
   ackIndex = -1;
   //省略部分逻辑
   break;
  default:
   break;
 }
 switch (this.defaultMQPushConsumer.getMessageModel()) {
  case BROADCASTING:
   //广播模式下这里只打印日志
   break;
  case CLUSTERING:
   List<MessageExt> msgBackFailed = new ArrayList<MessageExt>(consumeRequest.getMsgs().size());
   for (int i = ackIndex + 1; i < consumeRequest.getMsgs().size(); i++) {
    MessageExt msg = consumeRequest.getMsgs().get(i);
    boolean result = this.sendMessageBack(msg, context);
    if (!result) {
     msg.setReconsumeTimes(msg.getReconsumeTimes() + 1);
     msgBackFailed.add(msg);
    }
   }
   if (!msgBackFailed.isEmpty()) {
    consumeRequest.getMsgs().removeAll(msgBackFailed);
    //发送回 Broker 失败的消息,5s 后再次消费
    this.submitConsumeRequestLater(msgBackFailed, consumeRequest.getProcessQueue(), consumeRequest.getMessageQueue());
   }
   break;
  default:
   break;
 }
     //更新本地保存的偏移量
 long offset = consumeRequest.getProcessQueue().removeMessage(consumeRequest.getMsgs());
 if (offset >= 0 && !consumeRequest.getProcessQueue().isDropped()) {
  this.defaultMQPushConsumerImpl.getOffsetStore().updateOffset(consumeRequest.getMessageQueue(), offset, true);
 }
}

1 消费成功

上面的代码逻辑中,如果消费成功,ackIndex 变量的值就是消息数量减 1,所以上面的 switch 逻辑是不会执行的,因为广播模式下,只是打印一段日志(没有其他逻辑),而集群模式下,for 循环的起始 i 变量已经等于消息数量,循环里面的代码不会执行

因此,如果消息消费成功,只会走最下面的逻辑,更新本地保存的消息偏移量。

2 消费失败

ackIndex 变量值等于 -1。

2.1 广播模式

在消费失败的情况下,广播模式的代码只是打印了一段日志,之后更新了本地保存的消息偏移量,因此我们知道广播模式消息消费失败后就不会重新消费了,相当于丢弃了消息

2.2 集群模式

从上面代码的 for 循环中,会把所有的消息都发送回 Broker,这样这批消息还能再次被拉取到进行消费。

对于发送给 Broker 失败的消息,会延迟 5s 后再次消费。代码如下:

private void submitConsumeRequestLater(
 final List<MessageExt> msgs,
 final ProcessQueue processQueue,
 final MessageQueue messageQueue
) {
 this.scheduledExecutorService.schedule(new Runnable() {
  @Override
  public void run() {
   ConsumeMessageConcurrentlyService.this.submitConsumeRequest(msgs, processQueue, messageQueue, true);
  }
 }, 5000, TimeUnit.MILLISECONDS);
}

更新本地保存的消息偏移量时,会从消息列表中把发送回 Broker 失败的消息先删除掉。

注意:从上面逻辑可以看到,在拉取到一批消息进行消费时,只要有一条消息消费失败,这批消息都会进行重试,因此消费端做好幂等是必要的

下面再看一下发送失败消息给 Broker 的代码,发送消息是,请求的 code 码是 CONSUMER_SEND_MSG_BACK。根据这个请求码就能找 Broker 端的处理逻辑。

如果发送回 Broker 时抛出异常,需要重新发送一个新的消息,这里有四点需要注意:

  • 新消息的 Topic 变成【 %RETRY% + consumerGroup】;
  • 新消息的 RETRY_TOPIC 这个属性赋值为之前的 Topic;
  • 新消息的重试次数属性加 1;
  • 新消息的 DELAY 属性等于重试次数 + 3.
public void sendMessageBack(MessageExt msg, int delayLevel, final String brokerName)
 throws RemotingException, MQBrokerException, InterruptedException, MQClientException {
 try {
  this.mQClientFactory.getMQClientAPIImpl().consumerSendMessageBack(brokerAddr, msg,
   this.defaultMQPushConsumer.getConsumerGroup(), delayLevel, 5000, getMaxReconsumeTimes());
 } catch (Exception e) {
  //Topic 变成 %RETRY% + consumerGroup
  Message newMsg = new Message(MixAll.getRetryTopic(this.defaultMQPushConsumer.getConsumerGroup()), msg.getBody());
  String originMsgId = MessageAccessor.getOriginMessageId(msg);
  MessageAccessor.setOriginMessageId(newMsg, UtilAll.isBlank(originMsgId) ? msg.getMsgId() : originMsgId);
        //RETRY_TOPIC 赋值
  MessageAccessor.putProperty(newMsg, MessageConst.PROPERTY_RETRY_TOPIC, msg.getTopic());
  //重试次数+1
  MessageAccessor.setReconsumeTime(newMsg, String.valueOf(msg.getReconsumeTimes() + 1));
  //最大重试次数
  MessageAccessor.setMaxReconsumeTimes(newMsg, String.valueOf(getMaxReconsumeTimes()));
  //DELAY = 重试次数 + 3
  newMsg.setDelayTimeLevel(3 + msg.getReconsumeTimes());
  this.mQClientFactory.getDefaultMQProducer().send(newMsg);
 } finally {
  msg.setTopic(NamespaceUtil.withoutNamespace(msg.getTopic(), this.defaultMQPushConsumer.getNamespace()));
 }
}

2.3 Broker 处理

上面已经讲过,对于处理失败的消息,消费端会发送回 Broker,不过这里有一点需要注意,发送回 Broker 时,消息的 Topic 变成【"%RETRY%" + namespace + "%" + 原始 topic】,封装逻辑在源码 ClientConfig.withNamespace。

根据请求码 CONSUMER_SEND_MSG_BACK 可以定位到 Broker 的处理逻辑在类 SendMessageProcessor,方法 asyncConsumerSendMsgBack。

2.3.1 进死信队列

如果重试次数超过了最大重试次数(默认 16 次),或者 delayLevel 值小于0,则消息进死信队列,死信队列的 Topic 为【"%DLQ%" + 消费组】,代码如下:

//asyncConsumerSendMsgBack 方法
if (msgExt.getReconsumeTimes() >= maxReconsumeTimes
 || delayLevel < 0) {
 newTopic = MixAll.getDLQTopic(requestHeader.getGroup());
 queueIdInt = ThreadLocalRandom.current().nextInt(99999999) % DLQ_NUMS_PER_GROUP;
 topicConfig = this.brokerController.getTopicConfigManager().createTopicInSendMessageBackMethod(newTopic,
   DLQ_NUMS_PER_GROUP,
   PermName.PERM_WRITE | PermName.PERM_READ, 0);
 msgExt.setDelayTimeLevel(0);
}

2.3.2 发送 CommitLog

如果延迟级别(DELAY)等于 0,则延迟级别就等于重试次数加 3。

有个地方需要注意,发送到延迟队列的消息重新进行了封装,封装这个消息用的并不是客户端发来的那个消息,而是从 CommitLog 中根据偏移量查找的,代码如下:

MessageExt msgExt = this.brokerController.getMessageStore().lookMessageByOffset(requestHeader.getOffset());
if (null == msgExt) {
 response.setCode(ResponseCode.SYSTEM_ERROR);
 response.setRemark("look message by offset failed, " + requestHeader.getOffset());
 return CompletableFuture.completedFuture(response);
}

如果查询失败,就会给客户端返回系统错误。

这里有个重要的细节,这个消息写入 CommitLog 时,会判断 DELAY 是否大于 0,如果大于 0,就会修改 Topic。代码如下:

//CommitLog 类 asyncPutMessage 方法
if (tranType == MessageSysFlag.TRANSACTION_NOT_TYPE
  || tranType == MessageSysFlag.TRANSACTION_COMMIT_TYPE) {
 // Delay Delivery
 if (msg.getDelayTimeLevel() > 0) {
  if (msg.getDelayTimeLevel() > this.defaultMessageStore.getScheduleMessageService().getMaxDelayLevel()) {
      //从源码看,这里最大值是18
   msg.setDelayTimeLevel(this.defaultMessageStore.getScheduleMessageService().getMaxDelayLevel());
  }
  topic = TopicValidator.RMQ_SYS_SCHEDULE_TOPIC;
  //queueId = delayLevel - 1
  int queueId = ScheduleMessageService.delayLevel2QueueId(msg.getDelayTimeLevel());
  // Backup real topic, queueId
  MessageAccessor.putProperty(msg, MessageConst.PROPERTY_REAL_TOPIC, msg.getTopic());
  MessageAccessor.putProperty(msg, MessageConst.PROPERTY_REAL_QUEUE_ID, String.valueOf(msg.getQueueId()));
  msg.setPropertiesString(MessageDecoder.messageProperties2String(msg.getProperties()));
  msg.setTopic(topic);
  msg.setQueueId(queueId);
 }
}

这里把 Topic 修改为 SCHEDULE_TOPIC_XXXX,供延时队列来调度。进入延时队列后,延时队列会按照下面的时间进行调度:

private String messageDelayLevel = "1s 5s 10s 30s 1m 2m 3m 4m 5m 6m 7m 8m 9m 10m 20m 30m 1h 2h";

上面代码可以看到,延时消息的调度有 18 个等级,最小的 1s,最大的 2h。而从下面的代码我们可以看到,调度使用第三个等级开始的:

if (0 == delayLevel) {
 delayLevel = 3 + msgExt.getReconsumeTimes();
}
msgExt.setDelayTimeLevel(delayLevel);

2.3.3 延时队列

延时队列的代码逻辑在类 ScheduleMessageService,这里的 start 方法触发延时队列的调度,而 start 方法的业务入口在 BrokerStartup 的初始化。

首先,会计算出每个延时等级对应的延时时间(处理到 ms 级别),放到 delayLevelTable,它是一个 ConcurrentHashMap,然后创建一个核心线程数等于 18 的定时线程池,依次对每个级别的延时进行调度。这个任务启动后,会每 100ms 执行一次。代码如下:

public void start() {
 if (started.compareAndSet(false, true)) {
  this.load();
  this.deliverExecutorService = new ScheduledThreadPoolExecutor(this.maxDelayLevel, new ThreadFactoryImpl("ScheduleMessageTimerThread_"));
  //省略异步
  for (Map.Entry<Integer, Long> entry : this.delayLevelTable.entrySet()) {
   Integer level = entry.getKey();
   Long timeDelay = entry.getValue();
   Long offset = this.offsetTable.get(level);
   if (null == offset) {
    offset = 0L;
   }
   if (timeDelay != null) {
    //省略异步
    this.deliverExecutorService.schedule(new DeliverDelayedMessageTimerTask(level, offset), FIRST_DELAY_TIME, TimeUnit.MILLISECONDS);
   }
  }
  //省略其他逻辑
 }
}

调度逻辑中,首先根据 Topic 和 queueId 找到对应的消费队列,然后从里面连续读取消息:

public void executeOnTimeup() {
 ConsumeQueue cq =
  ScheduleMessageService.this.defaultMessageStore.findConsumeQueue(TopicValidator.RMQ_SYS_SCHEDULE_TOPIC,
   delayLevel2QueueId(delayLevel));
    //省略空处理
 SelectMappedBufferResult bufferCQ = cq.getIndexBuffer(this.offset);
 //省略空处理
 long nextOffset = this.offset;
 try {
  int i = 0;
  ConsumeQueueExt.CqExtUnit cqExtUnit = new ConsumeQueueExt.CqExtUnit();
  //CQ_STORE_UNIT_SIZE = 20,因为 ConsumeQueue 中一个元素占 20 字节
  for (; i < bufferCQ.getSize() && isStarted(); i += ConsumeQueue.CQ_STORE_UNIT_SIZE) {
      //offset占8个字节
   long offsetPy = bufferCQ.getByteBuffer().getLong();
   //消息大小占4个字节
   int sizePy = bufferCQ.getByteBuffer().getInt();
   //ConsumeQueue中tagsCode是一个投递时间点
   long tagsCode = bufferCQ.getByteBuffer().getLong();
   if (cq.isExtAddr(tagsCode)) {
    if (cq.getExt(tagsCode, cqExtUnit)) {
     tagsCode = cqExtUnit.getTagsCode();
    } else {
     //can't find ext content.So re compute tags code.
     long msgStoreTime = defaultMessageStore.getCommitLog().pickupStoreTimestamp(offsetPy, sizePy);
     tagsCode = computeDeliverTimestamp(delayLevel, msgStoreTime);
    }
   }
   long now = System.currentTimeMillis();
   long deliverTimestamp = this.correctDeliverTimestamp(now, tagsCode);
   nextOffset = offset + (i / ConsumeQueue.CQ_STORE_UNIT_SIZE);
   long countdown = deliverTimestamp - now;
   if (countdown > 0) {
       //时间未到,等待下次调度
    this.scheduleNextTimerTask(nextOffset, DELAY_FOR_A_WHILE);
    return;
   }
   MessageExt msgExt = ScheduleMessageService.this.defaultMessageStore.lookMessageByOffset(offsetPy, sizePy);
   MessageExtBrokerInner msgInner = ScheduleMessageService.this.messageTimeup(msgExt);
   //省略事务消息
   boolean deliverSuc;
   //同步异步都有,只保留同步代码
   deliverSuc = this.syncDeliver(msgInner, msgExt.getMsgId(), nextOffset, offsetPy, sizePy);
  }
  nextOffset = this.offset + (i / ConsumeQueue.CQ_STORE_UNIT_SIZE);
 } catch (Exception e) {
 } finally {
  bufferCQ.release();
 }
    //DELAY_FOR_A_WHILE是 100ms
 this.scheduleNextTimerTask(nextOffset, DELAY_FOR_A_WHILE);
}

因为 messageTimeup 方法使用了原始的 Topic 和 QueueId 新建了消息,所以上面的 syncDeliver 方式是将消息重新投递到原始的队列中,这样消费者可以再次拉取到这条消息进行消费。注意:上面 ConsumeQueue 的 tagsCode 是一个时间点,很容易误解为是 tag 的 hashCode,MessageQueue 的存储元素中最后 8 字节确实是 tag 的 hashCode。

3 总结

消费者消费失败后,会把消费发回给 Broker 进行处理。下图是客户端处理流程:

微信图片_20221213113840.png

Broker 收到消息后,会把消息重新发送到 CommitLog,发送到 CommitLog 之前,首先会修改 Topic 为 SCHEDULE_TOPIC_XXXX,这样就发送到了延时队列,延时队列再根据延时级别把消息投递到原始的队列,这样消费者就能再次拉取到。流程如下图:

微信图片_20221213113908.png

从流程来看,消费者批量拉取消息,如果部分消息消费失败,那就会整批全部重试。所以做好幂等是必要的。

相关实践学习
消息队列RocketMQ版:基础消息收发功能体验
本实验场景介绍消息队列RocketMQ版的基础消息收发功能,涵盖实例创建、Topic、Group资源创建以及消息收发体验等基础功能模块。
消息队列 MNS 入门课程
1、消息队列MNS简介 本节课介绍消息队列的MNS的基础概念 2、消息队列MNS特性 本节课介绍消息队列的MNS的主要特性 3、MNS的最佳实践及场景应用 本节课介绍消息队列的MNS的最佳实践及场景应用案例 4、手把手系列:消息队列MNS实操讲 本节课介绍消息队列的MNS的实际操作演示 5、动手实验:基于MNS,0基础轻松构建 Web Client 本节课带您一起基于MNS,0基础轻松构建 Web Client
相关文章
|
7月前
|
物联网
MQTT常见问题之用单片机接入阿里MQTT实例失败如何解决
MQTT(Message Queuing Telemetry Transport)是一个轻量级的、基于发布/订阅模式的消息协议,广泛用于物联网(IoT)中设备间的通信。以下是MQTT使用过程中可能遇到的一些常见问题及其答案的汇总:
|
2月前
|
消息中间件 Java 数据库
新版 Seata 集成 RocketMQ事务消息,越来越 牛X 了!阿里的 Seata , yyds !
这里 借助 Seata 集成 RocketMQ 事务消息的 新功能,介绍一下一个新遇到的面试题:如果如何实现 **强弱一致性 结合**的分布式事务?
新版 Seata 集成 RocketMQ事务消息,越来越 牛X 了!阿里的 Seata , yyds !
|
2月前
|
消息中间件 存储 canal
阿里面试:canal+MQ,会有乱序的问题吗?
本文详细探讨了在阿里面试中常见的问题——“canal+MQ,会有乱序的问题吗?”以及如何保证RocketMQ消息有序。文章首先介绍了消息有序的基本概念,包括全局有序和局部有序,并分析了RocketMQ中实现消息有序的方法。接着,针对canal+MQ的场景,讨论了如何通过配置`canal.mq.partitionsNum`和`canal.mq.partitionHash`来保证数据同步的有序性。最后,提供了多个与MQ相关的面试题及解决方案,帮助读者更好地准备面试,提升技术水平。
阿里面试:canal+MQ,会有乱序的问题吗?
|
4月前
|
消息中间件 人工智能 监控
|
消息中间件 中间件 Kafka
限时开源!阿里内部消息中间件合集:MQ+Kafka+体系图+笔记
近好多小伙伴说在准备金三银四的面试突击了,但是遇到消息中间件不知道该怎么学了,问我有没有成体系的消息中间件的学习方式。 额,有点不知所措,于是乎小编就想着做一次消息中间件的专题,归类整理了一些纯手绘知识体系图、面试以及相关的学习笔记。
239 1
|
消息中间件 程序员 Apache
阿里RocketMQ创始人首次分享出这份RocketMQ技术内幕神级架构手册
RocketMQ的发展史? RocketMQ的开源正是源于对这种开源文化的认同,开放是为了更好的协同创新,并将这一技术推向新的高度。在经历了阿里巴巴集团内部多年“双11”交易核心链路工业级场景在验证,2016年11月,团队将RocketMQ捐献给全球享有盛誉的Apache软件基金会正式质为孵化项目。 至此,RocketMQ开启了迈向全球顶级开源软件的新征程。
|
消息中间件 运维 Kubernetes
阿里的 RocketMQ 如何让双十一峰值之下 0 故障?
2020 年双十一交易峰值达到 58.3 W 笔/秒,消息中间件 RocketMQ 继续数年 0 故障丝般顺滑地完美支持了整个集团大促的各类业务平稳。
184 0
阿里的 RocketMQ 如何让双十一峰值之下 0 故障?
|
消息中间件 存储 负载均衡
阿里IM技术分享(九):深度揭密RocketMQ在钉钉IM系统中的应用实践
在钉钉的IM中,我们通过 RocketMQ实现了系统解耦、异步削峰填谷,还通过定时消息实现分布式定时任务等高级特性。同时与 RocketMQ 深入共创,不断优化解决了很多RocketMQ本身的问题,并且孵化出 POP 消费模式等新特性,使 RocketMQ 能够完美支持对性能稳定性和时延要求非常高的 IM 系统。本文将为你分享这些内容。
436 0
阿里IM技术分享(九):深度揭密RocketMQ在钉钉IM系统中的应用实践
|
消息中间件 存储 前端开发
同步异步调用,并谈谈消息队列mq;RocketMQ发送消息和消费消息测试类
同步调用优点: 时效性强,打电话、直播,很快可以得到结果 同步调用的问题:
645 1
|
消息中间件 缓存 Java
rocketmq消费源码
rocketmq消费源码
378 0
rocketmq消费源码