Spark RDD/Core 编程 API入门系列 之rdd案例(map、filter、flatMap、groupByKey、reduceByKey、join、cogroupy等)(四)

简介:

声明:

  大数据中,最重要的算子操作是:join  !!!

 

 

 

典型的transformation和action

 

 

 

 

 

val nums = sc.parallelize(1 to 10) //根据集合创建RDD

map适用于

 

package com.zhouls.spark.cores

import org.apache.spark.{SparkConf, SparkContext}

/**
* Created by Administrator on 2016/9/27.
*/
object Transformations {
def main(args: Array[String]) {
val conf = new SparkConf().setAppName("Transformations").setMaster("local")
val sc = new SparkContext(conf)
val nums = sc.parallelize(1 to 10) //根据集合创建RDD
val mapped = nums.map(item => 2 + item)
mapped.collect.foreach(println)
}
}

map源码

/**
* Return a new RDD by applying a function to all elements of this RDD.
*/
def map[U: ClassTag](f: T => U): RDD[U] = withScope {
val cleanF = sc.clean(f)
new MapPartitionsRDD[U, T](this, (context, pid, iter) => iter.map(cleanF))
}





 

filter适用于

 

package com.zhouls.spark.cores

import org.apache.spark.{SparkConf, SparkContext}

/**
* Created by Administrator on 2016/9/27.
*/
object Transformations {
def main(args: Array[String]) {
val conf = new SparkConf().setAppName("Transformations").setMaster("local")
val sc = new SparkContext(conf)
val nums = sc.parallelize(1 to 10) //根据集合创建RDD
val mapped = nums.map(item => 2 + item)
val filtered = nums.filter(item => item%2 == 0)
filtered.collect.foreach(println)
}
}

filter源码

/**
* Return a new RDD containing only the elements that satisfy a predicate.
*/
def filter(f: T => Boolean): RDD[T] = withScope {
val cleanF = sc.clean(f)
new MapPartitionsRDD[T, T](
this,
(context, pid, iter) => iter.filter(cleanF),
preservesPartitioning = true)
}









flatMap适用于

 

package com.zhouls.spark.cores

import org.apache.spark.{SparkConf, SparkContext}

/**
* Created by Administrator on 2016/9/27.
*/
object Transformations {
def main(args: Array[String]) {
val conf = new SparkConf().setAppName("Transformations").setMaster("local")
val sc = new SparkContext(conf)

val nums = sc.parallelize(1 to 10) //根据集合创建RDD

val mapped = nums.map(item => 2 + item)
// mapped.collect.foreach(println)

val filtered = nums.filter(item => item%2 == 0)
// filtered.collect.foreach(println)

val bigData = Array("Scala Spark","Java Hadoop","Java Tachyon")
val bigDataString = sc.parallelize(bigData)
val words = bigDataString.flatMap(line => line.split(" "))
words.collect.foreach(println)
sc.stop()
}
}




flatMap源码

/**
* Return a new RDD by first applying a function to all elements of this
* RDD, and then flattening the results.
*/
def flatMap[U: ClassTag](f: T => TraversableOnce[U]): RDD[U] = withScope {
val cleanF = sc.clean(f)
new MapPartitionsRDD[U, T](this, (context, pid, iter) => iter.flatMap(cleanF))
}

 

 

 

 

 

成为大牛,必写的写法 ->

groupByKey适用于

package com.zhouls.spark.cores

import org.apache.spark.{SparkConf, SparkContext}

/**
* Created by Administrator on 2016/9/27.
*/
object Transformations {
def main(args: Array[String]) {
val sc = sparkContext("Transformations Operations") //创建SparkContext
// mapTransformation(sc)//map案例
// filterTransformation(sc)//filter案例
// flatMapTransformation(sc)//flatMap案例
groupByKeyTransformation(sc)

sc.stop() //停止sparkContext,释放相关的Driver对象,释放资源
}
def sparkContext(name:String)={
val conf = new SparkConf().setAppName("Transformations").setMaster("local")
val sc = new SparkContext(conf)
sc
}

def mapTransformation(sc:SparkContext){
val nums = sc.parallelize(1 to 10) //根据集合创建RDD
val mapped = nums.map(item => 2 * item) //map适用于任何类型的元素且对其作用的集合中的每一个元素循环遍历并调用其作为参数的函数对每一个遍历的元素进行具体化处理
mapped.collect.foreach(println)//收集计算结果并通过foreach循环打印
}
def filterTransformation(sc:SparkContext){
val nums = sc.parallelize(1 to 20) //根据集合创建RDD
val filtered = nums.filter(item => item%2 == 0)//根据filter中作为参数的函数Boolean来判断符合条件的元素,并基于这些元素构成新的MapPartitionsRDD。
filtered.collect.foreach(println)//收集计算结果并通过foreach循环打印
}
def flatMapTransformation(sc:SparkContext){
val bigData = Array("Scala Spark","Java Hadoop","Java Tachyon")//实例化字符串类型的Array
val bigDataString = sc.parallelize(bigData)//创建以字符串为元素类型的MapPartitionsRDD
val words = bigDataString.flatMap(line => line.split(" "))//首先是通过传入的作为参数的函数来作用于RDD的每个字符串进行单词切分(是以集合的方式存在的),然后把切分后的结果合并成一个大的集合,是{Scala Spark Java Hadoop Java Tachyon}
words.collect.foreach(println)//收集计算结果并通过foreach循环打印
}
def groupByKeyTransformation(sc:SparkContext){
val data = Array(Tuple2(100,"Spark"),Tuple2(100,"Tachyon"),Tuple2(70,"Hadoop"),Tuple2(80,"Kafka"),Tuple2(80,"HBase"))
val dataRDD = sc.parallelize(data)
val grouped = dataRDD.groupByKey()
grouped.collect.foreach(println)//收集计算结果并通过foreach循环打印
}
}

