MapReduce's default output format is TextOutputFormat, whose parent class is FileOutputFormat: it writes records line by line into a plain text file in the output directory. That behavior does not cover every requirement we meet in real development, so sometimes we need to define our own OutputFormat.
Custom OutputFormat
Use a custom OutputFormat to write output to storage frameworks such as MySQL, HBase, or Elasticsearch instead of plain files.
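To make the MySQL case concrete, below is a minimal sketch of a RecordWriter that inserts each record through JDBC. It is not part of the original tutorial: the class name MySQLRecordWriter, the logs(line) table, and the way the Connection is passed in are all assumptions for illustration.

import java.io.IOException;
import java.sql.Connection;
import java.sql.PreparedStatement;
import java.sql.SQLException;

import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.RecordWriter;
import org.apache.hadoop.mapreduce.TaskAttemptContext;

// Hypothetical sketch: write each record into MySQL instead of a file.
public class MySQLRecordWriter extends RecordWriter<Text, NullWritable> {

    private final Connection conn;          // opened by the OutputFormat that creates this writer
    private final PreparedStatement insert;

    public MySQLRecordWriter(Connection conn) throws SQLException {
        this.conn = conn;
        // assumed schema: CREATE TABLE logs(line VARCHAR(255))
        this.insert = conn.prepareStatement("INSERT INTO logs(line) VALUES (?)");
    }

    @Override
    public void write(Text key, NullWritable value) throws IOException {
        try {
            insert.setString(1, key.toString());
            insert.executeUpdate();
        } catch (SQLException e) {
            throw new IOException("failed to insert record", e);
        }
    }

    @Override
    public void close(TaskAttemptContext context) throws IOException {
        try {
            insert.close();
            conn.close();
        } catch (SQLException e) {
            throw new IOException("failed to close connection", e);
        }
    }
}

The matching OutputFormat would open the JDBC connection in getRecordWriter and hand it to this writer, just as the file-based example below hands the job context to MyRecordWriter.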
Steps
1. Extend FileOutputFormat.
2. Extend the RecordWriter class.
3. Override the write method (a minimal skeleton tying the three steps together follows this list).
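Before the full example, here is a minimal compile-level skeleton showing where each step lands. The class name SkeletonOutputFormat and the Text/NullWritable types are assumptions; the case study below fills in the real logic.

import java.io.IOException;

import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.RecordWriter;
import org.apache.hadoop.mapreduce.TaskAttemptContext;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;

// Step 1: extend FileOutputFormat
public class SkeletonOutputFormat extends FileOutputFormat<Text, NullWritable> {

    @Override
    public RecordWriter<Text, NullWritable> getRecordWriter(TaskAttemptContext job)
            throws IOException, InterruptedException {
        // Step 2: return a RecordWriter (here anonymous; usually a named subclass such as MyRecordWriter below)
        return new RecordWriter<Text, NullWritable>() {
            // Step 3: override write and decide where each record goes
            @Override
            public void write(Text key, NullWritable value) throws IOException, InterruptedException {
            }

            @Override
            public void close(TaskAttemptContext context) throws IOException, InterruptedException {
                // release any streams or connections opened for writing
            }
        };
    }
}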
Example
Requirement
Filter the log data: write the lines (websites) that contain sxau to sxau.log, and all other lines to other.log.
Input data
www.baidu.com
www.google.com
www.sxau.com
www.jd.com
www.bing.com
www.sina.com
www.csdn.com
www.github.com

LogMapper class

import java.io.IOException;

import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;

public class LogMapper extends Mapper<LongWritable, Text, Text, NullWritable> {
    @Override
    protected void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException {
        // write each line out unchanged
        context.write(value, NullWritable.get());
    }
}
LogReducer class

import java.io.IOException;

import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Reducer;

public class LogReducer extends Reducer<Text, NullWritable, Text, NullWritable> {
    @Override
    protected void reduce(Text key, Iterable<NullWritable> values, Context context) throws IOException, InterruptedException {
        // iterate over the values so that duplicate lines (same key) are not lost
        for (NullWritable value : values) {
            context.write(key, NullWritable.get());
        }
    }
}
MyOutputFormat class
Returns our custom RecordWriter.

import java.io.IOException;

import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.RecordWriter;
import org.apache.hadoop.mapreduce.TaskAttemptContext;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;

public class MyOutputFormat extends FileOutputFormat<Text, NullWritable> {
    @Override
    public RecordWriter<Text, NullWritable> getRecordWriter(TaskAttemptContext job) throws IOException, InterruptedException {
        // pass the job context in so the writer can use its Configuration to open the output streams
        MyRecordWriter recordWriter = new MyRecordWriter(job);
        return recordWriter;
    }
}
MyRecordWriter class
The core is overriding the write method.

import java.io.IOException;

import org.apache.hadoop.fs.FSDataOutputStream;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IOUtils;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.RecordWriter;
import org.apache.hadoop.mapreduce.TaskAttemptContext;

public class MyRecordWriter extends RecordWriter<Text, NullWritable> {

    private FSDataOutputStream sxauOut;
    private FSDataOutputStream otherOut;

    public MyRecordWriter(TaskAttemptContext job) {
        // open the two output streams
        try {
            FileSystem fs = FileSystem.get(job.getConfiguration());
            // output paths (local Windows paths here)
            sxauOut = fs.create(new Path("D:\\MapReduce_Data_Test\\myoutputformat\\sxau.log"));
            otherOut = fs.create(new Path("D:\\MapReduce_Data_Test\\myoutputformat\\other.log"));
        } catch (IOException e) {
            e.printStackTrace();
        }
    }

    @Override
    public void write(Text key, NullWritable value) throws IOException, InterruptedException {
        String log = key.toString();
        System.out.println(log);
        // the actual routing: lines containing "sxau" go to sxau.log, everything else to other.log
        if (log.contains("sxau")) {
            sxauOut.writeBytes(log + "\n");
        } else {
            otherOut.writeBytes(log + "\n");
        }
    }

    @Override
    public void close(TaskAttemptContext taskAttemptContext) throws IOException, InterruptedException {
        // close both output streams
        IOUtils.closeStream(sxauOut);
        IOUtils.closeStream(otherOut);
    }
}
LogRunner class
Sets the custom output format on the job.

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.conf.Configured;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.util.Tool;
import org.apache.hadoop.util.ToolRunner;

public class LogRunner extends Configured implements Tool {

    public static void main(String[] args) throws Exception {
        System.exit(ToolRunner.run(new Configuration(), new LogRunner(), args));
    }

    @Override
    public int run(String[] args) throws Exception {
        // 1. get the job
        Configuration conf = new Configuration();
        Job job = Job.getInstance(conf, "my outputFormat");
        // 2. set the jar by class
        job.setJarByClass(LogRunner.class);
        // 3. wire up the mapper and reducer
        job.setMapperClass(LogMapper.class);
        job.setReducerClass(LogReducer.class);
        // 4. set the key/value types of the map and reduce output
        job.setMapOutputKeyClass(Text.class);
        job.setMapOutputValueClass(NullWritable.class);
        job.setOutputKeyClass(Text.class);
        job.setOutputValueClass(NullWritable.class);
        // set the custom output format
        job.setOutputFormatClass(MyOutputFormat.class);
        // 5. input path (hardcoded here rather than taken from the command line)
        FileInputFormat.setInputPaths(job, new Path("D:\\MapReduce_Data_Test\\myoutputformat\\input"));
        // 6. output path for the job itself
        FileOutputFormat.setOutputPath(job, new Path("D:\\MapReduce_Data_Test\\myoutputformat\\output1"));
        // verbose = true: monitor and print the job's progress
        return job.waitForCompletion(true) ? 0 : 1;
    }
}
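Two practical notes, not spelled out in the original: because MyOutputFormat inherits checkOutputSpecs from FileOutputFormat, the output1 directory must not exist before the job runs, otherwise the job fails immediately with a FileAlreadyExistsException; and since the input and output paths are hardcoded local Windows paths, this driver is intended to run in local mode (for example, straight from the IDE) rather than on a cluster.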
Result
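With the input above, the expected outcome is that sxau.log contains the single line www.sxau.com while other.log contains the remaining seven lines; the output1 directory holds only the job's _SUCCESS marker, since MyRecordWriter writes every record directly to the two .log files instead of a part-r-xxxxx file.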