13个实验带你玩转MaxCompute SQL之JSON操作

本文涉及的产品
RDS MySQL Serverless 基础系列,0.5-2RCU 50GB
实时计算 Flink 版,5000CU*H 3个月
云数据库 Redis 版,社区版 2GB
推荐场景:
搭建游戏排行榜
简介: 保姆级教程

你好呀,我是Bella酱~

一个主要写Java和SQL的妹子,这周工作中呢,我想要做一个功能(嗯,做什么功能自己定),这个功能呢,主要是对某些数据指标稽核,以监测这些指标的值是否正确,即监控数据质量。

背景

这些指标是我在MaxCompute中写SQL生成的(嗯,我监控我自己),而且这些指标是以JSON的形式全部存放在了一个字段中。为什么不是一个指标一列来存放数据,而是所有指标全部放在一个字段中呢?一是因为指标种类太多了,几十种;二是因为指标的种类不是固定的,是可以根据配置动态生成的,即有哪些指标,是不固定的,如果以列的形式存放的话,要能够动态生成列才行。接触过MongoDB的同学都知道,它是以document的结构来存放数据的,用document来存放数据的一个很重要的优势就是列不固定,你往document中扔什么,它就存放什么,相比固定列来说,更灵活。所以考虑到我的数据指标的特性,我当初就把我动态生成的所有指标全部都扔在了一个字段中,考虑到Java读取指标的便捷性,我又把所有指标以及它的值序列化成了JSON的格式。

所有指标以JSON的形式放在一个字段中(假设这个字段叫index_value),爽!确实很爽!

但是对于今日的我来说,不太爽,而且是太不爽了,为什么呢?因为我要把这些指标反序列,然后再对它们进行各种数学运算。

方案确定

好了,了解完背景,我们回到数据指标稽核这个功能上,考虑这个功能实现细节时的我,第一反应是写一个UDF,让MaxCompute SQL调用UDF。UDF的入参设置为index_value(即存放所有指标的那个字段),出参设置为反序列化后,经过各种数学运算的结果值。嗯,想到这个方案时,我的嘴角不自觉上扬,这个实现嘛,很简单,就是Java的反序列化和普通的逻辑计算,这个谁还不会嘛。但是,很快,脑海中出现了一个小人,她告诉我,“为什么不看看SQL能不能解决这个问题呢?为什么又要SQL中调用UDF呢,为什么不搞点新东西呢?跳出舒适区吧。”

听从了小人儿的想法,我开始了SQL反序列化Json字符串的探索之旅。果然,功夫不负有心人,I get it!现在呢,我的功能已经实现并且发布好啦,我们来一起看看MaxCompute SQL如何操作JSON字符串吧!

生成JSON数据

我们先来看下如何生成JSON数据,包含利用MaxCompute官方提供的函数和UDF 2种方式。

MaxCompute官方提供了 TO_JSON 函数来生成JSON格式的字符串,但是这个函数可以支持的场景非常有限。我们先来看下它的命令格式:

to_json(expr)

其中expr为必填项,且仅支持3种格式:MAP、ARRAY、STRUCT类型。

我们来看几个例子。

map类型

1.要求key-value对必须同时存在,否则运行时将报错

2.生成的JSON数据中的key和map中的key完全一样,不会自动转换大小写

3.value为null值的key-value对,仍然会正常输出

SELECT to_json(map('Bella酱_map',100,'MySQL',100,'Java',99,'Redis',98,'geography',60, 'Flink', CAST(NULL AS STRING )));

map类型生成json格式数据.jpg

STRUCT类型

1.要求key-value对必须同时存在,否则运行时将报错

2.生成的JSON数据中的key全部为小写

3.value为null值的key-value对,不会输出,自动过滤掉了

SELECT to_json(NAMED_STRUCT('Bella酱_named_struct', 100, 'ES', 99, 'HBase', 98, 'Java', CAST(null AS STRING)));

struct类型生成JSON格式数据.jpg

array类型

1.生成JSON Array格式的数据