groupByKey源码
**
* Group the values for each key in the RDD into a single sequence. Allows controlling the
* partitioning of the resulting key-value pair RDD by passing a Partitioner.
* The ordering of elements within each group is not guaranteed, and may even differ
* each time the resulting RDD is evaluated.
*
* Note: This operation may be very expensive. If you are grouping in order to perform an
* aggregation (such as a sum or average) over each key, using [[PairRDDFunctions.aggregateByKey]]
* or [[PairRDDFunctions.reduceByKey]] will provide much better performance.
*
* Note: As currently implemented, groupByKey must be able to hold all the key-value pairs for any
* key in memory. If a key has too many values, it can result in an [[OutOfMemoryError]].
*/
def groupByKey(partitioner: Partitioner): RDD[(K, Iterable[V])] = self.withScope {
// groupByKey shouldn't use map side combine because map side combine does not
// reduce the amount of data shuffled and requires all map side data be inserted
// into a hash table, leading to more objects in the old gen.
val createCombiner = (v: V) => CompactBuffer(v)
val mergeValue = (buf: CompactBuffer[V], v: V) => buf += v
val mergeCombiners = (c1: CompactBuffer[V], c2: CompactBuffer[V]) => c1 ++= c2
val bufs = combineByKey[CompactBuffer[V]](
createCombiner, mergeValue, mergeCombiners, partitioner, mapSideCombine = false)
bufs.asInstanceOf[RDD[(K, Iterable[V])]]
}

/**
* Group the values for each key in the RDD into a single sequence. Hash-partitions the
* resulting RDD with into `numPartitions` partitions. The ordering of elements within
* each group is not guaranteed, and may even differ each time the resulting RDD is evaluated.
*
* Note: This operation may be very expensive. If you are grouping in order to perform an
* aggregation (such as a sum or average) over each key, using [[PairRDDFunctions.aggregateByKey]]
* or [[PairRDDFunctions.reduceByKey]] will provide much better performance.
*
* Note: As currently implemented, groupByKey must be able to hold all the key-value pairs for any
* key in memory. If a key has too many values, it can result in an [[OutOfMemoryError]].
*/
def groupByKey(numPartitions: Int): RDD[(K, Iterable[V])] = self.withScope {
groupByKey(new HashPartitioner(numPartitions))
}






 

reduceByKey适用于

package com.zhouls.spark.cores

import org.apache.spark.{SparkConf, SparkContext}

