spark-2.0-从RDD到DataSet

简介: DataSet API和DataFrame两者结合起来,DataSet中许多的API模仿了RDD的API,实现不太一样,但是基于RDD的代码很容易移植过来。 spark未来基本是要在DataSet上扩展了,因为spark基于spark core关注的东西很多,整合内部代码是必然的。

DataSet API和DataFrame两者结合起来,DataSet中许多的API模仿了RDD的API,实现不太一样,但是基于RDD的代码很容易移植过来。
spark未来基本是要在DataSet上扩展了,因为spark基于spark core关注的东西很多,整合内部代码是必然的。
1、加载文件

val rdd = sparkContext.textFile("./data.txt")
val ds = sparkSession.read.text("./data.txt")

2、计算总数

rdd.count()

ds.count()

3、wordcount实例

val wordsRDD = rdd.flatMap(value => value.split("\\s+"))
val wordsPairs = wordsRDD.map(word => (word,1))
val wordCount = wordsPairs.reduceByKey(_+_)
import sparkSession.implicits._
val wordsDs = ds.flatMap(value => value.split("\\s+"))
val wordsPairDs = wordsDs.groupByKey(value => value)
val wordCounts = wordsPairDs.count()

4、缓存

rdd.cache()

ds.cache()

5、过滤

val filterRDD = wordsRDD.filter(value => value=="hello")

val filterDs = wordsDs.filter(value => value = "hello")

6、map partition

val mapPartitionsRDD = rdd.mapPartitions(iterator => List(iterator.count(value=>true)).iterator)

val mapPartitionsDs = ds.mapPartitions(iterator => List(iterator.count(value=>true)).iterator)

7 、reduceByKey

val reduceCountByRDD = wordsPair.reduceByKey(_+_)

val reduceCountByDs = wordsPairDs.mapGroups((key,values) =>(key,values.length))

8、RDD和 DataSet互换

val dsToRDD = ds.rdd
val rddStringToRowRDD = rdd.map(value => Row(value))
val dfschema = StructType(Array(StructField("value",StringType)))
val rddToDF = sparkSession.createDataFrame(rddStringToRowRDD,dfschema)
val rDDToDataSet = rddToDF.as[String]

9、double

val doubleRDD = sparkContext.makeRDD(List(1.0,5.0,8.9,9.0))
val rddSum =doubleRDD.sum()
val rddMean = doubleRDD.mean()
val rowRDD = doubleRDD.map(value => Row.fromSeq(List(value)))
val schema = StructType(Array(StructField("value",DoubleType)))
val doubleDS = sparkSession.createDataFrame(rowRDD,schema)
import org.apache.spark.sql.functions._
doubleDS.agg(sum("value"))
doubleDS.agg(mean("value"))

10、reduce

val rddReduce = doubleRDD.reduce((a,b) => a +b)
val dsReduce = doubleDS.reduce((row1,row2) =>Row(row1.getDouble(0) + row2.getDouble(0)))

code

import org.apache.spark.sql.types.{DoubleType, StringType, StructField, StructType}
import org.apache.spark.sql.{Row, SparkSession}

object RDDToDataSet {

