com.alibaba.otter.canal.client.adapter.loader.CanalAdapterKafkaWorker类以下代码,意思是只有poll到null或者执行时间大于一分钟才会执行ack操作,试了下一直正常消费数据,永远不会执行connector.ack(),这样是不是有问题呢,请问为什么要这样写?
while (running) { try { // switcher.get(); //等待开关开启
final Message message = connector.getWithoutAck(100L, TimeUnit.MILLISECONDS,timeFlag,startTime);
timeFlag = false;
executing.set(true);
if (message != null) {
.......
// 间隔一段时间ack一次, 防止因超时未响应切换到另外台客户端
long currentTS = System.currentTimeMillis();
while (executing.get()) {
// 大于1分钟未消费完ack一次keep alive
if (System.currentTimeMillis() - currentTS > 60000) {
connector.ack();
currentTS = System.currentTimeMillis();
}
}
} else {
connector.ack();
}
} catch (CommitFailedException e) {
logger.warn(e.getMessage());
} catch (Exception e) {
logger.error(e.getMessage(), e);
TimeUnit.SECONDS.sleep(1L);
}
}
原提问者GitHub用户undeadwing
确实有bug,在最新pr中会修复。这里的间隔一段时间ack是为了防止消费超时kafka Consumer会切换到另外一台
原回答者GitHub用户rewerma
版权声明:本文内容由阿里云实名注册用户自发贡献,版权归原作者所有,阿里云开发者社区不拥有其著作权,亦不承担相应法律责任。具体规则请查看《阿里云开发者社区用户服务协议》和《阿里云开发者社区知识产权保护指引》。如果您发现本社区中有涉嫌抄袭的内容,填写侵权投诉表单进行举报,一经查实,本社区将立刻删除涉嫌侵权内容。