/**
* Created by Administrator on 2016/9/27.
*/
object Transformations {
def main(args: Array[String]) {
val sc = sparkContext("Transformations Operations") //创建SparkContext
// mapTransformation(sc)//map案例
// filterTransformation(sc)//filter案例
// flatMapTransformation(sc)//flatMap案例
// groupByKeyTransformation(sc)//groupByKey案例
reduceByKeyTransformation(sc)//reduceByKey案例
sc.stop() //停止sparkContext,释放相关的Driver对象,释放资源
}
def sparkContext(name:String)={
val conf = new SparkConf().setAppName("Transformations").setMaster("local")
val sc = new SparkContext(conf)
sc
}

def mapTransformation(sc:SparkContext){
val nums = sc.parallelize(1 to 10) //根据集合创建RDD
val mapped = nums.map(item => 2 * item) //map适用于任何类型的元素且对其作用的集合中的每一个元素循环遍历并调用其作为参数的函数对每一个遍历的元素进行具体化处理
mapped.collect.foreach(println)//收集计算结果并通过foreach循环打印
}
def filterTransformation(sc:SparkContext){
val nums = sc.parallelize(1 to 20) //根据集合创建RDD
val filtered = nums.filter(item => item%2 == 0)//根据filter中作为参数的函数Boolean来判断符合条件的元素,并基于这些元素构成新的MapPartitionsRDD。
filtered.collect.foreach(println)//收集计算结果并通过foreach循环打印
}
def flatMapTransformation(sc:SparkContext){
val bigData = Array("Scala Spark","Java Hadoop","Java Tachyon")//实例化字符串类型的Array
val bigDataString = sc.parallelize(bigData)//创建以字符串为元素类型的MapPartitionsRDD
val words = bigDataString.flatMap(line => line.split(" "))//首先是通过传入的作为参数的函数来作用于RDD的每个字符串进行单词切分(是以集合的方式存在的),然后把切分后的结果合并成一个大的集合,是{Scala Spark Java Hadoop Java Tachyon}
words.collect.foreach(println)//收集计算结果并通过foreach循环打印
}
def groupByKeyTransformation(sc:SparkContext){
val data = Array(Tuple2(100,"Spark"),Tuple2(100,"Tachyon"),Tuple2(70,"Hadoop"),Tuple2(80,"Kafka"),Tuple2(80,"HBase"))//准备数据
val dataRDD = sc.parallelize(data)//根据集合创建RDD
val grouped = dataRDD.groupByKey()//按照相同的key对value进行分组,分组后的value是一个集合
grouped.collect.foreach(println)//收集计算结果并通过foreach循环打印
}
def reduceByKeyTransformation(sc:SparkContext){
val lines = sc.textFile("D://SoftWare//spark-1.6.2-bin-hadoop2.6//README.md")
val words = lines.flatMap{ line => line.split(" ")}
val pairs = words.map { word => (word,1) }
val wordCountsOdered = pairs.reduceByKey(_+_)//对相同的key,进行value的累计(包括local和reducer级别同时reduce)
wordCountsOdered.collect.foreach(wordNumberPair => println(wordNumberPair._1 + ":" + wordNumberPair._2))//收集计算结果并通过foreach循环打印
}
}


reduceByKey源码

/**
* Merge the values for each key using an associative reduce function. This will also perform
* the merging locally on each mapper before sending results to a reducer, similarly to a
* "combiner" in MapReduce. Output will be hash-partitioned with the existing partitioner/
* parallelism level.
*/
def reduceByKey(func: (V, V) => V): RDD[(K, V)] = self.withScope {
reduceByKey(defaultPartitioner(self), func)
}






join适用于

package com.zhouls.spark.cores

import org.apache.spark.{SparkConf, SparkContext}

/**
* Created by Administrator on 2016/9/27.
*/
object Transformations {
def main(args: Array[String]) {
val sc = sparkContext("Transformations Operations") //创建SparkContext
// mapTransformation(sc)//map案例
// filterTransformation(sc)//filter案例
// flatMapTransformation(sc)//flatMap案例
// groupByKeyTransformation(sc)//groupByKey案例
// reduceByKeyTransformation(sc)//reduceByKey案例
joinTransformation(sc)//join案例
sc.stop() //停止sparkContext,释放相关的Driver对象,释放资源
}
def sparkContext(name:String)={
val conf = new SparkConf().setAppName("Transformations").setMaster("local")
val sc = new SparkContext(conf)
sc
}

def mapTransformation(sc:SparkContext){
val nums = sc.parallelize(1 to 10) //根据集合创建RDD
val mapped = nums.map(item => 2 * item) //map适用于任何类型的元素且对其作用的集合中的每一个元素循环遍历并调用其作为参数的函数对每一个遍历的元素进行具体化处理
mapped.collect.foreach(println)//收集计算结果并通过foreach循环打印
}
def filterTransformation(sc:SparkContext){
val nums = sc.parallelize(1 to 20) //根据集合创建RDD
val filtered = nums.filter(item => item%2 == 0)//根据filter中作为参数的函数Boolean来判断符合条件的元素,并基于这些元素构成新的MapPartitionsRDD。
filtered.collect.foreach(println)//收集计算结果并通过foreach循环打印
}
def flatMapTransformation(sc:SparkContext){
val bigData = Array("Scala Spark","Java Hadoop","Java Tachyon")//实例化字符串类型的Array
val bigDataString = sc.parallelize(bigData)//创建以字符串为元素类型的MapPartitionsRDD
val words = bigDataString.flatMap(line => line.split(" "))//首先是通过传入的作为参数的函数来作用于RDD的每个字符串进行单词切分(是以集合的方式存在的),然后把切分后的结果合并成一个大的集合,是{Scala Spark Java Hadoop Java Tachyon}
words.collect.foreach(println)//收集计算结果并通过foreach循环打印
}
def groupByKeyTransformation(sc:SparkContext){
val data = Array(Tuple2(100,"Spark"),Tuple2(100,"Tachyon"),Tuple2(70,"Hadoop"),Tuple2(80,"Kafka"),Tuple2(80,"HBase"))//准备数据
val dataRDD = sc.parallelize(data)//根据集合创建RDD
val grouped = dataRDD.groupByKey()//按照相同的key对value进行分组,分组后的value是一个集合
grouped.collect.foreach(println)//收集计算结果并通过foreach循环打印
}
def reduceByKeyTransformation(sc:SparkContext){
val lines = sc.textFile("D://SoftWare//spark-1.6.2-bin-hadoop2.6//README.md")
val words = lines.flatMap{ line => line.split(" ")}
val pairs = words.map { word => (word,1) }
val wordCountsOdered = pairs.reduceByKey(_+_)//对相同的key,进行value的累计(包括local和reducer级别同时reduce)
wordCountsOdered.collect.foreach(wordNumberPair => println(wordNumberPair._1 + ":" + wordNumberPair._2))//收集计算结果并通过foreach循环打印
}
def joinTransformation(sc:SparkContext){
val studentNames = Array(Tuple2(1,"Spark"),Tuple2(2,"Tachyon"),Tuple2(3,"Hadoop"))
val studentScores = Array(Tuple2(1,100),Tuple2(2,95),Tuple2(3,65))
val names = sc.parallelize(studentNames)
val scores = sc.parallelize(studentScores)
val studentNamesAndScores = names.join(scores)
studentNamesAndScores.collect.foreach(println)//收集计算结果并通过foreach循环打印

}
}

