开发者社区 > 大数据与机器学习 > 实时计算 Flink > 正文

Flink CDC里我从一个kafka消费多种不同格式的json数据,接下来怎么办?

Flink CDC里我从一个kafka消费多种不同格式的json数据,生成不同的中间表,然后用flink sql关联宽表,
写入到doris数据库。

这个流程先消费kafka得到DataStream
然后使用outputtag根据不同的json格式标识先分流为多个DataStream
然后对每个outputtag对应的DataStream先转成SingleOutputStreamOperator
然后自己构建schema,再用createTemporaryViewfromDataStream,创建临时表

再根据配置的要生成的不同的宽表的配置,循环
生成insert into select from (临时表) join (catalog存在的维表) 用flink sql执行tEnv.executeSql(insertTable);
往多个大宽表写数据

现在情况是:写入没有问题,但是从checkpoint恢复一直报错,请问这个怎么解决?flink 1.17.2的报错,我刚刚实验了下,我如果改成一个job里面只有一个insert into的语句执行,是可以顺利恢复的
有多个insert into语句执行,就不能恢复了,不过如果恢复的时候只执行第一个insert into语句是没有问题的。
请问这个是flink的bug吗?
现在我是一个job里面多个insert 任务嘛,请问我要插入多个宽表的话,如何在一个job里面一个任务里面完成?就是很普通的那种无法恢复的报错,没啥特殊的看起来。我的insert into (宽表) select from (临时表) join 维表,并且是一个job里面有执行多个insert into语句
for(){
tEnv.executeSql(insertTable);
}代码是这。fd457d464c43ab362017afc9bb97424f.pngdbd8dbecf7b085c2e2c3d2770a3b02ee.png4a693b96de5087f136cee3a5f27db001.png大佬,我刚刚实验了下,我如果改成一个job里面只有一个insert into的语句执行,是可以顺利恢复的
有多个insert into语句执行,就不能恢复了,不过如果恢复的时候只执行第一个insert into语句是没有问题的。
请问这个是flink的bug吗?
现在我是一个job里面多个insert 任务嘛,请问我要插入多个宽表的话,如何在一个job里面一个任务里面完成?搞定了,還是用這個寫法,之前excute不小心写到循环里面了,就出现了多个job,后面放到外面就可以了。or(){
stmtSet.addInsertSql(insertTable);
}
stmtSet.execute();我用這個方式还是一样的报错执行多个insert语句的时候job里面是显示多个任务这样。

展开
收起
小小鹿鹿鹿 2024-04-22 19:59:48 22 0
1 条回答
写回答
取消 提交回答
  • 不是bug 是你多个循环把flink的执行拓扑图搞乱了;导致后续根据uid查找cp找不到嘞。此回答来自钉群Flink CDC 社区。

    2024-04-23 22:55:34
    赞同 展开评论 打赏

实时计算Flink版是阿里云提供的全托管Serverless Flink云服务,基于 Apache Flink 构建的企业级、高性能实时大数据处理系统。提供全托管版 Flink 集群和引擎,提高作业开发运维效率。

相关电子书

更多
Java Spring Boot开发实战系列课程【第16讲】:Spring Boot 2.0 实战Apache Kafka百万级高并发消息中间件与原理解析 立即下载
MaxCompute技术公开课第四季 之 如何将Kafka数据同步至MaxCompute 立即下载
消息队列kafka介绍 立即下载