开发者社区 > 大数据与机器学习 > 实时计算 Flink > 正文

,jdbc sink支持回撤流吗?似乎flink sql中-D的数据并不能执行删除的mysql的对应

,jdbc sink支持回撤流吗?似乎flink sql中-D的数据并不能执行删除的mysql的对应数据image.png image.png

展开
收起
雪哥哥 2022-11-05 21:44:53 2698 0
18 条回答
写回答
取消 提交回答
  • 阿里云实时计算 Flink 中的 JDBC Sink 支持回撤流(Retract Stream),可以将撤回的数据删除。在 Flink SQL 中,使用 -D 参数指定 JDBC Sink 的 delete-sql 时,可以使用 RETRACT_STREAM() 函数来删除回撤流中的数据。例如:

    CREATE TABLE sink_table (
      id BIGINT,
      name STRING,
      age INT,
      PRIMARY KEY (id) NOT ENFORCED
    ) WITH (
      'connector.type' = 'jdbc',
      'connector.url' = 'jdbc:mysql://localhost:3306/test',
      'connector.table' = 'sink_table',
      'connector.driver' = 'com.mysql.jdbc.Driver',
      'connector.username' = 'root',
      'connector.password' = 'root',
      'connector.write.flush.max-rows' = '1',
      'connector.write.flush.interval' = '1s',
      'connector.write.max-retries' = '3',
      'connector.write.retry-interval' = '2s',
      'connector.write.delete-sql' = 'DELETE FROM sink_table WHERE id = ?'
    );
    
    INSERT INTO sink_table
    SELECT id, name, age FROM source_table;
    
    UPDATE source_table SET age = 20 WHERE id = 1;
    
    DELETE FROM source_table WHERE id = 2;
    

    在上面的例子中,使用 -D 参数指定了一个 delete-sql,当有数据从 source_table 中删除时,会执行该 delete-sql 将数据从 sink_table 中删除。使用 RETRACT_STREAM() 函数可以将回撤流中的数据删除。例如:

    INSERT INTO sink_table
    SELECT id, name, age FROM source_table
    UNION ALL
    SELECT id, name, age FROM source_table
    WHERE age = 20 AND id IN (SELECT id FROM source_table WHERE age != 20);
    
    INSERT INTO sink_table
    SELECT id, name, age FROM source_table
    WHERE age = 20 AND id NOT IN (SELECT id FROM source_table WHERE age != 20)
    AND NOT EXISTS (SELECT * FROM retract_stream());
    

    在上面的例子中,使用 RETRACT_STREAM() 函数将回撤流中的数据删除。

    2023-05-06 23:26:07
    赞同 展开评论 打赏
  • 公众号:网络技术联盟站,InfoQ签约作者,阿里云社区签约作者,华为云 云享专家,BOSS直聘 创作王者,腾讯课堂创作领航员,博客+论坛:https://www.wljslmz.cn,工程师导航:https://www.wljslmz.com

    阿里云实时计算 Flink的JDBC Sink支持回撤流,可以将撤回的数据删除或更新到指定的数据库中。使用JDBC Sink需要注意以下事项:

    1. 确保配置正确。请检查您在创建JDBC Sink时输入的配置是否正确,包括JDBC URL、数据库驱动、用户名和密码等。

    2. 检查数据库表结构。在使用JDBC Sink之前,请确保您的数据库表结构与输入流的数据结构相匹配。如果表结构不匹配,可能会导致数据写入失败。

    3. 确保目标数据库支持回撤流。如果您的目标数据库不支持回撤流,您将无法使用JDBC Sink来处理回撤数据。在MySQL数据库中,可以使用DELETE语句来删除回撤数据。

    另外,在Flink SQL中,-D选项用于设置外部系统或表所需的属性,在删除MySQL表数据时无法使用。您应该使用DELETE语句来删除回撤数据。例如:

    DELETE FROM table_name WHERE column_name = 'rollback_value';
    

    这里的“rollback_value”是从回撤流中获取的数据。

    2023-05-04 22:47:33
    赞同 展开评论 打赏
  • 其实sink算子是否支持回撤流,要根据sink数据源的特性而定。例如jdbc sink在Flink1.11中只支持upsert(不配置primary key会报错)。这都跟sink数据源的特性密切相关.

    2023-05-02 08:08:39
    赞同 1 展开评论 打赏
  • JDBC Sink 不支持流回撤。如果需要回撤流,可以考虑使用 Flink 原生支持的 sink,如 Kafka、Elasticsearch 等。这些 sink 支持 Flink 的 exactly-once 语义和流回撤。具体可参考 Flink 官方文档。

    2023-04-27 22:25:17
    赞同 展开评论 打赏
  • 云端行者觅知音, 技术前沿我独行。 前言探索无边界, 阿里风光引我情。

    你这是2个问题。 第一,JDBC Sink是支持回撤流。回撤流是指Flink在某些情况下需要撤回之前发送给Sink的数据,例如在发生故障时。 对于第二个问题,可能是由于您在Flink SQL中使用了-D参数错误。如果您想要删除MySQL 中的数据,可以使用DELETE语句。 如果真的存在错误,可以把错误信息帖出来。

    2023-04-27 10:51:06
    赞同 展开评论 打赏
  • 值得去的地方都没有捷径

    JDBC Sink 在 Flink 中是支持回撤流的。回撤流是指在 Flink 中取消之前已经输出到 Sink 的数据。JDBC Sink 支持回撤流的方式是通过在 Flink 中使用 Upsert 方式写入数据,即在写入数据时,如果数据已经存在,则更新数据,否则插入新数据。这种方式可以保证在回撤流时,已经输出的数据可以被覆盖或删除。

    关于您提到的在 Flink SQL 中使用 -D 参数删除 MySQL 数据的问题,可能是由于您使用了错误的 SQL 语句或参数。建议您检查 SQL 语句和参数是否正确,并确保使用的 MySQL 用户拥有删除数据的权限。如果问题仍然存在,可以提供更多详细信息,以便更好地理解问题。

    2023-04-26 12:27:56
    赞同 展开评论 打赏
  • 使用两个Sink。第一个Sink(如JDBC Sink)仅进行insert操作。第二个Sink监听第一個Sink的输出,捕获retract和delete操作,并在目标数据库执行对应update和delete语句。这种方式可以支持其他目标数据库,但增加了系统复杂度

    2023-04-25 14:13:20
    赞同 展开评论 打赏
  • 从事java行业8年至今,热爱技术,热爱以博文记录日常工作,csdn博主,座右铭是:让技术不再枯燥,让每一位技术人爱上技术

    官方对于sink的插入模式有以下三种描述:

    Append 模式 - 该模式用户在定义Sink的DDL时候不定义PK,在Apache Flink内部生成的所有只有INSERT语句;

    Upsert 模式 - 该模式用户在定义Sink的DDL时候可以定义PK,在Apache Flink内部会根据事件打标(retract机制)生成INSERT/UPDATE和DELETE 语句,其中如果定义了PK, UPDATE语句按PK进行更新,如果没有定义PK UPDATE会按整行更新;

    Retract 模式 - 该模式下会产生INSERT和DELETE两种信息,Sink Connector 根据这两种信息构造对应的数据操作指令;

    但其实sink算子是否支持回撤流,要根据sink数据源的特性而定。例如kafka sink只支持append模式,jdbc sink在Flink1.11中只支持upsert(不配置primary key会报错)。

    2023-04-25 09:52:27
    赞同 展开评论 打赏
  • JDBC Sink 不支持回撤流,因为 JDBC 需要向外发送 DML 语句才能保证数据的一致性和正确性。实时计算引擎在处理某个时间点的数据到 JDBC sink 中输出时,该时间前已经没有了状态和事件信息,所以无法根据之前的操作来执行相应的 SQL 删除语句。

    如果你想要 JDBC Sink 支持回撤流,可以考虑将 Flink 程序变成批量任务运行,并利用不同数量的 parallism 将所有的记录全部写入一个数据表中,在批量任务结束后再一次性地根据 State 数据进行各种元素的合并与更新(包括删除)。这样便能够解决您提出的问题并且也减少了等待时间。在此过程上尽可能保障表结构简洁易读,同时整理1级或2级索引方便后续查询及运营分析。

    2023-04-24 17:46:24
    赞同 展开评论 打赏
  • Flink JDBC Sink 支持回撤流 JDBC Sink 只支持 INSERT 和 UPSERT 操作,不支持 DELETE 操作。因此,在使用 Flink SQL 时,如果你想执行 DELETE 操作,需要将其转换为对应的 UPSERT 操作。

    在 Flink SQL 中,回撤流需要使用特定的语法进行定义。例如,下面是一个使用回撤流的 Flink SQL 查询:

    SELECT * FROM my_table WHERE id > 100 WITH (update-mode = 'append-only')
    

    在这个查询中,update-mode = 'append-only' 表示使用回撤流。

    如果你在 Flink SQL 中使用了回撤流,并且使用 JDBC Sink 将数据写入 MySQL 中,需要确保 MySQL 表定义中包含一个用于唯一标识每个记录的主键。这是因为 Flink JDBC Sink 使用主键来确定要更新的记录。如果表定义中没有主键,则 Flink JDBC Sink 将无法正确执行 UPSERT 操作。
    2023-04-24 15:28:19
    赞同 展开评论 打赏
  • Flink JDBC Sink将Flink的数据写入外部数据库表中,但仅支持插入(insert)操作。它不支持: 1. retract操作:将之前插入的行从表中删除。 2. delete操作:删除表中现有的行。

    这是一个已知的限制,原因是JDBC连接器目前还不支持Flink的更新操作。 那么,如果需要支持这些操作,有以下几种方法: 1. 不使用JDBC Sink,转而使用Upsert Sink(可以将retract转换为update操作)或MySQL Sink(支持delete和update)。这需要目标数据库是MySQL,并且要求使用MySQL的binlog来确保exactly-once语义。 2. 使用两个Sink。第一个Sink(如JDBC Sink)仅进行insert操作。第二个Sink监听第一個Sink的输出,捕获retract和delete操作,并在目标数据库执行对应update和delete语句。这种方式可以支持其他目标数据库,但增加了系统复杂度。 3. 不使用SQL,直接在DataStream API中使用JDBCAppendTableSink进行只插入的操作,自己手动在DataStream中处理retract和delete,执行对应的更新和删除语句。这种方式较低层,需要手动关心状态管理和exactly-once语义。 4. 将删除和更新操作在Flink SQL中进行(发出delete和update语句),但在JDBC Sink中忽略。这会导致目标数据库中的数据与Flink运算的结果逐渐偏离。只在暂时无法完成delete和update操作时可以考虑。 所以,总体来说,如果需要支持实时的数据库更新和删除操作,JDBC Sink并不是一个很好的选择。我的建议是考虑Upsert Sink或MySQL Sink来替代。

    2023-04-24 15:25:21
    赞同 展开评论 打赏
  • 发表文章、提出问题、分享经验、结交志同道合的朋友

    JDBC sink 在 Flink 中支持撤回流(Retract Stream)和删除流(Delete Stream)两种类型的流。回撤流用于将更新的数据以插入-删除的形式写入目标数据库,而删除流则直接删除目标数据库中的数据。

    在 Flink SQL 中,可以通过设置 -D 参数来选择写入撤回流还是删除流。例如,使用以下命令可以将数据写入撤回流:

    INSERT INTO jdbc_sink
    SELECT ...
    FROM ...
    
    
    2023-04-24 11:32:41
    赞同 展开评论 打赏
  • 天下风云出我辈,一入江湖岁月催,皇图霸业谈笑中,不胜人生一场醉。

    根据官方文档,Flink 的 JDBC Sink 支持 Upsert 和 Append 两种模式,不支持 Delete 写入。

    考虑到回撤流需要支持数据的删除操作,在使用 Flink JDBC Sink 进行数据写入时,如果需要支持回撤流的功能,建议将 Upsert 或 Append 模式改为 Delete + Insert 或者 Replace,然后通过设置合适的时间窗口来控制旧数据的删除和新数据的写入。这样可以确保能够正确地处理回撤流场景。

    除此之外,还有一些其他的存储技术可以支持回撤流的操作,例如 Apache Hudi、Apache Iceberg 等,可以根据实际的使用场景进行选择。

    2023-04-24 09:35:56
    赞同 展开评论 打赏
  • 全栈JAVA领域创作者

    根据您提供的截图,这是一个Flink SQL中使用JDBC Sink进行数据输出时的问题。您的问题是JDBC Sink是否支持回撤流,并且在使用Flink SQL中的-D参数进行数据删除时是否能够对MySQL进行相应的删除操作。

    首先,需要明确的是,JDBC Sink是一个Flink的官方插件,可以将Flink处理的数据写入到JDBC支持的各种关系型数据库中。它可以支持批量写入、异步写入、写入前缓存等功能,并且可以自定义数据写入的方式和格式。但是,它并不支持回撤流功能。

    回撤流是指当Flink的数据流处理出错时,可以将处理过的数据回撤到之前的状态,并重新处理数据流。Flink中的回撤流功能由Flink的StateBackend和Checkpoint机制来支持。而JDBC Sink并不支持状态回退和Checkpoint,因此无法支持回撤流。

    对于您的问题,如果您使用Flink SQL中的-D参数进行数据删除操作,但是在MySQL中没有对应的删除操作,可能是由于数据删除操作没有正确地传递到MySQL。您可以检查Flink的日志和MySQL的日志,以确定数据删除操作是否被正确执行。如果您确认数据删除操作已经被正确执行,但MySQL中仍然存在数据,那么可能是MySQL的配置或设置问题,建议您检查MySQL的配置和设置,以确保其正确性。

    希望这些信息能够帮助您解决问题。如果您仍然遇到问题,请提供更多详细的信息,以便我们更好地帮助您。

    2023-04-23 20:13:04
    赞同 展开评论 打赏
  • 热爱开发

    JDBC Sink 在 Flink 中是不支持回撤流的,因为它本质上是将数据写入到外部系统中,而在这种场景下,回撤操作可能会导致外部系统数据的不一致性。如果您需要支持回撤流,可以考虑使用其他类型的 Sink,比如 Apache Kafka、Apache Hudi 等。

    另外,在 Flink SQL 中,-D 数据删除语句默认也是不起作用的。这是因为 Flink SQL 的 DELETE 语句本质上是将数据标记为已删除,而并非真正地从外部系统中删除数据。如果您希望从外部系统中删除数据,可以考虑使用 TRUNCATE TABLE 或 DROP TABLE 语句来执行删除操作。但需要注意的是,这些语句可能会造成数据的永久丢失,请谨慎使用。

    2023-04-23 16:51:25
    赞同 展开评论 打赏
  • 技术架构师 阿里云开发者社区技术专家博主 CSDN签约专栏技术博主 掘金签约技术博主 云安全联盟专家 众多开源代码库Commiter

    JDBC Sink 在 Flink 中支持流的回撤。什么是回撤流?在 Flink 中,当一个 operator 发现之前输出的结果是错误的,这时候它可以把新的结果发出来,同时把之前错误的结果回撤掉(相当于删除之前输出的记录)。回撤流被设计用于保证数据的一致性和正确性。

    那么在使用 JDBC Sink 时如何支持回撤流呢?具体来说,需要在创建 JDBC Sink 对应的 TableSink 时传入 upsertMode 参数为 UpsertMode.UPDATE_BEFORE。这个参数指定了更新操作之前需要先执行 DELETE 操作。下面是一个示例代码:

    // 创建一个 JDBC TableSink
    JdbcTableSink sink = JdbcTableSink.builder()
        .setDrivername("com.mysql.jdbc.Driver")
        .setDBUrl("jdbc:mysql://localhost:3306/mydb")
        .setUsername("myuser")
        .setPassword("mypassword")
        .setQuery("INSERT INTO mytable (id, name) VALUES (?, ?)")
        .setBatchSize(1000)
        .setParameterTypes(Types.INT, Types.STRING)
        .setUpsertMode(UpsertMode.UPDATE_BEFORE)
        .build();
    
    // 把该 TableSink 注册到 Flink 的 TableEnvironment 中
    tEnv.registerTableSink("mytable", new String[] {"id", "name"}, new TypeInformation[] {Types.INT, Types.STRING}, sink);
    
    

    在上述代码中,我们通过调用 setUpsertMode(UpsertMode.UPDATE_BEFORE) 将 JDBC Sink 的 Upsert 模式设为 UPDATE_BEFORE 模式,这样每次执行 Upsert 操作前都会先执行 DELETE 操作。注意,要支持回撤流,需要保证表中有主键或唯一索引,否则 DELETE 操作将无法正常执行。

    至于您提到的 Flink SQL 中 -D 参数不能执行删除操作的问题,可能是由于您没有指定正确的主键或唯一索引导致的

    2023-04-23 16:40:02
    赞同 展开评论 打赏
  • Flink SQL 的 JDBC Sink 在默认情况下是不支持回撤流的。即使将 JdbcSink 的参数 "sink.buffer-flush.max-rows" 设置为 1 (即每次只提交一个数据行),它仍然无法保证每个数据行的提交,因此只适用于 FLINK 从本地文件读取数据的批处理作业,而不适合流处理作业。

    若需要在 Flink SQL 中使用回撤流,可以考虑使用 Flink CDC (Change Data Capture) 或 Debezium 等工具来实现。这些工具可以 capture 数据源的变更数据,并将其转换为带有元数据的流式数据,从而支持流式数据回撤的操作。

    2023-04-23 16:33:56
    赞同 展开评论 打赏
  • JDBC Sink 支持回撤流(Retract Stream)模式,可以将 Flink SQL 中的流式查询结果写入到 JDBC 数据库中。回撤流模式是指 Flink SQL 中的查询结果包含了 INSERT 和 DELETE 操作,JDBC Sink 会根据 INSERT 和 DELETE 操作来更新数据库中的数据。 在使用 Flink SQL 中的-D参数时,如果您执行了 DELETE 操作,那么这个 DELETE 操作会被转换为回撤流模式中的 DELETE 操作,JDBC Sink 会根据这个 DELETE 操作来更新数据库中的数据。但是需要注意的是,JDBC Sink 只能更新数据库中的数据,不能删除已经写入到数据库中的数据。如果您需要删除已经写入到数据库中的数据,需要手动执行 DELETE SQL 语句来删除。

    另外,需要注意的是,在使用 Flink SQL 中的-D参数时,如果您的查询结果包含了 UPDATE 操作,那么这个 UPDATE 操作会被转换为 INSERT 操作和 DELETE 操作,JDBC Sink 会根据这两个操作来更新数据库中的数据。因此,在使用-D参数时,需要特别注意您的查询结果中是否包含了 UPDATE 操作,以免造成数据更新异常。

    2023-04-23 11:40:43
    赞同 展开评论 打赏
滑动查看更多

实时计算Flink版是阿里云提供的全托管Serverless Flink云服务,基于 Apache Flink 构建的企业级、高性能实时大数据处理系统。提供全托管版 Flink 集群和引擎,提高作业开发运维效率。

相关产品

  • 实时计算 Flink版
  • 相关电子书

    更多
    SQL Server在电子商务中的应用与实践 立即下载
    GeoMesa on Spark SQL 立即下载
    原生SQL on Hadoop引擎- Apache HAWQ 2.x最新技术解密malili 立即下载

    相关镜像