Flink SQL 解析嵌套的 JSON 数据

本文涉及的产品
实时计算 Flink 版,5000CU*H 3个月
云解析 DNS,旗舰版 1个月
全局流量管理 GTM,标准版 1个月
简介: 在日常的开发中,最常用的数据格式是 JSON ,并且有的时候 JSON 的格式是非常复杂的(嵌套的格式),那在 Flink SQL 中进行解析的时候也会相当麻烦一点,下面将会演示如何在 DDL 里面定义 Map、Array、Row 类型的数据,以及在 SQL 里面如何获里面的值数据格式如下:以下数据完全是自己造的,没有任何实际含义

在日常的开发中,最常用的数据格式是 JSON ,并且有的时候 JSON 的格式是非常复杂的(嵌套的格式),那在 Flink SQL 中进行解析的时候也会相当麻烦一点,下面将会演示如何在 DDL 里面定义 Map、Array、Row 类型的数据,以及在 SQL 里面如何获里面的值


数据格式如下:


以下数据完全是自己造的,没有任何实际含义


{
 "funcName": "test",
 "data": {
  "snapshots": [{
   "content_type": "application/x-gzip-compressed-jpeg",
   "url": "https://blog.csdn.net/xianpanjia4616"
  }],
  "audio": [{
   "content_type": "audio/wav",
   "url": " https://bss.csdn.net/m/topic/blog_star2020/detail?username=xianpanjia4616"
  }]
 },
 "resultMap": {
  "result": {
   "cover": "/data/test/log.txt"
  },
  "isSuccess": true
 },
 "meta": {
  "video_type": "normal"
 },
 "type": 2,
 "timestamp": 1610549997263,
 "arr": [{
  "address": "北京市海淀区",
  "city": "beijing"
 }, {
  "address": "北京市海淀区",
  "city": "beijing"
 }, {
  "address": "北京市海淀区",
  "city": "beijing"
 }],
 "map": {
  "flink": 456
 },
 "doublemap": {
  "inner_map": {
   "key": 123
  }
 }
}


上面的数据包含了 Map、Array、Row 等类型, 对于这样的数据格式,在建表 DDL 里面应该如何定义呢?


定义 DDL


CREATE TABLE kafka_source (
    funcName STRING,
    data ROW<snapshots ARRAY<ROW<content_type STRING,url STRING>>,audio ARRAY<ROW<content_type STRING,url STRING>>>,
    resultMap ROW<`result` MAP<STRING,STRING>,isSuccess BOOLEAN>,
    meta  MAP<STRING,STRING>,
    `type` INT,
    `timestamp` BIGINT,
    arr ARRAY<ROW<address STRING,city STRING>>,
    map MAP<STRING,INT>,
    doublemap MAP<STRING,MAP<STRING,INT>>,
    proctime as PROCTIME()
) WITH (
    'connector' = 'kafka', -- 使用 kafka connector
    'topic' = 'test',  -- kafka topic
    'properties.bootstrap.servers' = 'master:9092,storm1:9092,storm2:9092',  -- broker连接信息
    'properties.group.id' = 'jason_flink_test', -- 消费kafka的group_id
    'scan.startup.mode' = 'latest-offset',  -- 读取数据的位置
    'format' = 'json',  -- 数据源格式为 json
    'json.fail-on-missing-field' = 'true', -- 字段丢失任务不失败
    'json.ignore-parse-errors' = 'false'  -- 解析失败跳过
)


解析 SQL


select
funcName,
doublemap['inner_map']['key'],
count(data.snapshots[1].url),
`type`,
TUMBLE_START(proctime, INTERVAL '30' second) as t_start
from kafka_source
group by TUMBLE(proctime, INTERVAL '30' second),funcName,`type`,doublemap['inner_map']['key']


SQL 运行的结果


4> (true,test,123,6,2,2021-01-15T03:31)
4> (false,test,123,6,2,2021-01-15T03:31)
4> (true,test,123,8,2,2021-01-15T03:31)
4> (false,test,123,8,2,2021-01-15T03:31)
4> (true,test,123,10,2,2021-01-15T03:31)
4> (false,test,123,10,2,2021-01-15T03:31)
4> (true,test,123,13,2,2021-01-15T03:31)
4> (false,test,123,13,2,2021-01-15T03:31)
4> (true,test,123,15,2,2021-01-15T03:31)
4> (true,test,123,3,2,2021-01-15T03:31:30)


说明数据都可以正常的解析出来,如果遇到更加复杂的 JSON 格式的数据,只需要比葫芦画瓢就可以了,在复杂的格式都不是问题.


数据类型映射


目前,JSON 模式总是从表模式派生。目前还不支持显式定义 JSON 模式。Flink JSON 格式使用 jackson databind API 来解析和生成JSON 字符串。下表列出了从 Flink 类型到 JSON 类型的映射。



