大数据Spark RDD 函数 2

本文涉及的产品
云原生大数据计算服务MaxCompute,500CU*H 100GB 3个月
云原生大数据计算服务 MaxCompute,5000CU*H 100GB 3个月
简介: 大数据Spark RDD 函数

4.4 聚合函数

在数据分析领域中,对数据聚合操作是最为关键的,在Spark框架中各个模块使用时,主要就

是其中聚合函数的使用。

4.4.1 集合中聚合函数

回顾列表List中reduce聚合函数核心概念:聚合的时候,往往需要聚合中间临时变量。查看列

表List中聚合函数reduce和fold源码如下:

通过代码,看看列表List中聚合函数使用:

运行截图如下所示:

fold聚合函数,比reduce聚合函数,多提供一个可以初始化聚合中间临时变量的值参数:

聚合操作时,往往聚合过程中需要中间临时变量(到底时几个变量,具体业务而定),如下案例:

4.4.2 RDD 中聚合函数

在RDD中提供类似列表List中聚合函数reduce和fold,查看如下:

案例演示:求列表List中元素之和,RDD中分区数目为2,核心业务代码如下:

运行原理分析:

使用RDD中fold聚合函数:

查看RDD中高级聚合函数aggregate,函数声明如下:

业务需求:使用aggregate函数实现RDD中最大的两个数据,分析如下:

核心业务代码如下:

运行结果原理剖析示意图:

上述完整范例演示代码:

import org.apache.spark.rdd.RDD
import org.apache.spark.{SparkConf, SparkContext, TaskContext}
import scala.collection.mutable
import scala.collection.mutable.ListBuffer
/**
 * RDD中聚合函数:reduce、aggregate函数
 */
object SparkAggTest {
  def main(args: Array[String]): Unit = {
    // 创建应用程序入口SparkContext实例对象
    val sc: SparkContext = {
      // 1.a 创建SparkConf对象,设置应用的配置信息
      val sparkConf: SparkConf = new SparkConf()
        .setAppName(this.getClass.getSimpleName.stripSuffix("$"))
        .setMaster("local[2]")
      // 1.b 传递SparkConf对象,构建Context实例
      new SparkContext(sparkConf)
    }
    sc.setLogLevel("WARN")
    // 模拟数据,1 到 10 的列表,通过并行方式创建RDD
    val datas = 1 to 10
    val datasRDD: RDD[Int] = sc.parallelize(datas, numSlices = 2)
    // 查看每个分区中的数据
    datasRDD.foreachPartition { iter =>
      println(s"p-${TaskContext.getPartitionId()}: ${iter.mkString(", ")}")
    }
    println("=========================================")
    // 使用reduce函数聚合
    val result: Int = datasRDD.reduce((tmp, item) => {
      println(s"p-${TaskContext.getPartitionId()}: tmp = $tmp, item = $item")
      tmp + item
    })
    println(result)
    println("=========================================")
    // 使用fold函数聚合
    val result2: Int = datasRDD.fold(0)((tmp, item) => {
      println(s"p-${TaskContext.getPartitionId()}: tmp = $tmp, item = $item")
      tmp + item
    })
    println(result2)
    println("=========================================")
    // 使用aggregate函数获取最大的两个值
    val top2: mutable.Seq[Int] = datasRDD.aggregate(new ListBuffer[Int]())(
      // 分区内聚合函数,每个分区内数据如何聚合 seqOp: (U, T) => U,
      (u, t) => {
        println(s"p-${TaskContext.getPartitionId()}: u = $u, t = $t")
        // 将元素加入到列表中
        u += t //
        // 降序排序
        val top = u.sorted.takeRight(2)
        // 返回
        top
      },
      // 分区间聚合函数,每个分区聚合的结果如何聚合 combOp: (U, U) => U
      (u1, u2) => {
        println(s"p-${TaskContext.getPartitionId()}: u1 = $u1, u2 = $u2")
        u1 ++= u2 // 将列表的数据合并,到u1中
        //
        u1.sorted.takeRight(2)
      }
    )
    println(top2)
    // 应用程序运行结束,关闭资源
    sc.stop()
  }
}

4.4.3 PairRDDFunctions 聚合函数

在Spark中有一个object对象PairRDDFunctions,主要针对RDD的数据类型是Key/Value对的数

据提供函数,方便数据分析处理。比如使用过的函数:reduceByKey、groupByKey等。*ByKey函

