开发者学堂课程【大数据实时计算框架 Spark 快速入门:Spark 算子操作剖析 1】学习笔记,与课程紧密联系,让用户快速学习知识。
课程地址:https://developer.aliyun.com/learning/course/100/detail/1687
Spark 算子操作剖析 1
通过读取数据创建 RDD,如果不写 RDD 的数量,则通过 Parallize 的方法找对应集成运行模式的并行度,executors×vcores=total cores。
假设 RDD 中有五个 partition,上游有五个 partition,下游也有五个 partition,默认一对一;若添加算子操作 groupByKey(10),下游则变成十个 partition;若再添加算子操作,新生成 RDD 数量同上,无论宽依赖和窄依赖。
例如:
mapToPair 为窄依赖,repartition(10)宽依赖将RDD数量提升为十,groupByKey 为宽依赖。
代码写完之后有三个stage:ShuffleMapStage 0,
ShuffleMapStage 1,ResultStage2
当出现 Job 0 finished 时,下面自动 stop。因为有42行collect 的算子,才出现Job 0 finished。
同时,一个stage对应一个taskset,一个partition对应一个task。
repartition(10)是发动Submitting 10 missing tasks的原因,相当于十条线并行来执行,通过DAGScheduler 传递给TaskScheduler。
如不写groupbykey括号里的个数,则根据repartition括号里的个数决定。
parallelism指定默认并行度。
//groupByKey 把相同的key的元素放到一起去
List> scoreList = Arrays.aslist(
new Tuple2("xuruyun", 150),
new Tuple2("liangyongqi", 100),
new Tuple2("wangfei", 100),
new Tuple2("wangfei", 80));
JavaPairRDD rdd = sc.parallelizePairs(scorelist);
JavaPairRDD mapped = rdd.mapToPair(new PairFunction
private static final long serialVersionUID = 1L;
@Override
public Tuple2 call(Tuple2 tuple)
throws Exception {
return new Tuple2(tuple._1, tuple._2+2);
D;
JavaPairRDD results = mapped.repartition(10);
JavaPairRDD> finalResults = results.groupByKey();
finalResults.collect();
sc.close();
JavaPairRDD rdd = sc.parallelizePairs(scorelist);
JavaPairRDD mapped = rdd.mapToPair(new Pairfunction
private static final long serialVersionUID = 1L;
@Override
public Tuple2 call(Tuple2 tuple)
throws Exception {
return new Tuple2(tuple._1, tuple._2+2);
D
javaPairRDD results = mapped.repartition(10);
JavaPairRDD> finalkesults = resufts.groupBykey();
finalResults.collect();
sc.close();