hadoop实例 RandomWriter

简介:

参考文献:http://www.hadooper.cn/dct/page/65778

1.概述

RandomWriter(随机写)例子利用 Map/Reduce把 数据随机的写到dfs中。每个map输入单个文件名,然后随机写BytesWritable的键和值到DFS顺序文件。map没有产生任何输出,所以reduce没有执行。产生的数据是可以配置的。配置变量如下

名字
默认值
描述

test.randomwriter.maps_per_host

10
每个节点运行的map任务数

test.randomwrite.bytes_per_map

1073741824
每个map任务产生的数据量

test.randomwrite.min_key

10
minimum size of the key in bytes

test.randomwrite.max_key

1000
maximum size of the key in bytes

test.randomwrite.min_value

0
minimum size of the value

test.randomwrite.max_value

20000
maximum size of the value

test.randomwriter.maps_per_host表示每个工作节点(datanode)上运行map的次数。默认情况下,只有一个数据节点,那么就有10个map,每个map的数据量为1G,因此要将10G数据写入到hdfs中。我配置的试验环境中只有2个工作节点,不过我希望每个工作节点只有1个map任务。

test.randomwrite.bytes_per_map我原本以为是随机写输出的测试文件的大小,默认为1G=1*1024*1024*1024,但是我将这个数据改成1*1024*1024以后,输出的测试文件还是1G,这让我很不解。(PS:2011-11-2,今天知道这个参数表示没个map任务产生的数据量,如果将其改为1*1024*1024,那么就表示没个map任务产生的数据量为1MB。)(PS:2011-11-3,修改参数test.randomwrite.bytes_per_map并不能更改每个map任务产生的数据量,还是1G,不管我将这个参数设定为什么值。不过修改参数:test.randomwriter.maps_per_host是有效的。测试发现将该参数设为1和2都测试通过。问题:在哪里修改test.randomwrite.bytes_per_map才能真正修改map任务产生的数据量。!

2.代码实例

其中test.randomwrite.bytes_per_map=1*1024*1024,test.randomwriter.maps_per_host=1。

  1. /** 
  2.  * Licensed to the Apache Software Foundation (ASF) under one 
  3.  * or more contributor license agreements.  See the NOTICE file 
  4.  * distributed with this work for additional information 
  5.  * regarding copyright ownership.  The ASF licenses this file 
  6.  * to you under the Apache License, Version 2.0 (the 
  7.  * "License"); you may not use this file except in compliance 
  8.  * with the License.  You may obtain a copy of the License at 
  9.  * 
  10.  *     http://www.apache.org/licenses/LICENSE-2.0 
  11.  * 
  12.  * Unless required by applicable law or agreed to in writing, software 
  13.  * distributed under the License is distributed on an "AS IS" BASIS, 
  14.  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 
  15.  * See the License for the specific language governing permissions and 
  16.  * limitations under the License. 
  17.  */  
  18.   
  19. package org.apache.hadoop.examples;  
  20.   
  21. import java.io.IOException;  
  22. import java.util.Date;  
  23. import java.util.Random;  
  24.   
  25. import org.apache.hadoop.conf.Configuration;  
  26. import org.apache.hadoop.conf.Configured;  
  27. import org.apache.hadoop.fs.Path;  
  28. import org.apache.hadoop.io.BytesWritable;  
  29. import org.apache.hadoop.io.Text;  
  30. import org.apache.hadoop.io.Writable;  
  31. import org.apache.hadoop.io.WritableComparable;  
  32. import org.apache.hadoop.mapred.ClusterStatus;  
  33. import org.apache.hadoop.mapred.FileOutputFormat;  
  34. import org.apache.hadoop.mapred.FileSplit;  
  35. import org.apache.hadoop.mapred.InputFormat;  
  36. import org.apache.hadoop.mapred.InputSplit;  
  37. import org.apache.hadoop.mapred.JobClient;  
  38. import org.apache.hadoop.mapred.JobConf;  
  39. import org.apache.hadoop.mapred.MapReduceBase;  
  40. import org.apache.hadoop.mapred.Mapper;  
  41. import org.apache.hadoop.mapred.OutputCollector;  
  42. import org.apache.hadoop.mapred.RecordReader;  
  43. import org.apache.hadoop.mapred.Reporter;  
  44. import org.apache.hadoop.mapred.SequenceFileOutputFormat;  
  45. import org.apache.hadoop.mapred.lib.IdentityReducer;  
  46. import org.apache.hadoop.util.GenericOptionsParser;  
  47. import org.apache.hadoop.util.Tool;  
  48. import org.apache.hadoop.util.ToolRunner;  
  49.   
  50. /** 
  51.  * This program uses map/reduce to just run a distributed job where there is 
  52.  * no interaction between the tasks and each task write a large unsorted 
  53.  * random binary sequence file of BytesWritable. 
  54.  * In order for this program to generate data for terasort with 10-byte keys 
  55.  * and 90-byte values, have the following config: 
  56.  * <xmp> 
  57.  * <?xml version="1.0"?> 
  58.  * <?xml-stylesheet type="text/xsl" href="configuration.xsl"?> 
  59.  * <configuration> 
  60.  *   <property> 
  61.  *     <name>test.randomwrite.min_key</name> 
  62.  *     <value>10</value> 
  63.  *   </property> 
  64.  *   <property> 
  65.  *     <name>test.randomwrite.max_key</name> 
  66.  *     <value>10</value> 
  67.  *   </property> 
  68.  *   <property> 
  69.  *     <name>test.randomwrite.min_value</name> 
  70.  *     <value>90</value> 
  71.  *   </property> 
  72.  *   <property> 
  73.  *     <name>test.randomwrite.max_value</name> 
  74.  *     <value>90</value> 
  75.  *   </property> 
  76.  *   <property> 
  77.  *     <name>test.randomwrite.total_bytes</name> 
  78.  *     <value>1099511627776</value> 
  79.  *   </property> 
  80.  * </configuration></xmp> 
  81.  *  
  82.  * Equivalently, {@link RandomWriter} also supports all the above options 
  83.  * and ones supported by {@link GenericOptionsParser} via the command-line. 
  84.  */  
  85. public class RandomWriter extends Configured implements Tool {  
  86.     
  87.   /** 
  88.    * User counters 
  89.    */  
  90.   static enum Counters { RECORDS_WRITTEN, BYTES_WRITTEN }  
  91.     
  92.   /** 
  93.    * A custom input format that creates virtual inputs of a single string 
  94.    * for each map. 
  95.    */  
  96.   static class RandomInputFormat implements InputFormat<Text, Text> {  
  97.   
  98.     /**  
  99.      * Generate the requested number of file splits, with the filename 
  100.      * set to the filename of the output file. 
  101.      */  
  102.     public InputSplit[] getSplits(JobConf job,   
  103.                                   int numSplits) throws IOException {  
  104.       InputSplit[] result = new InputSplit[numSplits];  
  105.       Path outDir = FileOutputFormat.getOutputPath(job);  
  106.       for(int i=0; i < result.length; ++i) {  
  107.         result[i] = new FileSplit(new Path(outDir, "dummy-split-" + i), 0, 1,   
  108.                                   (String[])null);  
  109.       }  
  110.       return result;  
  111.     }  
  112.   
  113.     /** 
  114.      * Return a single record (filename, "") where the filename is taken from 
  115.      * the file split. 
  116.      */  
  117.     static class RandomRecordReader implements RecordReader<Text, Text> {  
  118.       Path name;  
  119.       public RandomRecordReader(Path p) {  
  120.         name = p;  
  121.       }  
  122.       public boolean next(Text key, Text value) {  
  123.         if (name != null) {  
  124.           key.set(name.getName());  
  125.           name = null;  
  126.           return true;  
  127.         }  
  128.         return false;  
  129.       }  
  130.       public Text createKey() {  
  131.         return new Text();  
  132.       }  
  133.       public Text createValue() {  
  134.         return new Text();  
  135.       }  
  136.       public long getPos() {  
  137.         return 0;  
  138.       }  
  139.       public void close() {}  
  140.       public float getProgress() {  
  141.         return 0.0f;  
  142.       }  
  143.     }  
  144.   
  145.     public RecordReader<Text, Text> getRecordReader(InputSplit split,  
  146.                                         JobConf job,   
  147.                                         Reporter reporter) throws IOException {  
  148.       return new RandomRecordReader(((FileSplit) split).getPath());  
  149.     }  
  150.   }  
  151.   
  152.   static class Map extends MapReduceBase  
  153.     implements Mapper<WritableComparable, Writable,  
  154.                       BytesWritable, BytesWritable> {  
  155.       
  156.     private long numBytesToWrite;  
  157.     private int minKeySize;  
  158.     private int keySizeRange;  
  159.     private int minValueSize;  
  160.     private int valueSizeRange;  
  161.     private Random random = new Random();  
  162.     private BytesWritable randomKey = new BytesWritable();  
  163.     private BytesWritable randomValue = new BytesWritable();  
  164.       
  165.     private void randomizeBytes(byte[] data, int offset, int length) {  
  166.       for(int i=offset + length - 1; i >= offset; --i) {  
  167.         data[i] = (byte) random.nextInt(256);  
  168.       }  
  169.     }  
  170.       
  171.     /** 
  172.      * Given an output filename, write a bunch of random records to it. 
  173.      */  
  174.     public void map(WritableComparable key,   
  175.                     Writable value,  
  176.                     OutputCollector<BytesWritable, BytesWritable> output,   
  177.                     Reporter reporter) throws IOException {  
  178.       int itemCount = 0;  
  179.       while (numBytesToWrite > 0) {  
  180.         int keyLength = minKeySize +   
  181.           (keySizeRange != 0 ? random.nextInt(keySizeRange) : 0);  
  182.         randomKey.setSize(keyLength);  
  183.         randomizeBytes(randomKey.getBytes(), 0, randomKey.getLength());  
  184.         int valueLength = minValueSize +  
  185.           (valueSizeRange != 0 ? random.nextInt(valueSizeRange) : 0);  
  186.         randomValue.setSize(valueLength);  
  187.         randomizeBytes(randomValue.getBytes(), 0, randomValue.getLength());  
  188.         output.collect(randomKey, randomValue);  
  189.         numBytesToWrite -= keyLength + valueLength;  
  190.         reporter.incrCounter(Counters.BYTES_WRITTEN, keyLength + valueLength);  
  191.         reporter.incrCounter(Counters.RECORDS_WRITTEN, 1);  
  192.         if (++itemCount % 200 == 0) {  
  193.           reporter.setStatus("wrote record " + itemCount + ". " +   
  194.                              numBytesToWrite + " bytes left.");  
  195.         }  
  196.       }  
  197.       reporter.setStatus("done with " + itemCount + " records.");  
  198.     }  
  199.       
  200.     /** 
  201.      * Save the values out of the configuaration that we need to write 
  202.      * the data. 
  203.      */  
  204.     @Override  
  205.     public void configure(JobConf job) {  
  206.       numBytesToWrite = job.getLong("test.randomwrite.bytes_per_map",  
  207.                                     1*1024*1024);  
  208.       minKeySize = job.getInt("test.randomwrite.min_key", 10);  
  209.       keySizeRange =   
  210.         job.getInt("test.randomwrite.max_key", 1000) - minKeySize;  
  211.       minValueSize = job.getInt("test.randomwrite.min_value", 0);  
  212.       valueSizeRange =   
  213.         job.getInt("test.randomwrite.max_value", 20000) - minValueSize;  
  214.     }  
  215.       
  216.   }  
  217.     
  218.   /** 
  219.    * This is the main routine for launching a distributed random write job. 
  220.    * It runs 10 maps/node and each node writes 1 gig of data to a DFS file. 
  221.    * The reduce doesn't do anything. 
  222.    *  
  223.    * @throws IOException  
  224.    */  
  225.   public int run(String[] args) throws Exception {      
  226.     if (args.length == 0) {  
  227.       System.out.println("Usage: writer <out-dir>");  
  228.       ToolRunner.printGenericCommandUsage(System.out);  
  229.       return -1;  
  230.     }  
  231.       
  232.     Path outDir = new Path(args[0]);  
  233.     JobConf job = new JobConf(getConf());  
  234.       
  235.     job.setJarByClass(RandomWriter.class);  
  236.     job.setJobName("random-writer");  
  237.     FileOutputFormat.setOutputPath(job, outDir);  
  238.       
  239.     job.setOutputKeyClass(BytesWritable.class);  
  240.     job.setOutputValueClass(BytesWritable.class);  
  241.       
  242.     job.setInputFormat(RandomInputFormat.class);  
  243.     job.setMapperClass(Map.class);          
  244.     job.setReducerClass(IdentityReducer.class);  
  245.     job.setOutputFormat(SequenceFileOutputFormat.class);  
  246.       
  247.     JobClient client = new JobClient(job);  
  248.     ClusterStatus cluster = client.getClusterStatus();  
  249.     int numMapsPerHost = job.getInt("test.randomwriter.maps_per_host", 1);  
  250.     long numBytesToWritePerMap = job.getLong("test.randomwrite.bytes_per_map",  
  251.                                              1*1024*1024);  
  252.     if (numBytesToWritePerMap == 0) {  
  253.       System.err.println("Cannot have test.randomwrite.bytes_per_map set to 0");  
  254.       return -2;  
  255.     }  
  256.     long totalBytesToWrite = job.getLong("test.randomwrite.total_bytes",   
  257.          numMapsPerHost*numBytesToWritePerMap*cluster.getTaskTrackers());  
  258.     int numMaps = (int) (totalBytesToWrite / numBytesToWritePerMap);  
  259.     if (numMaps == 0 && totalBytesToWrite > 0) {  
  260.       numMaps = 1;  
  261.       job.setLong("test.randomwrite.bytes_per_map", totalBytesToWrite);  
  262.     }  
  263.       
  264.     job.setNumMapTasks(numMaps);  
  265.     System.out.println("Running " + numMaps + " maps.");  
  266.       
  267.     // reducer NONE  
  268.     job.setNumReduceTasks(0);  
  269.       
  270.     Date startTime = new Date();  
  271.     System.out.println("Job started: " + startTime);  
  272.     JobClient.runJob(job);  
  273.     Date endTime = new Date();  
  274.     System.out.println("Job ended: " + endTime);  
  275.     System.out.println("The job took " +   
  276.                        (endTime.getTime() - startTime.getTime()) /1000 +   
  277.                        " seconds.");  
  278.       
  279.     return 0;  
  280.   }  
  281.     
  282.   public static void main(String[] args) throws Exception {  
  283.     int res = ToolRunner.run(new Configuration(), new RandomWriter(), args);  
  284.     System.exit(res);  
  285.   }  
  286.   
  287. }  
输出信息:

  1. 11/10/17 13:27:46 WARN conf.Configuration: DEPRECATED: hadoop-site.xml found in the classpath. Usage of hadoop-site.xml is deprecated. Instead use core-site.xml, mapred-site.xml and hdfs-site.xml to override properties of core-default.xml, mapred-default.xml and hdfs-default.xml respectively  
  2. Running 2 maps.  
  3. Job started: Mon Oct 17 13:27:47 CST 2011  
  4. 11/10/17 13:27:47 INFO mapred.JobClient: Running job: job_201110171322_0001  
  5. 11/10/17 13:27:48 INFO mapred.JobClient:  map 0% reduce 0%  
  6. 11/10/17 13:29:58 INFO mapred.JobClient:  map 50% reduce 0%  
  7. 11/10/17 13:30:05 INFO mapred.JobClient:  map 100% reduce 0%  
  8. 11/10/17 13:30:07 INFO mapred.JobClient: Job complete: job_201110171322_0001  
  9. 11/10/17 13:30:07 INFO mapred.JobClient: Counters: 8  
  10. 11/10/17 13:30:07 INFO mapred.JobClient:   Job Counters   
  11. 11/10/17 13:30:07 INFO mapred.JobClient:     Launched map tasks=3  
  12. 11/10/17 13:30:07 INFO mapred.JobClient:   org.apache.hadoop.examples.RandomWriter$Counters  
  13. 11/10/17 13:30:07 INFO mapred.JobClient:     BYTES_WRITTEN=2147504078  
  14. 11/10/17 13:30:07 INFO mapred.JobClient:     RECORDS_WRITTEN=204528  
  15. 11/10/17 13:30:07 INFO mapred.JobClient:   FileSystemCounters  
  16. 11/10/17 13:30:07 INFO mapred.JobClient:     HDFS_BYTES_WRITTEN=2154580318  
  17. 11/10/17 13:30:07 INFO mapred.JobClient:   Map-Reduce Framework  
  18. 11/10/17 13:30:07 INFO mapred.JobClient:     Map input records=2  
  19. 11/10/17 13:30:07 INFO mapred.JobClient:     Spilled Records=0  
  20. 11/10/17 13:30:07 INFO mapred.JobClient:     Map input bytes=0  
  21. 11/10/17 13:30:07 INFO mapred.JobClient:     Map output records=204528  
  22. Job ended: Mon Oct 17 13:30:07 CST 2011  
  23. The job took 140 seconds.  
在hdfs上产生了两个文件,在/home/hadoop/rand目录下,分别是part-00000(1Gb,r3)和part-00001(1Gb,r3)





本文转自xwdreamer博客园博客,原文链接:http://www.cnblogs.com/xwdreamer/archive/2011/10/17/2296957.html,如需转载请自行联系原作者

目录
相关文章
|
6月前
|
分布式计算 资源调度 Java
Scala+Spark+Hadoop+IDEA实现WordCount单词计数,上传并执行任务(简单实例-下)
Scala+Spark+Hadoop+IDEA实现WordCount单词计数,上传并执行任务(简单实例-下)
75 0
|
6月前
|
分布式计算 Hadoop Scala
Scala +Spark+Hadoop+Zookeeper+IDEA实现WordCount单词计数(简单实例-上)
Scala +Spark+Hadoop+Zookeeper+IDEA实现WordCount单词计数(简单实例-上)
60 0
|
分布式计算 Hadoop Java
hadoop第一个运行实例wordcount
hadoop第一个运行实例wordcount
161 0
|
分布式计算 Hadoop Java
|
分布式计算 Hadoop Java
Hadoop获取 FileSystem 实例
Hadoop获取 FileSystem 实例
|
分布式计算 Apache
|
Web App开发 分布式计算 前端开发

相关实验场景

更多