数:将相同Key的Value进行聚合操作的,省去先分组再聚合。

  • 第一类:分组函数groupByKey
  • 第二类:分组聚合函数reduceByKey和foldByKey

    但是reduceByKey和foldByKey聚合以后的结果数据类型与RDD中Value的数据类型是一样的。
  • 第三类:分组聚合函数aggregateByKey

在企业中如果对数据聚合使用,不能使用reduceByKey完成时,考虑使用aggregateByKey函数,

基本上都能完成任意聚合功能。

演示范例代码如下:

import org.apache.spark.rdd.RDD
import org.apache.spark.{SparkConf, SparkContext}
/**
 * RDD中聚合函数,针对RDD中数据类型Key/Value对:
 * groupByKey
 * reduceByKey/foldByKey
 * aggregateByKey
 * combineByKey
 */
object SparkAggByKeyTest {
  def main(args: Array[String]): Unit = {
    // 创建应用程序入口SparkContext实例对象
    val sc: SparkContext = {
      // 1.a 创建SparkConf对象,设置应用的配置信息
      val sparkConf: SparkConf = new SparkConf()
        .setAppName(this.getClass.getSimpleName.stripSuffix("$"))
        .setMaster("local[2]")
      // 1.b 传递SparkConf对象,构建Context实例
      new SparkContext(sparkConf)
    }
    sc.setLogLevel("WARN")
    // 1、并行化集合创建RDD数据集
    val linesSeq: Seq[String] = Seq(
      "hadoop scala hive spark scala sql sql", //
      "hadoop scala spark hdfs hive spark", //
      "spark hdfs spark hdfs scala hive spark" //
    )
    val inputRDD: RDD[String] = sc.parallelize(linesSeq, numSlices = 2)
    // 2、分割单词,转换为二元组
    val wordsRDD: RDD[(String, Int)] = inputRDD
      .flatMap(line => line.split("\\s+"))
      .map(word => word -> 1)
    // TODO: 先使用groupByKey函数分组,再使用map函数聚合
    val wordsGroupRDD: RDD[(String, Iterable[Int])] = wordsRDD.groupByKey()
    val resultRDD: RDD[(String, Int)] = wordsGroupRDD.map { case (word, values) =>
      val count: Int = values.sum
      word -> count
    }
    println(resultRDD.collectAsMap())
    // TODO: 直接使用reduceByKey或foldByKey分组聚合
    val resultRDD2: RDD[(String, Int)] = wordsRDD.reduceByKey((tmp, item) => tmp + item)
    println(resultRDD2.collectAsMap())
    val resultRDD3 = wordsRDD.foldByKey(0)((tmp, item) => tmp + item)
    println(resultRDD3.collectAsMap())
    // TODO: 使用aggregateByKey聚合
    /*
    def aggregateByKey[U: ClassTag]
    (zeroValue: U) // 聚合中间临时变量初始值,类似fold函数zeroValue
    (
    seqOp: (U, V) => U, // 各个分区内数据聚合操作函数
    combOp: (U, U) => U // 分区间聚合结果的聚合操作函数
    ): RDD[(K, U)]
    */
    val resultRDD4 = wordsRDD.aggregateByKey(0)(
      (tmp: Int, item: Int) => {
        tmp + item
      },
      (tmp: Int, result: Int) => {
        tmp + result
      }
    )
    println(resultRDD4.collectAsMap())
    // 应用程序运行结束,关闭资源
    Thread.sleep(1000000)
    sc.stop()
  }
}

4.4.4 面试题

RDD中groupByKey和reduceByKey区别???

  • reduceByKey函数:在一个(K,V)的RDD上调用,返回一个(K,V)的RDD,使用指定的reduce函数,
  • 将相同key的值聚合到一起,reduce任务的个数可以通过第二个可选的参数来设置。
  • groupByKey函数:在一个(K,V)的RDD上调用,返回一个(K,V)的RDD,使用指定的函数,将相同
    key的值聚合到一起,与reduceByKey的区别是只生成一个sequence。

4.5 关联函数

当两个RDD的数据类型为二元组Key/Value对时,可以依据Key进行关联Join。

首先回顾一下SQL JOIN,用Venn图表示如下:


RDD中关联JOIN函数都在PairRDDFunctions中,具体截图如下:

具体看一下join(等值连接)函数说明:

范例演示代码:

import org.apache.sp
import org.apache.spark.{SparkConf, SparkContext}
/**
 * RDD中关联函数Join,针对RDD中数据类型为Key/Value对
 *
 */
object SparkJoinTest {
  def main(args: Array[String]): Unit = {
    // 创建应用程序入口SparkContext实例对象
    val sc: SparkContext = {
      // 1.a 创建SparkConf对象,设置应用的配置信息
      val sparkConf: SparkConf = new SparkConf()
        .setAppName(this.getClass.getSimpleName.stripSuffix("$"))
        .setMaster("local[2]")
      // 1.b 传递SparkConf对象,构建Context实例
      new SparkContext(sparkConf)
    }
    sc.setLogLevel("WARN")
    // 模拟数据集
    val empRDD: RDD[(Int, String)] = sc.parallelize(
      Seq((1001, "zhangsan"), (1002, "lisi"), (1003, "wangwu"), (1004, "zhangliu"))
    )
    val deptRDD: RDD[(Int, String)] = sc.parallelize(
      Seq((1001, "sales"), (1002, "tech"))
    )
    /*
    def join[W](other: RDD[(K, W)]): RDD[(K, (V, W))]
    */
    val joinRDD: RDD[(Int, (String, String))] = empRDD.join(deptRDD)
    println(joinRDD.collectAsMap())
    /*
    def leftOuterJoin[W](other: RDD[(K, W)]): RDD[(K, (V, Option[W]))]
    */
    val leftJoinRDD: RDD[(Int, (String, Option[String]))] = empRDD.leftOuterJoin(deptRDD)
    println(leftJoinRDD.collectAsMap())
    // 应用程序运行结束,关闭资源
    sc.stop()
  }
}

5 函数练习

RDD中的函数有很多,不同业务需求使用不同函数进行数据处理分析,下面仅仅展示出比较常

用的函数使用,更多函数在实际中使用体会,多加练习理解。

5.1 map 函数

对RDD中的每一个元素进行操作并返回操作的结果。

5.2 filter 函数

函数中返回True的被留下,返回False的被过滤掉。

5.3 flatMap 函数

对RDD中的每一个元素进行先map再压扁,最后返回操作的结果。

5.4 交集、并集、差集、笛卡尔积

数学集合中操作,类似Scala集合类Set中相关函数,注意类型要一致。

5.5 distinct 函数

对RDD中元素进行去重,与Scala集合中distinct类似。

5.6 first、take、top 函数

从RDD中获取某些元素,比如first为第一个元素,take为前N个元素,top为最大的N个元素。

5.7 keys、values 函数

针对RDD中数据类型为KeyValue对时,获取所有key和value的值,类似Scala中Map集合。

5.8 mapValues 函数

mapValues表示对RDD中的元素进行操作,Key不变,Value变为操作之后。

5.9 collectAsMap 函数

当RDD中数据类型为Key/Value对时,转换为Map集合。

5.10 mapPartitionsWithIndex 函数

取分区中对应的数据时,还可以将分区的编号取出来,这样就可以知道数据是属于哪个分区的。


