bigdata-36-Spark转换算子与动作算子

简介: bigdata-36-Spark转换算子与动作算子

创建RDD

RDD是Spark编程核心,在进行Spark编程时,首要任务是创建一个初始的RDD,这样就相当于设置了Spark应用程序的输入源数据,然后在创建了初始的RDD之后,才可以通过Spark 提供的一些高阶函数,对这个RDD进行操作,来获取其它的RDD

Spark提供三种创建RDD方式:集合、本地文件、HDFS文件

  1. 使用程序中的集合创建RDD,主要用于进行测试,可以在实际部署到集群运行之前,自己使用集合构造一些测试数据,来测试后面的spark应用程序的流程。
  2. 使用本地文件创建RDD,主要用于临时性地处理一些存储了大量数据的文件
  3. 使用HDFS文件创建RDD,是最常用的生产环境的处理方式,主要可以针对HDFS上存储的数据,进行离线批处理操作

使用集合创建RDD

如果要通过集合来创建RDD,需要针对程序中的集合,调用SparkContext的parallelize()方法。Spark会将集合中的数据拷贝到集群上,形成一个分布式的数据集合,也就是一个RDD。

相当于,集合中的部分数据会到一个节点上,而另一部分数据会到其它节点上。然后就可以用并行的方式来操作这个分布式数据集合了。

调用parallelize()时,有一个重要的参数可以指定,就是将集合切分成多少个partition。

Spark会为每一个partition运行一个task来进行处理。

Spark默认会根据集群的配置来设置partition的数量。我们也可以在调用parallelize()方法时,传入第二个参数,来设置RDD的partition数量,例如:parallelize(arr, 5)

Scala版本代码如下

package com.bigdata.scala

import org.apache.spark.{SparkConf, SparkContext}

/**

* 需求:使用集合创建RDD

* Created by 颜

*/

object CreateRddByArrayScala {

def main(args: Array[String]): Unit = {

   //创建SparkContext

   val conf = new SparkConf()

   conf.setAppName("CreateRddByArrayScala")

     .setMaster("local")

   val sc = new SparkContext(conf)

   //创建集合

   val arr = Array(1,2,3,4,5)

   //基于集合创建RDD

   val rdd = sc.parallelize(arr)

   //对集合中的数据求和

   val sum = rdd.reduce( + )

   //注意:这行println代码是在driver进程中执行的

   println(sum)

}

}

结果:15

注意:


val arr = Array(1,2,3,4,5)还有println(sum)代码是在driver进程中执行的,这些代码不会并行执行


parallelize还有reduce之类的操作是在worker节点中执行的

转换算子

Spark2.4.3官方文档:Overview - Spark 2.4.4 Documentation (apache.org)

选择RDDs,UserGuide

转换算子如下

常用算子介绍:

转换算子操作开发实战

前置需求分析操作如下

  1. map:对集合中每个元素乘以2
  2. filter:过滤出集合中的偶数
  3. flatMap:将行拆分为单词
  4. groupByKey:对每个大区的主播进行分组
  5. reduceByKey:统计每个大区的主播数量
  6. sortByKey:对主播的音浪收入排序
  7. join:打印每个主播的大区信息和音浪收入
  8. distinct:统计当天开播的大区信息

map:

def mapOp(sc: SparkContext): Unit = {

   val dataRDD = sc.parallelize(Array(1,2,3,4,5))

   dataRDD.map(_ * 2).foreach(println(_))

}

filter:

def filterOp(sc: SparkContext): Unit = {

   val dataRDD = sc.parallelize(Array(1,2,3,4,5))

   //满足条件的保留下来

   dataRDD.filter( % 2 == 0).foreach(println())

}

flatMap:

def flatMapOp(sc: SparkContext): Unit = {

   val dataRDD = sc.parallelize(Array("good good study","day day up"))

   dataRDD.flatMap(.split(" ")).foreach(println())

}

groupByKey:一元情况

def groupByKeyOp(sc: SparkContext): Unit = {

   val dataRDD = sc.parallelize(Array((150001,"US"),(150002,"CN"),(150003,"CN"),(150004,"IN")))

   //需要使用map对tuple中的数据进行位置互换,因为我们需要把大区作为key进行分组操作

   //此时的key就是tuple中的第一列,其实在这里就可以把这个tuple认为是一个key-value

   //注意:在使用类似于groupByKey这种基于key的算子的时候,需要提前把RDD中的数据组装成tuple2这种形式

   //此时map算子之后生成的新的数据格式是这样的:("US",150001)

   //如果tuple中的数据列数超过了2列怎么办?看groupByKeyOp2

   dataRDD.map(tup=>(tup.2,tup.1)).groupByKey().foreach(tup=>{

     //获取大区信息

     val area = tup._1

     print(area+":")

     //获取同一个大区对应的所有用户id

     val it = tup._2

     for(uid <- it){

       print(uid+" ")

     }

     println()

   })

}

