Hadoop TDG 2 – Development Environment

Introduction:

GenericOptionsParser, Tool, and ToolRunner

Hadoop comes with a few helper classes for making it easier to run jobs from the command line.
GenericOptionsParser is a class that interprets common Hadoop command-line options and sets them on a Configuration object for your application to use as desired. You don’t usually use GenericOptionsParser directly, as it’s more convenient to implement the Tool interface and run your application with the ToolRunner, which uses GenericOptionsParser internally.
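
As a minimal sketch of the pattern (this is a hedged illustration of the Tool/ToolRunner wiring, not code copied from the book; the class name is illustrative), a driver that extends Configured and implements Tool gets the options in Table 5-1 handled for it by ToolRunner:

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.conf.Configured;
import org.apache.hadoop.util.Tool;
import org.apache.hadoop.util.ToolRunner;

// Prints every property in the job Configuration
public class ConfigurationPrinter extends Configured implements Tool {

    @Override
    public int run(String[] args) throws Exception {
        // getConf() returns the Configuration already populated by GenericOptionsParser
        Configuration conf = getConf();
        for (java.util.Map.Entry<String, String> entry : conf) {
            System.out.printf("%s=%s%n", entry.getKey(), entry.getValue());
        }
        return 0;
    }

    public static void main(String[] args) throws Exception {
        // ToolRunner runs GenericOptionsParser over args, then calls run() with what is left
        int exitCode = ToolRunner.run(new ConfigurationPrinter(), args);
        System.exit(exitCode);
    }
}

With this in place, running something like hadoop ConfigurationPrinter -D color=yellow | grep color should show the -D option taking effect, since ToolRunner sets it on the Configuration before run() is called.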

 

Table 5-1. GenericOptionsParser and ToolRunner options

-D property=value
    Sets the given Hadoop configuration property to the given value. Overrides any default or site properties in the configuration, and any properties set via the -conf option.

-conf filename ...
    Adds the given files to the list of resources in the configuration. This is a convenient way to set site properties or to set a number of properties at once.

-fs uri
    Sets the default filesystem to the given URI. Shortcut for -D fs.default.name=uri.

-jt host:port
    Sets the jobtracker to the given host and port. Shortcut for -D mapred.job.tracker=host:port.

-files file1,file2,...
    Copies the specified files from the local filesystem (or any filesystem if a scheme is specified) to the shared filesystem used by the jobtracker (usually HDFS) and makes them available to MapReduce programs in the task's working directory. (See "Distributed Cache" on page 288 for more on the distributed cache mechanism for copying files to tasktracker machines.)

-archives archive1,archive2,...
    Copies the specified archives from the local filesystem (or any filesystem if a scheme is specified) to the shared filesystem used by the jobtracker (usually HDFS), unarchives them, and makes them available to MapReduce programs in the task's working directory.

-libjars jar1,jar2,...
    Copies the specified JAR files from the local filesystem (or any filesystem if a scheme is specified) to the shared filesystem used by the jobtracker (usually HDFS), and adds them to the MapReduce task's classpath. This option is a useful way of shipping JAR files that a job depends on.

 

Writing a Unit Test

The map and reduce functions in MapReduce are easy to test in isolation, which is a consequence of their functional style. For known inputs, they produce known outputs. 
However, since outputs are written to a Context (or an OutputCollector in the old API), rather than simply being returned from the method call, the Context needs to be replaced with a mock so that its outputs can be verified. There are several Java mock object frameworks that can help build mocks; here we use Mockito, which is noted for its clean syntax, although any mock framework should work just as well. 
A commonly used unit-testing package is the MRUnit project (http://incubator.apache.org/mrunit/), which aims to make unit testing MapReduce programs easier.

Mapper

The test for the mapper is shown in Example 5-4. 
Example 5-4. Unit test for MaxTemperatureMapper

import static org.mockito.Mockito.*;
import java.io.IOException;
import org.apache.hadoop.io.*;
import org.junit.*;
public class MaxTemperatureMapperTest {
    @Test
    public void processesValidRecord() throws IOException, InterruptedException {
        MaxTemperatureMapper mapper = new MaxTemperatureMapper();
        Text value = new Text("0043011990999991950051518004+68750+023550FM-12+0382" +
                              // Year ^^^^
                              "99999V0203201N00261220001CN9999999N9-00111+99999999999");
                              // Temperature ^^^^^
        // Mock the Context so that the mapper's output can be verified
        MaxTemperatureMapper.Context context =
                mock(MaxTemperatureMapper.Context.class);
        mapper.map(null, value, context);

        verify(context).write(new Text("1950"), new IntWritable(-11));
    }
}
The key point of this example is that the Context is mocked; we can then call verify() directly on that mocked context to check what the mapper wrote to it.
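
For reference, here is a minimal sketch of what the MaxTemperatureMapper under test might look like. The parsing offsets and the missing-value handling are assumptions based on the NCDC record format used in the test, not the book's verbatim code:

import java.io.IOException;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;

public class MaxTemperatureMapper
        extends Mapper<LongWritable, Text, Text, IntWritable> {

    private static final int MISSING = 9999;

    @Override
    public void map(LongWritable key, Text value, Context context)
            throws IOException, InterruptedException {
        String line = value.toString();
        String year = line.substring(15, 19);    // year field of the NCDC record
        String temp = line.substring(87, 92);    // signed temperature in tenths of a degree
        int airTemperature = Integer.parseInt(
                temp.startsWith("+") ? temp.substring(1) : temp);
        if (airTemperature != MISSING) {
            context.write(new Text(year), new IntWritable(airTemperature));
        }
    }
}

Run against the record in the test, a mapper along these lines writes ("1950", -11), which is exactly what the verify() call asserts.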

 

Running Locally on Test Data

Now that we’ve got the mapper and reducer working on controlled inputs, the next step is to write a job driver and run it on some test data on a development machine.
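
A driver along the following lines ties the mapper and reducer together and delegates generic-option parsing to ToolRunner. Treat it as a hedged sketch rather than the book's exact example; MaxTemperatureReducer and the output types are assumed from the earlier chapters:

import org.apache.hadoop.conf.Configured;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.util.Tool;
import org.apache.hadoop.util.ToolRunner;

public class MaxTemperatureDriver extends Configured implements Tool {

    @Override
    public int run(String[] args) throws Exception {
        if (args.length != 2) {
            System.err.printf("Usage: %s [generic options] <input> <output>%n",
                    getClass().getSimpleName());
            ToolRunner.printGenericCommandUsage(System.err);
            return -1;
        }

        // Build the job from the Configuration that ToolRunner has already populated
        Job job = new Job(getConf(), "Max temperature");
        job.setJarByClass(getClass());

        FileInputFormat.addInputPath(job, new Path(args[0]));
        FileOutputFormat.setOutputPath(job, new Path(args[1]));

        job.setMapperClass(MaxTemperatureMapper.class);
        job.setReducerClass(MaxTemperatureReducer.class);
        job.setOutputKeyClass(Text.class);
        job.setOutputValueClass(IntWritable.class);

        return job.waitForCompletion(true) ? 0 : 1;
    }

    public static void main(String[] args) throws Exception {
        System.exit(ToolRunner.run(new MaxTemperatureDriver(), args));
    }
}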

We can use the -fs and -jt options provided by GenericOptionsParser (equivalent to setting the corresponding properties with -conf) to run against the local filesystem and the local job runner:

% hadoop v2.MaxTemperatureDriver -fs file:/// -jt local input/ncdc/micro output

This command executes MaxTemperatureDriver using input from the local input/ncdc/micro directory, producing output in the local output directory. 
Note that although we’ve set -fs so we use the local filesystem (file:///), the local job runner will actually work fine against any filesystem, including HDFS (and it can be handy to do this if you 
have a few files that are on HDFS).

 

Running on a Cluster

Now that we are happy with the program running on a small test dataset, we are ready to try it on the full dataset on a Hadoop cluster.

 

Packaging 
We don’t need to make any modifications to the program to run on a cluster rather than on a single machine, but we do need to package the program as a JAR file to send to the cluster.

 

Launching a Job 
To launch the job, we need to run the driver, specifying the cluster that we want to run the job on with the -conf option (we could equally have used the -fs and -jt options):

% hadoop jar hadoop-examples.jar v3.MaxTemperatureDriver -conf conf/hadoop-cluster.xml  input/ncdc/all max-temp

Job, Task, and Task Attempt IDs 
A job ID is composed of the time that the jobtracker (not the job) started and an incrementing counter maintained by the jobtracker to uniquely identify the job to that instance of the jobtracker. So the job with this ID:

job_200904110811_0002 
is the second (0002, job IDs are 1-based) job run by the jobtracker which started at 08:11 on April 11, 2009.

Tasks belong to a job, and their IDs are formed by replacing the job prefix of a job ID with a task prefix, and adding a suffix to identify the task within the job. For example:

task_200904110811_0002_m_000003 
is the fourth (000003, task IDs are 0-based) map (m) task of the job with ID job_200904110811_0002.

 

Tasks may be executed more than once, due to failure (see “Task Failure” on page 200) or speculative execution (see “Speculative Execution” on page 213), so to identify different instances of a task execution, task attempts are given unique IDs on the jobtracker. For example:

attempt_200904110811_0002_m_000003_0 
is the first (0, attempt IDs are 0-based) attempt at running task task_200904110811_0002_m_000003.

 

Tuning a Job

After a job is working, the question many developers ask is, “Can I make it run faster?” There are a few Hadoop-specific “usual suspects” that are worth checking to see if they are responsible for a performance problem. You should run through the checklist in Table 5-3 before you start trying to profile or optimize at the task level.

 

Number of mappers 
How long are your mappers running for? If they are only running for a few seconds on average, you should see if there's a way to have fewer mappers and make them all run longer, a minute or so as a rule of thumb. The extent to which this is possible depends on the input format you are using. Refer to "Small files and CombineFileInputFormat" on page 237.

Number of reducers 
For maximum performance, the number of reducers should be slightly less than the number of reduce slots in the cluster. This allows the reducers to finish in one wave and fully utilizes the cluster during the reduce phase. Refer to “Choosing the Number of Reducers” on page 229


Combiners 
Can your job take advantage of a combiner to reduce the amount of data passing through the shuffle? (A sketch covering this and the intermediate-compression item appears after this checklist.) Refer to "Combiner Functions" on page 34.


Intermediate compression 
Job execution time can almost always benefit from enabling map output compression. Refer to “Compressing map output” on page 94


Custom serialization 
If you are using your own custom Writable objects, or custom comparators, then make sure you have implemented RawComparator. Refer to “Implementing a RawComparator for speed” on page 108


Shuffle tweaks 
The MapReduce shuffle exposes around a dozen tuning parameters for memory management, which may help you eke out the last bit of performance. Refer to “Configuration Tuning” on page 209
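
As a hedged illustration of the combiner and intermediate-compression items above (the property names are the old Hadoop 1.x, jobtracker-era ones, and using the reducer as the combiner is an assumption that happens to hold for a max() computation):

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.io.compress.CompressionCodec;
import org.apache.hadoop.io.compress.GzipCodec;
import org.apache.hadoop.mapreduce.Job;

public class TuningSketch {

    // Applies two of the checklist items to an already-configured Job
    static void applyBasicTuning(Job job) {
        // Combiner: for max(), the reducer can double as the combiner
        job.setCombinerClass(MaxTemperatureReducer.class);

        // Intermediate compression: compress map output before the shuffle
        Configuration conf = job.getConfiguration();
        conf.setBoolean("mapred.compress.map.output", true);
        conf.setClass("mapred.map.output.compression.codec",
                GzipCodec.class, CompressionCodec.class);
    }
}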

 

Apache Oozie

If you need to run a complex workflow, or one on a tight production schedule, or you have a large number of connected workflows with data dependencies between them, then a more sophisticated approach is required. Apache Oozie (http://incubator.apache.org/oozie/) fits the bill in any or all of these cases. It has been designed to manage the execution of thousands of dependent workflows, each composed of possibly thousands of constituent actions at the level of an individual MapReduce job.



This article is excerpted from 博客园 (cnblogs); original publication date: 2012-09-08.
