Spark RDD/Core 编程 API入门系列 之rdd案例(map、filter、flatMap、groupByKey、reduceByKey、join、cogroupy等)(四)

简介:

声明:

  大数据中,最重要的算子操作是:join  !!!

 

 

 

典型的transformation和action

 

 

 

 

 

val nums = sc.parallelize(1 to 10) //根据集合创建RDD

map适用于

 

package com.zhouls.spark.cores

import org.apache.spark.{SparkConf, SparkContext}

/**
* Created by Administrator on 2016/9/27.
*/
object Transformations {
def main(args: Array[String]) {
val conf = new SparkConf().setAppName("Transformations").setMaster("local")
val sc = new SparkContext(conf)
val nums = sc.parallelize(1 to 10) //根据集合创建RDD
val mapped = nums.map(item => 2 + item)
mapped.collect.foreach(println)
}
}

map源码

/**
* Return a new RDD by applying a function to all elements of this RDD.
*/
def map[U: ClassTag](f: T => U): RDD[U] = withScope {
val cleanF = sc.clean(f)
new MapPartitionsRDD[U, T](this, (context, pid, iter) => iter.map(cleanF))
}





 

filter适用于

 

package com.zhouls.spark.cores

import org.apache.spark.{SparkConf, SparkContext}

/**
* Created by Administrator on 2016/9/27.
*/
object Transformations {
def main(args: Array[String]) {
val conf = new SparkConf().setAppName("Transformations").setMaster("local")
val sc = new SparkContext(conf)
val nums = sc.parallelize(1 to 10) //根据集合创建RDD
val mapped = nums.map(item => 2 + item)
val filtered = nums.filter(item => item%2 == 0)
filtered.collect.foreach(println)
}
}

filter源码

/**
* Return a new RDD containing only the elements that satisfy a predicate.
*/
def filter(f: T => Boolean): RDD[T] = withScope {
val cleanF = sc.clean(f)
new MapPartitionsRDD[T, T](
this,
(context, pid, iter) => iter.filter(cleanF),
preservesPartitioning = true)
}









flatMap适用于

 

package com.zhouls.spark.cores

import org.apache.spark.{SparkConf, SparkContext}

/**
* Created by Administrator on 2016/9/27.
*/
object Transformations {
def main(args: Array[String]) {
val conf = new SparkConf().setAppName("Transformations").setMaster("local")
val sc = new SparkContext(conf)

val nums = sc.parallelize(1 to 10) //根据集合创建RDD

val mapped = nums.map(item => 2 + item)
// mapped.collect.foreach(println)

val filtered = nums.filter(item => item%2 == 0)
// filtered.collect.foreach(println)

val bigData = Array("Scala Spark","Java Hadoop","Java Tachyon")
val bigDataString = sc.parallelize(bigData)
val words = bigDataString.flatMap(line => line.split(" "))
words.collect.foreach(println)
sc.stop()
}
}




flatMap源码

/**
* Return a new RDD by first applying a function to all elements of this
* RDD, and then flattening the results.
*/
def flatMap[U: ClassTag](f: T => TraversableOnce[U]): RDD[U] = withScope {
val cleanF = sc.clean(f)
new MapPartitionsRDD[U, T](this, (context, pid, iter) => iter.flatMap(cleanF))
}

 

 

 

 

 

成为大牛,必写的写法 ->

groupByKey适用于

package com.zhouls.spark.cores

import org.apache.spark.{SparkConf, SparkContext}

