Spark多路径输出和二次排序

简介: 打开微信扫一扫,关注微信公众号【数据与算法联盟】 转载请注明出处:http://blog.csdn.net/gamer_gyt 博主微博:http://weibo.com/234654758 Github:https://github.com/thinkgamer 在实际应用场景中,我们对于Spark往往有各式各样的需求,比如说想MR中的二次排序,Top N,多路劲输出等。
+关注继续查看


公众号二维码
打开微信扫一扫,关注微信公众号【数据与算法联盟】

转载请注明出处:http://blog.csdn.net/gamer_gyt
博主微博:http://weibo.com/234654758
Github:https://github.com/thinkgamer

在实际应用场景中,我们对于Spark往往有各式各样的需求,比如说想MR中的二次排序,Top N,多路劲输出等。那么这篇文章我们就来看下这几个问题。

二次排序

假设我们的数据是这样的:

1   2
1   3
1   1
1   6
1   4
2   5
2   8
2   3

我们想要实现第一列按降序排列,当第一列相同时,第二列按降序排列

定义一个SecondSortKey类:

class SecondSortKey(val first: Int, val second: Int)
  extends Ordered[SecondSortKey] with Serializable {
  override def compare(that: SecondSortKey): Int = {
    if (this.first - that.first == 0) {
      this.second - that.second
    } else {
      this.first - that.first
    }
  }
}

然后这样去使用

val lines = sc.textFile("test.txt")
val pairs = lines.map { x =>
      (new SecondSortKey(x.split("\\s+")(0).toInt,
        x.split("\\s+")(1).toInt), x)
    }
val sortedPairs = pairs.sortByKey(false);
sortedPairs.map(_._2).foreach(println)

当然这里如果想按第一列升序,当第一列相同时,第二列升序的顺序排列,只需要对SecondSoryKey做如下修改即可

class SecondSortKey(val first: Int, val second: Int)
  extends Ordered[SecondSortKey] with Serializable {
  override def compare(that: SecondSortKey): Int = {
    if (this.first - that.first !== 0) {
      this.second - that.second
    } else {
      this.first - that.first
    }
  }
}

当时使用的使用去掉

pairs.sortByKey(false)

中的false

Top N

同样还是上边的数据,假设我们要得到第一列中的前五位

val lines = sc.textFile("test.txt")
val rdd = lines
        .map(x => x.split("\\s+"))
        .map(x => (x(0),x(1)))
        .sortByKey()
rdd.take(N).foreach(println)

多路径输出

自己在使用的过程中,通过搜索发现了两种方法
1:调用saveAsHadoopFile函数并自定义一个OutputFormat类

自定义RDDMultipleTextOutputFormat类

RDDMultipleTextOutputFormat类中的generateFileNameForKeyValue函数有三个参数,key和value就是我们RDD的Key和Value,而name参数是每个Reduce的编号。本例中没有使用该参数,而是直接将同一个Key的数据输出到同一个文件中。

import org.apache.hadoop.mapred.lib.MultipleTextOutputFormat  

class RDDMultipleTextOutputFormat extends MultipleTextOutputFormat[Any, Any] {  
  override def generateFileNameForKeyValue(key: Any, value: Any, name: String): String =  
    key.asInstanceOf[String]  
}  

调用

sc.parallelize(List(("w", "www"), ("b", "blog"), ("c", "com"), ("w", "bt")))  
      .map(value => (value._1, value._2 + "Test"))  
      .partitionBy(new HashPartitioner(3))  
      .saveAsHadoopFile("/iteblog", classOf[String],classOf[String],classOf[RDDMultipleTextOutputFormat])  

这里的

new HashPartitioner(3)

中的3是有key的种类决定的,当然在实际应用场景中,我们可能并不知道有多少k,这个时候就可以通过一个rdd 的 distinct操作来得到唯一key的数目。

2:使用dataframe

people_rdd = sc.parallelize([(1, "alice"), (1, "bob"), (2,"charlie")])
people_df = people_rdd.toDF(["number", "name"])
people_df.write.partitionBy("number").format("text").save(path  )