SELECT to_json(ARRAY(map('Bella酱_array_map_1', 100, 'ES', 90, 'Java', 60), map('Bella酱_array_map_2', 90, 'C', 80)));

array生成JSON格式数据.jpg

UDF

除了上述3种方式,MaxCompute也提供了UDF的方式来生成JSON,我就是采用这种方式生成的,因为我要多行转一列,然后这一列的数据格式为JSON。

-- 1.建表
DROP TABLE IF EXISTS student_score ;

CREATE TABLE IF NOT EXISTS student_score 
(
    id BIGINT COMMENT 'id,逻辑主键'
    ,student_no BIGINT COMMENT '学号'
    ,student_name STRING COMMENT '姓名'
    ,suject STRING COMMENT '科目'
    ,score BIGINT COMMENT '成绩'
)
;

-- 2.插入数据
INSERT OVERWRITE TABLE student_score VALUES 
    (1, 2021073101, 'Bella酱', 'MySQL', 100), 
    (2, 2021073101, 'Bella酱', 'Java', 99), 
    (3, 2021073101, 'Bella酱', 'Redis', 98), 
    (4, 2021073101, 'Bella酱', 'HBase', 97), 
    (5, 2021073101, 'Bella酱', 'geography', 60), 
    (6, 2021073102, '特拉法尔加·罗', 'MySQL', 100), 
    (7, 2021073102, '特拉法尔加·罗', 'Java', 100), 
    (8, 2021073102, '特拉法尔加·罗', 'Redis', 100), 
    (9, 2021073102, '特拉法尔加·罗', 'HBase', 100), 
    (10, 2021073102, '特拉法尔加·罗', 'geography', 100), 
    (11, 2021073103, '索隆', 'MySQL', 95), 
    (12, 2021073103, '索隆', 'Java', 94), 
    (13, 2021073103, '索隆', 'Redis', 93), 
    (14, 2021073103, '索隆', 'HBase', 98), 
    (15, 2021073103, '索隆', 'geography', 20) ;

-- 3.按学生维度来存放数据,所有科目的成绩以json的形式存放在一个字段中
DROP TABLE IF EXISTS student_score_json;

CREATE TABLE IF NOT EXISTS student_score_json AS 
SELECT  MAX(id) AS id
        ,student_no
        ,MAX(student_name) AS student_name
        ,GENERATEJSONSTRING(
            WM_CONCAT(',',suject_score)
            ,','
            ,'='
        ) AS suject_score
FROM    (
            SELECT  id
                    ,student_no
                    ,student_name
                    ,suject
                    ,score
                    ,CONCAT_WS('=', suject, score) AS suject_score
            FROM    student_score
        ) a
GROUP BY student_no
;

其中UDF代码如下:

import com.aliyun.odps.udf.UDF;
import com.google.gson.Gson;
import org.apache.commons.lang3.StringUtils;

import java.util.HashMap;
import java.util.Map;
import java.util.Objects;

/**
 * 生成JSON格式字符串
 *
 * @author Bella酱
 * @date 2021/08/01
 */
public class GenerateJsonString extends UDF {

    public String evaluate(String source, String delimiter, String joiner) {
        Map<String, String> map = transferStr2Map(source, delimiter, joiner);
        return new Gson().toJson(map);
    }

    /**
     * @param source    数据源
     * @param delimiter 分隔符
     * @param joiner    连接符
     * @return
     */
    private static Map<String, String> transferStr2Map(String source, String delimiter, String joiner) {
        Map<String, String> map = new HashMap<>(128);
        if (StringUtils.isBlank(source)) {
            return map;
        }

        String[] sourceArray = source.split(delimiter);
        for (String item : sourceArray) {
            String[] itemArray = item.split(joiner);
            if (Objects.isNull(itemArray) || itemArray.length == 0) {
                break;
            }

            map.put(itemArray[0], itemArray[1]);
        }

        return map;
    }
}

最终生成的 student_score_json 表中数据如下:

student_score_json表数据.jpg

可以看到subject_score字段即JSON格式数据。

