Flink SQL 如何实现列转行?

本文涉及的产品
实时计算 Flink 版,5000CU*H 3个月
公共DNS(含HTTPDNS解析),每月1000万次HTTP解析
全局流量管理 GTM,标准版 1个月
简介: 在 SQL 任务里面经常会遇到一列转多行的需求,今天就来总结一下在 Flink SQL 里面如何实现列转行的,先来看下面的一个具体案例.需求原始数据格式如下:name dataJasonLee [{"content_type":"flink","url":"111"},{"content_type":"spark","url":"222"},{"content_type":"hadoop","url":"333"}]data 格式化

在 SQL 任务里面经常会遇到一列转多行的需求,今天就来总结一下在 Flink SQL 里面如何实现列转行的,先来看下面的一个具体案例.


需求

原始数据格式如下:


name  data
JasonLee  [{"content_type":"flink","url":"111"},{"content_type":"spark","url":"222"},{"content_type":"hadoop","url":"333"}]


data 格式化


{
 "name": "JasonLee",
 "data": [{
   "content_type": "flink",
   "url": "111"
  }, {
   "content_type": "spark",
   "url": "222"
  },
  {
   "content_type": "hadoop",
   "url": "333"
  }
 ]
}


现在希望得到的数据格式是这样的:


name  content_type  url
JasonLee  flink 111
JasonLee  spark 222
JasonLee  hadoop  333


这是一个典型的列转行或者一行转多行的场景,需要将 data 列进行拆分成为多行多列,下面介绍两种实现方式.


使用 Flink 自带的 unnest 函数解析


使用自定义 UDTF 函数解析


建表 DDL


CREATE TABLE kafka_table (
name string,
`data` ARRAY<ROW<content_type STRING,url STRING>>
)
WITH (
    'connector' = 'kafka', -- 使用 kafka connector
    'topic' = 'test',
    'properties.bootstrap.servers' = 'master:9092,storm1:9092,storm2:9092',  -- broker连接信息
    'properties.group.id' = 'jason_flink_test', -- 消费kafka的group_id
    'scan.startup.mode' = 'latest-offset',  -- 读取数据的位置
    'format' = 'json',  -- 数据源格式为 json
    'json.fail-on-missing-field' = 'false', -- 字段丢失任务不失败
    'json.ignore-parse-errors' = 'true'  -- 解析失败跳过
)


这里在定义 data 字段类型的时候需要定义为 ARRAY 类型,因为 unnest 函数需要一个数组类型的参数.


unnest 解析


select name,content_type,url
from kafka_table CROSS JOIN UNNEST(`data`) AS t (content_type,url)


select name,content_type,url
from kafka_table, UNNEST(`data`) AS t (content_type,url)


select name,content_type,url
from kafka_table left join UNNEST(`data`) AS t (content_type,url) on true


自定义 UDTF 解析


自定义表值函数(UDTF),自定义表值函数,将 0 个、1 个或多个标量值作为输入参数(可以是变长参数)。与自定义的标量函数类似,但与标量函数不同。表值函数可以返回任意数量的行作为输出,而不仅是 1 个值。返回的行可以由 1 个或多个列组成。调用一次函数输出多行或多列数据。必须继承 TableFunction 基类,并实现一个或者多个名为 eval 的方法, 在使用 UDTF 时,需要带上 LATERAL TABLE两个关键字.


@FunctionHint(output = @DataTypeHint("ROW<content_type STRING,url STRING>"))
public class ParserJsonArrayTest extends TableFunction<Row> {
    private static final Logger log = Logger.getLogger(ParserJsonArrayTest.class);
    public void eval(String value) {
        try {
            JSONArray snapshots = JSONArray.parseArray(value);
            Iterator<Object> iterator = snapshots.iterator();
            while (iterator.hasNext()) {
                JSONObject jsonObject = (JSONObject) iterator.next();
                String content_type = jsonObject.getString("content_type");
                String url = jsonObject.getString("url");
                collect(Row.of(content_type,url));
            }
        } catch (Exception e) {
            log.error("parser json failed :" + e.getMessage());
        }
    }
}


自定义 UDTF 解析的时候,就不需要把 data 字段定义成 ARRAY 类型了,直接定义成 STRING 类型就可以了,并且这种方式会更加的灵活,比如还需要过滤数据或者更复杂的一些操作时都可以在 UDTF 里面完成.


Flink SQL 使用 UDTF


select name,content_type,url
from kafka_table CROSS JOIN lateral TABLE (ParserJsonArrayTest(`data`)) AS t (content_type,url)


select name,content_type,url
from kafka_table, lateral TABLE (ParserJsonArrayTest(`data`)) AS t (content_type,url)


select name,content_type,url
from kafka_table left join lateral TABLE (ParserJsonArrayTest(`data`)) AS t (content_type,url) on true


注意:


unnest 和 自定义 UDTF 函数在使用的时候都有 3 种写法,前面两种写法的效果其实是一样的,第三种写法相当于 left join 的用法.区别在于 CROSS JOIN/INNER JOIN: 对于左侧表的每一行,右侧 UDTF 不输出,则这一行不输出.LEFT JOIN: 对于左侧表的每一行,右侧 UDTF 不输出,则这一行会输出,右侧 UDTF 字段为 null


