开发者学堂课程【大数据实时计算框架 Spark 快速入门:Spark 算子操作剖析3】学习笔记,与课程紧密联系,让用户快速学习知识。
课程地址:https://developer.aliyun.com/learning/course/100/detail/1689
Spark 算子操作剖析 3
一个 RDD 里面有某个 partition,groupByKey 里面传参数可变为相应 partition。不传参数,spark.default.parallelism 改变即可。
算子传参优先级最高,其次是 spark.default.parallelism 的设置,若以上两个都为空,则下一个 RDD 的并行度与上一个相同。
reduceoperator 算子:
reduce 是 action 操作,sum 在 driver 端,算子里的逻辑在重节点。
numbers 指针在driver端,数据不一定在 driver 端,reduce方法本身在driver端执行,但 reduce 里的匿名函数在 excutor 端执行。
调动 foreach 本身是 driver 端调动,println 是在丛节点里并行来执行的,并行的存入数据库更好。
Reducebykey:
shuffle 操作都有 reduce 和 map 操作两个阶段。
public class ReduceOperator {
public static void main(String[] args){
SparkConf conf = new SparkConf().setAppName("ReduceOperator"〉
.setMaster("local");
JavaSparkContext sc = new JavaSparkContext(conf);
//有一个集合,里面有1到10, 10个数字,现在我们通过 reduce 来进行累加
List numberList = Arrays.aslist(1,2,3,4,5);
JavaRDD numbers = sc.parallelize(numberList);
// reduce 操作的原理:首先将第一个和第二个元素,传入 call 方法
//计算一个结果,接着把结果再和后面的元素依次累加
//以此类推
Int sum= numbers.reduce(new Function2(){
private static final long serialVersionUID = 1L;
@Override
public Integer call(Integer v1, Integer v2) throws Exception {
return v1+v2;
}
});
System.out.println(sum);
sc.close();
Reduces the elements of this RDD using the specified commutative and
associative binary operator.
def reduce(f:(T, T)=> T):T = withscope {
val cleanF = sc.clean(f)
val reducePartition: Iterator[T]=> Option[T]= iter =>{
if (iter.hasNext){
Some(iter.reduceleft(cleanF))
} else {
None
}
}
var jobResult: Option[T]= None
val mergeResult =(index: Int, taskResult: Option[T])=>{
if (taskResult.isDefined){
jobResult = jobResult match {
case Some(value)=> Some(f(value, taskResult.get))
case None => taskResult
}
}
}
sc.runJob(this, reducepartition, mergeResult)
//Get the final result out of our Option, or throw an exception if the RDD was empty
jobResult.getOrElse(throw new UnsupportedoperationException("empty collection"))
vimport java.util.Arrays;
FeduceBykey= groupBykey + reduce
// shuffle 洗牌= map 端+ reduce 端
//spark 里面这个 reduceByKey在 map 端自带 Combiner
public class ReduceByKeyOperaton {
public static void main(string[] args){
SparkConf conf = new SparkConf().setAppName("LineCount").setMaster(
"local");
JavaSparkContext sc = new JavaSparkContext(conf);
List> scoreList = Arrays.aslist(
new Tuple2("xuruyun", 150),
new Tuple2("liangyongqi", 100),
new Tuple2("wangfei”, 100),
new Tuple2("wangfei", 80));