/**
* Created by Administrator on 2016/9/27.
*/
object Transformations {
def main(args: Array[String]) {
val sc = sparkContext("Transformations Operations") //创建SparkContext
// mapTransformation(sc)//map案例
// filterTransformation(sc)//filter案例
// flatMapTransformation(sc)//flatMap案例
groupByKeyTransformation(sc)

sc.stop() //停止sparkContext,释放相关的Driver对象,释放资源
}
def sparkContext(name:String)={
val conf = new SparkConf().setAppName("Transformations").setMaster("local")
val sc = new SparkContext(conf)
sc
}

def mapTransformation(sc:SparkContext){
val nums = sc.parallelize(1 to 10) //根据集合创建RDD
val mapped = nums.map(item => 2 * item) //map适用于任何类型的元素且对其作用的集合中的每一个元素循环遍历并调用其作为参数的函数对每一个遍历的元素进行具体化处理
mapped.collect.foreach(println)//收集计算结果并通过foreach循环打印
}
def filterTransformation(sc:SparkContext){
val nums = sc.parallelize(1 to 20) //根据集合创建RDD
val filtered = nums.filter(item => item%2 == 0)//根据filter中作为参数的函数Boolean来判断符合条件的元素,并基于这些元素构成新的MapPartitionsRDD。
filtered.collect.foreach(println)//收集计算结果并通过foreach循环打印
}
def flatMapTransformation(sc:SparkContext){
val bigData = Array("Scala Spark","Java Hadoop","Java Tachyon")//实例化字符串类型的Array
val bigDataString = sc.parallelize(bigData)//创建以字符串为元素类型的MapPartitionsRDD
val words = bigDataString.flatMap(line => line.split(" "))//首先是通过传入的作为参数的函数来作用于RDD的每个字符串进行单词切分(是以集合的方式存在的),然后把切分后的结果合并成一个大的集合,是{Scala Spark Java Hadoop Java Tachyon}
words.collect.foreach(println)//收集计算结果并通过foreach循环打印
}
def groupByKeyTransformation(sc:SparkContext){
val data = Array(Tuple2(100,"Spark"),Tuple2(100,"Tachyon"),Tuple2(70,"Hadoop"),Tuple2(80,"Kafka"),Tuple2(80,"HBase"))
val dataRDD = sc.parallelize(data)
val grouped = dataRDD.groupByKey()
grouped.collect.foreach(println)//收集计算结果并通过foreach循环打印
}
}

groupByKey源码
**
* Group the values for each key in the RDD into a single sequence. Allows controlling the
* partitioning of the resulting key-value pair RDD by passing a Partitioner.
* The ordering of elements within each group is not guaranteed, and may even differ
* each time the resulting RDD is evaluated.
*
* Note: This operation may be very expensive. If you are grouping in order to perform an
* aggregation (such as a sum or average) over each key, using [[PairRDDFunctions.aggregateByKey]]
* or [[PairRDDFunctions.reduceByKey]] will provide much better performance.
*
* Note: As currently implemented, groupByKey must be able to hold all the key-value pairs for any
* key in memory. If a key has too many values, it can result in an [[OutOfMemoryError]].
*/
def groupByKey(partitioner: Partitioner): RDD[(K, Iterable[V])] = self.withScope {
// groupByKey shouldn't use map side combine because map side combine does not
// reduce the amount of data shuffled and requires all map side data be inserted
// into a hash table, leading to more objects in the old gen.
val createCombiner = (v: V) => CompactBuffer(v)
val mergeValue = (buf: CompactBuffer[V], v: V) => buf += v
val mergeCombiners = (c1: CompactBuffer[V], c2: CompactBuffer[V]) => c1 ++= c2
val bufs = combineByKey[CompactBuffer[V]](
createCombiner, mergeValue, mergeCombiners, partitioner, mapSideCombine = false)
bufs.asInstanceOf[RDD[(K, Iterable[V])]]
}

/**
* Group the values for each key in the RDD into a single sequence. Hash-partitions the
* resulting RDD with into `numPartitions` partitions. The ordering of elements within
* each group is not guaranteed, and may even differ each time the resulting RDD is evaluated.
*
* Note: This operation may be very expensive. If you are grouping in order to perform an
* aggregation (such as a sum or average) over each key, using [[PairRDDFunctions.aggregateByKey]]
* or [[PairRDDFunctions.reduceByKey]] will provide much better performance.
*
* Note: As currently implemented, groupByKey must be able to hold all the key-value pairs for any
* key in memory. If a key has too many values, it can result in an [[OutOfMemoryError]].
*/
def groupByKey(numPartitions: Int): RDD[(K, Iterable[V])] = self.withScope {
groupByKey(new HashPartitioner(numPartitions))
}






 

reduceByKey适用于

package com.zhouls.spark.cores

import org.apache.spark.{SparkConf, SparkContext}

