大数据Spark RDD 函数 1

本文涉及的产品
云原生大数据计算服务 MaxCompute,5000CU*H 100GB 3个月
云原生大数据计算服务MaxCompute,500CU*H 100GB 3个月
简介: 大数据Spark RDD 函数

1 函数分类

有一定开发经验的读者应该都使用过多线程,利用多核 CPU 的并行能力来加快运算速率。在开发并行程序时,可以利用类似 Fork/Join 的框架将一个大的任务切分成细小的任务,每个小任务模块之间是相互独立的,可以并行执行,然后将所有小任务的结果汇总起来,得到最终的结果。一个非常好的例子便是归并排序。对整个序列进行排序时,可以将序列切分成多个子序列进行排序,然后将排好序的子序列归并起来得到最终的结果。

f5aa5603ee474a198100d1e17f11a6fb.png

对 Hadoop 有所了解的读者都知道 map、reduce 操作。对于大量的数据,我们可以通过map 操作让不同的集群节点并行计算,之后通过 reduce 操作将结果整合起来得到最终输出。


对于 Spark 处理的大量数据而言,会将数据切分后放入RDD作为Spark 的基本数据结构,开发者可以在 RDD 上进行丰富的操作,之后 Spark 会根据操作调度集群资源进行计算。总结起来,RDD 的操作主要可以分为 Transformation 和 Action 两种。


dab70a8414614055a9c7f32734a41df9.png

官方文档:http://spark.apache.org/docs/latest/rdd-programming-guide.html#rdd-operations

RDD中操作(函数、算子)分为两类:


1)、Transformation转换操作:返回一个新的RDD

which create a new dataset from an existing one

所有Transformation函数都是Lazy,不会立即执行,需要Action函数触发

2)、Action动作操作:返回值不是RDD(无返回值或返回其他的)

which return a value to the driver program after running a computation on the datase

所有Action函数立即执行(Eager),比如count、first、collect、take等


550abb460127496989124eaadbeafde9.png

此外注意RDD中函数细节:


第一点:RDD不实际存储真正要计算的数据,而是记录了数据的位置在哪里,数据的转换关系(调用了什么方法,传入什么函数);

第二点:RDD中的所有转换都是惰性求值/延迟执行的,也就是说并不会直接计算。只有当发生一个要求返回结果给Driver的Action动作时,这些转换才会真正运行。之所以使用惰性求值/

延迟执行,是因为这样可以在Action时对RDD操作形成DAG有向无环图进行Stage的划分和并行优化,这种设计让Spark更加有效率地运行。

2 Transformation函数


在Spark中Transformation操作表示将一个RDD通过一系列操作变为另一个RDD的过程,这个操作可能是简单的加减操作,也可能是某个函数或某一系列函数。值得注意的是Transformation操作并不会触发真正的计算,只会建立RDD间的关系图。

如下图所示,RDD内部每个方框是一个分区。假设需要采样50%的数据,通过sample函数,从 V1、V2、U1、U2、U3、U4 采样出数据 V1、U1 和 U4,形成新的RDD。


94ce800e31ac4fd3bf1d541193e2a4fb.png

常用Transformation转换函数,加上底色为重要函数,重点讲解常使用函数:

3 Action函数

不同于Transformation操作,Action操作代表一次计算的结束,不再产生新的 RDD,将结果返回到Driver程序或者输出到外部。所以Transformation操作只是建立计算关系,而Action 操作才是实际的执行者。每个Action操作都会调用SparkContext的runJob 方法向集群正式提交请求,所以每个Action操作对应一个Job。常用Action执行函数,加上底色为重要函数,后续重点讲解。

4 重要函数

RDD中包含很多函数,主要可以分为两类:Transformation转换函数和Action函数。

主要常见使用函数如下,一一通过演示范例讲解。

4.1 基本函数

RDD中map、filter、flatMap及foreach等函数为最基本函数,都是都RDD中每个元素进行操作,将元素传递到函数中进行转换。


map 函数:

map(f:T=>U) : RDD[T]=>RDD[U],表示将 RDD 经由某一函数 f 后,转变为另一个RDD。

flatMap 函数:

flatMap(f:T=>Seq[U]) : RDD[T]=>RDD[U]),表示将 RDD 经由某一函数 f 后,转变为一

个新的 RDD,但是与 map 不同,RDD 中的每一个元素会被映射成新的 0 到多个元素

