MapReduce【自定义OutputFormat】

简介: MapReduce【自定义OutputFormat】

MapReduce默认的输出格式为TextOutputFormat,它的父类是FileOutputFormat,即按行来写,且内容写到一个文本文件中去,但是并不能满足我们实际开发中的所有需求,所以就需要我们自定义OutPutFormat。

自定义OutPutFormat

输出数据到MySQL、HBase或者Elasticsearch等存储框架中。

步骤

继承FileOutputFormat

继承RecordWriter类

重写write方法

案例

需求

过滤log日志,将包含sxau的网站输出到sxau.log,其他则输出到other.log

输入数据

www.baidu.com
www.google.com
www.sxau.com
www.jd.com
www.bing.com
www.sina.com
www.csdn.com
www.github.com

LogMapper类

public class LogMapper extends Mapper<LongWritable, Text,Text, NullWritable> {
    @Override
    protected void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException {
        //直接写出
        context.write(value,NullWritable.get());
    }
}

LogReducer类

public class LogReducer extends Reducer<Text, NullWritable,Text,NullWritable> {
    @Override
    protected void reduce(Text key, Iterable<NullWritable> values, Context context) throws IOException, InterruptedException {
        //防止相同数据丢失
        for (NullWritable value : values) {
            context.write(key,NullWritable.get());
        }
    }
}

MyOutputFormat类

返回我们自定义的RecordWriter类

public class MyOutputFormat extends FileOutputFormat<Text, NullWritable> {
    @Override
    public RecordWriter<Text, NullWritable> getRecordWriter(TaskAttemptContext job) throws IOException, InterruptedException {
        //将job对象传进去,才能方便对数据流进行操作
        MyRecordWriter recordWriter = new MyRecordWriter(job);
        return recordWriter;
    }
}

MyRecordWriter类

核心是重写write方法

public class MyRecordWriter extends RecordWriter<Text, NullWritable> {
    private FSDataOutputStream sxauOut;
    private FSDataOutputStream otherOut;
    public MyRecordWriter(TaskAttemptContext job) {
        //创建两个流
        try {
            FileSystem fs = FileSystem.get(job.getConfiguration());
            //输出路径
            sxauOut = fs.create(new Path("D:\\MapReduce_Data_Test\\myoutputformat\\sxau.log"));
            otherOut = fs.create(new Path("D:\\MapReduce_Data_Test\\myoutputformat\\other.log"));
        } catch (IOException e) {
            e.printStackTrace();
        }
    }
    @Override
    public void write(Text key, NullWritable value) throws IOException, InterruptedException {
        String log = key.toString();
        System.out.println(log);
        //具体的写出
        if (log.contains("sxau")){
            sxauOut.writeBytes(log+"\n");
        }else {
            otherOut.writeBytes(log+"\n");
        }
    }
    @Override
    public void close(TaskAttemptContext taskAttemptContext) throws IOException, InterruptedException {
        //关闭数据流
        IOUtils.closeStream(sxauOut);
        IOUtils.closeStream(otherOut);
    }
}

LogRunner类

设置输出格式

public class LogRunner extends Configured implements Tool {
    public static void main(String[] args) throws Exception {
        ToolRunner.run(new Configuration(),new LogRunner(),args);
    }
    @Override
    public int run(String[] args) throws Exception {
        //1.获取job
        Configuration conf = new Configuration();
        Job job = Job.getInstance(conf, "my outputFormat");
        //2.配置jar包路径
        job.setJarByClass(LogRunner.class);
        //3.关联mapper和reducer
        job.setMapperClass(LogMapper.class);
        job.setReducerClass(LogReducer.class);
        //4.设置map、reduce输出的k、v类型
        job.setMapOutputKeyClass(Text.class);
        job.setMapOutputValueClass(NullWritable.class);
        job.setOutputKeyClass(Text.class);
        job.setOutputValueClass(NullWritable.class);
        //设置自定义输出格式
        job.setOutputFormatClass(MyOutputFormat.class);
        //5.设置统计文件输入的路径,将命令行的第一个参数作为输入文件的路径
        FileInputFormat.setInputPaths(job,new Path("D:\\MapReduce_Data_Test\\myoutputformat\\input"));
        //6.设置结果数据存放路径,将命令行的第二个参数作为数据的输出路径
        FileOutputFormat.setOutputPath(job,new Path("D:\\MapReduce_Data_Test\\myoutputformat\\output1"));
        return job.waitForCompletion(true) ? 0 : 1;//verbose:是否监控并打印job的信息
    }
}

运行结果


相关文章
|
20天前
|
分布式计算 Hadoop Java
MapReduce编程:自定义分区和自定义计数器
MapReduce编程:自定义分区和自定义计数器
32 0
|
7月前
|
分布式计算 数据库
35 MAPREDUCE自定义outputFormat
35 MAPREDUCE自定义outputFormat
18 0
|
10月前
|
数据采集 分布式计算 搜索推荐
Hadoop学习---7、OutputFormat数据输出、MapReduce内核源码解析、Join应用、数据清洗、MapReduce开发总结(一)
Hadoop学习---7、OutputFormat数据输出、MapReduce内核源码解析、Join应用、数据清洗、MapReduce开发总结(一)
|
20天前
|
分布式计算
MapReduce【自定义InputFormat】
MapReduce【自定义InputFormat】
|
20天前
|
分布式计算
MapReduce【自定义分区Partitioner】
MapReduce【自定义分区Partitioner】
|
7月前
|
分布式计算
36 MAPREDUCE自定义GroupingComparator
36 MAPREDUCE自定义GroupingComparator
22 0
|
7月前
|
数据采集 分布式计算
34 MAPREDUCE自定义inputFormat
34 MAPREDUCE自定义inputFormat
19 0
|
10月前
|
数据采集 缓存 分布式计算
Hadoop学习---7、OutputFormat数据输出、MapReduce内核源码解析、Join应用、数据清洗、MapReduce开发总结(二)
Hadoop学习---7、OutputFormat数据输出、MapReduce内核源码解析、Join应用、数据清洗、MapReduce开发总结(二)
|
分布式计算
MapReduce自定义分区
MapReduce自定义分区
|
20天前
|
分布式计算 Hadoop
Hadoop系列 mapreduce 原理分析
Hadoop系列 mapreduce 原理分析
43 1