Flink同时使用sql api执行入库操作 然后还有另一条链路用 stream api进行数据湖入库操作 为什么最终生成的图 只有sql的链路 datastream api的算子不生成啊?
版权声明:本文内容由阿里云实名注册用户自发贡献,版权归原作者所有,阿里云开发者社区不拥有其著作权,亦不承担相应法律责任。具体规则请查看《阿里云开发者社区用户服务协议》和《阿里云开发者社区知识产权保护指引》。如果您发现本社区中有涉嫌抄袭的内容,填写侵权投诉表单进行举报,一经查实,本社区将立刻删除涉嫌侵权内容。
在使用Flink同时通过SQL API和DataStream API执行不同的入库操作时,如果最终生成的执行图中只显示了SQL链路而没有DataStream API的算子,可能是由于以下原因导致的:
env.execute()
方法中正确触发了作业提交。StreamExecutionEnvironment
,可能会导致DataStream API的链路未被合并到最终的执行图中。解决方法:
StreamExecutionEnvironment
实例。示例代码:
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
StreamTableEnvironment tableEnv = StreamTableEnvironment.create(env);
// SQL API 链路
tableEnv.executeSql("INSERT INTO ...");
// DataStream API 链路
DataStream<String> stream = env.fromElements("data");
stream.addSink(new YourCustomSink());
// 提交作业
env.execute("Unified Job");
table.exec.resource.default-parallelism
等参数来避免优化器过度裁剪。createTemporaryView
)并参与SQL查询。如果未正确注册,Flink可能无法识别该链路。StreamTableEnvironment
将DataStream注册为表:
tableEnv.createTemporaryView("DataStreamTable", dataStream);
tableEnv.executeSql("INSERT INTO TargetTable SELECT * FROM DataStreamTable");
为了确保SQL API和DataStream API的链路都能正确生成并执行,请按照以下步骤排查: 1. 确保两条链路使用同一个StreamExecutionEnvironment
。 2. 检查DataStream链路是否正确注册到StreamTableEnvironment
。 3. 确认DataStream链路有明确的输出目标。 4. 启用DEBUG日志级别,检查作业提交和执行过程中的详细信息。 5. 升级到最新的Flink版本以避免潜在的兼容性问题。
如果问题仍未解决,建议提供更详细的代码和配置信息,以便进一步分析。
实时计算Flink版是阿里云提供的全托管Serverless Flink云服务,基于 Apache Flink 构建的企业级、高性能实时大数据处理系统。提供全托管版 Flink 集群和引擎,提高作业开发运维效率。