Spark【RDD编程(三)键值对RDD】

本文涉及的产品
实时计算 Flink 版,5000CU*H 3个月
简介: Spark【RDD编程(三)键值对RDD】

简介

       键值对 RDD 就是每个RDD的元素都是 (key,value)类型的键值对,是一种常见的 RDD,可以应用于很多场景。        

       因为毕竟通过我们之前Hadoop的学习中,我们就可以看到对数据的处理,基本都是以键值对的形式进行统一批处理的,因为MapReduce模型中,Mapper和Reducer之间的联系就是通过键和值进行连接产生关系的。

键值对RDD的创建

       其实就是个RDD 的创建,无非就是通过并行集合创建和通过文件系统创建,然后文件系统又分为本地文件系统和HDFS。

常用的键值对RDD转换操作

1、reduceByKey(func)

和上一篇文章中的用法一致。

2、groupByKey(func)

和上一篇文章中的用法一致。

3、keys

返回键值对 RDD 中所有的key,构成一个新的 RDD。

import org.apache.spark.rdd.RDD
import org.apache.spark.{SparkConf, SparkContext}
object KV_RDD {
  def main(args: Array[String]): Unit = {
    //创建SparkContext对象
    val conf = new SparkConf()
    conf.setAppName("kv_rdd").setMaster("local")
    val sc:SparkContext = new SparkContext(conf)
    //通过并行集合创建RDD
    val arr = Array(("Spark",1),("Hadoop",1),("Spark",1),("Flink",1))
    val rdd: RDD[(String, Int)] = sc.parallelize(arr)
    val res: RDD[String] = rdd.keys
    res.foreach(println)
    //关闭SparkContext
   sc.stop()
  }
}

输出结果:

Spark
Hadoop
Spark
Flink

4、values

返回键值对 RDD 中所有的key,构成一个新的 RDD。

//通过并行集合创建RDD
    val arr = Array(("Spark",1),("Hadoop",1),("Spark",1),("Flink",1))
    val rdd: RDD[(String, Int)] = sc.parallelize(arr)
    val res: RDD[Int] = rdd.values
    res.foreach(println)

运行结果:

1
1
1
1

5、sortByKey(Boolean asce)

返回一个根据 key 排序(字典序)的RDD。

//通过并行集合创建RDD
    val arr = Array(("Spark",1),("Hadoop",1),("Spark",1),("Flink",1))
    val rdd: RDD[(String, Int)] = sc.parallelize(arr)
    val res: RDD[(String,Int)] = rdd.sortByKey()
    res.foreach(println)

运行结果:

(Flink,1)
(Hadoop,1)
(Spark,1)
(Spark,1)

设置升序/降序

默认我们sortByKey()方法是升序排序的,如果要降序可以传入一个false的值。

//通过并行集合创建RDD
    val arr = Array(("Spark",1),("Hadoop",1),("Spark",1),("Flink",1))
    val rdd: RDD[(String, Int)] = sc.parallelize(arr)
    //降序
    val res: RDD[(String,Int)] = rdd.sortByKey(false)
    res.foreach(println)

运行结果:

(Spark,1)
(Spark,1)
(Hadoop,1)
(Flink,1)

6、sortBy()

可以根据其他字段进行排序。

//通过并行集合创建RDD
    val arr = Array(("Spark",1),("Hadoop",5),("Hive",2),("Flink",3))
    val rdd: RDD[(String, Int)] = sc.parallelize(arr)
    //按照value升序排序
    val res: RDD[(String,Int)] = rdd.sortBy(kv=>kv._2,true)
    res.foreach(println)

运行结果:

(Spark,1)
(Hive,2)
(Flink,3)
(Hadoop,5)

7、mapValues(func)

       之前我们处理的RDD 都是文本或数字类型的,之前我们的map(func)中的func函数是对整个RDD的元素进行处理。但是这里换成了mapValues(func),这里func函数处理的是我们(key,value)中的所有value,而key 不会发生变化。

//通过并行集合创建RDD
    val arr = Array(("Spark",1),("Hadoop",5),("Hive",2),("Flink",3))
    val rdd: RDD[(String, Int)] = sc.parallelize(arr)
    //所有的value+1
    val res: RDD[(String,Int)] = rdd.mapValues(value=>value+1)
    res.foreach(println)

运行结果:

(Spark,2)
(Hadoop,6)
(Hive,3)
(Flink,4)

8、join()

内连接,(K,V1)和(K,V2)进行内连接生成(K,(V1,V2))。

//通过并行集合创建RDD
    val arr1 = Array(("Spark",1),("Hadoop",5),("Hive",2),("Flink",3))
    val arr2 = Array(("Spark","fast"),("Hadoop","good"))
    val rdd1: RDD[(String,Int)] = sc.parallelize(arr1)
    val rdd2: RDD[(String,String)] = sc.parallelize(arr2)
    //所有的value+1
//    val res: RDD[(String,(Int,Int))] = rdd1.join(rdd2)
  val res: RDD[(String, (Int, String))] = rdd1.join(rdd2)
    res.foreach(println)

运行结果:

(Spark,(1,fast))
(Hadoop,(5,good))

我们可以看到,返回的RDD 的元素都是满足连接表rdd2的K的。

9、combineByKey()

这个函数的参数比较多,下面做个介绍:

  1. createCombiner:用于将RDD中的每个元素转换为一个类型为C(V=>C)的值。这个函数在第一次遇到某个key的时候会被调用,用于创建一个累加器。
  2. mergeValue:用于将RDD中的每个value值合并到已经存在的累加器中。这个函数在遇到相同key的value时会被调用。
  3. mergeCombiners:用于将不同分区中的累加器值进行合并。这个函数在每个分区处理完后,将各个分区的累加器值进行合并。

案例-统计公司三个季度的总收入和平均收入

//通过并行集合创建RDD
    val arr = Array(("company-1",88),("company-1",96),("company-1",85),("company-2",94),("company-2",86),("company-2",74),("company-3",86),("company-3",88),("company-3",92))
    val rdd: RDD[(String, Int)] = sc.parallelize(arr,3)
    val res: RDD[(String,Int,Float)] = rdd.combineByKey(
      income=>(income,1),
      (acc:(Int,Int),income)=>(acc._1+income,+acc._2+1),
      (acc1:(Int,Int),acc2:(Int,Int))=>(acc1._1+acc2._1,acc1._2+acc2._2)
    ).map({
      case (key,value) => (key,value._1,value._1/value._2.toFloat)
    })
    //重新分配分区 将3个分区合并为1个
    res.repartition(1).saveAsTextFile("data/kv_rdd/")

运行结果中-part-00000文件内容:

(company-3,266,88.666664)
(company-1,269,89.666664)
(company-2,254,84.666664)

其中,第一列为季度名称。第二列为总收入,第三列为平均收入。

参数解析

       第一个参数的作用是:当我们取出的RDD元素是第一次遇到的key,那么就创建一个组合器函数createCombiner(),负责将我们的键值对(K:季度名称,V:收入额)中的 V:收入额转为 C格式(总收入额,1)的格式,其中的1代表当前已经累加了一个月的收入。


       第二个参数是合并值函数 mergeValue(),它的作用是:如果遇到相同的key,比如都是"company-1",那么就对相同key的的value进行mergeValue()中定义的操作。


       第三个参数的作用是 :由于我们开启了多个分区,所以最后要对不同分区的数据进行一个对总,这个函数中定义的就是对两个 C格式 的键值对进行的操作。


最后我们进行了一个模式匹配,对于结果返回的(k,v)形式的数据,其中 k 就是指季度名称, v 是一个键值对(总收入额,月份数),我们将它转为 (季度名称,总收入额,平均收入额)。

分区1:
1-调用createCombiner()函数
(company-1,88) => (company-1,(88,1))
2-调用mergeValue()函数
(company-1,96) => (company-1,(184,2))
分区2:
1-调用createCombiner()函数
(company-1,85) => (company-1,(85,1))
3-调用mergeCombiners()函数
(company-1,(184,2)) + (company-1,(85,1)) => (company-1,(269,3))

10、flatMapValues(fubc)

       flatMapValues(func)的操作和mapValues(func)相似。它们都是对键值对类型的RDD进行操作,mapValues(func)是对(ke要,value)的value通过函数 func 进行一个处理,而key不变。而flatMapValues(func)则是对value先通过函数 func 进行处理,然后再处理后的值和key组成一系列新的键值对。


输入数据:

("k1","hadoop,spark,flink")
("k2","hadoop,hive,hbase")

