[译]介绍Spark2.4的用于处理复杂数据类型的新内置函数与高阶函数

本文涉及的产品
EMR Serverless StarRocks,5000CU*H 48000GB*H
简介: Apache Spark2.4引入了29个新的内置函数用于处理复杂数据类型(比如,数组类型等),新的内置函数也包括高阶函数。 在Spark2.4版本之前,有两种典型的方式处理复杂数据类型: 1. 将嵌套结构的数据转化为多行数据,然后使用函数处理,最后在组装成嵌套结构。

[译]介绍Spark2.4的用于处理复杂数据类型的新内置函数与高阶函数

本文翻译自databricks的介绍spark2.4新特性的blog,英文原文参考原文链接

Apache Spark2.4总共支持了29个用于处理复杂数据类型(比如,数组类型等)的新内置函数和高阶函数。

在Spark2.4版本之前,有两种典型的方式处理复杂数据类型:

  1. 将嵌套结构的数据转化为多行数据,然后使用函数处理,最后再组装成嵌套结构。
  2. 自己构建一个UDF。

和之前不同,新的内置函数可以直接操作复杂数据类型,同时新的高阶函数可以使用匿名的lambda函数处理复杂数据类型,和UDF类似,但是性能大大提高。

在本篇文章,我们将通过一些示例展示部分内置函数以及它们的具体用法。

典型的处理方式

我们首先来看一下Spark2.4以前典型的处理方式。

选择1 - Explode and Collect

我们使用explode函数将数组数据拆解为多行数据然后计算val + 1,最后用collect_list再将多行数据重新组织成数组。

SELECT id,
       collect_list(val + 1) AS vals
FROM   (SELECT id,
               explode(vals) AS val
        FROM input_tbl) x
GROUP BY id

这种方式容易出错,同时效率也比较低,这主要体现在三个方面。首先我们很难去确保最后重组的数组所用的行数据确定来自原始的数组,需要通过对unique key分组来保证。其次,我们需要用到group by,这就意味之需要一次shuffle操作,但是shuffle操作并不保证重组后的数组和原始数组中数据的顺序一致。最后,这样的处理方式非常昂贵。

选择2 - User Defined Function

接下来,我们使用Scala UDF处理Seq[Int],访问序列中每个元素并加1.

def addOne(values: Seq[Int]): Seq[Int] = {
  values.map(value => value + 1)
}
val plusOneInt = spark.udf.register("plusOneInt", addOne(_: Seq[Int]): Seq[Int])

或者,我们也可以使用Python UDF。

SELECT id, plusOneInt(vals) as vals FROM input_tbl

这样更简单更快,同时也避免了很多可能导致错误的陷阱,但是由于还是需要将数据反序列化成Scala或者Python对象,仍然效率不高。
【译者注】基于Tungsten引擎,Spark SQL处理的中间数据是以binary的方式直接存储的,使用UDF需要将binary数据反序列化成Scala/Python数组,处理完成后,还需要序列化成binary数据。

你可以参考并尝试我们之前发布的一篇文章中notebook示例

新内置函数

下面我们看看新内置函数是如何处理复杂数据类型的。这个notebook列举每个函数的示例。每个函数的名称和参数标注了它们对应处理的数据类型,T或U表示数组,K,V表示映射(MAP)类型。

高阶函数

【译者注】高阶函数是来自函数式语言的一个概念,主要是指一个函数支持使用其他函数作为参数或者返回类型为函数。具体定义可以参考Higher-Order Function
为了更进一步处理数组和映射类型的数据,我们使用了匿名lambda函数或高阶函数这两种SQL中支持的语法,使用lambda函数作为入参。

lambda函数的语法规范如下:

 argument -> function body
  (argument1, argument2, ...) -> function body

箭头->左边表示参数列表,箭头右边定义函数体,在函数体中使用参数和其他成员变量计算结果值。

使用匿名Lambda函数

我们首先尝试使用匿名lambda函数的transform函数。

假设有一个表,包含三列数据:integer类型的key,integer数组类型的values,二维Integer数组类型的nested_values。

key values nested_values
1 [1,2,3] [[1,2,3],[],[4,5]]

当我们执行如下SQL的时候:

