开发者学堂课程【大数据 Spark2020版(知识精讲与实战演练)第三阶段:函数_UDF】学习笔记,与课程紧密联系,让用户快速学习知识。
课程地址:https://developer.aliyun.com/learning/course/690/detail/12102
函数_UDF
之前的课程介绍了许多关于 SparkSQL 的知识点,本节课补充介绍一个知识点,UDF。
数据集:
val source = Seq(
("Thin", "cell phone" , 6000),
("Normal", “Tablet",1500),
("Mini", "Tablet", 5500),
("Ultra thin" , "cell phone",5000),
("very thin", "cell phone", 6000),
("Big","Tablet", 2500) ,
("Bendable", "cell phone", 3000),
("Foldable", "cell phone", 3000),
("Pro", "Tablet", 4500) ,
(Pro2", "Tablet", 6500)
).toDF( colNames = "product", "category, "revenue")
这个数据集有三个列,第一个是商品名称,第二个是商品类别,第三个是商品价格。
根据以上数据集提出了三个需求:第一,聚合每个类别的总价;第二,把名称变为小写;第三,把价格变为字符串形式。
如何实现需求一:前面的课程讲到要进性聚合,首先是分组,之后对每一组数据进行聚合。
操作代码:
object UDF {
def main(args: Array[String]): unit = {
val spark = SparkSession.builder()
.appName( name = "windows")
.master( master = "local[6]")
.getorCreate()
import spark .implicits._
val source = Seq(
("Thin", "cell phone" , 6000),
("Normal", “Tablet",1500),
("Mini", "Tablet", 5500),
("Ultra thin" , "cell phone",5000),
("very thin", "cell phone", 6000),
("Big","Tablet", 2500) ,
("Bendable", "cell phone", 3000),
("Foldable", "cell phone", 3000),
("Pro", "Tablet", 4500) ,
(Pro2", "Tablet", 6500)
).toDF( colNames = "product", "category, "revenue")
//需求一:聚合每个类别的总价
//1.分组 2.对每一组数据进行聚合
import org.apache.spark.sql.functions._
source.groupBy( cols = 'category)
注:groupBy 中是需要选中按照什么进行分组
/对分组数据进行聚合
.agg( sum( ‘ revenue))
注:前面课程讲到可以直接求 count,count 直接作用于一个分组,还可以求 max、sum,它们都作用于某一个分组。
使用 agg API,agg 是聚合,具体如何聚合要以函数的形式,在前面要导入 org.apache.spark.sql.functions._
Functions 中有 count、sum、mean、max 等函数可以作用于分组,本次选择 sum。
.show()
运行得到结果:
由图可知:category 是分组的列,sum 是每个组聚合的总和。
注意:以上代码是为了让大家了解 functions 有许多函数,而函数就叫 functions,这里介绍的函数与以往函数有些区别。
Functions 中的函数,比如 sum 函数:
def sum(e: column): column= withAggregateFunction { Sum(e.expr)}
Sum 函数接收了一列数据,并且返回新的一列数据。sparkSQL 中的函数并不是处理单个值然后返回单个值,而这里的函数是接收一列数据,对一列数据进行统一处理然后返回新的一列数据。
函数可以作用于 groupBy 后的 agg 中,还可以作用于其他地方,如下:
//需求二:把名称变为小写
/将上一段 source 后的代码暂时注释掉
source.select( Lower( ‘ product))
注意:正常情况下,’product 返回的是一个 column,select 接收的也是 column,经过 lower 处理,lower 函数返回的也是 column 对象,所以这个函数就可以将一列数据全部变为小写,之后再返回一个新的列。
.show()
运行代码得到结果:
Product 列的名称都变为小写,说明:函数不仅可以作用于 agg 中,还可以作用于 select 中,甚至是 where 中,where 也解说 column 对象。所有接收 column 对象的地方都可以使用函数处理一列,返回 column 然后传进,在 SparkSQL 中会比较灵活。
本节课要说的不是这些函数,点开 functions,可以看到:
Functions availabLe for DataFrame operations.
@groupname udf _funcs UDF functions
@groupname agg _funcsAggregate functions
@groupname datetime_funcs Date time functions
@groupname sort_funcs Sorting functions
@groupname normal_funes Non-aggregate functions
@groupname math_funcs Math functions
@groupname misc_funcs Misc functions
@groupname window_funcs window functions
@groupname string_funcs string functions
@groupname collection_funcs colLection functions
@groupname Ungrouped Support functions for DataFrames
since 1.3.e
里面有一部分是 agg 的 functions,用于聚合的,比如 max、min、sum、mean;对时间进行处理的 functions;用于排序的 functions,比如升序,降序;还有一些无聚合的 functions,比如直接是字符串显示在一列中;以及数学、窗口、收集函数。平常如果要使用 functions,可直接在 functions 中进行查询。
本节课主要要讲的是 UDF,什么是 UDF?接下来看需求三:
//需求三:把价格变为字符串形式
注:将需求二代码也注释掉。现在的价格是 int 值形式,int 值在直接写时是 long 型的数据
//正常情况下显示的 6000,现在要显示 6k
}
自定义函数,自定义函数只能接收单个值。
def tostr( revenue: Long): string = {
(revenue / 1000) +"K"
}
toStr 是作用于每个单值的,如何将整列数据都进行转换:
//需求三:把价格变为字符串形式
SparkSQL functions 包下有一个 udf 函数,这个函数可以提供一个函数,然后返回一个对应的 UserDefinedFunction。
val toStrUDF = udf(tostr _)
注:会得到一个 UDF,这个 UDF 也是一个函数,这个函数会去接收一个 column 并且返回一个 column,spark 会帮助作用于每条元素。
source.select( cols = 'product,'category,tostrUDF ( " revenue))
注:revenue 在传进 toStrUDF 必须是一个 column
.show()
}
def tostr( revenue: Long): string = {
(revenue / 180e) +"K"
}
运行代码得到结果:
可以看到 UDF 列显示 6K、1K 等数据,是因为在进行除法时未考虑类型,故未显示 dubble 或者 float。
知识点:
在 Spark 当中有一些预定义好的函数可供大家使用,如果这些函数不能满足需求,可以定义一些 userdefinedfunction 函数(UDF),通过自定义函数满足需求。