Spark RDD/Core 编程 API入门系列 之rdd实战(rdd基本操作实战及transformation和action流程图)(源码)(三)

简介:

本博文的主要内容是:

1、rdd基本操作实战

2、transformation和action流程图

3、典型的transformation和action

 

 

 

RDD有3种操作:

1、  Trandformation      对数据状态的转换,即所谓算子的转换

2、  Action    触发作业,即所谓得结果的

3、  Contoller  对性能、效率和容错方面的支持,如cache、persist、checkpoint

Contoller包括cache、persist、checkpoint。

 

/**
* Return a new RDD by applying a function to all elements of this RDD.
*/
def map[U: ClassTag](f: T => U): RDD[U] = withScope {
val cleanF = sc.clean(f)
new MapPartitionsRDD[U, T](this, (context, pid, iter) => iter.map(cleanF))
}

传入类型是T,返回类型是U。

 

 

 

元素之间,为什么reduce操作,要符合结合律和交换律?
答:因为,交换律,不知,哪个数据先过来。所以,必须符合交换律。
在交换律基础上,想要reduce操作,必须要符合结合律。

/**

* Reduces the elements of this RDD using the specified commutative and
* associative binary operator.
*/
def reduce(f: (T, T) => T): T = withScope {
val cleanF = sc.clean(f)
val reducePartition: Iterator[T] => Option[T] = iter => {
if (iter.hasNext) {
Some(iter.reduceLeft(cleanF))
} else {
None
}
}
var jobResult: Option[T] = None
val mergeResult = (index: Int, taskResult: Option[T]) => {
if (taskResult.isDefined) {
jobResult = jobResult match {
case Some(value) => Some(f(value, taskResult.get))
case None => taskResult
}
}
}
sc.runJob(this, reducePartition, mergeResult)
// Get the final result out of our Option, or throw an exception if the RDD was empty
jobResult.getOrElse(throw new UnsupportedOperationException("empty collection"))
}

RDD.scala(源码)


这里,新建包com.zhouls.spark.cores

package com.zhouls.spark.cores

/**
* Created by Administrator on 2016/9/27.
*/
object TextLines {

}


下面,开始编代码

本地模式

自动 ,会写好

源码来看,

所以, val lines = sc.textFile("C:\\Users\\Administrator\\Desktop\\textlines.txt") //通过HadoopRDD以及MapPartitionsRDD获取文件中每一行的内容本身

 

 

val lineCount = lines.map(line => (line,1)) //每一行变成行的内容与1构成的Tuple


val textLines = lineCount.reduceByKey(_+_)


textLines.collect.foreach(pair  => println(pair._1 + ":" + pair._2))

 成功!



 现在,将此行代码,

     textLines.collect.foreach(pair  => println(pair._1 + ":" + pair._2))
改一改
     textLines.foreach(pair  => println(pair._1 + ":" + pair._2))
 

总结:

本地模式里,
   textLines.collect.foreach(pair  => println(pair._1 + ":" + pair._2))
改一改
     textLines.foreach(pair  => println(pair._1 + ":" + pair._2))
运行正常,因为在本地模式下,是jvm,但这样书写,是不正规的。

 

 

 

集群模式里,
   textLines.collect.foreach(pair  => println(pair._1 + ":" + pair._2))
改一改
     textLines.foreach(pair  => println(pair._1 + ":" + pair._2))
运行无法通过,因为结果是分布在各个节点上。
 
collect源码:
/**
* Return an array that contains all of the elements in this RDD.
*/
def collect(): Array[T] = withScope {
val results = sc.runJob(this, (iter: Iterator[T]) => iter.toArray)
Array.concat(results: _*)
}

得出,collect后array中就是一个元素,只不过这个元素是一个Tuple。
Tuple是元组。通过concat合并!


foreach源码:

/**
* Applies a function f to all elements of this RDD.
*/
def foreach(f: T => Unit): Unit = withScope {
val cleanF = sc.clean(f)
sc.runJob(this, (iter: Iterator[T]) => iter.foreach(cleanF))
}
  


 

rdd实战(rdd基本操作实战)至此!

 

 

 

 

 

 rdd实战(transformation流程图)

 拿wordcount为例!

 

启动hdfs集群

spark@SparkSingleNode:/usr/local/hadoop/hadoop-2.6.0$ sbin/start-dfs.sh

 

 

 启动spark集群

spark@SparkSingleNode:/usr/local/spark/spark-1.5.2-bin-hadoop2.6$ sbin/start-all.sh

 

 

启动spark-shell

spark@SparkSingleNode:/usr/local/spark/spark-1.5.2-bin-hadoop2.6/bin$ ./spark-shell --master spark://SparkSingleNode:7077 --executor-memory 1g

 

 

scala> val partitionsReadmeRdd =  sc.textFile("hdfs://SparkSingleNode:9000/README.md").flatMap(_.split(" ")).map(word =>(word,1)).reduceByKey(_+_,1).saveAsTextFile("~/partition1README.txt")

 或者

 scala> val readmeRdd = sc.textFile("hdfs://SparkSingleNode:9000/README.md")

 scala>  val partitionsReadmeRdd = readmeRdd.flatMap(_.split(" ")).map(word => (word,1)).reduceByKey(_+_,1)

.saveAsTextFile("~/partition1README.txt")

 

注意,~目录,不是这里。

 

 

 

 为什么,我的,不是这样的显示呢?

 

 

 

RDD的transformation和action执行的流程图

 

 

