大数据Spark RDD 函数 2

本文涉及的产品
云原生大数据计算服务MaxCompute,500CU*H 100GB 3个月
简介: 大数据Spark RDD 函数

4.4 聚合函数

在数据分析领域中,对数据聚合操作是最为关键的,在Spark框架中各个模块使用时,主要就

是其中聚合函数的使用。

4.4.1 集合中聚合函数

回顾列表List中reduce聚合函数核心概念:聚合的时候,往往需要聚合中间临时变量。查看列

表List中聚合函数reduce和fold源码如下:

通过代码,看看列表List中聚合函数使用:

运行截图如下所示:

fold聚合函数,比reduce聚合函数,多提供一个可以初始化聚合中间临时变量的值参数:

聚合操作时,往往聚合过程中需要中间临时变量(到底时几个变量,具体业务而定),如下案例:

4.4.2 RDD 中聚合函数

在RDD中提供类似列表List中聚合函数reduce和fold,查看如下:

案例演示:求列表List中元素之和,RDD中分区数目为2,核心业务代码如下:

运行原理分析:

使用RDD中fold聚合函数:

查看RDD中高级聚合函数aggregate,函数声明如下:

业务需求:使用aggregate函数实现RDD中最大的两个数据,分析如下:

核心业务代码如下:

运行结果原理剖析示意图:

上述完整范例演示代码:

import org.apache.spark.rdd.RDD
import org.apache.spark.{SparkConf, SparkContext, TaskContext}
import scala.collection.mutable
import scala.collection.mutable.ListBuffer
/**
 * RDD中聚合函数:reduce、aggregate函数
 */
object SparkAggTest {
  def main(args: Array[String]): Unit = {
    // 创建应用程序入口SparkContext实例对象
    val sc: SparkContext = {
      // 1.a 创建SparkConf对象,设置应用的配置信息
      val sparkConf: SparkConf = new SparkConf()
        .setAppName(this.getClass.getSimpleName.stripSuffix("$"))
        .setMaster("local[2]")
      // 1.b 传递SparkConf对象,构建Context实例
      new SparkContext(sparkConf)
    }
    sc.setLogLevel("WARN")
    // 模拟数据,1 到 10 的列表,通过并行方式创建RDD
    val datas = 1 to 10
    val datasRDD: RDD[Int] = sc.parallelize(datas, numSlices = 2)
    // 查看每个分区中的数据
    datasRDD.foreachPartition { iter =>
      println(s"p-${TaskContext.getPartitionId()}: ${iter.mkString(", ")}")
    }
    println("=========================================")
    // 使用reduce函数聚合
    val result: Int = datasRDD.reduce((tmp, item) => {
      println(s"p-${TaskContext.getPartitionId()}: tmp = $tmp, item = $item")
      tmp + item
    })
    println(result)
    println("=========================================")
    // 使用fold函数聚合
    val result2: Int = datasRDD.fold(0)((tmp, item) => {
      println(s"p-${TaskContext.getPartitionId()}: tmp = $tmp, item = $item")
      tmp + item
    })
    println(result2)
    println("=========================================")
    // 使用aggregate函数获取最大的两个值
    val top2: mutable.Seq[Int] = datasRDD.aggregate(new ListBuffer[Int]())(
      // 分区内聚合函数,每个分区内数据如何聚合 seqOp: (U, T) => U,
      (u, t) => {
        println(s"p-${TaskContext.getPartitionId()}: u = $u, t = $t")
        // 将元素加入到列表中
        u += t //
        // 降序排序
        val top = u.sorted.takeRight(2)
        // 返回
        top
      },
      // 分区间聚合函数,每个分区聚合的结果如何聚合 combOp: (U, U) => U
      (u1, u2) => {
        println(s"p-${TaskContext.getPartitionId()}: u1 = $u1, u2 = $u2")
        u1 ++= u2 // 将列表的数据合并,到u1中
        //
        u1.sorted.takeRight(2)
      }
    )
    println(top2)
    // 应用程序运行结束,关闭资源
    sc.stop()
  }
}

4.4.3 PairRDDFunctions 聚合函数

