Spark RDD/Core 编程 API入门系列 之rdd实战(rdd基本操作实战及transformation和action流程图)(源码)(三)

简介:

本博文的主要内容是:

1、rdd基本操作实战

2、transformation和action流程图

3、典型的transformation和action

 

 

 

RDD有3种操作:

1、  Trandformation      对数据状态的转换,即所谓算子的转换

2、  Action    触发作业,即所谓得结果的

3、  Contoller  对性能、效率和容错方面的支持,如cache、persist、checkpoint

Contoller包括cache、persist、checkpoint。

 

/**
* Return a new RDD by applying a function to all elements of this RDD.
*/
def map[U: ClassTag](f: T => U): RDD[U] = withScope {
val cleanF = sc.clean(f)
new MapPartitionsRDD[U, T](this, (context, pid, iter) => iter.map(cleanF))
}

传入类型是T,返回类型是U。

 

 

 

元素之间,为什么reduce操作,要符合结合律和交换律?
答:因为,交换律,不知,哪个数据先过来。所以,必须符合交换律。
在交换律基础上,想要reduce操作,必须要符合结合律。

/**

* Reduces the elements of this RDD using the specified commutative and
* associative binary operator.
*/
def reduce(f: (T, T) => T): T = withScope {
val cleanF = sc.clean(f)
val reducePartition: Iterator[T] => Option[T] = iter => {
if (iter.hasNext) {
Some(iter.reduceLeft(cleanF))
} else {
None
}
}
var jobResult: Option[T] = None
val mergeResult = (index: Int, taskResult: Option[T]) => {
if (taskResult.isDefined) {
jobResult = jobResult match {
case Some(value) => Some(f(value, taskResult.get))
case None => taskResult
}
}
}
sc.runJob(this, reducePartition, mergeResult)
// Get the final result out of our Option, or throw an exception if the RDD was empty
jobResult.getOrElse(throw new UnsupportedOperationException("empty collection"))
}

RDD.scala(源码)


这里,新建包com.zhouls.spark.cores

package com.zhouls.spark.cores

/**
* Created by Administrator on 2016/9/27.
*/
object TextLines {

}


下面,开始编代码

本地模式

自动 ,会写好

源码来看,

所以, val lines = sc.textFile("C:\\Users\\Administrator\\Desktop\\textlines.txt") //通过HadoopRDD以及MapPartitionsRDD获取文件中每一行的内容本身

 

 

val lineCount = lines.map(line => (line,1)) //每一行变成行的内容与1构成的Tuple


val textLines = lineCount.reduceByKey(_+_)


textLines.collect.foreach(pair  => println(pair._1 + ":" + pair._2))

 成功!



 现在,将此行代码,

     textLines.collect.foreach(pair  => println(pair._1 + ":" + pair._2))
改一改
     textLines.foreach(pair  => println(pair._1 + ":" + pair._2))
 

总结:

本地模式里,
   textLines.collect.foreach(pair  => println(pair._1 + ":" + pair._2))
改一改
     textLines.foreach(pair  => println(pair._1 + ":" + pair._2))
运行正常,因为在本地模式下,是jvm,但这样书写,是不正规的。

 

 

 

集群模式里,
   textLines.collect.foreach(pair  => println(pair._1 + ":" + pair._2))
改一改
     textLines.foreach(pair  => println(pair._1 + ":" + pair._2))
运行无法通过,因为结果是分布在各个节点上。
 
collect源码:
/**
* Return an array that contains all of the elements in this RDD.
*/
def collect(): Array[T] = withScope {
val results = sc.runJob(this, (iter: Iterator[T]) => iter.toArray)
Array.concat(results: _*)
}

得出,collect后array中就是一个元素,只不过这个元素是一个Tuple。
Tuple是元组。通过concat合并!


foreach源码:

/**
* Applies a function f to all elements of this RDD.
*/
def foreach(f: T => Unit): Unit = withScope {
val cleanF = sc.clean(f)
sc.runJob(this, (iter: Iterator[T]) => iter.foreach(cleanF))
}
  


 

rdd实战(rdd基本操作实战)至此!

 

 

 

 

 

 rdd实战(transformation流程图)

 拿wordcount为例!

 

启动hdfs集群

spark@SparkSingleNode:/usr/local/hadoop/hadoop-2.6.0$ sbin/start-dfs.sh

 

 

 启动spark集群

spark@SparkSingleNode:/usr/local/spark/spark-1.5.2-bin-hadoop2.6$ sbin/start-all.sh

 

 

启动spark-shell

spark@SparkSingleNode:/usr/local/spark/spark-1.5.2-bin-hadoop2.6/bin$ ./spark-shell --master spark://SparkSingleNode:7077 --executor-memory 1g

 

 

scala> val partitionsReadmeRdd =  sc.textFile("hdfs://SparkSingleNode:9000/README.md").flatMap(_.split(" ")).map(word =>(word,1)).reduceByKey(_+_,1).saveAsTextFile("~/partition1README.txt")

 或者

 scala> val readmeRdd = sc.textFile("hdfs://SparkSingleNode:9000/README.md")

 scala>  val partitionsReadmeRdd = readmeRdd.flatMap(_.split(" ")).map(word => (word,1)).reduceByKey(_+_,1)

.saveAsTextFile("~/partition1README.txt")

 

注意,~目录,不是这里。

 

 

 

 为什么,我的,不是这样的显示呢?

 

 

 

RDD的transformation和action执行的流程图

 

 

典型的transformation和action


本文转自大数据躺过的坑博客园博客,原文链接:http://www.cnblogs.com/zlslch/p/5913334.html,如需转载请自行联系原作者