典型的transformation和action


本文转自大数据躺过的坑博客园博客,原文链接:http://www.cnblogs.com/zlslch/p/5913334.html,如需转载请自行联系原作者

相关文章
|
4月前
|
JSON 安全 API
电商API入门问答:开发者必知的10个基础问题
本文详解电商API的10个基础知识,涵盖定义、用途、认证、安全等内容,帮助开发者快速入门并提升开发效率。
117 0
|
1月前
|
Cloud Native 算法 API
Python API接口实战指南:从入门到精通
🌟蒋星熠Jaxonic,技术宇宙的星际旅人。深耕API开发,以Python为舟,探索RESTful、GraphQL等接口奥秘。擅长requests、aiohttp实战,专注性能优化与架构设计,用代码连接万物,谱写极客诗篇。
Python API接口实战指南:从入门到精通
|
4月前
|
缓存 监控 安全
电商API集成入门:从零开始搭建高效接口
在数字化电商时代,API集成成为企业提升效率、实现系统互联的关键。本文从零开始,逐步讲解如何搭建高效、可靠的电商API接口,适合初学者学习。内容涵盖API基础、认证安全、请求处理、性能优化等核心步骤,并提供Python代码示例与数学公式辅助理解。通过实践,读者可掌握构建优质电商API的技巧,提升用户体验与系统性能。
177 0
|
5月前
|
人工智能 分布式计算 大数据
大数据≠大样本:基于Spark的特征降维实战(提升10倍训练效率)
本文探讨了大数据场景下降维的核心问题与解决方案,重点分析了“维度灾难”对模型性能的影响及特征冗余的陷阱。通过数学证明与实际案例,揭示高维空间中样本稀疏性问题,并提出基于Spark的分布式降维技术选型与优化策略。文章详细展示了PCA在亿级用户画像中的应用,包括数据准备、核心实现与效果评估,同时深入探讨了协方差矩阵计算与特征值分解的并行优化方法。此外,还介绍了动态维度调整、非线性特征处理及降维与其他AI技术的协同效应,为生产环境提供了最佳实践指南。最终总结出降维的本质与工程实践原则,展望未来发展方向。
248 0
|
7月前
|
JSON 算法 API
一文掌握 1688 商品详情 API 接口:从入门到实战
1688是国内领先的综合电商批发平台,提供海量商品资源。其商品详情API助力开发者与企业获取商品的详细信息(如属性、价格、库存等),广泛应用于电商数据分析、比价系统及采购场景。API支持GET/POST请求,需传入通用参数(app_key、timestamp等)与业务参数(如product_id)。返回JSON格式数据,包含商品标题、价格、图片链接等详情,提升业务效率与决策精准度。
|
7月前
|
搜索推荐 API 开发者
京东商品列表 API 接口全解析:从入门到精通
京东商品列表API是京东开放平台为开发者提供的核心数据接口,支持批量获取商品基础信息、价格、库存状态等多维度数据。它具备数据丰富性、灵活筛选与分页查询、稳定高效等特点,可满足市场分析、选品优化、比价工具及推荐系统开发等需求,为电商业务创新提供坚实支撑。通过标准化通道,助力第三方高效、合法地利用京东海量商品数据。
|
6月前
|
JSON API 开发工具
电商API接口入门指南
本文介绍了API的基础知识及其在电商领域的实际应用。首先,阐释了API的概念、运作机制及参数与返回值的作用,帮助读者理解如何通过API实现软件间的交互。接着,以获取电商商品列表为例,详细讲解了从选择平台、引入SDK到编写代码调用API的全流程。示例代码采用Python语言,利用requests库发送请求并解析JSON数据,为开发者提供了清晰的实践指导。
|
4月前
|
存储 安全 API
亚马逊SP-API入门:海外电商接口调用与国内平台的差异化
亚马逊 SP-API 与国内电商 API 在技术架构、安全机制及开发流程上差异显著。本文对比京东、淘宝等平台,分析接口设计、地域适配、权限管理等核心差异,并结合实战经验提供开发建议,助力开发者高效接入 SP-API,实现全球电商业务拓展。
|
6月前
|
数据挖掘 API 开发者
京东商品详情 API 接口全攻略:从入门到精通
京东商品详情API接口是京东开放平台为开发者提供的服务,用于获取商品详细信息。通过调用接口,开发者可获得商品属性、价格、库存、促销信息等数据,适用于电商应用、价格比较工具及数据分析平台等场景。支持GET/POST请求方式,参数包括API版本、密钥等。示例代码展示了如何使用Python的requests库调用该接口,并获取JSON格式的返回数据,包含商品基本信息、价格、库存和用户评价等内容。
251 16
|
12月前
|
分布式计算 大数据 Apache
ClickHouse与大数据生态集成:Spark & Flink 实战
【10月更文挑战第26天】在当今这个数据爆炸的时代,能够高效地处理和分析海量数据成为了企业和组织提升竞争力的关键。作为一款高性能的列式数据库系统,ClickHouse 在大数据分析领域展现出了卓越的能力。然而,为了充分利用ClickHouse的优势,将其与现有的大数据处理框架(如Apache Spark和Apache Flink)进行集成变得尤为重要。本文将从我个人的角度出发,探讨如何通过这些技术的结合,实现对大规模数据的实时处理和分析。
765 2
ClickHouse与大数据生态集成:Spark & Flink 实战

热门文章

最新文章

下一篇
开通oss服务