cdc如果想断点续传,应该怎么搞,有大佬有类似的经验吗?
版权声明:本文内容由阿里云实名注册用户自发贡献,版权归原作者所有,阿里云开发者社区不拥有其著作权,亦不承担相应法律责任。具体规则请查看《阿里云开发者社区用户服务协议》和《阿里云开发者社区知识产权保护指引》。如果您发现本社区中有涉嫌抄袭的内容,填写侵权投诉表单进行举报,一经查实,本社区将立刻删除涉嫌侵权内容。
要实现Flink CDC的断点续传,需要在Flink CDC的配置文件中设置checkpointingMode
为CheckpointingMode.EXACTLY_ONCE
,并设置maxConcurrentSubtasks
为1。这样,Flink CDC会每隔一段时间将数据写入Hudi表,并在发生故障时进行回滚。
具体步骤如下:
checkpointingMode
为CheckpointingMode.EXACTLY_ONCE
,并设置maxConcurrentSubtasks
为1。例如:flink:
cdc:
source:
sql: SELECT * FROM my_table
table-prefix: my_table
database: my_database
hive-conf:
hive.exec.dynamic.partition.mode=nonstrict
sink:
hudi:
path: hdfs://localhost:9000/user/hadoop/output/my_table
database: my_database
table-name: my_table
write-mode:upsert
max-commit-interval:60000
max-history-size:50000
max-batch-size:10000
max-parallelism:1
properties:
hudi.write.precombine.field.list=id,timestamp,ts_col,ts_unit,ts_type,ts_extractor,ts_format,ts_zone,ts_is_local,ts_is_utc,ts_offset,ts_tod,ts_year,ts_month,ts_day,ts_hour,ts_minute,ts_second,ts_milliseconds,ts_nanos,other_col1,other_col2,other_col3,other_col4,other_col5,other_col6,other_col7,other_col8,other_col9,other_col10,other_col11,other_col12,other_col13,other_col14,other_col15,other_col16,other_col17,other_col18,other_col19,other_col20,other_col21,other_col22,other_col23,other_col24,other_col25,other_col26,other_col27,other_col28,other_col29,other_col30,other_col31,other_col32,other_col33,other_col34,other_col35,other_col36,other_col37,other_col38,other_col39,other_col40,other_col41,other_col42,other_col43,other_col44,other_col45,other_col46,other_col47,other_col48,other_col49,other