The input files of a MapReduce job are usually stored in HDFS. Input formats include line-based log files, binary files, and so on. These files are often large, reaching tens of GB or more. So how does MapReduce read this data? Let's start with the InputFormat interface.
Common InputFormat implementations include TextInputFormat, KeyValueTextInputFormat, NLineInputFormat, CombineTextInputFormat, and custom InputFormats.
1)TextInputFormat
TextInputFormat is the default InputFormat. Each record is one line of input. The key, of type LongWritable, is the byte offset of the start of the line within the file. The value, of type Text, is the content of the line, excluding any line terminators (newline or carriage return).
For example, suppose a split contains the following 4 text records:
Rich learning form
Intelligent learning engine
Learning more convenient
From the real demand for more close to the enterprise
Each record is represented as the following key/value pairs:
(0,Rich learning form)
(19,Intelligent learning engine)
(47,Learning more convenient)
(72,From the real demand for more close to the enterprise)
Clearly, the keys are not line numbers. In general, line numbers are hard to obtain, because the file is divided into splits by byte, not by line.
Offset calculation: offset of a line = offset of the previous line + number of characters in the previous line (spaces included) + 1 byte for the line terminator. For example, "Rich learning form" has 18 characters, so the second line starts at 0 + 18 + 1 = 19.
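For illustration only (the class name OffsetEchoMapper is made up and not part of the example above), a mapper fed by TextInputFormat receives the byte offset as a LongWritable key and the line as a Text value:

import java.io.IOException;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;

// Illustrative mapper: with TextInputFormat (the default), the framework passes
// the byte offset of each line as the key and the line itself as the value.
public class OffsetEchoMapper extends Mapper<LongWritable, Text, LongWritable, Text> {
    @Override
    protected void map(LongWritable offset, Text line, Context context)
            throws IOException, InterruptedException {
        // Emit (offset, line) so the offsets described above become visible in the output.
        context.write(offset, line);
    }
}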
2)KeyValueTextInputFormat
With KeyValueTextInputFormat, each line is one record, split into a key and a value by a separator. The separator can be configured in the driver class; for example, conf.set(KeyValueLineRecordReader.KEY_VALUE_SEPERATOR, " "); sets a single space. The default separator is a tab (\t). The input format itself is selected in the driver with:
job.setInputFormatClass(KeyValueTextInputFormat.class);
Here is an example. The input is a split containing 4 records, where ——> represents a (horizontal) tab character:
line1 ——>Rich learning form
line2 ——>Intelligent learning engine
line3 ——>Learning more convenient
line4 ——>From the real demand for more close to the enterprise
Each record is represented as the following key/value pairs:
(line1,Rich learning form)
(line2,Intelligent learning engine)
(line3,Learning more convenient)
(line4,From the real demand for more close to the enterprise)
Here the key is the Text sequence that precedes the tab character on each line.
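As a hedged sketch of how this is wired up in a driver (the class name KvDriver and the argument-based paths are illustrative, not part of the example above), the separator and input format are set like this:

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.input.KeyValueLineRecordReader;
import org.apache.hadoop.mapreduce.lib.input.KeyValueTextInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;

// Illustrative driver skeleton for KeyValueTextInputFormat.
public class KvDriver {
    public static void main(String[] args) throws Exception {
        Configuration conf = new Configuration();
        // Split each line at the first tab; tab is also the default separator.
        conf.set(KeyValueLineRecordReader.KEY_VALUE_SEPERATOR, "\t");
        Job job = Job.getInstance(conf);
        job.setJarByClass(KvDriver.class);
        // The mapper then receives Text keys and Text values,
        // e.g. ("line1", "Rich learning form") for the split shown above.
        job.setInputFormatClass(KeyValueTextInputFormat.class);
        job.setOutputKeyClass(Text.class);
        job.setOutputValueClass(Text.class);
        FileInputFormat.setInputPaths(job, new Path(args[0]));
        FileOutputFormat.setOutputPath(job, new Path(args[1]));
        System.exit(job.waitForCompletion(true) ? 0 : 1);
    }
}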
3)NLineInputFormat
With NLineInputFormat, the InputSplit handled by each map task is no longer determined by HDFS blocks, but by the number of lines N specified for NLineInputFormat. That is, number of splits = total number of lines in the input file / N; if this does not divide evenly, number of splits = quotient + 1. For example, 7 lines with N = 2 gives 4 splits.
Here is an example, again using the 4 lines of input above:
Rich learning form
Intelligent learning engine
Learning more convenient
From the real demand for more close to the enterprise
For example, if N is 2, each input split contains two lines and 2 map tasks are started. One mapper receives the first two lines:
(0,Rich learning form)
(19,Intelligent learning engine)
The other mapper receives the last two lines:
(47,Learning more convenient)
(72,From the real demand for more close to the enterprise)
The keys and values here are the same as those produced by TextInputFormat.
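A minimal driver sketch, assuming the same 4-line input (the class name NLineDriver and the command-line paths are illustrative, not from the example above):

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.input.NLineInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;

// Illustrative driver skeleton for NLineInputFormat.
public class NLineDriver {
    public static void main(String[] args) throws Exception {
        Job job = Job.getInstance(new Configuration());
        job.setJarByClass(NLineDriver.class);
        // Each split (and therefore each map task) covers at most 2 input lines.
        NLineInputFormat.setNumLinesPerSplit(job, 2);
        job.setInputFormatClass(NLineInputFormat.class);
        // Keys and values are the same as with TextInputFormat: (byte offset, line).
        job.setOutputKeyClass(LongWritable.class);
        job.setOutputValueClass(Text.class);
        FileInputFormat.setInputPaths(job, new Path(args[0]));
        FileOutputFormat.setOutputPath(job, new Path(args[1]));
        System.exit(job.waitForCompletion(true) ? 0 : 1);
    }
}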
4) Custom InputFormat
1) Overview
(1) Define a class that extends FileInputFormat.
(2) Implement a RecordReader that reads one complete file at a time and wraps it as a key/value pair.
(3) On output, use SequenceFileOutputFormat to write the merged file.
2) Hands-on example
Small-file handling (custom InputFormat).
Goal
Merge 3 small files into a single output file with MapReduce:
Prepare 3 files: a.txt, b.txt, and c.txt.
Code
MyInPutFormat.java
package com.hfl.input1;

import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.BytesWritable;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.mapreduce.InputSplit;
import org.apache.hadoop.mapreduce.JobContext;
import org.apache.hadoop.mapreduce.RecordReader;
import org.apache.hadoop.mapreduce.TaskAttemptContext;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;

import java.io.IOException;

public class MyInPutFormat extends FileInputFormat<NullWritable, BytesWritable> {

    /**
     * Never split a file: each small file is read as a whole.
     */
    @Override
    protected boolean isSplitable(JobContext context, Path filename) {
        return false;
    }

    @Override
    public RecordReader<NullWritable, BytesWritable> createRecordReader(InputSplit inputSplit, TaskAttemptContext context)
            throws IOException, InterruptedException {
        MyRecordRead recordRead = new MyRecordRead();
        recordRead.initialize(inputSplit, context);
        return recordRead;
    }
}
MyRecordRead.java
package com.hfl.input1;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FSDataInputStream;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.BytesWritable;
import org.apache.hadoop.io.IOUtils;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.mapreduce.InputSplit;
import org.apache.hadoop.mapreduce.RecordReader;
import org.apache.hadoop.mapreduce.TaskAttemptContext;
import org.apache.hadoop.mapreduce.lib.input.FileSplit;

import java.io.IOException;

public class MyRecordRead extends RecordReader<NullWritable, BytesWritable> {

    private Configuration configuration;
    private FileSplit split;
    // Whether the single record of this split has already been produced
    private boolean processed = false;
    private BytesWritable value = new BytesWritable();

    /**
     * Initialization
     */
    @Override
    public void initialize(InputSplit inputSplit, TaskAttemptContext context) throws IOException, InterruptedException {
        configuration = context.getConfiguration();
        split = (FileSplit) inputSplit;
    }

    @Override
    public boolean nextKeyValue() throws IOException, InterruptedException {
        if (!processed) {
            // Buffer that holds the whole file
            byte[] contents = new byte[(int) split.getLength()];
            FileSystem fs = null;
            FSDataInputStream fis = null;
            try {
                // Get the file system
                Path path = split.getPath();
                fs = path.getFileSystem(configuration);
                // Open an input stream
                fis = fs.open(path);
                // Read the entire file into the buffer
                IOUtils.readFully(fis, contents, 0, contents.length);
                // Copy the buffer into the value
                value.set(contents, 0, contents.length);
            } catch (Exception e) {
                // Propagate the error instead of silently swallowing it
                throw new IOException(e);
            } finally {
                IOUtils.closeStream(fis);
            }
            // Do not read the data again
            processed = true;
            return true;
        }
        return false;
    }

    @Override
    public NullWritable getCurrentKey() throws IOException, InterruptedException {
        return NullWritable.get();
    }

    @Override
    public BytesWritable getCurrentValue() throws IOException, InterruptedException {
        return value;
    }

    @Override
    public float getProgress() throws IOException, InterruptedException {
        return processed ? 1 : 0;
    }

    @Override
    public void close() throws IOException {
    }
}
SequenceMap.java
package com.hfl.input1;

import org.apache.hadoop.io.BytesWritable;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.lib.input.FileSplit;

import java.io.IOException;

public class SequenceMap extends Mapper<NullWritable, BytesWritable, Text, BytesWritable> {

    Text k = new Text();

    @Override
    protected void setup(Context context) throws IOException, InterruptedException {
        // Get the path of the file being processed and use it as the output key
        FileSplit fileSplit = (FileSplit) context.getInputSplit();
        String name = fileSplit.getPath().toString();
        k.set(name);
    }

    @Override
    protected void map(NullWritable key, BytesWritable value, Context context) throws IOException, InterruptedException {
        context.write(k, value);
    }
}
SequenceReduce.java
package com.hfl.input1;

import org.apache.hadoop.io.BytesWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Reducer;

import java.io.IOException;

public class SequenceReduce extends Reducer<Text, BytesWritable, Text, BytesWritable> {

    @Override
    protected void reduce(Text key, Iterable<BytesWritable> values, Context context) throws IOException, InterruptedException {
        // Write each file's bytes out under its path key
        for (BytesWritable b : values) {
            context.write(key, b);
        }
    }
}
SequenceDriver.java
package com.hfl.input1;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.BytesWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.mapreduce.lib.output.SequenceFileOutputFormat;

import java.io.IOException;

public class SequenceDriver {
    public static void main(String[] args) throws IOException, ClassNotFoundException, InterruptedException {
        // Hard-coded local paths for a local test run
        args = new String[]{"F:\\input\\inpuformat", "F:\\output\\inputformat"};

        Configuration conf = new Configuration();
        Job job = Job.getInstance(conf);

        job.setJarByClass(SequenceDriver.class);
        job.setMapperClass(SequenceMap.class);
        job.setReducerClass(SequenceReduce.class);

        job.setMapOutputKeyClass(Text.class);
        job.setMapOutputValueClass(BytesWritable.class);
        job.setOutputKeyClass(Text.class);
        job.setOutputValueClass(BytesWritable.class);

        // Use the custom InputFormat
        job.setInputFormatClass(MyInPutFormat.class);
        // Write the output as a binary SequenceFile
        job.setOutputFormatClass(SequenceFileOutputFormat.class);

        // Input and output paths
        FileInputFormat.setInputPaths(job, new Path(args[0]));
        FileOutputFormat.setOutputPath(job, new Path(args[1]));

        job.waitForCompletion(true);
    }
}
Run result: the output directory contains a single SequenceFile (part-r-00000) whose keys are the file paths and whose values are the raw bytes of a.txt, b.txt, and c.txt.
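To verify the result, a minimal sketch along these lines can read the SequenceFile back; the class name SequenceCheck and the part-file path are assumptions, not from the original run:

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.BytesWritable;
import org.apache.hadoop.io.SequenceFile;
import org.apache.hadoop.io.Text;

// Verification sketch: print each (file path, size in bytes) record from the merged SequenceFile.
public class SequenceCheck {
    public static void main(String[] args) throws Exception {
        Configuration conf = new Configuration();
        // Illustrative path; point it at the part file produced by the job above.
        Path path = new Path("F:\\output\\inputformat\\part-r-00000");
        try (SequenceFile.Reader reader = new SequenceFile.Reader(conf, SequenceFile.Reader.file(path))) {
            Text key = new Text();
            BytesWritable value = new BytesWritable();
            while (reader.next(key, value)) {
                System.out.println(key + " -> " + value.getLength() + " bytes");
            }
        }
    }
}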
And that's it, we're done!