With 1.10, sending to Kafka currently only supports a single partition, since ordering cannot be guaranteed across multiple partitions. We would like to send to multiple partitions, hash-partitioned by key. My approach is to pull out the entries and iterate over them, then iterate over each RowChange to get an individual RowData, build a key by joining the database name and the primary-key value, and send that one row keyed by it, which guarantees a given row always lands on the same partition. This feels very inefficient, though. Will there eventually be a way to get a single row directly, or does one already exist? My modified code is as follows:
import java.io.IOException;
import java.util.ArrayList;
import java.util.List;

import org.apache.kafka.clients.producer.ProducerRecord;

import com.alibaba.otter.canal.protocol.CanalEntry;
import com.alibaba.otter.canal.protocol.Message;
import com.google.protobuf.ByteString;

public void send(KafkaProperties.Topic topic, Message message) throws IOException {
    // Parse the raw entries into CanalEntry.Entry objects
    List<ByteString> rawEntries = message.getRawEntries();
    for (ByteString rawEntry : rawEntries) {
        message.addEntry(CanalEntry.Entry.parseFrom(rawEntry));
    }
    // Iterate over the entries
    for (CanalEntry.Entry entry : message.getEntries()) {
        String tableName = entry.getHeader().getTableName();
        CanalEntry.RowChange rowChange = CanalEntry.RowChange.parseFrom(entry.getStoreValue());
        CanalEntry.EventType eventType = rowChange.getEventType();
        // Iterate over the row changes
        for (CanalEntry.RowData rowData : rowChange.getRowDatasList()) {
            String key = "";
            List<CanalEntry.Column> list;
            if (eventType == CanalEntry.EventType.DELETE) {
                list = rowData.getBeforeColumnsList();
            } else {
                list = rowData.getAfterColumnsList();
            }
            // Re-wrap the single row into a fresh RowData
            CanalEntry.RowData.Builder rowDataBuilder = CanalEntry.RowData.newBuilder();
            for (CanalEntry.Column column : list) {
                if (column.getIsKey()) {
                    key = tableName + "_" + column.getValue();
                }
                if (eventType == CanalEntry.EventType.DELETE) {
                    rowDataBuilder.addBeforeColumns(column);
                } else {
                    rowDataBuilder.addAfterColumns(column);
                }
            }
            CanalEntry.RowChange.Builder rowChangeBuilder = CanalEntry.RowChange.newBuilder();
            // Carry the event type over so consumers can still tell INSERT/UPDATE/DELETE apart
            rowChangeBuilder.setEventType(eventType);
            rowChangeBuilder.addRowDatas(rowDataBuilder);
            CanalEntry.RowChange rowChangeNew = rowChangeBuilder.build();
            CanalEntry.Entry.Builder entryBuilder = CanalEntry.Entry.newBuilder();
            entryBuilder.setHeader(entry.getHeader())
                .setEntryType(entry.getEntryType())
                .setStoreValue(rowChangeNew.toByteString());
            List<CanalEntry.Entry> listEntry = new ArrayList<>();
            listEntry.add(entryBuilder.build());
            Message messageNew = new Message(message.getId());
            messageNew.setEntries(listEntry);
            // Send keyed by the row key so hash partitioning is stable per row
            ProducerRecord<String, Message> record =
                new ProducerRecord<String, Message>(topic.getTopic(), key, messageNew);
            producer.send(record);
            if (logger.isDebugEnabled()) {
                logger.debug("send message to kafka topic: {} \n {}", topic.getTopic(), messageNew.toString());
            }
        }
    }
}
Original question by GitHub user undeadwing
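For context on why keying works at all: with a non-null key and no explicit partition, Kafka's default partitioner picks the partition by hashing the serialized key, so equal keys always map to the same partition as long as the partition count does not change. A minimal sketch of that computation, using kafka-clients' own murmur2 utility (keyBytes and numPartitions here are placeholder values, not anything from the question):

import java.nio.charset.StandardCharsets;
import org.apache.kafka.common.utils.Utils;

// Same computation the default partitioner applies to a non-null key:
// murmur2 hash of the serialized key bytes, modulo the partition count.
byte[] keyBytes = "test_42".getBytes(StandardCharsets.UTF_8); // e.g. tableName + "_" + pk
int numPartitions = 12;
int partition = Utils.toPositive(Utils.murmur2(keyBytes)) % numPartitions;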
To hash data across partitions efficiently, the key computation has to happen before the protobuf is built; otherwise you pay the decompression and deserialization cost all over again.
Original answer by GitHub user agapple
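A minimal sketch of what the answer suggests (PrePartitionedSender, routingKey, and recordFor are hypothetical names, not canal's actual API): derive the routing key from the row's identifying fields while they are still plain values, before the entry is packed into a protobuf Message, so the producer never has to decompress or re-parse the payload just to route it.

import org.apache.kafka.clients.producer.ProducerRecord;

// Hypothetical helper illustrating the answer: compute the routing key
// before protobuf serialization, then hand Kafka pre-serialized bytes.
public final class PrePartitionedSender {

    // Key built from fields that are already available pre-serialization.
    static String routingKey(String schema, String table, String pkValue) {
        return schema + "." + table + "_" + pkValue;
    }

    // The payload is already-serialized entry bytes; no parseFrom() is
    // needed on the producing side just to decide the partition.
    static ProducerRecord<String, byte[]> recordFor(String topic, String key, byte[] payload) {
        // With a non-null key, Kafka's default partitioner does the hashing.
        return new ProducerRecord<>(topic, key, payload);
    }
}

Keeping the key computation on the near side of the serialization boundary is what removes the per-row parseFrom/rebuild cost that the question's code pays.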