MapReduce - 读取 ORC, RcFile 文件

简介: 一.引言MR 任务处理相关 hive 表数据时格式为 orc 和 rcFile,下面记录两种处理方法。二.偷懒版读取 ORC,RcFile 文件最初不太熟悉 mr,只会 textFormat 一种输入模式,于是遇到 orc 和 rcFile 形式的 hive 数据需要在 mr 读取时,都是先通过 INSERTOVERWRITEDIRECTORY 将 hive 表重新输出一份 hdfs 的 text 数据,随后用 mr 读取该 text 文件,该方法适合偷懒且原始 hive 数据不大,..

一.引言

MR 任务处理相关 hive 表数据时格式为 orc 和 rcFile,下面记录两种处理方法。

image.gif编辑

二.偷懒版读取 ORC, RcFile 文件

最初不太熟悉 mr,只会 textFormat 一种输入模式,于是遇到 orc 和 rcFile 形式的 hive 数据需要在 mr 读取时,都是先通过 INSERT OVERWRITE DIRECTORY 将 hive 表重新输出一份 hdfs 的 text 数据,随后用 mr 读取该 text 文件,该方法适合偷懒且原始 hive 数据不大,如果原始 hive 表数据很大,那转换操作比较耗费资源。

function insert() {
hive -e"
INSERT OVERWRITE DIRECTORY '$path'
select * from Table where dt='$dt';"
}

image.gif

此时使用 TextFormat 读取文件处理生成 Mapper 即可:

import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;
import java.io.IOException;
public class TextMapper extends Mapper<LongWritable, Text, Text, Text> {
    @Override
    protected void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException {
        try {
            String[] s = value.toString().split("\t");
            String _key = s[0];
            String _value = s[1];
            context.write(new Text(_key), new Text(_value));
        } catch (Exception e) {
            e.printStackTrace();
        }
    }
}

image.gif

三.正式版读取 ORC, RcFile 文件

1.pom 依赖

主要是 hadoop map-reduce 和 org.orc 相关依赖

<dependency>
            <groupId>org.apache.hadoop</groupId>
            <artifactId>hadoop-common</artifactId>
            <version>2.7.3</version>
            <scope>provided</scope>
        </dependency>
        <dependency>
            <groupId>org.apache.hadoop</groupId>
            <artifactId>hadoop-mapreduce-client-core</artifactId>
            <version>2.7.3</version>
        </dependency>
        <dependency>
            <groupId>org.apache.orc</groupId>
            <artifactId>orc-core</artifactId>
            <version>1.2.3</version>
        </dependency>
        <dependency>
            <groupId>org.apache.orc</groupId>
            <artifactId>orc-mapreduce</artifactId>
            <version>1.1.0</version>
        </dependency>
        <dependency>
            <groupId>org.apache.hive.hcatalog</groupId>
            <artifactId>hive-hcatalog-core</artifactId>
            <version>0.13.1</version>
        </dependency>

image.gif

2.读取 orc 文件

读取 Orc File 需要选择对应的 OrcStruct 和 OriInputFormat

A.读取 orc 文件的 mapper

getFiledValue() 选取对应列的内容,直接获取内容为 WritableComparable 类型,需要 toString 转换。

public static class OrcMapper extends Mapper<LongWritable, OrcStruct, Text, Text> {
@Override
protected void map(LongWritable key, OrcStruct value, Context context) throws IOException, InterruptedException {
    String key = value.getFieldValue(0).toString();
    context.write(new Text(key), new Text(value.getFieldValue(1)));
    context.write(new Text(key), new Text(value.getFieldValue(2)));
    }
}

image.gif

B.添加 mapper

import org.apache.hadoop.mapreduce.lib.input.MultipleInputs;
MultipleInputs.addInputPath(job, new Path(input), OrcInputFormat.class, OrcMapper.class);

image.gif

3.读取 RcFile 文件

A.读写 RcFile 文件的 mapper

这里获取的 value 形式为 BytesRefWritable,需要反序列化读取才能获取 String 类型字符。

import org.apache.hadoop.hive.serde2.columnar.BytesRefArrayWritable;
import org.apache.hadoop.hive.serde2.columnar.BytesRefWritable;
public static class RcFileMapper extends Mapper<LongWritable, BytesRefArrayWritable, Text, Text> {
    @Override
    protected void map(LongWritable key, BytesRefArrayWritable value, Context context)
            throws IOException, InterruptedException {
        BytesRefWritable _id = value.get(0);
        BytesRefWritable _value = value.get(1);
        String id = LazyBinaryRCFileUtils.readString(_id).trim();
        String value = LazyBinaryRCFileUtils.readString(_value).trim();
        context.write(new Text(id), new Text(value));
    }
}

image.gif

Tips:

readString 函数:

public static String readString(BytesRefWritable v) throws IOException {
    Text txt = new Text();
    txt.set(v.getData(), v.getStart(), v.getLength());
    return txt.toString();
  }

image.gif

B.添加 mapper

MultipleInputs.addInputPath(job, new Path(input), RcfileCombineFileInputFormat.class, RcFileMapper.class);

image.gif

四.总结

偷懒版的形成还是因为自己最初学习不到位导致,才会出此下策对数据多一步转化和落盘,同学们要引以为戒呀,多多学习更优的方法。

目录
相关文章
|
7天前
|
存储 分布式计算 算法
MapReduce 处理压缩文件的能力
【8月更文挑战第12天】
20 4
|
2月前
|
数据采集 SQL 分布式计算
|
3月前
|
分布式计算
如何在MapReduce中处理多个输入文件?
如何在MapReduce中处理多个输入文件?
124 0
|
3月前
|
存储 分布式计算 自然语言处理
MapReduce【小文件的优化-Sequence文件】
MapReduce【小文件的优化-Sequence文件】
|
存储 分布式计算 资源调度
MapReduce之小文件问题
针对HDFS而言,每一个小文件在namenode中都会占用150字节的内存空间,最终会导致集群中虽然储了很多个文件,但是文件的体积并不大,这样就没有意义了。
115 0
|
分布式计算
有一个日志文件visitlog.txt,其中记录了用户访问网站的日期和访问的网站地址信息,每行一条记录。要求编写mapreduce程序完成以下功能: 1、 将不同访问日期的访问记录分配给不同的red
有一个日志文件visitlog.txt,其中记录了用户访问网站的日期和访问的网站地址信息,每行一条记录。要求编写mapreduce程序完成以下功能: 1、 将不同访问日期的访问记录分配给不同的red
113 0
|
分布式计算 Hadoop
Hadoop学习:MapReduce实现文件的解压缩
Hadoop学习:MapReduce实现文件的解压缩
125 0
|
分布式计算 自然语言处理 Java
MapReduce实现与自定义词典文件基于hanLP的中文分词详解
文本分类任务的第1步,就是对语料进行分词。在单机模式下,可以选择python jieba分词,使用起来较方便。但是如果希望在Hadoop集群上通过mapreduce程序来进行分词,则hanLP更加胜任。
2699 0
|
分布式计算 DataWorks Java
[MaxCompute MapReduce实践]通过简单瘦身,解决Dataworks 10M文件限制问题
用户在DataWorks上执行MapReduce作业的时候,文件大于10M的JAR和资源文件不能上传到Dataworks,导致无法使用调度去定期执行MapReduce作业。 解决方案: jar -resources test_mr.
3075 0