打印的结果


2> JasonLee,flink,111
2> JasonLee,spark,222
2> JasonLee,hadoop,333


总结


在实际使用的时候如果 unnest 可以满足需求就直接用 unnest 不需要带来额外的开发,如果 unnest 函数满足不了需求,那么就自定义 UDTF 去完成.

相关实践学习
基于Hologres轻松玩转一站式实时仓库
本场景介绍如何利用阿里云MaxCompute、实时计算Flink和交互式分析服务Hologres开发离线、实时数据融合分析的数据大屏应用。
Linux入门到精通
本套课程是从入门开始的Linux学习课程,适合初学者阅读。由浅入深案例丰富,通俗易懂。主要涉及基础的系统操作以及工作中常用的各种服务软件的应用、部署和优化。即使是零基础的学员,只要能够坚持把所有章节都学完,也一定会受益匪浅。
相关文章
|
4月前
|
SQL 存储 API
Flink实践:通过Flink SQL进行SFTP文件的读写操作
虽然 Apache Flink 与 SFTP 之间的直接交互存在一定的限制,但通过一些创造性的方法和技术,我们仍然可以有效地实现对 SFTP 文件的读写操作。这既展现了 Flink 在处理复杂数据场景中的强大能力,也体现了软件工程中常见的问题解决思路——即通过现有工具和一定的间接方法来克服技术障碍。通过这种方式,Flink SQL 成为了处理各种数据源,包括 SFTP 文件,在内的强大工具。
215 15
|
24天前
|
SQL 存储 缓存
Flink SQL Deduplication 去重以及如何获取最新状态操作
Flink SQL Deduplication 是一种高效的数据去重功能,支持多种数据类型和灵活的配置选项。它通过哈希表、时间窗口和状态管理等技术实现去重,适用于流处理和批处理场景。本文介绍了其特性、原理、实际案例及源码分析,帮助读者更好地理解和应用这一功能。
102 14
|
3月前
|
SQL 大数据 API
大数据-132 - Flink SQL 基本介绍 与 HelloWorld案例
大数据-132 - Flink SQL 基本介绍 与 HelloWorld案例
64 0
|
4月前
|
SQL 大数据 数据处理
奇迹降临!解锁 Flink SQL 简单高效的终极秘籍,开启数据处理的传奇之旅!
【9月更文挑战第7天】在大数据处理领域,Flink SQL 因其强大功能与简洁语法成为开发者首选。本文分享了编写高效 Flink SQL 的实用技巧:理解数据特征及业务需求;灵活运用窗口函数(如 TUMBLE 和 HOP);优化连接操作,优先采用等值连接;合理选择数据类型以减少计算资源消耗。结合实际案例(如实时电商数据分析),并通过定期性能测试与调优,助力开发者在大数据处理中更得心应手,挖掘更多价值信息。
55 1
|
SQL 关系型数据库 Oracle
SQL 列转行的实现
--列转行,逗号拼接指定列的值SQL Server中写法:SELECT STUFF(( SELECT  ',' + Field1 from TableA FOR XML PATH('')), 1, 1, '') Oracle中写法: 方法一:wmsys.
1066 0
|
4月前
|
关系型数据库 MySQL 网络安全
5-10Can't connect to MySQL server on 'sh-cynosl-grp-fcs50xoa.sql.tencentcdb.com' (110)")
5-10Can't connect to MySQL server on 'sh-cynosl-grp-fcs50xoa.sql.tencentcdb.com' (110)")
|
6月前
|
SQL 存储 监控
SQL Server的并行实施如何优化?
【7月更文挑战第23天】SQL Server的并行实施如何优化?
147 13
|
6月前
|
SQL
解锁 SQL Server 2022的时间序列数据功能
【7月更文挑战第14天】要解锁SQL Server 2022的时间序列数据功能,可使用`generate_series`函数生成整数序列,例如:`SELECT value FROM generate_series(1, 10)。此外,`date_bucket`函数能按指定间隔(如周)对日期时间值分组,这些工具结合窗口函数和其他时间日期函数,能高效处理和分析时间序列数据。更多信息请参考官方文档和技术资料。
|
6月前
|
SQL 存储 网络安全
关系数据库SQLserver 安装 SQL Server
【7月更文挑战第26天】
82 6
|
6月前
|
存储 SQL C++
对比 SQL Server中的VARCHAR(max) 与VARCHAR(n) 数据类型
【7月更文挑战7天】SQL Server 中的 VARCHAR(max) vs VARCHAR(n): - VARCHAR(n) 存储最多 n 个字符(1-8000),适合短文本。 - VARCHAR(max) 可存储约 21 亿个字符,适合大量文本。 - VARCHAR(n) 在处理小数据时性能更好,空间固定。 - VARCHAR(max) 对于大文本更合适,但可能影响性能。 - 选择取决于数据长度预期和业务需求。
505 1