1. Common Dataset Functions in Detail
(1) Repartitioning functions: coalesce / repartition
coalesce: can only decrease the number of partitions, and it can do so without triggering a shuffle.
repartition: can either increase or decrease the number of partitions; it always triggers a shuffle, which amounts to a full repartitioning (see the short sketch after this list for the shuffle difference).
(2) Deduplication functions: distinct / dropDuplicates
distinct: compares and deduplicates rows on their entire content.
dropDuplicates: deduplicates on the specified columns.
(3) Filtering functions: filter / except / intersect
filter: applies our own predicate to each row; rows for which it returns true are kept, rows for which it returns false are dropped.
except: returns the rows that exist in the current Dataset but not in the other Dataset.
intersect: returns the rows that the two Datasets have in common.
(4) Map functions: map / flatMap / mapPartitions
map: maps every row of the Dataset and returns exactly one output row per input row.
flatMap: each input row can produce multiple output rows.
mapPartitions: processes all the rows of a partition in a single call.
(5) Join function: join
join: relates and merges two Datasets on a given column (the typed example below uses joinWith).
(6) Sort function: sort
sort: sorts the Dataset.
(7) Sampling and splitting functions: sample / randomSplit
randomSplit: splits the Dataset into as many parts as there are elements in the weight Array; each element's value is the weight given to the corresponding split.
sample: draws a random sample at the fraction we specify.
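Before moving on to the full examples, here is a minimal sketch (not part of the original example code) that makes the shuffle difference visible through the physical plan. The DataFrame and partition counts are made up purely for illustration, and an existing SparkSession named spark is assumed:

// Minimal sketch, assuming a SparkSession named `spark` (e.g. the one created below).
// Dataset.coalesce(n) is a narrow transformation (no shuffle); the explicit shuffle
// switch only exists on the RDD-level coalesce.
val df = spark.range(100).toDF("id").repartition(8)
df.coalesce(2).explain()       // plan shows Coalesce, no Exchange -> no shuffle
df.repartition(2).explain()    // plan shows Exchange -> shuffle
val rddCoalesced = df.rdd.coalesce(2, shuffle = true)   // RDD API can opt into a shuffle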
Example code and run output:
Initializing the SparkSession
package com.kfk.spark.common

import org.apache.spark.sql.SparkSession

/**
 * @author : 蔡政洁
 * @email : caizhengjie888@icloud.com
 * @date : 2020/12/2
 * @time : 10:02 PM
 */
object CommSparkSessionScala {

    def getSparkSession(): SparkSession = {
        val spark = SparkSession
                .builder
                .appName("CommSparkSessionScala")
                .master("local")
                .config("spark.sql.warehouse.dir", "/Users/caizhengjie/Document/spark/spark-warehouse")
                .getOrCreate()
        spark
    }
}
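The example code below also refers to a Comm helper that holds the directory of the data files, which is not shown in the original listing. A minimal sketch of what it could look like, assuming the files sit in a local directory (the path is a placeholder, not the author's actual value):

package com.kfk.spark.common

// Hypothetical helper, reconstructed only so the examples compile;
// replace the placeholder path with your own data directory.
object Comm {
    val fileDirPath = "file:///Users/yourname/data/"
}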
Data used in the examples
people.json
{"name":"Michael"}
{"name":"Andy", "age":30}
{"name":"Justin", "age":19}
{"name":"Justin", "age":19}
{"name":"Justin", "age":32}
people1.json
{"name":"Justin", "age":32}
employees.json (used by the joinWith, sort and sample examples)
{"name1":"Michael", "salary":3000}
{"name1":"Andy", "salary":4500}
{"name1":"Justin", "salary":3500}
{"name1":"Berta", "salary":4000}
package com.kfk.spark.sql

import com.kfk.spark.common.{Comm, CommSparkSessionScala}
import org.apache.spark.sql.SparkSession

/**
 * @author : 蔡政洁
 * @email : caizhengjie888@icloud.com
 * @date : 2020/12/4
 * @time : 2:45 PM
 */
object TypeOperationScala {

    // case class Person(name: String, age: Option[Long])
    case class Person(name: String, age: Long)
    case class Employee(name1: String, salary: Long)

    def main(args: Array[String]): Unit = {
        val spark = CommSparkSessionScala.getSparkSession()

        // coalesce / repartition
        coalesceAndRepartitions(spark)

        // distinct / dropDuplicates
        distinctAndDropDuplicates(spark)

        // filter / except / intersect
        filterAndExceptAndIntersect(spark)

        // map / flatMap / mapPartitions
        mapAndFlatMap(spark)

        // join
        joinWith(spark)

        // sort
        sort(spark)

        // sample / randomSplit
        sampleAndRandomSplit(spark)
    }

    /**
     * coalesce and repartition are both used to repartition a Dataset.
     * The difference: coalesce can only decrease the number of partitions and can avoid a shuffle,
     * while repartition can increase or decrease the number of partitions and always shuffles,
     * which amounts to a full repartitioning.
     * @param spark
     */
    def coalesceAndRepartitions(spark: SparkSession): Unit = {
        val personDF = spark.read.json(Comm.fileDirPath + "people.json")

        // 1
        println(personDF.rdd.partitions.size)

        // repartition
        val personDFRepartition = personDF.repartition(4)

        // 4
        println(personDFRepartition.rdd.partitions.size)

        // coalesce
        val personDFCoalesce = personDFRepartition.coalesce(3)

        // 3
        println(personDFCoalesce.rdd.partitions.size)
    }

    /**
     * distinct: compares and deduplicates rows on their entire content.
     * dropDuplicates: deduplicates on the specified columns.
     * @param spark
     */
    def distinctAndDropDuplicates(spark: SparkSession): Unit = {
        val personDF = spark.read.json(Comm.fileDirPath + "people.json")

        personDF.show()
        /**
         * +----+-------+
         * | age|   name|
         * +----+-------+
         * |null|Michael|
         * |  30|   Andy|
         * |  19| Justin|
         * |  19| Justin|
         * |  32| Justin|
         * +----+-------+
         */

        // distinct
        personDF.distinct().show()
        /**
         * +----+-------+
         * | age|   name|
         * +----+-------+
         * |null|Michael|
         * |  30|   Andy|
         * |  19| Justin|
         * |  32| Justin|
         * +----+-------+
         */

        // dropDuplicates
        personDF.dropDuplicates(Seq("name")).show()
        /**
         * +----+-------+
         * | age|   name|
         * +----+-------+
         * |null|Michael|
         * |  30|   Andy|
         * |  19| Justin|
         * +----+-------+
         */
    }

    /**
     * filter: applies our own predicate; rows returning true are kept, rows returning false are dropped.
     * except: returns the rows that exist in the current Dataset but not in the other one.
     * intersect: returns the rows that the two Datasets have in common.
     * @param spark
     */
    def filterAndExceptAndIntersect(spark: SparkSession): Unit = {
        import spark.implicits._

        val personDS = spark.read.json(Comm.fileDirPath + "people.json").as[Person]
        val personDS1 = spark.read.json(Comm.fileDirPath + "people1.json").as[Person]

        // filter, approach 1: typed filter
        personDS.filter(line => line.age != None).show()
        /**
         * +---+------+
         * |age|  name|
         * +---+------+
         * | 30|  Andy|
         * | 19|Justin|
         * | 19|Justin|
         * | 32|Justin|
         * +---+------+
         */

        // filter, approach 2: filter on the underlying RDD
        val personDF = spark.read.json(Comm.fileDirPath + "people.json")
        val rdd = personDF.rdd.filter(line => line.getAs("age") != null)
        for (elem <- rdd.collect()) {
            System.out.println(elem)
        }
        /**
         * [30,Andy]
         * [19,Justin]
         * [19,Justin]
         * [32,Justin]
         */

        // except
        personDS.except(personDS1).show()
        /**
         * +----+-------+
         * | age|   name|
         * +----+-------+
         * |null|Michael|
         * |  30|   Andy|
         * |  19| Justin|
         * +----+-------+
         */

        // intersect
        personDS.intersect(personDS1).show()
        /**
         * +---+------+
         * |age|  name|
         * +---+------+
         * | 32|Justin|
         * +---+------+
         */
    }

    /**
     * map: maps every row of the Dataset to exactly one output row.
     * flatMap: each input row can produce multiple output rows.
     * mapPartitions: processes all the rows of a partition in a single call.
     * @param spark
     */
    def mapAndFlatMap(spark: SparkSession): Unit = {
        import spark.implicits._

        val personDS = spark.read.json(Comm.fileDirPath + "people.json").as[Person]

        // map
        personDS.filter("age is not null").map(line => (line.name, line.age + 10)).show()
        /**
         * +------+---+
         * |    _1| _2|
         * +------+---+
         * |  Andy| 40|
         * |Justin| 29|
         * |Justin| 29|
         * |Justin| 42|
         * +------+---+
         */

        // flatMap
        personDS.filter("age is not null").flatMap(line => {
            Seq(Person(line.name + "_1", line.age + 1000), Person(line.name + "_2", line.age + 1000))
        }).show()
        /**
         * +--------+----+
         * |    name| age|
         * +--------+----+
         * |  Andy_1|1030|
         * |  Andy_2|1030|
         * |Justin_1|1019|
         * |Justin_2|1019|
         * |Justin_1|1019|
         * |Justin_2|1019|
         * |Justin_1|1032|
         * |Justin_2|1032|
         * +--------+----+
         */

        // mapPartitions
        personDS.filter("age is not null").mapPartitions(iter => {
            val result = scala.collection.mutable.ArrayBuffer[(String, Long)]()
            while (iter.hasNext) {
                val person = iter.next()
                result += ((person.name, person.age + 1000))
            }
            result.iterator
        }).show()
        /**
         * +------+----+
         * |    _1|  _2|
         * +------+----+
         * |  Andy|1030|
         * |Justin|1019|
         * |Justin|1019|
         * |Justin|1032|
         * +------+----+
         */
    }

    /**
     * join: relates and merges two Datasets on a given column
     * (joinWith is the typed variant, which returns pairs of objects).
     * @param spark
     */
    def joinWith(spark: SparkSession): Unit = {
        import spark.implicits._

        val peopleFilePath = Comm.fileDirPath + "people.json"
        val employeeFilePath = Comm.fileDirPath + "employees.json"

        val personDS = spark.read.json(peopleFilePath).as[Person]
        val employeeDS = spark.read.json(employeeFilePath).as[Employee]

        personDS.joinWith(employeeDS, $"name" === $"name1").show()
        /**
         * +------------+---------------+
         * |          _1|             _2|
         * +------------+---------------+
         * | [, Michael]|[Michael, 3000]|
         * |  [30, Andy]|   [Andy, 4500]|
         * |[32, Justin]| [Justin, 3500]|
         * |[19, Justin]| [Justin, 3500]|
         * |[19, Justin]| [Justin, 3500]|
         * +------------+---------------+
         */
    }

    /**
     * sort: sorts the Dataset.
     * @param spark
     */
    def sort(spark: SparkSession): Unit = {
        import spark.implicits._

        val employeeFilePath = Comm.fileDirPath + "employees.json"
        val employeeDS = spark.read.json(employeeFilePath).as[Employee]

        employeeDS.show()
        /**
         * +-------+------+
         * |  name1|salary|
         * +-------+------+
         * |Michael|  3000|
         * |   Andy|  4500|
         * | Justin|  3500|
         * |  Berta|  4000|
         * +-------+------+
         */

        employeeDS.sort($"salary".desc).show()
        /**
         * +-------+------+
         * |  name1|salary|
         * +-------+------+
         * |   Andy|  4500|
         * |  Berta|  4000|
         * | Justin|  3500|
         * |Michael|  3000|
         * +-------+------+
         */

        // register a temporary view and use a SQL statement
        employeeDS.toDF().createOrReplaceTempView("employee")
        val result = spark.sql("select * from employee a where 1=1 order by salary desc")
        result.show()
        /**
         * +-------+------+
         * |  name1|salary|
         * +-------+------+
         * |   Andy|  4500|
         * |  Berta|  4000|
         * | Justin|  3500|
         * |Michael|  3000|
         * +-------+------+
         */
    }

    /**
     * randomSplit: splits the Dataset into as many parts as there are elements in the weight Array;
     * each element's value is the weight of the corresponding split.
     * sample: draws a random sample at the given fraction.
     * @param spark
     */
    def sampleAndRandomSplit(spark: SparkSession): Unit = {
        import spark.implicits._

        val employeeFilePath = Comm.fileDirPath + "employees.json"
        val employeeDS = spark.read.json(employeeFilePath).as[Employee]

        employeeDS.randomSplit(Array(2, 5, 1)).foreach(ds => ds.show())
        /**
         * +-----+------+
         * |name1|salary|
         * +-----+------+
         * |Berta|  4000|
         * +-----+------+
         *
         * +------+------+
         * | name1|salary|
         * +------+------+
         * |  Andy|  4500|
         * |Justin|  3500|
         * +------+------+
         *
         * +-------+------+
         * |  name1|salary|
         * +-------+------+
         * |Michael|  3000|
         * +-------+------+
         */

        employeeDS.sample(false, 0.5).show()
        /**
         * +-------+------+
         * |  name1|salary|
         * +-------+------+
         * |Michael|  3000|
         * | Justin|  3500|
         * +-------+------+
         */
    }
}
2. Untyped Operations on Datasets in Detail
Data used in the example
employees1.json
{"deptId":"dept-1", "name":"Michael", "salary":3000}
{"deptId":"dept-2", "name":"Andy", "salary":4500}
{"deptId":"dept-2", "name":"Justin", "salary":3500}
{"deptId":"dept-1", "name":"Berta", "salary":4000}
dept.json
{"id":"dept-1","deptName":"开发部"}
{"id":"dept-2","deptName":"市场部"}
Example code:
package com.kfk.spark.sql

import com.kfk.spark.common.{Comm, CommSparkSessionScala}

/**
 * @author : 蔡政洁
 * @email : caizhengjie888@icloud.com
 * @date : 2020/12/6
 * @time : 3:39 PM
 */
object UnTypeOperationScala {

    case class Employee(deptId: String, name: String, salary: Long)
    case class Dept(id: String, deptName: String)

    def main(args: Array[String]): Unit = {
        val spark = CommSparkSessionScala.getSparkSession()
        import spark.implicits._

        val employeeDS = spark.read.json(Comm.fileDirPath + "employees1.json").as[Employee]
        val deptDS = spark.read.json(Comm.fileDirPath + "dept.json").as[Dept]

        // join
        val joinDS = employeeDS.join(deptDS, $"deptId" === $"id")
        joinDS.show()
        /**
         * +------+-------+------+--------+------+
         * |deptId|   name|salary|deptName|    id|
         * +------+-------+------+--------+------+
         * |dept-1|Michael|  3000|  开发部|dept-1|
         * |dept-2|   Andy|  4500|  市场部|dept-2|
         * |dept-2| Justin|  3500|  市场部|dept-2|
         * |dept-1|  Berta|  4000|  开发部|dept-1|
         * +------+-------+------+--------+------+
         */

        // using untyped DataFrame operators
        joinDS.groupBy("deptId", "deptName")
              .sum("salary")
              .select("deptId", "deptName", "sum(salary)")
              .orderBy($"sum(salary)".desc)
              .show()
        /**
         * +------+--------+-----------+
         * |deptId|deptName|sum(salary)|
         * +------+--------+-----------+
         * |dept-2|  市场部|       8000|
         * |dept-1|  开发部|       7000|
         * +------+--------+-----------+
         */

        // register a temporary view and use a SQL statement
        joinDS.createOrReplaceTempView("deptSalary")
        val resultDF = spark.sql("select a.deptId, a.deptName, sum(salary) as salary from deptSalary a group by a.deptId, a.deptName order by salary desc")
        resultDF.show()
        /**
         * +------+--------+------+
         * |deptId|deptName|salary|
         * +------+--------+------+
         * |dept-2|  市场部|  8000|
         * |dept-1|  开发部|  7000|
         * +------+--------+------+
         */
    }
}
3. Aggregate Functions on Datasets in Detail
Data used in the example
employees1.json
{"deptId":"dept-1", "name":"Michael", "salary":3000}
{"deptId":"dept-2", "name":"Andy", "salary":4500}
{"deptId":"dept-2", "name":"Justin", "salary":3500}
{"deptId":"dept-1", "name":"Berta", "salary":4000}
dept.json
{"id":"dept-1","deptName":"开发部"}
{"id":"dept-2","deptName":"市场部"}
Example code:
package com.kfk.spark.sql

import com.kfk.spark.common.{Comm, CommSparkSessionScala}
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.functions._

/**
 * @author : 蔡政洁
 * @email : caizhengjie888@icloud.com
 * @date : 2020/12/6
 * @time : 7:29 PM
 */
object AggOptionScala {

    case class Employee(deptId: String, name: String, salary: Long)
    case class Dept(id: String, deptName: String)

    def main(args: Array[String]): Unit = {
        val spark = CommSparkSessionScala.getSparkSession()
        agg1(spark)
        agg2(spark)
    }

    /**
     * avg / sum / max / min / count
     * @param spark
     */
    def agg1(spark: SparkSession): Unit = {
        import spark.implicits._

        val employeeDS = spark.read.json(Comm.fileDirPath + "employees1.json").as[Employee]
        val deptDS = spark.read.json(Comm.fileDirPath + "dept.json").as[Dept]

        // join
        val joinDS = employeeDS.join(deptDS, $"deptId" === $"id")
        joinDS.show()

        joinDS.groupBy("deptId", "deptName")
              .agg(avg("salary"), sum("salary"), max("salary"), min("salary"), count("name"))
              .show()
        /**
         * +------+--------+-----------+-----------+-----------+-----------+-----------+
         * |deptId|deptName|avg(salary)|sum(salary)|max(salary)|min(salary)|count(name)|
         * +------+--------+-----------+-----------+-----------+-----------+-----------+
         * |dept-1|  开发部|     3500.0|       7000|       4000|       3000|          2|
         * |dept-2|  市场部|     4000.0|       8000|       4500|       3500|          2|
         * +------+--------+-----------+-----------+-----------+-----------+-----------+
         */

        // the same aggregation expressed as (column -> aggregate function) pairs
        joinDS.groupBy("deptId", "deptName")
              .agg("salary" -> "avg", "salary" -> "sum", "salary" -> "max", "salary" -> "min", "name" -> "count")
              .show()
    }

    /**
     * collect_list: collects the values of the given field within each group into a list; it does not deduplicate.
     * collect_set: same as above, except that it deduplicates.
     * @param spark
     */
    def agg2(spark: SparkSession): Unit = {
        import spark.implicits._

        val employeeDS = spark.read.json(Comm.fileDirPath + "employees1.json").as[Employee]
        val deptDS = spark.read.json(Comm.fileDirPath + "dept.json").as[Dept]

        // join
        val joinDS = employeeDS.join(deptDS, $"deptId" === $"id")
        joinDS.show()

        joinDS.groupBy("deptId", "deptName").agg(collect_list("name")).show()
        joinDS.groupBy("deptId", "deptName").agg(collect_set("name")).show()
        /**
         * +------+--------+-----------------+
         * |deptId|deptName|collect_set(name)|
         * +------+--------+-----------------+
         * |dept-1|  开发部| [Michael, Berta]|
         * |dept-2|  市场部|   [Andy, Justin]|
         * +------+--------+-----------------+
         */
    }
}
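As in the earlier sections, the same aggregation can also be expressed with Spark SQL against a temporary view. A short sketch assuming the joinDS from agg2 above (the view name deptEmployee is made up):

// Assumes joinDS from agg2 above; "deptEmployee" is an arbitrary view name.
joinDS.createOrReplaceTempView("deptEmployee")
spark.sql("select deptId, deptName, collect_set(name) as names from deptEmployee group by deptId, deptName").show()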
4. Other Dataset Functions in Detail
Data used in the example
employees1.json
{"deptId":"dept-1", "name":"Michael", "salary":3000}
{"deptId":"dept-2", "name":"Andy", "salary":4500}
{"deptId":"dept-2", "name":"Justin", "salary":3500}
{"deptId":"dept-1", "name":"Berta", "salary":4000}
dept.json
{"id":"dept-1","deptName":"开发部"}
{"id":"dept-2","deptName":"市场部"}
Example code:
package com.kfk.spark.sql

import com.kfk.spark.common.{Comm, CommSparkSessionScala}
import org.apache.spark.sql.functions._

/**
 * @author : 蔡政洁
 * @email : caizhengjie888@icloud.com
 * @date : 2020/12/6
 * @time : 8:13 PM
 */
object OtherOperationScala {

    case class Employee(deptId: String, name: String, salary: Long)
    case class Dept(id: String, deptName: String)

    def main(args: Array[String]): Unit = {
        val spark = CommSparkSessionScala.getSparkSession()
        import spark.implicits._

        val employeeDS = spark.read.json(Comm.fileDirPath + "employees1.json").as[Employee]
        val deptDS = spark.read.json(Comm.fileDirPath + "dept.json").as[Dept]

        // join
        val joinDS = employeeDS.join(deptDS, $"deptId" === $"id")
        joinDS.show()

        /**
         * Date functions: current_date, current_timestamp
         * Math function: round
         * Random function: rand
         * String function: concat
         */
        joinDS.select(current_date(), current_timestamp(), rand(), round($"salary", 2), concat($"name", $"salary")).show()
        /**
         * +--------------+--------------------+-------------------------+----------------+--------------------+
         * |current_date()| current_timestamp()|rand(7003525560546994152)|round(salary, 2)|concat(name, salary)|
         * +--------------+--------------------+-------------------------+----------------+--------------------+
         * |    2020-12-06|2020-12-06 21:13:...|       0.8157198990085353|            3000|         Michael3000|
         * |    2020-12-06|2020-12-06 21:13:...|       0.8446053083722802|            4500|            Andy4500|
         * |    2020-12-06|2020-12-06 21:13:...|       0.5323697131017762|            3500|          Justin3500|
         * |    2020-12-06|2020-12-06 21:13:...|      0.10113425439292889|            4000|           Berta4000|
         * +--------------+--------------------+-------------------------+----------------+--------------------+
         */
    }
}
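One last hedged aside, reusing the joinDS from the listing above: rand also accepts an explicit seed, and these functions compose freely with ordinary column expressions (the seed and the arithmetic below are illustrative only):

// Assumes joinDS and the imports from the example above; seed 42L is arbitrary.
joinDS.select(rand(42L), round($"salary" / 3, 2), concat($"name", lit("-"), $"deptId")).show()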