在Flink CDC中,假设场景是将多个MySQL库中的多个表数据写入到一个汇总库的多个表中,该如何操作?
版权声明:本文内容由阿里云实名注册用户自发贡献,版权归原作者所有,阿里云开发者社区不拥有其著作权,亦不承担相应法律责任。具体规则请查看《阿里云开发者社区用户服务协议》和《阿里云开发者社区知识产权保护指引》。如果您发现本社区中有涉嫌抄袭的内容,填写侵权投诉表单进行举报,一经查实,本社区将立刻删除涉嫌侵权内容。
在Flink CDC中,将多个MySQL库中的多个表数据写入到一个汇总库的多个表中,可以通过以下步骤实现。此操作利用了Flink SQL的CREATE DATABASE AS
(CDAS)和CREATE TABLE AS
(CTAS)语句,结合正则表达式匹配、Source复用优化以及分库分表合并同步的能力。
首先需要为源MySQL数据库创建Catalog,以便通过Flink SQL访问其元数据和数据。
CREATE CATALOG mysql_catalog WITH (
'type' = 'mysql',
'hostname' = '<MySQL实例地址>',
'port' = '3306',
'username' = '<用户名>',
'password' = '<密码>',
'default-database' = '<默认数据库>',
'catalog.table.metadata-columns' = 'table_name;database_name'
);
说明: - hostname
和 port
需要根据实际MySQL实例配置。 - catalog.table.metadata-columns
参数用于添加元数据列(如表名和库名),便于后续处理。
为目标汇总库(如Hologres)创建Catalog,用于写入数据。
CREATE CATALOG holo_catalog WITH (
'type' = 'hologres',
'hostname' = '<Hologres实例地址>',
'port' = '80',
'username' = '<用户名>',
'password' = '<密码>',
'default-database' = '<默认数据库>'
);
通过正则表达式匹配多个MySQL库和表,将其数据同步到目标库中。例如,假设MySQL中有多个分库(如db01
到db99
),每个分库下有相同的表结构(如order
和order_detail
)。
USE CATALOG holo_catalog;
BEGIN STATEMENT SET;
-- 同步分库分表数据到Hologres
CREATE TABLE IF NOT EXISTS order_summary
AS TABLE mysql_catalog.`db[0-9]+`.`order`
/*+ OPTIONS('server-id'='8001-8004') */;
CREATE TABLE IF NOT EXISTS order_detail_summary
AS TABLE mysql_catalog.`db[0-9]+`.`order_detail`
/*+ OPTIONS('server-id'='8001-8004') */;
END;
说明: - 正则表达式db[0-9]+
匹配所有以db
开头的分库名称。 - OPTIONS('server-id'='8001-8004')
用于指定MySQL CDC的server-id
范围,确保多并发读取时不会冲突。
如果需要将多个分库分表的数据合并到一张表中,可以利用Flink的联合主键机制。例如,将db01.order
到db99.order
的所有数据合并到Hologres的order_summary
表中。
USE CATALOG holo_catalog;
CREATE TABLE IF NOT EXISTS order_summary (
_db_name STRING,
_table_name STRING,
id BIGINT,
product VARCHAR(50),
user_id BIGINT,
PRIMARY KEY (_db_name, _table_name, id) NOT ENFORCED
)
WITH ('sink.parallelism' = '4')
AS TABLE mysql_catalog.`db[0-9]+`.`order`
/*+ OPTIONS('server-id'='8001-8004') */;
说明: - _db_name
和 _table_name
是系统字段,表示分库分表的来源信息。 - 联合主键(_db_name, _table_name, id)
确保主键唯一性。
为了减少对MySQL的连接数和读取压力,可以使用STATEMENT SET
语法将多个CTAS语句合并为一个作业提交,并复用同一个Source节点。
USE CATALOG holo_catalog;
BEGIN STATEMENT SET;
-- 同步订单表
CREATE TABLE IF NOT EXISTS order_summary
AS TABLE mysql_catalog.`db[0-9]+`.`order`
/*+ OPTIONS('server-id'='8001-8004') */;
-- 同步订单详情表
CREATE TABLE IF NOT EXISTS order_detail_summary
AS TABLE mysql_catalog.`db[0-9]+`.`order_detail`
/*+ OPTIONS('server-id'='8001-8004') */;
END;
重要: - 复用Source的前提是这些表的OPTIONS
参数完全一致。 - 如果server-id
范围不足,可能导致CDC读取失败,请合理分配范围。
如果MySQL源库新增表,可以通过以下方式支持动态捕获新增表并同步数据。
在SQL作业开发时,增加以下配置:
SET 'table.cdas.scan.newly-added-table.enabled' = 'true';
当新增表需要同步时: 1. 停止当前作业,并勾选停止前创建一次快照。 2. 在SQL开发页面重新部署作业。 3. 在作业运维页面找到历史快照,选择从该快照恢复作业。
通过上述步骤,您可以实现将多个MySQL库中的多个表数据写入到一个汇总库的多个表中。关键点包括: - 使用正则表达式匹配分库分表。 - 利用联合主键保证数据唯一性。 - 通过STATEMENT SET
优化Source复用。 - 支持新增表的动态同步。
以上方案适用于实时计算Flink版与Hologres的集成场景,能够高效完成复杂的数据同步任务。
实时计算Flink版是阿里云提供的全托管Serverless Flink云服务,基于 Apache Flink 构建的企业级、高性能实时大数据处理系统。提供全托管版 Flink 集群和引擎,提高作业开发运维效率。