/**
* Created by Administrator on 2016/9/27.
*/
object Transformations {
def main(args: Array[String]) {
val sc = sparkContext("Transformations Operations") //创建SparkContext
// mapTransformation(sc)//map案例
// filterTransformation(sc)//filter案例
// flatMapTransformation(sc)//flatMap案例
// groupByKeyTransformation(sc)//groupByKey案例
reduceByKeyTransformation(sc)//reduceByKey案例
sc.stop() //停止sparkContext,释放相关的Driver对象,释放资源
}
def sparkContext(name:String)={
val conf = new SparkConf().setAppName("Transformations").setMaster("local")
val sc = new SparkContext(conf)
sc
}

def mapTransformation(sc:SparkContext){
val nums = sc.parallelize(1 to 10) //根据集合创建RDD
val mapped = nums.map(item => 2 * item) //map适用于任何类型的元素且对其作用的集合中的每一个元素循环遍历并调用其作为参数的函数对每一个遍历的元素进行具体化处理
mapped.collect.foreach(println)//收集计算结果并通过foreach循环打印
}
def filterTransformation(sc:SparkContext){
val nums = sc.parallelize(1 to 20) //根据集合创建RDD
val filtered = nums.filter(item => item%2 == 0)//根据filter中作为参数的函数Boolean来判断符合条件的元素,并基于这些元素构成新的MapPartitionsRDD。
filtered.collect.foreach(println)//收集计算结果并通过foreach循环打印
}
def flatMapTransformation(sc:SparkContext){
val bigData = Array("Scala Spark","Java Hadoop","Java Tachyon")//实例化字符串类型的Array
val bigDataString = sc.parallelize(bigData)//创建以字符串为元素类型的MapPartitionsRDD
val words = bigDataString.flatMap(line => line.split(" "))//首先是通过传入的作为参数的函数来作用于RDD的每个字符串进行单词切分(是以集合的方式存在的),然后把切分后的结果合并成一个大的集合,是{Scala Spark Java Hadoop Java Tachyon}
words.collect.foreach(println)//收集计算结果并通过foreach循环打印
}
def groupByKeyTransformation(sc:SparkContext){
val data = Array(Tuple2(100,"Spark"),Tuple2(100,"Tachyon"),Tuple2(70,"Hadoop"),Tuple2(80,"Kafka"),Tuple2(80,"HBase"))//准备数据
val dataRDD = sc.parallelize(data)//根据集合创建RDD
val grouped = dataRDD.groupByKey()//按照相同的key对value进行分组,分组后的value是一个集合
grouped.collect.foreach(println)//收集计算结果并通过foreach循环打印
}
def reduceByKeyTransformation(sc:SparkContext){
val lines = sc.textFile("D://SoftWare//spark-1.6.2-bin-hadoop2.6//README.md")
val words = lines.flatMap{ line => line.split(" ")}
val pairs = words.map { word => (word,1) }
val wordCountsOdered = pairs.reduceByKey(_+_)//对相同的key,进行value的累计(包括local和reducer级别同时reduce)
wordCountsOdered.collect.foreach(wordNumberPair => println(wordNumberPair._1 + ":" + wordNumberPair._2))//收集计算结果并通过foreach循环打印
}
}


reduceByKey源码

/**
* Merge the values for each key using an associative reduce function. This will also perform
* the merging locally on each mapper before sending results to a reducer, similarly to a
* "combiner" in MapReduce. Output will be hash-partitioned with the existing partitioner/
* parallelism level.
*/
def reduceByKey(func: (V, V) => V): RDD[(K, V)] = self.withScope {
reduceByKey(defaultPartitioner(self), func)
}






join适用于

package com.zhouls.spark.cores

import org.apache.spark.{SparkConf, SparkContext}

