Flink SQL 解析嵌套的 JSON 数据

简介: 在日常的开发中,最常用的数据格式是 JSON ,并且有的时候 JSON 的格式是非常复杂的(嵌套的格式),那在 Flink SQL 中进行解析的时候也会相当麻烦一点,下面将会演示如何在 DDL 里面定义 Map、Array、Row 类型的数据,以及在 SQL 里面如何获里面的值数据格式如下:以下数据完全是自己造的,没有任何实际含义

在日常的开发中,最常用的数据格式是 JSON ,并且有的时候 JSON 的格式是非常复杂的(嵌套的格式),那在 Flink SQL 中进行解析的时候也会相当麻烦一点,下面将会演示如何在 DDL 里面定义 Map、Array、Row 类型的数据,以及在 SQL 里面如何获里面的值


数据格式如下:


以下数据完全是自己造的,没有任何实际含义


{
 "funcName": "test",
 "data": {
  "snapshots": [{
   "content_type": "application/x-gzip-compressed-jpeg",
   "url": "https://blog.csdn.net/xianpanjia4616"
  }],
  "audio": [{
   "content_type": "audio/wav",
   "url": " https://bss.csdn.net/m/topic/blog_star2020/detail?username=xianpanjia4616"
  }]
 },
 "resultMap": {
  "result": {
   "cover": "/data/test/log.txt"
  },
  "isSuccess": true
 },
 "meta": {
  "video_type": "normal"
 },
 "type": 2,
 "timestamp": 1610549997263,
 "arr": [{
  "address": "北京市海淀区",
  "city": "beijing"
 }, {
  "address": "北京市海淀区",
  "city": "beijing"
 }, {
  "address": "北京市海淀区",
  "city": "beijing"
 }],
 "map": {
  "flink": 456
 },
 "doublemap": {
  "inner_map": {
   "key": 123
  }
 }
}


上面的数据包含了 Map、Array、Row 等类型, 对于这样的数据格式,在建表 DDL 里面应该如何定义呢?


定义 DDL


CREATE TABLE kafka_source (
    funcName STRING,
    data ROW<snapshots ARRAY<ROW<content_type STRING,url STRING>>,audio ARRAY<ROW<content_type STRING,url STRING>>>,
    resultMap ROW<`result` MAP<STRING,STRING>,isSuccess BOOLEAN>,
    meta  MAP<STRING,STRING>,
    `type` INT,
    `timestamp` BIGINT,
    arr ARRAY<ROW<address STRING,city STRING>>,
    map MAP<STRING,INT>,
    doublemap MAP<STRING,MAP<STRING,INT>>,
    proctime as PROCTIME()
) WITH (
    'connector' = 'kafka', -- 使用 kafka connector
    'topic' = 'test',  -- kafka topic
    'properties.bootstrap.servers' = 'master:9092,storm1:9092,storm2:9092',  -- broker连接信息
    'properties.group.id' = 'jason_flink_test', -- 消费kafka的group_id
    'scan.startup.mode' = 'latest-offset',  -- 读取数据的位置
    'format' = 'json',  -- 数据源格式为 json
    'json.fail-on-missing-field' = 'true', -- 字段丢失任务不失败
    'json.ignore-parse-errors' = 'false'  -- 解析失败跳过
)


解析 SQL


select
funcName,
doublemap['inner_map']['key'],
count(data.snapshots[1].url),
`type`,
TUMBLE_START(proctime, INTERVAL '30' second) as t_start
from kafka_source
group by TUMBLE(proctime, INTERVAL '30' second),funcName,`type`,doublemap['inner_map']['key']


SQL 运行的结果


4> (true,test,123,6,2,2021-01-15T03:31)
4> (false,test,123,6,2,2021-01-15T03:31)
4> (true,test,123,8,2,2021-01-15T03:31)
4> (false,test,123,8,2,2021-01-15T03:31)
4> (true,test,123,10,2,2021-01-15T03:31)
4> (false,test,123,10,2,2021-01-15T03:31)
4> (true,test,123,13,2,2021-01-15T03:31)
4> (false,test,123,13,2,2021-01-15T03:31)
4> (true,test,123,15,2,2021-01-15T03:31)
4> (true,test,123,3,2,2021-01-15T03:31:30)


说明数据都可以正常的解析出来,如果遇到更加复杂的 JSON 格式的数据,只需要比葫芦画瓢就可以了,在复杂的格式都不是问题.


数据类型映射


目前,JSON 模式总是从表模式派生。目前还不支持显式定义 JSON 模式。Flink JSON 格式使用 jackson databind API 来解析和生成JSON 字符串。下表列出了从 Flink 类型到 JSON 类型的映射。



注意事项:


Json 中的每个 {} 都需要用 Row 类型来表示


Json 中的每个 [] 都需要用 Arrary 类型来表示


数组的下标是从 1 开始的不是 0 如上面 SQL 中的 data.snapshots[1].url


关键字在任何地方都需要加反引号 如上面 SQL 中的 `type`


