Developer Academy course notes for "Big Data Real-Time Computing Framework Spark Quick Start: Analysis of Spark Operators 4". These notes follow the course closely so that readers can pick up the material quickly.
Course address: https://developer.aliyun.com/learning/course/100/detail/1690
Analysis of Spark Operators 4
With a combiner, network transfer is far more efficient: instead of shipping, say, ten thousand 1s, each key only needs to send one pre-aggregated value per partition, and the amount of work on the reduce side drops accordingly.
Accumulating a sum can be done with reduceByKey, which carries a built-in combiner that applies the same function on the map side. That is only safe for operations like summing; averaging is not. With the values 1, 2, 3, where 1 and 2 sit in one partition and 3 in another, averaging on the map side gives avg(1, 2) = 1.5, and averaging that with 3 gives 2.25 rather than the correct 2.
aggregateByKey:
When the map-side and reduce-side logic are different, use aggregateByKey directly.
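As an illustration of that point (this code is not from the course), here is a minimal sketch of a correct per-key average with aggregateByKey: the map-side seqOp folds each value into a (sum, count) accumulator and the reduce-side combOp merges accumulators, so no intermediate average is ever taken. The class name and sample data are made up for the example.

package com.shsxt.study.operator;

import java.util.Arrays;
import java.util.List;

import org.apache.spark.SparkConf;
import org.apache.spark.api.java.JavaPairRDD;
import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.api.java.function.Function2;

import scala.Tuple2;

public class AverageByKeyOperator {
    public static void main(String[] args) {
        SparkConf conf = new SparkConf().setAppName("AverageByKeyOperator").setMaster("local");
        JavaSparkContext sc = new JavaSparkContext(conf);

        // Three scores for the same key, spread over two partitions.
        List<Tuple2<String, Integer>> scoreList = Arrays.asList(
                new Tuple2<String, Integer>("wangfei", 1),
                new Tuple2<String, Integer>("wangfei", 2),
                new Tuple2<String, Integer>("wangfei", 3));
        JavaPairRDD<String, Integer> scores = sc.parallelizePairs(scoreList, 2);

        // seqOp (map side): fold one value into a (sum, count) accumulator.
        // combOp (reduce side): merge two (sum, count) accumulators.
        JavaPairRDD<String, Tuple2<Integer, Integer>> sumCounts = scores.aggregateByKey(
                new Tuple2<Integer, Integer>(0, 0),
                new Function2<Tuple2<Integer, Integer>, Integer, Tuple2<Integer, Integer>>() {
                    private static final long serialVersionUID = 1L;
                    @Override
                    public Tuple2<Integer, Integer> call(Tuple2<Integer, Integer> acc, Integer v)
                            throws Exception {
                        return new Tuple2<Integer, Integer>(acc._1() + v, acc._2() + 1);
                    }
                },
                new Function2<Tuple2<Integer, Integer>, Tuple2<Integer, Integer>, Tuple2<Integer, Integer>>() {
                    private static final long serialVersionUID = 1L;
                    @Override
                    public Tuple2<Integer, Integer> call(Tuple2<Integer, Integer> a,
                            Tuple2<Integer, Integer> b) throws Exception {
                        return new Tuple2<Integer, Integer>(a._1() + b._1(), a._2() + b._2());
                    }
                });

        // The average is taken only once, at the very end: 6 / 3 = 2.0.
        for (Tuple2<String, Tuple2<Integer, Integer>> wc : sumCounts.collect()) {
            double avg = (double) wc._2()._1() / wc._2()._2();
            System.out.println(wc._1() + " -> " + avg);
        }
        sc.close();
    }
}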
sample performs random sampling. With false it draws, without replacement, roughly the given fraction of the whole RDD; changing it to true samples with replacement, so duplicate elements can come out.
union(otherDataset)
Return a new dataset that contains the union of the elements in the source dataset and the argument.
intersection(otherDataset)
Return a new RDD that contains the intersection of elements in the source dataset and the argument.
distinct([numTasks])
Return a new dataset that contains the distinct elements of the source dataset.
groupByKey([numTasks])
When called on a dataset of (K, V) pairs, returns a dataset of (K, Iterable<V>) pairs.
Note: If you are grouping in order to perform an aggregation (such as a sum or average) over each key, using reduceByKey or aggregateByKey will yield much better performance.
Note: By default, the level of parallelism in the output depends on the number of partitions of the parent RDD. You can pass an optional numTasks argument to set a different number of tasks.
reduceByKey(func, [numTasks])
When called on a dataset of (K, V) pairs, returns a dataset of (K, V) pairs where the values for each key are aggregated using the given reduce function func, which must be of type (V, V) => V. Like in groupByKey, the number of reduce tasks is configurable through an optional second argument.
aggregateByKey(zeroValue)(seqOp, combOp, [numTasks])
When called on a dataset of (K, V) pairs, returns a dataset of (K, U) pairs where the values for each key are aggregated using the given combine functions and a neutral "zero" value. Allows an aggregated value type that is different than the input value type, while avoiding unnecessary allocations. Like in groupByKey, the number of reduce tasks is configurable through an optional second argument.
sortByKey([ascending], [numTasks])
When called on a dataset of (K, V) pairs where K implements Ordered, returns a dataset of (K, V) pairs sorted by keys in ascending or descending order, as specified in the boolean ascending argument.
join(otherDataset, [numTasks])
When called on datasets of type (K, V) and (K, W), returns a dataset of (K, (V, W)) pairs with all pairs of elements for each key. Outer joins are supported through leftOuterJoin, rightOuterJoin, and fullOuterJoin.
cogroup(otherDataset, [numTasks])
When called on datasets of type (K, V) and (K, W), returns a dataset of (K, (Iterable<V>, Iterable<W>)) tuples. This operation is also called groupWith.
cartesian(otherDataset)
When called on datasets of types T and U, returns a dataset of (T, U) pairs (all pairs of elements).
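Since join and cogroup are not exercised in the code that follows, here is a brief sketch (not part of the original notes) showing both on two small pair RDDs; the class name and data are made up:

package com.shsxt.study.operator;

import java.util.Arrays;

import org.apache.spark.SparkConf;
import org.apache.spark.api.java.JavaPairRDD;
import org.apache.spark.api.java.JavaSparkContext;

import scala.Tuple2;

public class JoinOperator {
    public static void main(String[] args) {
        SparkConf conf = new SparkConf().setAppName("JoinOperator").setMaster("local");
        JavaSparkContext sc = new JavaSparkContext(conf);

        JavaPairRDD<String, Integer> scores = sc.parallelizePairs(Arrays.asList(
                new Tuple2<String, Integer>("xuruyun", 150),
                new Tuple2<String, Integer>("wangfei", 80)));
        JavaPairRDD<String, Integer> ages = sc.parallelizePairs(Arrays.asList(
                new Tuple2<String, Integer>("xuruyun", 18),
                new Tuple2<String, Integer>("wangfei", 20)));

        // join keeps only keys present in both RDDs and pairs up their values,
        // e.g. ("xuruyun", (150, 18)) and ("wangfei", (80, 20)).
        JavaPairRDD<String, Tuple2<Integer, Integer>> joined = scores.join(ages);
        for (Tuple2<String, Tuple2<Integer, Integer>> t : joined.collect()) {
            System.out.println(t);
        }

        // cogroup collects all values for each key from both sides into two Iterables.
        System.out.println(scores.cogroup(ages).collect());

        sc.close();
    }
}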
JavaSparkContext sc = new JavaSparkContext(conf);
List<Tuple2<String, Integer>> scoreList = Arrays.asList(
        new Tuple2<String, Integer>("xuruyun", 150),
        new Tuple2<String, Integer>("liangyongqi", 100),
        new Tuple2<String, Integer>("wangfei", 100),
        new Tuple2<String, Integer>("wangfei", 80));
JavaPairRDD<String, Integer> rdd = sc.parallelizePairs(scoreList);
// reduceByKey sums the two "wangfei" scores into one value; the same function
// also runs as the map-side combiner before the shuffle.
rdd.reduceByKey(new Function2<Integer, Integer, Integer>() {
    private static final long serialVersionUID = 1L;
    @Override
    public Integer call(Integer v1, Integer v2) throws Exception {
        return v1 + v2;
    }
}).foreach(new VoidFunction<Tuple2<String, Integer>>() {
    private static final long serialVersionUID = 1L;
    @Override
    public void call(Tuple2<String, Integer> t) throws Exception {
        System.out.println(t);
    }
});
package com.shsxt.study.operator;

import java.util.Arrays;
import java.util.List;

import org.apache.spark.SparkConf;
import org.apache.spark.api.java.JavaPairRDD;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.api.java.function.FlatMapFunction;
import org.apache.spark.api.java.function.Function2;
import org.apache.spark.api.java.function.PairFunction;

import scala.Tuple2;

public class AggregateByKeyOperator {
    public static void main(String[] args) {
        SparkConf conf = new SparkConf()
                .setAppName("AggregateByKeyOperator")
                .setMaster("local");
        JavaSparkContext sc = new JavaSparkContext(conf);
        JavaRDD<String> lines = sc.textFile("CHANGES.txt");
        JavaRDD<String> words = lines.flatMap(new FlatMapFunction<String, String>() {
            private static final long serialVersionUID = 1L;
            @Override
            public Iterable<String> call(String line) throws Exception {
                return Arrays.asList(line.split(" "));
            }
        });
        JavaPairRDD<String, Integer> pairs = words.mapToPair(new PairFunction<String, String, Integer>() {
            private static final long serialVersionUID = 1L;
            @Override
            public Tuple2<String, Integer> call(String word) throws Exception {
                return new Tuple2<String, Integer>(word, 1);
            }
        });
        // aggregateByKey is essentially the same idea as reduceByKey; reduceByKey is
        // the simplified form of aggregateByKey.
        // aggregateByKey takes three arguments:
        //   1) the initial value for each key
        //   2) the seq function: how the shuffle map-side local aggregation is done
        //      (like reduce / foldLeft)
        //   3) the comb function: how the shuffle reduce-side global aggregation is done
        JavaPairRDD<String, Integer> wordCounts = pairs.aggregateByKey(0,
                new Function2<Integer, Integer, Integer>() {
                    private static final long serialVersionUID = 1L;
                    @Override
                    public Integer call(Integer v1, Integer v2) throws Exception {
                        return v1 + v2;
                    }
                },
                new Function2<Integer, Integer, Integer>() {
                    private static final long serialVersionUID = 1L;
                    @Override
                    public Integer call(Integer v1, Integer v2) throws Exception {
                        return v1 + v2;
                    }
                });
        List<Tuple2<String, Integer>> list = wordCounts.collect();
        for (Tuple2<String, Integer> wc : list) {
            System.out.println(wc);
        }
        sc.close();
    }
}
package com.shsxt.study.operator;

import java.util.Arrays;
import java.util.List;

import org.apache.spark.SparkConf;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.api.java.function.VoidFunction;

public class SampleOperator {
    public static void main(String[] args) {
        SparkConf conf = new SparkConf().setAppName("SampleOperator")
                .setMaster("local");
        JavaSparkContext sc = new JavaSparkContext(conf);
        List<String> names = Arrays
                .asList("xurunyun", "liangyongqi", "wangfei", "yasaka", "xurunyun", "liangyongqi");
        JavaRDD<String> nameRDD = sc.parallelize(names, 2);
        // Take roughly a third of the elements, without replacement.
        nameRDD.sample(false, 0.33).foreach(new VoidFunction<String>() {
            private static final long serialVersionUID = 1L;
            @Override
            public void call(String name) throws Exception {
                System.out.println(name);
            }
        });
        sc.close();
    }
}
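To illustrate the withReplacement = true case mentioned earlier (not shown in the course code), here is a small variant that would sit in the same main method, reusing nameRDD; the fraction 2.0 and the seed 42 are arbitrary values for this sketch:

// With replacement, each element is drawn an expected `fraction` times,
// so the same name can appear several times in the result.
List<String> withDuplicates = nameRDD.sample(true, 2.0, 42L).collect();
System.out.println(withDuplicates);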
Return a sampled subset of this RDD.
@param withReplacement can elements be sampled multiple times (replaced when sampled out)
@param fraction expected size of the sample as a fraction of this RDD's size
without replacement: probability that each element is chosen; fraction must be [0, 1]
with replacement: expected number of times each element is chosen; fraction must be >= 0
@param seed seed for the random number generator
def sample(
    withReplacement: Boolean,
    fraction: Double,
    seed: Long = Utils.random.nextLong): RDD[T] = withScope {
  require(fraction >= 0.0, "Negative fraction value: " + fraction)
  if (withReplacement) {
    new PartitionwiseSampledRDD[T, T](this, new PoissonSampler[T](fraction), true, seed)
  } else {
    new PartitionwiseSampledRDD[T, T](this, new BernoulliSampler[T](fraction), true, seed)
  }
}
Randomly splits this RDD with the provided weights.
@param weights weights for splits, will be normalized if they don't sum to 1
@param seed random seed
@return split RDDs in an array
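As a usage sketch (not from the course), randomSplit is the usual way to carve an RDD into, for example, a 70/30 split. The snippet below reuses nameRDD from the SampleOperator example; the weights and the seed are arbitrary:

// randomSplit normalizes the weights and returns one RDD per weight.
JavaRDD<String>[] parts = nameRDD.randomSplit(new double[]{0.7, 0.3}, 11L);
System.out.println(parts[0].collect());   // roughly 70% of the names
System.out.println(parts[1].collect());   // the remaining ~30%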