Flink CDC里我从一个kafka消费多种不同格式的json数据,生成不同的中间表,然后用flink sql关联宽表,
写入到doris数据库。
这个流程先消费kafka得到DataStream
然后使用outputtag根据不同的json格式标识先分流为多个DataStream
然后对每个outputtag对应的DataStream先转成SingleOutputStreamOperator
然后自己构建schema,再用createTemporaryViewfromDataStream,创建临时表
再根据配置的要生成的不同的宽表的配置,循环
生成insert into select from (临时表) join (catalog存在的维表) 用flink sql执行tEnv.executeSql(insertTable);
往多个大宽表写数据
现在情况是:写入没有问题,但是从checkpoint恢复一直报错,请问这个怎么解决?flink 1.17.2的报错,我刚刚实验了下,我如果改成一个job里面只有一个insert into的语句执行,是可以顺利恢复的
有多个insert into语句执行,就不能恢复了,不过如果恢复的时候只执行第一个insert into语句是没有问题的。
请问这个是flink的bug吗?
现在我是一个job里面多个insert 任务嘛,请问我要插入多个宽表的话,如何在一个job里面一个任务里面完成?就是很普通的那种无法恢复的报错,没啥特殊的看起来。我的insert into (宽表) select from (临时表) join 维表,并且是一个job里面有执行多个insert into语句
for(){
tEnv.executeSql(insertTable);
}代码是这。大佬,我刚刚实验了下,我如果改成一个job里面只有一个insert into的语句执行,是可以顺利恢复的
有多个insert into语句执行,就不能恢复了,不过如果恢复的时候只执行第一个insert into语句是没有问题的。
请问这个是flink的bug吗?
现在我是一个job里面多个insert 任务嘛,请问我要插入多个宽表的话,如何在一个job里面一个任务里面完成?搞定了,還是用這個寫法,之前excute不小心写到循环里面了,就出现了多个job,后面放到外面就可以了。or(){
stmtSet.addInsertSql(insertTable);
}
stmtSet.execute();我用這個方式还是一样的报错执行多个insert语句的时候job里面是显示多个任务这样。
不是bug 是你多个循环把flink的执行拓扑图搞乱了;导致后续根据uid查找cp找不到嘞。此回答来自钉群Flink CDC 社区。
版权声明:本文内容由阿里云实名注册用户自发贡献,版权归原作者所有,阿里云开发者社区不拥有其著作权,亦不承担相应法律责任。具体规则请查看《阿里云开发者社区用户服务协议》和《阿里云开发者社区知识产权保护指引》。如果您发现本社区中有涉嫌抄袭的内容,填写侵权投诉表单进行举报,一经查实,本社区将立刻删除涉嫌侵权内容。
实时计算Flink版是阿里云提供的全托管Serverless Flink云服务,基于 Apache Flink 构建的企业级、高性能实时大数据处理系统。提供全托管版 Flink 集群和引擎,提高作业开发运维效率。