开发者社区 > 大数据与机器学习 > 实时计算 Flink > 正文

Flink CDC有没有办法可以做到cdc表更新以后就去数据库查询最新的jdbc表呢?

Flink CDC中flinksql进行一个3张表的全量加增量实时联表同步,现在我如果3张表都用cdc连接器,flink程序的内存就会涨的很快,如果设置状态过期时间,就会导致数据丢失,因为不同表的时间跨度可能会比较长

现在,我把这三张表更新最迟的一个表使用cdc连接器,其余两个使用jdbc连接器,但是flink会缓存jdbc的表,所以联表执行的不是数据库的最新数据,有没有办法可以做到cdc表更新以后就去数据库查询最新的jdbc表呢

展开
收起
真的很搞笑 2023-12-11 13:20:21 37 0
3 条回答
写回答
取消 提交回答
  • 在您的情况下,为了保证最新数据的准确性,可以尝试以下解决方案:

    使用Flink的异步IO功能:Flink提供了异步IO的功能,可以在流处理任务中使用异步方式去查询数据库的最新数据。您可以将两个使用JDBC连接器的表作为异步IO的请求源,当CDC表更新时,触发异步IO请求去查询最新的JDBC表数据,并将结果与CDC表进行联结操作。

    定时从JDBC表中查询最新数据:可以在Flink任务中设置定时任务,周期性地从JDBC表中查询最新的数据,并将其缓存起来。然后,在CDC表更新时,从缓存中获取最新数据,并将其与CDC表进行联结操作。

    使用外部存储:将JDBC表的数据缓存在外部存储中,例如Redis或者其他缓存数据库。当CDC表更新时,从外部存储中获取最新数据,并将其与CDC表进行联结操作。

    这些方案可以确保在CDC表更新时,能够及时查询到最新的JDBC表数据,并进行联表操作。您可以根据具体的业务需求和系统架构选择合适的方案进行实现。

    2023-12-18 22:08:33
    赞同 展开评论 打赏
  • 面对过去,不要迷离;面对未来,不必彷徨;活在今天,你只要把自己完全展示给别人看。

    您可以考虑使用Flink CDC的定时拉取功能来解决这个问题。通过设置定时器,您可以让Flink程序定期从数据库中拉取最新的数据,并将结果更新到内存中。这样可以避免因为CDC表更新而导致的数据丢失问题,同时也能够保证联表查询使用的是最新的数据。

    具体来说,您可以在Flink SQL中使用CREATE TABLE语句创建JDBC表,并使用SETTINGS子句来设置定时器。例如:

    CREATE TABLE jdbc_table (
      id INT,
      name STRING,
      age INT
    ) WITH (
      'connector' = 'jdbc',
      'url' = 'jdbc:mysql://localhost:3306/mydb',
      'table-name' = 'mytable',
      'username' = 'user',
      'password' = 'pass',
      'sink.buffer-flush.max-rows' = '1000',
      'sink.buffer-flush.interval' = '1m' -- 每分钟拉取一次最新的数据
    );
    

    在上面的示例中,我们设置了sink.buffer-flush.interval为1分钟,表示每分钟拉取一次最新的数据。同时,我们还设置了sink.buffer-flush.max-rows为1000,表示每次最多拉取1000行数据。这些参数可以根据实际需求进行调整。

    2023-12-12 17:39:51
    赞同 展开评论 打赏
  • 把jdbc的缓存关了,用look up join ,此回答整理自钉群“Flink CDC 社区”

    2023-12-11 22:01:47
    赞同 展开评论 打赏

实时计算Flink版是阿里云提供的全托管Serverless Flink云服务,基于 Apache Flink 构建的企业级、高性能实时大数据处理系统。提供全托管版 Flink 集群和引擎,提高作业开发运维效率。

相关产品

  • 实时计算 Flink版
  • 相关电子书

    更多
    Flink CDC Meetup PPT - 龚中强 立即下载
    Flink CDC Meetup PPT - 王赫 立即下载
    Flink CDC Meetup PPT - 覃立辉 立即下载