Flink CDC里flinksql 维表join ,主表同一条数据3s内连续变更,维表字段会关联出很久以前的数据,这种是什么情况造成的呢?create table source_trade_order_test1 (
schema
STRING ,
table
STRING ,
action
string,
columns
ARRAY>,
identity
ARRAY>,
timestamp
as PROCTIME()
) WITH (
'connector' = 'kafka', --kafka连接器
'topic' = 'ods_dev_bt1_2',
'properties.bootstrap.servers' = 'etl01:9092',
'properties.group.id' = 'ods_trade_order',
'scan.startup.mode' = 'latest-offset',
'format' = 'json' --指定格式
);
CREATE TABLE trade_order_sink_test1 (
id
String,
request_bala
decimal(16, 2),
request_share
decimal(16, 2),
audit_time
String,
request_date
String,
operator_name
String,
operator_id
String,
operator_mobile
String,
primary key(id) not ENFORCED
) WITH (
'connector' = 'jdbc',
'url' = 'jdbc:postgresql://10.106.0.62:5432/dmp?currentSchema=public',
'table-name' = 'base_trade_trade_order',
'username' = 'dmp',
'password' = 'test_hdT65x'
);
-- 操作员信息
CREATE TABLE fofund_fap_operator_for_order1 (
id String, -- 主键
name String, -- 操作员name
operator_code String, -- 操作员code
mobile String, -- 手机号码
delete_flag String,
primary key(id) not ENFORCED
)
WITH (
'connector' = 'jdbc',
'url' = 'jdbc:postgresql://10.106.100.54:5432/fofund_fap?currentSchema=public',
'table-name' = 'operator',
'username' = 'fofund_fap',
'password' = 'Test123',
'lookup.cache.max-rows' = '100', -- 缓存记录的最大行数
'lookup.cache.ttl' = '30s' -- 单位s,超过时间自动刷新
);
upsert into trade_order_sink_test1
select
columns
[1].value
as id,
CAST(columns
[4].value
AS DECIMAL) as request_bala,
CAST(columns
[5].value
AS DECIMAL) as request_share,
columns
[7].value
as audit_time,
columns
[8].value
as request_date,
m.name as operator_name,
m.id as operator_id,
m.mobile as operator_mobile
from source_trade_order_test1 t
left join fofund_fap_operator_for_order1 FOR SYSTEM_TIME AS OF t.timestamp
AS m
on m.operator_code = columns
[11].value
and m.delete_flag = '0'
where table
='trade_order' and action
in ('U','I')
;
版权声明:本文内容由阿里云实名注册用户自发贡献,版权归原作者所有,阿里云开发者社区不拥有其著作权,亦不承担相应法律责任。具体规则请查看《阿里云开发者社区用户服务协议》和《阿里云开发者社区知识产权保护指引》。如果您发现本社区中有涉嫌抄袭的内容,填写侵权投诉表单进行举报,一经查实,本社区将立刻删除涉嫌侵权内容。
实时计算Flink版是阿里云提供的全托管Serverless Flink云服务,基于 Apache Flink 构建的企业级、高性能实时大数据处理系统。提供全托管版 Flink 集群和引擎,提高作业开发运维效率。