在Spark中有一个object对象PairRDDFunctions,主要针对RDD的数据类型是Key/Value对的数

据提供函数,方便数据分析处理。比如使用过的函数:reduceByKey、groupByKey等。*ByKey函

数:将相同Key的Value进行聚合操作的,省去先分组再聚合。

  • 第一类:分组函数groupByKey
  • 第二类:分组聚合函数reduceByKey和foldByKey

    但是reduceByKey和foldByKey聚合以后的结果数据类型与RDD中Value的数据类型是一样的。
  • 第三类:分组聚合函数aggregateByKey

在企业中如果对数据聚合使用,不能使用reduceByKey完成时,考虑使用aggregateByKey函数,

基本上都能完成任意聚合功能。

演示范例代码如下:

import org.apache.spark.rdd.RDD
import org.apache.spark.{SparkConf, SparkContext}
/**
 * RDD中聚合函数,针对RDD中数据类型Key/Value对:
 * groupByKey
 * reduceByKey/foldByKey
 * aggregateByKey
 * combineByKey
 */
object SparkAggByKeyTest {
  def main(args: Array[String]): Unit = {
    // 创建应用程序入口SparkContext实例对象
    val sc: SparkContext = {
      // 1.a 创建SparkConf对象,设置应用的配置信息
      val sparkConf: SparkConf = new SparkConf()
        .setAppName(this.getClass.getSimpleName.stripSuffix("$"))
        .setMaster("local[2]")
      // 1.b 传递SparkConf对象,构建Context实例
      new SparkContext(sparkConf)
    }
    sc.setLogLevel("WARN")
    // 1、并行化集合创建RDD数据集
    val linesSeq: Seq[String] = Seq(
      "hadoop scala hive spark scala sql sql", //
      "hadoop scala spark hdfs hive spark", //
      "spark hdfs spark hdfs scala hive spark" //
    )
    val inputRDD: RDD[String] = sc.parallelize(linesSeq, numSlices = 2)
    // 2、分割单词,转换为二元组
    val wordsRDD: RDD[(String, Int)] = inputRDD
      .flatMap(line => line.split("\\s+"))
      .map(word => word -> 1)
    // TODO: 先使用groupByKey函数分组,再使用map函数聚合
    val wordsGroupRDD: RDD[(String, Iterable[Int])] = wordsRDD.groupByKey()
    val resultRDD: RDD[(String, Int)] = wordsGroupRDD.map { case (word, values) =>
      val count: Int = values.sum
      word -> count
    }
    println(resultRDD.collectAsMap())
    // TODO: 直接使用reduceByKey或foldByKey分组聚合
    val resultRDD2: RDD[(String, Int)] = wordsRDD.reduceByKey((tmp, item) => tmp + item)
    println(resultRDD2.collectAsMap())
    val resultRDD3 = wordsRDD.foldByKey(0)((tmp, item) => tmp + item)
    println(resultRDD3.collectAsMap())
    // TODO: 使用aggregateByKey聚合
    /*
    def aggregateByKey[U: ClassTag]
    (zeroValue: U) // 聚合中间临时变量初始值,类似fold函数zeroValue
    (
    seqOp: (U, V) => U, // 各个分区内数据聚合操作函数
    combOp: (U, U) => U // 分区间聚合结果的聚合操作函数
    ): RDD[(K, U)]
    */
    val resultRDD4 = wordsRDD.aggregateByKey(0)(
      (tmp: Int, item: Int) => {
        tmp + item
      },
      (tmp: Int, result: Int) => {
        tmp + result
      }
    )
    println(resultRDD4.collectAsMap())
    // 应用程序运行结束,关闭资源
    Thread.sleep(1000000)
    sc.stop()
  }
}

4.4.4 面试题

RDD中groupByKey和reduceByKey区别???

  • reduceByKey函数:在一个(K,V)的RDD上调用,返回一个(K,V)的RDD,使用指定的reduce函数,
  • 将相同key的值聚合到一起,reduce任务的个数可以通过第二个可选的参数来设置。
  • groupByKey函数:在一个(K,V)的RDD上调用,返回一个(K,V)的RDD,使用指定的函数,将相同
    key的值聚合到一起,与reduceByKey的区别是只生成一个sequence。

