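Reference: http://www.hadooper.cn/dct/page/65778
1. Overview
The RandomWriter example uses Map/Reduce to write random data to DFS. Each map takes a single file name as input and writes random BytesWritable keys and values to a DFS sequence file. No reduce phase runs (the job sets the number of reduce tasks to 0), so the records go straight from the maps to the output files. The generated data is configurable through the following variables: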
| Name | Default | Description |
| --- | --- | --- |
| test.randomwriter.maps_per_host | 10 | Number of map tasks run per node |
| test.randomwrite.bytes_per_map | 1073741824 | Amount of data produced by each map task, in bytes |
| test.randomwrite.min_key | 10 | Minimum size of the key, in bytes |
| test.randomwrite.max_key | 1000 | Maximum size of the key, in bytes |
| test.randomwrite.min_value | 0 | Minimum size of the value, in bytes |
| test.randomwrite.max_value | 20000 | Maximum size of the value, in bytes |
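test.randomwriter.maps_per_host is the number of map tasks run on each worker node (datanode; the code counts nodes with cluster.getTaskTrackers()). With the defaults and a single data node there are 10 maps, each producing 1 GB, so 10 GB is written to HDFS. My test environment has only 2 worker nodes, and I wanted each of them to run just 1 map task.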
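As a quick check of this arithmetic, the sketch below reproduces how the run() method in the listing further down derives the number of maps when test.randomwrite.total_bytes is not set. The class and method names (RandomWriterPlan, defaultTotalBytes, numMaps) are made up for illustration, and the TaskTracker count is passed in as a plain number instead of being read from a cluster.

```java
public class RandomWriterPlan {

    /** Total bytes to write when test.randomwrite.total_bytes is not set. */
    public static long defaultTotalBytes(int mapsPerHost, long bytesPerMap, int taskTrackers) {
        return mapsPerHost * bytesPerMap * taskTrackers;
    }

    /** Number of map tasks: total bytes divided by bytes per map, but at least 1. */
    public static int numMaps(long totalBytes, long bytesPerMap) {
        int maps = (int) (totalBytes / bytesPerMap);
        if (maps == 0 && totalBytes > 0) {
            // The listing also lowers bytes_per_map to totalBytes in this case.
            maps = 1;
        }
        return maps;
    }

    public static void main(String[] args) {
        long oneGiB = 1L << 30;

        // Stock defaults on a single worker node: 10 maps per host, 1 GiB per map.
        long total = defaultTotalBytes(10, oneGiB, 1);
        System.out.println(numMaps(total, oneGiB) + " maps, " + total + " bytes");

        // The setup described in the post: 2 worker nodes, 1 map per host.
        total = defaultTotalBytes(1, oneGiB, 2);
        System.out.println(numMaps(total, oneGiB) + " maps, " + total + " bytes");
    }
}
```

The first case gives 10 maps and 10737418240 bytes (10 GB); the second gives 2 maps and 2147483648 bytes, which agrees with the "Running 2 maps." line in the run output at the end of the post.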
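I originally took test.randomwrite.bytes_per_map to be the size of the test file that the random write produces, 1 GB by default (1*1024*1024*1024). But after I changed the value to 1*1024*1024, the output test file was still 1 GB, which puzzled me.
(PS, 2011-11-2: today I learned that this parameter is the amount of data produced by each map task. Setting it to 1*1024*1024 should therefore mean that each map task produces 1 MB.)
(PS, 2011-11-3: changing test.randomwrite.bytes_per_map did not change the amount of data each map task produced; it stayed at 1 GB whatever value I set. Changing test.randomwriter.maps_per_host did take effect, and runs with the value set to 1 and to 2 both passed. Open question: where does test.randomwrite.bytes_per_map have to be changed so that the amount of data per map task really changes?)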
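One thing worth checking for this open question, offered as an assumption since the post does not establish the cause: the second argument of job.getLong(name, defaultValue) is only a fallback that applies when the property is not set anywhere in the configuration. Editing that fallback in the source therefore has no effect if the property is already set elsewhere, or if the job is still launched from a jar built before the edit. The sketch below imitates this lookup rule with java.util.Properties; ConfigFallbackDemo and its getLong helper are hypothetical stand-ins, not Hadoop classes.

```java
import java.util.Properties;

public class ConfigFallbackDemo {

    /** Returns the configured value if present, otherwise the fallback default. */
    public static long getLong(Properties conf, String name, long defaultValue) {
        String value = conf.getProperty(name);
        return value == null ? defaultValue : Long.parseLong(value.trim());
    }

    public static void main(String[] args) {
        String key = "test.randomwrite.bytes_per_map";
        Properties conf = new Properties();

        // Property not set: the in-code fallback (1 MiB) is used.
        System.out.println(getLong(conf, key, 1L * 1024 * 1024));

        // Property set explicitly: the fallback is ignored.
        conf.setProperty(key, "1073741824");
        System.out.println(getLong(conf, key, 1L * 1024 * 1024));
    }
}
```

Setting the property explicitly sidesteps the fallback altogether. The class Javadoc in the listing notes that RandomWriter accepts its options on the command line through GenericOptionsParser, whose -D property=value form would carry test.randomwrite.bytes_per_map=1048576 into the job configuration.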
2. Code example
In this listing the in-code defaults are changed to test.randomwrite.bytes_per_map=1*1024*1024 and test.randomwriter.maps_per_host=1.
/**
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements. See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership. The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License. You may obtain a copy of the License at
 *
 * http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.apache.hadoop.examples;

import java.io.IOException;
import java.util.Date;
import java.util.Random;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.conf.Configured;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.BytesWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.io.Writable;
import org.apache.hadoop.io.WritableComparable;
import org.apache.hadoop.mapred.ClusterStatus;
import org.apache.hadoop.mapred.FileOutputFormat;
import org.apache.hadoop.mapred.FileSplit;
import org.apache.hadoop.mapred.InputFormat;
import org.apache.hadoop.mapred.InputSplit;
import org.apache.hadoop.mapred.JobClient;
import org.apache.hadoop.mapred.JobConf;
import org.apache.hadoop.mapred.MapReduceBase;
import org.apache.hadoop.mapred.Mapper;
import org.apache.hadoop.mapred.OutputCollector;
import org.apache.hadoop.mapred.RecordReader;
import org.apache.hadoop.mapred.Reporter;
import org.apache.hadoop.mapred.SequenceFileOutputFormat;
import org.apache.hadoop.mapred.lib.IdentityReducer;
import org.apache.hadoop.util.GenericOptionsParser;
import org.apache.hadoop.util.Tool;
import org.apache.hadoop.util.ToolRunner;

/**
 * This program uses map/reduce to just run a distributed job where there is
 * no interaction between the tasks and each task writes a large unsorted
 * random binary sequence file of BytesWritable.
 * In order for this program to generate data for terasort with 10-byte keys
 * and 90-byte values, have the following config:
 * <xmp>
 * <?xml version="1.0"?>
 * <?xml-stylesheet type="text/xsl" href="configuration.xsl"?>
 * <configuration>
 *   <property>
 *     <name>test.randomwrite.min_key</name>
 *     <value>10</value>
 *   </property>
 *   <property>
 *     <name>test.randomwrite.max_key</name>
 *     <value>10</value>
 *   </property>
 *   <property>
 *     <name>test.randomwrite.min_value</name>
 *     <value>90</value>
 *   </property>
 *   <property>
 *     <name>test.randomwrite.max_value</name>
 *     <value>90</value>
 *   </property>
 *   <property>
 *     <name>test.randomwrite.total_bytes</name>
 *     <value>1099511627776</value>
 *   </property>
 * </configuration></xmp>
 *
 * Equivalently, {@link RandomWriter} also supports all the above options
 * and ones supported by {@link GenericOptionsParser} via the command-line.
 */
public class RandomWriter extends Configured implements Tool {

  /**
   * User counters
   */
  static enum Counters { RECORDS_WRITTEN, BYTES_WRITTEN }

  /**
   * A custom input format that creates virtual inputs of a single string
   * for each map.
   */
  static class RandomInputFormat implements InputFormat<Text, Text> {

    /**
     * Generate the requested number of file splits, with the filename
     * set to the filename of the output file.
     */
    public InputSplit[] getSplits(JobConf job,
                                  int numSplits) throws IOException {
      InputSplit[] result = new InputSplit[numSplits];
      Path outDir = FileOutputFormat.getOutputPath(job);
      for (int i = 0; i < result.length; ++i) {
        result[i] = new FileSplit(new Path(outDir, "dummy-split-" + i), 0, 1,
                                  (String[]) null);
      }
      return result;
    }

    /**
     * Return a single record (filename, "") where the filename is taken from
     * the file split.
     */
    static class RandomRecordReader implements RecordReader<Text, Text> {
      Path name;

      public RandomRecordReader(Path p) {
        name = p;
      }

      public boolean next(Text key, Text value) {
        if (name != null) {
          key.set(name.getName());
          name = null;
          return true;
        }
        return false;
      }

      public Text createKey() {
        return new Text();
      }

      public Text createValue() {
        return new Text();
      }

      public long getPos() {
        return 0;
      }

      public void close() {}

      public float getProgress() {
        return 0.0f;
      }
    }

    public RecordReader<Text, Text> getRecordReader(InputSplit split,
                                                    JobConf job,
                                                    Reporter reporter) throws IOException {
      return new RandomRecordReader(((FileSplit) split).getPath());
    }
  }

  static class Map extends MapReduceBase
      implements Mapper<WritableComparable, Writable,
                        BytesWritable, BytesWritable> {

    private long numBytesToWrite;
    private int minKeySize;
    private int keySizeRange;
    private int minValueSize;
    private int valueSizeRange;
    private Random random = new Random();
    private BytesWritable randomKey = new BytesWritable();
    private BytesWritable randomValue = new BytesWritable();

    private void randomizeBytes(byte[] data, int offset, int length) {
      for (int i = offset + length - 1; i >= offset; --i) {
        data[i] = (byte) random.nextInt(256);
      }
    }

    /**
     * Given an output filename, write a bunch of random records to it.
     */
    public void map(WritableComparable key,
                    Writable value,
                    OutputCollector<BytesWritable, BytesWritable> output,
                    Reporter reporter) throws IOException {
      int itemCount = 0;
      while (numBytesToWrite > 0) {
        int keyLength = minKeySize +
          (keySizeRange != 0 ? random.nextInt(keySizeRange) : 0);
        randomKey.setSize(keyLength);
        randomizeBytes(randomKey.getBytes(), 0, randomKey.getLength());
        int valueLength = minValueSize +
          (valueSizeRange != 0 ? random.nextInt(valueSizeRange) : 0);
        randomValue.setSize(valueLength);
        randomizeBytes(randomValue.getBytes(), 0, randomValue.getLength());
        output.collect(randomKey, randomValue);
        numBytesToWrite -= keyLength + valueLength;
        reporter.incrCounter(Counters.BYTES_WRITTEN, keyLength + valueLength);
        reporter.incrCounter(Counters.RECORDS_WRITTEN, 1);
        if (++itemCount % 200 == 0) {
          reporter.setStatus("wrote record " + itemCount + ". " +
                             numBytesToWrite + " bytes left.");
        }
      }
      reporter.setStatus("done with " + itemCount + " records.");
    }

    /**
     * Save the values out of the configuration that we need to write
     * the data.
     */
    @Override
    public void configure(JobConf job) {
      numBytesToWrite = job.getLong("test.randomwrite.bytes_per_map",
                                    1*1024*1024);
      minKeySize = job.getInt("test.randomwrite.min_key", 10);
      keySizeRange =
        job.getInt("test.randomwrite.max_key", 1000) - minKeySize;
      minValueSize = job.getInt("test.randomwrite.min_value", 0);
      valueSizeRange =
        job.getInt("test.randomwrite.max_value", 20000) - minValueSize;
    }
  }

  /**
   * This is the main routine for launching a distributed random write job.
   * With the fallback defaults used in this copy it runs 1 map per host and
   * each map writes 1 MB of data to a DFS file (the stock defaults are
   * 10 maps per host and 1 GB per map).
   * The reduce doesn't do anything.
   *
   * @throws IOException
   */
  public int run(String[] args) throws Exception {
    if (args.length == 0) {
      System.out.println("Usage: writer <out-dir>");
      ToolRunner.printGenericCommandUsage(System.out);
      return -1;
    }

    Path outDir = new Path(args[0]);
    JobConf job = new JobConf(getConf());

    job.setJarByClass(RandomWriter.class);
    job.setJobName("random-writer");
    FileOutputFormat.setOutputPath(job, outDir);

    job.setOutputKeyClass(BytesWritable.class);
    job.setOutputValueClass(BytesWritable.class);

    job.setInputFormat(RandomInputFormat.class);
    job.setMapperClass(Map.class);
    job.setReducerClass(IdentityReducer.class);
    job.setOutputFormat(SequenceFileOutputFormat.class);

    JobClient client = new JobClient(job);
    ClusterStatus cluster = client.getClusterStatus();
    int numMapsPerHost = job.getInt("test.randomwriter.maps_per_host", 1);
    long numBytesToWritePerMap = job.getLong("test.randomwrite.bytes_per_map",
                                             1*1024*1024);
    if (numBytesToWritePerMap == 0) {
      System.err.println("Cannot have test.randomwrite.bytes_per_map set to 0");
      return -2;
    }
    long totalBytesToWrite = job.getLong("test.randomwrite.total_bytes",
         numMapsPerHost*numBytesToWritePerMap*cluster.getTaskTrackers());
    int numMaps = (int) (totalBytesToWrite / numBytesToWritePerMap);
    if (numMaps == 0 && totalBytesToWrite > 0) {
      numMaps = 1;
      job.setLong("test.randomwrite.bytes_per_map", totalBytesToWrite);
    }

    job.setNumMapTasks(numMaps);
    System.out.println("Running " + numMaps + " maps.");

    // reducer NONE
    job.setNumReduceTasks(0);

    Date startTime = new Date();
    System.out.println("Job started: " + startTime);
    JobClient.runJob(job);
    Date endTime = new Date();
    System.out.println("Job ended: " + endTime);
    System.out.println("The job took " +
                       (endTime.getTime() - startTime.getTime()) / 1000 +
                       " seconds.");

    return 0;
  }

  public static void main(String[] args) throws Exception {
    int res = ToolRunner.run(new Configuration(), new RandomWriter(), args);
    System.exit(res);
  }
}
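The write loop of the Map class can be tried without a cluster. The following is a minimal sketch under stated assumptions: RandomRecordLoop and generate are hypothetical names, the records are only counted instead of being passed to an OutputCollector, a fixed seed replaces the unseeded Random of the listing, and Random.nextBytes stands in for the listing's byte-by-byte randomizeBytes.

```java
import java.util.Random;

public class RandomRecordLoop {

    /**
     * Generates random-length records until the byte budget is used up.
     * Returns {recordCount, bytesWritten}.
     */
    public static long[] generate(long bytesToWrite, int minKey, int maxKey,
                                  int minValue, int maxValue, long seed) {
        Random random = new Random(seed);   // fixed seed so that runs are repeatable
        int keyRange = maxKey - minKey;
        int valueRange = maxValue - minValue;
        long remaining = bytesToWrite;
        long records = 0;
        long written = 0;
        while (remaining > 0) {
            // nextInt(range) excludes its bound, so lengths fall in [min, max).
            int keyLength = minKey + (keyRange != 0 ? random.nextInt(keyRange) : 0);
            int valueLength = minValue + (valueRange != 0 ? random.nextInt(valueRange) : 0);
            byte[] key = new byte[keyLength];
            byte[] value = new byte[valueLength];
            random.nextBytes(key);
            random.nextBytes(value);
            // A real job would hand key and value to the output collector here.
            remaining -= keyLength + valueLength;
            written += keyLength + valueLength;
            records++;
        }
        return new long[] {records, written};
    }

    public static void main(String[] args) {
        // 1 MiB budget with the default key and value size limits.
        long[] result = generate(1L << 20, 10, 1000, 0, 20000, 42L);
        System.out.println(result[0] + " records, " + result[1] + " bytes");
    }
}
```

Because the budget is only checked before each record, the loop stops at or slightly past the budget, by less than one maximum-size record. With fixed sizes, for example 10-byte keys and 90-byte values as in the terasort configuration from the Javadoc, a budget of 1000 bytes yields exactly 10 records.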
Output of the run:
11/10/17 13:27:46 WARN conf.Configuration: DEPRECATED: hadoop-site.xml found in the classpath. Usage of hadoop-site.xml is deprecated. Instead use core-site.xml, mapred-site.xml and hdfs-site.xml to override properties of core-default.xml, mapred-default.xml and hdfs-default.xml respectively
Running 2 maps.
Job started: Mon Oct 17 13:27:47 CST 2011
11/10/17 13:27:47 INFO mapred.JobClient: Running job: job_201110171322_0001
11/10/17 13:27:48 INFO mapred.JobClient:  map 0% reduce 0%
11/10/17 13:29:58 INFO mapred.JobClient:  map 50% reduce 0%
11/10/17 13:30:05 INFO mapred.JobClient:  map 100% reduce 0%
11/10/17 13:30:07 INFO mapred.JobClient: Job complete: job_201110171322_0001
11/10/17 13:30:07 INFO mapred.JobClient: Counters: 8
11/10/17 13:30:07 INFO mapred.JobClient:   Job Counters
11/10/17 13:30:07 INFO mapred.JobClient:     Launched map tasks=3
11/10/17 13:30:07 INFO mapred.JobClient:   org.apache.hadoop.examples.RandomWriter$Counters
11/10/17 13:30:07 INFO mapred.JobClient:     BYTES_WRITTEN=2147504078
11/10/17 13:30:07 INFO mapred.JobClient:     RECORDS_WRITTEN=204528
11/10/17 13:30:07 INFO mapred.JobClient:   FileSystemCounters
11/10/17 13:30:07 INFO mapred.JobClient:     HDFS_BYTES_WRITTEN=2154580318
11/10/17 13:30:07 INFO mapred.JobClient:   Map-Reduce Framework
11/10/17 13:30:07 INFO mapred.JobClient:     Map input records=2
11/10/17 13:30:07 INFO mapred.JobClient:     Spilled Records=0
11/10/17 13:30:07 INFO mapred.JobClient:     Map input bytes=0
11/10/17 13:30:07 INFO mapred.JobClient:     Map output records=204528
Job ended: Mon Oct 17 13:30:07 CST 2011
The job took 140 seconds.
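The counters in this log can be cross-checked against the parameters. The sketch below is illustrative only: RandomWriterLogCheck and its methods are hypothetical names, and the counter values are copied by hand from the log. It compares BYTES_WRITTEN with a target of 1 GiB per map, and the observed average record size with the mean expected for the default key and value size ranges.

```java
public class RandomWriterLogCheck {

    /** Bytes written beyond the nominal target of maps * bytesPerMap. */
    public static long overshoot(long bytesWritten, int maps, long bytesPerMap) {
        return bytesWritten - maps * bytesPerMap;
    }

    /** Mean of min + nextInt(max - min), which is uniform over min .. max-1. */
    public static double expectedMean(int min, int max) {
        int range = max - min;
        return range == 0 ? min : min + (range - 1) / 2.0;
    }

    public static void main(String[] args) {
        long bytesWritten = 2147504078L;   // BYTES_WRITTEN from the log
        long recordsWritten = 204528L;     // RECORDS_WRITTEN from the log
        int maps = 2;                      // "Running 2 maps."

        // Each map stops at or just after reaching its byte budget.
        System.out.println("overshoot vs 1 GiB per map: "
            + overshoot(bytesWritten, maps, 1L << 30) + " bytes");

        // Observed average record size against the expected mean for
        // keys in [10, 1000) and values in [0, 20000).
        double observed = (double) bytesWritten / recordsWritten;
        double expected = expectedMean(10, 1000) + expectedMean(0, 20000);
        System.out.println("observed average record: " + observed + " bytes");
        System.out.println("expected average record: " + expected + " bytes");
    }
}
```

The total exceeds 2 GiB by only 20430 bytes, which fits two maps that each targeted 1 GiB and stopped within one record of that budget. The observed average of roughly 10,500 bytes per record is close to the expected 10504, so the run used the default key and value sizes with 1 GiB per map, not the 1 MB fallback written into the listing.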
This article is reposted from xwdreamer's blog on cnblogs; original link: http://www.cnblogs.com/xwdreamer/archive/2011/10/17/2296957.html. To repost it, please contact the original author.