注意事项:


Json 中的每个 {} 都需要用 Row 类型来表示


Json 中的每个 [] 都需要用 Arrary 类型来表示


数组的下标是从 1 开始的不是 0 如上面 SQL 中的 data.snapshots[1].url


关键字在任何地方都需要加反引号 如上面 SQL 中的 `type`


select 语句中的字段类型和顺序一定要和结果表的字段类型和顺序保持一致


UDF 可以直接在建表语句中使用


相关实践学习
基于Hologres轻松玩转一站式实时仓库
本场景介绍如何利用阿里云MaxCompute、实时计算Flink和交互式分析服务Hologres开发离线、实时数据融合分析的数据大屏应用。
Linux入门到精通
本套课程是从入门开始的Linux学习课程,适合初学者阅读。由浅入深案例丰富,通俗易懂。主要涉及基础的系统操作以及工作中常用的各种服务软件的应用、部署和优化。即使是零基础的学员,只要能够坚持把所有章节都学完,也一定会受益匪浅。
相关文章
|
8天前
|
SQL 数据库
SQL解析相关报错
SQL解析相关报错
28 5
|
26天前
|
SQL 存储 API
Flink实践:通过Flink SQL进行SFTP文件的读写操作
虽然 Apache Flink 与 SFTP 之间的直接交互存在一定的限制,但通过一些创造性的方法和技术,我们仍然可以有效地实现对 SFTP 文件的读写操作。这既展现了 Flink 在处理复杂数据场景中的强大能力,也体现了软件工程中常见的问题解决思路——即通过现有工具和一定的间接方法来克服技术障碍。通过这种方式,Flink SQL 成为了处理各种数据源,包括 SFTP 文件,在内的强大工具。
90 15
|
13天前
|
SQL 安全 数据库
Python Web开发者必看!SQL注入、XSS、CSRF全面解析,守护你的网站安全!
在Python Web开发中,构建安全应用至关重要。本文通过问答形式,详细解析了三种常见Web安全威胁——SQL注入、XSS和CSRF,并提供了实用的防御策略及示例代码。针对SQL注入,建议使用参数化查询;对于XSS,需对输出进行HTML编码;而防范CSRF,则应利用CSRF令牌。通过这些措施,帮助开发者有效提升应用安全性,确保网站稳定运行。
28 1
|
28天前
|
SQL 安全 数据处理
揭秘数据脱敏神器:Flink SQL的神秘力量,守护你的数据宝藏!
【9月更文挑战第7天】在大数据时代,数据管理和处理尤为重要,尤其在保障数据安全与隐私方面。本文探讨如何利用Flink SQL实现数据脱敏,为实时数据处理提供有效的隐私保护方案。数据脱敏涉及在处理、存储或传输前对敏感数据进行加密、遮蔽或替换,以遵守数据保护法规(如GDPR)。Flink SQL通过内置函数和表达式支持这一过程。
56 2
|
28天前
|
SQL 大数据 数据处理
奇迹降临!解锁 Flink SQL 简单高效的终极秘籍,开启数据处理的传奇之旅!
【9月更文挑战第7天】在大数据处理领域,Flink SQL 因其强大功能与简洁语法成为开发者首选。本文分享了编写高效 Flink SQL 的实用技巧:理解数据特征及业务需求;灵活运用窗口函数(如 TUMBLE 和 HOP);优化连接操作,优先采用等值连接;合理选择数据类型以减少计算资源消耗。结合实际案例(如实时电商数据分析),并通过定期性能测试与调优,助力开发者在大数据处理中更得心应手,挖掘更多价值信息。
31 1
|
27天前
|
存储 JSON API
Python编程:解析HTTP请求返回的JSON数据
使用Python处理HTTP请求和解析JSON数据既直接又高效。`requests`库的简洁性和强大功能使得发送请求、接收和解析响应变得异常简单。以上步骤和示例提供了一个基础的框架,可以根据你的具体需求进行调整和扩展。通过合适的异常处理,你的代码将更加健壮和可靠,为用户提供更加流畅的体验。
64 0
|
2月前
|
SQL 数据处理 数据库
|
2月前
|
监控 网络协议 Java
Tomcat源码解析】整体架构组成及核心组件
Tomcat,原名Catalina,是一款优雅轻盈的Web服务器,自4.x版本起扩展了JSP、EL等功能,超越了单纯的Servlet容器范畴。Servlet是Sun公司为Java编程Web应用制定的规范,Tomcat作为Servlet容器,负责构建Request与Response对象,并执行业务逻辑。
Tomcat源码解析】整体架构组成及核心组件
|
2月前
|
存储 NoSQL Redis
redis 6源码解析之 object
redis 6源码解析之 object
58 6

推荐镜像

更多
下一篇
无影云桌面