Hadoop
Hadoop是一个由Apache基金会所开发的分布式系统基础架构。
用户可以在不了解分布式底层细节的情况下,开发分布式程序。充分利用集群的威力进行高速运算和存储。
Hadoop实现了一个分布式文件系统(Hadoop Distributed File System),简称HDFS。HDFS有高容错性的特点,并且设计用来部署在低廉的(low-cost)硬件上;而且它提供高吞吐量(high throughput)来访问应用程序的数据,适合那些有着超大数据集(large data set)的应用程序。HDFS放宽了(relax)POSIX的要求,可以以流的形式访问(streaming access)文件系统中的数据。
Hadoop的框架最核心的设计就是:HDFS和MapReduce。HDFS为海量的数据提供了存储,则MapReduce为海量的数据提供了计算。
官网相关介绍:
What Is Apache Hadoop?
The Apache Hadoop project develops open-source software for reliable, scalable, distributed computing.
The Apache Hadoop software library is a framework that allows for the distributed processing of large data sets across clusters of computers using simple programming models. It is designed to scale up from single servers to thousands of machines, each offering local computation and storage. Rather than rely on hardware to deliver high-availability, the library itself is designed to detect and handle failures at the application layer, so delivering a highly-available service on top of a cluster of computers, each of which may be prone to failures.
The project includes these modules:
Hadoop Common: The common utilities that support the other Hadoop modules.
Hadoop Distributed File System (HDFS): A distributed file system that provides high-throughput access to application data.
Hadoop YARN: A framework for job scheduling and cluster resource management.
Hadoop MapReduce: A YARN-based system for parallel processing of large data sets.
Other Hadoop-related projects at Apache include:
Ambari: A web-based tool for provisioning, managing, and monitoring Apache Hadoop clusters which includes support for Hadoop HDFS, Hadoop MapReduce, Hive, HCatalog, HBase, ZooKeeper, Oozie, Pig and Sqoop. Ambari also provides a dashboard for viewing cluster health such as heatmaps and ability to view MapReduce, Pig and Hive applications visually alongwith features to diagnose their performance characteristics in a user-friendly manner.
Avro: A data serialization system.
Cassandra: A scalable multi-master database with no single points of failure.
Chukwa: A data collection system for managing large distributed systems.
HBase: A scalable, distributed database that supports structured data storage for large tables.
Hive: A data warehouse infrastructure that provides data summarization and ad hoc querying.
Mahout: A Scalable machine learning and data mining library.
Pig: A high-level data-flow language and execution framework for parallel computation.
Spark: A fast and general compute engine for Hadoop data. Spark provides a simple and expressive programming model that supports a wide range of applications, including ETL, machine learning, stream processing, and graph computation.
Tez: A generalized data-flow programming framework, built on Hadoop YARN, which provides a powerful and flexible engine to execute an arbitrary DAG of tasks to process data for both batch and interactive use-cases. Tez is being adopted by Hive, Pig and other frameworks in the Hadoop ecosystem, and also by other commercial software (e.g. ETL tools), to replace Hadoop MapReduce as the underlying execution engine.
ZooKeeper: A high-performance coordination service for distributed applications.
一、MapReduce理论基础
每个MapReduce job都是Hadoop客户端想要执行的一个工作单元,它一般由输入数据、MapReduce程序和配置信息组成,而Hadoop会把每个job分隔成两类任务(task):map任务和reduce任务。在Hadoop集群中有两类节点来执行两类job进程的执行
1.1 大数据处理
任何基础业务包含了收集、分析、监控、过滤、搜索或组织web内容的公司或组织都面临着所谓的“大数据”问题:“web规模”处理即海量数据处理的代名词。社交类网站的兴起也使得这些组织面临着另一个问题:用户行为数据分析,这涉及到通过日志文件记录用户的对web页面浏览、点击、停留时长等,而后对日志文件中的大量数据进行分析以支持进行合理、正确的商业决策。
那么,大数据处理究竟意味着对多大规模的数据进行处理?一个简单的例子:Google在2004年平均每天利用MapReduce处理100GB的数据,到2008年平均每天处理的数据已经达到20PB;2009年,Facebook的数据量达到2.5PB,且以每天15TB的速度在增长。PB级别的数据集正变得越来越常见,大数据时代的到来已然是不争的事实,密集数据处理也正迅速成为现实需求。
大数据问题的处理需要以与传统数据处理方式所不同的方法去实现,这正是MapReduce思想得以大放光彩的核心所在。MapReduce在实现大数据处理上有着多个基础理论思想的支撑,然而这些基础理论甚至实现方法都未必是MapReduce所创,它们只是被MapReduce采用独特的方式加以利用而已。
(1) 向外扩展(Scale out)而非向上扩展(Scale up):大数据的处理更适合采用大量低端商业服务器(scale out)而非少量高端服务器(scale up)。后者正是向上扩展的系统性能提升方式,它通常采用有着SMP架构的主机,然而有着大量的CPU插槽(成百上千个)及大量的共享内存(可以多达数百GB)的高端服务器非常昂贵,但其性能的增长却非线性上升的,因此性价比很一般。而大量的低端商业服务器价格低廉、易于更换和伸缩等特性有效避免了向上扩展的敝端。
(2)假设故障很常见(Assume failures are common):在数据仓库架构级别,故障是不可避免且非常普遍的。假设一款服务器出故障的平均概率为1000天1次,那么10000台这种服务器每天出错的可能性将达到10次。因此,大规模向外扩展的应用场景中,一个设计优良且具有容错能力的服务必须能有效克服非常普遍的硬件故障所带来的问题,即故障不能导致用户应用层面的不一致性或非确定性。MapReduce编程模型能通过一系列机制如任务自动重启等健壮地应付系统或硬件故障。
(3)将处理移向数据(Move processing to the data):传统高性能计算应用中,超级计算机一般有着处理节点(processing node)和存储节点(storage node)两种角色,它们通过高容量的设备完成互联。然而,大多数数据密集型的处理工作并不需要多么强大的处理能力,于是把计算与存储互相分开将使得网络成为系统性能瓶颈。为了克服计算如此类的问题,MapReduce在其架构中将计算和存储合并在了一起,并将数据处理工作直接放在数据存储的位置完成,只不过这需要分布式文件系统予以支撑。
(4)顺序处理数据并避免随机访问(Process data sequentially and avoid random access):大数据处理通常意味着海量的数量难以全部载入内存,因而必须存储在磁盘上。然而,机械式磁盘寻道操作的先天性缺陷使得随机数据访问成为非常昂贵的操作,因此避免随机数据访问并以顺序处理为目的完成数据组织成为亟待之需。固态磁盘虽然避免了机械磁盘的某此缺陷,然而其高昂的价格以及并没有消除的随机访问问题仍然无法带来性能上的飞跃发展。MapReduce则主要设计用来在海量数据集上完成批处理操作,即所有的计算被组织成较长的流式处理操作,以延迟换取较大的吞吐能力。
(5)向程序员隐藏系统级别的细节(Hide system-level details from the application developer):
(6)无缝扩展(Seamless scalability):
1.2 MapReduce和大数据问题
海量数据处理的核心思想无非是将一个较大的问题进行“分割包围、逐个歼灭”。然而其难点和关键点在于如何将一个大的问题分分割成多个可以分别在不同的CPU上或不同的主机上进行处理的独立小问题,而且这些独立进行处理的小问题所产生的中间结果又该如何合并成最终结果并予以输出。因此,看似简单的化整为零的处理思想却不得不面临如下的难题:
(1) 如何将大问题分割为小任务?进一步地,如何将大问题分解为可以并行处理的小任务?
(2) 如何将分解好的小任务派送给分布式系统中的某主机且是较为适合解决此问题的主机上的worker完成处理?
(3) 如何保证某worker获取所需的数据?
(4) 如何协调不同worker之间进行同步?
(5) 如何将某worker的部分结果共享给其它需要此结果的worker?
(6) 如何在出现软件或硬件故障时仍然能保证上述工作的顺利进行?
在传统的并行或分布式编程模型中,程序员不得不显式地解决上述的部分甚至是全部问题,而在共享内存编程中,程序员需要显式地协调对共享数据结构的如互斥锁的访问、显式地通过栅(barrier)等设备解决进程同步问题、并得时刻警惕着程序中可能出现的死锁或竞争条件。虽然有些编程语言也或多或少地规避了让程序员面对上述问题,但却也避免不了将资源分配给各worker的问题。MapReduce的优势之一便是有效地向程序员隐藏了这些问题。
1.3 函数式编译语言
MapReduce是一种类似于Lisp或ML的函数式编程语言。函数式编程的核心特性之一是基于高阶函数,即能够接受其它函数作为参数的函数完成编程。MapReduce有两个常见地内置高阶函数map和fold。
如图所示,给定一个列表,map(接受一个参数)以函数f为其参数并将其应用于列表中的所有元素;fold(接受两个参数)以函数g和一个初始值作为参数,然后将g应用于初始值和列表中的第一个元素,结果被放置于中间变量中。中间变量和第二个元素将作为g函数下一次应用时的参数,而后如此操作直至将列表中的所有元素处理完毕后,fold会将最终处理结果保存至一个中间变量中。
于是,基于上述过程,我们可以把map视作利用f函数将给定数据集完成形式转换的操作,同样地,fold就可以被看作利用g函数完成数据聚合的操作。我们就可以由此得知,各函数式程序在运行时彼此间是隔离的,因此,在map中将f函数应用于列表中每一个元素的操作可以并行进行,进一步地讲,它们可以分布于集群中的不同节点上并行执行。然而,受限于数据的本地性,fold操作需要等到列表中的每一个元素都准备停当之后才能进行。幸运地是,现实生活中的应用程序并不要求g函数应用于列表中的所有元素,因此,列表中元素可以被分为多个逻辑组,并将fold操作并行地应用在这些逻辑组上即可。由此,fold操作也可以以并行的方式高效完成。
MapReduce有两个常见地内置高阶函数map和reduce,其map就类似于上述过程中的map操作,reduce对应于上述过程中的fold操作。只不过,MapReduce的执行框架能自行协调map与reduce并将其应用于在商业服务器硬件平台上并行处理海量数据。
更为精确地说,MapReduce有三个相互关联却各不相同的概念。首先,MapReduce是一个如上所述的函数式编程语言。其次,MapReduce也是一个运行框架,它能够协调运行基于MapReduce思想开发的程序。最后,MapReduce还可以被看作编程模型和执行框架的软件实现,如Google的专有实现和另一个开源实现Hadoop等。
1.4 mapper和reducer
键值对儿(Key-value pair)是MapReduce的基础数据结构。Key和Value可以是基础类型数据,如整数、浮点数、字符串或未经加工的字节数据,也可以是任意形式的复杂数据类型。程序员可以自行定义所需的数据类型,也可借助于Protocol Buffer、Thrift或Avro提供的便捷方式完成此类工作。
MapReduce算法设计的工作之一就是在给定数据集上定义“键-值”数据结构,比如在搜索引擎搜集、存储网页类工作中,key可以使用URL来表示,而value则是网页的内容。而在有些算法中,Key也可以是没有任何实际意义的数据,其在数据处理过程中可被安全忽略。在MapReduce中,程序员需要基于如下方式定义mapper和reducer:
map: (k1,v1)-->[(k2,v20)]
reduce: (k2,[v2])-->[(k3,v3)]
其中[...]意味着其可以是一个列表。这些传递给MapReduce进行处理的数据存储于分布式文件上,mapper操作将应用于每一个传递过来的键-值对并生成一定数量的中间键值对(intermediate key-value),而后reduce操作将应用于这些中间键值对并输出最终的键值对。然而,mapper操作和reducer操作之间还隐含着一个应用于中间键值对的“分组”操作,同一个键的键值对需要被归类至同一组中并发送至同一个reducer,而传送给每个reducer的分组中的键值对是基于键进行排序后的列表。reducer生成的结果将会保存至分布式文件系统,并存储为一个或多个以r(即reducer号码)结尾的文件,但mapper生成的中间键值对数据则不会被保存。
在Hadoop中,mapper和reducer是分别由MAP和REDUCE方法实现的对象。每个map任务(接收一个称作input split的键值对列表)都被初始化一个mapper对象,并会由执行框架为每个输入的键值对调用一次其map方法。程序员可以配置启动的map任务个数,但其真正启动的数目则由执行框架根据数据的物理分布最终给定。类似地,每个reduce任务由REDUCE方法初始化为一个reduce对象,并会由执行框架为其接受的每个中间键值对调用一次REDUCE方法,所不同的是,程序员可以明确限定启动的reduce任务的个数。
mapper和reducer可以直接在各自接收的数据上执行所需要的操作,然而,当使用到外部资源时,多个mapper或reducer之间可能会产生资源竞争,这势必导致其性能下降,因此,程序员必须关注其所用资源的竞争条件并加入适当处理。其次,mapper输出的中间键值对与接受的键值对可以是不同的数据类型,类似地,reducer输出的键值对与其接收的中间键值对也可以是不同的数据类型,这可能会给编程过程及程序运行中的故障排除带来困难,但这也正是MapReduce强大功能的体现之一。
除了常规的两阶段MapReduce处理流外,其还有一些变化形式。比如将mapper输出的结果直接保存至磁盘中(每个mapper对应一个文件)的没有reducer的MapReduce作业,不过仅有reducer而没有mapper的作业是不允许的。不过,就算用不着reducer处理具体的操作,利用reducer将mapper的输出结果进行重新分组和排序后进行输出也能以另一种形式提供的完整MapReduce模式。
MapReduce作业一般是通过HDFS读取和保存数据,但它也可以使用其它满足MapReduce应用的数据源或数据存储,比如Google的MapReduce实现中使用了Bigtable来完成数据的读入或输出。BigTable属于非关系的数据库,它是一个稀疏的、分布式的、持久化存储的多维度排序Map,其设计目的是可靠的处理PB级别的数据,并且能够部署到上千台机器上。在Hadoop中有一个类似的实现HBase可用于为MapReduce提供数据源和数据存储。
1.5 Hadoop运行框架
MapReduce程序也称作为MapReduce作业,一般由mapper代码、reducer代码以及其配置参数(如从哪儿读入数据,以及输出数据的保存位置)组成。准备好的作业可通过JobTracker(作业提交节点)进行提交,然后由运行框架负责完成后续的其它任务。这些任务主要包括如下几个方面。
(1) 调度
每个MapReduce作业都会划分为多个称作任务(task)的较小单元,而较大的作业划分的任务数量也可能会超出整个集群可运行的任务数,此时就需要调度器程序维护一个任务队列并能够追踪正在运行态任务的相关进程,以便让队列中处于等待状态的任务派送至某转为可用状态的节点运行。此外,调度器还要负责分属于不同作业的任务协调工作。
对于一个运行中的作业来说,只有所用的map任务都完成以后才能将中间数据分组、排序后发往reduce作业,因此,map阶段的完成时间取决于其最慢的一个作业的完成时间。类似的,reduce阶段的最后一个任务执行结束,其最终结果才为可用。因此,MapReduce作业完成速度则由两个阶段各自任务中的掉队者决定,最坏的情况下,这可能会导致作业长时间得不到完成。出于优化执行的角度,Hadoop和Google MapReduce实现了推测执行(Speculative execution)机制,即同一个任务会在不同的主机上启动多个执行副本,运行框架从其最快执行的任务中取得返回结果。不过,推测执行并不能消除其它的滞后场景,比如中间键值对数据的分发速度等。
(2) 数据和代码的协同工作(data/code co-location)
术语“数据分布”可能会带来误导,因为MapReduce尽力保证的机制是将要执行的代码送至数据所在的节点执行,因为代码的数据量通常要远小于要处理的数据本身。当然,MapReduce并不能消除数据传送,比如在某任务要处理的数据所在的节点已经启动很多任务时,此任务将不得不在其它可用节点运行。此时,考虑到同一个机架内的服务器有着较充裕的网络带宽,一个较优选择是从数据节点同一个机架内挑选一个节点来执行此任务。
(3) 同步(Synchronization)
异步环境下的一组并发进程因直接制约而互相发送消息而进行互相合作、互相等待,使得各进程按一定的速度执行的过程称为进程间同步,其可分为进程同步(或者线程同步)和数据同步。就编程方法来说,保持进程间同步的主要方法有内存屏障(Memory barrier),互斥锁(Mutex),信号量(Semaphore)和锁(Lock),管程(Monitor),消息(Message),管道(Pipe)等。MapReduce是通过在map阶段的进程与reduce阶段的进程之间实施隔离来完成进程同步的,即map阶段的所有任务都完成后对其产生的中间键值对根据键完成分组、排序后通过网络发往各reducer方可开始reduce阶段的任务,因此这个过程也称为“shuffle and sort”。
(4) 错误和故障处理(Error and fault handling)
MapReduce运行框架本身就是设计用来容易发生故障的商用服务器上了,因此,其必须有着良好的容错能力。在任何类别的硬件故障发生时,MapReduce运行框架均可自行将运行在相关节点的任务在一个新挑选出的节点上重新启动。同样,在任何程序发生故障时,运行框架也要能够捕获异常、记录异常并自动完成从异常中恢复。另外,在一个较大规模的集群中,其它任何超出程序员理解能力的故障发生时,MapReduce运行框架也要能够安全挺过。
1.6 partitioner和combiner
除了前述的内容中的组成部分,MapReduce还有着另外两个组件:partiontioner和combiner。
Partitioner负责分割中间键值对数据的键空间(即前面所谓的“分组”),并将中间分割后的中间键值对发往对应的reducer,也即partitioner负责完成为一个中间键值对指派一个reducer。最简单的partitioner实现是将键的hash码对reducer进行取余计算,并将其发往余数对应编号的reducer,这可以尽力保证每个reducer得到的键值对数目大体上是相同的。不过,由于partitioner仅考虑键而不考虑“值”,因此,发往每个reducer的键值对在键数目上的近似未必意味着数据量的近似。
Combiner是MapReduce的一种优化机制,它的主要功能是在“shuffle and sort”之前先在本地将中间键值对进行聚合,以减少在网络上发送的中间键值对数据量。因此可以把combiner视作在“shuffle and sort”阶段之前对mapper的输出结果所进行聚合操作的“mini-reducer”。在实现中,各combiner之间的操作是隔离的,因此,它不会涉及到其它mapper的数据结果。需要注意的是,就算是某combiner可以有机会处理某键相关的所有中间数据,也不能将其视作reducer的替代品,因为combiner输出的键值对类型必须要与mapper输出的键值对类型相同。无论如何,combiner的恰当应用将有机会有效提高作业的性能。
2.1 HDFS的设计理念
HDFS专为存储大文件而设计,可运行于普通的商业服务器上,基于流式数据访问模型完成数据存取。HDFS将所有文件的元数据存储于名称节点(NameNode)的内存中,能够利用分布式特性高效地管理“大”文件(GB级别甚至更大的文件),对于有着海量小文件的应用场景则会给名称节点带去巨大压力并使得其成为系统性能瓶颈。再者,HDFS为MapReduce的计算框架而设计,存储下来数据主要用于后续的处理分析,其访问模型为“一次写入、多次读取”;因此,数据在HDFS中存储完成后,仅能在文件尾部附加新数据,而不能对文件进行修改。另外,HDFS专为了高效地传输大文件进行了优化,其为了完成此目标,在“低延迟”特性上做出了很大让步,因此,其不适用于较小访问延迟的应用。
2.2 HDFS架构
2.2.1 HDFS数据块
与传统文件系统一样,HDFS也在“块(block)”级别存取文件,所不同的是,传统文件系统数据块一般较小(1KB、2KB或4KB等),HDFS的数据块大小默认为64MB,甚至可以使用128MB或256MB级别的数据块。HDFS使用块抽象层管理文件,可以实现将分块分为多个逻辑部分后分布于多个存储节点,也能够有效简化存储子系统。而对于存储节点来说,较大的块可以减少磁盘的寻道次数,进而提升I/O性能。
2.2.2 名称节点(NameNode)和数据节点(DataNode)
HDFS集群中节点的工作模型为“master-worker”:其包含一个名称节点(master)和多个数据节点(worker)。
名称节点负责管理HDFS的名称空间,即以树状结构组织的目录及文件的元数据信息,这些信息持久存储于名称节点本地磁盘上并保存为名称空间镜像(namespace image)和编辑日志(edit log)两个文件。名称节点并不存储数据块,它仅需要知道每个文件对应数据块的存储位置,即真正存储了数据块的数据节点。然而,名称节点并不会持久存储数据块所与其存储位置的对应信息,因为这些信息是在HDFS集群启动由名称节点根据各数据节点发来的信息进行重建而来。这个重建过程被称为HDFS的安全模式。数据节点的主要任务包括根据名称节点或客户的要求完成存储或读取数据块,并周期性地将其保存的数据块相关信息报告给名称节点。
默认情况下,HDFS会在集群中为每个数据块存储三个副本以确保数据的可靠性、可用性及性能表现。在一个大规模集群中,这三个副本一般会保存至不同机架中的数据节点上以应付两种常见的故障:单数据节点故障和导致某机架上的所有主机离线的网络故障。另外,如前面MapReduce运行模型中所述,为数据块保存多个副本也有利于MapReduce在作业执行过程中透明地处理节点故障等,并为MapReduce中作业协同处理以提升性能提供了现实支撑。名称节点会根据数据节点的周期性报告来检查每个数据块的副本数是否符合要求,低于配置个数要求的将会对其进行补足,而多出的将会被丢弃。
HDFS提供了POSIX网络的访问接口,所有的数据操作对客户端程序都是透明的。当客户端程序需要访问HDFS中的数据时,它首先基于TCP/IP协议与名称节点监听的TCP端口建立连接,接着通过客户端协议(Client Protocol)发起读取文件的请求,而后名称节点根据用户请求返回相关文件的块标识符(blockid)及存储了此数据块的数据节点。接下来客户端向对应的数据节点监听的端口发起请求并取回所需要数据块。当需要存储文件并写数据时,客户端程序首先会向名称节点发起名称空间更新请求,名称节点检查用户的访问权限及文件是否已经存在,如果没有问题,名称空间会挑选一个合适的数据节点分配一个空闲数据块给客户端程序。客户端程序直接将要存储的数据发往对应的数据节点,在完成存储后,数据节点将根据名称节点的指示将数据块复制多个副本至其它节点。
2.2.3 名称节点的可用性
由前一节所述的过程可以得知,名称节点的宕机将会导致HDFS文件系统中的所有数据变为不可用,而如果名称节点上的名称空间镜像文件或编辑日志文件损坏的话,整个HDFS甚至将无从重建,所有数据都会丢失。因此,出于数据可用性、可靠性等目的,必须提供额外的机制以确保此类故障不会发生,Hadoop为此提供了两种解决方案。
最简单的方式是将名称节点上的持久元数据信息实时存储多个副本于不同的存储设备中。Hadoop的名称节点可以通过属性配置使用多个不同的名称空间存储设备,而名称节点对多个设备的写入操作是同步的。当名称节点故障时,可在一台新的物理主机上加载一份可用的名称空间镜像副本和编辑日志副本完成名称空间的重建。然而,根据编辑日志的大小及集群规模,这个重建过程可能需要很长时间。
另一种方式是提供第二名称节点(Secondary NameNode)。第二名称节点并不真正扮演名称节点角色,它的主要任务是周期性地将编辑日志合并至名称空间镜像文件中以免编辑日志变得过大。它运行在一个独立的物理主机上,并需要跟名称节点同样大的内存资源来完成文件合并。另外,它还保存一份名称空间镜像的副本。然而,根据其工作机制可知,第二名称节点要滞后于主节点,因此名称节点故障时,部分数据丢失仍然不可避免。
尽管上述两种机制可以最大程序上避免数据丢失,但其并不具有高可用的特性,名称节点依然是一个单点故障,因为其宕机后,所有的数据将不能够被访问,进而所有依赖于此HDFS运行的MapReduce作业也将中止。就算是备份了名称空间镜像和编辑日志,在一个新的主机上重建名称节点并完成接收来自各数据节点的块信息报告也需要很长的时间才能完成。在有些应用环境中,这可能是无法接受的,为此,Hadoop 0.23引入了名称节点的高可用机制——设置两个名称节点工作于“主备”模型,主节点故障时,其所有服务将立即转移至备用节点。进一步信息请参考官方手册。
在大规模的HDFS集群中,为了避免名称节点成为系统瓶颈,在Hadoop 0.23版本中引入了HDFS联邦(HDFS Federation)机制。HDFS联邦中,每个名称节点管理一个由名称空间元数据和包含了所有块相关信息的块池组成名称空间卷(namespace volume),各名称节点上的名称空间卷是互相隔离的,因此,一个名称节点的损坏并不影响其它名称节点继续提供服务。进一步信息请参考官方手册。
二、安装配置hadoop:
2.1 安装前的准备工作
本示例所演示的过程基于RHEL 5.8(32bit)平台,用到的应用程序如下所示。
JDK: jdk-7u5-linux-i586.rpm
Hadoop:hadoop-0.20.2-cdh3u5.tar.gz
安全起见,运行Hadoop需要以普通用户的身份进行,因此,接下来先建立运行hadoop进程的用户hadoop并给其设定密码。
1
2
|
# useradd hadoop
# echo "password" | passwd --stdin hadoop
|
而后配置hadoop用户能够以基于密钥的验正方式登录本地主机,以便Hadoop可远程启动各节点上的Hadoop进程并执行监控等额外的管理工作。
1
2
3
|
[root@master ~]
# su - hadoop
[hadoop@master ~]$
ssh
-keygen -t rsa -P
''
[hadoop@master ~]$
ssh
-copy-
id
-i .
ssh
/id_rsa
.pub hadoop@localhost
|
2.2 安装JDK
Hadoop依赖于1.6 update 8或更新版本的Java环境。本文采用的jdk是rpm格式的安装包,在oracle官方的下载页面中即可找到合适的版本。其安装过程非常简单,使用类似如下命令即可。
1
|
# rpm -ivh jdk-7u5-linux-i586.rpm
|
Hadoop运行时需要能访问到如前安装的Java环境,这可以通过将其二进制程序(/usr/java/latest)所在的目录添加至PATH环境变量的路径中实现,也可以通过设定hadoop-env.sh脚本来进行。这里采用前一种方式,编辑/etc/profile.d/java.sh,在文件中添加如下内容:
1
2
3
|
JAVA_HOME=
/usr/java/latest/
PATH=$JAVA_HOME
/bin
:$PATH
export
JAVA_HOME PATH
|
切换至hadoop用户,并执行如下命令测试jdk环境配置是否就绪。
1
2
3
4
5
|
# su - hadoop
$ java -version
java version
"1.7.0_05"
Java(TM) SE Runtime Environment (build 1.7.0_05-b05)
Java HotSpot(TM) Client VM (build 23.1-b03, mixed mode, sharing)
|
2.3 hadoop安装配置
2.3.1 安装:
1
2
3
|
# tar xf hadoop-0.20.2-cdh3u5.tar.gz -C /usr/local
# chown -R hadoop:hadoop /usr/local/hadoop-0.20.2-cdh3u5
# ln -sv /usr/local/hadoop-0.20.2-cdh3u5 /usr/local/hadoop
|
然后编辑/etc/profile.d/hadoop.sh,设定HADOOP_HOME环境变量的值为hadoop的解压目录,并让其永久有效。编辑/etc/profile,添加如下内容:
1
2
3
|
HADOOP_BASE=
/usr/local/hadoop
PATH=$HADOOP_BASE
/bin
:$PATH
export
HADOOP_BASE PATH
|
切换至hadoop用户,并执行如下命令测试hadoop是否就绪。
1
2
3
4
5
|
# hadoop version
Hadoop 0.20.2-cdh3u5
Subversion git:
//ubuntu-slave02/var/lib/jenkins/workspace/CDH3u5-Full-RC/build/cdh3/hadoop20/0
.20.2-cdh3u5
/source
-r 30233064aaf5f2492bc687d61d72956876102109
Compiled by jenkins on Fri Oct 5 17:21:34 PDT 2012
From
source
with checksum de1770d69aa93107a133657faa8ef467
|
2.3.2 Hadoop的配置文件:
hadoop-env.sh: 用于定义hadoop运行环境相关的配置信息,比如配置JAVA_HOME环境变量、为hadoop的JVM指定特定的选项、指定日志文件所在的目录路径以及master和slave文件的位置等;
core-site.xml: 用于定义系统级别的参数,如HDFS URL、Hadoop的临时目录以及用于rack-aware集群中的配置文件的配置等,此中的参数定义会覆盖core-default.xml文件中的默认配置;
hdfs-site.xml: HDFS的相关设定,如文件副本的个数、块大小及是否使用强制权限等,此中的参数定义会覆盖hdfs-default.xml文件中的默认配置;
mapred-site.xml:HDFS的相关设定,如reduce任务的默认个数、任务所能够使用内存的默认上下限等,此中的参数定义会覆盖mapred-default.xml文件中的默认配置;
masters: hadoop的secondary-masters主机列表,当启动Hadoop时,其会在当前主机上启动NameNode和JobTracker,然后通过SSH连接此文件中的主机以作为备用NameNode;
slaves:Hadoop集群的slave主机列表,master启动时会通过SSH连接至此列表中的所有主机并为其启动DataNode和taskTracker进程;
2.3.3 Hadoop的分布式模型
Hadoop通常有三种运行模式:本地(独立)模式、伪分布式(Pseudo-distributed)模式和完全分布式(Fully distributed)模式。
安装完成后,Hadoop的默认配置即为本地模式,此时Hadoop使用本地文件系统而非分布式文件系统,而且其也不会启动任何Hadoop守护进程,Map和Reduce任务都作为同一进程的不同部分来执行。因此,本地模式下的Hadoop仅运行于本机。此种模式仅用于开发或调试MapReduce应用程序但却避免了复杂的后续操作。
伪分布式模式下,Hadoop将所有进程运行于同一台主机上,但此时Hadoop将使用分布式文件系统,而且各jobs也是由JobTracker服务管理的独立进程。同时,由于伪分布式的Hadoop集群只有一个节点,因此HDFS的块复制将限制为单个副本,其secondary-master和slave也都将运行于本地主机。此种模式除了并非真正意义的分布式之外,其程序执行逻辑完全类似于完全分布式,因此,常用于开发人员测试程序执行。
要真正发挥Hadoop的威力,就得使用完全分布式模式。由于ZooKeeper实现高可用等依赖于奇数法定数目(an odd-numbered quorum),因此,完全分布式环境需要至少三个节点。
2.3.4 配置Hadoop的伪分布式模式
传统上使用的hadoop-site.xml文件已经过时,现在分别使用core-site.xml、mapred-site.xml和hdfs-site.xml来取代core-default.xml、mapred-default.xml和 hdfs-default.xml中的默认配置。hadoop为这些文件提供了模板,其关于xml文档文件格式定义的部分及<configure></configure>已经存在,此时所需要做的仅是在其中添加相应的配置即可。
2.3.4.1 编辑conf/core-site.xml,配置Hadoop的核心属性
1
2
3
4
5
6
7
8
9
10
11
12
13
|
<?xml version=
"1.0"
encoding=
"UTF-8"
?>
<?xml-stylesheet
type
=
"text/xsl"
href=
"configuration.xsl"
?>
<configuration>
<property>
<name>hadoop.tmp.
dir
<
/name
>
<value>
/hadoop/temp
<
/value
>
<
/property
>
<property>
<name>fs.default.name<
/name
>
<value>hdfs:
//localhost
:8020<
/value
>
<
/property
>
<
/configuration
>
|
上面示例中hadoop.tmp.dir属性用于定义Hadoop的临时目录,其默认为/tmp/hadoop-${username}。HDFS进程的许多目录默认都在此目录中,本示例将其定义到了/hadoop/temp目录,需要注意的是,要保证运行Hadoop进程的用户对其具有全部访问权限。fs.default.name属性用于定义HDFS的名称节点和其默认的文件系统,其值是一个URI,即NameNode的RPC服务器监听的地址(可以是主机名)和端口(默认为8020)。其默认值为file:///,即本地文件系统。
2.3.4.2 编辑conf/mapred-site.xml,定义MapReduce
运行MapReduce需要为其指定一个主机作为JobTracker节点,在一个小规模的Hadoop集群中,它通常跟NameNode运行于同一物理主机。可以通过mapred.job.trakcer属性定义JobTracker监听的地址(或主机名)和端口(默认为8021),与前面的fs.default.name属性的值不同的是,这不是一个URI,而仅一个“主机-端口”组合。
在MapReduce作业运行过程中,中间数据(intermediate data)和工作文件保存于本地临时文件中。根据运行的MapReduce作业不同,这些数据文件可能会非常大,因此,应该通过mapred.local.dir属性为其指定一个有着足够空间的本地文件系统路径,其默认值为${hadoop.tmp.dir}/mapred/local。mapred.job.tracker可以接受多个以逗号分隔路径列表作为其值,并会以轮流的方式将数据分散存储在这些文件系统上,因此指定位于不同磁盘上的多个文件系统路径可以分散数据I/O。
另外,MapReduce使用分布式文件系统为各TaskTracker保存共享数据,这可以通过mapred.system.dir属性进行定义,其默认值为${hadoop.tmp.dir}/mapred/system。下面给出了一个较简单的mapred-site.xml文件示例。
1
2
3
4
5
6
7
8
|
<?xml version=
"1.0"
?>
<?xml-stylesheet
type
=
"text/xsl"
href=
"configuration.xsl"
?>
<configuration>
<property>
<name>mapred.job.tracker<
/name
>
<value>localhost:8021<
/value
>
<
/property
>
<
/configuration
>
|
2.3.4.3 编辑conf/hdfs-site.xml,定义hdfs的属性
HDFS进程有许多属性可以定义其工作路,如dfs.name.dir属性定义的HDFS元数据持久存储路径默认为${hadoop.tmp.dir}/dfs/name、dfs.data.dir属性定义的DataNode用于存储数据块的目录路径默认为${hadoop.tmp.dir}/dfs/data、fs.checkpoint.dir属性定义的SecondaryNameNode用于存储检查点文件的目录默认为${hadoop.tmp.dir}/dfs/namesecondary。
为了数据可用性及冗余的目的,HDFS会在多个节点上保存同一个数据块的多个副本,其默认为3个。而只有一个节点的伪分布式环境中其仅用保存一个副本即可,这可以通过dfs.replication属性进行定义。如下所示的内容即可作为最简单的hdfs-site.xml配置文件。
1
2
3
4
5
6
7
8
|
<?xml version=
"1.0"
encoding=
"UTF-8"
?>
<?xml-stylesheet
type
=
"text/xsl"
href=
"configuration.xsl"
?>
<configuration>
<property>
<name>dfs.replication<
/name
>
<value>1<
/value
>
<
/property
>
<
/configuration
>
|
2.3.4.4 格式化名称节点
以hadoop用户运行如下命令
1
|
$ hadoop namenode -
format
|
其执行后会显示为类似如下内容:
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
|
12
/12/06
22:16:02 INFO namenode.NameNode: STARTUP_MSG:
/************************************************************
STARTUP_MSG: Starting NameNode
STARTUP_MSG: host = localhost.localdomain
/127
.0.0.1
STARTUP_MSG: args = [-
format
]
STARTUP_MSG: version = 0.20.2-cdh3u5
STARTUP_MSG: build = git:
//ubuntu-slave02/var/lib/jenkins/workspace/CDH3u5-Full-RC/build/cdh3/hadoop20/0
.20.2-cdh3u5
/source
-r 30233064aaf5f2492bc687d61d72956876102109; compiled by
'jenkins'
on Fri Oct 5 17:21:34 PDT 2012
************************************************************/
12
/12/06
22:16:03 INFO util.GSet: VM
type
= 32-bit
12
/12/06
22:16:03 INFO util.GSet: 2% max memory = 19.33375 MB
12
/12/06
22:16:03 INFO util.GSet: capacity = 2^22 = 4194304 entries
12
/12/06
22:16:03 INFO util.GSet: recommended=4194304, actual=4194304
12
/12/06
22:16:03 INFO namenode.FSNamesystem: fsOwner=hadoop (auth:SIMPLE)
12
/12/06
22:16:04 INFO namenode.FSNamesystem: supergroup=supergroup
12
/12/06
22:16:04 INFO namenode.FSNamesystem: isPermissionEnabled=
true
12
/12/06
22:16:04 INFO namenode.FSNamesystem: dfs.block.invalidate.limit=1000
12
/12/06
22:16:04 INFO namenode.FSNamesystem: isAccessTokenEnabled=
false
accessKeyUpdateInterval=0 min(s), accessTokenLifetime=0 min(s)
12
/12/06
22:16:04 INFO common.Storage: Image
file
of size 112 saved
in
0 seconds.
12
/12/06
22:16:05 INFO common.Storage: Storage directory
/hadoop/temp/dfs/name
has been successfully formatted.
12
/12/06
22:16:05 INFO namenode.NameNode: SHUTDOWN_MSG:
/************************************************************
SHUTDOWN_MSG: Shutting down NameNode at localhost.localdomain
/127
.0.0.1
************************************************************/
|
其中的“Storage directory /hadoop/temp/dfs/name has been successfully formatted”一行信息表明对应的存储已经格式化成功。
2.3.4.5 启动hadoop
Hadoop提供了2个脚本start-dfs.sh和start-mapred.sh,分别用于启动hdfs相关的进程和mapred相关的进程。事实上,为了使用的便捷性,在NameNode和JobTracker运行于同一主机的场景中,Hadoop还专门提供了脚本start-all.sh脚本来自动执行前述两个脚本。
1
|
$
/usr/local/hadoop/bin/start-all
.sh
|
其会输出类似如下内容:
1
2
3
4
5
|
starting namenode, logging to
/usr/local/hadoop/logs/hadoop-hadoop-namenode-localhost
.localdomain.out
localhost: starting datanode, logging to
/usr/local/hadoop/logs/hadoop-hadoop-datanode-localhost
.localdomain.out
localhost: starting secondarynamenode, logging to
/usr/local/hadoop/logs/hadoop-hadoop-secondarynamenode-localhost
.localdomain.out
starting jobtracker, logging to
/usr/local/hadoop/logs/hadoop-hadoop-jobtracker-localhost
.localdomain.out
localhost: starting tasktracker, logging to
/usr/local/hadoop/logs/hadoop-hadoop-tasktracker-localhost
.localdomain.out
|
运行jps命令查看正在运行的Hadoop进程
1
2
3
4
5
6
|
$ jps |
grep
-iv
"jps"
29326 DataNode
29478 SecondaryNameNode
29685 TaskTracker
29208 NameNode
29563 JobTracker
|
2.3.4.6 Hadoop进程监听的地址和端口
Hadoop启动时会运行两个服务器进程,一个为用于Hadoop各进程之间进行通信的RPC服务器,另一个是提供了便于管理员查看Hadoop集群各进程相关信息页面的HTTP服务器。
用于定义各RPC服务器所监听的地址和端口的属性有如下几个:
fs.default.name:定义HDFS的NameNode用于提供URI所监听的地址和端口,默认端口为8020;
dfs.datanode.ipc.address:DataNode上IPC服务器监听的地址和端口,默认为0.0.0.0:50020;
mapred.job.tracker:JobTracker的PRC服务器所监听的地址和端口,默认端口为8021;
mapred.task.tracker.report.address:TaskTracker的RPC服务器监听的地址和端口;TaskTracker的子JVM使用此端口与TaskTracker进行通信,它仅需要监听在本地回环地址127.0.0.1上,因此可以使用任何端口;只有在当本地没有回环接口时才需要修改此属性的值;
除了RPC服务器之外,DataNode还会运行一个TCP/IP服务器用于数据块传输,其监听的地址和端口可以通过dfs.datanode.address属性进行定义,默认为0.0.0.0:50010。
可用于定义各HTTP服务器的属性有如下几个:
mapred.job.tracker.http.addrss:JobTracker的HTTP服务器地址和端口,默认为0.0.0.0:50030;
mapred.task.tracker.http.address:TaskTracker的HTTP服务器地址和端口,默认为0.0.0.0:50060;
dfs.http.address:NameNode的HTTP服务器地址和端口,默认为0.0.0.0:50070;
dfs.datanode.http.address:DataNode的HTTP服务器地址和端口,默认为0.0.0.0:50075;
dfs.secondary.http.address:SecondaryNameNode的HTTP服务器地址和端口,默认为0.0.0.0:50090;
上述的HTTP服务器均可以通过浏览器直接访问以获取对应进程的相关信息。
下面的命令可以查看jvm监听的端口。
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
|
$
netstat
-tnlp |
grep
"java"
(Not all processes could be identified, non-owned process info
will not be shown, you would have to be root to see it all.)
tcp 0 0 0.0.0.0:50020 0.0.0.0:* LISTEN 29326
/java
tcp 0 0 0.0.0.0:52805 0.0.0.0:* LISTEN 29208
/java
tcp 0 0 0.0.0.0:50090 0.0.0.0:* LISTEN 29478
/java
tcp 0 0 0.0.0.0:50060 0.0.0.0:* LISTEN 29685
/java
tcp 0 0 0.0.0.0:50030 0.0.0.0:* LISTEN 29563
/java
tcp 0 0 0.0.0.0:51664 0.0.0.0:* LISTEN 29478
/java
tcp 0 0 0.0.0.0:54898 0.0.0.0:* LISTEN 29326
/java
tcp 0 0 0.0.0.0:55475 0.0.0.0:* LISTEN 29563
/java
tcp 0 0 127.0.0.1:8020 0.0.0.0:* LISTEN 29208
/java
tcp 0 0 127.0.0.1:44949 0.0.0.0:* LISTEN 29685
/java
tcp 0 0 127.0.0.1:8021 0.0.0.0:* LISTEN 29563
/java
tcp 0 0 0.0.0.0:50070 0.0.0.0:* LISTEN 29208
/java
tcp 0 0 0.0.0.0:50010 0.0.0.0:* LISTEN 29326
/java
tcp 0 0 0.0.0.0:50075 0.0.0.0:* LISTEN 29326
/java
|
2.4 Hadoop命令
hadoop有很多子命令,用于完成不同的功能,这个可以通过如下命令查看。
1
|
$ hadoop
|
其中的fs子命令用于进行跟文件系统相关的多种操作,如创建目录、复制文件或目录等,其具体的使用帮助可以使用如下命令获得。
1
|
$ hadoop fs -help
|
hadoop的fs命令能同时跟本地文件系统和HDFS交互,甚至可以跟Amazon的S3进行交互。其使用URI格式路径引用文件路径,完全的URI格式类似schema://authority/path,其中schema类似于协议,这里可以使用hdfs或file,分别用于引用 HDFS文件或本地文件系统中的文件。而对于HDFS来说,authority是指Namenode主机,path是指具体的文件路径。例如,在伪文件系统模式中,HDFS运行于本机的8020端口,因此hdfs://localhost:8020/user/hadoop/test.txt就是一个完整意义上的URI。事实上,在使用中,也可以省略URI中的schema://authority部分,此时其使用配置文件中默认名称段的定义,如我们前面定义的类似如下段的配置信息:
1
2
3
4
|
<property>
<name>fs.default.name<
/name
>
<value>hdfs:
//localhost
:9000<
/value
>
<
/property
>
|
在fs命令用于实现在本地文件系统和HDFS之间传递文件时,可以使用-get(从HDFS中复制文件至本地文件系统)或-put(将本地文件复制到HDFS中)命令实现,而fs会根据使用的命令来判断哪个是本地文件系统路径,哪个是HDFS文件系统路径,如将本地的/etc/issue复制到HDFS中存放至当前hadoop用户的目录中,则可使用如下命令:
1
|
$ hadoop fs -put
/etc/issue
hdfs:
//localhost
:9000
/user/hadoop/
|
或使用命令
1
|
$ hadoop fs -put
/etc/issue
/user/hadoop/
|
复制的结果可以使用如下命令查看:
1
2
3
|
$ hadoop fs -
ls
Found 1 items
-rw-r--r-- 1 hadoop supergroup 74 2012-09-20 23:03
/user/hadoop/issue
|
hadoop对文件系统的管理是通过java类来实现的,而其用于文件系统管理的类有多种,分别用于通过不同的方式访问不同的文件系统。hdfs和file是schema中常见的两种方式。
hadoop常用的命令行命令及其用法列表请参见如下链接:
HDFS命令:http://hadoop.apache.org/common/docs/r1.0.0/file_system_shell.html
MapReduce的job命令:http://hadoop.apache.org/common/docs/r1.0.0/commands_manual.html#job
2.5 测试Hadoop
Hadoop提供了MapReduce编程框架,其并行处理能力的发挥需要通过开发Map及Reduce程序实现。为了便于系统测试,Hadoop提供了一个单词统计的应用程序算法样例,其位于Hadoop安装目录下名称类似hadoop-examples-*.jar的文件中。除了单词统计,这个jar文件还包含了分布式运行的grep等功能的实现,这可以通过如下命令查看。
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
|
$ hadoop jar
/usr/local/hadoop/hadoop-examples-0
.20.2-cdh3u5.jar
An example program must be given as the first argument.
Valid program names are:
aggregatewordcount: An Aggregate based map
/reduce
program that counts the words
in
the input files.
aggregatewordhist: An Aggregate based map
/reduce
program that computes the histogram of the words
in
the input files.
dbcount: An example job that count the pageview counts from a database.
grep
: A map
/reduce
program that counts the matches of a regex
in
the input.
join
: A job that effects a
join
over sorted, equally partitioned datasets
multifilewc: A job that counts words from several files.
pentomino: A map
/reduce
tile laying program to
find
solutions to pentomino problems.
pi: A map
/reduce
program that estimates Pi using monte-carlo method.
randomtextwriter: A map
/reduce
program that writes 10GB of random textual data per node.
randomwriter: A map
/reduce
program that writes 10GB of random data per node.
secondarysort: An example defining a secondary
sort
to the reduce.
sleep
: A job that sleeps at each map and reduce task.
sort
: A map
/reduce
program that sorts the data written by the random writer.
sudoku: A sudoku solver.
teragen: Generate data
for
the terasort
terasort: Run the terasort
teravalidate: Checking results of terasort
wordcount: A map
/reduce
program that counts the words
in
the input files.
|
接下来的过程来演示在HDFS的wc-in目录中存放两个测试文件,而后运行wordcount程序实现对这两个测试文件中各单词出现次数进行统计的实现过程。首先创建wc-in目录,并复制文件至HDFS文件系统中。
1
2
|
$ hadoop fs -
mkdir
wc
-
in
$ hadoop fs -put
/etc/rc
.d
/init
.d
/functions
/etc/profile
wc
-
in
|
接下来启动分布式任务,其中的wc-out为reduce任务执行结果文件所在的目录,此目标要求事先不能存在,否则运行将会报错。
1
|
$ hadoop jar
/usr/local/hadoop/hadoop-example-
*.jar wordcount
wc
-
in
wc
-out
|
输出结果类似如下内容:
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
|
12
/12/06
23:11:38 INFO input.FileInputFormat: Total input paths to process : 2
12
/12/06
23:11:38 WARN util.NativeCodeLoader: Unable to load native-hadoop library
for
your platform... using
builtin
-java classes where applicable
12
/12/06
23:11:38 WARN snappy.LoadSnappy: Snappy native library not loaded
12
/12/06
23:11:38 INFO mapred.JobClient: Running job: job_201212062231_0001
12
/12/06
23:11:39 INFO mapred.JobClient: map 0% reduce 0%
12
/12/06
23:11:50 INFO mapred.JobClient: map 100% reduce 0%
12
/12/06
23:11:58 INFO mapred.JobClient: map 100% reduce 33%
12
/12/06
23:12:00 INFO mapred.JobClient: map 100% reduce 100%
12
/12/06
23:12:02 INFO mapred.JobClient: Job complete: job_201212062231_0001
12
/12/06
23:12:02 INFO mapred.JobClient: Counters: 26
12
/12/06
23:12:02 INFO mapred.JobClient: Job Counters
12
/12/06
23:12:02 INFO mapred.JobClient: Launched reduce tasks=1
12
/12/06
23:12:02 INFO mapred.JobClient: SLOTS_MILLIS_MAPS=14810
12
/12/06
23:12:02 INFO mapred.JobClient: Total
time
spent by all reduces waiting after reserving slots (ms)=0
12
/12/06
23:12:02 INFO mapred.JobClient: Total
time
spent by all maps waiting after reserving slots (ms)=0
12
/12/06
23:12:02 INFO mapred.JobClient: Launched map tasks=2
12
/12/06
23:12:02 INFO mapred.JobClient: Data-
local
map tasks=2
12
/12/06
|