开发者社区 > 大数据与机器学习 > 实时计算 Flink > 正文

Flink 有遇到这个问题吗 ,是什么原因啊 ?

Flink org.apache.flink.util.FlinkRuntimeException: Error during input conversion. Conversion expects insert-only records but DataStream API record contains: UPDATE_AFTER
at org.apache.flink.table.runtime.operators.source.InputConversionOperator.processElement(InputConversionOperator.java:121)
at org.apache.flink.streaming.runtime.tasks.CopyingChainingOutput.pushToOperator(CopyingChainingOutput.java:71)大佬们有遇到这个问题吗 ,是什么原因啊 ?

展开
收起
真的很搞笑 2023-09-19 08:36:13 216 0
5 条回答
写回答
取消 提交回答
  • Insert-only 类型的流表转化,默认会传播时间和水位线:时间属性字段名用rowtime ,此时间属性是event_time还是processing_time和table定义有关,不支持update。

    ——参考链接

    2024-01-24 16:24:49
    赞同 1 展开评论 打赏
  • Flink处理的事件流包含了变更数据捕获(CDC)中的更新事件,但处理逻辑或转换器只支持插入操作(insert-only),不支持UPDATE或者DELETE事件。

    2024-01-21 21:28:40
    赞同 展开评论 打赏
  • 当Flink作业抛出“org.apache.flink.util.FlinkRuntimeException: Error during input conversion. Conversion expects insert-only records but DataStream API record contains: UPDATE_AFTER”异常时,这通常表示您正在尝试将包含更新操作类型的数据转换为Table API或SQL处理所期望的插入型记录,而Table API默认情况下仅支持insert-only(插入型)流。

    在Flink的动态表概念中,流式表数据只能通过插入新行来更新。如果您的数据源是 changelog stream(变更日志流),即包含了INSERT、UPDATE_BEFORE、UPDATE_AFTER等操作类型的消息,直接转换成Table时就会出现问题,因为Table API默认不知道如何处理UPDATE或DELETE类型的事件。

    若要处理包含更新和删除操作的changelog数据流,您需要在定义表源时明确指定其行为为可处理更新和删除事件,例如在创建表源时使用withRowType(...)以及toChangelogStream(...)等方法来正确解析和转换变更消息。具体解决方案取决于您的数据源和表结构,可能需要以下步骤之一或多个:

    1. 如果是从Kafka或其他支持变更日志格式的数据源读取数据,请确保在创建表源时配置正确的格式解释器或debezium等工具,它们可以自动识别并处理变更日志。

    2. 如果您是在DataStream API中生成的变更日志,并想将其转换为Table API处理,请确保先将其转换为符合Table API变更日志格式的DataStream。

    3. 在某些情况下,您可能需要编写自定义的转换逻辑,以适应Table API对变更日志记录的处理要求。例如,对于UPDATE_AFTER事件,提取更新后的完整行作为新的插入事件。
      image.png

    2024-01-15 14:11:51
    赞同 展开评论 打赏
  • 某政企事业单位运维工程师,主要从事系统运维及大数据开发工作,多次获得阿里云、华为云、腾讯云征文比赛一二等奖;CTF选手,白帽,全国交通行业网络安全大赛二等奖,手握多张EDU、CNVD、CNNVD证书

    你好!从给出的日志来看,问题是出现在Flink Table API 的InputConversionOperator 中,其中有一个名为UPDATE_AFTER的字段。这个错误意味着输入转换操作预期只能包含INSERT操作,但实际上却包含了UPDATE_AFTER操作。

    这种情况的原因可能有很多,以下是可能出现的一些常见场景及对应的解决思路:

    • 输入格式错误:确保你的输入数据符合正确的格式要求。一般来说,Flink Table API 对于每个字段都有一定的规范要求,如字段命名、数据类型等等。请仔细检查你的输入数据是否遵循了这些规范。
    • 代码逻辑错误:检查你的代码逻辑,特别是在处理更新操作的地方。有时候,即使你是有意为之,但在某些情况下,你可能无意间引入了更新操作。请确保你的代码逻辑清晰明了,尽量减少意外发生的可能性。
    • 版本冲突:如果你最近升级了Flink 或者其他的依赖包,请注意检查是否存在版本冲突。有时候,旧版的代码可能不再适用于新版的库,这就需要你手动进行一些调整。
    • 使用错误的API:确保你正在使用正确的API。Flink Table API 和DataStream API 可能有不同的用途和适用范围。如果你不确定哪个API适合你的需求,最好先阅读相关的文档资料,以便理解每一种API的特点和应用场景。
    • 缺失依赖项:检查你的项目是否遗漏了必需的依赖项。有时候,当你导入一个新的模块或者更改现有的结构时,可能会忘记添加一些必要的依赖关系。
    • Bug/缺陷:如果以上步骤都不能解决问题,那么可能是Flink自身的一个Bug或者是缺陷。这时候最好的做法就是提交一个Issue报告,附带详细的重现步骤和错误堆栈跟踪,让开发者有机会定位和修复问题。
    2024-01-14 20:04:35
    赞同 1 展开评论 打赏
  • 北京阿里云ACE会长

    问题出在 Flink 的数据转换上。具体来说,Flink 期望接收插入操作的数据,但是接收到的数据包含 UPDATE_AFTER 操作。这可能导致 Flink 在处理数据时出现问题。
    要解决这个问题,您需要检查您的数据源,确保它提供的是符合 Flink 插入操作要求的数据。这可能涉及到修改数据源,或者对数据进行预处理,将其转换为符合 Flink 要求的格式。
    此外,您还可以考虑将 UPDATE_AFTER 操作与其他操作(如批处理)结合,以便在 Flink 中正确处理此类操作。

    2024-01-12 22:16:39
    赞同 1 展开评论 打赏

实时计算Flink版是阿里云提供的全托管Serverless Flink云服务,基于 Apache Flink 构建的企业级、高性能实时大数据处理系统。提供全托管版 Flink 集群和引擎,提高作业开发运维效率。

热门讨论

热门文章

相关电子书

更多
低代码开发师(初级)实战教程 立即下载
冬季实战营第三期:MySQL数据库进阶实战 立即下载
阿里巴巴DevOps 最佳实践手册 立即下载