Hadoop
Hadoop是一个由Apache基金会所开发的分布式系统基础架构。
用户可以在不了解分布式底层细节的情况下,开发分布式程序。充分利用集群的威力进行高速运算和存储。
Hadoop实现了一个分布式文件系统(Hadoop Distributed File System),简称HDFS。HDFS有高容错性的特点,并且设计用来部署在低廉的(low-cost)硬件上;而且它提供高吞吐量(high throughput)来访问应用程序的数据,适合那些有着超大数据集(large data set)的应用程序。HDFS放宽了(relax)POSIX的要求,可以以流的形式访问(streaming access)文件系统中的数据。
Hadoop的框架最核心的设计就是:HDFS和MapReduce。HDFS为海量的数据提供了存储,则MapReduce为海量的数据提供了计算。
官网相关介绍:
What Is Apache Hadoop?
The Apache Hadoop project develops open-source software for reliable, scalable, distributed computing.
The Apache Hadoop software library is a framework that allows for the distributed processing of large data sets across clusters of computers using simple programming models. It is designed to scale up from single servers to thousands of machines, each offering local computation and storage. Rather than rely on hardware to deliver high-availability, the library itself is designed to detect and handle failures at the application layer, so delivering a highly-available service on top of a cluster of computers, each of which may be prone to failures.
The project includes these modules:
Hadoop Common: The common utilities that support the other Hadoop modules.
Hadoop Distributed File System (HDFS): A distributed file system that provides high-throughput access to application data.
Hadoop YARN: A framework for job scheduling and cluster resource management.
Hadoop MapReduce: A YARN-based system for parallel processing of large data sets.
Other Hadoop-related projects at Apache include:
Ambari: A web-based tool for provisioning, managing, and monitoring Apache Hadoop clusters which includes support for Hadoop HDFS, Hadoop MapReduce, Hive, HCatalog, HBase, ZooKeeper, Oozie, Pig and Sqoop. Ambari also provides a dashboard for viewing cluster health such as heatmaps and ability to view MapReduce, Pig and Hive applications visually alongwith features to diagnose their performance characteristics in a user-friendly manner.
Avro: A data serialization system.
Cassandra: A scalable multi-master database with no single points of failure.
Chukwa: A data collection system for managing large distributed systems.
HBase: A scalable, distributed database that supports structured data storage for large tables.
Hive: A data warehouse infrastructure that provides data summarization and ad hoc querying.
Mahout: A Scalable machine learning and data mining library.
Pig: A high-level data-flow language and execution framework for parallel computation.
Spark: A fast and general compute engine for Hadoop data. Spark provides a simple and expressive programming model that supports a wide range of applications, including ETL, machine learning, stream processing, and graph computation.
Tez: A generalized data-flow programming framework, built on Hadoop YARN, which provides a powerful and flexible engine to execute an arbitrary DAG of tasks to process data for both batch and interactive use-cases. Tez is being adopted by Hive, Pig and other frameworks in the Hadoop ecosystem, and also by other commercial software (e.g. ETL tools), to replace Hadoop MapReduce as the underlying execution engine.
ZooKeeper: A high-performance coordination service for distributed applications.
一、MapReduce理论基础
每个MapReduce job都是Hadoop客户端想要执行的一个工作单元,它一般由输入数据、MapReduce程序和配置信息组成,而Hadoop会把每个job分隔成两类任务(task):map任务和reduce任务。在Hadoop集群中有两类节点来执行两类job进程的执行
1.1 大数据处理
任何基础业务包含了收集、分析、监控、过滤、搜索或组织web内容的公司或组织都面临着所谓的“大数据”问题:“web规模”处理即海量数据处理的代名词。社交类网站的兴起也使得这些组织面临着另一个问题:用户行为数据分析,这涉及到通过日志文件记录用户的对web页面浏览、点击、停留时长等,而后对日志文件中的大量数据进行分析以支持进行合理、正确的商业决策。
那么,大数据处理究竟意味着对多大规模的数据进行处理?一个简单的例子:Google在2004年平均每天利用MapReduce处理100GB的数据,到2008年平均每天处理的数据已经达到20PB;2009年,Facebook的数据量达到2.5PB,且以每天15TB的速度在增长。PB级别的数据集正变得越来越常见,大数据时代的到来已然是不争的事实,密集数据处理也正迅速成为现实需求。
大数据问题的处理需要以与传统数据处理方式所不同的方法去实现,这正是MapReduce思想得以大放光彩的核心所在。MapReduce在实现大数据处理上有着多个基础理论思想的支撑,然而这些基础理论甚至实现方法都未必是MapReduce所创,它们只是被MapReduce采用独特的方式加以利用而已。
(1) 向外扩展(Scale out)而非向上扩展(Scale up):大数据的处理更适合采用大量低端商业服务器(scale out)而非少量高端服务器(scale up)。后者正是向上扩展的系统性能提升方式,它通常采用有着SMP架构的主机,然而有着大量的CPU插槽(成百上千个)及大量的共享内存(可以多达数百GB)的高端服务器非常昂贵,但其性能的增长却非线性上升的,因此性价比很一般。而大量的低端商业服务器价格低廉、易于更换和伸缩等特性有效避免了向上扩展的敝端。
(2)假设故障很常见(Assume failures are common):在数据仓库架构级别,故障是不可避免且非常普遍的。假设一款服务器出故障的平均概率为1000天1次,那么10000台这种服务器每天出错的可能性将达到10次。因此,大规模向外扩展的应用场景中,一个设计优良且具有容错能力的服务必须能有效克服非常普遍的硬件故障所带来的问题,即故障不能导致用户应用层面的不一致性或非确定性。MapReduce编程模型能通过一系列机制如任务自动重启等健壮地应付系统或硬件故障。
(3)将处理移向数据(Move processing to the data):传统高性能计算应用中,超级计算机一般有着处理节点(processing node)和存储节点(storage node)两种角色,它们通过高容量的设备完成互联。然而,大多数数据密集型的处理工作并不需要多么强大的处理能力,于是把计算与存储互相分开将使得网络成为系统性能瓶颈。为了克服计算如此类的问题,MapReduce在其架构中将计算和存储合并在了一起,并将数据处理工作直接放在数据存储的位置完成,只不过这需要分布式文件系统予以支撑。
(4)顺序处理数据并避免随机访问(Process data sequentially and avoid random access):大数据处理通常意味着海量的数量难以全部载入内存,因而必须存储在磁盘上。然而,机械式磁盘寻道操作的先天性缺陷使得随机数据访问成为非常昂贵的操作,因此避免随机数据访问并以顺序处理为目的完成数据组织成为亟待之需。固态磁盘虽然避免了机械磁盘的某此缺陷,然而其高昂的价格以及并没有消除的随机访问问题仍然无法带来性能上的飞跃发展。MapReduce则主要设计用来在海量数据集上完成批处理操作,即所有的计算被组织成较长的流式处理操作,以延迟换取较大的吞吐能力。
(5)向程序员隐藏系统级别的细节(Hide system-level details from the application developer):
(6)无缝扩展(Seamless scalability):
1.2 MapReduce和大数据问题
海量数据处理的核心思想无非是将一个较大的问题进行“分割包围、逐个歼灭”。然而其难点和关键点在于如何将一个大的问题分分割成多个可以分别在不同的CPU上或不同的主机上进行处理的独立小问题,而且这些独立进行处理的小问题所产生的中间结果又该如何合并成最终结果并予以输出。因此,看似简单的化整为零的处理思想却不得不面临如下的难题:
(1) 如何将大问题分割为小任务?进一步地,如何将大问题分解为可以并行处理的小任务?
(2) 如何将分解好的小任务派送给分布式系统中的某主机且是较为适合解决此问题的主机上的worker完成处理?
(3) 如何保证某worker获取所需的数据?
(4) 如何协调不同worker之间进行同步?
(5) 如何将某worker的部分结果共享给其它需要此结果的worker?
(6) 如何在出现软件或硬件故障时仍然能保证上述工作的顺利进行?
在传统的并行或分布式编程模型中,程序员不得不显式地解决上述的部分甚至是全部问题,而在共享内存编程中,程序员需要显式地协调对共享数据结构的如互斥锁的访问、显式地通过栅(barrier)等设备解决进程同步问题、并得时刻警惕着程序中可能出现的死锁或竞争条件。虽然有些编程语言也或多或少地规避了让程序员面对上述问题,但却也避免不了将资源分配给各worker的问题。MapReduce的优势之一便是有效地向程序员隐藏了这些问题。
1.3 函数式编译语言
MapReduce是一种类似于Lisp或ML的函数式编程语言。函数式编程的核心特性之一是基于高阶函数,即能够接受其它函数作为参数的函数完成编程。MapReduce有两个常见地内置高阶函数map和fold。
如图所示,给定一个列表,map(接受一个参数)以函数f为其参数并将其应用于列表中的所有元素;fold(接受两个参数)以函数g和一个初始值作为参数,然后将g应用于初始值和列表中的第一个元素,结果被放置于中间变量中。中间变量和第二个元素将作为g函数下一次应用时的参数,而后如此操作直至将列表中的所有元素处理完毕后,fold会将最终处理结果保存至一个中间变量中。
于是,基于上述过程,我们可以把map视作利用f函数将给定数据集完成形式转换的操作,同样地,fold就可以被看作利用g函数完成数据聚合的操作。我们就可以由此得知,各函数式程序在运行时彼此间是隔离的,因此,在map中将f函数应用于列表中每一个元素的操作可以并行进行,进一步地讲,它们可以分布于集群中的不同节点上并行执行。然而,受限于数据的本地性,fold操作需要等到列表中的每一个元素都准备停当之后才能进行。幸运地是,现实生活中的应用程序并不要求g函数应用于列表中的所有元素,因此,列表中元素可以被分为多个逻辑组,并将fold操作并行地应用在这些逻辑组上即可。由此,fold操作也可以以并行的方式高效完成。
MapReduce有两个常见地内置高阶函数map和reduce,其map就类似于上述过程中的map操作,reduce对应于上述过程中的fold操作。只不过,MapReduce的执行框架能自行协调map与reduce并将其应用于在商业服务器硬件平台上并行处理海量数据。
更为精确地说,MapReduce有三个相互关联却各不相同的概念。首先,MapReduce是一个如上所述的函数式编程语言。其次,MapReduce也是一个运行框架,它能够协调运行基于MapReduce思想开发的程序。最后,MapReduce还可以被看作编程模型和执行框架的软件实现,如Google的专有实现和另一个开源实现Hadoop等。
1.4 mapper和reducer
键值对儿(Key-value pair)是MapReduce的基础数据结构。Key和Value可以是基础类型数据,如整数、浮点数、字符串或未经加工的字节数据,也可以是任意形式的复杂数据类型。程序员可以自行定义所需的数据类型,也可借助于Protocol Buffer、Thrift或Avro提供的便捷方式完成此类工作。
MapReduce算法设计的工作之一就是在给定数据集上定义“键-值”数据结构,比如在搜索引擎搜集、存储网页类工作中,key可以使用URL来表示,而value则是网页的内容。而在有些算法中,Key也可以是没有任何实际意义的数据,其在数据处理过程中可被安全忽略。在MapReduce中,程序员需要基于如下方式定义mapper和reducer:
map: (k1,v1)-->[(k2,v20)]
reduce: (k2,[v2])-->[(k3,v3)]
其中[...]意味着其可以是一个列表。这些传递给MapReduce进行处理的数据存储于分布式文件上,mapper操作将应用于每一个传递过来的键-值对并生成一定数量的中间键值对(intermediate key-value),而后reduce操作将应用于这些中间键值对并输出最终的键值对。然而,mapper操作和reducer操作之间还隐含着一个应用于中间键值对的“分组”操作,同一个键的键值对需要被归类至同一组中并发送至同一个reducer,而传送给每个reducer的分组中的键值对是基于键进行排序后的列表。reducer生成的结果将会保存至分布式文件系统,并存储为一个或多个以r(即reducer号码)结尾的文件,但mapper生成的中间键值对数据则不会被保存。
在Hadoop中,mapper和reducer是分别由MAP和REDUCE方法实现的对象。每个map任务(接收一个称作input split的键值对列表)都被初始化一个mapper对象,并会由执行框架为每个输入的键值对调用一次其map方法。程序员可以配置启动的map任务个数,但其真正启动的数目则由执行框架根据数据的物理分布最终给定。类似地,每个reduce任务由REDUCE方法初始化为一个reduce对象,并会由执行框架为其接受的每个中间键值对调用一次REDUCE方法,所不同的是,程序员可以明确限定启动的reduce任务的个数。
mapper和reducer可以直接在各自接收的数据上执行所需要的操作,然而,当使用到外部资源时,多个mapper或reducer之间可能会产生资源竞争,这势必导致其性能下降,因此,程序员必须关注其所用资源的竞争条件并加入适当处理。其次,mapper输出的中间键值对与接受的键值对可以是不同的数据类型,类似地,reducer输出的键值对与其接收的中间键值对也可以是不同的数据类型,这可能会给编程过程及程序运行中的故障排除带来困难,但这也正是MapReduce强大功能的体现之一。
除了常规的两阶段MapReduce处理流外,其还有一些变化形式。比如将mapper输出的结果直接保存至磁盘中(每个mapper对应一个文件)的没有reducer的MapReduce作业,不过仅有reducer而没有mapper的作业是不允许的。不过,就算用不着reducer处理具体的操作,利用reducer将mapper的输出结果进行重新分组和排序后进行输出也能以另一种形式提供的完整MapReduce模式。
MapReduce作业一般是通过HDFS读取和保存数据,但它也可以使用其它满足MapReduce应用的数据源或数据存储,比如Google的MapReduce实现中使用了Bigtable来完成数据的读入或输出。BigTable属于非关系的数据库,它是一个稀疏的、分布式的、持久化存储的多维度排序Map,其设计目的是可靠的处理PB级别的数据,并且能够部署到上千台机器上。在Hadoop中有一个类似的实现HBase可用于为MapReduce提供数据源和数据存储。
1.5 Hadoop运行框架
MapReduce程序也称作为MapReduce作业,一般由mapper代码、reducer代码以及其配置参数(如从哪儿读入数据,以及输出数据的保存位置)组成。准备好的作业可通过JobTracker(作业提交节点)进行提交,然后由运行框架负责完成后续的其它任务。这些任务主要包括如下几个方面。
(1) 调度
每个MapReduce作业都会划分为多个称作任务(task)的较小单元,而较大的作业划分的任务数量也可能会超出整个集群可运行的任务数,此时就需要调度器程序维护一个任务队列并能够追踪正在运行态任务的相关进程,以便让队列中处于等待状态的任务派送至某转为可用状态的节点运行。此外,调度器还要负责分属于不同作业的任务协调工作。
对于一个运行中的作业来说,只有所用的map任务都完成以后才能将中间数据分组、排序后发往reduce作业,因此,map阶段的完成时间取决于其最慢的一个作业的完成时间。类似的,reduce阶段的最后一个任务执行结束,其最终结果才为可用。因此,MapReduce作业完成速度则由两个阶段各自任务中的掉队者决定,最坏的情况下,这可能会导致作业长时间得不到完成。出于优化执行的角度,Hadoop和Google MapReduce实现了推测执行(Speculative execution)机制,即同一个任务会在不同的主机上启动多个执行副本,运行框架从其最快执行的任务中取得返回结果。不过,推测执行并不能消除其它的滞后场景,比如中间键值对数据的分发速度等。
(2) 数据和代码的协同工作(data/code co-location)
术语“数据分布”可能会带来误导,因为MapReduce尽力保证的机制是将要执行的代码送至数据所在的节点执行,因为代码的数据量通常要远小于要处理的数据本身。当然,MapReduce并不能消除数据传送,比如在某任务要处理的数据所在的节点已经启动很多任务时,此任务将不得不在其它可用节点运行。此时,考虑到同一个机架内的服务器有着较充裕的网络带宽,一个较优选择是从数据节点同一个机架内挑选一个节点来执行此任务。
(3) 同步(Synchronization)
异步环境下的一组并发进程因直接制约而互相发送消息而进行互相合作、互相等待,使得各进程按一定的速度执行的过程称为进程间同步,其可分为进程同步(或者线程同步)和数据同步。就编程方法来说,保持进程间同步的主要方法有内存屏障(Memory barrier),互斥锁(Mutex),信号量(Semaphore)和锁(Lock),管程(Monitor),消息(Message),管道(Pipe)等。MapReduce是通过在map阶段的进程与reduce阶段的进程之间实施隔离来完成进程同步的,即map阶段的所有任务都完成后对其产生的中间键值对根据键完成分组、排序后通过网络发往各reducer方可开始reduce阶段的任务,因此这个过程也称为“shuffle and sort”。
(4) 错误和故障处理(Error and fault handling)
MapReduce运行框架本身就是设计用来容易发生故障的商用服务器上了,因此,其必须有着良好的容错能力。在任何类别的硬件故障发生时,MapReduce运行框架均可自行将运行在相关节点的任务在一个新挑选出的节点上重新启动。同样,在任何程序发生故障时,运行框架也要能够捕获异常、记录异常并自动完成从异常中恢复。另外,在一个较大规模的集群中,其它任何超出程序员理解能力的故障发生时,MapReduce运行框架也要能够安全挺过。
1.6 partitioner和combiner
除了前述的内容中的组成部分,MapReduce还有着另外两个组件:partiontioner和combiner。
Partitioner负责分割中间键值对数据的键空间(即前面所谓的“分组”),并将中间分割后的中间键值对发往对应的reducer,也即partitioner负责完成为一个中间键值对指派一个reducer。最简单的partitioner实现是将键的hash码对reducer进行取余计算,并将其发往余数对应编号的reducer,这可以尽力保证每个reducer得到的键值对数目大体上是相同的。不过,由于partitioner仅考虑键而不考虑“值”,因此,发往每个reducer的键值对在键数目上的近似未必意味着数据量的近似。
Combiner是MapReduce的一种优化机制,它的主要功能是在“shuffle and sort”之前先在本地将中间键值对进行聚合,以减少在网络上发送的中间键值对数据量。因此可以把combiner视作在“shuffle and sort”阶段之前对mapper的输出结果所进行聚合操作的“mini-reducer”。在实现中,各combiner之间的操作是隔离的,因此,它不会涉及到其它mapper的数据结果。需要注意的是,就算是某combiner可以有机会处理某键相关的所有中间数据,也不能将其视作reducer的替代品,因为combiner输出的键值对类型必须要与mapper输出的键值对类型相同。无论如何,combiner的恰当应用将有机会有效提高作业的性能。
2.1 HDFS的设计理念
HDFS专为存储大文件而设计,可运行于普通的商业服务器上,基于流式数据访问模型完成数据存取。HDFS将所有文件的元数据存储于名称节点(NameNode)的内存中,能够利用分布式特性高效地管理“大”文件(GB级别甚至更大的文件),对于有着海量小文件的应用场景则会给名称节点带去巨大压力并使得其成为系统性能瓶颈。再者,HDFS为MapReduce的计算框架而设计,存储下来数据主要用于后续的处理分析,其访问模型为“一次写入、多次读取”;因此,数据在HDFS中存储完成后,仅能在文件尾部附加新数据,而不能对文件进行修改。另外,HDFS专为了高效地传输大文件进行了优化,其为了完成此目标,在“低延迟”特性上做出了很大让步,因此,其不适用于较小访问延迟的应用。
2.2 HDFS架构
2.2.1 HDFS数据块
与传统文件系统一样,HDFS也在“块(block)”级别存取文件,所不同的是,传统文件系统数据块一般较小(1KB、2KB或4KB等),HDFS的数据块大小默认为64MB,甚至可以使用128MB或256MB级别的数据块。HDFS使用块抽象层管理文件,可以实现将分块分为多个逻辑部分后分布于多个存储节点,也能够有效简化存储子系统。而对于存储节点来说,较大的块可以减少磁盘的寻道次数,进而提升I/O性能。
2.2.2 名称节点(NameNode)和数据节点(DataNode)
HDFS集群中节点的工作模型为“master-worker”:其包含一个名称节点(master)和多个数据节点(worker)。
名称节点负责管理HDFS的名称空间,即以树状结构组织的目录及文件的元数据信息,这些信息持久存储于名称节点本地磁盘上并保存为名称空间镜像(namespace image)和编辑日志(edit log)两个文件。名称节点并不存储数据块,它仅需要知道每个文件对应数据块的存储位置,即真正存储了数据块的数据节点。然而,名称节点并不会持久存储数据块所与其存储位置的对应信息,因为这些信息是在HDFS集群启动由名称节点根据各数据节点发来的信息进行重建而来。这个重建过程被称为HDFS的安全模式。数据节点的主要任务包括根据名称节点或客户的要求完成存储或读取数据块,并周期性地将其保存的数据块相关信息报告给名称节点。
默认情况下,HDFS会在集群中为每个数据块存储三个副本以确保数据的可靠性、可用性及性能表现。在一个大规模集群中,这三个副本一般会保存至不同机架中的数据节点上以应付两种常见的故障:单数据节点故障和导致某机架上的所有主机离线的网络故障。另外,如前面MapReduce运行模型中所述,为数据块保存多个副本也有利于MapReduce在作业执行过程中透明地处理节点故障等,并为MapReduce中作业协同处理以提升性能提供了现实支撑。名称节点会根据数据节点的周期性报告来检查每个数据块的副本数是否符合要求,低于配置个数要求的将会对其进行补足,而多出的将会被丢弃。
HDFS提供了POSIX网络的访问接口,所有的数据操作对客户端程序都是透明的。当客户端程序需要访问HDFS中的数据时,它首先基于TCP/IP协议与名称节点监听的TCP端口建立连接,接着通过客户端协议(Client Protocol)发起读取文件的请求,而后名称节点根据用户请求返回相关文件的块标识符(blockid)及存储了此数据块的数据节点。接下来客户端向对应的数据节点监听的端口发起请求并取回所需要数据块。当需要存储文件并写数据时,客户端程序首先会向名称节点发起名称空间更新请求,名称节点检查用户的访问权限及文件是否已经存在,如果没有问题,名称空间会挑选一个合适的数据节点分配一个空闲数据块给客户端程序。客户端程序直接将要存储的数据发往对应的数据节点,在完成存储后,数据节点将根据名称节点的指示将数据块复制多个副本至其它节点。
2.2.3 名称节点的可用性
由前一节所述的过程可以得知,名称节点的宕机将会导致HDFS文件系统中的所有数据变为不可用,而如果名称节点上的名称空间镜像文件或编辑日志文件损坏的话,整个HDFS甚至将无从重建,所有数据都会丢失。因此,出于数据可用性、可靠性等目的,必须提供额外的机制以确保此类故障不会发生,Hadoop为此提供了两种解决方案。
最简单的方式是将名称节点上的持久元数据信息实时存储多个副本于不同的存储设备中。Hadoop的名称节点可以通过属性配置使用多个不同的名称空间存储设备,而名称节点对多个设备的写入操作是同步的。当名称节点故障时,可在一台新的物理主机上加载一份可用的名称空间镜像副本和编辑日志副本完成名称空间的重建。然而,根据编辑日志的大小及集群规模,这个重建过程可能需要很长时间。
另一种方式是提供第二名称节点(Secondary NameNode)。第二名称节点并不真正扮演名称节点角色,它的主要任务是周期性地将编辑日志合并至名称空间镜像文件中以免编辑日志变得过大。它运行在一个独立的物理主机上,并需要跟名称节点同样大的内存资源来完成文件合并。另外,它还保存一份名称空间镜像的副本。然而,根据其工作机制可知,第二名称节点要滞后于主节点,因此名称节点故障时,部分数据丢失仍然不可避免。
尽管上述两种机制可以最大程序上避免数据丢失,但其并不具有高可用的特性,名称节点依然是一个单点故障,因为其宕机后,所有的数据将不能够被访问,进而所有依赖于此HDFS运行的MapReduce作业也将中止。就算是备份了名称空间镜像和编辑日志,在一个新的主机上重建名称节点并完成接收来自各数据节点的块信息报告也需要很长的时间才能完成。在有些应用环境中,这可能是无法接受的,为此,Hadoop 0.23引入了名称节点的高可用机制——设置两个名称节点工作于“主备”模型,主节点故障时,其所有服务将立即转移至备用节点。进一步信息请参考官方手册。
在大规模的HDFS集群中,为了避免名称节点成为系统瓶颈,在Hadoop 0.23版本中引入了HDFS联邦(HDFS Federation)机制。HDFS联邦中,每个名称节点管理一个由名称空间元数据和包含了所有块相关信息的块池组成名称空间卷(namespace volume),各名称节点上的名称空间卷是互相隔离的,因此,一个名称节点的损坏并不影响其它名称节点继续提供服务。进一步信息请参考官方手册。
二、安装配置hadoop:
2.1 安装前的准备工作
本示例所演示的过程基于RHEL 5.8(32bit)平台,用到的应用程序如下所示。
JDK: jdk-7u5-linux-i586.rpm
Hadoop:hadoop-0.20.2-cdh3u5.tar.gz
安全起见,运行Hadoop需要以普通用户的身份进行,因此,接下来先建立运行hadoop进程的用户hadoop并给其设定密码。
1
2
|
# useradd hadoop
# echo "password" | passwd --stdin hadoop
|
而后配置hadoop用户能够以基于密钥的验正方式登录本地主机,以便Hadoop可远程启动各节点上的Hadoop进程并执行监控等额外的管理工作。
1
2
3
|
[root@master ~]
# su - hadoop
[hadoop@master ~]$
ssh
-keygen -t rsa -P
''
[hadoop@master ~]$
ssh
-copy-
id
-i .
ssh
/id_rsa
.pub hadoop@localhost
|
2.2 安装JDK
Hadoop依赖于1.6 update 8或更新版本的Java环境。本文采用的jdk是rpm格式的安装包,在oracle官方的下载页面中即可找到合适的版本。其安装过程非常简单,使用类似如下命令即可。
1
|
# rpm -ivh jdk-7u5-linux-i586.rpm
|
Hadoop运行时需要能访问到如前安装的Java环境,这可以通过将其二进制程序(/usr/java/latest)所在的目录添加至PATH环境变量的路径中实现,也可以通过设定hadoop-env.sh脚本来进行。这里采用前一种方式,编辑/etc/profile.d/java.sh,在文件中添加如下内容:
1
2
3
|
JAVA_HOME=
/usr/java/latest/
PATH=$JAVA_HOME
/bin
:$PATH
export
JAVA_HOME PATH
|
切换至hadoop用户,并执行如下命令测试jdk环境配置是否就绪。
1
2
3
4
5
|
# su - hadoop
$ java -version
java version
"1.7.0_05"
Java(TM) SE Runtime Environment (build 1.7.0_05-b05)
Java HotSpot(TM) Client VM (build 23.1-b03, mixed mode, sharing)
|
2.3 hadoop安装配置
2.3.1 安装:
1
2
3
|
# tar xf hadoop-0.20.2-cdh3u5.tar.gz -C /usr/local
# chown -R hadoop:hadoop /usr/local/hadoop-0.20.2-cdh3u5
# ln -sv /usr/local/hadoop-0.20.2-cdh3u5 /usr/local/hadoop
|
然后编辑/etc/profile.d/hadoop.sh,设定HADOOP_HOME环境变量的值为hadoop的解压目录,并让其永久有效。编辑/etc/profile,添加如下内容:
1
2
3
|
HADOOP_BASE=
/usr/local/hadoop
PATH=$HADOOP_BASE
/bin
:$PATH
export
HADOOP_BASE PATH
|
切换至hadoop用户,并执行如下命令测试hadoop是否就绪。
1
2
3
4
5
|
# hadoop version
Hadoop 0.20.2-cdh3u5
Subversion git:
//ubuntu-slave02/var/lib/jenkins/workspace/CDH3u5-Full-RC/build/cdh3/hadoop20/0
.20.2-cdh3u5
/source
-r 30233064aaf5f2492bc687d61d72956876102109
Compiled by jenkins on Fri Oct 5 17:21:34 PDT 2012
From
source
with checksum de1770d69aa93107a133657faa8ef467
|
2.3.2 Hadoop的配置文件:
hadoop-env.sh: 用于定义hadoop运行环境相关的配置信息,比如配置JAVA_HOME环境变量、为hadoop的JVM指定特定的选项、指定日志文件所在的目录路径以及master和slave文件的位置等;
core-site.xml: 用于定义系统级别的参数,如HDFS URL、Hadoop的临时目录以及用于rack-aware集群中的配置文件的配置等,此中的参数定义会覆盖core-default.xml文件中的默认配置;
hdfs-site.xml: HDFS的相关设定,如文件副本的个数、块大小及是否使用强制权限等,此中的参数定义会覆盖hdfs-default.xml文件中的默认配置;
mapred-site.xml:HDFS的相关设定,如reduce任务的默认个数、任务所能够使用内存的默认上下限等,此中的参数定义会覆盖mapred-default.xml文件中的默认配置;
masters: hadoop的secondary-masters主机列表,当启动Hadoop时,其会在当前主机上启动NameNode和JobTracker,然后通过SSH连接此文件中的主机以作为备用NameNode;
slaves:Hadoop集群的slave主机列表,master启动时会通过SSH连接至此列表中的所有主机并为其启动DataNode和taskTracker进程;
2.3.3 Hadoop的分布式模型
Hadoop通常有三种运行模式:本地(独立)模式、伪分布式(Pseudo-distributed)模式和完全分布式(Fully distributed)模式。
安装完成后,Hadoop的默认配置即为本地模式,此时Hadoop使用本地文件系统而非分布式文件系统,而且其也不会启动任何Hadoop守护进程,Map和Reduce任务都作为同一进程的不同部分来执行。因此,本地模式下的Hadoop仅运行于本机。此种模式仅用于开发或调试MapReduce应用程序但却避免了复杂的后续操作。
伪分布式模式下,Hadoop将所有进程运行于同一台主机上,但此时Hadoop将使用分布式文件系统,而且各jobs也是由JobTracker服务管理的独立进程。同时,由于伪分布式的Hadoop集群只有一个节点,因此HDFS的块复制将限制为单个副本,其secondary-master和slave也都将运行于本地主机。此种模式除了并非真正意义的分布式之外,其程序执行逻辑完全类似于完全分布式,因此,常用于开发人员测试程序执行。
要真正发挥Hadoop的威力,就得使用完全分布式模式。由于ZooKeeper实现高可用等依赖于奇数法定数目(an odd-numbered quorum),因此,完全分布式环境需要至少三个节点。
2.3.4 配置Hadoop的伪分布式模式
传统上使用的hadoop-site.xml文件已经过时,现在分别使用core-site.xml、mapred-site.xml和hdfs-site.xml来取代core-default.xml、mapred-default.xml和 hdfs-default.xml中的默认配置。hadoop为这些文件提供了模板,其关于xml文档文件格式定义的部分及<configure></configure>已经存在,此时所需要做的仅是在其中添加相应的配置即可。
2.3.4.1 编辑conf/core-site.xml,配置Hadoop的核心属性
1
2
3
4
5
6
7
8
9
10
11
12
13
|
<?xml version=
"1.0"
encoding=
"UTF-8"
?>
<?xml-stylesheet
type
=
"text/xsl"
href=
"configuration.xsl"
?>
<configuration>
<property>
<name>hadoop.tmp.
dir
<
/name
>
<value>
/hadoop/temp
<
/value
>
<
/property
>
<property>
<name>fs.default.name<
/name
>
<value>hdfs:
//localhost
:8020<
/value
>
<
/property
>
<
/configuration
>
|
上面示例中hadoop.tmp.dir属性用于定义Hadoop的临时目录,其默认为/tmp/hadoop-${username}。HDFS进程的许多目录默认都在此目录中,本示例将其定义到了/hadoop/temp目录,需要注意的是,要保证运行Hadoop进程的用户对其具有全部访问权限。fs.default.name属性用于定义HDFS的名称节点和其默认的文件系统,其值是一个URI,即NameNode的RPC服务器监听的地址(可以是主机名)和端口(默认为8020)。其默认值为file:///,即本地文件系统。
2.3.4.2 编辑conf/mapred-site.xml,定义MapReduce
运行MapReduce需要为其指定一个主机作为JobTracker节点,在一个小规模的Hadoop集群中,它通常跟NameNode运行于同一物理主机。可以通过mapred.job.trakcer属性定义JobTracker监听的地址(或主机名)和端口(默认为8021),与前面的fs.default.name属性的值不同的是,这不是一个URI,而仅一个“主机-端口”组合。
在MapReduce作业运行过程中,中间数据(intermediate data)和工作文件保存于本地临时文件中。根据运行的MapReduce作业不同,这些数据文件可能会非常大,因此,应该通过mapred.local.dir属性为其指定一个有着足够空间的本地文件系统路径,其默认值为${hadoop.tmp.dir}/mapred/local。mapred.job.tracker可以接受多个以逗号分隔路径列表作为其值,并会以轮流的方式将数据分散存储在这些文件系统上,因此指定位于不同磁盘上的多个文件系统路径可以分散数据I/O。
另外,MapReduce使用分布式文件系统为各TaskTracker保存共享数据,这可以通过mapred.system.dir属性进行定义,其默认值为${hadoop.tmp.dir}/mapred/system。下面给出了一个较简单的mapred-site.xml文件示例。
1
2
3
4
5
6
7
8
|
<?xml version=
"1.0"
?>
<?xml-stylesheet
type
=
"text/xsl"
href=
"configuration.xsl"
?>
<configuration>
<property>
<name>mapred.job.tracker<
/name
>
<value>localhost:8021<
/value
>
<
/property
>
<
/configuration
>
|
2.3.4.3 编辑conf/hdfs-site.xml,定义hdfs的属性
HDFS进程有许多属性可以定义其工作路,如dfs.name.dir属性定义的HDFS元数据持久存储路径默认为${hadoop.tmp.dir}/dfs/name、dfs.data.dir属性定义的DataNode用于存储数据块的目录路径默认为${hadoop.tmp.dir}/dfs/data、fs.checkpoint.dir属性定义的SecondaryNameNode用于存储检查点文件的目录默认为${hadoop.tmp.dir}/dfs/namesecondary。
为了数据可用性及冗余的目的,HDFS会在多个节点上保存同一个数据块的多个副本,其默认为3个。而只有一个节点的伪分布式环境中其仅用保存一个副本即可,这可以通过dfs.replication属性进行定义。如下所示的内容即可作为最简单的hdfs-site.xml配置文件。
1
2
3
4
5
6
7
8
|
<?xml version=
"1.0"
encoding=
"UTF-8"
?>
<?xml-stylesheet
type
=
"text/xsl"
href=
"configuration.xsl"
?>
<configuration>
<property>
<name>dfs.replication<
/name
>
<value>1<
/value
>
<
/property
>
<
/configuration
>
|
2.3.4.4 格式化名称节点
以hadoop用户运行如下命令
1
|
$ hadoop namenode -
format
|
其执行后会显示为类似如下内容:
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
|
12
/12/06
22:16:02 INFO namenode.NameNode: STARTUP_MSG:
/************************************************************
STARTUP_MSG: Starting NameNode
STARTUP_MSG: host = localhost.localdomain
/127
.0.0.1
STARTUP_MSG: args = [-
format
]
STARTUP_MSG: version = 0.20.2-cdh3u5
STARTUP_MSG: build = git:
//ubuntu-slave02/var/lib/jenkins/workspace/CDH3u5-Full-RC/build/cdh3/hadoop20/0
.20.2-cdh3u5
/source
-r 30233064aaf5f2492bc687d61d72956876102109; compiled by
'jenkins'
on Fri Oct 5 17:21:34 PDT 2012
************************************************************/
12
/12/06
22:16:03 INFO util.GSet: VM
type
= 32-bit
12
/12/06
22:16:03 INFO util.GSet: 2% max memory = 19.33375 MB
12
/12/06
22:16:03 INFO util.GSet: capacity = 2^22 = 4194304 entries
12
/12/06
22:16:03 INFO util.GSet: recommended=4194304, actual=4194304
12
/12/06
22:16:03 INFO namenode.FSNamesystem: fsOwner=hadoop (auth:SIMPLE)
12
/12/06
22:16:04 INFO namenode.FSNamesystem: supergroup=supergroup
12
/12/06
22:16:04 INFO namenode.FSNamesystem: isPermissionEnabled=
true
12
/12/06
22:16:04 INFO namenode.FSNamesystem: dfs.block.invalidate.limit=1000
12
/12/06
22:16:04 INFO namenode.FSNamesystem: isAccessTokenEnabled=
false
accessKeyUpdateInterval=0 min(s), accessTokenLifetime=0 min(s)
12
/12/06
22:16:04 INFO common.Storage: Image
file
of size 112 saved
in
0 seconds.
12
/12/06
22:16:05 INFO common.Storage: Storage directory
/hadoop/temp/dfs/name
has been successfully formatted.
12
/12/06
22:16:05 INFO namenode.NameNode: SHUTDOWN_MSG:
/************************************************************
SHUTDOWN_MSG: Shutting down NameNode at localhost.localdomain
/127
.0.0.1
************************************************************/
|
其中的“Storage directory /hadoop/temp/dfs/name has been successfully formatted”一行信息表明对应的存储已经格式化成功。
2.3.4.5 启动hadoop
Hadoop提供了2个脚本start-dfs.sh和start-mapred.sh,分别用于启动hdfs相关的进程和mapred相关的进程。事实上,为了使用的便捷性,在NameNode和JobTracker运行于同一主机的场景中,Hadoop还专门提供了脚本start-all.sh脚本来自动执行前述两个脚本。
1
|
$
/usr/local/hadoop/bin/start-all
.sh
|
其会输出类似如下内容:
1
2
3
4
5
|
starting namenode, logging to
/usr/local/hadoop/logs/hadoop-hadoop-namenode-localhost
.localdomain.out
localhost: starting datanode, logging to
/usr/local/hadoop/logs/hadoop-hadoop-datanode-localhost
.localdomain.out
localhost: starting secondarynamenode, logging to
/usr/local/hadoop/logs/hadoop-hadoop-secondarynamenode-localhost
.localdomain.out
starting jobtracker, logging to
/usr/local/hadoop/logs/hadoop-hadoop-jobtracker-localhost
.localdomain.out
localhost: starting tasktracker, logging to
/usr/local/hadoop/logs/hadoop-hadoop-tasktracker-localhost
.localdomain.out
|
运行jps命令查看正在运行的Hadoop进程
1
2
3
4
5
6
|
$ jps |
grep
-iv
"jps"
29326 DataNode
29478 SecondaryNameNode
29685 TaskTracker
29208 NameNode
29563 JobTracker
|
2.3.4.6 Hadoop进程监听的地址和端口
Hadoop启动时会运行两个服务器进程,一个为用于Hadoop各进程之间进行通信的RPC服务器,另一个是提供了便于管理员查看Hadoop集群各进程相关信息页面的HTTP服务器。
用于定义各RPC服务器所监听的地址和端口的属性有如下几个:
fs.default.name:定义HDFS的NameNode用于提供URI所监听的地址和端口,默认端口为8020;
dfs.datanode.ipc.address:DataNode上IPC服务器监听的地址和端口,默认为0.0.0.0:50020;
mapred.job.tracker:JobTracker的PRC服务器所监听的地址和端口,默认端口为8021;
mapred.task.tracker.report.address:TaskTracker的RPC服务器监听的地址和端口;TaskTracker的子JVM使用此端口与TaskTracker进行通信,它仅需要监听在本地回环地址127.0.0.1上,因此可以使用任何端口;只有在当本地没有回环接口时才需要修改此属性的值;
除了RPC服务器之外,DataNode还会运行一个TCP/IP服务器用于数据块传输,其监听的地址和端口可以通过dfs.datanode.address属性进行定义,默认为0.0.0.0:50010。
可用于定义各HTTP服务器的属性有如下几个:
mapred.job.tracker.http.addrss:JobTracker的HTTP服务器地址和端口,默认为0.0.0.0:50030;
mapred.task.tracker.http.address:TaskTracker的HTTP服务器地址和端口,默认为0.0.0.0:50060;
dfs.http.address:NameNode的HTTP服务器地址和端口,默认为0.0.0.0:50070;
dfs.datanode.http.address:DataNode的HTTP服务器地址和端口,默认为0.0.0.0:50075;
dfs.secondary.http.address:SecondaryNameNode的HTTP服务器地址和端口,默认为0.0.0.0:50090;
上述的HTTP服务器均可以通过浏览器直接访问以获取对应进程的相关信息。
下面的命令可以查看jvm监听的端口。
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
|
$
netstat
-tnlp |
grep
"java"
(Not all processes could be identified, non-owned process info
will not be shown, you would have to be root to see it all.)
tcp 0 0 0.0.0.0:50020 0.0.0.0:* LISTEN 29326
/java
tcp 0 0 0.0.0.0:52805 0.0.0.0:* LISTEN 29208
/java
tcp 0 0 0.0.0.0:50090 0.0.0.0:* LISTEN 29478
/java
tcp 0 0 0.0.0.0:50060 0.0.0.0:* LISTEN 29685
/java
tcp 0 0 0.0.0.0:50030 0.0.0.0:* LISTEN 29563
/java
tcp 0 0 0.0.0.0:51664 0.0.0.0:* LISTEN 29478
/java
tcp 0 0 0.0.0.0:54898 0.0.0.0:* LISTEN 29326
/java
tcp 0 0 0.0.0.0:55475 0.0.0.0:* LISTEN 29563
/java
tcp 0 0 127.0.0.1:8020 0.0.0.0:* LISTEN 29208
/java
tcp 0 0 127.0.0.1:44949 0.0.0.0:* LISTEN 29685
/java
tcp 0 0 127.0.0.1:8021 0.0.0.0:* LISTEN 29563
/java
tcp 0 0 0.0.0.0:50070 0.0.0.0:* LISTEN 29208
/java
tcp 0 0 0.0.0.0:50010 0.0.0.0:* LISTEN 29326
/java
tcp 0 0 0.0.0.0:50075 0.0.0.0:* LISTEN 29326
/java
|
2.4 Hadoop命令
hadoop有很多子命令,用于完成不同的功能,这个可以通过如下命令查看。
1
|
$ hadoop
|
其中的fs子命令用于进行跟文件系统相关的多种操作,如创建目录、复制文件或目录等,其具体的使用帮助可以使用如下命令获得。
1
|
$ hadoop fs -help
|
hadoop的fs命令能同时跟本地文件系统和HDFS交互,甚至可以跟Amazon的S3进行交互。其使用URI格式路径引用文件路径,完全的URI格式类似schema://authority/path,其中schema类似于协议,这里可以使用hdfs或file,分别用于引用 HDFS文件或本地文件系统中的文件。而对于HDFS来说,authority是指Namenode主机,path是指具体的文件路径。例如,在伪文件系统模式中,HDFS运行于本机的8020端口,因此hdfs://localhost:8020/user/hadoop/test.txt就是一个完整意义上的URI。事实上,在使用中,也可以省略URI中的schema://authority部分,此时其使用配置文件中默认名称段的定义,如我们前面定义的类似如下段的配置信息:
1
2
3
4
|
<property>
<name>fs.default.name<
/name
>
<value>hdfs:
//localhost
:9000<
/value
>
<
/property
>
|
在fs命令用于实现在本地文件系统和HDFS之间传递文件时,可以使用-get(从HDFS中复制文件至本地文件系统)或-put(将本地文件复制到HDFS中)命令实现,而fs会根据使用的命令来判断哪个是本地文件系统路径,哪个是HDFS文件系统路径,如将本地的/etc/issue复制到HDFS中存放至当前hadoop用户的目录中,则可使用如下命令:
1
|
$ hadoop fs -put
/etc/issue
hdfs:
//localhost
:9000
/user/hadoop/
|
或使用命令
1
|
$ hadoop fs -put
/etc/issue
/user/hadoop/
|
复制的结果可以使用如下命令查看:
1
2
3
|
$ hadoop fs -
ls
Found 1 items
-rw-r--r-- 1 hadoop supergroup 74 2012-09-20 23:03
/user/hadoop/issue
|
hadoop对文件系统的管理是通过java类来实现的,而其用于文件系统管理的类有多种,分别用于通过不同的方式访问不同的文件系统。hdfs和file是schema中常见的两种方式。
hadoop常用的命令行命令及其用法列表请参见如下链接:
HDFS命令:http://hadoop.apache.org/common/docs/r1.0.0/file_system_shell.html
MapReduce的job命令:http://hadoop.apache.org/common/docs/r1.0.0/commands_manual.html#job
2.5 测试Hadoop
Hadoop提供了MapReduce编程框架,其并行处理能力的发挥需要通过开发Map及Reduce程序实现。为了便于系统测试,Hadoop提供了一个单词统计的应用程序算法样例,其位于Hadoop安装目录下名称类似hadoop-examples-*.jar的文件中。除了单词统计,这个jar文件还包含了分布式运行的grep等功能的实现,这可以通过如下命令查看。
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
|
$ hadoop jar
/usr/local/hadoop/hadoop-examples-0
.20.2-cdh3u5.jar
An example program must be given as the first argument.
Valid program names are:
aggregatewordcount: An Aggregate based map
/reduce
program that counts the words
in
the input files.
aggregatewordhist: An Aggregate based map
/reduce
program that computes the histogram of the words
in
the input files.
dbcount: An example job that count the pageview counts from a database.
grep
: A map
/reduce
program that counts the matches of a regex
in
the input.
join
: A job that effects a
join
over sorted, equally partitioned datasets
multifilewc: A job that counts words from several files.
pentomino: A map
/reduce
tile laying program to
find
solutions to pentomino problems.
pi: A map
/reduce
program that estimates Pi using monte-carlo method.
randomtextwriter: A map
/reduce
program that writes 10GB of random textual data per node.
randomwriter: A map
/reduce
program that writes 10GB of random data per node.
secondarysort: An example defining a secondary
sort
to the reduce.
sleep
: A job that sleeps at each map and reduce task.
sort
: A map
/reduce
program that sorts the data written by the random writer.
sudoku: A sudoku solver.
teragen: Generate data
for
the terasort
terasort: Run the terasort
teravalidate: Checking results of terasort
wordcount: A map
/reduce
program that counts the words
in
the input files.
|
接下来的过程来演示在HDFS的wc-in目录中存放两个测试文件,而后运行wordcount程序实现对这两个测试文件中各单词出现次数进行统计的实现过程。首先创建wc-in目录,并复制文件至HDFS文件系统中。
1
2
|
$ hadoop fs -
mkdir
wc
-
in
$ hadoop fs -put
/etc/rc
.d
/init
.d
/functions
/etc/profile
wc
-
in
|
接下来启动分布式任务,其中的wc-out为reduce任务执行结果文件所在的目录,此目标要求事先不能存在,否则运行将会报错。
1
|
$ hadoop jar
/usr/local/hadoop/hadoop-example-
*.jar wordcount
wc
-
in
wc
-out
|
输出结果类似如下内容:
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
|
12
/12/06
23:11:38 INFO input.FileInputFormat: Total input paths to process : 2
12
/12/06
23:11:38 WARN util.NativeCodeLoader: Unable to load native-hadoop library
for
your platform... using
builtin
-java classes where applicable
12
/12/06
23:11:38 WARN snappy.LoadSnappy: Snappy native library not loaded
12
/12/06
23:11:38 INFO mapred.JobClient: Running job: job_201212062231_0001
12
/12/06
23:11:39 INFO mapred.JobClient: map 0% reduce 0%
12
/12/06
23:11:50 INFO mapred.JobClient: map 100% reduce 0%
12
/12/06
23:11:58 INFO mapred.JobClient: map 100% reduce 33%
12
/12/06
23:12:00 INFO mapred.JobClient: map 100% reduce 100%
12
/12/06
23:12:02 INFO mapred.JobClient: Job complete: job_201212062231_0001
12
/12/06
23:12:02 INFO mapred.JobClient: Counters: 26
12
/12/06
23:12:02 INFO mapred.JobClient: Job Counters
12
/12/06
23:12:02 INFO mapred.JobClient: Launched reduce tasks=1
12
/12/06
23:12:02 INFO mapred.JobClient: SLOTS_MILLIS_MAPS=14810
12
/12/06
23:12:02 INFO mapred.JobClient: Total
time
spent by all reduces waiting after reserving slots (ms)=0
12
/12/06
23:12:02 INFO mapred.JobClient: Total
time
spent by all maps waiting after reserving slots (ms)=0
12
/12/06
23:12:02 INFO mapred.JobClient: Launched map tasks=2
12
/12/06
23:12:02 INFO mapred.JobClient: Data-
local
map tasks=2
12
/12/06
23:12:02 INFO mapred.JobClient: SLOTS_MILLIS_REDUCES=10058
12
/12/06
23:12:02 INFO mapred.JobClient: FileSystemCounters
12
/12/06
23:12:02 INFO mapred.JobClient: FILE_BYTES_READ=11699
12
/12/06
23:12:02 INFO mapred.JobClient: HDFS_BYTES_READ=15943
12
/12/06
23:12:02 INFO mapred.JobClient: FILE_BYTES_WRITTEN=182084
12
/12/06
23:12:02 INFO mapred.JobClient: HDFS_BYTES_WRITTEN=8152
12
/12/06
23:12:02 INFO mapred.JobClient: Map-Reduce Framework
12
/12/06
23:12:02 INFO mapred.JobClient: Map input records=666
12
/12/06
23:12:02 INFO mapred.JobClient: Reduce shuffle bytes=11705
12
/12/06
23:12:02 INFO mapred.JobClient: Spilled Records=1632
12
/12/06
23:12:02 INFO mapred.JobClient: Map output bytes=23420
12
/12/06
23:12:02 INFO mapred.JobClient: CPU
time
spent (ms)=4300
12
/12/06
23:12:02 INFO mapred.JobClient: Total committed heap usage (bytes)=337190912
12
/12/06
23:12:02 INFO mapred.JobClient: Combine input records=2343
12
/12/06
23:12:02 INFO mapred.JobClient: SPLIT_RAW_BYTES=226
12
/12/06
23:12:02 INFO mapred.JobClient: Reduce input records=816
12
/12/06
23:12:02 INFO mapred.JobClient: Reduce input
groups
=762
12
/12/06
23:12:02 INFO mapred.JobClient: Combine output records=816
12
/12/06
23:12:02 INFO mapred.JobClient: Physical memory (bytes) snapshot=324112384
12
/12/06
23:12:02 INFO mapred.JobClient: Reduce output records=762
12
/12/06
23:12:02 INFO mapred.JobClient: Virtual memory (bytes) snapshot=1122086912
12
/12/06
23:12:02 INFO mapred.JobClient: Map output records=2343
|
命令的执行结果按上面命令的指定存储于wc-out目录中:
1
2
3
4
5
|
$ hadoop fs -
ls
wc
-out
Found 3 items
-rw-r--r-- 1 hadoop supergroup 0 2012-12-06 23:12
/user/hadoop/wc-out/_SUCCESS
drwxr-xr-x - hadoop supergroup 0 2012-12-06 23:11
/user/hadoop/wc-out/_logs
-rw-r--r-- 1 hadoop supergroup 8152 2012-12-06 23:11
/user/hadoop/wc-out/part-r-00000
|
其中的part-r-00000正是其执行结果的输出文件,使用如下命令查看其执行结果。
1
|
$ hadoop fs -
cat
wc
-out
/part-r-00000
|
文件的部分内容如下所示:
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
|
"$BOOTUP"
17
"$CONSOLETYPE"
1
"$EUID"
2
"$GRAPHICAL"
1
"$HOME/.inputrc"
1
"$INPUTRC"
1
"$RC"
3
"$STRING2
"$answer"
4
"$base1
"$base"
1
"$corelimit2
"$file"
3
"$force"
1
"$gotbase"
1
"$i"
1
"$killlevel"
3
"$line"
2
"$pid"
8
"$pid_file"
9
"$rc"
1
"$remaining"
4
"$retry"
4
"$user"
1
"`id2
"after"
1
"color"
12
"hex"
1
"no1
"pidof"
1
|
三、开发mapreduce:
尽管hadoop的框架是基于JAVA语言实现,但MapReduce程序却未必一定要使用java来开发。
Hadoop Streaming:Hadoop提供的一个编程工具,它允许用户使用任何可执行文件或者脚本文件作为Mapper和Reducer;
Hadoop Pipes和SWIG:开发MapReduce程序的可兼容性 C++ API;
四、简单配置Hadoop
4.1 设定Hadoop参数
Hadoop有很多参数,其默认配置大多数仅适用于standalone模式,虽然大多情况下在完全分布式(Fully distributed)模式中也没有问题,但距最优化的运行模式去相去甚远。在生产环境中通常需要调整的参数有:
1. dfs.name.dir —— NameNode节点用于存储HDFS元数据的本地目录,官方建议为/home/hadoop/dfs/name;
2. dfs.data.dir —— DataNode节点用于存储HDFS文件数据块的本地目录,官方建议为/home/hadoop/dfs/data;
3. mapred.system.dir —— HDFS中用于存储共享的MapReduce系统文件的目录,官方建议为/hadoop/mapred/system;
4. mapred.local.dir —— TaskNode节点用于存储临时数据的本地文件目录;
5. mapred.tasktracker.{map|reduce}.tarks.maximum —— 在TaskTracker上可同时运行的的map或reduce任务的最大数目;
6. hadoop.tmp.dir —— Hadoop临时目录;
7. mapred.child.java.opts —— 每个子任务可申请使用的heap大小;官方建议为-Xmx512m;
8. mapred.reduce.tasks —— 每任务的reduce数量;
上述参数中的大多数都可以接受多个以逗号分隔的目录,尤其是对于dfs.name.dir来说,多个目录还可以达到冗余的目的;而对于拥有多块磁盘的DataNode,为其dfs.data.dir指定多个值可以存储数据于多个磁盘,并能通过并行加速I/O操作。为mapred.local.dir指定多个眼光也能起到一定的加速作用。
此外,hadoop.tmp.dir对于不同的用户来说其路径是不相同的,事实上,应该尽量避免让此路径依赖用户属性,比如可以放在一个公共位置让所有用户都可以方便地访问到。在Linux系统下,hadoop.tmp.dir的默认路径指向了/tmp,这的确是一个公共位置,但/tmp目录所在的文件系统大多数都有使用配额,而且空间也通常比较有限,因此,故此此路径殊非理想之所在。建议将其指向一个有着足够空间的文件系统上的目录。
默认配置中,Hadoop可以在每个TaskTracker上运行四个任务(两个map任务,两个reduce任务),这可以通过mapred.tasktracker.{map|reduce}.tarks.maximum进行配置,通常建议其个数为与CPU核心数目相同或者为CPU核心数目的2倍,但其最佳值通常依赖于诸多因素,而在CPU密集型的应用场景中也不应该将其最大数目设置得过高。除了CPU之外,还应该考虑每个任务所能够使用的的heap空间大小所带来的影响;默认情况下,Hadoop为每个任务指定了200MB的heap空间上限,由于每个job可能会申请使用很大的heap,因此,过高的设定可能会带来意外的结果。
每个MapReduce任务可以通过mapred.reduce.tasks配置其运行的reduce任务数,通常也应该为其指定一个在多数场景下都能工作良好的默认值,比如Hadoop默认将此数目指定为1个,这对大多数任务来讲都有着不错的性能表现。而实际使用中,一般建议将此值设定为当前Hadoop集群可以运行的reduce任务总数的0.95倍或1.75倍。0.95这个因子意味着Hadoop集群可以立即启动所有的reduce任务并在map任务完成时接收数据并进行处理,而1.75则意味着先启动部分reduce任务,执行速度快的节点上的reduce完成后可再启动一个新的reduce任务,而速度慢的节点则无须执行此类操作。这会带来较为理想的负载均衡效果。
4.2 Hadoop状态监测
Hadoop提供了fsck工具用于HDFS文件系统状态检测,其使用语法如下:
hadoop fsck [GENERIC_OPTIONS] <path> [-move | -delete | -openforwrite] [-files [-blocks [-locations | -racks]]]
其中GENERIC_OPTIONS是hadoop的各子命令均支持使用的选择,如使用-conf指定配置文件等。在执行HDFS状态检测时,fsck会忽略正在被某客户执行写操作的文件,这些正在被修改或写入数据的文件可以使用-openforwrite选项在检测结果中予以显示。
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
|
$ hadoop
fsck
/ -openforwrite
FSCK started by hadoop (auth:SIMPLE) from
/127
.0.0.1
for
path / at Thu Dec 06 23:19:35 CST 2012
.......Status: HEALTHY
Total size:84064 B
Total
dirs
:14
Total files:7
Total blocks (validated):6 (avg. block size 14010 B)
Minimally replicated blocks:6 (100.0 %)
Over-replicated blocks:0 (0.0 %)
Under-replicated blocks:0 (0.0 %)
Mis-replicated blocks:0 (0.0 %)
Default replication factor:1
Average block replication:1.0
Corrupt blocks:0
Missing replicas:0 (0.0 %)
Number of data-nodes:1
Number of racks:1
FSCK ended at Thu Dec 06 23:19:35 CST 2012
in
14 milliseconds
The filesystem under path
'/'
is HEALTHY
|
fsck会为每个健康状态的文件打印一个点号(.),如上面的输出结果所示。同时,每个文件其复制块数多于、少于配置的数目、错误的复制块、损坏的数据块及缺少数据块的相关信息也会由fsck在执行结果中显示。其中,多于或少于配置的数目,或错误的复制块不会被视作严重的错误,而错误的复制块、损坏的复制块或缺少复制块则意味着数据的永久性丢失。使用fsck的-delete选项则可用于在检测过程中删除这些损坏状态的文件,-move选项则可用于将这些文件移动至lost+found目录中。
fsck命令格式中的[-files [-blocks [-locations | -racks]]]用于让其输出更为详细的检测信息,-files后的每个选项都依赖于其前面选项,因此,要使用-blocks则必须同时使用-files,要使用-locations则必须同时使用-blocks和-files;而-locations和-racks则既可为同级别的选项,使用时二选一,也可将-racksg与-locations同时使用。-files选项用于输出文件自身的状态信息,如文件路径、大小、占用的数据块及状态;-blocks选项用于输出每个数据块的相关信息;-locations用于输出每个复制块的具体位置;-racks则用于显示数据节点位置的拓扑信息。
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
|
$ hadoop
fsck
/ -openforwrite -files -blocks -locations -racks
FSCK started by hadoop (auth:SIMPLE) from
/127
.0.0.1
for
path / at Thu Dec 06 23:20:56 CST 2012
/ <
dir
>
/hadoop
<
dir
>
/hadoop/temp
<
dir
>
/hadoop/temp/mapred
<
dir
>
/hadoop/temp/mapred/staging
<
dir
>
/hadoop/temp/mapred/staging/hadoop
<
dir
>
/hadoop/temp/mapred/staging/hadoop/
.staging <
dir
>
/hadoop/temp/mapred/system
<
dir
>
/hadoop/temp/mapred/system/jobtracker
.info 4 bytes, 1 block(s): OK
0. blk_8000193754097372869_1002 len=4 repl=1 [
/default-rack/127
.0.0.1:50010]
/user
<
dir
>
/user/hadoop
<
dir
>
/user/hadoop/wc-in
<
dir
>
/user/hadoop/wc-in/functions
14291 bytes, 1 block(s): OK
0. blk_-2535948061878952264_1003 len=14291 repl=1 [
/default-rack/127
.0.0.1:50010]
/user/hadoop/wc-in/profile
1426 bytes, 1 block(s): OK
0. blk_5095873178802140996_1004 len=1426 repl=1 [
/default-rack/127
.0.0.1:50010]
/user/hadoop/wc-out
<
dir
>
/user/hadoop/wc-out/_SUCCESS
0 bytes, 0 block(s): OK
/user/hadoop/wc-out/_logs
<
dir
>
/user/hadoop/wc-out/_logs/history
<
dir
>
/user/hadoop/wc-out/_logs/history/job_201212062231_0001_1354806698562_hadoop_word
+count 14687 bytes, 1 block(s): OK
0. blk_-6941616318716878099_1014 len=14687 repl=1 [
/default-rack/127
.0.0.1:50010]
/user/hadoop/wc-out/_logs/history/localhost_1354804300040_job_201212062231_0001_conf
.xml 45504 bytes, 1 block(s): OK
0. blk_438023500023782106_1011 len=45504 repl=1 [
/default-rack/127
.0.0.1:50010]
/user/hadoop/wc-out/part-r-00000
8152 bytes, 1 block(s): OK
0. blk_-4078455009405364683_1013 len=8152 repl=1 [
/default-rack/127
.0.0.1:50010]
Status: HEALTHY
Total size:84064 B
Total
dirs
:14
Total files:7
Total blocks (validated):6 (avg. block size 14010 B)
Minimally replicated blocks:6 (100.0 %)
Over-replicated blocks:0 (0.0 %)
Under-replicated blocks:0 (0.0 %)
Mis-replicated blocks:0 (0.0 %)
Default replication factor:1
Average block replication:1.0
Corrupt blocks:0
Missing replicas:0 (0.0 %)
Number of data-nodes:1
Number of racks:1
FSCK ended at Thu Dec 06 23:20:56 CST 2012
in
28 milliseconds
The filesystem under path
'/'
is HEALTHY
|
此外,也可以使用hadoop的dfsadmin打印hdfs文件系统的状态信息。
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
|
$ hadoop dfsadmin -report
Configured Capacity: 41604677632 (38.75 GB)
Present Capacity: 38942863360 (36.27 GB)
DFS Remaining: 38942621696 (36.27 GB)
DFS Used: 241664 (236 KB)
DFS Used%: 0%
Under replicated blocks: 0
Blocks with corrupt replicas: 0
Missing blocks: 0
-------------------------------------------------
Datanodes available: 1 (1 total, 0 dead)
Name: 127.0.0.1:50010
Decommission Status : Normal
Configured Capacity: 41604677632 (38.75 GB)
DFS Used: 241664 (236 KB)
Non DFS Used: 2661814272 (2.48 GB)
DFS Remaining: 38942621696(36.27 GB)
DFS Used%: 0%
DFS Remaining%: 93.6%
Last contact: Thu Dec 06 23:22:28 CST 2012
|
4.3 启用HDFS的回收站
HDFS支持文件删除的“回收站”功能,被删除的文件不会立即从存储空间中擦除,而是先被移入用户家目录中的.Trash子目录中,当文件删除的时长超出事先预定义的时长后就会被自动从回收站中删除,在此之前,用户可以随时恢复被删除的文件。默认情况下,HDFS禁用了回收站功能,如果想启用,只需要在core-site.xml配置文件中使用fs.trash.interval定义删除文件的保留时长即可,其时长的默认单位为分钟,值0表示禁用回收站。如下所示的例子中,定义了被删除文件的保留时长为7天。
1
2
3
4
|
<property>
<name>fs.trash.interval<
/name
>
<value>10080<
/value
>
<
/property
>
|
五、完全分布式Hadoop
在需要时,可以通过conf/hadoop-env.sh脚本自定义Hadoop进程的环境变量,至少,JAVA_HOME是每个节点所必须事先正确定义的环境变量。另外,也可以通过HADOOP_*_OPTS为对应的Hadoop的5类进程(NameNode、DataNode、SecondaryNameNode、JobTracker和TaskTracker)定义运行参数,比如:HADOOP_NAMENODE_OPTS用于定义NameNode进程的运行参数,以此类推。
此外,HADOOP_LOG_DIR用于定义进程日志文件的存储目录,HADOOP_HEAPSIZE用于定义Hadoop进程可用的堆空间大小,单位是MB,默认为1000MB。
5.1、配置Hadoop进程
5.1.1 配置NomeNode的URI
在配置文件conf/core-site.xml中定义fs.default.name参数即可。
5.1.2 配置HDFS的相关参数
在配置文件conf/hdfs-site.xml中定义dfs.name.dir(NameNode用于存储名称空间和事务日志的本地文件系统路径)和dfs.data.dir(DataNode用于存储数据块的一个或多个本地文件系统路径,有多个路径时彼此间用逗号隔开)两个参数即可。
5.1.3 配置Jobtracker和TaskTracker进程的相关参数
在conf/mapred-site.xml文件中进行,主要有以下参数:
mapred.job.tracker:JobTracker进程所在主机的主机名(或IP)和端口;
mapred.system.dir:MapReduce用于存储系统文件的HDFS文件系统路径,如/hadoop/mapred/system/;
mapred.local.dir:MapReduce用于存储临时数据的本地文件系统路径,指定多个路径可以分散I/O压力,彼此间需要使用逗号分隔;
mapred.tasktracker.{map|reduce}.tasks.maximum:在每一个TaskTracker上可以运行的MapReduce任务的最大数量,默认为2(maps任务和reduces任务各两个);
dfs.hosts/dfs.hosts.exclude:允许使用或禁止使用的DataNode列表;
mapred.hosts/mapred.hosts.exclude:允许使用或禁用的TaskTracker列表;
mapred.queue.names:可以接受提交的任务的队列名称列表;MapReduce至少支持一个默认的“default”队列,因此,此参数的值列表中中必须要包含default;
mapred.queue.queue-name.acl-administer-jobs:可以查看作业详情的用户和组的列表;名称和组各自使用一个列表,两个列表之间使用空格分隔,而每个列表内的名称则使用逗号分隔,如:user1,user2 grp1,grp2;如果仅定义组列表而不提供用户列表,在组列表之前加一个空白字符即可;
5.2、生产环境中常用的配置
仅参考:http://www.cnblogs.com/zhj983452257/p/5399414.html
......
......
......
5.3、安装完全分布式Hadoop
5.3.1 安装准备工作
本安装示例将使用三台主机(RHEL 5.8 32bit)来实现,其规划如下所示:
IP地址主机名运行的进程或扮演的角色
172.16.100.11 master.shine.comNameNode,JobTracker
172.16.100.12datanode.shine.comDataNode,TaskTracker
172.16.100.13snn.shine.comSecondaryNameNode
用到的应用程序:
JDK: jdk-7u5-linux-i586.rpm
Hadoop:hadoop-0.20.2-cdh3u5.tar.gz
先在集群中的每个节点上建立运行hadoop进程的用户hadoop并给其设定密码。
1
2
|
# useradd hadoop
# echo "password" | passwd --stdin hadoop
|
设置集群各节点的/etc/hosts文件内容如下:
1
2
3
|
172.16.100.11master.shine.commaster
172.16.100.12datanode.shine.comdatanode
172.16.100.13snn.shine.comsnn
|
而后配置master节点的hadoop用户能够以基于密钥的验正方式登录其它各节点,以便启动进程并执行监控等额外的管理工作。以下命令在master节点上执行即可。
1
2
3
4
|
[root@master ~]
# su - hadoop
[hadoop@master ~]$
ssh
-keygen -t rsa -P
''
[hadoop@master ~]$
ssh
-copy-
id
-i .
ssh
/id_rsa
.pub hadoop@datanode
[hadoop@master ~]$
ssh
-copy-
id
-i .
ssh
/id_rsa
.pub hadoop@snn
|
5.3.2 安装JDK
以下操作需要在每个节点上执行一遍。
编辑/etc/profile.d/java.sh,在文件中添加如下内容:
1
2
3
|
JAVA_HOME=
/usr/java/latest/
PATH=$JAVA_HOME
/bin
:$PATH
export
JAVA_HOME PATH
|
切换至hadoop用户,并执行如下命令测试jdk环境配置是否就绪。
1
2
3
4
5
|
# su - hadoop
$ java -version
java version
"1.7.0_05"
Java(TM) SE Runtime Environment (build 1.7.0_05-b05)
Java HotSpot(TM) Client VM (build 23.1-b03, mixed mode, sharing)
|
5.3.3 安装Hadoop
集群中的每个节点均需要安装Hadoop,以根据配置或需要启动相应的进程等,因此,以下安装过程需要在每个节点上分别执行。
1
2
3
|
# tar xf hadoop-0.20.2-cdh3u5.tar.gz -C /usr/local
# chown -R hadoop:hadoop /usr/local/hadoop-0.20.2-cdh3u5
# ln -sv /usr/local/hadoop-0.20.2-cdh3u5 /usr/local/hadoop
|
然后编辑/etc/profile,设定HADOOP_HOME环境变量的值为hadoop的解压目录,并让其永久有效。编辑/etc/profile.d/hadoop.sh,添加如下内容:
1
2
3
|
HADOOP_HOME=
/usr/local/hadoop
PATH=$HADOOP_HOME
/bin
:$PATH
export
HADOOP_BASE PATH
|
切换至hadoop用户,并执行如下命令测试jdk环境配置是否就绪。
1
2
3
4
5
|
$ hadoop version
Hadoop 0.20.2-cdh3u5
Subversion git:
//ubuntu-slave02/var/lib/jenkins/workspace/CDH3u5-Full-RC/build/cdh3/hadoop20/0
.20.2-cdh3u5
/source
-r 30233064aaf5f2492bc687d61d72956876102109
Compiled by jenkins on Fri Oct 5 17:21:34 PDT 2012
From
source
with checksum de1770d69aa93107a133657faa8ef467
|
5.3.4 配置Hadoop
集群中的每个节点上Hadoop的配置均相同,Hadoop在启动时会根据配置文件判定当前节点的角色及所需要运行的进程等,因此,下述的配置文件修改需要在每一个节点上运行。
(1) 修改/usr/local/hadoop/conf/core-site.xml内容如下
1
2
3
4
5
6
7
8
9
10
11
|
<?xml version=
"1.0"
?>
<?xml-stylesheet
type
=
"text/xsl"
href=
"configuration.xsl"
?>
<!-- Put site-specific property overrides
in
this
file
. -->
<configuration>
<property>
<name>fs.default.name<
/name
>
<value>hdfs:
//master
.shine.com:8020<
/value
>
<final>
true
<
/final
>
<description>The name of the default
file
system. A URI whose scheme and authority determine the FileSystem implimentation.<
/description
>
<
/property
>
<
/configuration
>
|
(2)修改/usr/local/hadoop/conf/mapred-site.xml文件为如下内容
1
2
3
4
5
6
7
8
9
10
11
|
<?xml version=
"1.0"
?>
<?xml-stylesheet
type
=
"text/xsl"
href=
"configuration.xsl"
?>
<!-- Put site-specific property overrides
in
this
file
. -->
<configuration>
<property>
<name>mapred.job.tracker<
/name
>
<value>master.shine.com:8021<
/value
>
<final>
true
<
/final
>
<description>The host and port that the MapReduce JobTracker runs at. <
/description
>
<
/property
>
<
/configuration
>
|
(3) 修改/usr/local/hadoop/conf/hdfs-site.xml文件为如下内容
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
|
<?xml version=
"1.0"
?>
<?xml-stylesheet
type
=
"text/xsl"
href=
"configuration.xsl"
?>
<!-- Put site-specific property overrides
in
this
file
. -->
<configuration>
<property>
<name>dfs.replication<
/name
>
<value>1<
/value
>
<description>The actual number of replications can be specified when the
file
is created.<
/description
>
<
/property
>
<property>
<name>dfs.data.
dir
<
/name
>
<value>
/hadoop/data
<
/value
>
<final>ture<
/final
>
<description>The directories where the datanode stores blocks.<
/description
>
<
/property
>
<property>
<name>dfs.name.
dir
<
/name
>
<value>
/hadoop/name
<
/value
>
<final>ture<
/final
>
<description>The directories where the namenode stores its persistent matadata.<
/description
>
<
/property
>
<property>
<name>fs.checkpoint.
dir
<
/name
>
<value>
/hadoop/namesecondary
<
/value
>
<final>ture<
/final
>
<description>The directories where the secondarynamenode stores checkpoints.<
/description
>
<
/property
>
<
/configuration
>
|
说明:根据此配置,需要事先在各节点上创建/hadoop/,并让hadoop用户对其具有全部权限。也可以不指定最后三个属性,让Hadoop为其使用默认位置。
(4)修改/usr/local/hadoop/conf/masters文件,指定SecondaryNameNode节点的主机名或IP地址,本示例中为如下内容:
snn.shine.com
(5)修改/usr/local/hadoop/conf/slaves文件,指定各DataNode节点的主机名或IP地址,本示例中只有一个DataNode:
datanode.shine.com
(6)初始化数据节点,在master上执行如下命令
1
|
$ hadoop namenode -
format
|
5.3.5 启动Hadoop
在master节点上执行Hadoop的start-all.sh脚本即可实现启动整个集群。
1
|
[hadoop@master ~]$ start-all.sh
|
其输出内容如下所示:
1
2
3
4
5
|
starting namenode, logging to
/usr/local/hadoop/logs/hadoop-hadoop-namenode-master
.shine.com.out
datanode.shine.com: starting datanode, logging to
/usr/local/hadoop/logs/hadoop-hadoop-datanode-datanode
.shine.com.out
snn.shine.com: starting secondarynamenode, logging to
/usr/local/hadoop/logs/hadoop-hadoop-secondarynamenode-node3
.shine.com.out
starting jobtracker, logging to
/usr/local/hadoop/logs/hadoop-hadoop-jobtracker-master
.shine.com.out
datanode.shine.com: starting tasktracker, logging to
/usr/local/hadoop/logs/hadoop-hadoop-tasktracker-datanode
.shine.com.out
|
如果要停止Hadoop的各进程,则使用stop-all.sh脚本即可。
不过,在一个较大规模的集群环境中,NameNode节点需要在内在中维护整个名称空间中的文件和块的元数据信息,因此,其有着较大的内在需求;而在运行着众多MapReduce任务的环境中,JobTracker节点会用到大量的内存和CPU资源,因此,此场景中通常需要将NameNode和JobTracker运行在不同的物理主机上,这也意味着HDFS集群的主从节点与MapReduce的主从节点将分属于不同的拓扑。启动HDFS的主从进程则需要在NameNode节点上使用start-dfs.sh脚本,而启动MapReduce的各进程则需要在JobTracker节点上通过start-mapred.sh脚本进行。这两个脚本事实上都是通过hadoop-daemons.sh脚本来完成进程启动的。
5.4 环境设定
5.4.1 内存设定
默认情况下,Hadoop为每个进程分配1000MB(1GB)的内存空间,但这可以在hadoop-env.sh文件中通过HADOOP_HEAPSIZE环境变量进行调整。此外,TaskTracker会为worker主机上的每个map或reduce任务的分别启动一个JVM,这些JVM都需要在进程的HEAP中申请用到内存空间。每个TaskTracker可同时运行的map任务总数和reduce任务总数分别由mapred.tasktracker.map.tasks.maximum和mapred.tasktracker.reduce.tasks.maximum这两个属性进行指定,而它们的默认值都为2。用于运行map或reduce任务的JVM可用内存大小可由mapred.child.java.opts属性指定,其默认设定为-Xmx200m,意指每个任务可以使用最多200MB的内存空间。由此算来,每个worker主机默认将使用2800MB的内存空间。
除了使用mapred.child.java.opts为每个JVM为map任务和reduce任务设定相同可用内存属性之外,还可以使用mapreduce.map.java.opts和mapreduce.reduce.java.opts分别设定map任务和reduce任务的JVM可用内存的属性。
在TaskTracker上可同时运行的任务数取决于其主机的CPU数量。由于MapReduce作业大多为I/O密集型应用,因此,让同行运行任务数多于CPU的个数可以提高资源利用效率,其可以多出的数量取决于实际的作业本身。这里有一个通用法则,即让同行运行的作业数量为CPU数量的1至2之间的数字倍数,比如作业数1.5倍于CPU数。与此同时,一定要注意系统上可用内存的数量是否符合任务数的设定,比如在一个4颗CPU的DataNode上,可以设定mapred.tasktracker.map.tasks.maximum和mapred.tasktrackers.reduce.tasks.maximum属性的值都为3(不是4个,因为datanode和tasktracker都是运行的任务,它们要占去两个名额),假设每个任务的可用内存为400MB,那么这些作业总共需要的内存数为6*400MB,即2.4GB,考虑到操作系统及其它进程所需用的内存等,这个数值可能需要更大。
在hadoop-env.sh文件中,可以使用HADOOP_NAMENODE_OPTS、HADOOP_SECONDARYNAMENODE_OPTS、HADOOP_DATANODE_OPTS、HADOOP_BALANCER_OPTS、HADOOP_JOBTRACKER_OPTS变量分别为对应的5类进程设定运行参数,比如可以使用HADOOP_NAMENODE_OPTS单独设定namenode进程使用不的HEAPSIZE大小。
5.4.2 Hadoop日志
Hadoop的系统日志默认存放于其安装目录中的logs子目录,如果需要自定义其存储位置,可以在hadoop-env.sh中设定HADOOP_LOG_DIR环境变量来指定新位置。Hadoop的每个进程均会生成两个日志文件,一个是由log4j生成的并以.log为后缀的日志文件,另一个是以.out为后缀的日志文件,它负责记录发往标准输出和错误输出的信息。
大多数应用的日志住处均发送至.log类的文件中,因此在故障排查时其也是最需要关注的日志文件。然后,Hadoop为log4j提供的默认配置中,其每天会自动做一次日志滚动,并且永远不会删除日志文件,因此,管理员需要手动归档并删除老旧的日志文件。.out类的日志文件中很少出现信息,而且,Hadoop的每次重启都会导致日志自动滚动,并只会保存最至5次的滚动日志文件版本。
除了后缀不同之外,这两种日志文件的名称格式是一样的,默认均为hadoop-<username>-<processname>-<hostname>,比如hadoop-hadoop-namenode-master.shine.com.log。可以在hadoop-env.sh文件中使用HADOOP_IDENT_STRING变量将日志文件名称格式中的<username>限制为所需要的字符串。
5.4.3 SSH相关的设定
Hadoop运行时,它利用控制脚本(如start-dfs.sh)在master节点上基于SSH远程管理各worker节点上的进程,因此,为SSH设定合适的参数将用助于Hadoop的健壮性,如合适的连接超时时间(ConnectTimeout)将有助于Hadoop避免被失效的节点阻塞,将StrictHostKeyChecking设定为no将能够使得master自动添加各节点的主机密钥等。
Hadoop的控制脚本可以使用rsync将配置文件同步至集群中的其它节点,默认为禁用。如果需要,可以通过HADOOP_MASTER变量将其启用。不过,由于各点上HADOOP_MASTER默认均为禁用,因此,其第一次配置还是需要其它的方式进行。
其它各SSH相关参数说明及其用法在hadoop-env.sh中均有详细注释,因此,这里不再给出进一步信息。
5.5 Hadoop的其它常用属性
5.5.1 缓冲大小(Buffer size)
Hadoop为其I/O操作使用了4KB的缓冲区容量,这个值是相当保守的。在当今的硬件和操作系统上,可以安全地增大此值以提高系统性能;一般说来,128KB(131072 bytes)是比较理想的设定。如果需要,可以在core-site.xml中通过io.file.buffer.size属性进行定义。
5.5.2 HDFS块大小
HDFS默认块大小为64MB,然而128MB(134,217,728 bytes)是更常用的设定,甚至有些集群中使用了256MB。较大的块可以有效降低NameNode上的内存压力,并能支持使用更多的数据量。如果需要,可以在hdfs-site.xml文件中使用dfs.block.size属性进行定义。
5.5.3 慢启动Reduce任务
在一个作业中,默认情况下调度器会在map任务完成5%时开始调度启动当前作业的reduce任务。对于较大的作业来说,过早地启动reduce会导致集群性能的下降。在mapred-site.xml文件中为mapred.reduce.slowstart.completed.maps属性来设定一个更大的值(比如0.8,即80%)可以在更晚的时间点启动reduce作业。