一直不知道flink cdc自动同步模式变更是如何操作的,有操作方法吗?

一直不知道flink cdc自动同步模式变更是如何操作的,有操作方法吗?

展开
收起
毛毛虫雨 2022-11-27 10:05:09 287 分享 版权
阿里云 AI 助理回答

Flink CDC(Change Data Capture)自动同步模式变更主要涉及两种操作方式:非CDC模式和CDC模式。这两种模式分别适用于不同的数据同步需求,下面将详细介绍如何操作这两种模式进行数据同步。

非CDC模式配置示例:

在实时计算Flink版中,若不使用CDC特性,可以创建如下的Source表来消费Hologres中的Binlog数据,但不会自动识别INSERT、DELETE、UPDATE等操作类型:

CREATE TABLE test_message_src_binlog_table(
    hg_binlog_lsn BIGINT,
    hg_binlog_event_type BIGINT,
    hg_binlog_timestamp_us BIGINT,
    id INTEGER,
    title VARCHAR,
    body VARCHAR
) WITH (
    'connector'='hologres',
    'dbname'='<yourDbname>',
    'tablename'='<yourTablename>',
    'username'='<yourAccessID>',
    'password'='<yourAccessSecret>',
    'endpoint'='<yourEndpoint>',
    'binlog' = 'true',
    'binlogMaxRetryTimes' = '10',
    'binlogRetryIntervalMs' = '500',
    'binlogBatchReadSize' = '100'
);

CDC模式配置示例:

若需要利用Flink CDC的自动同步变更数据功能,即实现类似MySQL或Postgres的CDC能力,应采用如下配置创建Source表:

CREATE TABLE test_message_src_binlog_table(
    id INTEGER,
    title VARCHAR,
    body VARCHAR
) WITH (
    'connector'='hologres',
    'dbname'='<yourDbname>',
    'tablename'='<yourTablename>',
    'username'='<yourAccessID>',
    'password'='<yourAccessSecret>',
    'endpoint'='<yourEndpoint>',
    'binlog' = 'true',
    'cdcMode' = 'true',
    'binlogMaxRetryTimes' = '10',
    'binlogRetryIntervalMs' = '500',
    'binlogBatchReadSize' = '100'
);

在此模式下,Source会根据hg_binlog_event_type自动设置每行数据的Flink RowKind,从而支持INSERT、DELETE、UPDATE_BEFORE和UPDATE_AFTER等操作类型,实现数据的镜像同步。

实时增全量数据写入操作步骤:

  1. 使用Flink CDC命令启动数据实时同步: 编辑YAML配置文件(例如rds-to-maxcompute.yaml),配置MySQL源和MaxCompute目标等信息后,执行以下命令启动任务:

    ./bin/flink-cdc.sh rds-to-maxcompute.yaml
    

    然后,可以在目标系统(如MaxCompute)中查看数据变化。

  2. 通过DataWorks数据集成进行实时同步: 在DataWorks中配置MySQL数据源,确保其可通过公网访问或已配置NAT网关,然后对接并验证数据源,以实现数据的实时同步。

综上所述,要操作Flink CDC自动同步模式变更,您需根据具体需求选择合适的配置模式,并正确设置相关参数,之后通过执行相应的命令或工具(如Flink CDC命令行、DataWorks界面)来启动数据同步任务。

有帮助
无帮助
AI 助理回答生成答案可能存在不准确,仅供参考
0 条回答
写回答
取消 提交回答

实时计算Flink版是阿里云提供的全托管Serverless Flink云服务,基于 Apache Flink 构建的企业级、高性能实时大数据处理系统。提供全托管版 Flink 集群和引擎,提高作业开发运维效率。

收录在圈子:
实时计算 Flink 版(Alibaba Cloud Realtime Compute for Apache Flink,Powered by Ververica)是阿里云基于 Apache Flink 构建的企业级、高性能实时大数据处理系统,由 Apache Flink 创始团队官方出品,拥有全球统一商业化品牌,完全兼容开源 Flink API,提供丰富的企业级增值功能。
还有其他疑问?
咨询AI助理