Spark RDD/Core Programming API Primer Series: RDD Examples (map, filter, flatMap, groupByKey, reduceByKey, join, cogroup, etc.) (Part 4)

Introduction:

Note:

  In big data work, the single most important operator is join!!!

Typical transformations and actions

val nums = sc.parallelize(1 to 10) // create an RDD from a collection

map — works on elements of any type: it applies the function passed in to every element of the RDD and returns a new RDD of the results. Example:

 

package com.zhouls.spark.cores

import org.apache.spark.{SparkConf, SparkContext}

/**
 * Created by Administrator on 2016/9/27.
 */
object Transformations {
  def main(args: Array[String]) {
    val conf = new SparkConf().setAppName("Transformations").setMaster("local")
    val sc = new SparkContext(conf)
    val nums = sc.parallelize(1 to 10) // create an RDD from a collection
    val mapped = nums.map(item => 2 + item)
    mapped.collect.foreach(println)
  }
}
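Run locally, this maps each element of 1 to 10 to 2 + item, so it should print the numbers 3 through 12, one per line.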

map source code

/**
 * Return a new RDD by applying a function to all elements of this RDD.
 */
def map[U: ClassTag](f: T => U): RDD[U] = withScope {
  val cleanF = sc.clean(f)
  new MapPartitionsRDD[U, T](this, (context, pid, iter) => iter.map(cleanF))
}





 

filter — keeps only the elements for which the Boolean predicate passed in returns true. Example:

 

package com.zhouls.spark.cores

import org.apache.spark.{SparkConf, SparkContext}

/**
 * Created by Administrator on 2016/9/27.
 */
object Transformations {
  def main(args: Array[String]) {
    val conf = new SparkConf().setAppName("Transformations").setMaster("local")
    val sc = new SparkContext(conf)
    val nums = sc.parallelize(1 to 10) // create an RDD from a collection
    val mapped = nums.map(item => 2 + item)
    val filtered = nums.filter(item => item % 2 == 0)
    filtered.collect.foreach(println)
  }
}
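Here nums is 1 to 10 and the predicate keeps the even numbers, so this should print 2, 4, 6, 8 and 10.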

filter source code

/**
 * Return a new RDD containing only the elements that satisfy a predicate.
 */
def filter(f: T => Boolean): RDD[T] = withScope {
  val cleanF = sc.clean(f)
  new MapPartitionsRDD[T, T](
    this,
    (context, pid, iter) => iter.filter(cleanF),
    preservesPartitioning = true)
}









flatMap — applies a function that returns a collection to every element, then flattens all of the pieces into a single RDD. Example:

 

package com.zhouls.spark.cores

import org.apache.spark.{SparkConf, SparkContext}

/**
 * Created by Administrator on 2016/9/27.
 */
object Transformations {
  def main(args: Array[String]) {
    val conf = new SparkConf().setAppName("Transformations").setMaster("local")
    val sc = new SparkContext(conf)

    val nums = sc.parallelize(1 to 10) // create an RDD from a collection

    val mapped = nums.map(item => 2 + item)
    // mapped.collect.foreach(println)

    val filtered = nums.filter(item => item % 2 == 0)
    // filtered.collect.foreach(println)

    val bigData = Array("Scala Spark", "Java Hadoop", "Java Tachyon")
    val bigDataString = sc.parallelize(bigData)
    val words = bigDataString.flatMap(line => line.split(" "))
    words.collect.foreach(println)
    sc.stop()
  }
}
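flatMap first splits every string into words and then flattens the results, so this should print the six words Scala, Spark, Java, Hadoop, Java, Tachyon, one per line.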




flatMap source code

/**
 * Return a new RDD by first applying a function to all elements of this
 * RDD, and then flattening the results.
 */
def flatMap[U: ClassTag](f: T => TraversableOnce[U]): RDD[U] = withScope {
  val cleanF = sc.clean(f)
  new MapPartitionsRDD[U, T](this, (context, pid, iter) => iter.flatMap(cleanF))
}

 

 

 

 

 

From here on, the examples use the structure a serious Spark developer should get into the habit of writing: one method per operator, sharing a single SparkContext ->

groupByKey — works on key-value RDDs: it groups the values that share the same key, so each key maps to a collection of values. Example:

package com.zhouls.spark.cores

import org.apache.spark.{SparkConf, SparkContext}

/**
 * Created by Administrator on 2016/9/27.
 */
object Transformations {
  def main(args: Array[String]) {
    val sc = sparkContext("Transformations Operations") // create the SparkContext
    // mapTransformation(sc)       // map example
    // filterTransformation(sc)    // filter example
    // flatMapTransformation(sc)   // flatMap example
    groupByKeyTransformation(sc)   // groupByKey example

    sc.stop() // stop the SparkContext and release the Driver-side objects and resources
  }

  def sparkContext(name: String) = {
    val conf = new SparkConf().setAppName(name).setMaster("local")
    val sc = new SparkContext(conf)
    sc
  }

  def mapTransformation(sc: SparkContext) {
    val nums = sc.parallelize(1 to 10) // create an RDD from a collection
    val mapped = nums.map(item => 2 * item) // map works on elements of any type: it applies the function passed in to every element of the RDD
    mapped.collect.foreach(println) // collect the results to the driver and print them with foreach
  }

  def filterTransformation(sc: SparkContext) {
    val nums = sc.parallelize(1 to 20) // create an RDD from a collection
    val filtered = nums.filter(item => item % 2 == 0) // keep only the elements for which the predicate passed to filter returns true, building a new MapPartitionsRDD from them
    filtered.collect.foreach(println) // collect the results to the driver and print them with foreach
  }

  def flatMapTransformation(sc: SparkContext) {
    val bigData = Array("Scala Spark", "Java Hadoop", "Java Tachyon") // an Array of strings
    val bigDataString = sc.parallelize(bigData) // create an RDD (MapPartitionsRDD) whose elements are strings
    val words = bigDataString.flatMap(line => line.split(" ")) // split each string into words (each result is a collection), then flatten the pieces into one big collection: {Scala, Spark, Java, Hadoop, Java, Tachyon}
    words.collect.foreach(println) // collect the results to the driver and print them with foreach
  }

  def groupByKeyTransformation(sc: SparkContext) {
    val data = Array(Tuple2(100, "Spark"), Tuple2(100, "Tachyon"), Tuple2(70, "Hadoop"), Tuple2(80, "Kafka"), Tuple2(80, "HBase"))
    val dataRDD = sc.parallelize(data)
    val grouped = dataRDD.groupByKey()
    grouped.collect.foreach(println) // collect the results to the driver and print them with foreach
  }
}
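Collected back to the driver, the grouped pairs should look roughly like the following (the grouped Iterable prints as a CompactBuffer in Spark 1.x, and the key order may vary between runs):

(100,CompactBuffer(Spark, Tachyon))
(80,CompactBuffer(Kafka, HBase))
(70,CompactBuffer(Hadoop))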

groupByKey source code
/**
 * Group the values for each key in the RDD into a single sequence. Allows controlling the
 * partitioning of the resulting key-value pair RDD by passing a Partitioner.
 * The ordering of elements within each group is not guaranteed, and may even differ
 * each time the resulting RDD is evaluated.
 *
 * Note: This operation may be very expensive. If you are grouping in order to perform an
 * aggregation (such as a sum or average) over each key, using [[PairRDDFunctions.aggregateByKey]]
 * or [[PairRDDFunctions.reduceByKey]] will provide much better performance.
 *
 * Note: As currently implemented, groupByKey must be able to hold all the key-value pairs for any
 * key in memory. If a key has too many values, it can result in an [[OutOfMemoryError]].
 */
def groupByKey(partitioner: Partitioner): RDD[(K, Iterable[V])] = self.withScope {
  // groupByKey shouldn't use map side combine because map side combine does not
  // reduce the amount of data shuffled and requires all map side data be inserted
  // into a hash table, leading to more objects in the old gen.
  val createCombiner = (v: V) => CompactBuffer(v)
  val mergeValue = (buf: CompactBuffer[V], v: V) => buf += v
  val mergeCombiners = (c1: CompactBuffer[V], c2: CompactBuffer[V]) => c1 ++= c2
  val bufs = combineByKey[CompactBuffer[V]](
    createCombiner, mergeValue, mergeCombiners, partitioner, mapSideCombine = false)
  bufs.asInstanceOf[RDD[(K, Iterable[V])]]
}

/**
 * Group the values for each key in the RDD into a single sequence. Hash-partitions the
 * resulting RDD with into `numPartitions` partitions. The ordering of elements within
 * each group is not guaranteed, and may even differ each time the resulting RDD is evaluated.
 *
 * Note: This operation may be very expensive. If you are grouping in order to perform an
 * aggregation (such as a sum or average) over each key, using [[PairRDDFunctions.aggregateByKey]]
 * or [[PairRDDFunctions.reduceByKey]] will provide much better performance.
 *
 * Note: As currently implemented, groupByKey must be able to hold all the key-value pairs for any
 * key in memory. If a key has too many values, it can result in an [[OutOfMemoryError]].
 */
def groupByKey(numPartitions: Int): RDD[(K, Iterable[V])] = self.withScope {
  groupByKey(new HashPartitioner(numPartitions))
}
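As the note in the docstring above points out, when the end goal is a per-key aggregate, reduceByKey or aggregateByKey is usually the better choice because it combines values on the map side before the shuffle. A minimal sketch of the difference, using a small hypothetical (word, 1) pair RDD rather than the data above:

// Hypothetical (word, 1) pairs, just for illustration.
val pairs = sc.parallelize(Seq(("spark", 1), ("hadoop", 1), ("spark", 1), ("kafka", 1)))

// groupByKey ships every single (word, 1) pair across the network,
// and the sum is only computed on the reducer side.
val countsViaGroup = pairs.groupByKey().mapValues(_.sum)

// reduceByKey sums partial counts on the map side first,
// so far less data is shuffled for the same result.
val countsViaReduce = pairs.reduceByKey(_ + _)

countsViaReduce.collect.foreach(println) // e.g. (spark,2), (hadoop,1), (kafka,1)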






 

reduceByKey — merges the values for each key with the given function, combining locally on each mapper before the shuffle (like a MapReduce combiner). Example (a word count over Spark's README.md):

package com.zhouls.spark.cores

import org.apache.spark.{SparkConf, SparkContext}

/**
 * Created by Administrator on 2016/9/27.
 */
object Transformations {
  def main(args: Array[String]) {
    val sc = sparkContext("Transformations Operations") // create the SparkContext
    // mapTransformation(sc)        // map example
    // filterTransformation(sc)     // filter example
    // flatMapTransformation(sc)    // flatMap example
    // groupByKeyTransformation(sc) // groupByKey example
    reduceByKeyTransformation(sc)   // reduceByKey example
    sc.stop() // stop the SparkContext and release the Driver-side objects and resources
  }

  def sparkContext(name: String) = {
    val conf = new SparkConf().setAppName(name).setMaster("local")
    val sc = new SparkContext(conf)
    sc
  }

  def mapTransformation(sc: SparkContext) {
    val nums = sc.parallelize(1 to 10) // create an RDD from a collection
    val mapped = nums.map(item => 2 * item) // map works on elements of any type: it applies the function passed in to every element of the RDD
    mapped.collect.foreach(println) // collect the results to the driver and print them with foreach
  }

  def filterTransformation(sc: SparkContext) {
    val nums = sc.parallelize(1 to 20) // create an RDD from a collection
    val filtered = nums.filter(item => item % 2 == 0) // keep only the elements for which the predicate passed to filter returns true, building a new MapPartitionsRDD from them
    filtered.collect.foreach(println) // collect the results to the driver and print them with foreach
  }

  def flatMapTransformation(sc: SparkContext) {
    val bigData = Array("Scala Spark", "Java Hadoop", "Java Tachyon") // an Array of strings
    val bigDataString = sc.parallelize(bigData) // create an RDD (MapPartitionsRDD) whose elements are strings
    val words = bigDataString.flatMap(line => line.split(" ")) // split each string into words (each result is a collection), then flatten the pieces into one big collection: {Scala, Spark, Java, Hadoop, Java, Tachyon}
    words.collect.foreach(println) // collect the results to the driver and print them with foreach
  }

  def groupByKeyTransformation(sc: SparkContext) {
    val data = Array(Tuple2(100, "Spark"), Tuple2(100, "Tachyon"), Tuple2(70, "Hadoop"), Tuple2(80, "Kafka"), Tuple2(80, "HBase")) // prepare the data
    val dataRDD = sc.parallelize(data) // create an RDD from the collection
    val grouped = dataRDD.groupByKey() // group the values that share the same key; after grouping, each key maps to a collection of values
    grouped.collect.foreach(println) // collect the results to the driver and print them with foreach
  }

  def reduceByKeyTransformation(sc: SparkContext) {
    val lines = sc.textFile("D://SoftWare//spark-1.6.2-bin-hadoop2.6//README.md")
    val words = lines.flatMap { line => line.split(" ") }
    val pairs = words.map { word => (word, 1) }
    val wordCounts = pairs.reduceByKey(_ + _) // accumulate the values for each key (reducing locally on the map side as well as at the reducer level)
    wordCounts.collect.foreach(wordNumberPair => println(wordNumberPair._1 + ":" + wordNumberPair._2)) // collect the results to the driver and print them with foreach
  }
}


reduceByKey source code

/**
 * Merge the values for each key using an associative reduce function. This will also perform
 * the merging locally on each mapper before sending results to a reducer, similarly to a
 * "combiner" in MapReduce. Output will be hash-partitioned with the existing partitioner/
 * parallelism level.
 */
def reduceByKey(func: (V, V) => V): RDD[(K, V)] = self.withScope {
  reduceByKey(defaultPartitioner(self), func)
}






join — inner-joins two key-value RDDs on their keys, producing (key, (value1, value2)) pairs. Example:

package com.zhouls.spark.cores

import org.apache.spark.{SparkConf, SparkContext}

/**
 * Created by Administrator on 2016/9/27.
 */
object Transformations {
  def main(args: Array[String]) {
    val sc = sparkContext("Transformations Operations") // create the SparkContext
    // mapTransformation(sc)         // map example
    // filterTransformation(sc)      // filter example
    // flatMapTransformation(sc)     // flatMap example
    // groupByKeyTransformation(sc)  // groupByKey example
    // reduceByKeyTransformation(sc) // reduceByKey example
    joinTransformation(sc)           // join example
    sc.stop() // stop the SparkContext and release the Driver-side objects and resources
  }

  def sparkContext(name: String) = {
    val conf = new SparkConf().setAppName(name).setMaster("local")
    val sc = new SparkContext(conf)
    sc
  }

  def mapTransformation(sc: SparkContext) {
    val nums = sc.parallelize(1 to 10) // create an RDD from a collection
    val mapped = nums.map(item => 2 * item) // map works on elements of any type: it applies the function passed in to every element of the RDD
    mapped.collect.foreach(println) // collect the results to the driver and print them with foreach
  }

  def filterTransformation(sc: SparkContext) {
    val nums = sc.parallelize(1 to 20) // create an RDD from a collection
    val filtered = nums.filter(item => item % 2 == 0) // keep only the elements for which the predicate passed to filter returns true, building a new MapPartitionsRDD from them
    filtered.collect.foreach(println) // collect the results to the driver and print them with foreach
  }

  def flatMapTransformation(sc: SparkContext) {
    val bigData = Array("Scala Spark", "Java Hadoop", "Java Tachyon") // an Array of strings
    val bigDataString = sc.parallelize(bigData) // create an RDD (MapPartitionsRDD) whose elements are strings
    val words = bigDataString.flatMap(line => line.split(" ")) // split each string into words (each result is a collection), then flatten the pieces into one big collection: {Scala, Spark, Java, Hadoop, Java, Tachyon}
    words.collect.foreach(println) // collect the results to the driver and print them with foreach
  }

  def groupByKeyTransformation(sc: SparkContext) {
    val data = Array(Tuple2(100, "Spark"), Tuple2(100, "Tachyon"), Tuple2(70, "Hadoop"), Tuple2(80, "Kafka"), Tuple2(80, "HBase")) // prepare the data
    val dataRDD = sc.parallelize(data) // create an RDD from the collection
    val grouped = dataRDD.groupByKey() // group the values that share the same key; after grouping, each key maps to a collection of values
    grouped.collect.foreach(println) // collect the results to the driver and print them with foreach
  }

  def reduceByKeyTransformation(sc: SparkContext) {
    val lines = sc.textFile("D://SoftWare//spark-1.6.2-bin-hadoop2.6//README.md")
    val words = lines.flatMap { line => line.split(" ") }
    val pairs = words.map { word => (word, 1) }
    val wordCounts = pairs.reduceByKey(_ + _) // accumulate the values for each key (reducing locally on the map side as well as at the reducer level)
    wordCounts.collect.foreach(wordNumberPair => println(wordNumberPair._1 + ":" + wordNumberPair._2)) // collect the results to the driver and print them with foreach
  }

  def joinTransformation(sc: SparkContext) {
    val studentNames = Array(Tuple2(1, "Spark"), Tuple2(2, "Tachyon"), Tuple2(3, "Hadoop"))
    val studentScores = Array(Tuple2(1, 100), Tuple2(2, 95), Tuple2(3, 65))
    val names = sc.parallelize(studentNames)
    val scores = sc.parallelize(studentScores)
    val studentNamesAndScores = names.join(scores)
    studentNamesAndScores.collect.foreach(println) // collect the results to the driver and print them with foreach
  }
}
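Joined on the student id, the output should contain (1,(Spark,100)), (2,(Tachyon,95)) and (3,(Hadoop,65)); the key order may differ between runs because of the shuffle.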

join source code (note: the snippet quoted below is DataFrame.join from Spark SQL; the RDD join used in the example above lives in PairRDDFunctions and is built on top of cogroup — see the sketch after this listing)

/**
* Cartesian join with another [[DataFrame]].
*
* Note that cartesian joins are very expensive without an extra filter that can be pushed down.
*
* @param right Right side of the join operation.
* @group dfops
* @since 1.3.0
*/
def join(right: DataFrame): DataFrame = {
Join(logicalPlan, right.logicalPlan, joinType = Inner, None)
}

/**
* Inner equi-join with another [[DataFrame]] using the given column.
*
* Different from other join functions, the join column will only appear once in the output,
* i.e. similar to SQL's `JOIN USING` syntax.
*
* {{{
* // Joining df1 and df2 using the column "user_id"
* df1.join(df2, "user_id")
* }}}
*
* Note that if you perform a self-join using this function without aliasing the input
* [[DataFrame]]s, you will NOT be able to reference any columns after the join, since
* there is no way to disambiguate which side of the join you would like to reference.
*
* @param right Right side of the join operation.
* @param usingColumn Name of the column to join on. This column must exist on both sides.
* @group dfops
* @since 1.4.0
*/
def join(right: DataFrame, usingColumn: String): DataFrame = {
join(right, Seq(usingColumn))
}

/**
* Inner equi-join with another [[DataFrame]] using the given columns.
*
* Different from other join functions, the join columns will only appear once in the output,
* i.e. similar to SQL's `JOIN USING` syntax.
*
* {{{
* // Joining df1 and df2 using the columns "user_id" and "user_name"
* df1.join(df2, Seq("user_id", "user_name"))
* }}}
*
* Note that if you perform a self-join using this function without aliasing the input
* [[DataFrame]]s, you will NOT be able to reference any columns after the join, since
* there is no way to disambiguate which side of the join you would like to reference.
*
* @param right Right side of the join operation.
* @param usingColumns Names of the columns to join on. This columns must exist on both sides.
* @group dfops
* @since 1.4.0
*/
def join(right: DataFrame, usingColumns: Seq[String]): DataFrame = {
// Analyze the self join. The assumption is that the analyzer will disambiguate left vs right
// by creating a new instance for one of the branch.
val joined = sqlContext.executePlan(
Join(logicalPlan, right.logicalPlan, joinType = Inner, None)).analyzed.asInstanceOf[Join]

// Project only one of the join columns.
val joinedCols = usingColumns.map(col => joined.right.resolve(col))
val condition = usingColumns.map { col =>
catalyst.expressions.EqualTo(joined.left.resolve(col), joined.right.resolve(col))
}.reduceLeftOption[catalyst.expressions.BinaryExpression] { (cond, eqTo) =>
catalyst.expressions.And(cond, eqTo)
}

Project(
joined.output.filterNot(joinedCols.contains(_)),
Join(
joined.left,
joined.right,
joinType = Inner,
condition)
)
}

/**
* Inner join with another [[DataFrame]], using the given join expression.
*
* {{{
* // The following two are equivalent:
* df1.join(df2, $"df1Key" === $"df2Key")
* df1.join(df2).where($"df1Key" === $"df2Key")
* }}}
* @group dfops
* @since 1.3.0
*/
def join(right: DataFrame, joinExprs: Column): DataFrame = join(right, joinExprs, "inner")

/**
* Join with another [[DataFrame]], using the given join expression. The following performs
* a full outer join between `df1` and `df2`.
*
* {{{
* // Scala:
* import org.apache.spark.sql.functions._
* df1.join(df2, $"df1Key" === $"df2Key", "outer")
*
* // Java:
* import static org.apache.spark.sql.functions.*;
* df1.join(df2, col("df1Key").equalTo(col("df2Key")), "outer");
* }}}
*
* @param right Right side of the join.
* @param joinExprs Join expression.
* @param joinType One of: `inner`, `outer`, `left_outer`, `right_outer`, `leftsemi`.
* @group dfops
* @since 1.3.0
*/
def join(right: DataFrame, joinExprs: Column, joinType: String): DataFrame = {
// Note that in this function, we introduce a hack in the case of self-join to automatically
// resolve ambiguous join conditions into ones that might make sense [SPARK-6231].
// Consider this case: df.join(df, df("key") === df("key"))
// Since df("key") === df("key") is a trivially true condition, this actually becomes a
// cartesian join. However, most likely users expect to perform a self join using "key".
// With that assumption, this hack turns the trivially true condition into equality on join
// keys that are resolved to both sides.

// Trigger analysis so in the case of self-join, the analyzer will clone the plan.
// After the cloning, left and right side will have distinct expression ids.
val plan = Join(logicalPlan, right.logicalPlan, JoinType(joinType), Some(joinExprs.expr))
.queryExecution.analyzed.asInstanceOf[Join]

// If auto self join alias is disabled, return the plan.
if (!sqlContext.conf.dataFrameSelfJoinAutoResolveAmbiguity) {
return plan
}

// If left/right have no output set intersection, return the plan.
val lanalyzed = this.logicalPlan.queryExecution.analyzed
val ranalyzed = right.logicalPlan.queryExecution.analyzed
if (lanalyzed.outputSet.intersect(ranalyzed.outputSet).isEmpty) {
return plan
}

// Otherwise, find the trivially true predicates and automatically resolves them to both sides.
// By the time we get here, since we have already run analysis, all attributes should've been
// resolved and become AttributeReference.
val cond = plan.condition.map { _.transform {
case catalyst.expressions.EqualTo(a: AttributeReference, b: AttributeReference)
if a.sameRef(b) =>
catalyst.expressions.EqualTo(plan.left.resolve(a.name), plan.right.resolve(b.name))
}}
plan.copy(condition = cond)
}
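For reference, the RDD-level join used in the example works differently from the DataFrame join quoted above: it cogroups the two pair RDDs and then emits every combination of values that share a key. The following is a paraphrased sketch from memory of PairRDDFunctions in Spark 1.x, not a verbatim quote:

def join[W](other: RDD[(K, W)], partitioner: Partitioner): RDD[(K, (V, W))] = self.withScope {
  // cogroup the two RDDs, then emit the cross product of the value lists for each key
  this.cogroup(other, partitioner).flatMapValues( pair =>
    for (v <- pair._1.iterator; w <- pair._2.iterator) yield (v, w)
  )
}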





 
cogroup (Scala version) — for each key present in either RDD, returns the key together with the collection of its values from each side. Example:

package com.zhouls.spark.cores

import org.apache.spark.{SparkConf, SparkContext}

/**
 * Created by Administrator on 2016/9/27.
 */
object Transformations {
  def main(args: Array[String]) {
    val sc = sparkContext("Transformations Operations") // create the SparkContext
    // mapTransformation(sc)         // map example
    // filterTransformation(sc)      // filter example
    // flatMapTransformation(sc)     // flatMap example
    // groupByKeyTransformation(sc)  // groupByKey example
    // reduceByKeyTransformation(sc) // reduceByKey example
    // joinTransformation(sc)        // join example
    cogroupTransformation(sc)        // cogroup example
    sc.stop() // stop the SparkContext and release the Driver-side objects and resources
  }

  def sparkContext(name: String) = {
    val conf = new SparkConf().setAppName(name).setMaster("local")
    val sc = new SparkContext(conf)
    sc
  }

  def mapTransformation(sc: SparkContext) {
    val nums = sc.parallelize(1 to 10) // create an RDD from a collection
    val mapped = nums.map(item => 2 * item) // map works on elements of any type: it applies the function passed in to every element of the RDD
    mapped.collect.foreach(println) // collect the results to the driver and print them with foreach
  }

  def filterTransformation(sc: SparkContext) {
    val nums = sc.parallelize(1 to 20) // create an RDD from a collection
    val filtered = nums.filter(item => item % 2 == 0) // keep only the elements for which the predicate passed to filter returns true, building a new MapPartitionsRDD from them
    filtered.collect.foreach(println) // collect the results to the driver and print them with foreach
  }

  def flatMapTransformation(sc: SparkContext) {
    val bigData = Array("Scala Spark", "Java Hadoop", "Java Tachyon") // an Array of strings
    val bigDataString = sc.parallelize(bigData) // create an RDD (MapPartitionsRDD) whose elements are strings
    val words = bigDataString.flatMap(line => line.split(" ")) // split each string into words (each result is a collection), then flatten the pieces into one big collection: {Scala, Spark, Java, Hadoop, Java, Tachyon}
    words.collect.foreach(println) // collect the results to the driver and print them with foreach
  }

  def groupByKeyTransformation(sc: SparkContext) {
    val data = Array(Tuple2(100, "Spark"), Tuple2(100, "Tachyon"), Tuple2(70, "Hadoop"), Tuple2(80, "Kafka"), Tuple2(80, "HBase")) // prepare the data
    val dataRDD = sc.parallelize(data) // create an RDD from the collection
    val grouped = dataRDD.groupByKey() // group the values that share the same key; after grouping, each key maps to a collection of values
    grouped.collect.foreach(println) // collect the results to the driver and print them with foreach
  }

  def reduceByKeyTransformation(sc: SparkContext) {
    val lines = sc.textFile("D://SoftWare//spark-1.6.2-bin-hadoop2.6//README.md")
    val words = lines.flatMap { line => line.split(" ") }
    val pairs = words.map { word => (word, 1) }
    val wordCounts = pairs.reduceByKey(_ + _) // accumulate the values for each key (reducing locally on the map side as well as at the reducer level)
    wordCounts.collect.foreach(wordNumberPair => println(wordNumberPair._1 + ":" + wordNumberPair._2)) // collect the results to the driver and print them with foreach
  }

  def joinTransformation(sc: SparkContext) {
    val studentNames = Array(Tuple2(1, "Spark"), Tuple2(2, "Tachyon"), Tuple2(3, "Hadoop"))
    val studentScores = Array(Tuple2(1, 100), Tuple2(2, 95), Tuple2(3, 65))
    val names = sc.parallelize(studentNames)
    val scores = sc.parallelize(studentScores)
    val studentNamesAndScores = names.join(scores)
    studentNamesAndScores.collect.foreach(println) // collect the results to the driver and print them with foreach
  }

  def cogroupTransformation(sc: SparkContext) {
    val namesLists = Array(Tuple2(1, "xiaoming"), Tuple2(2, "xiaozhou"), Tuple2(3, "xiaoliu"))
    val scoresLists = Array(Tuple2(1, 100), Tuple2(2, 95), Tuple2(3, 85), Tuple2(1, 75), Tuple2(2, 65), Tuple2(3, 55))
    val names = sc.parallelize(namesLists)
    val scores = sc.parallelize(scoresLists)
    val namesListsAndScores = names.cogroup(scores)
    namesListsAndScores.collect.foreach(println) // collect the results to the driver and print them with foreach
  }
}
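Unlike join, cogroup does not pair the values up one by one: each key gets the complete collection of values from each RDD. The output should therefore look roughly like (1,(CompactBuffer(xiaoming),CompactBuffer(100, 75))), (2,(CompactBuffer(xiaozhou),CompactBuffer(95, 65))) and (3,(CompactBuffer(xiaoliu),CompactBuffer(85, 55))), with the key order possibly varying between runs.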

 

 
cogroup source code

/**
* For each key k in `this` or `other1` or `other2` or `other3`,
* return a resulting RDD that contains a tuple with the list of values
* for that key in `this`, `other1`, `other2` and `other3`.
*/
def cogroup[W1, W2, W3](other1: RDD[(K, W1)],
other2: RDD[(K, W2)],
other3: RDD[(K, W3)],
partitioner: Partitioner)
: RDD[(K, (Iterable[V], Iterable[W1], Iterable[W2], Iterable[W3]))] = self.withScope {
if (partitioner.isInstanceOf[HashPartitioner] && keyClass.isArray) {
throw new SparkException("Default partitioner cannot partition array keys.")
}
val cg = new CoGroupedRDD[K](Seq(self, other1, other2, other3), partitioner)
cg.mapValues { case Array(vs, w1s, w2s, w3s) =>
(vs.asInstanceOf[Iterable[V]],
w1s.asInstanceOf[Iterable[W1]],
w2s.asInstanceOf[Iterable[W2]],
w3s.asInstanceOf[Iterable[W3]])
}
}

/**
* For each key k in `this` or `other`, return a resulting RDD that contains a tuple with the
* list of values for that key in `this` as well as `other`.
*/
def cogroup[W](other: RDD[(K, W)], partitioner: Partitioner)
: RDD[(K, (Iterable[V], Iterable[W]))] = self.withScope {
if (partitioner.isInstanceOf[HashPartitioner] && keyClass.isArray) {
throw new SparkException("Default partitioner cannot partition array keys.")
}
val cg = new CoGroupedRDD[K](Seq(self, other), partitioner)
cg.mapValues { case Array(vs, w1s) =>
(vs.asInstanceOf[Iterable[V]], w1s.asInstanceOf[Iterable[W]])
}
}

/**
* For each key k in `this` or `other1` or `other2`, return a resulting RDD that contains a
* tuple with the list of values for that key in `this`, `other1` and `other2`.
*/
def cogroup[W1, W2](other1: RDD[(K, W1)], other2: RDD[(K, W2)], partitioner: Partitioner)
: RDD[(K, (Iterable[V], Iterable[W1], Iterable[W2]))] = self.withScope {
if (partitioner.isInstanceOf[HashPartitioner] && keyClass.isArray) {
throw new SparkException("Default partitioner cannot partition array keys.")
}
val cg = new CoGroupedRDD[K](Seq(self, other1, other2), partitioner)
cg.mapValues { case Array(vs, w1s, w2s) =>
(vs.asInstanceOf[Iterable[V]],
w1s.asInstanceOf[Iterable[W1]],
w2s.asInstanceOf[Iterable[W2]])
}
}

/**
* For each key k in `this` or `other1` or `other2` or `other3`,
* return a resulting RDD that contains a tuple with the list of values
* for that key in `this`, `other1`, `other2` and `other3`.
*/
def cogroup[W1, W2, W3](other1: RDD[(K, W1)], other2: RDD[(K, W2)], other3: RDD[(K, W3)])
: RDD[(K, (Iterable[V], Iterable[W1], Iterable[W2], Iterable[W3]))] = self.withScope {
cogroup(other1, other2, other3, defaultPartitioner(self, other1, other2, other3))
}

/**
* For each key k in `this` or `other`, return a resulting RDD that contains a tuple with the
* list of values for that key in `this` as well as `other`.
*/
def cogroup[W](other: RDD[(K, W)]): RDD[(K, (Iterable[V], Iterable[W]))] = self.withScope {
cogroup(other, defaultPartitioner(self, other))
}

/**
* For each key k in `this` or `other1` or `other2`, return a resulting RDD that contains a
* tuple with the list of values for that key in `this`, `other1` and `other2`.
*/
def cogroup[W1, W2](other1: RDD[(K, W1)], other2: RDD[(K, W2)])
: RDD[(K, (Iterable[V], Iterable[W1], Iterable[W2]))] = self.withScope {
cogroup(other1, other2, defaultPartitioner(self, other1, other2))
}

/**
* For each key k in `this` or `other`, return a resulting RDD that contains a tuple with the
* list of values for that key in `this` as well as `other`.
*/
def cogroup[W](
other: RDD[(K, W)],
numPartitions: Int): RDD[(K, (Iterable[V], Iterable[W]))] = self.withScope {
cogroup(other, new HashPartitioner(numPartitions))
}

/**
* For each key k in `this` or `other1` or `other2`, return a resulting RDD that contains a
* tuple with the list of values for that key in `this`, `other1` and `other2`.
*/
def cogroup[W1, W2](other1: RDD[(K, W1)], other2: RDD[(K, W2)], numPartitions: Int)
: RDD[(K, (Iterable[V], Iterable[W1], Iterable[W2]))] = self.withScope {
cogroup(other1, other2, new HashPartitioner(numPartitions))
}

/**
* For each key k in `this` or `other1` or `other2` or `other3`,
* return a resulting RDD that contains a tuple with the list of values
* for that key in `this`, `other1`, `other2` and `other3`.
*/
def cogroup[W1, W2, W3](other1: RDD[(K, W1)],
other2: RDD[(K, W2)],
other3: RDD[(K, W3)],
numPartitions: Int)
: RDD[(K, (Iterable[V], Iterable[W1], Iterable[W2], Iterable[W3]))] = self.withScope {
cogroup(other1, other2, other3, new HashPartitioner(numPartitions))
}




This article is reposted from the cnblogs blog 大数据躺过的坑; original link: http://www.cnblogs.com/zlslch/p/5913846.html. Please contact the original author before reprinting.