好了,了解完MaxCompute SQL生成JSON格式数据的4种方式后,接下来呢,我们将以 student_score_json 表为例,来讲解MaxCompute SQL如何解析JSON格式数据。

解析JSON数据生成多列

MaxCompute SQL提供了2种解析JSON字符串的函数,分别是GET_JSON_OBJECT和JSON_TUPLE。

GET_JSON_OBJECT

我们先来看下GET_JSON_OBJECT,命令格式如下:

string get_json_object(string json, string path)

从命令格式可以看出,我们每一次调用get_json_object函数只能从JSON字符串中提取出一个字段,若JSON串中有N个字段,那我们则要调用N次get_json_object,同样,读取了JSON字符串N次。在海量数据的情况下(MaxCompute又偏偏是处理海量数据的),这种行为是非常糟糕的,会将整个数据处理放大N倍,是可能会影响到性能的。

这里需要注意一点,path要以$开头,表示根节点,.表示子节点,读取suject_core中的Java字段的值则应该写为$.Java

代码和执行效果如下,当某个path对应的值不存在时,get_json_object函数返回值为null。

SELECT  id
        ,student_no
        ,student_name
        ,GET_JSON_OBJECT(s.suject_core, "$.Java") AS Java
        ,GET_JSON_OBJECT(s.suject_core, "$.geography") AS geography
        ,GET_JSON_OBJECT(s.suject_core, "$.MySQL") AS MySQL
        ,GET_JSON_OBJECT(s.suject_core, "$.Redis") AS Redis
        ,GET_JSON_OBJECT(s.suject_core, "$.HBase") AS HBase
FROM    student_score_json s
;

get_json_object.jpg

如果要读取JSON Array数组呢?当然也是可以的啦。

1.数据key[*]即可读取数组中所有数据

2.数组key[数组下标]即可读取数组相应下标中存放的JSON字符串,若要进一步读取JSON字符串中的值,数组key[数组下标].字段key即可。

我们一起来看个栗子吧。首先是数据准备。

DROP TABLE IF EXISTS tmp_score_array_demo ;

CREATE TABLE IF NOT EXISTS tmp_score_array_demo 
(
    score_array STRING COMMENT '成绩json数组'
)
;

INSERT OVERWRITE TABLE tmp_score_array_demo 
VALUES (to_json(MAP('scores', ARRAY(MAP('array_map_1', 80, 'Flink', 70, 'Redis', 60), 
                                    MAP('array_map_2', 90, 'ES', 70, 'Redis', 60))))) ;

上述脚本准备好的数据是这个样子的。

tmp_score_array_demo表数据.jpg

1)读取scores数组的值。

SELECT  GET_JSON_OBJECT(tmp_score_array_demo.score_array, '$.scores[*]')
FROM    tmp_score_array_demo
;

get_json_object读取数组的值.jpg

2) 读取scores数组中第一个元素的值。

SELECT  GET_JSON_OBJECT(tmp_score_array_demo.score_array, '$.scores[0]')
FROM    tmp_score_array_demo
;

get_json_object读取数组中第一个元素的值.jpg

3) 读取scores数组中第一个元素中key为Flink的值

SELECT  GET_JSON_OBJECT(tmp_score_array_demo.score_array, '$.scores[0].Flink')
FROM    tmp_score_array_demo
;

get_json_object读取数组中第一个元素中key为Flink的值.jpg

JSON_TUPLE

我们再来看下JSON_TUPLE,命令格式如下:

string json_tuple(string json, string key1, string key2, string key3...)

从命令格式可以看出,即使我们要读取JSON中多个key的值,也只需要读取一次JSON数据就好了。这,不就是我一直在寻找的吗,激动的我赶快试了下。

SELECT  JSON_TUPLE(
            student_score_json.suject_score
            ,"Java"
            ,"geography"
            ,"MySQL"
            ,"Redis"
            ,"HBase"
        )
FROM    student_score_json
;

JSON_TUPLE读取json数据_01.jpg

好用,确实是非常好用,但是有2个问题。

