Spark 算子操作剖析4

简介: 快速学习 Spark 算子操作剖析4

开发者学堂课程【大数据实时计算框架 Spark 快速入门Spark 算子操作剖析4】学习笔记,与课程紧密联系,让用户快速学习知识。

课程地址https://developer.aliyun.com/learning/course/100/detail/1690


Spark 算子操作剖析 4


有了 combiner 网络传输效率更高,以前可能需要传一万个1,现在一个 key 在一个partition 里只需一个数,在 reduce 端的计算量也相应减少。

Sum 端的累加可以用 reducebykey 来做,如果有1,2,3三个数,1,2在一个 partition,3在一个 partiton,用 reducebykey 来做,自带 combiner,在 map 端进行加和平均的逻辑操作,1,2平均为1.5,再和3平均,结果不为2。

Aggregatebykey:

当 map 端和 reduce 端逻辑不一样时,可直接使用 aggregatebykey.

Sample 为随机抽样,从整个 RDD  里不重复取 false 之后的百分数,若改为 true,则会抽出重复的。

union(otherDataser)

Return a new dataset that contains the union of the elements in the source dataset and

the argument.

intersection(otherDataser)

Return a new RDD that contains the Intersection of elements in the source dataset and

the argument.

distinct([numTasks])

Return a new dataset that contains the distinct elements of the source dataset.

groupByKey(inumTasks))

When called on a dataset of (K, V) palrs, returns a dataset of (K, Iterable) palrs!

Note: If you are grouping In order to pertorm an aggregation (such as a sum or average)

over each key, using rcaucesykey or aggregateBykey will yield much better pertormance.

Note: By default, the level of parallelism in the output depends on the number of

partitions of the parent RDD. You can pass an optional numtasks argument to set a

different number of tasks,

