MapReduce【自定义OutputFormat】

简介: MapReduce默认的输出格式为TextOutputFormat,它的父类是FileOutputFormat,即按行来写,且内容写到一个文本文件中去,但是并不能满足我们实际开发中的所有需求,所以就需要我们自定义OutPutFormat。

MapReduce默认的输出格式为TextOutputFormat,它的父类是FileOutputFormat,即按行来写,且内容写到一个文本文件中去,但是并不能满足我们实际开发中的所有需求,所以就需要我们自定义OutPutFormat。



自定义OutPutFormat

输出数据到MySQL、HBase或者Elasticsearch等存储框架中。


步骤

继承FileOutputFormat


继承RecordWriter类


重写write方法


案例

需求

过滤log日志,将包含sxau的网站输出到sxau.log,其他则输出到other.log


输入数据

www.baidu.com
www.google.com
www.sxau.com
www.jd.com
www.bing.com
www.sina.com
www.csdn.com
www.github.com
 LogMapper类
public class LogMapper extends Mapper<LongWritable, Text,Text, NullWritable> {
    @Override
    protected void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException {
        //直接写出
        context.write(value,NullWritable.get());
    }
}

LogReducer类

public class LogReducer extends Reducer<Text, NullWritable,Text,NullWritable> {
    @Override
    protected void reduce(Text key, Iterable<NullWritable> values, Context context) throws IOException, InterruptedException {
        //防止相同数据丢失
        for (NullWritable value : values) {
            context.write(key,NullWritable.get());
        }
    }
}

MyOutputFormat类

返回我们自定义的RecordWriter类


public class MyOutputFormat extends FileOutputFormat<Text, NullWritable> {
    @Override
    public RecordWriter<Text, NullWritable> getRecordWriter(TaskAttemptContext job) throws IOException, InterruptedException {
        //将job对象传进去,才能方便对数据流进行操作
        MyRecordWriter recordWriter = new MyRecordWriter(job);
        return recordWriter;
    }
}

MyRecordWriter类

核心是重写write方法


public class MyRecordWriter extends RecordWriter<Text, NullWritable> {
    private FSDataOutputStream sxauOut;
    private FSDataOutputStream otherOut;
    public MyRecordWriter(TaskAttemptContext job) {
        //创建两个流
        try {
            FileSystem fs = FileSystem.get(job.getConfiguration());
            //输出路径
            sxauOut = fs.create(new Path("D:\\MapReduce_Data_Test\\myoutputformat\\sxau.log"));
            otherOut = fs.create(new Path("D:\\MapReduce_Data_Test\\myoutputformat\\other.log"));
        } catch (IOException e) {
            e.printStackTrace();
        }
    }
    @Override
    public void write(Text key, NullWritable value) throws IOException, InterruptedException {
        String log = key.toString();
        System.out.println(log);
        //具体的写出
        if (log.contains("sxau")){
            sxauOut.writeBytes(log+"\n");
        }else {
            otherOut.writeBytes(log+"\n");
        }
    }
    @Override
    public void close(TaskAttemptContext taskAttemptContext) throws IOException, InterruptedException {
        //关闭数据流
        IOUtils.closeStream(sxauOut);
        IOUtils.closeStream(otherOut);
    }
}


LogRunner类

设置输出格式

public class LogRunner extends Configured implements Tool {
    public static void main(String[] args) throws Exception {
        ToolRunner.run(new Configuration(),new LogRunner(),args);
    }
    @Override
    public int run(String[] args) throws Exception {
        //1.获取job
        Configuration conf = new Configuration();
        Job job = Job.getInstance(conf, "my outputFormat");
        //2.配置jar包路径
        job.setJarByClass(LogRunner.class);
        //3.关联mapper和reducer
        job.setMapperClass(LogMapper.class);
        job.setReducerClass(LogReducer.class);
        //4.设置map、reduce输出的k、v类型
        job.setMapOutputKeyClass(Text.class);
        job.setMapOutputValueClass(NullWritable.class);
        job.setOutputKeyClass(Text.class);
        job.setOutputValueClass(NullWritable.class);
        //设置自定义输出格式
        job.setOutputFormatClass(MyOutputFormat.class);
        //5.设置统计文件输入的路径,将命令行的第一个参数作为输入文件的路径
        FileInputFormat.setInputPaths(job,new Path("D:\\MapReduce_Data_Test\\myoutputformat\\input"));
        //6.设置结果数据存放路径,将命令行的第二个参数作为数据的输出路径
        FileOutputFormat.setOutputPath(job,new Path("D:\\MapReduce_Data_Test\\myoutputformat\\output1"));
        return job.waitForCompletion(true) ? 0 : 1;//verbose:是否监控并打印job的信息
    }
}

运行结果


image.png

image.png

相关文章
|
分布式计算
MapReduce【自定义InputFormat】
MapReduce在处理小文件时效率很低,但面对大量的小文件又不可避免,这个时候就需要相应的解决方案。
|
分布式计算
MapReduce【自定义分区Partitioner】
实际开发中我们可能根据需求需要将MapReduce的运行结果生成多个不同的文件,比如上一个案例【MapReduce计算广州2022年每月最高温度】,我们需要将前半年和后半年的数据分开写到两个文件中。
|
存储 分布式计算 Hadoop
Hadoop中的MapReduce框架原理、Shuffle机制、Partition分区、自定义Partitioner步骤、在Job驱动中,设置自定义Partitioner、Partition 分区案例
Hadoop中的MapReduce框架原理、Shuffle机制、Partition分区、自定义Partitioner步骤、在Job驱动中,设置自定义Partitioner、Partition 分区案例
Hadoop中的MapReduce框架原理、Shuffle机制、Partition分区、自定义Partitioner步骤、在Job驱动中,设置自定义Partitioner、Partition 分区案例
|
分布式计算
MapReduce自定义分区
MapReduce自定义分区
|
分布式计算 数据挖掘
五十三、Mapreduce之自定义outputformat案例
五十三、Mapreduce之自定义outputformat案例
五十三、Mapreduce之自定义outputformat案例
|
分布式计算 自然语言处理 Java
MapReduce实现与自定义词典文件基于hanLP的中文分词详解
文本分类任务的第1步,就是对语料进行分词。在单机模式下,可以选择python jieba分词,使用起来较方便。但是如果希望在Hadoop集群上通过mapreduce程序来进行分词,则hanLP更加胜任。
2729 0
|
分布式计算 Java Hadoop
一脸懵逼学习Hadoop中的MapReduce程序中自定义分组的实现
1:首先搞好实体类对象:   write 是把每个对象序列化到输出流,readFields是把输入流字节反序列化,实现WritableComparable,Java值对象的比较:一般需要重写toString(),hashCode(),equals()方法 1 package com.
1165 0
|
分布式计算
MapReduce编程实例之自定义分区
任务描述: 一组数据,按照年份的不同将其分别存放在不同的文件里 example Data: 2013 1 2013 5 2014 5 2014 8 2015 9 2015 4 Code: package mrTest; import java.
739 0
|
数据采集 分布式计算 搜索推荐
Hadoop学习---7、OutputFormat数据输出、MapReduce内核源码解析、Join应用、数据清洗、MapReduce开发总结(一)
Hadoop学习---7、OutputFormat数据输出、MapReduce内核源码解析、Join应用、数据清洗、MapReduce开发总结(一)