处理

//通过并行集合创建RDD
    val arr = Array(("k1","hadoop,spark,flink"),("k2","hadoop,hive,hbase"))
    val rdd: RDD[(String, String)] = sc.parallelize(arr)
    //flatMapValues(func)
    //val res: Array[(String, String)] = rdd.flatMapValues(value =>   value.split(",")).collect()  
    //mapValues(func)
    val res: Array[(String, Array[String])] =rdd.mapValues(value => value.split(",")).collect()
value.split(",")).collect()
    res.foreach(println)

运行结果:

(k1,hadoop)
(k1,spark)
(k1,flink)
(k2,hadoop)
(k2,hive)
(k2,hbase)

而我们的mapValues(func)执行后的RDD集合内为:

(k1,Array("hadoop","spark","flink"))
(k2,Array("hadoop","hive","hbase"))

显然我们的flatMapValues(func)是多进行了一部扁平化的操作,将集合内的元素与key一一组成一系列心得键值对。

相关实践学习
云数据库HBase版使用教程
  相关的阿里云产品:云数据库 HBase 版 面向大数据领域的一站式NoSQL服务,100%兼容开源HBase并深度扩展,支持海量数据下的实时存储、高并发吞吐、轻SQL分析、全文检索、时序时空查询等能力,是风控、推荐、广告、物联网、车联网、Feeds流、数据大屏等场景首选数据库,是为淘宝、支付宝、菜鸟等众多阿里核心业务提供关键支撑的数据库。 了解产品详情: https://cn.aliyun.com/product/hbase   ------------------------------------------------------------------------- 阿里云数据库体验:数据库上云实战 开发者云会免费提供一台带自建MySQL的源数据库 ECS 实例和一台目标数据库 RDS实例。跟着指引,您可以一步步实现将ECS自建数据库迁移到目标数据库RDS。 点击下方链接,领取免费ECS&RDS资源,30分钟完成数据库上云实战!https://developer.aliyun.com/adc/scenario/51eefbd1894e42f6bb9acacadd3f9121?spm=a2c6h.13788135.J_3257954370.9.4ba85f24utseFl
相关文章
|
2月前
|
分布式计算 并行计算 大数据
Spark学习---day02、Spark核心编程(RDD概述、RDD编程(创建、分区规则、转换算子、Action算子))(一)
Spark学习---day02、Spark核心编程 RDD概述、RDD编程(创建、分区规则、转换算子、Action算子))(一)
128 1
|
2月前
|
SQL 分布式计算 API
Spark学习------SparkSQL(概述、编程、数据的加载和保存)
Spark学习------SparkSQL(概述、编程、数据的加载和保存)
104 2
|
2月前
|
分布式计算 Java Scala
Spark学习---day03、Spark核心编程(RDD概述、RDD编程(创建、分区规则、转换算子、Action算子))(二)
Spark学习---day03、Spark核心编程(RDD概述、RDD编程(创建、分区规则、转换算子、Action算子))(二)
|
2月前
|
存储 分布式计算 Hadoop
Spark编程实验一:Spark和Hadoop的安装使用
Spark编程实验一:Spark和Hadoop的安装使用
57 4
|
2月前
|
分布式计算 关系型数据库 MySQL
Spark编程实验四:Spark Streaming编程
Spark编程实验四:Spark Streaming编程
70 2
|
2月前
|
SQL 分布式计算 关系型数据库
Spark编程实验三:Spark SQL编程
Spark编程实验三:Spark SQL编程
46 1
|
2月前
|
分布式计算 Shell 开发工具
Spark编程实验二:RDD编程初级实践
Spark编程实验二:RDD编程初级实践
43 1
|
2月前
|
存储 分布式计算 程序员
Spark中的RDD介绍
Spark中的RDD介绍
26 0
|
2月前
|
SQL 分布式计算 Java
Spark学习---SparkSQL(概述、编程、数据的加载和保存、自定义UDFA、项目实战)
Spark学习---SparkSQL(概述、编程、数据的加载和保存、自定义UDFA、项目实战)
200 1
|
2月前
|
分布式计算 Spark
Spark【Spark学习大纲】简介+生态+RDD+安装+使用(xmind分享)
【2月更文挑战第14天】Spark【Spark学习大纲】简介+生态+RDD+安装+使用(xmind分享)
51 1