select 语句中的字段类型和顺序一定要和结果表的字段类型和顺序保持一致


UDF 可以直接在建表语句中使用


相关实践学习
基于Hologres+Flink搭建GitHub实时数据大屏
通过使用Flink、Hologres构建实时数仓,并通过Hologres对接BI分析工具(以DataV为例),实现海量数据实时分析.
实时计算 Flink 实战课程
如何使用实时计算 Flink 搞定数据处理难题?实时计算 Flink 极客训练营产品、技术专家齐上阵,从开源 Flink功能介绍到实时计算 Flink 优势详解,现场实操,5天即可上手! 欢迎开通实时计算 Flink 版: https://cn.aliyun.com/product/bigdata/sc Flink Forward Asia 介绍: Flink Forward 是由 Apache 官方授权,Apache Flink Community China 支持的会议,通过参会不仅可以了解到 Flink 社区的最新动态和发展计划,还可以了解到国内外一线大厂围绕 Flink 生态的生产实践经验,是 Flink 开发者和使用者不可错过的盛会。 去年经过品牌升级后的 Flink Forward Asia 吸引了超过2000人线下参与,一举成为国内最大的 Apache 顶级项目会议。结合2020年的特殊情况,Flink Forward Asia 2020 将在12月26日以线上峰会的形式与大家见面。
相关文章
|
4月前
|
JSON API 数据格式
淘宝拍立淘按图搜索API系列,json数据返回
淘宝拍立淘按图搜索API系列通过图像识别技术实现商品搜索功能,调用后返回的JSON数据包含商品标题、图片链接、价格、销量、相似度评分等核心字段,支持分页和详细商品信息展示。以下是该API接口返回的JSON数据示例及详细解析:
|
4月前
|
JSON 算法 API
Python采集淘宝商品评论API接口及JSON数据返回全程指南
Python采集淘宝商品评论API接口及JSON数据返回全程指南
|
4月前
|
JSON API 数据安全/隐私保护
Python采集淘宝拍立淘按图搜索API接口及JSON数据返回全流程指南
通过以上流程,可实现淘宝拍立淘按图搜索的完整调用链路,并获取结构化的JSON商品数据,支撑电商比价、智能推荐等业务场景。
|
4月前
|
JSON 中间件 Java
【GoGin】(3)Gin的数据渲染和中间件的使用:数据渲染、返回JSON、浅.JSON()源码、中间件、Next()方法
我们在正常注册中间件时,会打断原有的运行流程,但是你可以在中间件函数内部添加Next()方法,这样可以让原有的运行流程继续执行,当原有的运行流程结束后再回来执行中间件内部的内容。​ c.Writer.WriteHeaderNow()还会写入文本流中。可以看到使用next后,正常执行流程中并没有获得到中间件设置的值。接口还提供了一个可以修改ContentType的方法。判断了传入的状态码是否符合正确的状态码,并返回。在内部封装时,只是标注了不同的render类型。再看一下其他返回的类型;
227 3
|
4月前
|
JSON Java Go
【GoGin】(2)数据解析和绑定:结构体分析,包括JSON解析、form解析、URL解析,区分绑定的Bind方法
bind或bindXXX函数(后文中我们统一都叫bind函数)的作用就是将,以方便后续业务逻辑的处理。
326 3
|
5月前
|
JSON API 数据安全/隐私保护
Python采集淘宝评论API接口及JSON数据返回全流程指南
Python采集淘宝评论API接口及JSON数据返回全流程指南
|
5月前
|
JSON 自然语言处理 监控
淘宝关键词搜索与商品详情API接口(JSON数据返回)
通过商品ID(num_iid)获取商品全量信息,包括SKU规格、库存、促销活动、卖家信息、详情页HTML等。
|
SQL JSON 监控
实时计算 Flink版产品使用合集之直接将 JSON 字符串解析为数组的内置函数如何解决
实时计算Flink版作为一种强大的流处理和批处理统一的计算框架,广泛应用于各种需要实时数据处理和分析的场景。实时计算Flink版通常结合SQL接口、DataStream API、以及与上下游数据源和存储系统的丰富连接器,提供了一套全面的解决方案,以应对各种实时计算需求。其低延迟、高吞吐、容错性强的特点,使其成为众多企业和组织实时数据处理首选的技术平台。以下是实时计算Flink版的一些典型使用合集。
|
SQL JSON Apache
Flink问题之嵌套 json 中string 数组的解析异常如何解决
Apache Flink是由Apache软件基金会开发的开源流处理框架,其核心是用Java和Scala编写的分布式流数据流引擎。本合集提供有关Apache Flink相关技术、使用技巧和最佳实践的资源。
641 1
|
SQL JSON Apache
Flink SQL问题之复杂JSON解析如何解决
Apache Flink是由Apache软件基金会开发的开源流处理框架,其核心是用Java和Scala编写的分布式流数据流引擎。本合集提供有关Apache Flink相关技术、使用技巧和最佳实践的资源。
1264 0