/**
* Created by Administrator on 2016/9/27.
*/
object Transformations {
def main(args: Array[String]) {
val sc = sparkContext("Transformations Operations") //创建SparkContext
// mapTransformation(sc)//map案例
// filterTransformation(sc)//filter案例
// flatMapTransformation(sc)//flatMap案例
// groupByKeyTransformation(sc)//groupByKey案例
// reduceByKeyTransformation(sc)//reduceByKey案例
joinTransformation(sc)//join案例
sc.stop() //停止sparkContext,释放相关的Driver对象,释放资源
}
def sparkContext(name:String)={
val conf = new SparkConf().setAppName("Transformations").setMaster("local")
val sc = new SparkContext(conf)
sc
}

def mapTransformation(sc:SparkContext){
val nums = sc.parallelize(1 to 10) //根据集合创建RDD
val mapped = nums.map(item => 2 * item) //map适用于任何类型的元素且对其作用的集合中的每一个元素循环遍历并调用其作为参数的函数对每一个遍历的元素进行具体化处理
mapped.collect.foreach(println)//收集计算结果并通过foreach循环打印
}
def filterTransformation(sc:SparkContext){
val nums = sc.parallelize(1 to 20) //根据集合创建RDD
val filtered = nums.filter(item => item%2 == 0)//根据filter中作为参数的函数Boolean来判断符合条件的元素,并基于这些元素构成新的MapPartitionsRDD。
filtered.collect.foreach(println)//收集计算结果并通过foreach循环打印
}
def flatMapTransformation(sc:SparkContext){
val bigData = Array("Scala Spark","Java Hadoop","Java Tachyon")//实例化字符串类型的Array
val bigDataString = sc.parallelize(bigData)//创建以字符串为元素类型的MapPartitionsRDD
val words = bigDataString.flatMap(line => line.split(" "))//首先是通过传入的作为参数的函数来作用于RDD的每个字符串进行单词切分(是以集合的方式存在的),然后把切分后的结果合并成一个大的集合,是{Scala Spark Java Hadoop Java Tachyon}
words.collect.foreach(println)//收集计算结果并通过foreach循环打印
}
def groupByKeyTransformation(sc:SparkContext){
val data = Array(Tuple2(100,"Spark"),Tuple2(100,"Tachyon"),Tuple2(70,"Hadoop"),Tuple2(80,"Kafka"),Tuple2(80,"HBase"))//准备数据
val dataRDD = sc.parallelize(data)//根据集合创建RDD
val grouped = dataRDD.groupByKey()//按照相同的key对value进行分组,分组后的value是一个集合
grouped.collect.foreach(println)//收集计算结果并通过foreach循环打印
}
def reduceByKeyTransformation(sc:SparkContext){
val lines = sc.textFile("D://SoftWare//spark-1.6.2-bin-hadoop2.6//README.md")
val words = lines.flatMap{ line => line.split(" ")}
val pairs = words.map { word => (word,1) }
val wordCountsOdered = pairs.reduceByKey(_+_)//对相同的key,进行value的累计(包括local和reducer级别同时reduce)
wordCountsOdered.collect.foreach(wordNumberPair => println(wordNumberPair._1 + ":" + wordNumberPair._2))//收集计算结果并通过foreach循环打印
}
def joinTransformation(sc:SparkContext){
val studentNames = Array(Tuple2(1,"Spark"),Tuple2(2,"Tachyon"),Tuple2(3,"Hadoop"))
val studentScores = Array(Tuple2(1,100),Tuple2(2,95),Tuple2(3,65))
val names = sc.parallelize(studentNames)
val scores = sc.parallelize(studentScores)
val studentNamesAndScores = names.join(scores)
studentNamesAndScores.collect.foreach(println)//收集计算结果并通过foreach循环打印

}
}

join源码

/**
* Cartesian join with another [[DataFrame]].
*
* Note that cartesian joins are very expensive without an extra filter that can be pushed down.
*
* @param right Right side of the join operation.
* @group dfops
* @since 1.3.0
*/
def join(right: DataFrame): DataFrame = {
Join(logicalPlan, right.logicalPlan, joinType = Inner, None)
}

/**
* Inner equi-join with another [[DataFrame]] using the given column.
*
* Different from other join functions, the join column will only appear once in the output,
* i.e. similar to SQL's `JOIN USING` syntax.
*
* {{{
* // Joining df1 and df2 using the column "user_id"
* df1.join(df2, "user_id")
* }}}
*
* Note that if you perform a self-join using this function without aliasing the input
* [[DataFrame]]s, you will NOT be able to reference any columns after the join, since
* there is no way to disambiguate which side of the join you would like to reference.
*
* @param right Right side of the join operation.
* @param usingColumn Name of the column to join on. This column must exist on both sides.
* @group dfops
* @since 1.4.0
*/
def join(right: DataFrame, usingColumn: String): DataFrame = {
join(right, Seq(usingColumn))
}

/**
* Inner equi-join with another [[DataFrame]] using the given columns.
*
* Different from other join functions, the join columns will only appear once in the output,
* i.e. similar to SQL's `JOIN USING` syntax.
*
* {{{
* // Joining df1 and df2 using the columns "user_id" and "user_name"
* df1.join(df2, Seq("user_id", "user_name"))
* }}}
*
* Note that if you perform a self-join using this function without aliasing the input
* [[DataFrame]]s, you will NOT be able to reference any columns after the join, since
* there is no way to disambiguate which side of the join you would like to reference.
*
* @param right Right side of the join operation.
* @param usingColumns Names of the columns to join on. This columns must exist on both sides.
* @group dfops
* @since 1.4.0
*/
def join(right: DataFrame, usingColumns: Seq[String]): DataFrame = {
// Analyze the self join. The assumption is that the analyzer will disambiguate left vs right
// by creating a new instance for one of the branch.
val joined = sqlContext.executePlan(
Join(logicalPlan, right.logicalPlan, joinType = Inner, None)).analyzed.asInstanceOf[Join]

// Project only one of the join columns.
val joinedCols = usingColumns.map(col => joined.right.resolve(col))
val condition = usingColumns.map { col =>
catalyst.expressions.EqualTo(joined.left.resolve(col), joined.right.resolve(col))
}.reduceLeftOption[catalyst.expressions.BinaryExpression] { (cond, eqTo) =>
catalyst.expressions.And(cond, eqTo)
}

Project(
joined.output.filterNot(joinedCols.contains(_)),
Join(
joined.left,
joined.right,
joinType = Inner,
condition)
)
}

