试试 去掉env.execute() 只保留tenv.execute此回答整理自钉群“【③群】Apache Flink China社区”
在 Flink 中,可以通过使用 toRetractStream 方法将表转换为数据流。使用该方法后,每次表中的数据发生变化时,Flink 会将旧的数据发送到流中,并标记为删除,同时将新的数据也发送到流中,标记为添加。这就是所谓的“撤回流”(Retract Stream)。
如果对同一个表进行两次 toRetractStream 转换,将会产生两个不同的数据流,一个数据流包含了上一次转换的删除和添加操作,另一个数据流包含了最新一次转换的删除和添加操作。
为了解决这个问题,可以尝试以下两种方法:
1、只转换一次表为数据流 将表转换为数据流一次,并在后续处理过程中直接使用该数据流。避免重复转换,从而避免产生多个数据流。
2、合并多个数据流 如果已经出现了多个数据流,可以尝试使用 Flink 提供的 union 方法将它们合并为一个数据流。这样可以将多个数据流中的元素合并到同一个流中,并保留所有操作的最新状态。
// 转换表为数据流
val dataStream1 = table1.toRetractStream[Row]
// 再次转换表为数据流
val dataStream2 = table2.toRetractStream[Row]
// 合并两个数据流
val mergedStream = dataStream1.union(dataStream2)
// 对合并后的数据流进行操作
mergedStream.print()
上述代码将 dataStream1 和 dataStream2 两个数据流合并为 mergedStream,并对其进行打印操作。
版权声明:本文内容由阿里云实名注册用户自发贡献,版权归原作者所有,阿里云开发者社区不拥有其著作权,亦不承担相应法律责任。具体规则请查看《阿里云开发者社区用户服务协议》和《阿里云开发者社区知识产权保护指引》。如果您发现本社区中有涉嫌抄袭的内容,填写侵权投诉表单进行举报,一经查实,本社区将立刻删除涉嫌侵权内容。
实时计算Flink版是阿里云提供的全托管Serverless Flink云服务,基于 Apache Flink 构建的企业级、高性能实时大数据处理系统。提供全托管版 Flink 集群和引擎,提高作业开发运维效率。