开发者社区 > 大数据与机器学习 > 实时计算 Flink > 正文

请问下大家.flink 这是啥问题?

请问下大家.flink mysql cdc conversion expects insert-only records but.datastream api record contains:Delete啥问题呀?

展开
收起
三分钟热度的鱼 2023-11-08 21:16:35 95 0
3 条回答
写回答
取消 提交回答
  • 这个问题可能是因为您正在尝试使用 Flink CDC Connector 读取包含 DELETE 操作的 MySQL binlog 数据,但是 DataStream API 在处理这些数据时只会接受 INSERT 操作,不支持 DELETE 操作。

    为了解决这个问题,您可以使用以下两种方法之一:

    1. 将 CDC 插件升级到最新版本,或者尝试使用社区版本的 Flink CDC Connector,如 Debezium 或 Canonicus。这些插件通常提供了更为全面的支持,包括 DELETE、UPDATE 等操作。

    2. 在读取 CDC 数据之前先对其进行预处理,将 DELETE 操作转化为 INSERT 操作。您可以使用 SQL DML 语句(如 INSERT INTO...SELECT)将原表中的所有数据复制到临时表中,然后删除原始表的数据。这种做法意味着 Flink 将只接收到 INSERT 操作,因此能够正确处理。

    2023-11-09 22:05:01
    赞同 1 展开评论 打赏
  • 面对过去,不要迷离;面对未来,不必彷徨;活在今天,你只要把自己完全展示给别人看。

    这个问题可能是因为您的 Flink 作业中的 DataStream API 输入类型不符合预期的要求所致。
    FLINK-MySQL CDC(Change Data Capture)期望接收插入操作(INSERT_ONLY)类型的记录,而不是 DELETE 记录。当使用 DataStream API 接收源数据时,您必须确保只向源表插入记录,而不是删除或更新记录。如果源表包含 DELETE 或 UPDATE 记录,则可能导致异常。
    解决这个问题的方法有两种:

    1. 尽量只向源表插入记录

    如果您不能控制源表中的记录类型,则可以尝试在输入表上创建触发器,以将 DELETE 或 UPDATE 记录转换为 INSERT 操作。这种方法比较复杂,但在有些场景下很有用。

    1. 修改 Flink 代码

    您可以使用 filter 函数过滤掉不符合要求的记录。以下是一个简单的示例:

    DataStream<String> input = ...;
    input.filter(record -> !record.contains("DELETE"))
        .addSink(...);
    

    上述代码只会发送不含 DELETE 字符串的记录。您可以根据实际情况进行修改,以适应您的需求。

    2023-11-09 13:21:30
    赞同 展开评论 打赏
  • 这个问题似乎是你在使用Apache Flink和MySQL Change Data Capture (CDC)功能时遇到了一些问题。从你的问题描述来看,Flink期望只插入记录的数据流,但实际接收到的数据流中包含了删除操作。

    这可能是由于Flink的CDC转换器设计为只处理插入操作,而不处理删除或更新操作。在MySQL的CDC中,删除操作通常是通过在sysinformation_schema等系统表中标记删除来实现的,而不是直接在数据表中删除。因此,如果你在数据流中收到了一个删除操作,这可能意味着你的数据源在发出这个删除操作时有一些问题。

    为了解决这个问题,你可能需要检查并更改你的数据源或数据流以只包含插入操作。或者,你可能需要更改Flink的CDC转换器以处理删除操作。

    2023-11-09 10:09:53
    赞同 展开评论 打赏

实时计算Flink版是阿里云提供的全托管Serverless Flink云服务,基于 Apache Flink 构建的企业级、高性能实时大数据处理系统。提供全托管版 Flink 集群和引擎,提高作业开发运维效率。

相关电子书

更多
低代码开发师(初级)实战教程 立即下载
冬季实战营第三期:MySQL数据库进阶实战 立即下载
阿里巴巴DevOps 最佳实践手册 立即下载