有大佬全库同步Flink cdc 到doris吗 ,小白,目前我这边部署了一个Flink Session 在k8s 上,但是只支持单表同步,还有重启后会重复写入的问题,我应该怎么写?多表变成一个表吗,这个我试过可以 全库同步搞过吗 https://github.com/StarRocks/starrocks-connector-for-apache-flink/issues/226
版权声明:本文内容由阿里云实名注册用户自发贡献,版权归原作者所有,阿里云开发者社区不拥有其著作权,亦不承担相应法律责任。具体规则请查看《阿里云开发者社区用户服务协议》和《阿里云开发者社区知识产权保护指引》。如果您发现本社区中有涉嫌抄袭的内容,填写侵权投诉表单进行举报,一经查实,本社区将立刻删除涉嫌侵权内容。
要实现全库同步Flink CDC到Doris(或StarRocks),您需要解决以下几个关键问题:全库同步配置、避免重复写入、多表同步的处理方式。以下是基于您描述的问题和知识库资料提供的详细解决方案。
CTAS(Create Table As Select)和CDAS(Create Database As Select)是Flink中用于整库同步的语法糖。通过CTAS/CDAS,您可以将MySQL中的整个数据库同步到目标存储(如Doris或StarRocks)。
CTAS示例:
CREATE TABLE IF NOT EXISTS target_table
WITH (
'connector' = 'doris',
'fenodes' = 'selectdb-cn-****.selectdbfe.rds.aliyuncs.com:8080',
'username' = 'admin',
'password' = '****'
)
AS TABLE mysql_source_table;
CDAS示例:
CREATE DATABASE IF NOT EXISTS sr_db WITH (
'starrocks.create.table.properties' = 'buckets 8',
'starrocks.create.table.mode' = 'simple',
'jdbc-url' = 'jdbc:mysql://fe-c-9b354c83e891****-internal.starrocks.aliyuncs.com:9030',
'load-url' = 'fe-c-9b354c83e891****-internal.starrocks.aliyuncs.com:8030',
'username' = 'test',
'password' = '1qaz!QAZ'
);
CREATE DATABASE IF NOT EXISTS sr_db AS DATABASE mysql_db INCLUDING TABLES '.*';
注意:INCLUDING TABLES
支持正则表达式,可以灵活选择需要同步的表。
在Flink Session集群上部署Flink CDC Connector,并确保其版本与Flink版本兼容。以下是一个典型的Flink CDC任务启动命令:
<FLINK_HOME>/bin/flink run \
-Dexecution.checkpointing.interval=10s \
-Dparallelism.default=1 \
-c org.apache.doris.flink.tools.cdc.CdcTools \
lib/flink-doris-connector-1.16-1.5.2.jar \
sqlserver-sync-database \
-database db1 \
-sqlserver-conf hostname=127.0.0.1 \
-sqlserver-conf port=1433 \
-sqlserver-conf username=sa \
-sqlserver-conf password="123456" \
-sqlserver-conf database-name=CDC_DB \
-sqlserver-conf schema-name=dbo \
-including-tables "tbl1|test.*" \
-sink-conf fenodes=selectdb-cn-****.selectdbfe.rds.aliyuncs.com:8080 \
-sink-conf username=admin \
-sink-conf password=****
Flink的Checkpoint机制可以保证任务重启时从上次的状态恢复,而不是重新消费数据。确保您的Flink任务启用了Checkpoint:
execution.checkpointing.interval=10s
execution.checkpointing.mode=EXACTLY_ONCE
如果希望跳过全量数据同步(Snapshot阶段),直接从增量数据开始读取,可以通过以下参数配置:
debezium.snapshot.mode=never
注意:此配置仅适用于增量数据同步场景,且要求目标表已存在。
在目标存储(如Doris或StarRocks)中,确保写入操作具有幂等性。例如,使用UPSERT
模式更新数据,避免重复写入。
Flink CDC支持在一个任务中同步多个表。通过INCLUDING TABLES
参数指定需要同步的表,例如:
-including-tables "table1|table2|test.*"
如果单任务性能不足,可以为每个表创建独立的Flink任务。这种方式适合表数量较多且数据量较大的场景。
如果您尝试将多表合并为一个表,可以通过Flink SQL的UNION ALL
操作实现。例如:
CREATE TABLE merged_table AS
SELECT * FROM table1
UNION ALL
SELECT * FROM table2;
注意:此方法适用于表结构相同或相似的场景。
如果MySQL源表发生表结构变更(如新增列、修改列类型),可能会导致同步失败。此时需要: - 停止Flink任务。 - 删除目标表并重新创建。 - 无状态地重启Flink任务。
确保MySQL用户具有以下权限: - REPLICATION SLAVE
- REPLICATION CLIENT
- SELECT
如果您在实施过程中遇到具体问题,可以参考官方文档或社区讨论,进一步优化配置。