Clickhouse可以组合几种引擎达到幂等效果,ReplacingMergeTree + 物化视图 + AggregatingMergeTree(AggregateFunction:argMaxIf)
CREATETABLEIFNOTEXISTSdefault.local_dat_update ( pk1 UInt64 COMMENT '主键', pk2 String COMMENT '主键', ver DateTime64 COMMENT '版本,业务提供', col1 Int32 DEFAULT0 COMMENT '选取!=0的最大版本', col2 String DEFAULT'' COMMENT '选取!=空字符串的最大版本', col3 DateTimeDEFAULT0 COMMENT '选取!=0的最大版本' ) ENGINE = ReplacingMergeTree() PARTITIONBY (xxHash64(pk1, pk2) % 10) ORDER BY (pk1,pk2);
CREATETABLEIFNOTEXISTSdefault.local_agg_update ( pk1 UInt64, pk2 String, version AggregateFunction(max, DateTime64), col1 AggregateFunction(argMaxIf, Int32, DateTime64, UInt8), col2 AggregateFunction(argMaxIf, String, DateTime64, UInt8), col3 AggregateFunction(argMaxIf, DateTime, DateTime64, UInt8) ) ENGINE = AggregatingMergeTree() PARTITIONBY (xxHash64(pk1, pk2) % 10) ORDER BY (pk1, pk2);
CREATE MATERIALIZED VIEW IFNOTEXISTSdefault.mv_update TOdefault.local_agg_update AS SELECT pk1 AS pk1, pk2 AS pk2, maxState(ver) ASversion, -- 根据业务需求调整组合函数及判断逻辑 argMaxIfState(col1, ver, col1 !=0) AS col1, argMaxIfState(col2, ver, col2 !='') AS col2, argMaxIfState(col3, ver, col3 !=0) AS col3 FROMdefault.local_dat_update GROUP BY pk1, pk2;*来自志愿者整理的flink邮件归档
clickhouse不支持事务及幂等写入,无法保证end to end 精准一次。*来自志愿者整理的FLINK邮件归档
版权声明:本文内容由阿里云实名注册用户自发贡献,版权归原作者所有,阿里云开发者社区不拥有其著作权,亦不承担相应法律责任。具体规则请查看《阿里云开发者社区用户服务协议》和《阿里云开发者社区知识产权保护指引》。如果您发现本社区中有涉嫌抄袭的内容,填写侵权投诉表单进行举报,一经查实,本社区将立刻删除涉嫌侵权内容。