1.只有成绩,没有学生信息,我无法知道2者的对应关系。

2.列名都丢失了,无法知道每一列是哪个科目的成绩。

莫慌~MaxCompute还提供了LATERAL VIEW的功能,我们可以配合LATERAL VIEW一起食用,完美解决了上述2个问题。

SELECT  s.id AS id
        ,s.student_no AS student_no
        ,s.student_name AS student_name
        ,a.Java AS Java
        ,a.geography AS geography
        ,a.MySQL AS MySQL
        ,a.Redis AS Redis
        ,a.HBase AS HBase
FROM    student_score_json s
LATERAL VIEW JSON_TUPLE(s.suject_score, "Java","geography","MySQL","Redis","HBase") a AS Java, geography, MySQL, Redis, HBase
;

json_tuple解析json数据_02.jpg

至此,我想要的效果出来啦!

再配合nvl函数,将上述的a.Java之类的都改为nvl(a.Java, 0) (如果json中不存在key对应的值,则取默认值0),后面就可以随心所欲的对这些列进行数据运算了。

如果要读取JSON Array数组呢?当然也是可以的啦。我们还以上文中的tmp_score_array_demo表为例。表中数据如下:

json_tuple读取数组中所有元素的值.jpg

1)读取scores数组的值

SELECT json_tuple(tmp_score_array_demo.score_array, "scores[*]") FROM tmp_score_array_demo;

json_tuple读取数组中所有元素的值.jpg

2) 读取scores数组中第一个元素的值

SELECT json_tuple(tmp_score_array_demo.score_array, "scores[0]") FROM tmp_score_array_demo;

json_tuple读取数组中第一个元素的值.jpg

3) 读取scores数组中第一个元素中key为Flink的值

SELECT  json_tuple(tmp_score_array_demo.score_array, "scores[0].Flink")
FROM    tmp_score_array_demo
;

json_tuple读取数组中第一个元素中key为Flink的值.jpg

好啦,通过上述13个实验,我们已经把如何生成JSON数据,如何解析JSON数据、JSON Array数据等讲解完了。我们今天的文章就到这里啦,下期见~