/**
* Inner join with another [[DataFrame]], using the given join expression.
*
* {{{
* // The following two are equivalent:
* df1.join(df2, $"df1Key" === $"df2Key")
* df1.join(df2).where($"df1Key" === $"df2Key")
* }}}
* @group dfops
* @since 1.3.0
*/
def join(right: DataFrame, joinExprs: Column): DataFrame = join(right, joinExprs, "inner")

/**
* Join with another [[DataFrame]], using the given join expression. The following performs
* a full outer join between `df1` and `df2`.
*
* {{{
* // Scala:
* import org.apache.spark.sql.functions._
* df1.join(df2, $"df1Key" === $"df2Key", "outer")
*
* // Java:
* import static org.apache.spark.sql.functions.*;
* df1.join(df2, col("df1Key").equalTo(col("df2Key")), "outer");
* }}}
*
* @param right Right side of the join.
* @param joinExprs Join expression.
* @param joinType One of: `inner`, `outer`, `left_outer`, `right_outer`, `leftsemi`.
* @group dfops
* @since 1.3.0
*/
def join(right: DataFrame, joinExprs: Column, joinType: String): DataFrame = {
// Note that in this function, we introduce a hack in the case of self-join to automatically
// resolve ambiguous join conditions into ones that might make sense [SPARK-6231].
// Consider this case: df.join(df, df("key") === df("key"))
// Since df("key") === df("key") is a trivially true condition, this actually becomes a
// cartesian join. However, most likely users expect to perform a self join using "key".
// With that assumption, this hack turns the trivially true condition into equality on join
// keys that are resolved to both sides.

// Trigger analysis so in the case of self-join, the analyzer will clone the plan.
// After the cloning, left and right side will have distinct expression ids.
val plan = Join(logicalPlan, right.logicalPlan, JoinType(joinType), Some(joinExprs.expr))
.queryExecution.analyzed.asInstanceOf[Join]

// If auto self join alias is disabled, return the plan.
if (!sqlContext.conf.dataFrameSelfJoinAutoResolveAmbiguity) {
return plan
}

// If left/right have no output set intersection, return the plan.
val lanalyzed = this.logicalPlan.queryExecution.analyzed
val ranalyzed = right.logicalPlan.queryExecution.analyzed
if (lanalyzed.outputSet.intersect(ranalyzed.outputSet).isEmpty) {
return plan
}

// Otherwise, find the trivially true predicates and automatically resolves them to both sides.
// By the time we get here, since we have already run analysis, all attributes should've been
// resolved and become AttributeReference.
val cond = plan.condition.map { _.transform {
case catalyst.expressions.EqualTo(a: AttributeReference, b: AttributeReference)
if a.sameRef(b) =>
catalyst.expressions.EqualTo(plan.left.resolve(a.name), plan.right.resolve(b.name))
}}
plan.copy(condition = cond)
}





 
cogroup的scala版,适用于

package com.zhouls.spark.cores

import org.apache.spark.{SparkConf, SparkContext}

