您好!感谢你在万忙之中,抽出时间来看我发的邮件。最近我在研究使用flink写入数据到clickHouse,如何能满足公司业务需求。但是在用flink1.12.1版本读取kafka的数据,实现upsert的形式写入数据到clickhouse出现了一些问题。问题详细情况描述如下:
clickhouse建表语句如下: CREATE TABLE test_local.tzling_tb3( uuid String, product String, platform String, batchId String, id String, account String, customerId String, reportName String, dt String, campaign String, adGroup String, generalField String, currency String, impressions String, cost String, clicks String, conversions String, createDateTime String, createTime BIGINT, key String, pdate String )engine = MergeTree PARTITION BY pdate order by createTime;
将uuid作为主键,主键存在就更新数据 update,不存在的话,就直接append。
processData.addSink(new MSKUpsertClickHouseSink()); 附件文件MSKUpsertClickHouseSink.java是我写入clickhouse的sink类,设计逻辑为: 先查询表中是否存在要添加数据的uuid,如果存在就先做条件删除操作,再做append操作;如果要添加的数据uuid不存在,就直接append操作。当时这样写出现了并发问题,如果并行度大于1,那么clickhouse中会出现uuid不唯一的情况出现。
请问一下,基于上述所说的情况,您有什么好的实践方案可以推荐一下的呢?*来自志愿者整理的flink邮件归档
Hi!
你需要在 sink 节点之前添加一个按 uuid 的 hash shuffle 将相同的 uuid 送到相同的并发。如果 processData 是一个 data stream 的话,通过 keyBy 方法 key by uuid,再写入 sink 即可。*来自志愿者整理的flink邮件归档
版权声明:本文内容由阿里云实名注册用户自发贡献,版权归原作者所有,阿里云开发者社区不拥有其著作权,亦不承担相应法律责任。具体规则请查看《阿里云开发者社区用户服务协议》和《阿里云开发者社区知识产权保护指引》。如果您发现本社区中有涉嫌抄袭的内容,填写侵权投诉表单进行举报,一经查实,本社区将立刻删除涉嫌侵权内容。