开发者社区> 问答> 正文

Flink-1.12支持kafka join jdbc维表吗

请问kafka join jdbc维表数据而不是join jdbc的changelog,支持吗? 

在给jdbc声明主键和watermark后,使用官网的事件时间join语法,任务启动后发现jdbc算子一次性读取了所有jdbc维表数据,状态变成了finished,这样的话按理来说不管维表数据怎么变kafka都join不到维表数据了呀? 

CREATE TABLE orders ( 

order_id STRING, 

currency STRING, 

amount INT, 

order_time TIMESTAMP(3), 

WATERMARK FOR order_time AS order_time 

) WITH ( 

'connector' = 'kafka', 

'topic' = 'topic_flink', 

'properties.bootstrap.servers' = '10.3.12.113:9092', 

'properties.group.id' = 'flink', 

'format' = 'json', 

'scan.startup.mode' = 'latest-offset' 

CREATE TABLE latest_rates ( 

currency STRING, 

rate DECIMAL(38, 10), 

currency_time TIMESTAMP(3), 

WATERMARK FOR currency_time AS currency_time, 

PRIMARY KEY (currency) NOT ENFORCED 

) WITH ( 

'connector' = 'jdbc', 

'url' = 'jdbc:mysql://10.3.12.113:3306/base?useUnicode=true&characterEncoding=utf8&serverTimezone=PRC&useSSL=false' 

'username' = 'root', 

'password' = 'root1234', 

'table-name' = 'latest_rates', 

'lookup.cache.max-rows' = '1', 

'lookup.cache.ttl' = '1min' 

SELECT 

o.order_id, 

o.order_time, 

o.amount * r.rate AS amount, 

r.currency 

FROM orders AS o 

LEFT JOIN latest_rates FOR SYSTEM_TIME AS OF o.order_time r 

ON o.currency = r.currency"*来自志愿者整理的flink邮件归档

展开
收起
又出bug了-- 2021-12-02 11:45:15 707 0
1 条回答
写回答
取消 提交回答
  • 这是正常的,jdbc connector实现的表就是bounded的,只会scan一次,一次读完,之后数据库表的数据更新是没有去捕捉的,connector也没有很好的办法去实时监控数据库表的更新并广播到下游节点。 

    如果想要有获取实时更新的维表并做基于event time语义的维表关联,那么推荐的方式就是接入数据库表的binlog(changelog), 用主流去 temporal join changelog流 实现关联维表的准确版本。 

    另外,如果对维表实时性要求不高,也没有延迟join的需求,你也可以用运行时lookup的方式完成维表join(即:JOIN latest_rates FOR SYSTEM_TIME AS OF o.proctime),这种方式是在每条主流的数据到来时去查询一次外部DB,这种方式支持你参数中写的 'lookup.cache.max-rows' = '1', 'lookup.cache.ttl' = ‘1min’ 优化。*来自志愿者整理的FLINK邮件归档

    2021-12-02 14:31:34
    赞同 展开评论 打赏
问答排行榜
最热
最新

相关电子书

更多
Java Spring Boot开发实战系列课程【第16讲】:Spring Boot 2.0 实战Apache Kafka百万级高并发消息中间件与原理解析 立即下载
MaxCompute技术公开课第四季 之 如何将Kafka数据同步至MaxCompute 立即下载
消息队列kafka介绍 立即下载