join源码

/**
* Cartesian join with another [[DataFrame]].
*
* Note that cartesian joins are very expensive without an extra filter that can be pushed down.
*
* @param right Right side of the join operation.
* @group dfops
* @since 1.3.0
*/
def join(right: DataFrame): DataFrame = {
Join(logicalPlan, right.logicalPlan, joinType = Inner, None)
}

/**
* Inner equi-join with another [[DataFrame]] using the given column.
*
* Different from other join functions, the join column will only appear once in the output,
* i.e. similar to SQL's `JOIN USING` syntax.
*
* {{{
* // Joining df1 and df2 using the column "user_id"
* df1.join(df2, "user_id")
* }}}
*
* Note that if you perform a self-join using this function without aliasing the input
* [[DataFrame]]s, you will NOT be able to reference any columns after the join, since
* there is no way to disambiguate which side of the join you would like to reference.
*
* @param right Right side of the join operation.
* @param usingColumn Name of the column to join on. This column must exist on both sides.
* @group dfops
* @since 1.4.0
*/
def join(right: DataFrame, usingColumn: String): DataFrame = {
join(right, Seq(usingColumn))
}

/**
* Inner equi-join with another [[DataFrame]] using the given columns.
*
* Different from other join functions, the join columns will only appear once in the output,
* i.e. similar to SQL's `JOIN USING` syntax.
*
* {{{
* // Joining df1 and df2 using the columns "user_id" and "user_name"
* df1.join(df2, Seq("user_id", "user_name"))
* }}}
*
* Note that if you perform a self-join using this function without aliasing the input
* [[DataFrame]]s, you will NOT be able to reference any columns after the join, since
* there is no way to disambiguate which side of the join you would like to reference.
*
* @param right Right side of the join operation.
* @param usingColumns Names of the columns to join on. This columns must exist on both sides.
* @group dfops
* @since 1.4.0
*/
def join(right: DataFrame, usingColumns: Seq[String]): DataFrame = {
// Analyze the self join. The assumption is that the analyzer will disambiguate left vs right
// by creating a new instance for one of the branch.
val joined = sqlContext.executePlan(
Join(logicalPlan, right.logicalPlan, joinType = Inner, None)).analyzed.asInstanceOf[Join]

// Project only one of the join columns.
val joinedCols = usingColumns.map(col => joined.right.resolve(col))
val condition = usingColumns.map { col =>
catalyst.expressions.EqualTo(joined.left.resolve(col), joined.right.resolve(col))
}.reduceLeftOption[catalyst.expressions.BinaryExpression] { (cond, eqTo) =>
catalyst.expressions.And(cond, eqTo)
}

Project(
joined.output.filterNot(joinedCols.contains(_)),
Join(
joined.left,
joined.right,
joinType = Inner,
condition)
)
}

/**
* Inner join with another [[DataFrame]], using the given join expression.
*
* {{{
* // The following two are equivalent:
* df1.join(df2, $"df1Key" === $"df2Key")
* df1.join(df2).where($"df1Key" === $"df2Key")
* }}}
* @group dfops
* @since 1.3.0
*/
def join(right: DataFrame, joinExprs: Column): DataFrame = join(right, joinExprs, "inner")

