有没有大佬做过Flink的mysql表的数据?

有没有大佬做过Flink的mysql表的数据?把一对多的父子表同步到es中的需求呢,我想知道flink sql是否有这样的能力,然后我就写sql去试,就发现这一条sql可以跑通,但是是固定数据
INSERT INTO es_album_info
SELECT a.id,a.name,author_id authorId,(select ROW(id,name) from author where a.author_id = id) as author,
ARRAY[ROW(1,'张三'),ROW(2,'李四')] as tags
FROM album_info a;
我按照我的想法把它改成读取表中的数据,就会报错了像这样
INSERT INTO es_album_info
SELECT a.id,a.name,author_id authorId,(select ROW(id,name) from author where a.author_id = id) as author,
ARRAY[(select ROW(id,name) from tags where a.id = album_id)] as tags
FROM album_info a;

展开
收起
三分钟热度的鱼 2023-09-04 21:01:33 80 分享 版权
1 条回答
写回答
取消 提交回答
  • Flink SQL支持从MySQL表中读取数据,并且可以进行一对多的父子表同步到ES的需求。对于您的第一个SQL语句,您将一对多的关系直接写在SQL中,所以它可以成功运行。

    而对于您的第二个SQL语句,您尝试将数据从MySQL表中读取,并将其作为数组插入到ES中,但遇到了错误。这是因为Flink SQL当前的版本(包括1.13版本)不支持将子查询的结果作为数组直接插入到ES中。

    解决这个问题的一种方法是使用Flink的Table API或DataStream API来编写自定义的转换逻辑,将MySQL表中的数据读取为流数据,并将其转换为ES需要的格式,然后将结果写入ES中。这种方式可以更灵活地处理复杂的数据转换和逻辑。

    另外,Flink 1.14版本引入了新的SQL语法LATERAL TABLE,可以在SQL中使用LATERAL TABLE关键字执行类似子查询的操作,但目前该版本还处于预览阶段(Preview)。

    如果您仍希望继续使用SQL来实现该需求,建议您考虑使用Flink的UDF(User-Defined Function)来编写自定义的函数,以处理复杂的数据转换逻辑,并在SQL中调用这些函数。

    总结:Flink SQL当前版本(包括1.13版本)不支持将子查询的结果直接作为数组插入到ES中。您可以考虑使用Table API或DataStream API进行自定义转换,或等待Flink 1.14版本正式发布,以使用新的LATERAL TABLE语法。另外,使用UDF可以帮助您在SQL中处理复杂的数据转换逻辑。

    2023-09-28 14:25:55
    赞同 展开评论

实时计算Flink版是阿里云提供的全托管Serverless Flink云服务,基于 Apache Flink 构建的企业级、高性能实时大数据处理系统。提供全托管版 Flink 集群和引擎,提高作业开发运维效率。

还有其他疑问?
咨询AI助理