Flink CDC里整库同步得时候怎么排除没有主键得表?
版权声明:本文内容由阿里云实名注册用户自发贡献,版权归原作者所有,阿里云开发者社区不拥有其著作权,亦不承担相应法律责任。具体规则请查看《阿里云开发者社区用户服务协议》和《阿里云开发者社区知识产权保护指引》。如果您发现本社区中有涉嫌抄袭的内容,填写侵权投诉表单进行举报,一经查实,本社区将立刻删除涉嫌侵权内容。
在使用Flink CDC进行整库同步时,如果需要排除没有主键的表,可以通过以下方法实现:
CDAS
语句结合正则表达式过滤在使用CREATE DATABASE AS (CDAS)
语句进行整库同步时,可以通过INCLUDING
或EXCLUDING
关键字结合正则表达式来筛选需要同步的表。具体操作如下:
明确主键表的命名规则
如果数据库中主键表和无主键表的命名有规律(例如主键表以_pk
结尾),可以利用正则表达式匹配主键表。
配置CDAS语句
在CDAS语句中,使用EXCLUDING
关键字排除无主键表。示例如下:
USE CATALOG holo;
CREATE DATABASE IF NOT EXISTS holo_tpcds
WITH ('sink.parallelism' = '4')
AS DATABASE mysql.tpcds INCLUDING ALL TABLES
EXCLUDING TABLES 'no_pk_table1|no_pk_table2|.*_nopk'
/*+ OPTIONS('server-id'='8001-8004') */;
说明:
- EXCLUDING TABLES
后接正则表达式,用于指定需要排除的表名模式。 - 示例中排除了名为no_pk_table1
、no_pk_table2
以及以_nopk
结尾的表。
验证同步结果
执行上述语句后,只有符合规则且包含主键的表会被同步到目标端。
如果无法通过正则表达式直接排除无主键表,可以手动筛选出包含主键的表,并为这些表单独创建同步任务。具体步骤如下:
查询MySQL中的表结构
使用以下SQL语句查询数据库中所有表的主键信息:
SELECT table_name
FROM information_schema.tables
WHERE table_schema = 'your_database_name'
AND table_name IN (
SELECT table_name
FROM information_schema.key_column_usage
WHERE constraint_name = 'PRIMARY'
);
说明:
- 该查询会返回指定数据库中所有包含主键的表名。 - 将结果保存为后续同步任务的依据。
为包含主键的表创建同步任务
根据查询结果,为每张主键表单独编写CREATE TABLE AS (CTAS)
或CREATE DATABASE AS (CDAS)
语句。例如:
CREATE TABLE target_table1
WITH ('connector' = 'mysql-cdc', ...)
AS TABLE source_table1;
CREATE TABLE target_table2
WITH ('connector' = 'mysql-cdc', ...)
AS TABLE source_table2;
部署并运行任务
将上述任务逐一部署到Flink集群中,确保仅同步包含主键的表。
scan.incremental.snapshot.chunk.key-column
处理无主键表如果某些无主键表必须同步,但又希望避免因缺少主键导致的数据一致性问题,可以通过设置scan.incremental.snapshot.chunk.key-column
参数来指定一个非空字段作为分片列。具体配置如下:
检查表结构
确认无主键表中是否存在非空字段,例如id
或created_at
。
配置CDC源表参数
在WITH
参数中添加scan.incremental.snapshot.chunk.key-column
,指定分片列。例如:
CREATE TABLE no_pk_table (
id BIGINT,
name STRING,
created_at TIMESTAMP(3)
) WITH (
'connector' = 'mysql-cdc',
'hostname' = 'your_hostname',
'port' = '3306',
'username' = 'your_username',
'password' = 'your_password',
'database-name' = 'your_database',
'table-name' = 'no_pk_table',
'scan.incremental.snapshot.chunk.key-column' = 'id'
);
说明:
- 指定的分片列必须是非空字段,否则会导致作业失败。 - 该方法仅适用于Flink计算引擎VVR 6.0.7及以上版本。
注意数据一致性
如果指定的分片列可能发生更新操作,则只能保证At least once
语义。建议结合下游系统的幂等性逻辑,确保数据正确性。
server-id
范围和分片参数,避免Binlog消费冲突和内存溢出问题。通过上述方法,您可以有效排除无主键表或针对特定场景灵活处理无主键表的同步需求。
实时计算Flink版是阿里云提供的全托管Serverless Flink云服务,基于 Apache Flink 构建的企业级、高性能实时大数据处理系统。提供全托管版 Flink 集群和引擎,提高作业开发运维效率。