(f 函数返回的是一个序列 Seq)。

filter 函数:

filter(f:T=>Bool) : RDD[T]=>RDD[T],表示将 RDD 经由某一函数 f 后,只保留 f 返回

为 true 的数据,组成新的 RDD。

foreach 函数:

foreach(func),将函数 func 应用在数据集的每一个元素上,通常用于更新一个累加器,或者和外部存储系统进行交互,例如 Redis。关于 foreach,在后续章节中还会使用,到时会详细介绍它的使用方法及注意事项。

saveAsTextFile 函数:

saveAsTextFile(path:String),数据集内部的元素会调用其 toString 方法,转换为字符串形式,然后根据传入的路径保存成文本文件,既可以是本地文件系统,也可以是HDFS 等。上述函数基本上都使用过,在后续的案例中继续使用,此处不再单独演示案例。

4.2 分区操作函数

每个RDD由多分区组成的,实际开发建议对每个分区数据的进行操作,map函数使用mapPartitions代替、foreache函数使用foreachPartition代替。

针对词频统计WordCount代码进行修改,针对分区数据操作,范例代码如下

import org.apache.spark.rdd.RDD
import org.apache.spark.{SparkConf, SparkContext, TaskContext}
/**
 * 分区操作函数:mapPartitions和foreachPartition
 */
object SparkIterTest {
  def main(args: Array[String]): Unit = {
    // 创建应用程序入口SparkContext实例对象
    val sc: SparkContext = {
      // 1.a 创建SparkConf对象,设置应用的配置信息
      val sparkConf: SparkConf = new SparkConf()
        .setAppName(this.getClass.getSimpleName.stripSuffix("$"))
        .setMaster("local[2]")
      // 1.b 传递SparkConf对象,构建Context实例
      new SparkContext(sparkConf)
    }
    sc.setLogLevel("WARN")
    // TODO: 1、从文件系统加载数据,创建RDD数据集
    val inputRDD: RDD[String] = sc.textFile("datas/wordcount/wordcount.data", minPartitions = 2)
    // TODO: 2、处理数据,调用RDD集合中函数(类比于Scala集合类中列表List)
    /*
    def mapPartitions[U: ClassTag](
    f: Iterator[T] => Iterator[U],
    preservesPartitioning: Boolean = false
    ): RDD[U]
    */
    val wordcountsRDD: RDD[(String, Int)] = inputRDD
      // 将每行数据按照分隔符进行分割,将数据扁平化
      .flatMap(line => line.trim.split("\\s+"))
      // TODO: 针对每个分区数据操作
      .mapPartitions { iter =>
        // iter 表示RDD中每个分区中的数据,存储在迭代器中,相当于列表List
        iter.map(word => (word, 1))
      }
      // 按照Key聚合统计, 先按照Key分组,再聚合统计(此函数局部聚合,再进行全局聚合)
      .reduceByKey((a, b) => a + b)
    // TODO: 3、输出结果RDD到本地文件系统
    wordcountsRDD.foreachPartition { datas =>
      // 获取各个分区ID
      val partitionId: Int = TaskContext.getPartitionId()
      // val xx: Iterator[(String, Int)] = datas
      datas.foreach { case (word, count) =>
        println(s"p-${partitionId}: word = $word, count = $count")
      }
    }
    // 应用程序运行结束,关闭资源
    sc.stop()
  }
}

为什么要对分区操作,而不是对每个数据操作,好处在哪里呢???

  • 应用场景:处理网站日志数据,数据量为10GB,统计各个省份PV和UV。
  1. 假设10GB日志数据,从HDFS上读取的,此时RDD的分区数目:80 分区;
  1. 但是分析PV和UV有多少条数据:34,存储在80个分区中,实际项目中降低分区数目,比
    如设置为2个分区。
    .

4.3 重分区函数

如何对RDD中分区数目进行调整(增加分区或减少分区),在RDD函数中主要有如下三个函数。

  • 1)、增加分区函数
  1. 函数名称:repartition,此函数使用的谨慎,会产生Shuffle。
  • 2)、减少分区函数
  1. 函数名称:coalesce,此函数不会产生Shuffle,当且仅当降低RDD分区数目。
  2. 比如RDD的分区数目为10个分区,此时调用rdd.coalesce(12),不会对RDD进行任何操作。
  • 3)、调整分区函数
  1. 在PairRDDFunctions(此类专门针对RDD中数据类型为KeyValue对提供函数)工具类中
    partitionBy函数:

    范例演示代码,适当使用函数调整RDD分区数目:
import org.apache.spark.rdd.RDD
import org.apache.spark.{SparkConf, SparkContext}
/**
 * RDD中分区函数,调整RDD分区数目,可以增加分区和减少分区
 */
object SparkPartitionTest {
  def main(args: Array[String]): Unit = {
    // 创建应用程序入口SparkContext实例对象
    val sc: SparkContext = {
      // 1.a 创建SparkConf对象,设置应用的配置信息
      val sparkConf: SparkConf = new SparkConf()
        .setAppName(this.getClass.getSimpleName.stripSuffix("$"))
        .setMaster("local[2]")
      // 1.b 传递SparkConf对象,构建Context实例
      new SparkContext(sparkConf)
    }
    sc.setLogLevel("WARN")
    // 读取本地文件系统文本文件数据
    val datasRDD: RDD[String] = sc.textFile("datas/wordcount/wordcount.data", minPartitions = 2)
    // TODO: 增加RDD分区数
    val etlRDD: RDD[String] = datasRDD.repartition(3)
    println(s"EtlRDD 分区数目 = ${etlRDD.getNumPartitions}")
    // 词频统计
    val resultRDD: RDD[(String, Int)] = etlRDD
      // 数据分析,考虑过滤脏数据
      .filter(line => null != line && line.trim.length > 0)
      // 分割单词,注意去除左右空格
      .flatMap(line => line.trim.split("\\s+"))
      // 转换为二元组,表示单词出现一次
      .mapPartitions { iter =>
        iter.map(word => (word, 1))
      }
      // 分组聚合,按照Key单词
      .reduceByKey((tmp, item) => tmp + item)
    // 输出结果RDD
    resultRDD
      // TODO: 对结果RDD降低分区数目
      .coalesce(1)
      .foreachPartition(iter => iter.foreach(println))
    // 应用程序运行结束,关闭资源
    sc.stop()
  }
}

在实际开发中,什么时候适当调整RDD的分区数目呢?让程序性能更好好呢????

  • 第一点:增加分区数目
  1. 当处理的数据很多的时候,可以考虑增加RDD的分区数目
  • 第二点:减少分区数目
  1. 其一:当对RDD数据进行过滤操作(filter函数)后,考虑是否降低RDD分区数目

其二:当对结果RDD存储到外部系统


