Using flink-sql-connector-postgres-cdc-3.3.0.jar.
The observed behavior: when a row is physically deleted from the source table, the corresponding row in the result table is physically deleted as well, instead of being soft-deleted (is_del = 1) as the job below intends.
-- 1. Source table definition (PostgreSQL CDC)
CREATE TABLE service_area_vehicle_pass_test (
    id BIGINT COMMENT 'ID',
    service_area_id INT COMMENT 'Service area ID',
    plate_no STRING COMMENT 'License plate number',
    plate_type INT COMMENT 'Plate type',
    pass_time_in STRING COMMENT 'Entry time',
    pass_time_out STRING COMMENT 'Exit time',
    crossing_id_in INT COMMENT 'Entry toll gate ID',
    crossing_id_out INT COMMENT 'Exit toll gate ID',
    pass_id_in INT COMMENT 'Entry record ID',
    pass_id_out INT COMMENT 'Exit record ID',
    in_out_type INT COMMENT 'In/out type',
    vehicle_type INT COMMENT 'Vehicle type',
    database_name STRING METADATA FROM 'database_name' VIRTUAL,
    schema_name STRING METADATA FROM 'schema_name' VIRTUAL,
    table_name STRING METADATA FROM 'table_name' VIRTUAL,
    operation_ts TIMESTAMP_LTZ(3) METADATA FROM 'op_ts' VIRTUAL,
    row_kind STRING METADATA FROM 'row_kind' VIRTUAL,
    PRIMARY KEY (id) NOT ENFORCED
)
WITH (
    'connector' = 'postgres-cdc',
    'hostname' = '...',
    'port' = '5432',
    'username' = 'postgres',
    'password' = 'root',
    'database-name' = 'ds_test',
    'schema-name' = 'public',
    'table-name' = 'service_area_vehicle_pass_test',
    'decoding.plugin.name' = 'pgoutput',
    'slot.name' = 'flink_slot',
    'scan.incremental.snapshot.enabled' = 'true',
    'debezium.snapshot.mode' = 'initial',
    'debezium.skipped.operations' = 'none', -- make sure no operations (including deletes) are skipped
    'scan.incremental.snapshot.chunk.size' = '8096' -- tune the chunk size as needed
);
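Before looking at the sink, a quick way to confirm what change events the CDC source actually emits is to attach a print sink and watch the RowKind prefix (+I/-U/+U/-D) that Flink prints for each record. A minimal sketch, assuming this runs as a separate debugging job (ideally with its own slot.name so it does not compete for the replication slot):

-- Sketch: print every change event; the print connector prefixes each row
-- with its changelog RowKind (+I/-U/+U/-D).
CREATE TABLE debug_print (
    id BIGINT,
    row_kind STRING,
    operation_ts TIMESTAMP_LTZ(3)
) WITH ('connector' = 'print');

INSERT INTO debug_print
SELECT id, row_kind, operation_ts
FROM service_area_vehicle_pass_test;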
-- 2. Sink table (StarRocks)
CREATE TABLE ods_service_area_vehicle_pass (
    id BIGINT NOT NULL COMMENT 'ID',
    service_area_id INT COMMENT 'Service area ID',
    plate_no STRING COMMENT 'License plate number',
    plate_type INT COMMENT 'Plate type',
    pass_time_in STRING COMMENT 'Entry time',
    pass_time_out STRING COMMENT 'Exit time',
    crossing_id_in INT COMMENT 'Entry toll gate ID',
    crossing_id_out INT COMMENT 'Exit toll gate ID',
    pass_id_in INT COMMENT 'Entry record ID',
    pass_id_out INT COMMENT 'Exit record ID',
    in_out_type INT COMMENT 'In/out type',
    vehicle_type INT COMMENT 'Vehicle type',
    row_kind STRING NOT NULL COMMENT 'Operation type: INSERT/UPDATE/DELETE',
    rid STRING COMMENT 'Record ID',
    is_del INT COMMENT 'Soft-delete flag: 0 = not deleted, 1 = deleted',
    idt TIMESTAMP(3) COMMENT 'Creation time',
    udt TIMESTAMP(3) COMMENT 'Update time',
    ddt TIMESTAMP(3) COMMENT 'Deletion time',
    PRIMARY KEY (id) NOT ENFORCED
)
WITH (
    'connector' = 'starrocks',
    'jdbc-url' = 'jdbc:mysql://...:9030',
    'load-url' = '...:8030',
    'database-name' = 'ds_lin',
    'table-name' = 'cdc_ods_service_area_vehicle_pass',
    'username' = 'root',
    'password' = 'root',
    'sink.properties.format' = 'json',
    'sink.properties.strip_outer_array' = 'true',
    'sink.buffer-flush.interval-ms' = '5000',
    'sink.max-retries' = '3'
);
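How the sink applies upserts and deletes also depends on the table definition on the StarRocks side, which the post does not show. A sketch of what cdc_ods_service_area_vehicle_pass could look like there, assuming a Primary Key model table keyed on id (the bucket count and the TIMESTAMP-to-DATETIME mappings are assumptions, not taken from the original job):

-- Sketch (StarRocks side, assumed): Primary Key model so the Flink sink
-- can upsert/delete by id.
CREATE TABLE ds_lin.cdc_ods_service_area_vehicle_pass (
    id BIGINT NOT NULL,
    service_area_id INT,
    plate_no STRING,
    plate_type INT,
    pass_time_in STRING,
    pass_time_out STRING,
    crossing_id_in INT,
    crossing_id_out INT,
    pass_id_in INT,
    pass_id_out INT,
    in_out_type INT,
    vehicle_type INT,
    row_kind STRING,
    rid STRING,
    is_del INT,
    idt DATETIME,
    udt DATETIME,
    ddt DATETIME
)
PRIMARY KEY (id)
DISTRIBUTED BY HASH (id) BUCKETS 8;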
-- 3. Processing logic (read from PostgreSQL, write to StarRocks)
-- 3.1. Insert +I and +U records (normal path)
INSERT INTO ods_service_area_vehicle_pass
SELECT
s.id,
s.service_area_id,
s.plate_no,
s.plate_type,
s.pass_time_in,
s.pass_time_out,
s.crossing_id_in,
s.crossing_id_out,
s.pass_id_in,
s.pass_id_out,
s.in_out_type,
s.vehicle_type,
s.row_kind,
CONCAT(
CAST(COALESCE(s.service_area_id, 99) AS STRING),
s.plate_no,
DATE_FORMAT(
TO_TIMESTAMP(
SUBSTRING(
COALESCE(s.pass_time_in, s.pass_time_out),
1,
CASE
WHEN LOCATE('.', COALESCE(s.pass_time_in, s.pass_time_out)) > 0
THEN LOCATE('.', COALESCE(s.pass_time_in, s.pass_time_out)) - 1
ELSE CHAR_LENGTH(COALESCE(s.pass_time_in, s.pass_time_out))
END
),
'd/M/yyyy HH:mm:ss'
),
'yyyyMMdd-HH'
)
) AS rid,
0 AS is_del,
CASE WHEN s.row_kind = '+I' THEN s.operation_ts ELSE NULL END AS idt,
s.operation_ts AS udt,
CAST(NULL AS TIMESTAMP(3)) AS ddt
FROM service_area_vehicle_pass_test s
WHERE s.row_kind IN ('+I', '+U');
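The rid expression is easier to follow with a concrete value. Assuming pass_time_in arrives as a day-first string with optional fractional seconds, e.g. '5/3/2024 08:15:30.123' (a hypothetical sample, not from the original post), the LOCATE/SUBSTRING pair strips '.123' before parsing, and the timestamp is then bucketed to the hour:

-- Worked example with the assumed sample value '5/3/2024 08:15:30.123':
-- LOCATE('.', ...) = 18, so SUBSTRING(..., 1, 17) yields '5/3/2024 08:15:30';
-- TO_TIMESTAMP(..., 'd/M/yyyy HH:mm:ss') parses it as 2024-03-05 08:15:30;
-- DATE_FORMAT(..., 'yyyyMMdd-HH') then yields the hour bucket.
SELECT DATE_FORMAT(
    TO_TIMESTAMP('5/3/2024 08:15:30', 'd/M/yyyy HH:mm:ss'),
    'yyyyMMdd-HH'
) AS hour_bucket; -- => '20240305-08'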
-- 3.2. Convert -D records into +U records with is_del = 1
INSERT INTO ods_service_area_vehicle_pass
SELECT
s.id,
s.service_area_id,
s.plate_no,
s.plate_type,
s.pass_time_in,
s.pass_time_out,
s.crossing_id_in,
s.crossing_id_out,
s.pass_id_in,
s.pass_id_out,
s.in_out_type,
s.vehicle_type,
'+U' AS row_kind,
CONCAT(
CAST(COALESCE(s.service_area_id, 99) AS STRING),
s.plate_no,
DATE_FORMAT(
TO_TIMESTAMP(
SUBSTRING(
COALESCE(s.pass_time_in, s.pass_time_out),
1,
CASE
WHEN LOCATE('.', COALESCE(s.pass_time_in, s.pass_time_out)) > 0
THEN LOCATE('.', COALESCE(s.pass_time_in, s.pass_time_out)) - 1
ELSE CHAR_LENGTH(COALESCE(s.pass_time_in, s.pass_time_out))
END
),
'd/M/yyyy HH:mm:ss'
),
'yyyyMMdd-HH'
)
) AS rid,
1 AS is_del,
CAST(NULL AS TIMESTAMP(3)) AS idt,
s.operation_ts AS udt,
s.operation_ts AS ddt
FROM service_area_vehicle_pass_test s
WHERE s.row_kind = '-D';
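A note on the observed physical delete, worth verifying: the WHERE filter on the row_kind metadata column only decides which records pass through; it does not rewrite the changelog RowKind each record carries, and neither does projecting the literal '+U' into the row_kind payload column. A record emitted by the source as a DELETE therefore still reaches the StarRocks sink as a DELETE on primary key id, which a Primary Key table executes as a physical delete. A quick check (a sketch, assuming the source table above is registered) is to ask Flink for the changelog mode of the filtered stream:

-- Sketch: the filter does not make the stream insert-only; the printed
-- changelogMode will still contain D (delete) after WHERE row_kind = '-D'.
EXPLAIN CHANGELOG_MODE
SELECT id, row_kind, operation_ts
FROM service_area_vehicle_pass_test
WHERE row_kind = '-D';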