/**
* Created by Administrator on 2016/9/27.
*/
object Transformations {
def main(args: Array[String]) {
val sc = sparkContext("Transformations Operations") //创建SparkContext
// mapTransformation(sc)//map案例
// filterTransformation(sc)//filter案例
// flatMapTransformation(sc)//flatMap案例
// groupByKeyTransformation(sc)//groupByKey案例
// reduceByKeyTransformation(sc)//reduceByKey案例
// joinTransformation(sc)//join案例
cogroupTransformation(sc)//cogroup案例
sc.stop() //停止sparkContext,释放相关的Driver对象,释放资源
}
def sparkContext(name:String)={
val conf = new SparkConf().setAppName("Transformations").setMaster("local")
val sc = new SparkContext(conf)
sc
}

def mapTransformation(sc:SparkContext){
val nums = sc.parallelize(1 to 10) //根据集合创建RDD
val mapped = nums.map(item => 2 * item) //map适用于任何类型的元素且对其作用的集合中的每一个元素循环遍历并调用其作为参数的函数对每一个遍历的元素进行具体化处理
mapped.collect.foreach(println)//收集计算结果并通过foreach循环打印
}
def filterTransformation(sc:SparkContext){
val nums = sc.parallelize(1 to 20) //根据集合创建RDD
val filtered = nums.filter(item => item%2 == 0)//根据filter中作为参数的函数Boolean来判断符合条件的元素,并基于这些元素构成新的MapPartitionsRDD。
filtered.collect.foreach(println)//收集计算结果并通过foreach循环打印
}
def flatMapTransformation(sc:SparkContext){
val bigData = Array("Scala Spark","Java Hadoop","Java Tachyon")//实例化字符串类型的Array
val bigDataString = sc.parallelize(bigData)//创建以字符串为元素类型的MapPartitionsRDD
val words = bigDataString.flatMap(line => line.split(" "))//首先是通过传入的作为参数的函数来作用于RDD的每个字符串进行单词切分(是以集合的方式存在的),然后把切分后的结果合并成一个大的集合,是{Scala Spark Java Hadoop Java Tachyon}
words.collect.foreach(println)//收集计算结果并通过foreach循环打印
}
def groupByKeyTransformation(sc:SparkContext){
val data = Array(Tuple2(100,"Spark"),Tuple2(100,"Tachyon"),Tuple2(70,"Hadoop"),Tuple2(80,"Kafka"),Tuple2(80,"HBase"))//准备数据
val dataRDD = sc.parallelize(data)//根据集合创建RDD
val grouped = dataRDD.groupByKey()//按照相同的key对value进行分组,分组后的value是一个集合
grouped.collect.foreach(println)//收集计算结果并通过foreach循环打印
}
def reduceByKeyTransformation(sc:SparkContext){
val lines = sc.textFile("D://SoftWare//spark-1.6.2-bin-hadoop2.6//README.md")
val words = lines.flatMap{ line => line.split(" ")}
val pairs = words.map { word => (word,1) }
val wordCountsOdered = pairs.reduceByKey(_+_)//对相同的key,进行value的累计(包括local和reducer级别同时reduce)
wordCountsOdered.collect.foreach(wordNumberPair => println(wordNumberPair._1 + ":" + wordNumberPair._2))//收集计算结果并通过foreach循环打印
}
def joinTransformation(sc:SparkContext){
val studentNames = Array(Tuple2(1,"Spark"),Tuple2(2,"Tachyon"),Tuple2(3,"Hadoop"))
val studentScores = Array(Tuple2(1,100),Tuple2(2,95),Tuple2(3,65))
val names = sc.parallelize(studentNames)
val scores = sc.parallelize(studentScores)
val studentNamesAndScores = names.join(scores)
studentNamesAndScores.collect.foreach(println)//收集计算结果并通过foreach循环打印
}
def cogroupTransformation(sc:SparkContext){
val namesLists = Array(Tuple2(1,"xiaoming"),Tuple2(2,"xiaozhou"),Tuple2(3,"xiaoliu"))
val scoresLists = Array(Tuple2(1,100),Tuple2(2,95),Tuple2(3,85),Tuple2(1,75),Tuple2(2,65),Tuple2(3,55))
val names = sc.parallelize(namesLists)
val scores = sc.parallelize(scoresLists)
val namesListsAndScores = names.cogroup(scores)
namesListsAndScores.collect.foreach(println)//收集计算结果并通过foreach循环打印
}
}

 

 
cogroup源码

/**
* For each key k in `this` or `other1` or `other2` or `other3`,
* return a resulting RDD that contains a tuple with the list of values
* for that key in `this`, `other1`, `other2` and `other3`.
*/
def cogroup[W1, W2, W3](other1: RDD[(K, W1)],
other2: RDD[(K, W2)],
other3: RDD[(K, W3)],
partitioner: Partitioner)
: RDD[(K, (Iterable[V], Iterable[W1], Iterable[W2], Iterable[W3]))] = self.withScope {
if (partitioner.isInstanceOf[HashPartitioner] && keyClass.isArray) {
throw new SparkException("Default partitioner cannot partition array keys.")
}
val cg = new CoGroupedRDD[K](Seq(self, other1, other2, other3), partitioner)
cg.mapValues { case Array(vs, w1s, w2s, w3s) =>
(vs.asInstanceOf[Iterable[V]],
w1s.asInstanceOf[Iterable[W1]],
w2s.asInstanceOf[Iterable[W2]],
w3s.asInstanceOf[Iterable[W3]])
}
}

