Flink org.apache.flink.util.FlinkRuntimeException: Error during input conversion. Conversion expects insert-only records but DataStream API record contains: UPDATE_AFTER
at org.apache.flink.table.runtime.operators.source.InputConversionOperator.processElement(InputConversionOperator.java:121)
at org.apache.flink.streaming.runtime.tasks.CopyingChainingOutput.pushToOperator(CopyingChainingOutput.java:71)大佬们有遇到这个问题吗 ,是什么原因啊 ?
版权声明:本文内容由阿里云实名注册用户自发贡献,版权归原作者所有,阿里云开发者社区不拥有其著作权,亦不承担相应法律责任。具体规则请查看《阿里云开发者社区用户服务协议》和《阿里云开发者社区知识产权保护指引》。如果您发现本社区中有涉嫌抄袭的内容,填写侵权投诉表单进行举报,一经查实,本社区将立刻删除涉嫌侵权内容。
Flink处理的事件流包含了变更数据捕获(CDC)中的更新事件,但处理逻辑或转换器只支持插入操作(insert-only),不支持UPDATE或者DELETE事件。
当Flink作业抛出“org.apache.flink.util.FlinkRuntimeException: Error during input conversion. Conversion expects insert-only records but DataStream API record contains: UPDATE_AFTER”异常时,这通常表示您正在尝试将包含更新操作类型的数据转换为Table API或SQL处理所期望的插入型记录,而Table API默认情况下仅支持insert-only(插入型)流。
在Flink的动态表概念中,流式表数据只能通过插入新行来更新。如果您的数据源是 changelog stream(变更日志流),即包含了INSERT、UPDATE_BEFORE、UPDATE_AFTER等操作类型的消息,直接转换成Table时就会出现问题,因为Table API默认不知道如何处理UPDATE或DELETE类型的事件。
若要处理包含更新和删除操作的changelog数据流,您需要在定义表源时明确指定其行为为可处理更新和删除事件,例如在创建表源时使用withRowType(...)
以及toChangelogStream(...)
等方法来正确解析和转换变更消息。具体解决方案取决于您的数据源和表结构,可能需要以下步骤之一或多个:
如果是从Kafka或其他支持变更日志格式的数据源读取数据,请确保在创建表源时配置正确的格式解释器或debezium等工具,它们可以自动识别并处理变更日志。
如果您是在DataStream API中生成的变更日志,并想将其转换为Table API处理,请确保先将其转换为符合Table API变更日志格式的DataStream。
在某些情况下,您可能需要编写自定义的转换逻辑,以适应Table API对变更日志记录的处理要求。例如,对于UPDATE_AFTER事件,提取更新后的完整行作为新的插入事件。
你好!从给出的日志来看,问题是出现在Flink Table API 的InputConversionOperator 中,其中有一个名为UPDATE_AFTER的字段。这个错误意味着输入转换操作预期只能包含INSERT操作,但实际上却包含了UPDATE_AFTER操作。
这种情况的原因可能有很多,以下是可能出现的一些常见场景及对应的解决思路:
问题出在 Flink 的数据转换上。具体来说,Flink 期望接收插入操作的数据,但是接收到的数据包含 UPDATE_AFTER 操作。这可能导致 Flink 在处理数据时出现问题。
要解决这个问题,您需要检查您的数据源,确保它提供的是符合 Flink 插入操作要求的数据。这可能涉及到修改数据源,或者对数据进行预处理,将其转换为符合 Flink 要求的格式。
此外,您还可以考虑将 UPDATE_AFTER 操作与其他操作(如批处理)结合,以便在 Flink 中正确处理此类操作。
实时计算Flink版是阿里云提供的全托管Serverless Flink云服务,基于 Apache Flink 构建的企业级、高性能实时大数据处理系统。提供全托管版 Flink 集群和引擎,提高作业开发运维效率。