聊一聊Spark实现TopN的几种方式

简介: Spark实现TopN

前言

在实际开发过程中,我们会经常碰到求TopN这样常见的需求,那在Spark中,是如何实现求TopN呢?带着这个问题,就来看一下TopN的实现方式都有哪些!

方式1:采用groupByKey

思路:

  1. 按照key对数据进行聚合(groupByKey)
  2. 对同组的key的所有value先转换为List,然后进行排序,最后取TopN

代码实现:

// 构造上下文
val conf = new SparkConf().setMaster("local").setAppName("topn")
val sc = SparkContext.getOrCreate(conf)
// 创建rdd
val path = "datas/groupsort.txt"
val rdd = sc.textFile(path)
// rdd操作得出结果
val rdd2 = rdd.map(_.split(" "))
val result = rdd2.map(arr => (arr(0).trim, arr(1).trim.toInt))
    .groupByKey()
    .map {
        case (key, values) => {
            // 对values中的数据进行排序,然后获取最大的前三个数据
            val sortedValues = values.toList.sorted
            val top3Values = sortedValues.takeRight(3).reverse
            (key, top3Values)
        }
    }
// 打印输出
result.collect().foreach(println)
sc.stop()

方式2:采用两阶段聚合优化

思路:

  1. 第一阶段给每个key加上一个随机值前缀,然后进行局部的聚合操作
  2. 第二阶段去除每个key的前缀,然后进行全局的聚合操作

代码实现:

// 构造上下文
val conf = new SparkConf().setMaster("local").setAppName("topn")
val sc = SparkContext.getOrCreate(conf)
// 创建rdd
val path = "datas/groupsort.txt"
val rdd = sc.textFile(path)
// rdd操作得出结果
val rdd2 = rdd.map(_.split(" "))
val result = rdd2.mapPartitions(iter => {
      val random = Random
      iter.map(arr => {
        val key = arr(0).trim
        val value = arr(1).trim.toInt
        ((random.nextInt(5),key),value)
      })
    }).groupByKey()
      .flatMap{
          case ((_,key),values) => {
            val sortedValues = values.toList.sorted
            val top3Values = sortedValues.takeRight(3).reverse
            top3Values.map(count => (key,count))
          }
        }
        .groupByKey()
        .flatMap{
          case (key, values) => {
            val sortedValues = values.toList.sorted
            val top3Values = sortedValues.takeRight(3).reverse
            top3Values.map((key,_))
          }
        }
// 打印输出
result.collect().foreach(println)
sc.stop()

方式3:先获取每个分区的TopN,后获取全局TopN

思路:

  1. 对于每一个key获取每个分区中的TopN
  2. 做全局的数据聚合操作,获取TopN

代码实现:

// 构造上下文
val conf = new SparkConf().setMaster("local").setAppName("topn")
val sc = SparkContext.getOrCreate(conf)
// 创建rdd
val path = "datas/groupsort.txt"
val rdd = sc.textFile(path)
// rdd操作得出结果
val rdd2 = rdd.map(_.split(" "))
val result3 = rdd2.map(arr => {
    val key = arr(0).trim
    val count = arr(1).trim.toInt
    (key, count)
})
.mapPartitions(iter => {
    import scala.collection.mutable
    val temp = iter.foldLeft(mutable.Map[String, ArrayBuffer[Int]]())((a, b) => {
        val key = b._1
        val count = b._2
        val buf = a.getOrElseUpdate(key, new mutable.ArrayBuffer[Int]())
        buf += count
        if (buf.size > 3) {
            val max3Vals = buf.sorted.takeRight(3)
            a(key) = max3Vals
        }
        a
    })
    val top3IterPrePartition = temp.toList.flatMap {
        case (key, countIters) => countIters.map(count => (key, count))
    }
    top3IterPrePartition.toIterator
})
.groupByKey()
.flatMap {
    case (key, values) => {
        val sorted = values.toList.sorted
        val top3 = sorted.takeRight(3).reverse
        top3.map((key, _))
    }
}
result3.foreachPartition(iter => iter.foreach(println))
sc.stop()

方式4:采用aggregateByKey

思路:

  1. 初始值为mutable.ArrayBufferInt
  2. 对每组key中的每个value和之前的聚合值进行聚合操作,就是在分区中,来一个value和上次取出的TopN进行一次排序,取出新的TopN
  3. 对每个分区操作后的局部聚合结果进行合并聚合操作,就是在分区间,来一个分区和上次取出的TopN进行一次合并排序,取出新的TopN

代码实现:

// 构造上下文
val conf = new SparkConf().setMaster("local").setAppName("topn")
val sc = SparkContext.getOrCreate(conf)
// 创建rdd
val path = "datas/groupsort.txt"
val rdd = sc.textFile(path)
// rdd操作得出结果
val rdd2 = rdd.map(_.split(" "))
import scala.collection.mutable
val result4 = rdd2.map(arr => {
    val key = arr(0).trim
    val count = arr(1).trim.toInt
    (key, count)
}).aggregateByKey(mutable.ArrayBuffer[Int]())(
    (u, v) => {
        u += v
        u.sorted.takeRight(3).reverse
    },
    (u1, u2) => {
        u1 ++= u2
        u1.sorted.takeRight(3).reverse
    }
).flatMap {
    case (key, values) => {
        values.toList.map((key, _))
    }
}
result4.foreachPartition(iter => iter.foreach(println))
sc.stop()

