一.引言
MR 任务处理相关 hive 表数据时格式为 orc 和 rcFile,下面记录两种处理方法。
编辑
二.偷懒版读取 ORC, RcFile 文件
最初不太熟悉 mr,只会 textFormat 一种输入模式,于是遇到 orc 和 rcFile 形式的 hive 数据需要在 mr 读取时,都是先通过 INSERT OVERWRITE DIRECTORY 将 hive 表重新输出一份 hdfs 的 text 数据,随后用 mr 读取该 text 文件,该方法适合偷懒且原始 hive 数据不大,如果原始 hive 表数据很大,那转换操作比较耗费资源。
function insert() { hive -e" INSERT OVERWRITE DIRECTORY '$path' select * from Table where dt='$dt';" }
此时使用 TextFormat 读取文件处理生成 Mapper 即可:
import org.apache.hadoop.io.LongWritable; import org.apache.hadoop.io.Text; import org.apache.hadoop.mapreduce.Mapper; import java.io.IOException; public class TextMapper extends Mapper<LongWritable, Text, Text, Text> { @Override protected void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException { try { String[] s = value.toString().split("\t"); String _key = s[0]; String _value = s[1]; context.write(new Text(_key), new Text(_value)); } catch (Exception e) { e.printStackTrace(); } } }
三.正式版读取 ORC, RcFile 文件
1.pom 依赖
主要是 hadoop map-reduce 和 org.orc 相关依赖
<dependency> <groupId>org.apache.hadoop</groupId> <artifactId>hadoop-common</artifactId> <version>2.7.3</version> <scope>provided</scope> </dependency> <dependency> <groupId>org.apache.hadoop</groupId> <artifactId>hadoop-mapreduce-client-core</artifactId> <version>2.7.3</version> </dependency> <dependency> <groupId>org.apache.orc</groupId> <artifactId>orc-core</artifactId> <version>1.2.3</version> </dependency> <dependency> <groupId>org.apache.orc</groupId> <artifactId>orc-mapreduce</artifactId> <version>1.1.0</version> </dependency> <dependency> <groupId>org.apache.hive.hcatalog</groupId> <artifactId>hive-hcatalog-core</artifactId> <version>0.13.1</version> </dependency>
2.读取 orc 文件
读取 Orc File 需要选择对应的 OrcStruct 和 OriInputFormat
A.读取 orc 文件的 mapper
getFiledValue() 选取对应列的内容,直接获取内容为 WritableComparable 类型,需要 toString 转换。
public static class OrcMapper extends Mapper<LongWritable, OrcStruct, Text, Text> { @Override protected void map(LongWritable key, OrcStruct value, Context context) throws IOException, InterruptedException { String key = value.getFieldValue(0).toString(); context.write(new Text(key), new Text(value.getFieldValue(1))); context.write(new Text(key), new Text(value.getFieldValue(2))); } }
B.添加 mapper
import org.apache.hadoop.mapreduce.lib.input.MultipleInputs; MultipleInputs.addInputPath(job, new Path(input), OrcInputFormat.class, OrcMapper.class);
3.读取 RcFile 文件
A.读写 RcFile 文件的 mapper
这里获取的 value 形式为 BytesRefWritable,需要反序列化读取才能获取 String 类型字符。
import org.apache.hadoop.hive.serde2.columnar.BytesRefArrayWritable; import org.apache.hadoop.hive.serde2.columnar.BytesRefWritable; public static class RcFileMapper extends Mapper<LongWritable, BytesRefArrayWritable, Text, Text> { @Override protected void map(LongWritable key, BytesRefArrayWritable value, Context context) throws IOException, InterruptedException { BytesRefWritable _id = value.get(0); BytesRefWritable _value = value.get(1); String id = LazyBinaryRCFileUtils.readString(_id).trim(); String value = LazyBinaryRCFileUtils.readString(_value).trim(); context.write(new Text(id), new Text(value)); } }
Tips:
readString 函数:
public static String readString(BytesRefWritable v) throws IOException { Text txt = new Text(); txt.set(v.getData(), v.getStart(), v.getLength()); return txt.toString(); }
B.添加 mapper
MultipleInputs.addInputPath(job, new Path(input), RcfileCombineFileInputFormat.class, RcFileMapper.class);
四.总结
偷懒版的形成还是因为自己最初学习不到位导致,才会出此下策对数据多一步转化和落盘,同学们要引以为戒呀,多多学习更优的方法。