开发者社区 > 大数据与机器学习 > 实时计算 Flink > 正文

Flink CDC里flinksql 维表join ,主表同一条数据3s内连续变更是什么造成的?

Flink CDC里flinksql 维表join ,主表同一条数据3s内连续变更,维表字段会关联出很久以前的数据,这种是什么情况造成的呢?create table source_trade_order_test1 (
schema STRING ,
table STRING ,
action string,
columns ARRAY>,
identity ARRAY>,
timestamp as PROCTIME()
) WITH (
'connector' = 'kafka', --kafka连接器
'topic' = 'ods_dev_bt1_2',
'properties.bootstrap.servers' = 'etl01:9092',
'properties.group.id' = 'ods_trade_order',
'scan.startup.mode' = 'latest-offset',
'format' = 'json' --指定格式
);

CREATE TABLE trade_order_sink_test1 (
id String,
request_bala decimal(16, 2),
request_share decimal(16, 2),
audit_time String,
request_date String,
operator_name String,
operator_id String,
operator_mobile String,
primary key(id) not ENFORCED
) WITH (
'connector' = 'jdbc',
'url' = 'jdbc:postgresql://10.106.0.62:5432/dmp?currentSchema=public',
'table-name' = 'base_trade_trade_order',
'username' = 'dmp',
'password' = 'test_hdT65x'
);

-- 操作员信息
CREATE TABLE fofund_fap_operator_for_order1 (
id String, -- 主键
name String, -- 操作员name
operator_code String, -- 操作员code
mobile String, -- 手机号码
delete_flag String,
primary key(id) not ENFORCED
)
WITH (
'connector' = 'jdbc',
'url' = 'jdbc:postgresql://10.106.100.54:5432/fofund_fap?currentSchema=public',
'table-name' = 'operator',
'username' = 'fofund_fap',
'password' = 'Test123',
'lookup.cache.max-rows' = '100', -- 缓存记录的最大行数
'lookup.cache.ttl' = '30s' -- 单位s,超过时间自动刷新
);

upsert into trade_order_sink_test1
select
columns[1].value as id,
CAST(columns[4].value AS DECIMAL) as request_bala,
CAST(columns[5].value AS DECIMAL) as request_share,
columns[7].value as audit_time,
columns[8].value as request_date,
m.name as operator_name,
m.id as operator_id,
m.mobile as operator_mobile

from source_trade_order_test1 t
left join fofund_fap_operator_for_order1 FOR SYSTEM_TIME AS OF t.timestamp AS m
on m.operator_code = columns[11].value
and m.delete_flag = '0'
where table='trade_order' and action in ('U','I')
;

84c7b6a54e05a57db3e96595b99239df.png

展开
收起
小小鹿鹿鹿 2024-05-20 20:35:27 19 0
0 条回答
写回答
取消 提交回答

实时计算Flink版是阿里云提供的全托管Serverless Flink云服务,基于 Apache Flink 构建的企业级、高性能实时大数据处理系统。提供全托管版 Flink 集群和引擎,提高作业开发运维效率。

相关产品

  • 实时计算 Flink版
  • 相关电子书

    更多
    Flink CDC Meetup PPT - 龚中强 立即下载
    Flink CDC Meetup PPT - 王赫 立即下载
    Flink CDC Meetup PPT - 覃立辉 立即下载