开发者社区> 问答> 正文

请问CanalAdapterKafkaWorker的ack是否存在问题?

com.alibaba.otter.canal.client.adapter.loader.CanalAdapterKafkaWorker类以下代码,意思是只有poll到null或者执行时间大于一分钟才会执行ack操作,试了下一直正常消费数据,永远不会执行connector.ack(),这样是不是有问题呢,请问为什么要这样写?

while (running) { try { // switcher.get(); //等待开关开启

                final Message message = connector.getWithoutAck(100L, TimeUnit.MILLISECONDS,timeFlag,startTime);
                timeFlag = false;

                executing.set(true);
                if (message != null) {
                   .......
                    // 间隔一段时间ack一次, 防止因超时未响应切换到另外台客户端
                    long currentTS = System.currentTimeMillis();
                    while (executing.get()) {
                        // 大于1分钟未消费完ack一次keep alive
                        if (System.currentTimeMillis() - currentTS >  60000) {
                            connector.ack();
                            currentTS = System.currentTimeMillis();
                        }
                    }
                } else {
                    connector.ack();
                }
            } catch (CommitFailedException e) {
                logger.warn(e.getMessage());
            } catch (Exception e) {
                logger.error(e.getMessage(), e);
                TimeUnit.SECONDS.sleep(1L);
            }
        }

原提问者GitHub用户undeadwing

展开
收起
Java工程师 2023-05-08 19:12:37 66 0
1 条回答
写回答
取消 提交回答
  • 确实有bug,在最新pr中会修复。这里的间隔一段时间ack是为了防止消费超时kafka Consumer会切换到另外一台

    原回答者GitHub用户rewerma

    2023-05-09 19:07:18
    赞同 展开评论 打赏
问答排行榜
最热
最新

相关电子书

更多
低代码开发师(初级)实战教程 立即下载
冬季实战营第三期:MySQL数据库进阶实战 立即下载
阿里巴巴DevOps 最佳实践手册 立即下载