/**
* For each key k in `this` or `other`, return a resulting RDD that contains a tuple with the
* list of values for that key in `this` as well as `other`.
*/
def cogroup[W](other: RDD[(K, W)], partitioner: Partitioner)
: RDD[(K, (Iterable[V], Iterable[W]))] = self.withScope {
if (partitioner.isInstanceOf[HashPartitioner] && keyClass.isArray) {
throw new SparkException("Default partitioner cannot partition array keys.")
}
val cg = new CoGroupedRDD[K](Seq(self, other), partitioner)
cg.mapValues { case Array(vs, w1s) =>
(vs.asInstanceOf[Iterable[V]], w1s.asInstanceOf[Iterable[W]])
}
}

/**
* For each key k in `this` or `other1` or `other2`, return a resulting RDD that contains a
* tuple with the list of values for that key in `this`, `other1` and `other2`.
*/
def cogroup[W1, W2](other1: RDD[(K, W1)], other2: RDD[(K, W2)], partitioner: Partitioner)
: RDD[(K, (Iterable[V], Iterable[W1], Iterable[W2]))] = self.withScope {
if (partitioner.isInstanceOf[HashPartitioner] && keyClass.isArray) {
throw new SparkException("Default partitioner cannot partition array keys.")
}
val cg = new CoGroupedRDD[K](Seq(self, other1, other2), partitioner)
cg.mapValues { case Array(vs, w1s, w2s) =>
(vs.asInstanceOf[Iterable[V]],
w1s.asInstanceOf[Iterable[W1]],
w2s.asInstanceOf[Iterable[W2]])
}
}

/**
* For each key k in `this` or `other1` or `other2` or `other3`,
* return a resulting RDD that contains a tuple with the list of values
* for that key in `this`, `other1`, `other2` and `other3`.
*/
def cogroup[W1, W2, W3](other1: RDD[(K, W1)], other2: RDD[(K, W2)], other3: RDD[(K, W3)])
: RDD[(K, (Iterable[V], Iterable[W1], Iterable[W2], Iterable[W3]))] = self.withScope {
cogroup(other1, other2, other3, defaultPartitioner(self, other1, other2, other3))
}

/**
* For each key k in `this` or `other`, return a resulting RDD that contains a tuple with the
* list of values for that key in `this` as well as `other`.
*/
def cogroup[W](other: RDD[(K, W)]): RDD[(K, (Iterable[V], Iterable[W]))] = self.withScope {
cogroup(other, defaultPartitioner(self, other))
}

/**
* For each key k in `this` or `other1` or `other2`, return a resulting RDD that contains a
* tuple with the list of values for that key in `this`, `other1` and `other2`.
*/
def cogroup[W1, W2](other1: RDD[(K, W1)], other2: RDD[(K, W2)])
: RDD[(K, (Iterable[V], Iterable[W1], Iterable[W2]))] = self.withScope {
cogroup(other1, other2, defaultPartitioner(self, other1, other2))
}

/**
* For each key k in `this` or `other`, return a resulting RDD that contains a tuple with the
* list of values for that key in `this` as well as `other`.
*/
def cogroup[W](
other: RDD[(K, W)],
numPartitions: Int): RDD[(K, (Iterable[V], Iterable[W]))] = self.withScope {
cogroup(other, new HashPartitioner(numPartitions))
}

/**
* For each key k in `this` or `other1` or `other2`, return a resulting RDD that contains a
* tuple with the list of values for that key in `this`, `other1` and `other2`.
*/
def cogroup[W1, W2](other1: RDD[(K, W1)], other2: RDD[(K, W2)], numPartitions: Int)
: RDD[(K, (Iterable[V], Iterable[W1], Iterable[W2]))] = self.withScope {
cogroup(other1, other2, new HashPartitioner(numPartitions))
}

/**
* For each key k in `this` or `other1` or `other2` or `other3`,
* return a resulting RDD that contains a tuple with the list of values
* for that key in `this`, `other1`, `other2` and `other3`.
*/
def cogroup[W1, W2, W3](other1: RDD[(K, W1)],
other2: RDD[(K, W2)],
other3: RDD[(K, W3)],
numPartitions: Int)
: RDD[(K, (Iterable[V], Iterable[W1], Iterable[W2], Iterable[W3]))] = self.withScope {
cogroup(other1, other2, other3, new HashPartitioner(numPartitions))
}




本文转自大数据躺过的坑博客园博客,原文链接:http://www.cnblogs.com/zlslch/p/5913846.html,如需转载请自行联系原作者

