1 Spark SQL自定义函数
1.1 自定义函数分类
类似于hive当中的自定义函数, spark同样可以使用自定义函数来实现新的功能。
spark中的自定义函数有如下3类
1.UDF(User-Defined-Function)
输入一行,输出一行
2.UDAF(User-Defined Aggregation Funcation)
输入多行,输出一行
3.UDTF(User-Defined Table-Generating Functions)
输入一行,输出多行
1.2 自定义UDF
●需求
有udf.txt数据格式如下:
Hello abc study small
通过自定义UDF函数将每一行数据转换成大写
select value,smallToBig(value) from t_word
●代码演示
package cn.oldlu.sql import org.apache.spark.SparkContext import org.apache.spark.sql.{Dataset, SparkSession} object UDFDemo { def main(args: Array[String]): Unit = { //1.创建SparkSession val spark: SparkSession = SparkSession.builder().master("local[*]").appName("SparkSQL").getOrCreate() val sc: SparkContext = spark.sparkContext sc.setLogLevel("WARN") //2.读取文件 val fileDS: Dataset[String] = spark.read.textFile("D:\\data\\udf.txt") fileDS.show() /* +----------+ | value| +----------+ |helloworld| | abc| | study| | smallWORD| +----------+ */ /* 将每一行数据转换成大写 select value,smallToBig(value) from t_word */ //注册一个函数名称为smallToBig,功能是传入一个String,返回一个大写的String spark.udf.register("smallToBig",(str:String) => str.toUpperCase()) fileDS.createOrReplaceTempView("t_word") //使用我们自己定义的函数 spark.sql("select value,smallToBig(value) from t_word").show() /* +----------+---------------------+ | value|UDF:smallToBig(value)| +----------+---------------------+ |helloworld| HELLOWORLD| | abc| ABC| | study| STUDY| | smallWORD| SMALLWORD| +----------+---------------------+ */ sc.stop() spark.stop() } }
1.3 自定义UDAF
虽然小编说UDAF只做了解,但也只需要掌握的。在企业中,这也是你和别人拉开差距的地方
●需求
有udaf.json数据内容如下
{"name":"Michael","salary":3000} {"name":"Andy","salary":4500} {"name":"Justin","salary":3500} {"name":"Berta","salary":4000}
求取平均工资
●继承UserDefinedAggregateFunction方法重写说明
inputSchema:输入数据的类型
bufferSchema:产生中间结果的数据类型
dataType:最终返回的结果类型
deterministic:确保一致性,一般用true
initialize:指定初始值
update:每有一条数据参与运算就更新一下中间结果(update相当于在每一个分区中的运算)
merge:全局聚合(将每个分区的结果进行聚合)
evaluate:计算最终的结果
●代码演示
package cn.oldlu.sql import org.apache.spark.SparkContext import org.apache.spark.sql.expressions.{MutableAggregationBuffer, UserDefinedAggregateFunction} import org.apache.spark.sql.types._ import org.apache.spark.sql.{DataFrame, Row, SparkSession} object UDAFDemo { def main(args: Array[String]): Unit = { //1.获取sparkSession val spark: SparkSession = SparkSession.builder().appName("SparkSQL").master("local[*]").getOrCreate() val sc: SparkContext = spark.sparkContext sc.setLogLevel("WARN") //2.读取文件 val employeeDF: DataFrame = spark.read.json("D:\\data\\udaf.json") //3.创建临时表 employeeDF.createOrReplaceTempView("t_employee") //4.注册UDAF函数 spark.udf.register("myavg",new MyUDAF) //5.使用自定义UDAF函数 spark.sql("select myavg(salary) from t_employee").show() //6.使用内置的avg函数 spark.sql("select avg(salary) from t_employee").show() } } class MyUDAF extends UserDefinedAggregateFunction{ //输入的数据类型的schema override def inputSchema: StructType = { StructType(StructField("input",LongType)::Nil) } //缓冲区数据类型schema,就是转换之后的数据的schema override def bufferSchema: StructType = { StructType(StructField("sum",LongType)::StructField("total",LongType)::Nil) } //返回值的数据类型 override def dataType: DataType = { DoubleType } //确定是否相同的输入会有相同的输出 override def deterministic: Boolean = { true } //初始化内部数据结构 override def initialize(buffer: MutableAggregationBuffer): Unit = { buffer(0) = 0L buffer(1) = 0L } //更新数据内部结构,区内计算 override def update(buffer: MutableAggregationBuffer, input: Row): Unit = { //所有的金额相加 buffer(0) = buffer.getLong(0) + input.getLong(0) //一共有多少条数据 buffer(1) = buffer.getLong(1) + 1 } //来自不同分区的数据进行合并,全局合并 override def merge(buffer1: MutableAggregationBuffer, buffer2: Row): Unit = { buffer1(0) =buffer1.getLong(0) + buffer2.getLong(0) buffer1(1) = buffer1.getLong(1) + buffer2.getLong(1) } //计算输出数据值 override def evaluate(buffer: Row): Any = { buffer.getLong(0).toDouble / buffer.getLong(1) } }