相关实践学习
基于MaxCompute的热门话题分析
本实验围绕社交用户发布的文章做了详尽的分析,通过分析能得到用户群体年龄分布,性别分布,地理位置分布,以及热门话题的热度。
SaaS 模式云数据仓库必修课
本课程由阿里云开发者社区和阿里云大数据团队共同出品,是SaaS模式云原生数据仓库领导者MaxCompute核心课程。本课程由阿里云资深产品和技术专家们从概念到方法,从场景到实践,体系化的将阿里巴巴飞天大数据平台10多年的经过验证的方法与实践深入浅出的讲给开发者们。帮助大数据开发者快速了解并掌握SaaS模式的云原生的数据仓库,助力开发者学习了解先进的技术栈,并能在实际业务中敏捷的进行大数据分析,赋能企业业务。 通过本课程可以了解SaaS模式云原生数据仓库领导者MaxCompute核心功能及典型适用场景,可应用MaxCompute实现数仓搭建,快速进行大数据分析。适合大数据工程师、大数据分析师 大量数据需要处理、存储和管理,需要搭建数据仓库?学它! 没有足够人员和经验来运维大数据平台,不想自建IDC买机器,需要免运维的大数据平台?会SQL就等于会大数据?学它! 想知道大数据用得对不对,想用更少的钱得到持续演进的数仓能力?获得极致弹性的计算资源和更好的性能,以及持续保护数据安全的生产环境?学它! 想要获得灵活的分析能力,快速洞察数据规律特征?想要兼得数据湖的灵活性与数据仓库的成长性?学它! 出品人:阿里云大数据产品及研发团队专家 产品 MaxCompute 官网 https://www.aliyun.com/product/odps 
目录
相关文章
|
20天前
|
分布式计算 大数据 Apache
ClickHouse与大数据生态集成:Spark & Flink 实战
【10月更文挑战第26天】在当今这个数据爆炸的时代,能够高效地处理和分析海量数据成为了企业和组织提升竞争力的关键。作为一款高性能的列式数据库系统,ClickHouse 在大数据分析领域展现出了卓越的能力。然而,为了充分利用ClickHouse的优势,将其与现有的大数据处理框架(如Apache Spark和Apache Flink)进行集成变得尤为重要。本文将从我个人的角度出发,探讨如何通过这些技术的结合,实现对大规模数据的实时处理和分析。
55 2
ClickHouse与大数据生态集成:Spark & Flink 实战
|
11天前
|
存储 分布式计算 并行计算
【赵渝强老师】Spark中的RDD
RDD(弹性分布式数据集)是Spark的核心数据模型,支持分布式并行计算。RDD由分区组成,每个分区由Spark Worker节点处理,具备自动容错、位置感知调度和缓存机制等特性。通过创建RDD,可以指定分区数量,并实现计算函数、依赖关系、分区器和优先位置列表等功能。视频讲解和示例代码进一步详细介绍了RDD的组成和特性。
|
19天前
|
存储 分布式计算 Hadoop
数据湖技术:Hadoop与Spark在大数据处理中的协同作用
【10月更文挑战第27天】在大数据时代,数据湖技术凭借其灵活性和成本效益成为企业存储和分析大规模异构数据的首选。Hadoop和Spark作为数据湖技术的核心组件,通过HDFS存储数据和Spark进行高效计算,实现了数据处理的优化。本文探讨了Hadoop与Spark的最佳实践,包括数据存储、处理、安全和可视化等方面,展示了它们在实际应用中的协同效应。
69 2
|
20天前
|
存储 分布式计算 Hadoop
数据湖技术:Hadoop与Spark在大数据处理中的协同作用
【10月更文挑战第26天】本文详细探讨了Hadoop与Spark在大数据处理中的协同作用,通过具体案例展示了两者的最佳实践。Hadoop的HDFS和MapReduce负责数据存储和预处理,确保高可靠性和容错性;Spark则凭借其高性能和丰富的API,进行深度分析和机器学习,实现高效的批处理和实时处理。
59 1
|
20天前
|
分布式计算 Java 开发工具
阿里云MaxCompute-XGBoost on Spark 极限梯度提升算法的分布式训练与模型持久化oss的实现与代码浅析
本文介绍了XGBoost在MaxCompute+OSS架构下模型持久化遇到的问题及其解决方案。首先简要介绍了XGBoost的特点和应用场景,随后详细描述了客户在将XGBoost on Spark任务从HDFS迁移到OSS时遇到的异常情况。通过分析异常堆栈和源代码,发现使用的`nativeBooster.saveModel`方法不支持OSS路径,而使用`write.overwrite().save`方法则能成功保存模型。最后提供了完整的Scala代码示例、Maven配置和提交命令,帮助用户顺利迁移模型存储路径。
|
21天前
|
分布式计算 大数据 OLAP
AnalyticDB与大数据生态集成:Spark & Flink
【10月更文挑战第25天】在大数据时代,实时数据处理和分析变得越来越重要。AnalyticDB(ADB)是阿里云推出的一款完全托管的实时数据仓库服务,支持PB级数据的实时分析。为了充分发挥AnalyticDB的潜力,将其与大数据处理工具如Apache Spark和Apache Flink集成是非常必要的。本文将从我个人的角度出发,分享如何将AnalyticDB与Spark和Flink集成,构建端到端的大数据处理流水线,实现数据的实时分析和处理。
50 1
|
1月前
|
存储 分布式计算 算法
大数据-106 Spark Graph X 计算学习 案例:1图的基本计算、2连通图算法、3寻找相同的用户
大数据-106 Spark Graph X 计算学习 案例:1图的基本计算、2连通图算法、3寻找相同的用户
60 0