/**
* Join with another [[DataFrame]], using the given join expression. The following performs
* a full outer join between `df1` and `df2`.
*
* {{{
* // Scala:
* import org.apache.spark.sql.functions._
* df1.join(df2, $"df1Key" === $"df2Key", "outer")
*
* // Java:
* import static org.apache.spark.sql.functions.*;
* df1.join(df2, col("df1Key").equalTo(col("df2Key")), "outer");
* }}}
*
* @param right Right side of the join.
* @param joinExprs Join expression.
* @param joinType One of: `inner`, `outer`, `left_outer`, `right_outer`, `leftsemi`.
* @group dfops
* @since 1.3.0
*/
def join(right: DataFrame, joinExprs: Column, joinType: String): DataFrame = {
// Note that in this function, we introduce a hack in the case of self-join to automatically
// resolve ambiguous join conditions into ones that might make sense [SPARK-6231].
// Consider this case: df.join(df, df("key") === df("key"))
// Since df("key") === df("key") is a trivially true condition, this actually becomes a
// cartesian join. However, most likely users expect to perform a self join using "key".
// With that assumption, this hack turns the trivially true condition into equality on join
// keys that are resolved to both sides.

// Trigger analysis so in the case of self-join, the analyzer will clone the plan.
// After the cloning, left and right side will have distinct expression ids.
val plan = Join(logicalPlan, right.logicalPlan, JoinType(joinType), Some(joinExprs.expr))
.queryExecution.analyzed.asInstanceOf[Join]

// If auto self join alias is disabled, return the plan.
if (!sqlContext.conf.dataFrameSelfJoinAutoResolveAmbiguity) {
return plan
}

// If left/right have no output set intersection, return the plan.
val lanalyzed = this.logicalPlan.queryExecution.analyzed
val ranalyzed = right.logicalPlan.queryExecution.analyzed
if (lanalyzed.outputSet.intersect(ranalyzed.outputSet).isEmpty) {
return plan
}

// Otherwise, find the trivially true predicates and automatically resolves them to both sides.
// By the time we get here, since we have already run analysis, all attributes should've been
// resolved and become AttributeReference.
val cond = plan.condition.map { _.transform {
case catalyst.expressions.EqualTo(a: AttributeReference, b: AttributeReference)
if a.sameRef(b) =>
catalyst.expressions.EqualTo(plan.left.resolve(a.name), plan.right.resolve(b.name))
}}
plan.copy(condition = cond)
}





 
cogroup的scala版,适用于

package com.zhouls.spark.cores

import org.apache.spark.{SparkConf, SparkContext}

