[Hadoop系列]Hadoop的MapReduce中多文件输出

简介:

  inkfish原创,请勿商业性质转载,转载请注明来源(http://blog.csdn.net/inkfish )。

  Hadoop默认的输出是TextOutputFormat,输出文件名不可定制。hadoop 0.19.X中有一个org.apache.hadoop.mapred.lib.MultipleOutputFormat,可以输出多份文件且可以自定义文件名,但是从hadoop 0.20.x中MultipleOutputFormat所在包的所有类被标记为“已过时”,当前如果再使用MultipleOutputFormat,在将来版本的hadoop中可能无法使用。本篇文章中,我们自己实现一个简单的MultipleOutputFormat,并修改hadoop自带的WordCount示例程序来测试结果。(来源:http://blog.csdn.net/inkfish)

 

环境:

  Ubuntu 8.0.4 Server 32bit
  Hadoop 0.20.1
  JDK 1.6.0_16-b01
  Eclipse 3.5(来源:http://blog.csdn.net/inkfish)

 

 

所有代码分为3个类: (来源:http://blog.csdn.net/inkfish)

1.LineRecordWriter:

  RecordWriter的一个实现,用于把<Key, Value>转化为一行文本。在Hadoop中,这个类作为TextOutputFormat的一个子类存在,protected访问权限,因此普通程序无法访问。这里仅仅是把LineRecordWriter从TextOutputFormat抽取出来,作为一个独立的公共类使用。(来源:http://blog.csdn.net/inkfish)

package inkfish.hadoop.study; import java.io.DataOutputStream; import java.io.IOException; import java.io.UnsupportedEncodingException; import org.apache.hadoop.io.NullWritable; import org.apache.hadoop.io.Text; import org.apache.hadoop.mapreduce.RecordWriter; import org.apache.hadoop.mapreduce.TaskAttemptContext; import org.apache.hadoop.mapreduce.lib.output.TextOutputFormat; /**摘自{@link TextOutputFormat}中的LineRecordWriter。 */ public class LineRecordWriter<K, V> extends RecordWriter<K, V> { private static final String utf8 = "UTF-8"; private static final byte[] newline; static { try { newline = "/n".getBytes(utf8); } catch (UnsupportedEncodingException uee) { throw new IllegalArgumentException("can't find " + utf8 + " encoding"); } } protected DataOutputStream out; private final byte[] keyValueSeparator; public LineRecordWriter(DataOutputStream out, String keyValueSeparator) { this.out = out; try { this.keyValueSeparator = keyValueSeparator.getBytes(utf8); } catch (UnsupportedEncodingException uee) { throw new IllegalArgumentException("can't find " + utf8 + " encoding"); } } public LineRecordWriter(DataOutputStream out) { this(out, "/t"); } private void writeObject(Object o) throws IOException { if (o instanceof Text) { Text to = (Text) o; out.write(to.getBytes(), 0, to.getLength()); } else { out.write(o.toString().getBytes(utf8)); } } public synchronized void write(K key, V value) throws IOException { boolean nullKey = key == null || key instanceof NullWritable; boolean nullValue = value == null || value instanceof NullWritable; if (nullKey && nullValue) { return; } if (!nullKey) { writeObject(key); } if (!(nullKey || nullValue)) { out.write(keyValueSeparator); } if (!nullValue) { writeObject(value); } out.write(newline); } public synchronized void close(TaskAttemptContext context) throws IOException { out.close(); } }

 

2.MultipleOutputFormat:

  抽象类,主要参考org.apache.hadoop.mapred.lib.MultipleOutputFormat。子类唯一需要实现的方法是:String generateFileNameForKeyValue(K key, V value, Configuration conf),即通过key和value及conf配置信息决定文件名(含扩展名)。(来源:http://blog.csdn.net/inkfish)

package inkfish.hadoop.study; import java.io.DataOutputStream; import java.io.IOException; import java.util.HashMap; import java.util.Iterator; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.fs.FSDataOutputStream; import org.apache.hadoop.fs.Path; import org.apache.hadoop.io.Writable; import org.apache.hadoop.io.WritableComparable; import org.apache.hadoop.io.compress.CompressionCodec; import org.apache.hadoop.io.compress.GzipCodec; import org.apache.hadoop.mapreduce.OutputCommitter; import org.apache.hadoop.mapreduce.RecordWriter; import org.apache.hadoop.mapreduce.TaskAttemptContext; import org.apache.hadoop.mapreduce.lib.output.FileOutputCommitter; import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat; import org.apache.hadoop.util.ReflectionUtils; public abstract class MultipleOutputFormat<K extends WritableComparable<?>, V extends Writable> extends FileOutputFormat<K, V> { private MultiRecordWriter writer = null; public RecordWriter<K, V> getRecordWriter(TaskAttemptContext job) throws IOException, InterruptedException { if (writer == null) { writer = new MultiRecordWriter(job, getTaskOutputPath(job)); } return writer; } private Path getTaskOutputPath(TaskAttemptContext conf) throws IOException { Path workPath = null; OutputCommitter committer = super.getOutputCommitter(conf); if (committer instanceof FileOutputCommitter) { workPath = ((FileOutputCommitter) committer).getWorkPath(); } else { Path outputPath = super.getOutputPath(conf); if (outputPath == null) { throw new IOException("Undefined job output-path"); } workPath = outputPath; } return workPath; } /**通过key, value, conf来确定输出文件名(含扩展名)*/ protected abstract String generateFileNameForKeyValue(K key, V value, Configuration conf); public class MultiRecordWriter extends RecordWriter<K, V> { /**RecordWriter的缓存*/ private HashMap<String, RecordWriter<K, V>> recordWriters = null; private TaskAttemptContext job = null; /**输出目录*/ private Path workPath = null; public MultiRecordWriter(TaskAttemptContext job, Path workPath) { super(); this.job = job; this.workPath = workPath; recordWriters = new HashMap<String, RecordWriter<K, V>>(); } @Override public void close(TaskAttemptContext context) throws IOException, InterruptedException { Iterator<RecordWriter<K, V>> values = this.recordWriters.values().iterator(); while (values.hasNext()) { values.next().close(context); } this.recordWriters.clear(); } @Override public void write(K key, V value) throws IOException, InterruptedException { //得到输出文件名 String baseName = generateFileNameForKeyValue(key, value, job.getConfiguration()); RecordWriter<K, V> rw = this.recordWriters.get(baseName); if (rw == null) { rw = getBaseRecordWriter(job, baseName); this.recordWriters.put(baseName, rw); } rw.write(key, value); } // ${mapred.out.dir}/_temporary/_${taskid}/${nameWithExtension} private RecordWriter<K, V> getBaseRecordWriter(TaskAttemptContext job, String baseName) throws IOException, InterruptedException { Configuration conf = job.getConfiguration(); boolean isCompressed = getCompressOutput(job); String keyValueSeparator = ","; RecordWriter<K, V> recordWriter = null; if (isCompressed) { Class<? extends CompressionCodec> codecClass = getOutputCompressorClass(job, GzipCodec.class); CompressionCodec codec = ReflectionUtils.newInstance(codecClass, conf); Path file = new Path(workPath, baseName + codec.getDefaultExtension()); FSDataOutputStream fileOut = file.getFileSystem(conf).create(file, false); recordWriter = new LineRecordWriter<K, V>(new DataOutputStream(codec .createOutputStream(fileOut)), keyValueSeparator); } else { Path file = new Path(workPath, baseName); FSDataOutputStream fileOut = file.getFileSystem(conf).create(file, false); recordWriter = new LineRecordWriter<K, V>(fileOut, keyValueSeparator); } return recordWriter; } } }

 

3.WordCount:

  基本上维持hadoop示例中的WordCount原样,主要增加一个静态内部类AlphabetOutputFormat,这个类实现了MultipleOutputFormat,文件命名规则是:以英文字母开头的单词以“首字母.txt”为文件名保存,其他以“other.txt”保存。(来源:http://blog.csdn.net/inkfish)

package inkfish.hadoop.study; import java.io.IOException; import java.util.StringTokenizer; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.fs.Path; import org.apache.hadoop.io.IntWritable; import org.apache.hadoop.io.Text; import org.apache.hadoop.mapreduce.Job; import org.apache.hadoop.mapreduce.Mapper; import org.apache.hadoop.mapreduce.Reducer; import org.apache.hadoop.mapreduce.lib.input.FileInputFormat; import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat; import org.apache.hadoop.util.GenericOptionsParser; public class WordCount { public static class TokenizerMapper extends Mapper<Object, Text, Text, IntWritable> { private final static IntWritable one = new IntWritable(1); private Text word = new Text(); public void map(Object key, Text value, Context context) throws IOException, InterruptedException { StringTokenizer itr = new StringTokenizer(value.toString()); while (itr.hasMoreTokens()) { word.set(itr.nextToken()); context.write(word, one); } } } public static class IntSumReducer extends Reducer<Text, IntWritable, Text, IntWritable> { private IntWritable result = new IntWritable(); public void reduce(Text key, Iterable<IntWritable> values, Context context) throws IOException, InterruptedException { int sum = 0; for (IntWritable val : values) { sum += val.get(); } result.set(sum); context.write(key, result); } } public static class AlphabetOutputFormat extends MultipleOutputFormat<Text, IntWritable> { @Override protected String generateFileNameForKeyValue(Text key, IntWritable value, Configuration conf) { char c = key.toString().toLowerCase().charAt(0); if (c >= 'a' && c <= 'z') { return c + ".txt"; } return "other.txt"; } } public static void main(String[] args) throws Exception { Configuration conf = new Configuration(); String[] otherArgs = new GenericOptionsParser(conf, args).getRemainingArgs(); if (otherArgs.length != 2) { System.err.println("Usage: wordcount <in> <out>"); System.exit(2); } Job job = new Job(conf, "word count"); job.setJarByClass(WordCount.class); job.setMapperClass(TokenizerMapper.class); job.setCombinerClass(IntSumReducer.class); job.setReducerClass(IntSumReducer.class); job.setOutputKeyClass(Text.class); job.setOutputValueClass(IntWritable.class); job.setOutputFormatClass(AlphabetOutputFormat.class);//设置输出格式 FileInputFormat.addInputPath(job, new Path(otherArgs[0])); FileOutputFormat.setOutputPath(job, new Path(otherArgs[1])); System.exit(job.waitForCompletion(true) ? 0 : 1); } }

 

在我测试环境中运行结果:

10/01/08 20:35:34 INFO mapred.JobClient: Job complete: job_201001052238_0013 10/01/08 20:35:34 INFO mapred.JobClient: Counters: 15 10/01/08 20:35:34 INFO mapred.JobClient: Job Counters 10/01/08 20:35:34 INFO mapred.JobClient: Launched reduce tasks=1 10/01/08 20:35:34 INFO mapred.JobClient: Rack-local map tasks=38 10/01/08 20:35:34 INFO mapred.JobClient: Launched map tasks=38 10/01/08 20:35:34 INFO mapred.JobClient: FileSystemCounters 10/01/08 20:35:34 INFO mapred.JobClient: FILE_BYTES_READ=1473227 10/01/08 20:35:34 INFO mapred.JobClient: FILE_BYTES_WRITTEN=1370636 10/01/08 20:35:34 INFO mapred.JobClient: Map-Reduce Framework 10/01/08 20:35:34 INFO mapred.JobClient: Reduce input groups=0 10/01/08 20:35:34 INFO mapred.JobClient: Combine output records=29045 10/01/08 20:35:34 INFO mapred.JobClient: Map input records=19313 10/01/08 20:35:34 INFO mapred.JobClient: Reduce shuffle bytes=517685 10/01/08 20:35:34 INFO mapred.JobClient: Reduce output records=0 10/01/08 20:35:34 INFO mapred.JobClient: Spilled Records=58090 10/01/08 20:35:34 INFO mapred.JobClient: Map output bytes=1393868 10/01/08 20:35:34 INFO mapred.JobClient: Combine input records=119552 10/01/08 20:35:34 INFO mapred.JobClient: Map output records=119552 10/01/08 20:35:34 INFO mapred.JobClient: Reduce input records=29045 user@cloud-2:~/software/test$ ls out/ a.txt c.txt e.txt g.txt i.txt k.txt l.txt n.txt o.txt q.txt s.txt u.txt w.txt y.txt b.txt d.txt f.txt h.txt j.txt _logs m.txt other.txt p.txt r.txt t.txt v.txt x.txt z.txt user@cloud-2:~/software/test$

目录
相关文章
|
2月前
|
分布式计算 资源调度 Hadoop
大数据-80 Spark 简要概述 系统架构 部署模式 与Hadoop MapReduce对比
大数据-80 Spark 简要概述 系统架构 部署模式 与Hadoop MapReduce对比
71 2
|
7天前
|
数据采集 分布式计算 Hadoop
使用Hadoop MapReduce进行大规模数据爬取
使用Hadoop MapReduce进行大规模数据爬取
|
2月前
|
分布式计算 资源调度 Hadoop
Hadoop-10-HDFS集群 Java实现MapReduce WordCount计算 Hadoop序列化 编写Mapper和Reducer和Driver 附带POM 详细代码 图文等内容
Hadoop-10-HDFS集群 Java实现MapReduce WordCount计算 Hadoop序列化 编写Mapper和Reducer和Driver 附带POM 详细代码 图文等内容
100 3
|
2月前
|
分布式计算 资源调度 数据可视化
Hadoop-06-Hadoop集群 历史服务器配置 超详细 执行任务记录 JobHistoryServer MapReduce执行记录 日志聚合结果可视化查看
Hadoop-06-Hadoop集群 历史服务器配置 超详细 执行任务记录 JobHistoryServer MapReduce执行记录 日志聚合结果可视化查看
46 1
|
2月前
|
分布式计算 资源调度 Hadoop
Hadoop-05-Hadoop集群 集群WordCount 超详细 真正的分布式计算 上传HDFS MapReduce计算 YRAN查看任务 上传计算下载查看
Hadoop-05-Hadoop集群 集群WordCount 超详细 真正的分布式计算 上传HDFS MapReduce计算 YRAN查看任务 上传计算下载查看
52 1
|
2月前
|
SQL 分布式计算 关系型数据库
Hadoop-24 Sqoop迁移 MySQL到Hive 与 Hive到MySQL SQL生成数据 HDFS集群 Sqoop import jdbc ETL MapReduce
Hadoop-24 Sqoop迁移 MySQL到Hive 与 Hive到MySQL SQL生成数据 HDFS集群 Sqoop import jdbc ETL MapReduce
96 0
|
2月前
|
SQL 分布式计算 关系型数据库
Hadoop-23 Sqoop 数据MySQL到HDFS(部分) SQL生成数据 HDFS集群 Sqoop import jdbc ETL MapReduce
Hadoop-23 Sqoop 数据MySQL到HDFS(部分) SQL生成数据 HDFS集群 Sqoop import jdbc ETL MapReduce
43 0
|
2月前
|
SQL 分布式计算 关系型数据库
Hadoop-22 Sqoop 数据MySQL到HDFS(全量) SQL生成数据 HDFS集群 Sqoop import jdbc ETL MapReduce
Hadoop-22 Sqoop 数据MySQL到HDFS(全量) SQL生成数据 HDFS集群 Sqoop import jdbc ETL MapReduce
56 0
|
4月前
|
缓存 分布式计算 算法
优化Hadoop MapReduce性能的最佳实践
【8月更文第28天】Hadoop MapReduce是一个用于处理大规模数据集的软件框架,适用于分布式计算环境。虽然MapReduce框架本身具有很好的可扩展性和容错性,但在某些情况下,任务执行可能会因为各种原因导致性能瓶颈。本文将探讨如何通过调整配置参数和优化算法逻辑来提高MapReduce任务的效率。
588 0
|
6月前
|
分布式计算 Hadoop Java
Hadoop MapReduce编程
该教程指导编写Hadoop MapReduce程序处理天气数据。任务包括计算每个城市ID的最高、最低气温、气温出现次数和平均气温。在读取数据时需忽略表头,且数据应为整数。教程中提供了环境变量设置、Java编译、jar包创建及MapReduce执行的步骤说明,但假设读者已具备基础操作技能。此外,还提到一个扩展练习,通过分区功能将具有相同尾数的数字分组到不同文件。
65 1