4.5 关联函数

当两个RDD的数据类型为二元组Key/Value对时,可以依据Key进行关联Join。

首先回顾一下SQL JOIN,用Venn图表示如下:


RDD中关联JOIN函数都在PairRDDFunctions中,具体截图如下:

具体看一下join(等值连接)函数说明:

范例演示代码:

import org.apache.sp
import org.apache.spark.{SparkConf, SparkContext}
/**
 * RDD中关联函数Join,针对RDD中数据类型为Key/Value对
 *
 */
object SparkJoinTest {
  def main(args: Array[String]): Unit = {
    // 创建应用程序入口SparkContext实例对象
    val sc: SparkContext = {
      // 1.a 创建SparkConf对象,设置应用的配置信息
      val sparkConf: SparkConf = new SparkConf()
        .setAppName(this.getClass.getSimpleName.stripSuffix("$"))
        .setMaster("local[2]")
      // 1.b 传递SparkConf对象,构建Context实例
      new SparkContext(sparkConf)
    }
    sc.setLogLevel("WARN")
    // 模拟数据集
    val empRDD: RDD[(Int, String)] = sc.parallelize(
      Seq((1001, "zhangsan"), (1002, "lisi"), (1003, "wangwu"), (1004, "zhangliu"))
    )
    val deptRDD: RDD[(Int, String)] = sc.parallelize(
      Seq((1001, "sales"), (1002, "tech"))
    )
    /*
    def join[W](other: RDD[(K, W)]): RDD[(K, (V, W))]
    */
    val joinRDD: RDD[(Int, (String, String))] = empRDD.join(deptRDD)
    println(joinRDD.collectAsMap())
    /*
    def leftOuterJoin[W](other: RDD[(K, W)]): RDD[(K, (V, Option[W]))]
    */
    val leftJoinRDD: RDD[(Int, (String, Option[String]))] = empRDD.leftOuterJoin(deptRDD)
    println(leftJoinRDD.collectAsMap())
    // 应用程序运行结束,关闭资源
    sc.stop()
  }
}

5 函数练习

RDD中的函数有很多,不同业务需求使用不同函数进行数据处理分析,下面仅仅展示出比较常

用的函数使用,更多函数在实际中使用体会,多加练习理解。

5.1 map 函数

对RDD中的每一个元素进行操作并返回操作的结果。

5.2 filter 函数

函数中返回True的被留下,返回False的被过滤掉。

5.3 flatMap 函数

对RDD中的每一个元素进行先map再压扁,最后返回操作的结果。

5.4 交集、并集、差集、笛卡尔积

数学集合中操作,类似Scala集合类Set中相关函数,注意类型要一致。

5.5 distinct 函数

对RDD中元素进行去重,与Scala集合中distinct类似。

5.6 first、take、top 函数

从RDD中获取某些元素,比如first为第一个元素,take为前N个元素,top为最大的N个元素。

5.7 keys、values 函数

针对RDD中数据类型为KeyValue对时,获取所有key和value的值,类似Scala中Map集合。

5.8 mapValues 函数

mapValues表示对RDD中的元素进行操作,Key不变,Value变为操作之后。

5.9 collectAsMap 函数

当RDD中数据类型为Key/Value对时,转换为Map集合。

5.10 mapPartitionsWithIndex 函数

取分区中对应的数据时,还可以将分区的编号取出来,这样就可以知道数据是属于哪个分区的。