相关文章
|
监控 Java 应用服务中间件
高级java面试---spring.factories文件的解析源码API机制
【11月更文挑战第20天】Spring Boot是一个用于快速构建基于Spring框架的应用程序的开源框架。它通过自动配置、起步依赖和内嵌服务器等特性,极大地简化了Spring应用的开发和部署过程。本文将深入探讨Spring Boot的背景历史、业务场景、功能点以及底层原理,并通过Java代码手写模拟Spring Boot的启动过程,特别是spring.factories文件的解析源码API机制。
414 2
|
存储 人工智能 API
AgentScope:阿里开源多智能体低代码开发平台,支持一键导出源码、多种模型API和本地模型部署
AgentScope是阿里巴巴集团开源的多智能体开发平台,旨在帮助开发者轻松构建和部署多智能体应用。该平台提供分布式支持,内置多种模型API和本地模型部署选项,支持多模态数据处理。
8069 77
AgentScope:阿里开源多智能体低代码开发平台,支持一键导出源码、多种模型API和本地模型部署
|
存储 分布式计算 并行计算
【赵渝强老师】Spark中的RDD
RDD(弹性分布式数据集)是Spark的核心数据模型,支持分布式并行计算。RDD由分区组成,每个分区由Spark Worker节点处理,具备自动容错、位置感知调度和缓存机制等特性。通过创建RDD,可以指定分区数量,并实现计算函数、依赖关系、分区器和优先位置列表等功能。视频讲解和示例代码进一步详细介绍了RDD的组成和特性。
246 0
|
11月前
|
存储 API 文件存储
单页图床HTML源码+本地API接口图床系统源码
图床系统是一种用于存储和管理图片文件的在线服务。它允许用户上传图片文件,并生成相应的图片链接,从而方便用户在网页、社交媒体或其他平台上分享图片。
477 2
单页图床HTML源码+本地API接口图床系统源码
|
11月前
|
分布式计算 Spark
【赵渝强老师】Spark RDD的依赖关系和任务阶段
Spark RDD之间的依赖关系分为窄依赖和宽依赖。窄依赖指父RDD的每个分区最多被一个子RDD分区使用,如map、filter操作;宽依赖则指父RDD的每个分区被多个子RDD分区使用,如分组和某些join操作。窄依赖任务可在同一阶段完成,而宽依赖因Shuffle的存在需划分不同阶段执行。借助Spark Web Console可查看任务的DAG图及阶段划分。
587 15
|
11月前
|
存储 缓存 分布式计算
【赵渝强老师】Spark RDD的缓存机制
Spark RDD通过`persist`或`cache`方法可将计算结果缓存,但并非立即生效,而是在触发action时才缓存到内存中供重用。`cache`方法实际调用了`persist(StorageLevel.MEMORY_ONLY)`。RDD缓存可能因内存不足被删除,建议结合检查点机制保证容错。示例中,读取大文件并多次调用`count`,使用缓存后执行效率显著提升,最后一次计算仅耗时98ms。
339 0
【赵渝强老师】Spark RDD的缓存机制
|
缓存 分布式计算 Scala
Spark为什么只有在调用action时才会触发任务执行呢(附算子优化和使用示例)?
Spark算子主要划分为两类:transformation和action,并且只有action算子触发的时候才会真正执行任务。还记得之前的文章《Spark RDD详解》中提到,Spark RDD的缓存和checkpoint是懒加载操作,只有action触发的时候才会真正执行,其实不仅是Spark RDD,在Spark其他组件如SparkStreaming中也是如此,这是Spark的一个特性之一。像我们常用的算子map、flatMap、filter都是transformation算子,而collect、count、saveAsTextFile、countByKey、foreach则为action
Spark为什么只有在调用action时才会触发任务执行呢(附算子优化和使用示例)?
|
7月前
|
人工智能 分布式计算 大数据
大数据≠大样本:基于Spark的特征降维实战(提升10倍训练效率)
本文探讨了大数据场景下降维的核心问题与解决方案,重点分析了“维度灾难”对模型性能的影响及特征冗余的陷阱。通过数学证明与实际案例,揭示高维空间中样本稀疏性问题,并提出基于Spark的分布式降维技术选型与优化策略。文章详细展示了PCA在亿级用户画像中的应用,包括数据准备、核心实现与效果评估,同时深入探讨了协方差矩阵计算与特征值分解的并行优化方法。此外,还介绍了动态维度调整、非线性特征处理及降维与其他AI技术的协同效应,为生产环境提供了最佳实践指南。最终总结出降维的本质与工程实践原则,展望未来发展方向。
408 0
|
分布式计算 大数据 Apache
ClickHouse与大数据生态集成:Spark & Flink 实战
【10月更文挑战第26天】在当今这个数据爆炸的时代,能够高效地处理和分析海量数据成为了企业和组织提升竞争力的关键。作为一款高性能的列式数据库系统,ClickHouse 在大数据分析领域展现出了卓越的能力。然而,为了充分利用ClickHouse的优势,将其与现有的大数据处理框架(如Apache Spark和Apache Flink)进行集成变得尤为重要。本文将从我个人的角度出发,探讨如何通过这些技术的结合,实现对大规模数据的实时处理和分析。
1027 2
ClickHouse与大数据生态集成:Spark & Flink 实战
|
10月前
|
存储 分布式计算 Hadoop
从“笨重大象”到“敏捷火花”:Hadoop与Spark的大数据技术进化之路
从“笨重大象”到“敏捷火花”:Hadoop与Spark的大数据技术进化之路
549 79