Flink CDC 纯写sql的方式可以实现1对多关联,把mysql数据写入es么?
写入后数据长这样
{
"id": 1,
"name": "张三",
"scoreList": [
{
"id": 1,
"name": "语文",
"score": 90
},
{
"id": 2,
"name": "数学",
"score": 80
}
]
}
版权声明:本文内容由阿里云实名注册用户自发贡献,版权归原作者所有,阿里云开发者社区不拥有其著作权,亦不承担相应法律责任。具体规则请查看《阿里云开发者社区用户服务协议》和《阿里云开发者社区知识产权保护指引》。如果您发现本社区中有涉嫌抄袭的内容,填写侵权投诉表单进行举报,一经查实,本社区将立刻删除涉嫌侵权内容。
要实现将MySQL中的数据通过Flink CDC以纯SQL的方式写入Elasticsearch,并生成类似以下结构的嵌套JSON文档:
{
"id": 1,
"name": "张三",
"scoreList": [
{
"id": 1,
"name": "语文",
"score": 90
},
{
"id": 2,
"name": "数学",
"score": 80
}
]
}
以下是详细的实现步骤和配置说明。
假设MySQL中有两张表: - 用户表(user):存储用户基本信息。
CREATE TABLE user (
id INT PRIMARY KEY,
name STRING
);
user_id
字段关联。
CREATE TABLE score (
id INT PRIMARY KEY,
user_id INT,
subject STRING,
score DOUBLE
);
使用Flink CDC连接器读取MySQL中的user
表和score
表。需要为每个表指定server-id
以避免冲突。
-- 用户表CDC源
CREATE TABLE mysql_user (
id INT,
name STRING,
PRIMARY KEY (id) NOT ENFORCED
) WITH (
'connector' = 'mysql-cdc',
'hostname' = '<MySQL主机名>',
'port' = '<MySQL端口>',
'username' = '<用户名>',
'password' = '<密码>',
'database-name' = '<数据库名>',
'table-name' = 'user',
'server-id' = '5400-5401'
);
-- 成绩表CDC源
CREATE TABLE mysql_score (
id INT,
user_id INT,
subject STRING,
score DOUBLE,
PRIMARY KEY (id) NOT ENFORCED
) WITH (
'connector' = 'mysql-cdc',
'hostname' = '<MySQL主机名>',
'port' = '<MySQL端口>',
'username' = '<用户名>',
'password' = '<密码>',
'database-name' = '<数据库名>',
'table-name' = 'score',
'server-id' = '5402-5403'
);
定义Elasticsearch的目标表结构,支持嵌套JSON格式。使用ARRAY
类型表示scoreList
字段。
CREATE TABLE es_sink (
id INT,
name STRING,
scoreList ARRAY<ROW<id INT, name STRING, score DOUBLE>>
) WITH (
'connector' = 'elasticsearch-7',
'hosts' = '<Elasticsearch地址>',
'index' = '<索引名称>',
'document-type' = '_doc',
'username' = '<用户名>',
'password' = '<密码>',
'format' = 'json'
);
通过SQL实现1对多关联,并将score
表的数据聚合为数组,嵌套到scoreList
字段中。
INSERT INTO es_sink
SELECT
u.id AS id,
u.name AS name,
ARRAY_AGG(
ROW(s.id, s.subject, s.score)
) AS scoreList
FROM
mysql_user AS u
LEFT JOIN
mysql_score AS s
ON
u.id = s.user_id
GROUP BY
u.id, u.name;
server-id
范围不重叠,避免Binlog消费冲突。server-id
指定范围,范围大小应不小于并行度。ARRAY
类型,用于表示嵌套的JSON数组。ROW
类型用于定义数组中每个元素的结构。sink.parallelism
和batchSize
参数以提升写入性能。通过上述步骤,您可以使用Flink CDC纯SQL方式实现MySQL到Elasticsearch的1对多关联写入,并生成嵌套JSON文档。关键在于正确配置MySQL CDC源表、定义Elasticsearch目标表结构,以及利用ARRAY_AGG
函数完成数据聚合。
实时计算Flink版是阿里云提供的全托管Serverless Flink云服务,基于 Apache Flink 构建的企业级、高性能实时大数据处理系统。提供全托管版 Flink 集群和引擎,提高作业开发运维效率。