重复打印以下日志,这意味着重复使用reviveTopic的消息。
2021-10-29 15:51:12 INFO PopReviveService_0 - reviveQueueId=0,find ack, offset:142765, raw : {"ackOffset":283798,"consumerGroup":"pop-consumer-1","popTime":1635476075611,"queueId":0,"startOffset":283791,"topic":"%RETRY%pop-consumer-1_-topic"}
1、 当popReviveService消耗reviveTopic时,当消息列表没有ck消息时,它无法提交消耗偏移。相关代码是PopReviveService.java中的第365-371行
2、 此外,为什么newOffset是由ck消息决定的IMO,这应该是popReviveService所消耗的真正抵消。相关代码如下。
long newOffset = consumeReviveObj.oldOffset; for (PopCheckPoint popCheckPoint : sortList) { if (!checkAndSetMaster()) { POP_LOGGER.info("slave skip ck process , revive topic={}, reviveQueueId={}", reviveTopic, queueId); break; } if (consumeReviveObj.endTime - popCheckPoint.getReviveTime() <= (PopAckConstants.ackTimeInterval + PopAckConstants.SECOND)) { break; }
// check normal topic, skip ck , if normal topic is not exist
String normalTopic = KeyBuilder.parseNormalTopic(popCheckPoint.getTopic(), popCheckPoint.getCId());
if (brokerController.getTopicConfigManager().selectTopicConfig(normalTopic) == null) {
POP_LOGGER.warn("reviveQueueId={},can not get normal topic {} , then continue ", queueId, popCheckPoint.getTopic());
newOffset = popCheckPoint.getReviveOffset(); //Is this right?
continue;
}
if (null == brokerController.getSubscriptionGroupManager().findSubscriptionGroupConfig(popCheckPoint.getCId())) {
POP_LOGGER.warn("reviveQueueId={},can not get cid {} , then continue ", queueId, popCheckPoint.getCId());
newOffset = popCheckPoint.getReviveOffset();
continue;
}
reviveMsgFromCk(popCheckPoint);
newOffset = popCheckPoint.getReviveOffset(); //and this line
}
原提问者GitHub用户cserwen
为了防止消息丢失。恢复主题队列偏移量更新需要传递ck消息。复活主题包含确认消息和确认消息。如果偏移量直接由Revive主题队列消息的数量更新,则可能会丢失消息。例如,一些ack消息没有被ack消息确认,并且在不发送重试主题的情况下,直接更新接收主题的偏移量。
由于网络或其他原因,有可能重复ack消息。ack对应的消息已经被确认,ack会被重复消耗,但没有任何作用。当更新新的确认消息时,这些确认消息将不会被重复消耗。
原回答者GitHub用户odbozhou
版权声明:本文内容由阿里云实名注册用户自发贡献,版权归原作者所有,阿里云开发者社区不拥有其著作权,亦不承担相应法律责任。具体规则请查看《阿里云开发者社区用户服务协议》和《阿里云开发者社区知识产权保护指引》。如果您发现本社区中有涉嫌抄袭的内容,填写侵权投诉表单进行举报,一经查实,本社区将立刻删除涉嫌侵权内容。
阿里云拥有国内全面的云原生产品技术以及大规模的云原生应用实践,通过全面容器化、核心技术互联网化、应用 Serverless 化三大范式,助力制造业企业高效上云,实现系统稳定、应用敏捷智能。拥抱云原生,让创新无处不在。