message[batchId=1,size=259] 15:00:18.080 [kafka-producer-network-thread | producer-1] DEBUG o.apache.kafka.clients.NetworkClient - [Producer clientId=producer-1] Using older server API v2 to send PRODUCE {acks=-1,timeout=30000,partitionSizes=[case_flow_topic-1=3032]} with correlation id 9 to node 1003 . . 15:00:18.623 [kafka-producer-network-thread | producer-1] DEBUG o.apache.kafka.clients.NetworkClient - [Producer clientId=producer-1] Using older server API v2 to send PRODUCE {acks=-1,timeout=30000,partitionSizes=[case_flow_topic-1=4021]} with correlation id 56 to node 1003
com.alibaba.otter.canal.protocol.exception.CanalClientException: something goes wrong with reason: something goes wrong with channel:[id: 0x6c969f24, /192.168.113.5:38172 => /192.168.113.5:11111], exception=com.alibaba.otter.canal.store.CanalStoreException: no match ack positionLogPosition[identity=LogIdentity[sourceAddress=192.168.113.243/192.168.113.243:3306,slaveId=-1],postion=EntryPosition[included=false,journalName=mysql-bin.001543,position=916315835,serverId=1803306,timestamp=1524553202000]]
at com.alibaba.otter.canal.client.impl.SimpleCanalConnector.receiveMessages(SimpleCanalConnector.java:245) at com.alibaba.otter.canal.client.impl.SimpleCanalConnector.getWithoutAck(SimpleCanalConnector.java:222) at com.alibaba.otter.canal.client.impl.ClusterCanalConnector.getWithoutAck(ClusterCanalConnector.java:183) at com.zichan360.bigdata.canal.client.caseflow.SyncCaseFlowTables.main(SyncCaseFlowTables.java:32)
15:00:24.652 [main] INFO c.a.o.c.c.impl.ClusterCanalConnector - restart the connector for next round retry.
message[batchId=2,size=259] . . .
代码
String destination = args[0]; String topic = args[1]; CanalConnector connector = CanalConnectors.newClusterConnector("zk", destination, "", ""); int batchSize = 1000; int emptyCount = 0; try { connector.connect(); connector.rollback(); int totalEmptyCount = 120; while (emptyCount < totalEmptyCount) { Message message = connector.getWithoutAck(batchSize,2000L, TimeUnit.MILLISECONDS); long batchId = message.getId(); int size = message.getEntries().size(); if (batchId == -1 || size == 0) { emptyCount++; try { Thread.sleep(1000); } catch (InterruptedException e) { } } else { emptyCount = 0; System.out.printf("message[batchId=%s,size=%s] \n", batchId, size); produceKafkaData(message.getEntries(), topic); try { Thread.sleep(1000); } catch (InterruptedException e) { } } try { connector.ack(batchId); } catch (Exception e) { e.printStackTrace(); connector.rollback(batchId); }
}
} finally {
connector.disconnect();
}
原提问者GitHub用户keysunHong
no match ack 估计是服务端重启了instance,client做一下重试恢复即可
原回答者GitHub用户agapple
版权声明:本文内容由阿里云实名注册用户自发贡献,版权归原作者所有,阿里云开发者社区不拥有其著作权,亦不承担相应法律责任。具体规则请查看《阿里云开发者社区用户服务协议》和《阿里云开发者社区知识产权保护指引》。如果您发现本社区中有涉嫌抄袭的内容,填写侵权投诉表单进行举报,一经查实,本社区将立刻删除涉嫌侵权内容。