  def main(args: Array[String]) {

    val sparkSession = SparkSession.builder.master("local")
                                           .appName("example")
                                           .getOrCreate()
    val sparkContext = sparkSession.sparkContext
    //read data from text file
    val rdd = sparkContext.textFile("src/main/resources/data.txt")
    val ds = sparkSession.read.text("src/main/resources/data.txt")

    // do count
    println("count ")
    println(rdd.count())
    println(ds.count())

    // wordcount
    println(" wordcount ")

    val wordsRDD = rdd.flatMap(value => value.split("\\s+"))
    val wordsPair = wordsRDD.map(word => (word,1))
    val wordCount = wordsPair.reduceByKey(_+_)
    println(wordCount.collect.toList)

    import sparkSession.implicits._
    val wordsDs = ds.flatMap(value => value.split("\\s+"))
    val wordsPairDs = wordsDs.groupByKey(value => value)
    val wordCountDs = wordsPairDs.count
    wordCountDs.show()

    //cache
    rdd.cache()
    ds.cache()

    //filter

    val filteredRDD = wordsRDD.filter(value => value =="hello")
    println(filteredRDD.collect().toList)

    val filteredDS = wordsDs.filter(value => value =="hello")
    filteredDS.show()


    //map partitions

    val mapPartitionsRDD = rdd.mapPartitions(iterator => 
    List(iterator.count(value => true)).iterator)
    println(s" the count each partition is ${mapPartitionsRDD.collect().toList}")

    val mapPartitionsDs = ds.mapPartitions(iterator => 
    List(iterator.count(value => true)).iterator)
    mapPartitionsDs.show()

    //converting to each other
    val dsToRDD = ds.rdd
    println(dsToRDD.collect())

    val rddStringToRowRDD = rdd.map(value => Row(value))
    val dfschema = StructType(Array(StructField("value",StringType)))
    val rddToDF = sparkSession.createDataFrame(rddStringToRowRDD,dfschema)
    val rDDToDataSet = rddToDF.as[String]
    rDDToDataSet.show()

    // double based operation

    val doubleRDD = sparkContext.makeRDD(List(1.0,5.0,8.9,9.0))
    val rddSum =doubleRDD.sum()
    val rddMean = doubleRDD.mean()

    println(s"sum is $rddSum")
    println(s"mean is $rddMean")

    val rowRDD = doubleRDD.map(value => Row.fromSeq(List(value)))
    val schema = StructType(Array(StructField("value",DoubleType)))
    val doubleDS = sparkSession.createDataFrame(rowRDD,schema)

    import org.apache.spark.sql.functions._
    doubleDS.agg(sum("value")).show()
    doubleDS.agg(mean("value")).show()

    //reduceByKey API
    val reduceCountByRDD = wordsPair.reduceByKey(_+_)
    val reduceCountByDs = wordsPairDs.mapGroups((key,values) =>(key,values.length))

    println(reduceCountByRDD.collect().toList)
    println(reduceCountByDs.collect().toList)

    //reduce function
    val rddReduce = doubleRDD.reduce((a,b) => a +b)
    val dsReduce = doubleDS.reduce((row1,row2) =>
    Row(row1.getDouble(0) + row2.getDouble(0)))

    println("rdd reduce is " +rddReduce +" dataset reduce "+dsReduce)

  }

}
目录
相关文章
|
23天前
|
存储 缓存 分布式计算
【赵渝强老师】Spark RDD的缓存机制
Spark RDD通过`persist`或`cache`方法可将计算结果缓存,但并非立即生效,而是在触发action时才缓存到内存中供重用。`cache`方法实际调用了`persist(StorageLevel.MEMORY_ONLY)`。RDD缓存可能因内存不足被删除,建议结合检查点机制保证容错。示例中,读取大文件并多次调用`count`,使用缓存后执行效率显著提升,最后一次计算仅耗时98ms。
【赵渝强老师】Spark RDD的缓存机制
|
1月前
|
分布式计算 Spark
【赵渝强老师】Spark RDD的依赖关系和任务阶段
Spark RDD之间的依赖关系分为窄依赖和宽依赖。窄依赖指父RDD的每个分区最多被一个子RDD分区使用,如map、filter操作;宽依赖则指父RDD的每个分区被多个子RDD分区使用,如分组和某些join操作。窄依赖任务可在同一阶段完成,而宽依赖因Shuffle的存在需划分不同阶段执行。借助Spark Web Console可查看任务的DAG图及阶段划分。
98 15
|
4月前
|
存储 分布式计算 并行计算
【赵渝强老师】Spark中的RDD
RDD(弹性分布式数据集)是Spark的核心数据模型,支持分布式并行计算。RDD由分区组成,每个分区由Spark Worker节点处理,具备自动容错、位置感知调度和缓存机制等特性。通过创建RDD,可以指定分区数量,并实现计算函数、依赖关系、分区器和优先位置列表等功能。视频讲解和示例代码进一步详细介绍了RDD的组成和特性。
|
5月前
|
消息中间件 分布式计算 Kafka
大数据-99 Spark 集群 Spark Streaming DStream 文件数据流、Socket、RDD队列流
大数据-99 Spark 集群 Spark Streaming DStream 文件数据流、Socket、RDD队列流
59 0
|
5月前
|
SQL 分布式计算 大数据
大数据-94 Spark 集群 SQL DataFrame & DataSet & RDD 创建与相互转换 SparkSQL
大数据-94 Spark 集群 SQL DataFrame & DataSet & RDD 创建与相互转换 SparkSQL
154 0
|
5月前
|
分布式计算 Java 大数据
大数据-92 Spark 集群 SparkRDD 原理 Standalone详解 ShuffleV1V2详解 RDD编程优化
大数据-92 Spark 集群 SparkRDD 原理 Standalone详解 ShuffleV1V2详解 RDD编程优化
66 0
大数据-92 Spark 集群 SparkRDD 原理 Standalone详解 ShuffleV1V2详解 RDD编程优化
|
5月前
|
SQL 分布式计算 大数据
大数据-91 Spark 集群 RDD 编程-高阶 RDD广播变量 RDD累加器 Spark程序优化
大数据-91 Spark 集群 RDD 编程-高阶 RDD广播变量 RDD累加器 Spark程序优化
78 0
|
5月前
|
缓存 分布式计算 大数据
大数据-90 Spark 集群 RDD 编程-高阶 RDD容错机制、RDD的分区、自定义分区器(Scala编写)、RDD创建方式(一)
大数据-90 Spark 集群 RDD 编程-高阶 RDD容错机制、RDD的分区、自定义分区器(Scala编写)、RDD创建方式(一)
109 0
|
5月前
|
分布式计算 算法 大数据
大数据-90 Spark 集群 RDD 编程-高阶 RDD容错机制、RDD的分区、自定义分区器(Scala编写)、RDD创建方式(二)
大数据-90 Spark 集群 RDD 编程-高阶 RDD容错机制、RDD的分区、自定义分区器(Scala编写)、RDD创建方式(二)
88 0
|
5月前
|
存储 缓存 分布式计算
大数据-89 Spark 集群 RDD 编程-高阶 编写代码、RDD依赖关系、RDD持久化/缓存
大数据-89 Spark 集群 RDD 编程-高阶 编写代码、RDD依赖关系、RDD持久化/缓存
79 4