groupByKey:tuple是二元的情况

def groupByKeyOp2(sc: SparkContext): Unit = {

val dataRDD = sc.parallelize(Array((150001, "US","male"), (150002, "CN","female"), (150003, "CN","male"), (150004, "IN","female")))

//如果tuple中的数据列数超过了2列怎么办?

//把需要作为key的那一列作为tuple2的第一列,剩下的可以再使用一个tuple2包装一下

//此时map算子之后生成的新的数据格式是这样的:("US",(150001,"male"))

//注意:如果你的数据结构比较负责,你可以在执行每一个算子之后都调用foreach打印一下,确认数据的格式

dataRDD.map(tup=>(tup.2,(tup.1,tup._3))).groupByKey().foreach(tup=>{

   //获取大区信息

   val area = tup._1

   print(area+":")

   //获取同一个大区对应的所有用户id和性别信息

   val it = tup._2

   for((uid,sex) <- it){

     print("<"+uid+","+sex+"> ")

   }

   println()

})

}

reduceByKey:

def reduceByKeyOp(sc: SparkContext): Unit = {

val dataRDD = sc.parallelize(Array((150001,"US"),(150002,"CN"),(150003,"CN"),(150004,"IN")))

//由于这个需求只需要使用到大区信息,所以在mao操作的时候只保留大区信息即可

dataRDD.map(tup=>(tup.2,1)).reduceByKey( + ).foreach(println())

}

sortByKey:

def sortByKeyOp(sc: SparkContext): Unit = {

val dataRDD = sc.parallelize(Array((150001,400),(150002,200),(150003,300),(150004,100)))

//由于需要对音浪收入进行排序,所以需要把音浪收入作为key,在这里要进行位置和互换

/*dataRDD.map(tup=>(tup._2,tup._1))

   .sortByKey(false)//默认是正序,第一个参数为true,想要倒序需要把这个参数设置为false

   .foreach(println(_))*/

//sortBy的使用:可以动态指定排序字段,比较灵活

dataRDD.sortBy(.2,false).foreach(println(_))

dataRDD.sortByKey()

}

join:

def joinOp(sc: SparkContext): Unit = {

val dataRDD1 = sc.parallelize(Array((150001,"US"),(150002,"CN"),(150003,"CN"),(150004,"IN")))

val dataRDD2 = sc.parallelize(Array((150001,400),(150002,200),(150003,300),(150004,100)))

val joinRDD = dataRDD1.join(dataRDD2)

//joinRDD.foreach(println(_))

joinRDD.foreach(tup=>{

   //用户id

   val uid = tup._1

   val area_gold = tup._2

   //大区

   val area = area_gold._1

   //音浪收入

   val gold = area_gold._2

   println(uid+"\t"+area+"\t"+gold)

})

}

distinct:

def distinctOp(sc: SparkContext): Unit = {

val dataRDD = sc.parallelize(Array((150001,"US"),(150002,"CN"),(150003,"CN"),(150004,"IN")))

//由于是统计开播的大区信息,需要根据大区信息去重,所以只保留大区信息

dataRDD.map(.2).distinct().foreach(println(_))

}

获取配置:

def getSparkContext = {

val conf = new SparkConf()

conf.setAppName("WordCountScala")

   .setMaster("local")

new SparkContext(conf)

}

主方法一览:

def main(args: Array[String]): Unit = {

val sc = getSparkContext

//map:对集合中每个元素乘以2

//mapOp(sc)

//filter:过滤出集合中的偶数

//filterOp(sc)

//flatMap:将行拆分为单词

//flatMapOp(sc)

//groupByKey:对每个大区的主播进行分组

//groupByKeyOp(sc)

//groupByKeyOp2(sc)

//reduceByKey:统计每个大区的主播数量

//reduceByKeyOp(sc)

//sortByKey:对主播的音浪收入排序

//sortByKeyOp(sc)

//join:打印每个主播的大区信息和音浪收入

//joinOp(sc)

//distinct:统计当天开播的大区信息

//distinctOp(sc)

sc.stop()

}

常用Action介绍

Action操作开发实战

下面针对常见的Action算子来写一些具体案例

  1. reduce:聚合计算
  2. collect:获取元素集合
  3. take(n):获取前n个元素
  4. count:获取元素总数
  5. saveAsTextFile:保存文件
  6. countByKey:统计相同的key出现多少次
  7. foreach:迭代遍历元素

获取配置:

def getSparkContext = {

val conf = new SparkConf()

conf.setAppName("ActionOpScala")

   .setMaster("local")

new SparkContext(conf)

}

reduce:

def reduceOp(sc: SparkContext): Unit = {

val dataRDD = sc.parallelize(Array(1,2,3,4,5))

val num = dataRDD.reduce( + )

println(num)

}

collect:

def collectOp(sc: SparkContext): Unit = {

val dataRDD = sc.parallelize(Array(1,2,3,4,5))

//collect返回的是一个Array数组

//注意:如果RDD中数据量过大,不建议使用collect,因为最终的数据会返回给Driver进程所在的节点

//如果想要获取几条数据,查看一下数据格式,可以使用take(n)

val res = dataRDD.collect()

for(item <- res){

   println(item)

}

}

take(n):

def takeOp(sc: SparkContext): Unit = {

val dataRDD = sc.parallelize(Array(1,2,3,4,5))

//从RDD中获取前2个元素

val res = dataRDD.take(2)

for(item <- res){

   println(item)

}

}

count:

def countOp(sc: SparkContext): Unit = {

val dataRDD = sc.parallelize(Array(1,2,3,4,5))

val res = dataRDD.count()

println(res)

}

saveAsTextFile:

def saveAsTextFileOp(sc: SparkContext): Unit = {

val dataRDD = sc.parallelize(Array(1,2,3,4,5))

//指定HDFS的路径信息即可,需要指定一个不存在的目录

dataRDD.saveAsTextFile("hdfs://bigdata:9000/out001")

}

countByKey:

def countByKeyOp(sc: SparkContext): Unit = {

val daraRDD = sc.parallelize(Array(("A",1001),("B",1002),("A",1003),("C",1004)))

//返回的是一个map类型的数据

val res = daraRDD.countByKey()

for((k,v) <- res){

   println(k+","+v)

}

}

foreach:

def foreachOp(sc: SparkContext): Unit = {

val dataRDD = sc.parallelize(Array(1,2,3,4,5))

//注意:foreach不仅限于执行println操作,这个只是在测试的时候使用的

//实际工作中如果需要把计算的结果保存到第三方的存储介质中,就需要使用foreach

//在里面实现具体向外部输出数据的代码

dataRDD.foreach(println(_))

}

main方法:

def main(args: Array[String]): Unit = {

val sc = getSparkContext

//reduce:聚合计算

//reduceOp(sc)

//collect:获取元素集合

//collectOp(sc)

//take(n):获取前n个元素

//takeOp(sc)

//count:获取元素总数

//countOp(sc)

//saveAsTextFile:保存文件

//saveAsTextFileOp(sc)

//countByKey:统计相同的key出现多少次

//countByKeyOp(sc)

//foreach:迭代遍历元素

//foreachOp(sc)

sc.stop()

}

目录
相关文章
|
8天前
|
分布式计算 并行计算 大数据
Spark学习---day02、Spark核心编程(RDD概述、RDD编程(创建、分区规则、转换算子、Action算子))(一)
Spark学习---day02、Spark核心编程 RDD概述、RDD编程(创建、分区规则、转换算子、Action算子))(一)
|
8天前
|
分布式计算 Java Scala
Spark学习---day03、Spark核心编程(RDD概述、RDD编程(创建、分区规则、转换算子、Action算子))(二)
Spark学习---day03、Spark核心编程(RDD概述、RDD编程(创建、分区规则、转换算子、Action算子))(二)
|
8天前
|
存储 分布式计算 API
adb spark的lakehouse api访问内表数据,还支持算子下推吗
【2月更文挑战第21天】adb spark的lakehouse api访问内表数据,还支持算子下推吗
108 2
|
8天前
|
分布式计算 Hadoop Java
Spark【基础知识 03】【RDD常用算子详解】(图片来源于网络)
【2月更文挑战第14天】Spark【基础知识 03】【RDD常用算子详解】(图片来源于网络)
60 1
|
8天前
|
分布式计算 并行计算 Hadoop
Spark学习---day02、Spark核心编程(RDD概述、RDD编程(创建、分区规则、转换算子、Action算子))(一)
Spark学习---day02、Spark核心编程 RDD概述、RDD编程(创建、分区规则、转换算子、Action算子))(一)
|
9月前
|
存储 分布式计算 并行计算
Spark学习---2、SparkCore(RDD概述、RDD编程(创建、分区规则、转换算子、Action算子))(一)
Spark学习---2、SparkCore(RDD概述、RDD编程(创建、分区规则、转换算子、Action算子))(一)
|
8天前
|
机器学习/深度学习 分布式计算 数据库连接
[Spark精进]必须掌握的4个RDD算子之filter算子
[Spark精进]必须掌握的4个RDD算子之filter算子
53 2
|
8天前
|
分布式计算 Spark
[Spark精进]必须掌握的4个RDD算子之flatMap算子
[Spark精进]必须掌握的4个RDD算子之flatMap算子
56 0
|
8天前
|
机器学习/深度学习 分布式计算 数据处理
[Spark精进]必须掌握的4个RDD算子之mapPartitions算子
[Spark精进]必须掌握的4个RDD算子之mapPartitions算子
58 0
|
8天前
|
分布式计算 Scala Spark
[Spark精进]必须掌握的4个RDD算子之map算子
[Spark精进]必须掌握的4个RDD算子之map算子
37 0