SELECT TRANSFORM(values, element -> element + 1) FROM data;

transform函数迭代访问values数组中的每个元素,并执行lambda函数,给每个元素加1,然后构建一个新数组。

我们可以使用除了参数以外的其他变量,比如:key,key是表中的另外一列,在lambda函数上下文之外,但是我们仍然可以使用它,比如:

SELECT TRANSFORM(values, element -> element + key) FROM data;

如果你想要处理更复杂的嵌套类型,比如nested_values列的数据,你甚至可以使用嵌套的lambda函数:

SELECT TRANSFORM(
    nested_values,
    arr -> TRANSFORM(arr,
      element -> element + key + SIZE(arr)))
  FROM data;

在里层的lambda函数中,你同样也可以使用key和arr这些在lambda函数上下文之外的变量以及表的其他字段值。

需要注意的是,在上面的notebook中,同时展示了基于之前典型的处理方式和新的高阶函数的处理方式两种示例代码,

结论

Spark2.4支持了24个新的内置函数,比如array_union, array_max/min等,以及5个高阶函数,比如transform, filter等,都是用于处理复杂数据类型。如果你需要处理复杂数据类型,建议使用这些函数。感谢Apache Spark的contributor贡献了这些功能:Alex Vayda, Bruce Robbins, Dylan Guedes, Florent Pepin, H Lu, Huaxin Gao, Kazuaki Ishizaki, Marco Gaido, Marek Novotny, Neha Patil, Sandeep Singh以及其他人。

欢迎加群指正交流
01__

相关实践学习
基于EMR Serverless StarRocks一键玩转世界杯
基于StarRocks构建极速统一OLAP平台
快速掌握阿里云 E-MapReduce
E-MapReduce 是构建于阿里云 ECS 弹性虚拟机之上,利用开源大数据生态系统,包括 Hadoop、Spark、HBase,为用户提供集群、作业、数据等管理的一站式大数据处理分析服务。 本课程主要介绍阿里云 E-MapReduce 的使用方法。
目录
相关文章
|
存储 分布式计算 并行计算
大数据Spark RDD 函数 1
大数据Spark RDD 函数
116 0
|
SQL 分布式计算 Scala
【Spark】Spark SQL 数据类型转换
【Spark】Spark SQL 数据类型转换
1848 0
【Spark】Spark SQL 数据类型转换
spark3.5.1中内置函数大全
spark3.5.1中内置函数大全
|
6月前
|
SQL 分布式计算 数据处理
MaxCompute操作报错合集之使用Spark查询时函数找不到的原因是什么
MaxCompute是阿里云提供的大规模离线数据处理服务,用于大数据分析、挖掘和报表生成等场景。在使用MaxCompute进行数据处理时,可能会遇到各种操作报错。以下是一些常见的MaxCompute操作报错及其可能的原因与解决措施的合集。
|
分布式计算 Java Spark
图解Spark Graphx实现顶点关联邻接顶点的collectNeighbors函数原理
图解Spark Graphx实现顶点关联邻接顶点的collectNeighbors函数原理
57 0
|
8月前
|
机器学习/深度学习 分布式计算 算法
【大数据技术】Spark MLlib机器学习库、数据类型详解(图文解释)
【大数据技术】Spark MLlib机器学习库、数据类型详解(图文解释)
150 0
|
8月前
|
SQL 分布式计算 Spark
Spark【Spark SQL(四)UDF函数和UDAF函数】
Spark【Spark SQL(四)UDF函数和UDAF函数】
|
分布式计算 大数据 数据挖掘
大数据Spark RDD 函数 2
大数据Spark RDD 函数
109 0
|
SQL JSON 分布式计算
spark2 sql读取数据源编程学习样例2:函数实现详解
spark2 sql读取数据源编程学习样例2:函数实现详解
98 0
spark2 sql读取数据源编程学习样例2:函数实现详解
|
分布式计算 Java Linux
【Spark 3.0-JavaAPI-pom】体验JavaRDD函数封装变化
【Spark 3.0-JavaAPI-pom】体验JavaRDD函数封装变化
199 0
【Spark 3.0-JavaAPI-pom】体验JavaRDD函数封装变化
下一篇
开通oss服务