一直不知道flink cdc自动同步模式变更是如何操作的,有操作方法吗?
版权声明:本文内容由阿里云实名注册用户自发贡献,版权归原作者所有,阿里云开发者社区不拥有其著作权,亦不承担相应法律责任。具体规则请查看《阿里云开发者社区用户服务协议》和《阿里云开发者社区知识产权保护指引》。如果您发现本社区中有涉嫌抄袭的内容,填写侵权投诉表单进行举报,一经查实,本社区将立刻删除涉嫌侵权内容。
Flink CDC(Change Data Capture)自动同步模式变更主要涉及两种操作方式:非CDC模式和CDC模式。这两种模式分别适用于不同的数据同步需求,下面将详细介绍如何操作这两种模式进行数据同步。
在实时计算Flink版中,若不使用CDC特性,可以创建如下的Source表来消费Hologres中的Binlog数据,但不会自动识别INSERT、DELETE、UPDATE等操作类型:
CREATE TABLE test_message_src_binlog_table(
hg_binlog_lsn BIGINT,
hg_binlog_event_type BIGINT,
hg_binlog_timestamp_us BIGINT,
id INTEGER,
title VARCHAR,
body VARCHAR
) WITH (
'connector'='hologres',
'dbname'='<yourDbname>',
'tablename'='<yourTablename>',
'username'='<yourAccessID>',
'password'='<yourAccessSecret>',
'endpoint'='<yourEndpoint>',
'binlog' = 'true',
'binlogMaxRetryTimes' = '10',
'binlogRetryIntervalMs' = '500',
'binlogBatchReadSize' = '100'
);
若需要利用Flink CDC的自动同步变更数据功能,即实现类似MySQL或Postgres的CDC能力,应采用如下配置创建Source表:
CREATE TABLE test_message_src_binlog_table(
id INTEGER,
title VARCHAR,
body VARCHAR
) WITH (
'connector'='hologres',
'dbname'='<yourDbname>',
'tablename'='<yourTablename>',
'username'='<yourAccessID>',
'password'='<yourAccessSecret>',
'endpoint'='<yourEndpoint>',
'binlog' = 'true',
'cdcMode' = 'true',
'binlogMaxRetryTimes' = '10',
'binlogRetryIntervalMs' = '500',
'binlogBatchReadSize' = '100'
);
在此模式下,Source会根据hg_binlog_event_type
自动设置每行数据的Flink RowKind,从而支持INSERT、DELETE、UPDATE_BEFORE和UPDATE_AFTER等操作类型,实现数据的镜像同步。
使用Flink CDC命令启动数据实时同步: 编辑YAML配置文件(例如rds-to-maxcompute.yaml),配置MySQL源和MaxCompute目标等信息后,执行以下命令启动任务:
./bin/flink-cdc.sh rds-to-maxcompute.yaml
然后,可以在目标系统(如MaxCompute)中查看数据变化。
通过DataWorks数据集成进行实时同步: 在DataWorks中配置MySQL数据源,确保其可通过公网访问或已配置NAT网关,然后对接并验证数据源,以实现数据的实时同步。
综上所述,要操作Flink CDC自动同步模式变更,您需根据具体需求选择合适的配置模式,并正确设置相关参数,之后通过执行相应的命令或工具(如Flink CDC命令行、DataWorks界面)来启动数据同步任务。
实时计算Flink版是阿里云提供的全托管Serverless Flink云服务,基于 Apache Flink 构建的企业级、高性能实时大数据处理系统。提供全托管版 Flink 集群和引擎,提高作业开发运维效率。