优缺点

方式1的缺点:

  1. groupByKey会将相同key的所有value全部加载到内存进行处理,当value特别多的时候可能出现OOM异常
  2. groupByKey会将所有的value数据均发送给下一个RDD,性能比较低,因为在实际聚合操作中只需要部分数据

方式2的优缺点:

  1. 对于聚合类Shuffle操作(groupByKey,reduceByKey等)产生的问题能够很好的解决
  2. 对于非聚合类(join等)产生的问题很难使用该方法解决

方式3、方式4:

  1. 解决了方式1实现方式的两个缺点
  2. 都采用了先分区内预聚合,然后进行全局聚合的思想

结语

好了,今天就为大家分享到这里了。咱们下期见!

如果本文对你有帮助的话,欢迎点赞&收藏&分享,这对我继续分享&创作优质文章非常重要。感谢🙏🏻

相关文章
|
分布式计算 并行计算 Ubuntu
|
分布式计算 Spark
【Spark 应用】实现分组取topN
【Spark 应用】实现分组取topN
199 0
|
3月前
|
机器学习/深度学习 分布式计算 算法
Spark快速大数据分析PDF下载读书分享推荐
《Spark快速大数据分析》适合初学者,聚焦Spark实用技巧,同时深入核心概念。作者团队来自Databricks,书中详述Spark 3.0新特性,结合机器学习展示大数据分析。Spark是大数据分析的首选工具,本书助你驾驭这一利器。[PDF下载链接][1]。 ![Spark Book Cover][2] [1]: https://zhangfeidezhu.com/?p=345 [2]: https://i-blog.csdnimg.cn/direct/6b851489ad1944548602766ea9d62136.png#pic_center
129 1
Spark快速大数据分析PDF下载读书分享推荐
|
2月前
|
分布式计算 资源调度 大数据
【决战大数据之巅】:Spark Standalone VS YARN —— 揭秘两大部署模式的恩怨情仇与终极对决!
【8月更文挑战第7天】随着大数据需求的增长,Apache Spark 成为关键框架。本文对比了常见的 Spark Standalone 与 YARN 部署模式。Standalone 作为自带的轻量级集群管理服务,易于设置,适用于小规模或独立部署;而 YARN 作为 Hadoop 的资源管理系统,支持资源的统一管理和调度,更适合大规模生产环境及多框架集成。我们将通过示例代码展示如何在这两种模式下运行 Spark 应用程序。
168 3
|
1月前
|
机器学习/深度学习 分布式计算 大数据
Spark 适合解决多种类型的大数据处理问题
【9月更文挑战第1天】Spark 适合解决多种类型的大数据处理问题
39 3
|
2月前
|
分布式计算 大数据 Apache
跨越界限:当.NET遇上Apache Spark,大数据世界的新篇章如何谱写?
【8月更文挑战第28天】随着信息时代的发展,大数据已成为推动企业决策、科研与技术创新的关键力量。Apache Spark凭借其卓越的分布式计算能力和多功能数据处理特性,在大数据领域占据重要地位。然而,对于.NET开发者而言,如何在Spark生态中发挥自身优势成为一个新课题。为此,微软与Apache Spark社区共同推出了.NET for Apache Spark,使开发者能用C#、F#等语言编写Spark应用,不仅保留了Spark的强大功能,还融合了.NET的强类型系统、丰富库支持及良好跨平台能力,极大地降低了学习门槛并拓展了.NET的应用范围。
52 3
|
2月前
|
分布式计算 大数据 数据处理
Apache Spark的应用与优势:解锁大数据处理的无限潜能
【8月更文挑战第23天】Apache Spark以其卓越的性能、易用性、通用性、弹性与可扩展性以及丰富的生态系统,在大数据处理领域展现出了强大的竞争力和广泛的应用前景。随着大数据技术的不断发展和普及,Spark必将成为企业实现数字化转型和业务创新的重要工具。未来,我们有理由相信,Spark将继续引领大数据处理技术的发展潮流,为企业创造更大的价值。
|
4月前
|
存储 分布式计算 Hadoop
Spark和Hadoop都是大数据处理领域的重要工具
【6月更文挑战第17天】Spark和Hadoop都是大数据处理领域的重要工具
164 59
|
2月前
|
Java Spring API
Spring框架与GraphQL的史诗级碰撞:颠覆传统,重塑API开发的未来传奇!
【8月更文挑战第31天】《Spring框架与GraphQL:构建现代API》介绍了如何结合Spring框架与GraphQL构建高效、灵活的API。首先通过引入`spring-boot-starter-data-graphql`等依赖支持GraphQL,然后定义查询和类型,利用`@GraphQLQuery`等注解实现具体功能。Spring的依赖注入和事务管理进一步增强了GraphQL服务的能力。示例展示了从查询到突变的具体实现,证明了Spring与GraphQL结合的强大潜力,适合现代API设计与开发。
59 0
|
2月前
|
分布式计算 Hadoop 大数据
Spark 与 Hadoop 的大数据之战:一场惊心动魄的技术较量,决定数据处理的霸权归属!
【8月更文挑战第7天】无论是 Spark 的高效内存计算,还是 Hadoop 的大规模数据存储和处理能力,它们都为大数据的发展做出了重要贡献。
70 2
下一篇
无影云桌面