有大佬全库同步Flink cdc 到doris吗 ,小白,目前我这边部署了一个全库同步搞过吗?

有大佬全库同步Flink cdc 到doris吗 ,小白,目前我这边部署了一个Flink Session 在k8s 上,但是只支持单表同步,还有重启后会重复写入的问题,我应该怎么写?多表变成一个表吗,这个我试过可以 全库同步搞过吗image.png https://github.com/StarRocks/starrocks-connector-for-apache-flink/issues/226

展开
收起
真的很搞笑 2023-05-28 21:03:17 118 分享 版权
阿里云 AI 助理回答

要实现全库同步Flink CDC到Doris(或StarRocks),您需要解决以下几个关键问题:全库同步配置、避免重复写入、多表同步的处理方式。以下是基于您描述的问题和知识库资料提供的详细解决方案。


一、全库同步的实现方法

1. 使用CTAS/CDAS语句

CTAS(Create Table As Select)和CDAS(Create Database As Select)是Flink中用于整库同步的语法糖。通过CTAS/CDAS,您可以将MySQL中的整个数据库同步到目标存储(如Doris或StarRocks)。

  • CTAS示例

    CREATE TABLE IF NOT EXISTS target_table
    WITH (
      'connector' = 'doris',
      'fenodes' = 'selectdb-cn-****.selectdbfe.rds.aliyuncs.com:8080',
      'username' = 'admin',
      'password' = '****'
    )
    AS TABLE mysql_source_table;
    
  • CDAS示例

    CREATE DATABASE IF NOT EXISTS sr_db WITH (
      'starrocks.create.table.properties' = 'buckets 8',
      'starrocks.create.table.mode' = 'simple',
      'jdbc-url' = 'jdbc:mysql://fe-c-9b354c83e891****-internal.starrocks.aliyuncs.com:9030',
      'load-url' = 'fe-c-9b354c83e891****-internal.starrocks.aliyuncs.com:8030',
      'username' = 'test',
      'password' = '1qaz!QAZ'
    );
    
    CREATE DATABASE IF NOT EXISTS sr_db AS DATABASE mysql_db INCLUDING TABLES '.*';
    

    注意INCLUDING TABLES支持正则表达式,可以灵活选择需要同步的表。

2. 配置Flink CDC Connector

在Flink Session集群上部署Flink CDC Connector,并确保其版本与Flink版本兼容。以下是一个典型的Flink CDC任务启动命令:

<FLINK_HOME>/bin/flink run \
  -Dexecution.checkpointing.interval=10s \
  -Dparallelism.default=1 \
  -c org.apache.doris.flink.tools.cdc.CdcTools \
  lib/flink-doris-connector-1.16-1.5.2.jar \
  sqlserver-sync-database \
  -database db1 \
  -sqlserver-conf hostname=127.0.0.1 \
  -sqlserver-conf port=1433 \
  -sqlserver-conf username=sa \
  -sqlserver-conf password="123456" \
  -sqlserver-conf database-name=CDC_DB \
  -sqlserver-conf schema-name=dbo \
  -including-tables "tbl1|test.*" \
  -sink-conf fenodes=selectdb-cn-****.selectdbfe.rds.aliyuncs.com:8080 \
  -sink-conf username=admin \
  -sink-conf password=****

二、避免重启后重复写入

1. 检查Checkpoint配置

Flink的Checkpoint机制可以保证任务重启时从上次的状态恢复,而不是重新消费数据。确保您的Flink任务启用了Checkpoint:

execution.checkpointing.interval=10s
execution.checkpointing.mode=EXACTLY_ONCE

2. 跳过Snapshot阶段

如果希望跳过全量数据同步(Snapshot阶段),直接从增量数据开始读取,可以通过以下参数配置:

debezium.snapshot.mode=never

注意:此配置仅适用于增量数据同步场景,且要求目标表已存在。

3. 确保幂等性

在目标存储(如Doris或StarRocks)中,确保写入操作具有幂等性。例如,使用UPSERT模式更新数据,避免重复写入。


三、多表同步的处理方式

1. 单任务多表同步

Flink CDC支持在一个任务中同步多个表。通过INCLUDING TABLES参数指定需要同步的表,例如:

-including-tables "table1|table2|test.*"

2. 多任务并行同步

如果单任务性能不足,可以为每个表创建独立的Flink任务。这种方式适合表数量较多且数据量较大的场景。

3. 合并多表为一个表

如果您尝试将多表合并为一个表,可以通过Flink SQL的UNION ALL操作实现。例如:

CREATE TABLE merged_table AS
SELECT * FROM table1
UNION ALL
SELECT * FROM table2;

注意:此方法适用于表结构相同或相似的场景。


四、其他注意事项

1. 表结构变更的处理

如果MySQL源表发生表结构变更(如新增列、修改列类型),可能会导致同步失败。此时需要: - 停止Flink任务。 - 删除目标表并重新创建。 - 无状态地重启Flink任务。

2. 数据库权限配置

确保MySQL用户具有以下权限: - REPLICATION SLAVE - REPLICATION CLIENT - SELECT

3. 性能优化

  • 调整并行度:根据数据量和集群资源调整Flink任务的并行度。
  • 增加带宽:如果数据量较大,建议增加网络带宽以避免瓶颈。

五、总结与建议

  1. 全库同步:推荐使用CTAS/CDAS语句,结合Flink CDC Connector实现。
  2. 避免重复写入:启用Checkpoint机制,跳过Snapshot阶段,并确保目标存储的幂等性。
  3. 多表同步:根据实际需求选择单任务多表同步或多任务并行同步。
  4. 表结构变更:及时处理表结构变更,避免同步失败。

如果您在实施过程中遇到具体问题,可以参考官方文档或社区讨论,进一步优化配置。

有帮助
无帮助
AI 助理回答生成答案可能存在不准确,仅供参考
0 条回答
写回答
取消 提交回答

大数据领域前沿技术分享与交流,这里不止有技术干货、学习心得、企业实践、社区活动,还有未来。

还有其他疑问?
咨询AI助理