/**
* Created by Administrator on 2016/9/27.
*/
object Transformations {
def main(args: Array[String]) {
val sc = sparkContext("Transformations Operations") //创建SparkContext
// mapTransformation(sc)//map案例
// filterTransformation(sc)//filter案例
// flatMapTransformation(sc)//flatMap案例
// groupByKeyTransformation(sc)//groupByKey案例
// reduceByKeyTransformation(sc)//reduceByKey案例
// joinTransformation(sc)//join案例
cogroupTransformation(sc)//cogroup案例
sc.stop() //停止sparkContext,释放相关的Driver对象,释放资源
}
def sparkContext(name:String)={
val conf = new SparkConf().setAppName("Transformations").setMaster("local")
val sc = new SparkContext(conf)
sc
}

def mapTransformation(sc:SparkContext){
val nums = sc.parallelize(1 to 10) //根据集合创建RDD
val mapped = nums.map(item => 2 * item) //map适用于任何类型的元素且对其作用的集合中的每一个元素循环遍历并调用其作为参数的函数对每一个遍历的元素进行具体化处理
mapped.collect.foreach(println)//收集计算结果并通过foreach循环打印
}
def filterTransformation(sc:SparkContext){
val nums = sc.parallelize(1 to 20) //根据集合创建RDD
val filtered = nums.filter(item => item%2 == 0)//根据filter中作为参数的函数Boolean来判断符合条件的元素,并基于这些元素构成新的MapPartitionsRDD。
filtered.collect.foreach(println)//收集计算结果并通过foreach循环打印
}
def flatMapTransformation(sc:SparkContext){
val bigData = Array("Scala Spark","Java Hadoop","Java Tachyon")//实例化字符串类型的Array
val bigDataString = sc.parallelize(bigData)//创建以字符串为元素类型的MapPartitionsRDD
val words = bigDataString.flatMap(line => line.split(" "))//首先是通过传入的作为参数的函数来作用于RDD的每个字符串进行单词切分(是以集合的方式存在的),然后把切分后的结果合并成一个大的集合,是{Scala Spark Java Hadoop Java Tachyon}
words.collect.foreach(println)//收集计算结果并通过foreach循环打印
}
def groupByKeyTransformation(sc:SparkContext){
val data = Array(Tuple2(100,"Spark"),Tuple2(100,"Tachyon"),Tuple2(70,"Hadoop"),Tuple2(80,"Kafka"),Tuple2(80,"HBase"))//准备数据
val dataRDD = sc.parallelize(data)//根据集合创建RDD
val grouped = dataRDD.groupByKey()//按照相同的key对value进行分组,分组后的value是一个集合
grouped.collect.foreach(println)//收集计算结果并通过foreach循环打印
}
def reduceByKeyTransformation(sc:SparkContext){
val lines = sc.textFile("D://SoftWare//spark-1.6.2-bin-hadoop2.6//README.md")
val words = lines.flatMap{ line => line.split(" ")}
val pairs = words.map { word => (word,1) }
val wordCountsOdered = pairs.reduceByKey(_+_)//对相同的key,进行value的累计(包括local和reducer级别同时reduce)
wordCountsOdered.collect.foreach(wordNumberPair => println(wordNumberPair._1 + ":" + wordNumberPair._2))//收集计算结果并通过foreach循环打印
}
def joinTransformation(sc:SparkContext){
val studentNames = Array(Tuple2(1,"Spark"),Tuple2(2,"Tachyon"),Tuple2(3,"Hadoop"))
val studentScores = Array(Tuple2(1,100),Tuple2(2,95),Tuple2(3,65))
val names = sc.parallelize(studentNames)
val scores = sc.parallelize(studentScores)
val studentNamesAndScores = names.join(scores)
studentNamesAndScores.collect.foreach(println)//收集计算结果并通过foreach循环打印
}
def cogroupTransformation(sc:SparkContext){
val namesLists = Array(Tuple2(1,"xiaoming"),Tuple2(2,"xiaozhou"),Tuple2(3,"xiaoliu"))
val scoresLists = Array(Tuple2(1,100),Tuple2(2,95),Tuple2(3,85),Tuple2(1,75),Tuple2(2,65),Tuple2(3,55))
val names = sc.parallelize(namesLists)
val scores = sc.parallelize(scoresLists)
val namesListsAndScores = names.cogroup(scores)
namesListsAndScores.collect.foreach(println)//收集计算结果并通过foreach循环打印
}
}

 

 
cogroup源码

/**
* For each key k in `this` or `other1` or `other2` or `other3`,
* return a resulting RDD that contains a tuple with the list of values
* for that key in `this`, `other1`, `other2` and `other3`.
*/
def cogroup[W1, W2, W3](other1: RDD[(K, W1)],
other2: RDD[(K, W2)],
other3: RDD[(K, W3)],
partitioner: Partitioner)
: RDD[(K, (Iterable[V], Iterable[W1], Iterable[W2], Iterable[W3]))] = self.withScope {
if (partitioner.isInstanceOf[HashPartitioner] && keyClass.isArray) {
throw new SparkException("Default partitioner cannot partition array keys.")
}
val cg = new CoGroupedRDD[K](Seq(self, other1, other2, other3), partitioner)
cg.mapValues { case Array(vs, w1s, w2s, w3s) =>
(vs.asInstanceOf[Iterable[V]],
w1s.asInstanceOf[Iterable[W1]],
w2s.asInstanceOf[Iterable[W2]],
w3s.asInstanceOf[Iterable[W3]])
}
}

/**
* For each key k in `this` or `other`, return a resulting RDD that contains a tuple with the
* list of values for that key in `this` as well as `other`.
*/
def cogroup[W](other: RDD[(K, W)], partitioner: Partitioner)
: RDD[(K, (Iterable[V], Iterable[W]))] = self.withScope {
if (partitioner.isInstanceOf[HashPartitioner] && keyClass.isArray) {
throw new SparkException("Default partitioner cannot partition array keys.")
}
val cg = new CoGroupedRDD[K](Seq(self, other), partitioner)
cg.mapValues { case Array(vs, w1s) =>
(vs.asInstanceOf[Iterable[V]], w1s.asInstanceOf[Iterable[W]])
}
}

/**
* For each key k in `this` or `other1` or `other2`, return a resulting RDD that contains a
* tuple with the list of values for that key in `this`, `other1` and `other2`.
*/
def cogroup[W1, W2](other1: RDD[(K, W1)], other2: RDD[(K, W2)], partitioner: Partitioner)
: RDD[(K, (Iterable[V], Iterable[W1], Iterable[W2]))] = self.withScope {
if (partitioner.isInstanceOf[HashPartitioner] && keyClass.isArray) {
throw new SparkException("Default partitioner cannot partition array keys.")
}
val cg = new CoGroupedRDD[K](Seq(self, other1, other2), partitioner)
cg.mapValues { case Array(vs, w1s, w2s) =>
(vs.asInstanceOf[Iterable[V]],
w1s.asInstanceOf[Iterable[W1]],
w2s.asInstanceOf[Iterable[W2]])
}
}