相关文章
|
12天前
|
索引
ES5常见的数组方法:forEach ,map ,filter ,some ,every ,reduce (除了forEach,其他都有回调,都有return)
ES5常见的数组方法:forEach ,map ,filter ,some ,every ,reduce (除了forEach,其他都有回调,都有return)
|
10天前
|
开发框架 监控 前端开发
在 ASP.NET Core Web API 中使用操作筛选器统一处理通用操作
【9月更文挑战第27天】操作筛选器是ASP.NET Core MVC和Web API中的一种过滤器,可在操作方法执行前后运行代码,适用于日志记录、性能监控和验证等场景。通过实现`IActionFilter`接口的`OnActionExecuting`和`OnActionExecuted`方法,可以统一处理日志、验证及异常。创建并注册自定义筛选器类,能提升代码的可维护性和复用性。
|
1月前
|
存储 API 数据库
如何使用 ef core 的 code first(fluent api)模式实现自定义类型转换器?
本文介绍了如何在 EF Core 的 Code First 模式下使用自定义类型转换器实现 JsonDocument 和 DateTime 类型到 SQLite 数据库的正确映射。通过自定义 ValueConverter,实现了数据类型的转换,并展示了完整的项目结构和代码实现,包括实体类定义、DbContext 配置、Repositories 仓储模式及数据库应用迁移(Migrations)操作。
52 6
如何使用 ef core 的 code first(fluent api)模式实现自定义类型转换器?
WK
|
16天前
|
Python
map和filter的区别是什么
`map()`和`filter()`均为Python中的高阶函数,前者针对可迭代对象中的每个元素执行指定操作,如数值翻倍或字符串转大写;后者则筛选出符合条件的元素,例如仅保留偶数或非空字符串。两者均返回迭代器,并可通过`list()`等函数转换为所需的数据结构。具体使用时,应依据实际需求和场景选择合适的函数。
WK
11 1
WK
|
1月前
map和filter的区别是什么
在编程中,`map` 和 `filter` 是处理数组或集合时常用的两个函数。`map` 用于将每个元素通过指定函数转换后生成新的数组,而 `filter` 则根据条件筛选出符合条件的元素组成新数组。两者的主要区别在于:`map` 的返回数组长度与原数组相同,但元素被转换;`filter` 的返回数组长度可能不同,只包含符合条件的元素。
WK
15 2
|
12天前
|
JavaScript 前端开发
JavaScript 中 五种迭代数组的方法 every some map filter forEach
本文介绍了JavaScript中五种常用数组迭代方法:every、some、filter、map和forEach,并通过示例代码展示了它们的基本用法和区别。
|
2月前
|
JavaScript 前端开发 索引
JS中常用的数组迭代方法(filter,forEach,map,every,some,find,findIndex)
这段代码和说明介绍了JavaScript中数组的一些常用方法。函数接收三个参数:`item`(数组项的值)、`index`(项的位置,可选)和`array`(数组本身,可选)。示例展示了如何使用`filter()`过滤非空项、`forEach()`遍历数组、`map()`处理并返回新数组、`every()`检查所有元素是否满足条件、`some()`检查是否存在满足条件的元素、`find()`获取首个符合条件的元素值以及`findIndex()`获取其索引位置。这些方法都不会修改原数组。
JS中常用的数组迭代方法(filter,forEach,map,every,some,find,findIndex)
|
2月前
|
存储 算法 Java
Go 通过 Map/Filter/ForEach 等流式 API 高效处理数据
Go 通过 Map/Filter/ForEach 等流式 API 高效处理数据
|
2月前
|
开发框架 JSON .NET
ASP.NET Core 标识(Identity)框架系列(三):在 ASP.NET Core Web API 项目中使用标识(Identity)框架进行身份验证
ASP.NET Core 标识(Identity)框架系列(三):在 ASP.NET Core Web API 项目中使用标识(Identity)框架进行身份验证
|
2月前
|
安全 Java API
Java 8 流库的魔法革命:Filter、Map、FlatMap 和 Optional 如何颠覆编程世界!
【8月更文挑战第29天】Java 8 的 Stream API 通过 Filter、Map、FlatMap 和 Optional 等操作,提供了高效、简洁的数据集合处理方式。Filter 用于筛选符合条件的元素;Map 对元素进行转换;FlatMap 将多个流扁平化合并;Optional 安全处理空值。这些操作结合使用,能够显著提升代码的可读性和简洁性,使数据处理更为高效和便捷。
41 0