一.引言
上文提到了 MapReduce - 读取 OrcFile, RcFile 文件,这里通过 Java + MapReduce 实现了读取 RcFile 和 OrcFile 文件,后续又遇到 MapReduce - 同时读取 RcFile 和 OrcFile 的依赖冲突,也顺利解决,但是平常开发还是习惯 spark 所以改用 spark 实现读取 OrcFile 和 RcFile 以及 Map-Reduce 的功能。
二.读取 RcFile
编辑
前面 mr 的任务我们已经对 RcFile 的形式有了了解,其 key 的形式不关键,LongWritable 是行号, NullWritable 就是忽略行号,主要是 value 的形式: BytesRefArrayWritable,所以可以使用 spark 的 hadoopFile API 实现 RcFile 的读取,先看下 hadoopFile 的参数:
def hadoopFile[K, V]( path: String, inputFormatClass: Class[_ <: InputFormat[K, V]], keyClass: Class[K], valueClass: Class[V], minPartitions: Int = defaultMinPartitions): RDD[(K, V)]
keyClass 和 ValueClass 上面已经确定,inputFormat 也好确定,根据 MR - MultipleInputs.addInputPath 可以得知其 inputFormatClass 为 RCFileInputFormat,下面开始读取:
val conf = (new SparkConf).setAppName("TestReadRcFile").setMaster("local[1]") val spark = SparkSession .builder .config(conf) .getOrCreate() val sc = spark.sparkContext val rcFileInput = “” val minPartitions = 100 println(repeatString("=", 30) + "开始读取 RcFile" + repeatString("=", 30)) val rcFileRdd = sc.hadoopFile(rcFileInput, classOf[org.apache.hadoop.hive.ql.io.RCFileInputFormat[LongWritable, BytesRefArrayWritable]], classOf[LongWritable], classOf[BytesRefArrayWritable], minPartitions) .map(line => { val key = LazyBinaryRCFileUtils.readString(line._2.get(0)) val value = LazyBinaryRCFileUtils.readString(line._2.get(1)) (key, value) }) println(repeatString("=", 30) + "结束读取 RcFile" + repeatString("=", 30))
编辑
Tips:
readString 函数:
public static String readString(BytesRefWritable v) throws IOException { Text txt = new Text(); txt.set(v.getData(), v.getStart(), v.getLength()); return txt.toString(); }
三.读取 OrcFile
相比于读取 RcFile 需要用 hadoopFile,因为 SparkSession 提供了直接读取 orcFile 的 API,使得 spark 读取 OrcFile 相当的丝滑,注意这里 orc 读取后返回的是 DataSet,需要通过 .rdd 转化为 Spark 的常规 RDD。
val conf = (new SparkConf).setAppName("TestReadRcFile").setMaster("local[1]") val spark = SparkSession .builder .config(conf) .getOrCreate() println(repeatString("=", 30) + "开始读取 OrcFile" + repeatString("=", 30)) import spark.implicits._ val orcInput = “” val orcFileRdd = spark.read.orc(orcInput).map(row => { val key = row.getString(0) val value = row.getString(1) (key, value) }).rdd println(repeatString("=", 30) + "结束读取 OrcFile" + repeatString("=", 30))
编辑
四.spark 实现 Map-Reduce
上述 rcFileRdd 和 orcFileRdd 两个 PairRdd 可以看作是两个 Mapper,下面执行 reduce 操作,通过 union 实现各个 pairRdd 的合并,随后执行 groupByKey 对目标 key 进行 value 聚合,随后执行 reduce 的操作即可:
rcFileRdd.union(orcFileRdd).groupByKey().map(info => { val key: String = info._1 val values: Iterable[String] = info._2 ... reduce func ... })
五.总结
相比于 MR 读取 RcFile 与 OrcFile,spark 的 API 还是相对简洁一些,有需要的小伙伴可以尝试,肥肠的奈斯~