InputFormat接口实现类案例

简介: InputFormat接口实现类案例

MapReduce任务的输入文件一般是存储在HDFS里面。输入的文件格式包括:基于行的日志文件、二进制格式文件等。这些文件一般会很大,达到数十GB,甚至更大。那么MapReduce是如何读取这些数据的呢?下面我们首先学习InputFormat接口。

InputFormat常见的接口实现类包括:TextInputFormat、KeyValueTextInputFormat、NLineInputFormat、CombineTextInputFormat和自定义InputFormat等。


1)TextInputFormat


TextInputFormat是默认的InputFormat。每条记录是一行输入。键K是LongWritable类型,存储该行在整个文件中的字节偏移量。值是这行的内容,不包括任何行终止符(换行符和回车符)。

以下是一个示例,比如,一个分片包含了如下4条文本记录。


Rich learning form
Intelligent learning engine
Learning more convenient
From the real demand for more close to the enterprise


每条记录表示为以下键/值对:


(0,Rich learning form)
(20,Intelligent learning engine)
(49,Learning more convenient)
(75,From the real demand for more close to the enterprise)


很明显,键并不是行号。一般情况下,很难取得行号,因为文件按字节而不是按行切分为分片。


计算公式:字符个数+空格+偏移量(自占一位)1


2)KeyValueTextInputFormat


每一行均为一条记录,被分隔符分割为key,value。可以通过在驱动类中设置conf.set(KeyValueLineRecordReader.KEY_VALUE_SEPERATOR, " ");来设定分隔符。默认分隔符是tab(\t)。


job.setInputFormatClass(KeyValueTextInputFormat.class);


以下是一个示例,输入是一个包含4条记录的分片。其中——>表示一个(水平方向的)制表符。


line1 ——>Rich,learning form
line2 ——>Intelligent,learning engine
line3 ——>Learning,more convenient
line4 ——>From,the real demand for more close to the enterprise

每条记录表示为以下键/值对:


(Rich,learning form)
(Intelligent,learning engine)
(Learning,more convenient)
(From the,real demand for more close to the enterprise)


此时的键是每行排在制表符之前的Text序列。


3)NLineInputFormat


如果使用NlineInputFormat,代表每个map进程处理的InputSplit不再按block块去划分,而是按NlineInputFormat指定的行数N来划分。即输入文件的总行数/N=切片数(20),如果不整除,切片数=商+1。


以下是一个示例,仍然以上面的4行输入为例。


Rich learning form
Intelligent learning engine
Learning more convenient
From the real demand for more close to the enterprise


例如,如果N是2,则每个输入分片包含两行。开启2个maptask。


(0,Rich learning form)
(19,Intelligent learning engine)


另一个 mapper 则收到后两行:


(47,Learning more convenient)
(72,From the real demand for more close to the enterprise)


   这里的键和值与TextInputFormat生成的一样。


4)自定义InputFormat


1)概述


(1)自定义一个类继承FileInputFormat。

(2)改写RecordReader,实现一次读取一个完整文件封装为KV。

(3)在输出时使用SequenceFileOutPutFormat输出合并文件。


2)案例实操


小文件处理(自定义InputFormat)。


目标


3个小文件通过mapreduce输出到一个文件中:


准备3个文件a.txt b.txt c.txt

46a9d80a6e05e4e3b19d57a0ee70bcdf.png

代码

MyInPutFormat.java
package com.hfl.input1;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.BytesWritable;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.mapreduce.InputSplit;
import org.apache.hadoop.mapreduce.JobContext;
import org.apache.hadoop.mapreduce.RecordReader;
import org.apache.hadoop.mapreduce.TaskAttemptContext;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import java.io.IOException;
public class MyInPutFormat extends FileInputFormat<NullWritable,BytesWritable> {
    /**
     * 不切分
     * */
    @Override
    protected boolean isSplitable(JobContext context, Path filename) {
        return false;
    }
    @Override
    public RecordReader<NullWritable, BytesWritable> createRecordReader(InputSplit inputSplit, TaskAttemptContext context) throws IOException, InterruptedException {
        MyRecordRead recordRead = new MyRecordRead();
        recordRead.initialize(inputSplit,context);
        return recordRead;
    }
}


MyRecordRead.java
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FSDataInputStream;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.BytesWritable;
import org.apache.hadoop.io.IOUtils;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.mapreduce.InputSplit;
import org.apache.hadoop.mapreduce.RecordReader;
import org.apache.hadoop.mapreduce.TaskAttemptContext;
import org.apache.hadoop.mapreduce.lib.input.FileSplit;
import java.io.IOException;
public class MyRecordRead extends RecordReader<NullWritable, BytesWritable> {
    private Configuration configuration;
    private FileSplit split;
    //是否数据加工
    private boolean processed = false;
    private BytesWritable value = new BytesWritable();
    /**
     * 初始化
     * */
    @Override
    public void initialize(InputSplit inputSplit, TaskAttemptContext context) throws IOException, InterruptedException {
        configuration = context.getConfiguration();
        split = (FileSplit) inputSplit;
    }
    @Override
    public boolean nextKeyValue() throws IOException, InterruptedException {
        if (!processed){
            //定义存储数据的缓冲区
            byte[] contents = new byte[(int) split.getLength()];
            FileSystem fs = null;
            FSDataInputStream fis = null;
            try {
                //获取文件系统
                Path path = split.getPath();
                fs = path.getFileSystem(configuration);
                //创建读数据的流
                fis = fs.open(path);
                //读取文件
                IOUtils.readFully(fis,contents,0,contents.length);
                //写文件
                value.set(contents,0,contents.length);
            }catch (Exception e){
            }finally {
                IOUtils.closeStream(fis);
            }
            //不重复读数据
            processed = true;
            return true;
        }
        return false;
    }
    @Override
    public NullWritable getCurrentKey() throws IOException, InterruptedException {
        return NullWritable.get();
    }
    @Override
    public BytesWritable getCurrentValue() throws IOException, InterruptedException {
        return value;
    }
    @Override
    public float getProgress() throws IOException, InterruptedException {
        return processed?1:0;
    }
    @Override
    public void close() throws IOException {
    }
}


