Flink CDC里两个表关联,一个表更新了,另一个表很久后才更新。怎么做实时同步呢?
版权声明:本文内容由阿里云实名注册用户自发贡献,版权归原作者所有,阿里云开发者社区不拥有其著作权,亦不承担相应法律责任。具体规则请查看《阿里云开发者社区用户服务协议》和《阿里云开发者社区知识产权保护指引》。如果您发现本社区中有涉嫌抄袭的内容,填写侵权投诉表单进行举报,一经查实,本社区将立刻删除涉嫌侵权内容。
在Flink CDC中实现两个表的实时同步,尤其是当一个表更新后另一个表延迟更新时,需要结合Flink的流式处理能力和Hologres的宽表Merge功能来完成。以下是详细的解决方案:
根据知识库中的资料,可以通过以下步骤实现:
源表定义:
source1
和 source2
,分别对应两个表的数据更新。source1
包含字段 a, b, c
,source2
包含字段 a, d, e
。WIDE_TABLE
包含字段 a, b, c, d, e
,其中 a
是主键。目标:
source1
和 source2
的数据实时写入宽表 WIDE_TABLE
,并根据主键 a
合并数据。根据知识库中的两种宽表Merge方式,推荐使用 方式一(适用于VVR 6.0.7及以上版本),因为它支持部分列更新,能够更高效地处理延迟更新的数据。
创建Hologres结果表: 使用Flink SQL创建一张Hologres结果表,映射到宽表 WIDE_TABLE
,并启用部分列更新功能。
CREATE TEMPORARY TABLE hologres_sink (
a BIGINT,
b STRING,
c STRING,
d STRING,
e STRING,
PRIMARY KEY(a) NOT ENFORCED
) WITH (
'connector' = 'hologres',
'dbname' = '<yourDbname>',
'tablename' = '<yourWideTablename>',
'username' = '${secret_values.ak_id}',
'password' = '${secret_values.ak_secret}',
'endpoint' = '<yourEndpoint>',
'mutatetype' = 'insertorupdate', -- 根据主键更新数据
'ignoredelete' = 'true', -- 忽略回撤消息产生的Delete请求
'partial-insert.enabled' = 'true' -- 开启部分列更新参数
);
插入数据: 将两个数据流的数据分别插入到结果表中,仅更新指定的列。
BEGIN STATEMENT SET;
INSERT INTO hologres_sink(a, b, c) SELECT * FROM source1; -- 插入source1的数据
INSERT INTO hologres_sink(a, d, e) SELECT * FROM source2; -- 插入source2的数据
END;
延迟更新处理:
source1
更新时,宽表中 b, c
字段会被更新。source2
更新时,宽表中 d, e
字段会被更新。为了确保实时性,可以选择以下两种消费模式之一: - CDC模式: - 在CDC模式下,Flink会根据 hg_binlog_event_type
自动为每行数据设置准确的Flink RowKind类型(如INSERT、DELETE、UPDATE_BEFORE和UPDATE_AFTER)。 - 这种模式适合需要镜像同步的场景,能够捕获源表的所有变更。 - 非CDC模式: - 如果只需要捕获增量数据,可以选择非CDC模式,减少不必要的开销。
binlogMaxRetryTimes
和 binlogRetryIntervalMs
)以提高容错能力。通过上述方法,您可以实现两个表的实时同步,并处理延迟更新的问题。关键点在于: - 使用Flink CDC捕获源表的变更。 - 利用Hologres的宽表Merge功能和部分列更新特性,确保宽表数据的实时性和一致性。 - 根据业务需求选择合适的消费模式(CDC或非CDC)。
如果您的实时计算引擎版本低于VVR 6.0.7,可以考虑使用 方式二(多表映射),但需要注意其对部分列更新的支持有限。
实时计算Flink版是阿里云提供的全托管Serverless Flink云服务,基于 Apache Flink 构建的企业级、高性能实时大数据处理系统。提供全托管版 Flink 集群和引擎,提高作业开发运维效率。