Spark 算子操作剖析 1

简介: 快速学习 Spark 算子操作剖析 1

开发者学堂课程【大数据实时计算框架 Spark 快速入门Spark 算子操作剖析 1】学习笔记,与课程紧密联系,让用户快速学习知识。

课程地址https://developer.aliyun.com/learning/course/100/detail/1687


Spark 算子操作剖析 1


通过读取数据创建 RDD,如果不写 RDD 的数量,则通过 Parallize 的方法找对应集成运行模式的并行度,executors×vcores=total cores。

假设 RDD 中有五个 partition,上游有五个 partition,下游也有五个 partition,默认一对一;若添加算子操作 groupByKey(10),下游则变成十个 partition;若再添加算子操作,新生成 RDD 数量同上,无论宽依赖和窄依赖。

例如:

mapToPair 为窄依赖,repartition(10)宽依赖将RDD数量提升为十,groupByKey 为宽依赖。

代码写完之后有三个stage:ShuffleMapStage 0,

ShuffleMapStage 1,ResultStage2

当出现 Job 0 finished 时,下面自动 stop。因为有42行collect 的算子,才出现Job 0 finished。

同时,一个stage对应一个taskset,一个partition对应一个task。

repartition(10)是发动Submitting 10 missing tasks的原因,相当于十条线并行来执行,通过DAGScheduler 传递给TaskScheduler。

如不写groupbykey括号里的个数,则根据repartition括号里的个数决定。

parallelism指定默认并行度。

//groupByKey 把相同的key的元素放到一起去

List> scoreList = Arrays.aslist(

new Tuple2("xuruyun", 150),

new Tuple2("liangyongqi", 100),

new Tuple2("wangfei", 100),

new Tuple2("wangfei", 80));

JavaPairRDD rdd = sc.parallelizePairs(scorelist);

JavaPairRDD mapped = rdd.mapToPair(new PairFunction

private static final long serialVersionUID = 1L;

@Override

public Tuple2 call(Tuple2 tuple)

throws Exception {

return new Tuple2(tuple._1, tuple._2+2);

D;

JavaPairRDD results = mapped.repartition(10);

JavaPairRDD> finalResults = results.groupByKey();

finalResults.collect();

sc.close();

JavaPairRDD rdd = sc.parallelizePairs(scorelist);

JavaPairRDD mapped = rdd.mapToPair(new Pairfunction

private static final long serialVersionUID = 1L;

@Override

public Tuple2 call(Tuple2 tuple)

throws Exception {

return new Tuple2(tuple._1, tuple._2+2);

D

javaPairRDD results = mapped.repartition(10);

JavaPairRDD> finalkesults = resufts.groupBykey();

finalResults.collect();

sc.close();

相关文章
|
7月前
|
分布式计算 并行计算 大数据
Spark学习---day02、Spark核心编程(RDD概述、RDD编程(创建、分区规则、转换算子、Action算子))(一)
Spark学习---day02、Spark核心编程 RDD概述、RDD编程(创建、分区规则、转换算子、Action算子))(一)
382 1
|
7月前
|
分布式计算 Java Scala
Spark学习---day03、Spark核心编程(RDD概述、RDD编程(创建、分区规则、转换算子、Action算子))(二)
Spark学习---day03、Spark核心编程(RDD概述、RDD编程(创建、分区规则、转换算子、Action算子))(二)
|
5月前
|
SQL 分布式计算 大数据
MaxCompute操作报错合集之 Spark Local模式启动报错,是什么原因
MaxCompute是阿里云提供的大规模离线数据处理服务,用于大数据分析、挖掘和报表生成等场景。在使用MaxCompute进行数据处理时,可能会遇到各种操作报错。以下是一些常见的MaxCompute操作报错及其可能的原因与解决措施的合集。
|
5月前
|
SQL 分布式计算 数据处理
MaxCompute操作报错合集之使用Spark查询时函数找不到的原因是什么
MaxCompute是阿里云提供的大规模离线数据处理服务,用于大数据分析、挖掘和报表生成等场景。在使用MaxCompute进行数据处理时,可能会遇到各种操作报错。以下是一些常见的MaxCompute操作报错及其可能的原因与解决措施的合集。
|
6月前
|
分布式计算 DataWorks MaxCompute
MaxCompute操作报错合集之在Spark访问OSS时出现证书错误的问题,该如何解决
MaxCompute是阿里云提供的大规模离线数据处理服务,用于大数据分析、挖掘和报表生成等场景。在使用MaxCompute进行数据处理时,可能会遇到各种操作报错。以下是一些常见的MaxCompute操作报错及其可能的原因与解决措施的合集。
|
6月前
|
分布式计算 DataWorks MaxCompute
DataWorks操作报错合集之spark操作odps,写入时报错,是什么导致的
DataWorks是阿里云提供的一站式大数据开发与治理平台,支持数据集成、数据开发、数据服务、数据质量管理、数据安全管理等全流程数据处理。在使用DataWorks过程中,可能会遇到各种操作报错。以下是一些常见的报错情况及其可能的原因和解决方法。
|
7月前
|
分布式计算 DataWorks 大数据
MaxCompute操作报错合集之大数据计算的MaxCompute Spark引擎无法读取到表,是什么原因
MaxCompute是阿里云提供的大规模离线数据处理服务,用于大数据分析、挖掘和报表生成等场景。在使用MaxCompute进行数据处理时,可能会遇到各种操作报错。以下是一些常见的MaxCompute操作报错及其可能的原因与解决措施的合集。
MaxCompute操作报错合集之大数据计算的MaxCompute Spark引擎无法读取到表,是什么原因
|
6月前
|
分布式计算 大数据 数据处理
MaxCompute操作报错合集之spark客户端执行时,报错,该怎么办
MaxCompute是阿里云提供的大规模离线数据处理服务,用于大数据分析、挖掘和报表生成等场景。在使用MaxCompute进行数据处理时,可能会遇到各种操作报错。以下是一些常见的MaxCompute操作报错及其可能的原因与解决措施的合集。
|
6月前
|
分布式计算 DataWorks 网络安全
DataWorks操作报错合集之还未运行,spark节点一直报错,如何解决
DataWorks是阿里云提供的一站式大数据开发与治理平台,支持数据集成、数据开发、数据服务、数据质量管理、数据安全管理等全流程数据处理。在使用DataWorks过程中,可能会遇到各种操作报错。以下是一些常见的报错情况及其可能的原因和解决方法。
|
7月前
|
存储 分布式计算 API
adb spark的lakehouse api访问内表数据,还支持算子下推吗
【2月更文挑战第21天】adb spark的lakehouse api访问内表数据,还支持算子下推吗
150 2