当然这两种方法都有一个缺陷,就是当数据量特别大的时候,数据在repartition的过程中特别耗费资源,也会容易出现任务failed的情况,小编采用的解决办法是,适当的对原rdd进行split,然后遍历每个rdd,进行multioutput操作

形似如下:

val rdd = sc.textFile(input)
var split_rdd = rdd.randomSplit(Array(1.0,1.0,1.0,1.0))
for (one <- Array(1,2,3,4))
{
    split_rdd(one)XXXX
}

参考:

相关文章
|
1月前
|
分布式计算 Hadoop 大数据
大数据技术解析:Hadoop、Spark、Flink和数据湖的对比
Hadoop、Spark、Flink 和数据湖都在大数据处理领域有着重要的地位,但它们各自的优势和劣势也需考虑实际应用场景。Hadoop 适用于批处理任务,Spark 更适合实时分析,而 Flink 则强调低延迟的流式处理。数据湖则是存储和管理大规模多样性数据的选择。
90 1
大数据技术解析:Hadoop、Spark、Flink和数据湖的对比
|
1月前
|
分布式计算 算法 大数据
大数据Spark企业级实战与Hadoop实战&PDF和PPT
今天给大家分享的是《大数据Spark企业级实战》与《Hadoop实战》《大数据处理系统·Hadoop源代码情景分析》《50个大厂大数据算法教程》等销量排行前10名的大数据技术书籍(文末领取PDF版)。这些书籍具有以下几个优点:易读、实践性强,对解决工作中遇到的业务问题具有一定启发性。
|
2月前
|
机器学习/深度学习 存储 分布式计算
Hadoop生态系统中的机器学习与数据挖掘技术:Apache Mahout和Apache Spark MLlib的应用
Hadoop生态系统中的机器学习与数据挖掘技术:Apache Mahout和Apache Spark MLlib的应用
|
2月前
|
分布式计算 Hadoop Java
Hadoop生态系统中的流式数据处理技术:Apache Flink和Apache Spark的比较
Hadoop生态系统中的流式数据处理技术:Apache Flink和Apache Spark的比较
|
4月前
|
SQL 分布式计算 运维
Hadoop/Spark 太重
Hadoop/Spark 太重
|
5月前
|
存储 SQL 分布式计算
Hadoop和Spark的异同
Hadoop实质上是解决大数据大到无法在一台计算机上进行存储、无法在要求的时间内进行处理的问题,是一个分布式数据基础设施。 HDFS,它将巨大的数据集分派到一个由普通计算机组成的集群中的多个节点进行存储,通过将块保存到多个副本上,提供高可靠的文件存储。 MapReduce,通过简单的Mapper和Reducer的抽象提供一个编程模型,可以在一个由几十台上百台的机器上并发地分布式处理大量数据集,而把并发、分布式和故障恢复等细节隐藏。
106 0
|
8月前
|
分布式计算 Hadoop Java
spark编译:构建基于hadoop的spark安装包及遇到问题总结
spark编译:构建基于hadoop的spark安装包及遇到问题总结
221 0
spark编译:构建基于hadoop的spark安装包及遇到问题总结
|
8月前
|
SQL 消息中间件 分布式计算
如何查看spark与hadoop、kafka、Scala、flume、hive等兼容版本【适用于任何版本】
如何查看spark与hadoop、kafka、Scala、flume、hive等兼容版本【适用于任何版本】
443 0
如何查看spark与hadoop、kafka、Scala、flume、hive等兼容版本【适用于任何版本】
|
8月前
|
分布式计算 Hadoop Scala
spark中 map和reduce理解及与hadoop的map、reduce区别
spark中 map和reduce理解及与hadoop的map、reduce区别
219 0
|
9月前
|
SQL 存储 分布式计算
HADOOP MapReduce 处理 Spark 抽取的 Hive 数据【解决方案一】
开端: 今天咱先说问题,经过几天测试题的练习,我们有从某题库中找到了新题型,并且成功把我们干趴下,昨天今天就干了一件事,站起来。 沙问题? java mapeduce 清洗 hive 中的数据 ,清晰之后将driver代码 进行截图提交。
239 0
HADOOP MapReduce 处理 Spark 抽取的 Hive 数据【解决方案一】
推荐文章
更多