版本:1.1.4-SNAPSHO 描述:canal投递kafka(库.表 多分区模式) 动态topic 单表数据 按照id分片 投递消息为flatMessage可能会存在 同一个topic下同一分区消息乱序,核心代码如下:
// 发送扁平数据json List flatMessages = MQMessageUtils.messageConverter(message); List records = new ArrayList(); if (flatMessages != null) { for (FlatMessage flatMessage : flatMessages) { if (canalDestination.getPartitionHash() != null && !canalDestination.getPartitionHash().isEmpty()) { FlatMessage[] partitionFlatMessage = MQMessageUtils.messagePartition(flatMessage, canalDestination.getPartitionsNum(), canalDestination.getPartitionHash()); int length = partitionFlatMessage.length; for (int i = 0; i < length; i++) { FlatMessage flatMessagePart = partitionFlatMessage[i]; if (flatMessagePart != null) { records.add(new ProducerRecord<String, String>(topicName, i, null, JSON.toJSONString(flatMessagePart, SerializerFeature.WriteMapNullValue))); } } } else { final int partition = canalDestination.getPartition() != null ? canalDestination.getPartition() : 0; records.add(new ProducerRecord<String, String>(topicName, partition, null, JSON.toJSONString(flatMessage, SerializerFeature.WriteMapNullValue))); }
// 每条记录需要flush
produce(topicName, records, true);
records.clear();
}
}
投递kafka端代码:
// 异步发送,因为在partition hash的时候已经按照每个分区合并了消息,走到这一步不需要考虑单个分区内的顺序问题 for (ProducerRecord record : records) { futures.add(producerTmp.send(record)); }
以上两处代码存在一个逻辑漏洞:
假设此处十条flatMessage, 这边逻辑是for循环对 单条flatMessage进行了分区,并非十条flatMessage的分区,最终拼接的发送顺序可能如下:
flatMessage1[0],flatMessage1[1],flatMessage2[0],flatMessage2[1]........ 然后kafka异步发送这个序列(kafka底层是nio发送 并不会阻塞) ,最终到达到kafka服务端 可能会是如下顺序: flatMessage2[0],flatMessage2[1],flatMessage1[0],flatMessage1[1]........ 然后 同一分区 顺序 就乱了------
建议:把这十条 flatMessage 进行统一分区 再异步提交 或者 单条分区完成后 就同步提交一次。
原提问者GitHub用户yangyiweigege
kafka底层会按照分区做batch批量提交,同一个分区batch的合并过程是上层的for循环顺序,不同分区之间顺序无法保证
原回答者GitHub用户agapple
版权声明:本文内容由阿里云实名注册用户自发贡献,版权归原作者所有,阿里云开发者社区不拥有其著作权,亦不承担相应法律责任。具体规则请查看《阿里云开发者社区用户服务协议》和《阿里云开发者社区知识产权保护指引》。如果您发现本社区中有涉嫌抄袭的内容,填写侵权投诉表单进行举报,一经查实,本社区将立刻删除涉嫌侵权内容。