/**
* For each key k in `this` or `other1` or `other2` or `other3`,
* return a resulting RDD that contains a tuple with the list of values
* for that key in `this`, `other1`, `other2` and `other3`.
*/
def cogroup[W1, W2, W3](other1: RDD[(K, W1)], other2: RDD[(K, W2)], other3: RDD[(K, W3)])
: RDD[(K, (Iterable[V], Iterable[W1], Iterable[W2], Iterable[W3]))] = self.withScope {
cogroup(other1, other2, other3, defaultPartitioner(self, other1, other2, other3))
}

/**
* For each key k in `this` or `other`, return a resulting RDD that contains a tuple with the
* list of values for that key in `this` as well as `other`.
*/
def cogroup[W](other: RDD[(K, W)]): RDD[(K, (Iterable[V], Iterable[W]))] = self.withScope {
cogroup(other, defaultPartitioner(self, other))
}

/**
* For each key k in `this` or `other1` or `other2`, return a resulting RDD that contains a
* tuple with the list of values for that key in `this`, `other1` and `other2`.
*/
def cogroup[W1, W2](other1: RDD[(K, W1)], other2: RDD[(K, W2)])
: RDD[(K, (Iterable[V], Iterable[W1], Iterable[W2]))] = self.withScope {
cogroup(other1, other2, defaultPartitioner(self, other1, other2))
}

/**
* For each key k in `this` or `other`, return a resulting RDD that contains a tuple with the
* list of values for that key in `this` as well as `other`.
*/
def cogroup[W](
other: RDD[(K, W)],
numPartitions: Int): RDD[(K, (Iterable[V], Iterable[W]))] = self.withScope {
cogroup(other, new HashPartitioner(numPartitions))
}

/**
* For each key k in `this` or `other1` or `other2`, return a resulting RDD that contains a
* tuple with the list of values for that key in `this`, `other1` and `other2`.
*/
def cogroup[W1, W2](other1: RDD[(K, W1)], other2: RDD[(K, W2)], numPartitions: Int)
: RDD[(K, (Iterable[V], Iterable[W1], Iterable[W2]))] = self.withScope {
cogroup(other1, other2, new HashPartitioner(numPartitions))
}

/**
* For each key k in `this` or `other1` or `other2` or `other3`,
* return a resulting RDD that contains a tuple with the list of values
* for that key in `this`, `other1`, `other2` and `other3`.
*/
def cogroup[W1, W2, W3](other1: RDD[(K, W1)],
other2: RDD[(K, W2)],
other3: RDD[(K, W3)],
numPartitions: Int)
: RDD[(K, (Iterable[V], Iterable[W1], Iterable[W2], Iterable[W3]))] = self.withScope {
cogroup(other1, other2, other3, new HashPartitioner(numPartitions))
}




本文转自大数据躺过的坑博客园博客,原文链接:http://www.cnblogs.com/zlslch/p/5913846.html,如需转载请自行联系原作者