SequenceMap.java
import org.apache.hadoop.io.BytesWritable;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.lib.input.FileSplit;
import java.io.IOException;
public class SequenceMap extends Mapper<NullWritable,BytesWritable,Text,BytesWritable> {
    Text k = new Text();
    @Override
    protected void setup(Context context) throws IOException, InterruptedException {
        //获取文件的路径
        FileSplit fileSplit = (FileSplit) context.getInputSplit();
        String name = fileSplit.getPath().toString();
        k.set(name);
    }
    @Override
    protected void map(NullWritable key, BytesWritable value, Context context) throws IOException, InterruptedException {
        context.write(k,value);
    }
}


SequenceReduce.java
import org.apache.hadoop.io.BytesWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Reducer;
import java.io.IOException;
public class SequenceReduce extends Reducer<Text, BytesWritable, Text, BytesWritable> {
    @Override
    protected void reduce(Text key, Iterable<BytesWritable> values, Context context) throws IOException, InterruptedException {
        for (BytesWritable b : values  ) {
            context.write(key, b);
        }
    }
}


SequenceDriver.java
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.BytesWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.mapreduce.lib.output.SequenceFileOutputFormat;
import java.io.IOException;
public class SequenceDriver {
    public static void main(String[] args) throws IOException, ClassNotFoundException, InterruptedException {
        args = new String[]{"F:\\input\\inpuformat","F:\\output\\inputformat"};
        Configuration conf = new Configuration();
        Job job = Job.getInstance(conf);
        job.setJarByClass(SequenceDriver.class);
        job.setMapperClass(SequenceMap.class);
        job.setReducerClass(SequenceReduce.class);
        job.setMapOutputKeyClass(Text.class);
        job.setMapOutputValueClass(BytesWritable.class);
        job.setOutputKeyClass(Text.class);
        job.setOutputValueClass(BytesWritable.class);
        //设置自定义的inputformat
        job.setInputFormatClass(MyInPutFormat.class);
        //设置输出的二进制
        job.setOutputFormatClass(SequenceFileOutputFormat.class);
        //输入输出路径
        FileInputFormat.setInputPaths(job,new Path(args[0]));
        FileOutputFormat.setOutputPath(job,new Path(args[1]));
        job.waitForCompletion(true);
    }
}


运行结果:

1dc618a0ed9580ce8bfa6facb208c08f.png

5d4c6812c8535adbb050f4ddf2e1bce8.png


大功告成,收工!!


相关文章
|
8月前
|
Java Spring
Spring注入类的两种形式
Spring注入类的两种形式
88 0
Spring注入类的两种形式
|
Java 索引 容器
27 java集合之collection接口、子接口及实现类
集合概念:像数组一样是java的一个容器;和数组不同的是数组只能存同类型的数据,且长度定义之后就不可变,集合不仅可以存多种类型的数据,而且还提供了增、删、改、查的方法;
200 0
|
Java
Java枚举类(1)--枚举类的定义、方法使用和接口实现
Java枚举类(1)--枚举类的定义、方法使用和接口实现
438 0
Java枚举类(1)--枚举类的定义、方法使用和接口实现
|
Java 编译器
Java用相同的方法在一个类中实现两个接口哪种接口方法被覆盖?
Java用相同的方法在一个类中实现两个接口哪种接口方法被覆盖?
136 0
|
分布式计算
OutputFormat接口实现类案例
OutputFormat接口实现类案例
123 0
OutputFormat接口实现类案例
|
存储 分布式计算 自然语言处理
Hadoop序列化、概述、自定义bean对象实现序列化接口(Writable)、序列化案例实操、编写流量统计的Bean对象、编写Mapper类、编写Reducer类、编写Driver驱动类
Hadoop序列化、概述、自定义bean对象实现序列化接口(Writable)、序列化案例实操、编写流量统计的Bean对象、编写Mapper类、编写Reducer类、编写Driver驱动类
Hadoop序列化、概述、自定义bean对象实现序列化接口(Writable)、序列化案例实操、编写流量统计的Bean对象、编写Mapper类、编写Reducer类、编写Driver驱动类
|
Java
Java抽象类 Java接口 抽象类和接口使用区别: activity 深度继承关系
Java抽象类 Java接口 抽象类和接口使用区别: activity 深度继承关系
98 0
|
Java 开发者
自定义 ClassLoader 处理类|学习笔记
快速学习自定义 ClassLoader 处理类
116 0
自定义 ClassLoader 处理类|学习笔记
|
Java Spring 容器
自定义ApplicationContextInitializer接口实现
自定义ApplicationContextInitializer接口实现
271 0
|
分布式计算 Hadoop 开发者
OutPutFomat 接口实现类 | 学习笔记
快速学习 OutPutFomat 接口实现类
138 0

热门文章

最新文章

下一篇
开通oss服务