请问kafka join jdbc维表数据而不是join jdbc的changelog,支持吗?
在给jdbc声明主键和watermark后,使用官网的事件时间join语法,任务启动后发现jdbc算子一次性读取了所有jdbc维表数据,状态变成了finished,这样的话按理来说不管维表数据怎么变kafka都join不到维表数据了呀?
CREATE TABLE orders (
order_id STRING,
currency STRING,
amount INT,
order_time TIMESTAMP(3),
WATERMARK FOR order_time AS order_time
) WITH (
'connector' = 'kafka',
'topic' = 'topic_flink',
'properties.bootstrap.servers' = '10.3.12.113:9092',
'properties.group.id' = 'flink',
'format' = 'json',
'scan.startup.mode' = 'latest-offset'
)
CREATE TABLE latest_rates (
currency STRING,
rate DECIMAL(38, 10),
currency_time TIMESTAMP(3),
WATERMARK FOR currency_time AS currency_time,
PRIMARY KEY (currency) NOT ENFORCED
) WITH (
'connector' = 'jdbc',
'url' = 'jdbc:mysql://10.3.12.113:3306/base?useUnicode=true&characterEncoding=utf8&serverTimezone=PRC&useSSL=false'
'username' = 'root',
'password' = 'root1234',
'table-name' = 'latest_rates',
'lookup.cache.max-rows' = '1',
'lookup.cache.ttl' = '1min'
)
SELECT
o.order_id,
o.order_time,
o.amount * r.rate AS amount,
r.currency
FROM orders AS o
LEFT JOIN latest_rates FOR SYSTEM_TIME AS OF o.order_time r
ON o.currency = r.currency"*来自志愿者整理的flink邮件归档
这是正常的,jdbc connector实现的表就是bounded的,只会scan一次,一次读完,之后数据库表的数据更新是没有去捕捉的,connector也没有很好的办法去实时监控数据库表的更新并广播到下游节点。
如果想要有获取实时更新的维表并做基于event time语义的维表关联,那么推荐的方式就是接入数据库表的binlog(changelog), 用主流去 temporal join changelog流 实现关联维表的准确版本。
另外,如果对维表实时性要求不高,也没有延迟join的需求,你也可以用运行时lookup的方式完成维表join(即:JOIN latest_rates FOR SYSTEM_TIME AS OF o.proctime),这种方式是在每条主流的数据到来时去查询一次外部DB,这种方式支持你参数中写的 'lookup.cache.max-rows' = '1', 'lookup.cache.ttl' = ‘1min’ 优化。*来自志愿者整理的FLINK邮件归档
版权声明:本文内容由阿里云实名注册用户自发贡献,版权归原作者所有,阿里云开发者社区不拥有其著作权,亦不承担相应法律责任。具体规则请查看《阿里云开发者社区用户服务协议》和《阿里云开发者社区知识产权保护指引》。如果您发现本社区中有涉嫌抄袭的内容,填写侵权投诉表单进行举报,一经查实,本社区将立刻删除涉嫌侵权内容。