Spark/Scala - 读取 RcFile && OrcFile

简介: ​上文提到了MapReduce - 读取 OrcFile, RcFile 文件,这里通过 Java + MapReduce 实现了读取 RcFile 和 OrcFile 文件,后续又遇到MapReduce - 同时读取 RcFile 和 OrcFile 的依赖冲突,也顺利解决,但是平常开发还是习惯 spark 所以改用 spark 实现读取 OrcFile 和 RcFile 以及 Map-Reduce 的功能。......

一.引言

上文提到了 MapReduce - 读取 OrcFile, RcFile 文件,这里通过 Java + MapReduce 实现了读取 RcFile 和 OrcFile 文件,后续又遇到 MapReduce - 同时读取 RcFile 和 OrcFile 的依赖冲突,也顺利解决,但是平常开发还是习惯 spark 所以改用 spark 实现读取 OrcFile 和 RcFile 以及 Map-Reduce 的功能。

二.读取 RcFile

image.gif编辑

前面 mr 的任务我们已经对 RcFile 的形式有了了解,其 key 的形式不关键,LongWritable 是行号, NullWritable 就是忽略行号,主要是 value 的形式: BytesRefArrayWritable,所以可以使用 spark 的 hadoopFile API 实现 RcFile 的读取,先看下 hadoopFile 的参数:

def hadoopFile[K, V](
      path: String,
      inputFormatClass: Class[_ <: InputFormat[K, V]],
      keyClass: Class[K],
      valueClass: Class[V],
      minPartitions: Int = defaultMinPartitions): RDD[(K, V)]

image.gif

keyClass 和 ValueClass 上面已经确定,inputFormat 也好确定,根据 MR - MultipleInputs.addInputPath 可以得知其 inputFormatClass 为 RCFileInputFormat,下面开始读取:

val conf = (new SparkConf).setAppName("TestReadRcFile").setMaster("local[1]")
    val spark = SparkSession
      .builder
      .config(conf)
      .getOrCreate()
    val sc = spark.sparkContext
    val rcFileInput = “”
    val minPartitions = 100
    println(repeatString("=", 30) + "开始读取 RcFile" + repeatString("=", 30))
    val rcFileRdd = sc.hadoopFile(rcFileInput, classOf[org.apache.hadoop.hive.ql.io.RCFileInputFormat[LongWritable, BytesRefArrayWritable]], classOf[LongWritable], classOf[BytesRefArrayWritable], minPartitions)
      .map(line => {
        val key = LazyBinaryRCFileUtils.readString(line._2.get(0))
        val value = LazyBinaryRCFileUtils.readString(line._2.get(1))
        (key, value)
      })
    println(repeatString("=", 30) + "结束读取 RcFile" + repeatString("=", 30))

image.gif

image.gif编辑

Tips:

readString 函数:

public static String readString(BytesRefWritable v) throws IOException {
    Text txt = new Text();
    txt.set(v.getData(), v.getStart(), v.getLength());
    return txt.toString();
  }

image.gif

三.读取 OrcFile

相比于读取 RcFile 需要用 hadoopFile,因为 SparkSession 提供了直接读取 orcFile 的 API,使得 spark 读取 OrcFile 相当的丝滑,注意这里 orc 读取后返回的是 DataSet,需要通过 .rdd 转化为 Spark 的常规 RDD。

val conf = (new SparkConf).setAppName("TestReadRcFile").setMaster("local[1]")
    val spark = SparkSession
      .builder
      .config(conf)
      .getOrCreate()
    println(repeatString("=", 30) + "开始读取 OrcFile" + repeatString("=", 30))
    import spark.implicits._
    val orcInput = “”
    val orcFileRdd = spark.read.orc(orcInput).map(row => {
       val key = row.getString(0)
       val value = row.getString(1)
      (key, value)
    }).rdd
    println(repeatString("=", 30) + "结束读取 OrcFile" + repeatString("=", 30))

image.gif

image.gif编辑

四.spark 实现 Map-Reduce

上述 rcFileRdd 和 orcFileRdd 两个 PairRdd 可以看作是两个 Mapper,下面执行 reduce 操作,通过 union 实现各个 pairRdd 的合并,随后执行 groupByKey 对目标 key 进行 value 聚合,随后执行 reduce 的操作即可:

rcFileRdd.union(orcFileRdd).groupByKey().map(info => {
      val key: String = info._1
      val values: Iterable[String] = info._2
        ... reduce func ...
    })

image.gif

五.总结

相比于 MR 读取 RcFile 与 OrcFile,spark 的 API 还是相对简洁一些,有需要的小伙伴可以尝试,肥肠的奈斯~

目录
相关文章
|
3天前
|
分布式计算 资源调度 Java
Scala+Spark+Hadoop+IDEA实现WordCount单词计数,上传并执行任务(简单实例-下)
Scala+Spark+Hadoop+IDEA实现WordCount单词计数,上传并执行任务(简单实例-下)
11 0
|
3天前
|
分布式计算 Hadoop Scala
Scala +Spark+Hadoop+Zookeeper+IDEA实现WordCount单词计数(简单实例-上)
Scala +Spark+Hadoop+Zookeeper+IDEA实现WordCount单词计数(简单实例-上)
7 0
|
1月前
|
SQL 存储 分布式计算
在scala中使用spark
在scala中使用spark
25 0
|
1月前
|
分布式计算 Java Scala
spark 与 scala 的对应版本查看、在idea中maven版本不要选择17,弄了好久,换成11就可以啦
spark 与 scala 的对应版本查看、.在idea中maven版本不要选择17,弄了好久,换成11就可以啦
150 2
|
1月前
|
分布式计算 Java Scala
Spark编程语言选择:Scala、Java和Python
Spark编程语言选择:Scala、Java和Python
Spark编程语言选择:Scala、Java和Python
|
1月前
|
分布式计算 数据处理 Scala
Spark 集群和 Scala 编程语言的关系
Spark 集群和 Scala 编程语言的关系
|
8天前
|
存储 分布式计算 Hadoop
Spark和Hadoop都是大数据处理领域的重要工具
【6月更文挑战第17天】Spark和Hadoop都是大数据处理领域的重要工具
35 7