开发者社区> 问答> 正文

请问能拿到单条数据吗,否则往kafka发送只能是单分区,多分区顺序无法保证

请问1.10发送到kafka的话,现在只能是单分区发送,多分区顺序不能保证,我们希望能发送到不同分区,并且按照key进行hash分区,我的做法是把entrys拿出来遍历,再把rowchange拿出来遍历,最后得到一条rowdata,用库名加主键的值拼出key,按照key发送这条数据,就可以保证一条数据总是会到一个分区,但是觉着这样效率很低,请问之后会有能直接拿到单条数据的方法吗,还是现在就有呢? 修改后的代码如下:

public void send(KafkaProperties.Topic topic, Message message) throws IOException {

//获取entrys
List<ByteString> rawEntries = message.getRawEntries();
for (ByteString rawEntry : rawEntries) {
    message.addEntry(CanalEntry.Entry.parseFrom(rawEntry));
}
//遍历entrys
for (CanalEntry.Entry entry : message.getEntries()) {
    String tableName = entry.getHeader().getTableName();
    CanalEntry.RowChange rowChage = CanalEntry.RowChange.parseFrom(entry.getStoreValue());
    CanalEntry.EventType eventType = rowChage.getEventType();

// 遍历rowchange for (CanalEntry.RowData rowData : rowChage.getRowDatasList()) { String key = ""; List<CanalEntry.Column> list = null; if (eventType == CanalEntry.EventType.DELETE) { list = rowData.getBeforeColumnsList(); } else { list = rowData.getAfterColumnsList(); } //重新封装 CanalEntry.RowData.Builder rowDataBuilder = CanalEntry.RowData.newBuilder(); for (CanalEntry.Column column : list) { if (column.getIsKey()) { key = tableName + "_" + column.getValue(); } if (eventType == CanalEntry.EventType.DELETE) { rowDataBuilder.addBeforeColumns(column); } else { rowDataBuilder.addAfterColumns(column); } } CanalEntry.RowChange.Builder rowChangeBuilder = CanalEntry.RowChange.newBuilder(); rowChangeBuilder.addRowDatas(rowDataBuilder); CanalEntry.RowChange rowChangeNew = rowChangeBuilder.build();

        CanalEntry.Entry.Builder entryBuilder = CanalEntry.Entry.newBuilder();
        entryBuilder.setHeader(entry.getHeader())
                .setEntryType(entry.getEntryType())
                .setStoreValue(rowChangeNew.toByteString());
        List<CanalEntry.Entry> listEntry = new ArrayList<>();
        listEntry.add(entryBuilder.build());
        Message messageNew = new Message(message.getId());
        messageNew.setEntries(listEntry);

        //按照rowkey进行hash分区发送
        ProducerRecord<String, Message> record;
        record = new ProducerRecord<String, Message>(topic.getTopic(), key, messageNew);
        producer.send(record);
        if (logger.isDebugEnabled()) {
            logger.debug("send message to kafka topic: {} \n {}", topic.getTopic(), message.toString());
        }
    }
}

}

原提问者GitHub用户undeadwing

展开
收起
Java工程师 2023-05-08 19:14:22 92 0
1 条回答
写回答
取消 提交回答
  • 针对数据hash到不同分区,如果高效需要在构建protobuf之前就完成计算,否则就会有解压反序列化的问题

    原回答者GitHub用户agapple

    2023-05-09 19:45:31
    赞同 展开评论 打赏
问答排行榜
最热
最新

相关电子书

更多
Java Spring Boot开发实战系列课程【第16讲】:Spring Boot 2.0 实战Apache Kafka百万级高并发消息中间件与原理解析 立即下载
MaxCompute技术公开课第四季 之 如何将Kafka数据同步至MaxCompute 立即下载
消息队列kafka介绍 立即下载