reduceByKey(tunc, [numTasks)

When called on a dataset of (K, V) pairs, returns a dataset of (K, V) pairs where the

values for each key are aggregated using the given reduce function func, which must be

of type (VV)=> V. Like in groupeykey, the number of reduce tasks is configurable through

an optional second argument.

aggregateByKey(zeroValue)(seqOp, combop,

When called on a dataset of (K, V) pairs, returns a dataset of (K, U) pairs where the

[numTasks])

values for each key are aggregated using the glven combine functions and a neutral

"zero" value. Allows an aggregated value type that is difterent than the input value type,

while avolding unnecessary allocations. Like in groupBykey, the number of reduce tasks is  

contigurable through an optional second argument.

sortByKey([ascending], [numTasks)

When called on a dataset of (K, V) pairs where K Implements Ordered, returns a dataset

of (K, V) pairs sorted by keys in ascending or descending order, as speclflied in the

boolean ascending argument.

join(otherDataset,[numTasks])

When called on datasets of type (K, V) and (K, W), returns a dataset of (K, (V, W)) pairs

with all pairs of elements for each key. Outer joins are supported through TeftouterJoin,

rightouterJoin, and fu11outerJoin.

cogroup(otherDataset,[num Tasks])

When called on datasets of type (K, V) and (K, W). returns a dataset of (K,(Iterable

Iterable)) tuples. This operation is also called groupwith.

cartesian(olherDalasef)

When called on datlasets of types T and U, returns a dalaset of (T,U) pairs (alli pairs of

elements).

JavaSparkContext sc = new JavaSparkContext(conf);

List> scoreList = Arrays.aslist(

new Tuple2("xuruyun", 150),

new Tuple2("liangyongqi", 100),

new Tuple2("wangfei”, 100),

new Tuple2("wangfei", 80));

JavaPairRDD rdd = sc.parallelizePairs(scorelist);

rdd.reduceByKey(new Function2(){

private static final long serialVersionUID = 1L;

@Override

public Integer call(Integer v1, Integer v2) throws Exception {

return v1+v2;

}).foreach(new VoidFunction>(){

package com.shsxt.study.operator;

pimport java.util.Arrays;

public class AggregateBKeyOperator {

public static void main(String[] args){

SparkConf conf = new SparkConf()

isetAppName("AggregateByKeyOperator")

.setMaster("local");

JavaSparkContext sc= new JavaSparkContext(conf);

JavaRDD lines = sc.textFile("CHANGES.txt");

JavaRDD words = lines.flatMap(new FlatMapFunction(){

private static final long serialVersionUID = 1L;

@Override

public Iterable call(String line) throws Exception {

return Arrays.aslist(line.split(""));

DA

javapairRDD pairs = words.mapToPair(new PairFunction

private static final long serialVersionUID = 1L;

@Override

public Tuple2 call(String word) throws Exception {

return new Tuple2(word ,1);

// aggregateByKey其实和这个 reduceByKey 差不多,reduceByKey 是@ggregateByKey

//aggregateByKey 里面的参数需要三个

//第一个参数,每个 Key 的初始值

//第二个参数,Seq Function,如何进行 Shuffle map-side 的本地聚合

//第三个参数,说白了就是如何进行 Shuffle reduce-side 的全局聚合

[简化版

// reduce foldLeft  

JavaPairRDD wordCounts = pairs.aggregateBykey(0

, new Function2(){

private static final long serialVersionUID = 1L;

@Override

public Integer call(Integer v1, Integer v2)

throws Exception {

return v1+v2;

public Iterable call(String line) throws Exception {|

return Arrays.aslist(line.split(""));

javaPairRDD pairs = words.mapToPair(new Pairfunction

private static final long serialVersionUID = 1L;

@Override

public Tuple2 call(String word) throws Exception {

return new Tuple2(word ,1);

// aggregateByKey 其实和这个 reduceByKey 差不多,reduceByKey 是aggregateByKey 简化版

// aggregateByKey 里面的参数需要三个

//第一个参数,每个 Key 的初始值

//第二个参数,Seq Function,如何进行 Shuffle map-side 的本地聚合

//第三个参数,说白了就是如何进行 Shuffle reduce-side 的全局聚合

new Function2(){

private static final long serialVersionUID = 1L;

@Override

public Integer call(Integer vi, Integer v2)

throws Exception {

return v1+v2;

3);

List> list = wordCounts.collect();

for(Tuple2 wc: list){

System.out.println(wc);

sc.close();

public class SampleOperator {

public static void main(String[] args){

SparkConf conf = new SparkConf().setAppName("SampleOperator")

.setMaster("local");

JavaSparkContext sc = new JavaSparkContext(conf);

List names = Arrays

aslist("xurunyun","liangyongqi","wangfei","yasaka","xurunyun","lian

JavaRDD nameRDD = sc.parallelize(names,2);

nameRDD.sample(false,@13).foreach(new VoidFunction(){

private static final long serialVersionUID = 1L;

@Override

public void call(String name) throws Exception {

System.out.println(name);

//

sc.close();

Return a sampled subset of this RDD.

@param withReplacement can elements be sampled multiple times (Feplaced when sampled out)

@param fraction expected size of the sample as a fraction of this RDD's size

without replacement: probability that each element is chosen; fraction must be [e, 1]

with replacement: expected number of times each element is chosen; fraction must be >= o

@param seed seed for the random number generator

def sample(

withReplacement: Boolean,

fraction: Double,

seed:Long = Utils.random.nextLong):RDD[T]= withscope {

require(fraction >= 0.0,"Negative fraction value:"+ fraction)

if (withReplacement){

new PartitionwiseSampledRDD[T, T](this, new Poissonsampler[T](fraction), true, seed)

} else {

new PartitionwiseSampledRDD[T, T](this, new Bernoullisampler[T](fraction), true, seed)

Randomly splits this RDD with the provided weights.

@param weights weights for splits, will be normalized if they don't sum to 1

@param seed random seed

@return split RDDs in an array

相关文章
|
6月前
|
分布式计算 并行计算 大数据
Spark学习---day02、Spark核心编程(RDD概述、RDD编程(创建、分区规则、转换算子、Action算子))(一)
Spark学习---day02、Spark核心编程 RDD概述、RDD编程(创建、分区规则、转换算子、Action算子))(一)
295 1
|
6月前
|
分布式计算 Java Scala
Spark学习---day03、Spark核心编程(RDD概述、RDD编程(创建、分区规则、转换算子、Action算子))(二)
Spark学习---day03、Spark核心编程(RDD概述、RDD编程(创建、分区规则、转换算子、Action算子))(二)
|
4月前
|
SQL 分布式计算 大数据
MaxCompute操作报错合集之 Spark Local模式启动报错,是什么原因
MaxCompute是阿里云提供的大规模离线数据处理服务,用于大数据分析、挖掘和报表生成等场景。在使用MaxCompute进行数据处理时,可能会遇到各种操作报错。以下是一些常见的MaxCompute操作报错及其可能的原因与解决措施的合集。
|
4月前
|
SQL 分布式计算 数据处理
MaxCompute操作报错合集之使用Spark查询时函数找不到的原因是什么
MaxCompute是阿里云提供的大规模离线数据处理服务,用于大数据分析、挖掘和报表生成等场景。在使用MaxCompute进行数据处理时,可能会遇到各种操作报错。以下是一些常见的MaxCompute操作报错及其可能的原因与解决措施的合集。
|
5月前
|
分布式计算 DataWorks MaxCompute
MaxCompute操作报错合集之在Spark访问OSS时出现证书错误的问题,该如何解决
MaxCompute是阿里云提供的大规模离线数据处理服务,用于大数据分析、挖掘和报表生成等场景。在使用MaxCompute进行数据处理时,可能会遇到各种操作报错。以下是一些常见的MaxCompute操作报错及其可能的原因与解决措施的合集。
|
5月前
|
分布式计算 DataWorks MaxCompute
DataWorks操作报错合集之spark操作odps,写入时报错,是什么导致的
DataWorks是阿里云提供的一站式大数据开发与治理平台,支持数据集成、数据开发、数据服务、数据质量管理、数据安全管理等全流程数据处理。在使用DataWorks过程中,可能会遇到各种操作报错。以下是一些常见的报错情况及其可能的原因和解决方法。
|
6月前
|
分布式计算 DataWorks 大数据
MaxCompute操作报错合集之大数据计算的MaxCompute Spark引擎无法读取到表,是什么原因
MaxCompute是阿里云提供的大规模离线数据处理服务,用于大数据分析、挖掘和报表生成等场景。在使用MaxCompute进行数据处理时,可能会遇到各种操作报错。以下是一些常见的MaxCompute操作报错及其可能的原因与解决措施的合集。
MaxCompute操作报错合集之大数据计算的MaxCompute Spark引擎无法读取到表,是什么原因
|
5月前
|
分布式计算 大数据 数据处理
MaxCompute操作报错合集之spark客户端执行时,报错,该怎么办
MaxCompute是阿里云提供的大规模离线数据处理服务,用于大数据分析、挖掘和报表生成等场景。在使用MaxCompute进行数据处理时,可能会遇到各种操作报错。以下是一些常见的MaxCompute操作报错及其可能的原因与解决措施的合集。
|
5月前
|
分布式计算 DataWorks 网络安全
DataWorks操作报错合集之还未运行,spark节点一直报错,如何解决
DataWorks是阿里云提供的一站式大数据开发与治理平台,支持数据集成、数据开发、数据服务、数据质量管理、数据安全管理等全流程数据处理。在使用DataWorks过程中,可能会遇到各种操作报错。以下是一些常见的报错情况及其可能的原因和解决方法。
|
6月前
|
存储 分布式计算 API
adb spark的lakehouse api访问内表数据,还支持算子下推吗
【2月更文挑战第21天】adb spark的lakehouse api访问内表数据,还支持算子下推吗
140 2