相关实践学习
基于MaxCompute的热门话题分析
Apsara Clouder大数据专项技能认证配套课程:基于MaxCompute的热门话题分析
目录
相关文章
|
4月前
|
人工智能 分布式计算 大数据
大数据≠大样本:基于Spark的特征降维实战(提升10倍训练效率)
本文探讨了大数据场景下降维的核心问题与解决方案,重点分析了“维度灾难”对模型性能的影响及特征冗余的陷阱。通过数学证明与实际案例,揭示高维空间中样本稀疏性问题,并提出基于Spark的分布式降维技术选型与优化策略。文章详细展示了PCA在亿级用户画像中的应用,包括数据准备、核心实现与效果评估,同时深入探讨了协方差矩阵计算与特征值分解的并行优化方法。此外,还介绍了动态维度调整、非线性特征处理及降维与其他AI技术的协同效应,为生产环境提供了最佳实践指南。最终总结出降维的本质与工程实践原则,展望未来发展方向。
212 0
|
7月前
|
存储 分布式计算 Hadoop
从“笨重大象”到“敏捷火花”:Hadoop与Spark的大数据技术进化之路
从“笨重大象”到“敏捷火花”:Hadoop与Spark的大数据技术进化之路
296 79
|
11月前
|
存储 分布式计算 并行计算
【赵渝强老师】Spark中的RDD
RDD(弹性分布式数据集)是Spark的核心数据模型,支持分布式并行计算。RDD由分区组成,每个分区由Spark Worker节点处理,具备自动容错、位置感知调度和缓存机制等特性。通过创建RDD,可以指定分区数量,并实现计算函数、依赖关系、分区器和优先位置列表等功能。视频讲解和示例代码进一步详细介绍了RDD的组成和特性。
193 0
|
8月前
|
分布式计算 Spark
【赵渝强老师】Spark RDD的依赖关系和任务阶段
Spark RDD之间的依赖关系分为窄依赖和宽依赖。窄依赖指父RDD的每个分区最多被一个子RDD分区使用,如map、filter操作;宽依赖则指父RDD的每个分区被多个子RDD分区使用,如分组和某些join操作。窄依赖任务可在同一阶段完成,而宽依赖因Shuffle的存在需划分不同阶段执行。借助Spark Web Console可查看任务的DAG图及阶段划分。
312 15
|
8月前
|
存储 缓存 分布式计算
【赵渝强老师】Spark RDD的缓存机制
Spark RDD通过`persist`或`cache`方法可将计算结果缓存,但并非立即生效,而是在触发action时才缓存到内存中供重用。`cache`方法实际调用了`persist(StorageLevel.MEMORY_ONLY)`。RDD缓存可能因内存不足被删除,建议结合检查点机制保证容错。示例中,读取大文件并多次调用`count`,使用缓存后执行效率显著提升,最后一次计算仅耗时98ms。
163 0
【赵渝强老师】Spark RDD的缓存机制
|
11月前
|
存储 分布式计算 Hadoop
数据湖技术:Hadoop与Spark在大数据处理中的协同作用
【10月更文挑战第27天】在大数据时代,数据湖技术凭借其灵活性和成本效益成为企业存储和分析大规模异构数据的首选。Hadoop和Spark作为数据湖技术的核心组件,通过HDFS存储数据和Spark进行高效计算,实现了数据处理的优化。本文探讨了Hadoop与Spark的最佳实践,包括数据存储、处理、安全和可视化等方面,展示了它们在实际应用中的协同效应。
463 2
|
10天前
|
机器学习/深度学习 传感器 分布式计算
数据才是真救命的:聊聊如何用大数据提升灾难预警的精准度
数据才是真救命的:聊聊如何用大数据提升灾难预警的精准度
76 14
|
2月前
|
机器学习/深度学习 运维 监控
运维不怕事多,就怕没数据——用大数据喂饱你的运维策略
运维不怕事多,就怕没数据——用大数据喂饱你的运维策略
82 0
|
3月前
|
数据采集 分布式计算 DataWorks
ODPS在某公共数据项目上的实践
本项目基于公共数据定义及ODPS与DataWorks技术,构建一体化智能化数据平台,涵盖数据目录、归集、治理、共享与开放六大目标。通过十大子系统实现全流程管理,强化数据安全与流通,提升业务效率与决策能力,助力数字化改革。
97 4
|
11天前
|
传感器 人工智能 监控
数据下田,庄稼不“瞎种”——聊聊大数据如何帮农业提效
数据下田,庄稼不“瞎种”——聊聊大数据如何帮农业提效
80 14

热门文章

最新文章