开发者社区> 问答> 正文

有没有啥办法,可以直接从table_ddl中取最新的kafka数据呢?

各位大佬好,请教一个问题: 利用kafka作为流式源,注册内部的flink的动态表,如下面的table_ddl ,是否由于'update-mode' = 'append',所以随着kafka消息逐步的推送,会导致table_ddl这个表的数据不断在增大,但是我想要的是kafka每推送一条数据,table_ddl就会相应的是这一条数据,请问这个如何实现呢?(或者有没有啥办法,可以直接从table_ddl中取最新的kafka数据呢)。

table_ddl = """ CREATE TABLE table_ddl (  trck_id VARCHAR ) WITH (  'connector.type' = 'kafka',  'connector.version' = 'universal',      'connector.topic' = 'w',      'connector.startup-mode' = 'group-offsets',  'connector.properties.group.id' = 'trck_w',   'update-mode' = 'append',  'connector.properties.zookeeper.connect' = '*',  'connector.properties.bootstrap.servers' = '%#',  'format.type' = 'json',       'format.derive-schema' = 'true'  ) """

*来自志愿者整理的flink邮件归档

展开
收起
游客nnqbtnagn7h6s 2021-12-06 20:15:41 447 0
1 条回答
写回答
取消 提交回答
  • 我认为创建一张表并不会触发从source读取的操作,除非你使用select * from table_ddl这类的操作。 至于表的增大,显然是的。 我觉得你可能关注的是否会flush数据进入存储系统,保证不会out-of-memory这类的问题吗? 我个人猜可能有两种方案: 1.考虑到kafka是一种已经实现持久化的数据源,因此我们并非需要将所有的数据缓存到保存到本地,可以只记录读取到的数据位置,下次读取从记录的位置开始; 2.定期向文件系统写入数据。

    *来自志愿者整理的flink邮件归档

    2021-12-06 21:42:58
    赞同 展开评论 打赏
问答排行榜
最热
最新

相关电子书

更多
Java Spring Boot开发实战系列课程【第16讲】:Spring Boot 2.0 实战Apache Kafka百万级高并发消息中间件与原理解析 立即下载
MaxCompute技术公开课第四季 之 如何将Kafka数据同步至MaxCompute 立即下载
消息队列kafka介绍 立即下载