inkfish原创,请勿商业性质转载,转载请注明来源(http://blog.csdn.net/inkfish )。
Hadoop默认的输出是TextOutputFormat,输出文件名不可定制。hadoop 0.19.X中有一个org.apache.hadoop.mapred.lib.MultipleOutputFormat,可以输出多份文件且可以自定义文件名,但是从hadoop 0.20.x中MultipleOutputFormat所在包的所有类被标记为“已过时”,当前如果再使用MultipleOutputFormat,在将来版本的hadoop中可能无法使用。本篇文章中,我们自己实现一个简单的MultipleOutputFormat,并修改hadoop自带的WordCount示例程序来测试结果。
环境:
Ubuntu 8.0.4 Server 32bit
Hadoop 0.20.1
JDK 1.6.0_16-b01
Eclipse 3.5
所有代码分为3个类:
1.LineRecordWriter:
RecordWriter的一个实现,用于把<Key, Value>转化为一行文本。在Hadoop中,这个类作为TextOutputFormat的一个子类存在,protected访问权限,因此普通程序无法访问。这里仅仅是把LineRecordWriter从TextOutputFormat抽取出来,作为一个独立的公共类使用。
package inkfish.hadoop.study; import java.io.DataOutputStream; import java.io.IOException; import java.io.UnsupportedEncodingException; import org.apache.hadoop.io.NullWritable; import org.apache.hadoop.io.Text; import org.apache.hadoop.mapreduce.RecordWriter; import org.apache.hadoop.mapreduce.TaskAttemptContext; import org.apache.hadoop.mapreduce.lib.output.TextOutputFormat; /**摘自{@link TextOutputFormat}中的LineRecordWriter。 */ public class LineRecordWriter<K, V> extends RecordWriter<K, V> { private static final String utf8 = "UTF-8"; private static final byte[] newline; static { try { newline = "/n".getBytes(utf8); } catch (UnsupportedEncodingException uee) { throw new IllegalArgumentException("can't find " + utf8 + " encoding"); } } protected DataOutputStream out; private final byte[] keyValueSeparator; public LineRecordWriter(DataOutputStream out, String keyValueSeparator) { this.out = out; try { this.keyValueSeparator = keyValueSeparator.getBytes(utf8); } catch (UnsupportedEncodingException uee) { throw new IllegalArgumentException("can't find " + utf8 + " encoding"); } } public LineRecordWriter(DataOutputStream out) { this(out, "/t"); } private void writeObject(Object o) throws IOException { if (o instanceof Text) { Text to = (Text) o; out.write(to.getBytes(), 0, to.getLength()); } else { out.write(o.toString().getBytes(utf8)); } } public synchronized void write(K key, V value) throws IOException { boolean nullKey = key == null || key instanceof NullWritable; boolean nullValue = value == null || value instanceof NullWritable; if (nullKey && nullValue) { return; } if (!nullKey) { writeObject(key); } if (!(nullKey || nullValue)) { out.write(keyValueSeparator); } if (!nullValue) { writeObject(value); } out.write(newline); } public synchronized void close(TaskAttemptContext context) throws IOException { out.close(); } }
2.MultipleOutputFormat:
抽象类,主要参考org.apache.hadoop.mapred.lib.MultipleOutputFormat。子类唯一需要实现的方法是:String generateFileNameForKeyValue(K key, V value, Configuration conf),即通过key和value及conf配置信息决定文件名(含扩展名)。
package inkfish.hadoop.study; import java.io.DataOutputStream; import java.io.IOException; import java.util.HashMap; import java.util.Iterator; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.fs.FSDataOutputStream; import org.apache.hadoop.fs.Path; import org.apache.hadoop.io.Writable; import org.apache.hadoop.io.WritableComparable; import org.apache.hadoop.io.compress.CompressionCodec; import org.apache.hadoop.io.compress.GzipCodec; import org.apache.hadoop.mapreduce.OutputCommitter; import org.apache.hadoop.mapreduce.RecordWriter; import org.apache.hadoop.mapreduce.TaskAttemptContext; import org.apache.hadoop.mapreduce.lib.output.FileOutputCommitter; import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat; import org.apache.hadoop.util.ReflectionUtils; public abstract class MultipleOutputFormat<K extends WritableComparable<?>, V extends Writable> extends FileOutputFormat<K, V> { private MultiRecordWriter writer = null; public RecordWriter<K, V> getRecordWriter(TaskAttemptContext job) throws IOException, InterruptedException { if (writer == null) { writer = new MultiRecordWriter(job, getTaskOutputPath(job)); } return writer; } private Path getTaskOutputPath(TaskAttemptContext conf) throws IOException { Path workPath = null; OutputCommitter committer = super.getOutputCommitter(conf); if (committer instanceof FileOutputCommitter) { workPath = ((FileOutputCommitter) committer).getWorkPath(); } else { Path outputPath = super.getOutputPath(conf); if (outputPath == null) { throw new IOException("Undefined job output-path"); } workPath = outputPath; } return workPath; } /**通过key, value, conf来确定输出文件名(含扩展名)*/ protected abstract String generateFileNameForKeyValue(K key, V value, Configuration conf); public class MultiRecordWriter extends RecordWriter<K, V> { /**RecordWriter的缓存*/ private HashMap<String, RecordWriter<K, V>> recordWriters = null; private TaskAttemptContext job = null; /**输出目录*/ private Path workPath = null; public MultiRecordWriter(TaskAttemptContext job, Path workPath) { super(); this.job = job; this.workPath = workPath; recordWriters = new HashMap<String, RecordWriter<K, V>>(); } @Override public void close(TaskAttemptContext context) throws IOException, InterruptedException { Iterator<RecordWriter<K, V>> values = this.recordWriters.values().iterator(); while (values.hasNext()) { values.next().close(context); } this.recordWriters.clear(); } @Override public void write(K key, V value) throws IOException, InterruptedException { //得到输出文件名 String baseName = generateFileNameForKeyValue(key, value, job.getConfiguration()); RecordWriter<K, V> rw = this.recordWriters.get(baseName); if (rw == null) { rw = getBaseRecordWriter(job, baseName); this.recordWriters.put(baseName, rw); } rw.write(key, value); } // ${mapred.out.dir}/_temporary/_${taskid}/${nameWithExtension} private RecordWriter<K, V> getBaseRecordWriter(TaskAttemptContext job, String baseName) throws IOException, InterruptedException { Configuration conf = job.getConfiguration(); boolean isCompressed = getCompressOutput(job); String keyValueSeparator = ","; RecordWriter<K, V> recordWriter = null; if (isCompressed) { Class<? extends CompressionCodec> codecClass = getOutputCompressorClass(job, GzipCodec.class); CompressionCodec codec = ReflectionUtils.newInstance(codecClass, conf); Path file = new Path(workPath, baseName + codec.getDefaultExtension()); FSDataOutputStream fileOut = file.getFileSystem(conf).create(file, false); recordWriter = new LineRecordWriter<K, V>(new DataOutputStream(codec .createOutputStream(fileOut)), keyValueSeparator); } else { Path file = new Path(workPath, baseName); FSDataOutputStream fileOut = file.getFileSystem(conf).create(file, false); recordWriter = new LineRecordWriter<K, V>(fileOut, keyValueSeparator); } return recordWriter; } } }
3.WordCount:
基本上维持hadoop示例中的WordCount原样,主要增加一个静态内部类AlphabetOutputFormat,这个类实现了MultipleOutputFormat,文件命名规则是:以英文字母开头的单词以“首字母.txt”为文件名保存,其他以“other.txt”保存。
package inkfish.hadoop.study; import java.io.IOException; import java.util.StringTokenizer; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.fs.Path; import org.apache.hadoop.io.IntWritable; import org.apache.hadoop.io.Text; import org.apache.hadoop.mapreduce.Job; import org.apache.hadoop.mapreduce.Mapper; import org.apache.hadoop.mapreduce.Reducer; import org.apache.hadoop.mapreduce.lib.input.FileInputFormat; import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat; import org.apache.hadoop.util.GenericOptionsParser; public class WordCount { public static class TokenizerMapper extends Mapper<Object, Text, Text, IntWritable> { private final static IntWritable one = new IntWritable(1); private Text word = new Text(); public void map(Object key, Text value, Context context) throws IOException, InterruptedException { StringTokenizer itr = new StringTokenizer(value.toString()); while (itr.hasMoreTokens()) { word.set(itr.nextToken()); context.write(word, one); } } } public static class IntSumReducer extends Reducer<Text, IntWritable, Text, IntWritable> { private IntWritable result = new IntWritable(); public void reduce(Text key, Iterable<IntWritable> values, Context context) throws IOException, InterruptedException { int sum = 0; for (IntWritable val : values) { sum += val.get(); } result.set(sum); context.write(key, result); } } public static class AlphabetOutputFormat extends MultipleOutputFormat<Text, IntWritable> { @Override protected String generateFileNameForKeyValue(Text key, IntWritable value, Configuration conf) { char c = key.toString().toLowerCase().charAt(0); if (c >= 'a' && c <= 'z') { return c + ".txt"; } return "other.txt"; } } public static void main(String[] args) throws Exception { Configuration conf = new Configuration(); String[] otherArgs = new GenericOptionsParser(conf, args).getRemainingArgs(); if (otherArgs.length != 2) { System.err.println("Usage: wordcount <in> <out>"); System.exit(2); } Job job = new Job(conf, "word count"); job.setJarByClass(WordCount.class); job.setMapperClass(TokenizerMapper.class); job.setCombinerClass(IntSumReducer.class); job.setReducerClass(IntSumReducer.class); job.setOutputKeyClass(Text.class); job.setOutputValueClass(IntWritable.class); job.setOutputFormatClass(AlphabetOutputFormat.class);//设置输出格式 FileInputFormat.addInputPath(job, new Path(otherArgs[0])); FileOutputFormat.setOutputPath(job, new Path(otherArgs[1])); System.exit(job.waitForCompletion(true) ? 0 : 1); } }
在我测试环境中运行结果:
10/01/08 20:35:34 INFO mapred.JobClient: Job complete: job_201001052238_0013 10/01/08 20:35:34 INFO mapred.JobClient: Counters: 15 10/01/08 20:35:34 INFO mapred.JobClient: Job Counters 10/01/08 20:35:34 INFO mapred.JobClient: Launched reduce tasks=1 10/01/08 20:35:34 INFO mapred.JobClient: Rack-local map tasks=38 10/01/08 20:35:34 INFO mapred.JobClient: Launched map tasks=38 10/01/08 20:35:34 INFO mapred.JobClient: FileSystemCounters 10/01/08 20:35:34 INFO mapred.JobClient: FILE_BYTES_READ=1473227 10/01/08 20:35:34 INFO mapred.JobClient: FILE_BYTES_WRITTEN=1370636 10/01/08 20:35:34 INFO mapred.JobClient: Map-Reduce Framework 10/01/08 20:35:34 INFO mapred.JobClient: Reduce input groups=0 10/01/08 20:35:34 INFO mapred.JobClient: Combine output records=29045 10/01/08 20:35:34 INFO mapred.JobClient: Map input records=19313 10/01/08 20:35:34 INFO mapred.JobClient: Reduce shuffle bytes=517685 10/01/08 20:35:34 INFO mapred.JobClient: Reduce output records=0 10/01/08 20:35:34 INFO mapred.JobClient: Spilled Records=58090 10/01/08 20:35:34 INFO mapred.JobClient: Map output bytes=1393868 10/01/08 20:35:34 INFO mapred.JobClient: Combine input records=119552 10/01/08 20:35:34 INFO mapred.JobClient: Map output records=119552 10/01/08 20:35:34 INFO mapred.JobClient: Reduce input records=29045 user@cloud-2:~/software/test$ ls out/ a.txt c.txt e.txt g.txt i.txt k.txt l.txt n.txt o.txt q.txt s.txt u.txt w.txt y.txt b.txt d.txt f.txt h.txt j.txt _logs m.txt other.txt p.txt r.txt t.txt v.txt x.txt z.txt user@cloud-2:~/software/test$