Spark 算子操作剖析 3

简介: 快速学习 Spark 算子操作剖析 3

开发者学堂课程【大数据实时计算框架 Spark 快速入门Spark 算子操作剖析3】学习笔记,与课程紧密联系,让用户快速学习知识。

课程地址https://developer.aliyun.com/learning/course/100/detail/1689


Spark 算子操作剖析 3


一个 RDD 里面有某个 partition,groupByKey 里面传参数可变为相应 partition。不传参数,spark.default.parallelism 改变即可。

算子传参优先级最高,其次是 spark.default.parallelism 的设置,若以上两个都为空,则下一个 RDD 的并行度与上一个相同。

reduceoperator 算子:

reduce 是 action 操作,sum 在 driver 端,算子里的逻辑在重节点。

numbers 指针在driver端,数据不一定在 driver 端,reduce方法本身在driver端执行,但 reduce 里的匿名函数在 excutor 端执行。            

调动 foreach 本身是 driver 端调动,println 是在丛节点里并行来执行的,并行的存入数据库更好。

Reducebykey:

shuffle 操作都有 reduce 和 map 操作两个阶段。

public class ReduceOperator {

public static void main(String[] args){

SparkConf conf = new SparkConf().setAppName("ReduceOperator"〉

.setMaster("local");

JavaSparkContext sc = new JavaSparkContext(conf);

//有一个集合,里面有1到10, 10个数字,现在我们通过 reduce 来进行累加

List numberList = Arrays.aslist(1,2,3,4,5);

JavaRDD numbers = sc.parallelize(numberList);

// reduce 操作的原理:首先将第一个和第二个元素,传入 call 方法

//计算一个结果,接着把结果再和后面的元素依次累加

//以此类推

Int sum= numbers.reduce(new Function2(){

private static final long serialVersionUID = 1L;

@Override

public Integer call(Integer v1, Integer v2) throws Exception {

return v1+v2;

}

});

System.out.println(sum);

sc.close();

image.png

Reduces the elements of this RDD using the specified commutative and

associative binary operator.

def reduce(f:(T, T)=> T):T = withscope {

val cleanF = sc.clean(f)

val reducePartition: Iterator[T]=> Option[T]= iter =>{

if (iter.hasNext){

Some(iter.reduceleft(cleanF))

} else {

None

}

}

var jobResult: Option[T]= None

val mergeResult =(index: Int, taskResult: Option[T])=>{

if (taskResult.isDefined){

jobResult = jobResult match {

case Some(value)=> Some(f(value, taskResult.get))

case None => taskResult

}

}

}

sc.runJob(this, reducepartition, mergeResult)

//Get the final result out of our Option, or throw an exception if the RDD was empty

jobResult.getOrElse(throw new UnsupportedoperationException("empty collection"))

vimport java.util.Arrays;

FeduceBykey= groupBykey + reduce  

// shuffle 洗牌= map 端+ reduce 端

//spark 里面这个 reduceByKey在 map 端自带 Combiner  

public class ReduceByKeyOperaton {

public static void main(string[] args){

SparkConf conf = new SparkConf().setAppName("LineCount").setMaster(

"local");

JavaSparkContext sc = new JavaSparkContext(conf);

List> scoreList = Arrays.aslist(

new Tuple2("xuruyun", 150),

new Tuple2("liangyongqi", 100),

new Tuple2("wangfei”, 100),

new Tuple2("wangfei", 80));

相关文章
|
6月前
|
分布式计算 并行计算 大数据
Spark学习---day02、Spark核心编程(RDD概述、RDD编程(创建、分区规则、转换算子、Action算子))(一)
Spark学习---day02、Spark核心编程 RDD概述、RDD编程(创建、分区规则、转换算子、Action算子))(一)
295 1
|
6月前
|
分布式计算 Java Scala
Spark学习---day03、Spark核心编程(RDD概述、RDD编程(创建、分区规则、转换算子、Action算子))(二)
Spark学习---day03、Spark核心编程(RDD概述、RDD编程(创建、分区规则、转换算子、Action算子))(二)
|
4月前
|
SQL 分布式计算 大数据
MaxCompute操作报错合集之 Spark Local模式启动报错,是什么原因
MaxCompute是阿里云提供的大规模离线数据处理服务,用于大数据分析、挖掘和报表生成等场景。在使用MaxCompute进行数据处理时,可能会遇到各种操作报错。以下是一些常见的MaxCompute操作报错及其可能的原因与解决措施的合集。
|
4月前
|
SQL 分布式计算 数据处理
MaxCompute操作报错合集之使用Spark查询时函数找不到的原因是什么
MaxCompute是阿里云提供的大规模离线数据处理服务,用于大数据分析、挖掘和报表生成等场景。在使用MaxCompute进行数据处理时,可能会遇到各种操作报错。以下是一些常见的MaxCompute操作报错及其可能的原因与解决措施的合集。
|
5月前
|
分布式计算 DataWorks MaxCompute
MaxCompute操作报错合集之在Spark访问OSS时出现证书错误的问题,该如何解决
MaxCompute是阿里云提供的大规模离线数据处理服务,用于大数据分析、挖掘和报表生成等场景。在使用MaxCompute进行数据处理时,可能会遇到各种操作报错。以下是一些常见的MaxCompute操作报错及其可能的原因与解决措施的合集。
|
5月前
|
分布式计算 DataWorks MaxCompute
DataWorks操作报错合集之spark操作odps,写入时报错,是什么导致的
DataWorks是阿里云提供的一站式大数据开发与治理平台,支持数据集成、数据开发、数据服务、数据质量管理、数据安全管理等全流程数据处理。在使用DataWorks过程中,可能会遇到各种操作报错。以下是一些常见的报错情况及其可能的原因和解决方法。
|
6月前
|
分布式计算 DataWorks 大数据
MaxCompute操作报错合集之大数据计算的MaxCompute Spark引擎无法读取到表,是什么原因
MaxCompute是阿里云提供的大规模离线数据处理服务,用于大数据分析、挖掘和报表生成等场景。在使用MaxCompute进行数据处理时,可能会遇到各种操作报错。以下是一些常见的MaxCompute操作报错及其可能的原因与解决措施的合集。
MaxCompute操作报错合集之大数据计算的MaxCompute Spark引擎无法读取到表,是什么原因
|
5月前
|
分布式计算 大数据 数据处理
MaxCompute操作报错合集之spark客户端执行时,报错,该怎么办
MaxCompute是阿里云提供的大规模离线数据处理服务,用于大数据分析、挖掘和报表生成等场景。在使用MaxCompute进行数据处理时,可能会遇到各种操作报错。以下是一些常见的MaxCompute操作报错及其可能的原因与解决措施的合集。
|
5月前
|
分布式计算 DataWorks 网络安全
DataWorks操作报错合集之还未运行,spark节点一直报错,如何解决
DataWorks是阿里云提供的一站式大数据开发与治理平台,支持数据集成、数据开发、数据服务、数据质量管理、数据安全管理等全流程数据处理。在使用DataWorks过程中,可能会遇到各种操作报错。以下是一些常见的报错情况及其可能的原因和解决方法。
|
6月前
|
存储 分布式计算 API
adb spark的lakehouse api访问内表数据,还支持算子下推吗
【2月更文挑战第21天】adb spark的lakehouse api访问内表数据,还支持算子下推吗
140 2