开发者社区> 问答> 正文

canal 报错:no match ack

message[batchId=1,size=259] 15:00:18.080 [kafka-producer-network-thread | producer-1] DEBUG o.apache.kafka.clients.NetworkClient - [Producer clientId=producer-1] Using older server API v2 to send PRODUCE {acks=-1,timeout=30000,partitionSizes=[case_flow_topic-1=3032]} with correlation id 9 to node 1003 . . 15:00:18.623 [kafka-producer-network-thread | producer-1] DEBUG o.apache.kafka.clients.NetworkClient - [Producer clientId=producer-1] Using older server API v2 to send PRODUCE {acks=-1,timeout=30000,partitionSizes=[case_flow_topic-1=4021]} with correlation id 56 to node 1003

com.alibaba.otter.canal.protocol.exception.CanalClientException: something goes wrong with reason: something goes wrong with channel:[id: 0x6c969f24, /192.168.113.5:38172 => /192.168.113.5:11111], exception=com.alibaba.otter.canal.store.CanalStoreException: no match ack positionLogPosition[identity=LogIdentity[sourceAddress=192.168.113.243/192.168.113.243:3306,slaveId=-1],postion=EntryPosition[included=false,journalName=mysql-bin.001543,position=916315835,serverId=1803306,timestamp=1524553202000]]

at com.alibaba.otter.canal.client.impl.SimpleCanalConnector.receiveMessages(SimpleCanalConnector.java:245) at com.alibaba.otter.canal.client.impl.SimpleCanalConnector.getWithoutAck(SimpleCanalConnector.java:222) at com.alibaba.otter.canal.client.impl.ClusterCanalConnector.getWithoutAck(ClusterCanalConnector.java:183) at com.zichan360.bigdata.canal.client.caseflow.SyncCaseFlowTables.main(SyncCaseFlowTables.java:32)

15:00:24.652 [main] INFO c.a.o.c.c.impl.ClusterCanalConnector - restart the connector for next round retry.

message[batchId=2,size=259] . . .

代码

String destination = args[0]; String topic = args[1]; CanalConnector connector = CanalConnectors.newClusterConnector("zk", destination, "", ""); int batchSize = 1000; int emptyCount = 0; try { connector.connect(); connector.rollback(); int totalEmptyCount = 120; while (emptyCount < totalEmptyCount) { Message message = connector.getWithoutAck(batchSize,2000L, TimeUnit.MILLISECONDS); long batchId = message.getId(); int size = message.getEntries().size(); if (batchId == -1 || size == 0) { emptyCount++; try { Thread.sleep(1000); } catch (InterruptedException e) { } } else { emptyCount = 0; System.out.printf("message[batchId=%s,size=%s] \n", batchId, size); produceKafkaData(message.getEntries(), topic); try { Thread.sleep(1000); } catch (InterruptedException e) { } } try { connector.ack(batchId); } catch (Exception e) { e.printStackTrace(); connector.rollback(batchId); }

    }
} finally {
    connector.disconnect();
}

原提问者GitHub用户keysunHong

展开
收起
绿子直子 2023-05-09 10:34:32 78 0
1 条回答
写回答
取消 提交回答
  • no match ack 估计是服务端重启了instance,client做一下重试恢复即可

    原回答者GitHub用户agapple

    2023-05-10 09:58:18
    赞同 展开评论 打赏
问答排行榜
最热
最新

相关电子书

更多
ACK 云原生弹性方案—云原生时代的加速器 立即下载
ACK集群类型选择最佳实践 立即下载
企业运维之云原生和Kubernetes 实战 立即下载