相关实践学习
基于MaxCompute的热门话题分析
本实验围绕社交用户发布的文章做了详尽的分析,通过分析能得到用户群体年龄分布,性别分布,地理位置分布,以及热门话题的热度。
SaaS 模式云数据仓库必修课
本课程由阿里云开发者社区和阿里云大数据团队共同出品,是SaaS模式云原生数据仓库领导者MaxCompute核心课程。本课程由阿里云资深产品和技术专家们从概念到方法,从场景到实践,体系化的将阿里巴巴飞天大数据平台10多年的经过验证的方法与实践深入浅出的讲给开发者们。帮助大数据开发者快速了解并掌握SaaS模式的云原生的数据仓库,助力开发者学习了解先进的技术栈,并能在实际业务中敏捷的进行大数据分析,赋能企业业务。 通过本课程可以了解SaaS模式云原生数据仓库领导者MaxCompute核心功能及典型适用场景,可应用MaxCompute实现数仓搭建,快速进行大数据分析。适合大数据工程师、大数据分析师 大量数据需要处理、存储和管理,需要搭建数据仓库?学它! 没有足够人员和经验来运维大数据平台,不想自建IDC买机器,需要免运维的大数据平台?会SQL就等于会大数据?学它! 想知道大数据用得对不对,想用更少的钱得到持续演进的数仓能力?获得极致弹性的计算资源和更好的性能,以及持续保护数据安全的生产环境?学它! 想要获得灵活的分析能力,快速洞察数据规律特征?想要兼得数据湖的灵活性与数据仓库的成长性?学它! 出品人:阿里云大数据产品及研发团队专家 产品 MaxCompute 官网 https://www.aliyun.com/product/odps&nbsp;
相关文章
|
13天前
|
分布式计算 DataWorks Java
DataWorks操作报错合集之在使用MaxCompute的Java SDK创建函数时,出现找不到文件资源的情况,是BUG吗
DataWorks是阿里云提供的一站式大数据开发与治理平台,支持数据集成、数据开发、数据服务、数据质量管理、数据安全管理等全流程数据处理。在使用DataWorks过程中,可能会遇到各种操作报错。以下是一些常见的报错情况及其可能的原因和解决方法。
31 0
|
11天前
|
JSON NoSQL MongoDB
实时计算 Flink版产品使用合集之要将收集到的 MongoDB 数据映射成 JSON 对象而非按字段分割,该怎么操作
实时计算Flink版作为一种强大的流处理和批处理统一的计算框架,广泛应用于各种需要实时数据处理和分析的场景。实时计算Flink版通常结合SQL接口、DataStream API、以及与上下游数据源和存储系统的丰富连接器,提供了一套全面的解决方案,以应对各种实时计算需求。其低延迟、高吞吐、容错性强的特点,使其成为众多企业和组织实时数据处理首选的技术平台。以下是实时计算Flink版的一些典型使用合集。
|
8天前
|
SQL 分布式计算 关系型数据库
Spark编程实验三:Spark SQL编程
Spark编程实验三:Spark SQL编程
18 1
|
8天前
|
SQL 分布式计算 Java
大数据软件基础(2)—— Java、SQL
大数据软件基础(2)—— Java、SQL
9 0
|
13天前
|
SQL Oracle 关系型数据库
实验四 SQL语言
实验四 SQL语言
8 2
|
13天前
|
分布式计算 DataWorks Oracle
DataWorks操作报错合集之DataWorks ODPS数据同步后,timesramp遇到时区问题,解决方法是什么
DataWorks是阿里云提供的一站式大数据开发与治理平台,支持数据集成、数据开发、数据服务、数据质量管理、数据安全管理等全流程数据处理。在使用DataWorks过程中,可能会遇到各种操作报错。以下是一些常见的报错情况及其可能的原因和解决方法。
33 0
|
13天前
|
分布式计算 DataWorks 数据管理
DataWorks操作报错合集之DataWorks中udf开发完后,本地和在MaxCompute的工作区可以执行函数查询,但是在datawork里报错FAILED: ODPS-0130071:[2,5],是什么原因
DataWorks是阿里云提供的一站式大数据开发与治理平台,支持数据集成、数据开发、数据服务、数据质量管理、数据安全管理等全流程数据处理。在使用DataWorks过程中,可能会遇到各种操作报错。以下是一些常见的报错情况及其可能的原因和解决方法。
29 0
|
13天前
|
SQL DataWorks 关系型数据库
DataWorks操作报错合集之DataWorks在同步mysql时报错Code:[Framework-02],mysql里面有个json类型字段,是什么原因导致的
DataWorks是阿里云提供的一站式大数据开发与治理平台,支持数据集成、数据开发、数据服务、数据质量管理、数据安全管理等全流程数据处理。在使用DataWorks过程中,可能会遇到各种操作报错。以下是一些常见的报错情况及其可能的原因和解决方法。
38 0
|
13天前
|
分布式计算 DataWorks 数据库
DataWorks操作报错合集之DataWorks使用数据集成整库全增量同步oceanbase数据到odps的时候,遇到报错,该怎么处理
DataWorks是阿里云提供的一站式大数据开发与治理平台,支持数据集成、数据开发、数据服务、数据质量管理、数据安全管理等全流程数据处理。在使用DataWorks过程中,可能会遇到各种操作报错。以下是一些常见的报错情况及其可能的原因和解决方法。
27 0
|
13天前
|
SQL 分布式计算 DataWorks
DataWorks操作报错合集之在DataWorks数据集成中,但是预览ODPS源数据库为空,是什么导致的
DataWorks是阿里云提供的一站式大数据开发与治理平台,支持数据集成、数据开发、数据服务、数据质量管理、数据安全管理等全流程数据处理。在使用DataWorks过程中,可能会遇到各种操作报错。以下是一些常见的报错情况及其可能的原因和解决方法。
32 0

热门文章

最新文章