前言
在实际开发过程中,我们会经常碰到求TopN这样常见的需求,那在Spark中,是如何实现求TopN呢?带着这个问题,就来看一下TopN的实现方式都有哪些!
方式1:采用groupByKey
思路:
- 按照key对数据进行聚合(groupByKey)
- 对同组的key的所有value先转换为List,然后进行排序,最后取TopN
代码实现:
// 构造上下文 val conf = new SparkConf().setMaster("local").setAppName("topn") val sc = SparkContext.getOrCreate(conf) // 创建rdd val path = "datas/groupsort.txt" val rdd = sc.textFile(path) // rdd操作得出结果 val rdd2 = rdd.map(_.split(" ")) val result = rdd2.map(arr => (arr(0).trim, arr(1).trim.toInt)) .groupByKey() .map { case (key, values) => { // 对values中的数据进行排序,然后获取最大的前三个数据 val sortedValues = values.toList.sorted val top3Values = sortedValues.takeRight(3).reverse (key, top3Values) } } // 打印输出 result.collect().foreach(println) sc.stop()
方式2:采用两阶段聚合优化
思路:
- 第一阶段给每个key加上一个随机值前缀,然后进行局部的聚合操作
- 第二阶段去除每个key的前缀,然后进行全局的聚合操作
代码实现:
// 构造上下文 val conf = new SparkConf().setMaster("local").setAppName("topn") val sc = SparkContext.getOrCreate(conf) // 创建rdd val path = "datas/groupsort.txt" val rdd = sc.textFile(path) // rdd操作得出结果 val rdd2 = rdd.map(_.split(" ")) val result = rdd2.mapPartitions(iter => { val random = Random iter.map(arr => { val key = arr(0).trim val value = arr(1).trim.toInt ((random.nextInt(5),key),value) }) }).groupByKey() .flatMap{ case ((_,key),values) => { val sortedValues = values.toList.sorted val top3Values = sortedValues.takeRight(3).reverse top3Values.map(count => (key,count)) } } .groupByKey() .flatMap{ case (key, values) => { val sortedValues = values.toList.sorted val top3Values = sortedValues.takeRight(3).reverse top3Values.map((key,_)) } } // 打印输出 result.collect().foreach(println) sc.stop()
方式3:先获取每个分区的TopN,后获取全局TopN
思路:
- 对于每一个key获取每个分区中的TopN
- 做全局的数据聚合操作,获取TopN
代码实现:
// 构造上下文 val conf = new SparkConf().setMaster("local").setAppName("topn") val sc = SparkContext.getOrCreate(conf) // 创建rdd val path = "datas/groupsort.txt" val rdd = sc.textFile(path) // rdd操作得出结果 val rdd2 = rdd.map(_.split(" ")) val result3 = rdd2.map(arr => { val key = arr(0).trim val count = arr(1).trim.toInt (key, count) }) .mapPartitions(iter => { import scala.collection.mutable val temp = iter.foldLeft(mutable.Map[String, ArrayBuffer[Int]]())((a, b) => { val key = b._1 val count = b._2 val buf = a.getOrElseUpdate(key, new mutable.ArrayBuffer[Int]()) buf += count if (buf.size > 3) { val max3Vals = buf.sorted.takeRight(3) a(key) = max3Vals } a }) val top3IterPrePartition = temp.toList.flatMap { case (key, countIters) => countIters.map(count => (key, count)) } top3IterPrePartition.toIterator }) .groupByKey() .flatMap { case (key, values) => { val sorted = values.toList.sorted val top3 = sorted.takeRight(3).reverse top3.map((key, _)) } } result3.foreachPartition(iter => iter.foreach(println)) sc.stop()
方式4:采用aggregateByKey
思路:
- 初始值为mutable.ArrayBufferInt
- 对每组key中的每个value和之前的聚合值进行聚合操作,就是在分区中,来一个value和上次取出的TopN进行一次排序,取出新的TopN
- 对每个分区操作后的局部聚合结果进行合并聚合操作,就是在分区间,来一个分区和上次取出的TopN进行一次合并排序,取出新的TopN
代码实现:
// 构造上下文 val conf = new SparkConf().setMaster("local").setAppName("topn") val sc = SparkContext.getOrCreate(conf) // 创建rdd val path = "datas/groupsort.txt" val rdd = sc.textFile(path) // rdd操作得出结果 val rdd2 = rdd.map(_.split(" ")) import scala.collection.mutable val result4 = rdd2.map(arr => { val key = arr(0).trim val count = arr(1).trim.toInt (key, count) }).aggregateByKey(mutable.ArrayBuffer[Int]())( (u, v) => { u += v u.sorted.takeRight(3).reverse }, (u1, u2) => { u1 ++= u2 u1.sorted.takeRight(3).reverse } ).flatMap { case (key, values) => { values.toList.map((key, _)) } } result4.foreachPartition(iter => iter.foreach(println)) sc.stop()
优缺点
方式1的缺点:
- groupByKey会将相同key的所有value全部加载到内存进行处理,当value特别多的时候可能出现OOM异常
- groupByKey会将所有的value数据均发送给下一个RDD,性能比较低,因为在实际聚合操作中只需要部分数据
方式2的优缺点:
- 对于聚合类Shuffle操作(groupByKey,reduceByKey等)产生的问题能够很好的解决
- 对于非聚合类(join等)产生的问题很难使用该方法解决
方式3、方式4:
- 解决了方式1实现方式的两个缺点
- 都采用了先分区内预聚合,然后进行全局聚合的思想
结语
好了,今天就为大家分享到这里了。咱们下期见!
如果本文对你有帮助的话,欢迎点赞&收藏&分享,这对我继续分享&创作优质文章非常重要。感谢🙏🏻