相关实践学习
基于MaxCompute的热门话题分析
本实验围绕社交用户发布的文章做了详尽的分析,通过分析能得到用户群体年龄分布,性别分布,地理位置分布,以及热门话题的热度。
SaaS 模式云数据仓库必修课
本课程由阿里云开发者社区和阿里云大数据团队共同出品,是SaaS模式云原生数据仓库领导者MaxCompute核心课程。本课程由阿里云资深产品和技术专家们从概念到方法,从场景到实践,体系化的将阿里巴巴飞天大数据平台10多年的经过验证的方法与实践深入浅出的讲给开发者们。帮助大数据开发者快速了解并掌握SaaS模式的云原生的数据仓库,助力开发者学习了解先进的技术栈,并能在实际业务中敏捷的进行大数据分析,赋能企业业务。 通过本课程可以了解SaaS模式云原生数据仓库领导者MaxCompute核心功能及典型适用场景,可应用MaxCompute实现数仓搭建,快速进行大数据分析。适合大数据工程师、大数据分析师 大量数据需要处理、存储和管理,需要搭建数据仓库?学它! 没有足够人员和经验来运维大数据平台,不想自建IDC买机器,需要免运维的大数据平台?会SQL就等于会大数据?学它! 想知道大数据用得对不对,想用更少的钱得到持续演进的数仓能力?获得极致弹性的计算资源和更好的性能,以及持续保护数据安全的生产环境?学它! 想要获得灵活的分析能力,快速洞察数据规律特征?想要兼得数据湖的灵活性与数据仓库的成长性?学它! 出品人:阿里云大数据产品及研发团队专家 产品 MaxCompute 官网 https://www.aliyun.com/product/odps 
目录
相关文章
|
1月前
|
分布式计算 大数据 Apache
ClickHouse与大数据生态集成:Spark & Flink 实战
【10月更文挑战第26天】在当今这个数据爆炸的时代,能够高效地处理和分析海量数据成为了企业和组织提升竞争力的关键。作为一款高性能的列式数据库系统,ClickHouse 在大数据分析领域展现出了卓越的能力。然而,为了充分利用ClickHouse的优势,将其与现有的大数据处理框架(如Apache Spark和Apache Flink)进行集成变得尤为重要。本文将从我个人的角度出发,探讨如何通过这些技术的结合,实现对大规模数据的实时处理和分析。
143 2
ClickHouse与大数据生态集成:Spark & Flink 实战
|
1月前
|
存储 分布式计算 并行计算
【赵渝强老师】Spark中的RDD
RDD(弹性分布式数据集)是Spark的核心数据模型,支持分布式并行计算。RDD由分区组成,每个分区由Spark Worker节点处理,具备自动容错、位置感知调度和缓存机制等特性。通过创建RDD,可以指定分区数量,并实现计算函数、依赖关系、分区器和优先位置列表等功能。视频讲解和示例代码进一步详细介绍了RDD的组成和特性。
|
1月前
|
存储 分布式计算 Hadoop
数据湖技术:Hadoop与Spark在大数据处理中的协同作用
【10月更文挑战第27天】在大数据时代,数据湖技术凭借其灵活性和成本效益成为企业存储和分析大规模异构数据的首选。Hadoop和Spark作为数据湖技术的核心组件,通过HDFS存储数据和Spark进行高效计算,实现了数据处理的优化。本文探讨了Hadoop与Spark的最佳实践,包括数据存储、处理、安全和可视化等方面,展示了它们在实际应用中的协同效应。
118 2
|
1月前
|
存储 分布式计算 Hadoop
数据湖技术:Hadoop与Spark在大数据处理中的协同作用
【10月更文挑战第26天】本文详细探讨了Hadoop与Spark在大数据处理中的协同作用,通过具体案例展示了两者的最佳实践。Hadoop的HDFS和MapReduce负责数据存储和预处理,确保高可靠性和容错性;Spark则凭借其高性能和丰富的API,进行深度分析和机器学习,实现高效的批处理和实时处理。
88 1
|
1月前
|
分布式计算 Java 开发工具
阿里云MaxCompute-XGBoost on Spark 极限梯度提升算法的分布式训练与模型持久化oss的实现与代码浅析
本文介绍了XGBoost在MaxCompute+OSS架构下模型持久化遇到的问题及其解决方案。首先简要介绍了XGBoost的特点和应用场景,随后详细描述了客户在将XGBoost on Spark任务从HDFS迁移到OSS时遇到的异常情况。通过分析异常堆栈和源代码,发现使用的`nativeBooster.saveModel`方法不支持OSS路径,而使用`write.overwrite().save`方法则能成功保存模型。最后提供了完整的Scala代码示例、Maven配置和提交命令,帮助用户顺利迁移模型存储路径。
|
1月前
|
分布式计算 大数据 OLAP
AnalyticDB与大数据生态集成:Spark & Flink
【10月更文挑战第25天】在大数据时代,实时数据处理和分析变得越来越重要。AnalyticDB(ADB)是阿里云推出的一款完全托管的实时数据仓库服务,支持PB级数据的实时分析。为了充分发挥AnalyticDB的潜力,将其与大数据处理工具如Apache Spark和Apache Flink集成是非常必要的。本文将从我个人的角度出发,分享如何将AnalyticDB与Spark和Flink集成,构建端到端的大数据处理流水线,实现数据的实时分析和处理。
72 1
|
2月前
|
存储 分布式计算 算法
大数据-106 Spark Graph X 计算学习 案例:1图的基本计算、2连通图算法、3寻找相同的用户
大数据-106 Spark Graph X 计算学习 案例:1图的基本计算、2连通图算法、3寻找相同的用户
73 0
|
2月前
|
消息中间件 分布式计算 NoSQL
大数据-104 Spark Streaming Kafka Offset Scala实现Redis管理Offset并更新
大数据-104 Spark Streaming Kafka Offset Scala实现Redis管理Offset并更新
48 0
|
2月前
|
消息中间件 存储 分布式计算
大数据-103 Spark Streaming Kafka Offset管理详解 Scala自定义Offset
大数据-103 Spark Streaming Kafka Offset管理详解 Scala自定义Offset
104 0

热门文章

最新文章