相关文章
|
4天前
|
人工智能 搜索推荐 API
自学记录鸿蒙API 13:实现人脸比对Core Vision Face Comparator
在完成文本识别和人脸检测项目后,我深入学习了HarmonyOS Next API 13中的Core Vision Face Comparator API,开发了一个简单的人脸比对工具。该API能进行高精度人脸比对并给出相似度评分,应用场景广泛,如照片分类、身份认证、个性化服务等。通过初始化服务、加载图片、实现比对功能和构建用户界面,最终实现了可靠的人脸比对功能。未来计划将此技术应用于更复杂的场景,如照片管理和个性化服务,并探索与其他AI能力的结合。如果你也对人脸比对感兴趣,不妨从简单的比对功能开始,逐步实现自己的创意!
95 61
|
5天前
|
人工智能 自然语言处理 文字识别
自学记录鸿蒙API 13:实现智能文本识别Core Vision Text Recognition
在完成语音助手项目后,我尝试了HarmonyOS Next API 13中的Core Vision Text Recognition API,体验其强大的文本识别功能。该API支持多语言高精度识别,能快速将图像中的文本提取为结构化信息,适用于文档扫描、票据管理和实时翻译等场景。通过权限配置、初始化服务、实现识别功能和构建用户界面,我完成了文本识别应用的开发,并探索了性能优化与功能扩展。鸿蒙生态的强大支持让开发者能更便捷地实现复杂功能。未来计划将此技术应用于实际项目,如票据管理或实时翻译工具。如果你也对文本识别感兴趣,不妨一起探索!
61 11
|
4天前
|
人工智能 监控 安全
自学记录鸿蒙 API 13:骨骼点检测应用Core Vision Skeleton Detection
骨骼点检测技术能够从图片中识别出人体的关键骨骼点位置,如头部、肩部、手肘等,广泛应用于运动健身指导、游戏交互、医疗辅助、安全监控等领域。我决定深入学习HarmonyOS Next API 13中的Skeleton Detection API,并开发一个简单的骨骼点检测应用。通过理解API核心功能、项目初始化与配置、实现检测功能、构建用户界面,以及性能优化和功能扩展,逐步实现这一技术的应用。未来计划将其应用于健身指导和智能监控领域,探索与其他AI能力的结合,开发更智能的解决方案。如果你也对骨骼点检测感兴趣,不妨一起进步!
129 9
|
5天前
|
人工智能 监控 安全
自学记录鸿蒙 API 13:实现人脸检测 Core Vision Face Detector
本文介绍了基于HarmonyOS Next API 13中的Core Vision Face Detector API实现人脸检测小应用的过程。通过研究发现,该API不仅支持人脸检测框的定位,还能识别关键点(如眼睛、鼻子和嘴角位置)及人脸姿态信息。文章详细记录了开发历程,包括项目初始化、权限配置、图像加载与人脸检测、用户界面设计,以及性能优化和功能扩展的思路。应用场景涵盖身份验证、照片管理和实时交互等。未来计划将技术应用于智能照片管理工具,提供更高效的照片分类体验。欢迎对人脸检测技术感兴趣的读者一起探讨和进步。
80 7
|
4月前
|
存储 API 数据库
如何使用 ef core 的 code first(fluent api)模式实现自定义类型转换器?
本文介绍了如何在 EF Core 的 Code First 模式下使用自定义类型转换器实现 JsonDocument 和 DateTime 类型到 SQLite 数据库的正确映射。通过自定义 ValueConverter,实现了数据类型的转换,并展示了完整的项目结构和代码实现,包括实体类定义、DbContext 配置、Repositories 仓储模式及数据库应用迁移(Migrations)操作。
79 6
如何使用 ef core 的 code first(fluent api)模式实现自定义类型转换器?
|
4月前
|
开发框架 监控 前端开发
在 ASP.NET Core Web API 中使用操作筛选器统一处理通用操作
【9月更文挑战第27天】操作筛选器是ASP.NET Core MVC和Web API中的一种过滤器,可在操作方法执行前后运行代码,适用于日志记录、性能监控和验证等场景。通过实现`IActionFilter`接口的`OnActionExecuting`和`OnActionExecuted`方法,可以统一处理日志、验证及异常。创建并注册自定义筛选器类,能提升代码的可维护性和复用性。
|
5月前
|
UED 存储 自然语言处理
【语言无界·体验无疆】解锁Vaadin应用全球化秘籍:从代码到文化,让你的应用畅游世界每一个角落!
【8月更文挑战第31天】《国际化与本地化实战:构建多语言支持的Vaadin应用》详细介绍了如何使用Vaadin框架实现应用的国际化和本地化,提升用户体验和市场竞争力。文章涵盖资源文件的创建与管理、消息绑定与动态加载、日期和数字格式化及文化敏感性处理等方面,通过具体示例代码和最佳实践,帮助开发者构建适应不同语言和地区设置的Vaadin应用。通过这些步骤,您的应用将更加灵活,满足全球用户需求。
68 0
|
5月前
|
API 数据库 UED
全面解析构建高性能API的秘诀:运用Entity Framework Core与异步编程提升Web应用响应速度及并发处理能力的详细指南与实践案例
【8月更文挑战第31天】本文详细介绍了如何利用 Entity Framework Core (EF Core)的异步编程特性构建高性能 API。通过创建基于 EF Core 的 .NET Core Web API 项目,配置数据库上下文,并定义领域模型,文章展示了如何使用异步方法进行数据查询、加载相关实体及事务处理。具体代码示例涵盖了 GET、POST、PUT 和 DELETE 操作,全面展示了 EF Core 异步编程的优势,有助于提升 API 的响应速度和处理能力。
62 0
|
5月前
|
开发框架 .NET API
如何在 ASP.NET Core Web Api 项目中应用 NLog 写日志?
如何在 ASP.NET Core Web Api 项目中应用 NLog 写日志?
235 0
|
5月前
|
开发框架 中间件 .NET
分享 ASP.NET Core Web Api 中间件获取 Request Body 两个方法
分享 ASP.NET Core Web Api 中间件获取 Request Body 两个方法
205 0