1. User-Defined Functions (UDF)
Data source (users.json, one JSON record per line, which is the format spark.read.json expects):
{"deptName":"dept-1", "name":"Michael", "salary":3000} {"deptName":"dept-2", "name":"Andy", "salary":5000} {"deptName":"dept-1", "name":"Alex", "salary":4500} {"deptName":"dept-2", "name":"Justin", "salary":6700} {"deptName":"dept-2", "name":"Cherry", "salary":3400} {"deptName":"dept-1", "name":"Jack", "salary":5500} {"deptName":"dept-2", "name":"Jone", "salary":12000} {"deptName":"dept-1", "name":"Lucy", "salary":8000} {"deptName":"dept-2", "name":"LiLi", "salary":7600} {"deptName":"dept-2", "name":"Pony", "salary":4200}
Initialize the SparkSession:
package com.kfk.spark.common

import org.apache.spark.sql.SparkSession

/**
 * @author : 蔡政洁
 * @email : caizhengjie888@icloud.com
 * @date : 2020/12/2
 * @time : 10:02 PM
 */
object CommSparkSessionScala {

    def getSparkSession(): SparkSession = {
        val spark = SparkSession
            .builder
            .appName("CommSparkSessionScala")
            .master("local")
            .config("spark.sql.warehouse.dir", "/Users/caizhengjie/Document/spark/spark-warehouse")
            .getOrCreate()
        spark
    }
}
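The examples below also import a Comm helper that only supplies the directory containing users.json; it is not shown in the original article, so the following is a minimal sketch under that assumption (the path is a placeholder; point it at wherever the data file lives):

package com.kfk.spark.common

// Hypothetical sketch of the Comm helper referenced below; it is assumed to
// expose only the directory that holds the sample data file users.json.
object Comm {
    val fileDirPath = "/Users/caizhengjie/Document/spark/data/"  // placeholder path
}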
A custom UDF that converts the name column to uppercase:
package com.kfk.spark.sql

import com.kfk.spark.common.{Comm, CommSparkSessionScala}

/**
 * @author : 蔡政洁
 * @email : caizhengjie888@icloud.com
 * @date : 2020/12/8
 * @time : 4:46 PM
 */
object UDFScala {

    def main(args: Array[String]): Unit = {

        val spark = CommSparkSessionScala.getSparkSession()

        val userPath = Comm.fileDirPath + "users.json"
        spark.read.json(userPath).createOrReplaceTempView("user")

        // Register a UDF that converts its string argument to uppercase
        spark.udf.register("strUpper", (str: String) => str.toUpperCase)
        spark.sql("select deptName,strUpper(name) from user").show()

        /**
         * +--------+------------------+
         * |deptName|UDF:strUpper(name)|
         * +--------+------------------+
         * |  dept-1|           MICHAEL|
         * |  dept-2|              ANDY|
         * |  dept-1|              ALEX|
         * |  dept-2|            JUSTIN|
         * |  dept-2|            CHERRY|
         * |  dept-1|              JACK|
         * |  dept-2|              JONE|
         * |  dept-1|              LUCY|
         * |  dept-2|              LILI|
         * |  dept-2|              PONY|
         * +--------+------------------+
         */
    }
}
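The same function does not have to go through the SQL registry: a minimal sketch using the DataFrame API, where the udf helper from org.apache.spark.sql.functions wraps the same lambda (column names match users.json above; the variable names are illustrative):

import org.apache.spark.sql.functions.{col, udf}

// Wrap the same lambda as a DataFrame-level UDF and apply it to the name column
val strUpperUdf = udf((str: String) => str.toUpperCase)
spark.read.json(userPath)
    .select(col("deptName"), strUpperUdf(col("name")).alias("upperName"))
    .show()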
2. User-Defined Aggregate Functions (UDAF)

First implement a custom count by extending UserDefinedAggregateFunction:
package com.kfk.spark.sql

import org.apache.spark.sql.Row
import org.apache.spark.sql.expressions.{MutableAggregationBuffer, UserDefinedAggregateFunction}
import org.apache.spark.sql.types.{DataType, IntegerType, StringType, StructField, StructType}

/**
 * @author : 蔡政洁
 * @email : caizhengjie888@icloud.com
 * @date : 2020/12/8
 * @time : 7:13 PM
 */
class MyCount extends UserDefinedAggregateFunction {

    /**
     * Schema of the input data
     */
    override def inputSchema: StructType = {
        StructType(Array(StructField("str", StringType, true)))
    }

    /**
     * Schema of the intermediate aggregation buffer
     */
    override def bufferSchema: StructType = {
        StructType(Array(StructField("count", IntegerType, true)))
    }

    /**
     * Data type of the result
     */
    override def dataType: DataType = {
        IntegerType
    }

    /**
     * Whether the function always returns the same result for the same input
     */
    override def deterministic: Boolean = {
        true
    }

    /**
     * Initialize the aggregation buffer for each group
     */
    override def initialize(buffer: MutableAggregationBuffer): Unit = {
        buffer(0) = 0
    }

    /**
     * Update the group's aggregate value whenever a new row arrives for that group
     */
    override def update(buffer: MutableAggregationBuffer, input: Row): Unit = {
        buffer(0) = buffer.getAs[Int](0) + 1
    }

    /**
     * Merge the partial aggregation buffers computed on different nodes
     */
    override def merge(buffer1: MutableAggregationBuffer, buffer2: Row): Unit = {
        buffer1(0) = buffer1.getAs[Int](0) + buffer2.getAs[Int](0)
    }

    /**
     * Return the final result
     */
    override def evaluate(buffer: Row): Any = {
        buffer.getAs[Int](0)
    }
}
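Note that UserDefinedAggregateFunction is deprecated as of Spark 3.0 in favor of the typed Aggregator API. A minimal sketch of the same count written that way (registered through functions.udaf; the names MyCountAgg and myCount are illustrative) might look like:

import org.apache.spark.sql.expressions.Aggregator
import org.apache.spark.sql.{Encoder, Encoders, functions}

// Counts input strings: the buffer and the result are both plain Ints
object MyCountAgg extends Aggregator[String, Int, Int] {
    def zero: Int = 0                                  // initial buffer value per group
    def reduce(buffer: Int, input: String): Int = buffer + 1
    def merge(b1: Int, b2: Int): Int = b1 + b2         // combine partial counts
    def finish(buffer: Int): Int = buffer              // final result
    def bufferEncoder: Encoder[Int] = Encoders.scalaInt
    def outputEncoder: Encoder[Int] = Encoders.scalaInt
}

// Registration for SQL use (Spark 3.x):
// spark.udf.register("myCount", functions.udaf(MyCountAgg))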
Using the custom UDAF to count records per department:
package com.kfk.spark.sql

import com.kfk.spark.common.{Comm, CommSparkSessionScala}

/**
 * @author : 蔡政洁
 * @email : caizhengjie888@icloud.com
 * @date : 2020/12/8
 * @time : 8:06 PM
 */
object UDAFScala {

    def main(args: Array[String]): Unit = {

        val spark = CommSparkSessionScala.getSparkSession()

        val userPath = Comm.fileDirPath + "users.json"
        spark.read.json(userPath).createOrReplaceTempView("user")

        // Register the custom aggregate function
        spark.udf.register("count", new MyCount)
        spark.sql("select deptName,count(deptName) count from user group by deptName").show()

        /**
         * +--------+-----+
         * |deptName|count|
         * +--------+-----+
         * |  dept-1|    4|
         * |  dept-2|    6|
         * +--------+-----+
         */
    }
}