五、 DataFrame创建方式及功能使用
在Spark中, DataFrame是一 种以RDD为基础的分布式数据集,类似于传统数据库中的二维表格。
DataFrame与RDD的主要区别在于,前者带有schema元信息,即DataFrame所表示的二维表数据集的每一列都带有 名称和类型。
使得SparkSQL得以洞察更多的结构信息,从而对藏于DataFrame背后的数据源以及作用于DataFrame之.上的变换进行了针对性的优化,最终达到大幅提升运行时效率的目标。
反观RDD ,由于无从得知所存数据元素的具体内部结构, Spark Core只能在stage层面进行简单、通用的流水线优化。
DataFrame和RDD的对比图
RDD转换DataFrame:
准备数据集
val rdd1 = sc.textFile("file:///opt/datas/stu.txt") val rdd2 = sc.textFile("file:///opt/datas/stu1.txt") val allRdd = rdd1.union(rdd2)
将RDD转换为DataFrame,使用toDF
val lines = allRdd.flatMap(x => x.split(" ")).map(x => (x,1)).reduceByKey((a,b) => (a+b)).filter(x =>(x._2 >1)).map(x =>
lines: org.apache.spark.sql.DataFrame = [_1: string, _2: int]
默认列名是 _1 _2
scala> lines.printSchema root |-- _1: string (nullable = true) |-- _2: integer (nullable = false)
设置Schema为key value
val lines = allRdd.flatMap(x => x.split(" ")).map(x => (x,1)).reduceByKey((a,b) => (a+b)).filter(x =>(x._2 >1)).map(x => (x._2,x._1)).sortByKey(false).map(x =>(x._2,x._1)).toDF("key","value") lines: org.apache.spark.sql.DataFrame = [key: string, value: int]
scala> lines.printSchema root |-- key: string (nullable = true) |-- value: integer (nullable = false)
显示key
scala> lines.select("key").show +------+ | key| +------+ | java| |python| |hadoop| | scala| |spring| | hive| | php| | linux| | hue| | unix| | spark| | hbase| | mysql| | c++| | c| +------+
DataSet转换DataFrame:
准备数据集
val dataSet = spark.read.textFile("file:///opt/datas/stu.txt").flatMap(x => x.split(" " )).map(x => (x,1))
dataSet: org.apache.spark.sql.Dataset[(String, Int)] = [_1: string, _2: int]
查看DataSet的数据格式
scala> dataSet.show +------+---+ | _1| _2| +------+---+ | java| 1| |python| 1| |hadoop| 1| |spring| 1| |python| 1| |hadoop| 1| | java| 1| | c| 1| | c++| 1| | hbase| 1| | spark| 1| | scala| 1| | scala| 1| |python| 1| | java| 1| | linux| 1| | unix| 1| | java| 1| | php| 1| | mysql| 1| +------+---+ only showing top 20 rows
将DataSet转化为DataFrame
val dataSet = spark.read.textFile("file:///opt/datas/stu.txt").flatMap(x => x.split(" " )).map(x => (x,1)).toDF("key","v
这时已经转化为DataFrame的格式
dataSet: org.apache.spark.sql.DataFrame = [key: string, value: int]
DataFrame操作
dataSet.select("key","value").groupBy("key").count.sort($"count".desc).show
+------+-----+ | key|count| +------+-----+ | java| 4| |python| 3| | scala| 2| |hadoop| 2| | unix| 1| |spring| 1| | mysql| 1| | spark| 1| | linux| 1| | hue| 1| | c++| 1| | hbase| 1| | c| 1| | hive| 1| | php| 1| +------+-----+
六、DataSet创建方式及功能使用
DataSet与RDD相似,但是它们不是使用Java序列化或Kryo,而是使用专用的Encoder对对象进行序列化以进行网络处理或传输。
DataSet创建方式有两种:一种是直接通过sparkSession对象读取外部数据创建,另一种是通过RDD转换。
(1)创建DataSet方式一
通过sparkSession对象读取外部数据创建DataSet:
val dataSet = spark.read.textFile("file:///opt/datas/stu.txt")
dataSet: org.apache.spark.sql.Dataset[String] = [value: string]
查看Schema信息
scala> dataSet.printSchema root |-- value: string (nullable = true)
查看一下数据
scala> val dataSet = spark.read.textFile("file:///opt/datas/stu.txt").show +--------------------+ | value| +--------------------+ | java python hadoop| |spring python had...| | hbase spark scala| | scala python java | | linux unix java php| | mysql hive hue| +--------------------+
val lines = dataSet.flatMap(x => x.split(" ")).map(x => (x,1))
对DataSet操作
scala> lines.select("_1","_2").groupBy("_1").count.show +------+-----+ | _1|count| +------+-----+ | unix| 1| | hbase| 1| |spring| 1| | mysql| 1| | scala| 2| | spark| 1| | hue| 1| | c++| 1| | c| 1| | linux| 1| | java| 4| | php| 1| |hadoop| 2| |python| 3| | hive| 1| +------+-----+
自定义对象的映射
case class Person(username:String,usercount:Int)
defined class Person
val lines = dataSet.flatMap(x => x.split(" ")).map(x => (x,1)).map(x => Person(x._1,x._2))
这时列名就改成了刚才我们自定义的
scala> lines.show +--------+---------+ |username|usercount| +--------+---------+ | java| 1| | python| 1| | hadoop| 1| | spring| 1| | python| 1| | hadoop| 1| | java| 1| | c| 1| | c++| 1| | hbase| 1| | spark| 1| | scala| 1| | scala| 1| | python| 1| | java| 1| | linux| 1| | unix| 1| | java| 1| | php| 1| | mysql| 1| +--------+---------+ only showing top 20 rows
scala> lines.select("username","usercount").groupBy("username").count.show +--------+-----+ |username|count| +--------+-----+ | unix| 1| | hbase| 1| | spring| 1| | mysql| 1| | scala| 2| | spark| 1| | hue| 1| | c++| 1| | c| 1| | linux| 1| | java| 4| | php| 1| | hadoop| 2| | python| 3| | hive| 1| +--------+-----+
scala> lines.select("username","usercount").groupBy("username").count.sort($"count".desc).show +--------+-----+ |username|count| +--------+-----+ | java| 4| | python| 3| | scala| 2| | hadoop| 2| | hbase| 1| | hue| 1| | unix| 1| | c++| 1| | spring| 1| | mysql| 1| | spark| 1| | c| 1| | linux| 1| | php| 1| | hive| 1| +--------+-----+
scala> lines.select("username","usercount").groupBy("username").count.sort($"count".desc).toDF("username","usercount").show +--------+---------+ |username|usercount| +--------+---------+ | java| 4| | python| 3| | scala| 2| | hadoop| 2| | hbase| 1| | spring| 1| | spark| 1| | c| 1| | hue| 1| | c++| 1| | mysql| 1| | linux| 1| | php| 1| | unix| 1| | hive| 1| +--------+---------+
(2)创建DataSet方式二
通过RDD转换成DataSet
scala> val rdd1 = sc.textFile("file:///opt/datas/stu.txt")
rdd1: org.apache.spark.rdd.RDD[String] = file:///opt/datas/stu.txt MapPartitionsRDD[187] at textFile at <console>:24
scala> val dataSet2 = rdd1.toDS
dataSet2: org.apache.spark.sql.Dataset[String] = [value: string]
这时就将RDD转换成了DataSet,
(3)创建DataSet方式三
通过DataFrame转换成DataSet
scala> val dataSet2 = rdd1.toDF.toJSON
dataSet2: org.apache.spark.sql.Dataset[String] = [value: string]
通过DataFrame转换成DataSet,查看数据发现每一行是json的格式
scala> dataSet2.show +--------------------+ | value| +--------------------+ |{"value":"java py...| |{"value":"spring ...| |{"value":"hbase s...| |{"value":"scala p...| |{"value":"linux u...| |{"value":"mysql h...| +--------------------+
可以再通过这种方式将DataSet转换为DataFrame
scala> spark.read.json(dataSet2) res23: org.apache.spark.sql.DataFrame = [value: string]
scala> spark.read.json(dataSet2).show +--------------------+ | value| +--------------------+ | java python hadoop| |spring python had...| | hbase spark scala| | scala python java | | linux unix java php| | mysql hive hue| +--------------------+
还有一种方式可以通过DataFrame转换成DataSet
scala> val rdd1 = sc.textFile("file:///opt/datas/stu.txt") scala> val df = rdd1.flatMap(x => x.split(" ")).toDF df: org.apache.spark.sql.DataFrame = [value: string]
scala> val dataSet = df.as[String] dataSet: org.apache.spark.sql.Dataset[String] = [value: string]
scala> dataSet.map(x => (x,1)).show +------+---+ | _1| _2| +------+---+ | java| 1| |python| 1| |hadoop| 1| |spring| 1| |python| 1| |hadoop| 1| | java| 1| | c| 1| | c++| 1| | hbase| 1| | spark| 1| | scala| 1| | scala| 1| |python| 1| | java| 1| | linux| 1| | unix| 1| | java| 1| | php| 1| | mysql| 1| +------+---+
scala> dataSet.map(x => (x,1)) res28: org.apache.spark.sql.Dataset[(String, Int)] = [_1: string, _2: int]
此时我们发现它的列名是_1,_2,我们可以将它的列名改为我们自定义对象
scala> case class Person(username:String,usercount:Int) defined class Person
scala> val df = rdd1.flatMap(x => x.split(" ")).map(x => (x,1)).toDF("username","usercount") df: org.apache.spark.sql.DataFrame = [username: string, usercount: int]
scala> val dataSet = df.as[Person] dataSet: org.apache.spark.sql.Dataset[Person] = [username: string, usercount: int]
这时列名就换过来了
scala> dataSet.show +--------+---------+ |username|usercount| +--------+---------+ | java| 1| | python| 1| | hadoop| 1| | spring| 1| | python| 1| | hadoop| 1| | java| 1| | c| 1| | c++| 1| | hbase| 1| | spark| 1| | scala| 1| | scala| 1| | python| 1| | java| 1| | linux| 1| | unix| 1| | java| 1| | php| 1| | mysql| 1| +--------+---------+
下面就可以对DataSet做一些操作
scala> dataSet.select("username","usercount").groupBy("username").count.show +--------+-----+ |username|count| +--------+-----+ | unix| 1| | hbase| 1| | spring| 1| | mysql| 1| | scala| 2| | spark| 1| | hue| 1| | c++| 1| | c| 1| | linux| 1| | java| 4| | php| 1| | hadoop| 2| | python| 3| | hive| 1| +--------+-----+
七、数据集之间的对比与转换(总结)