大数据介绍
大数据这个概念一直都是如火如荼,那什么是大数据呢?首先从名字来看,我们可以简单地认为数据量大,而数据量大也就意味着计算量大。这样理解本身是没有任何问题的,只不过这并不能很好地定义大数据。
而业界的一家权威机构,针对大数据做了描述,认为大数据应该具备如下特征:
1)数据量(Volume):数据量大,可以达到 TB、PB 甚至更高。而这种规模的数据,传统的数据库已经不好处理了,所以才有了现在各种各样的大数据框架。
2)多样性(Variety):数据的类型繁多,比如简单的数值、文本,地理位置、图片、音频、视频等等,数据存在多样性。而处理多样性的数据所带来的挑战,也就比以前更高;
3)价值(Value):原始数据都是杂乱无章的,我们无法直接得到有效信息。而对数据进行清洗、分类、汇总等各种处理之后,我们能够从中找到一些规律,将其变成商业价值,这一步也就是数据分析师要做的事情。所以分析数据的目的是从中找到一些规律、信息,将其变成价值。而数据的价值密度和数据的总量通常是不成正比的,你的数据量增加一倍,但是价值未必增加一倍;
4)速度(Velocity):对于大数据而言,我们不仅追求大量数据的处理,还希望它能有让人满意的速度。在早期的大数据处理框架中,它是做不到实时的,只能进行离线批处理。但是很多场景下,我们是需要立刻就能计算出结果的,所以大数据领域也就慢慢诞生了更多的实时性框架。
以上便是大数据的 4V 特征。
而大数据的出现,也必然会带来一些技术性的革新。
比如数据存储,大数据可以到 PB、EB、ZB 级别,这种数据量单台机器已经存储不下了,所以数据存储方面必然要发生改变。由原来的文件存储变成了分布式文件存储,也就是将一个大文件切分成多个小块存储在多台机器上,这样就解决了数据存储的问题。
不仅如此,由于散落在多个机器上,如果某一台机器挂掉了,那么就导致整个文件都无法读取了。因此为了容错,在切分成多个小块的时候,还会将每一个块多拷贝两份,散落在不同的机器上。这样一台机器挂掉了,还可以从其它机器上读,这一点在后续介绍 HDFS 的时候会详细说。
除了数据存储,还有数据计算。因为数据量大,必然也伴随着计算量大,那该怎么办呢?类比数据存储,存储的时候可以存在多台机器上,那么计算的时候是不是也可以让多台机器一块计算呢。所以将一个作业进行拆分,交给不同的机器进行计算,然后再将结果做一个归并,这就是所谓的分布式计算。
而大数据生态圈中最著名的Hadoop,便是由分布式文件系统HDFS和分布式计算框架MapReduce 组成的,这个我们后面会说。
除了存储和计算,还有网络问题。以前单机的时候,数据就在你的本地,计算也在本地,所以没什么好说的,直接读取数据、计算就是了。但分布式文件存储和分布式计算就不一样了,因为数据被切分成了多块,有可能某台机器上的计算任务所需要的数据在其它的机器上,这是很正常的。因此很多时候数据之间的传输是不可避免的,这对网络也是一个挑战,所以至少是万兆网,千兆已经捉襟见肘了。特别是跨数据中心、跨地区,要求会更高。
大数据处理都分为哪几步
- 1. 数据采集:一般使用 Flume、Sqoop;
- 2. 数据存储:一般使用 Hadoop;
- 3. 数据处理、分析、挖掘:一般使用 Hadoop, Hive, Spark, Flink 等等;
- 4. 可视化:该步骤并不完全属于大数据的范畴,一般由专门的团队去做;
大数据在技术架构上所带来的挑战
1. 对现有数据库管理技术的挑战:对于 PB、EB 级别的大数据而言,使用目前的关系型数据库存储是不现实的,尽管数据库也可以部署集群,但规模非常有限。而且由于数据量的原因,也很难使用现有的结构化查询语言来分析现有的大数据;
2. 经典数据库技术并没有考虑数据的多类别:大数据的 4V 特征中有一个 V 是多类别,现在的数据库没有办法很好地存储一些特殊类型的数据;
3. 实时性技术的挑战:想从大量数据中提取相应的价值,花费的时间是不短的,如果使用现有的技术很难做到实时性;
4. 网络架构、数据中心、运维的挑战:数据一直在高速增长,当涉及到大量数据的传输时,对数据中心、运维会是一个不小的挑战;
如何对大数据进行存储和分析呢
这是最直观的一个问题,如果你都不能对大数据进行存储、分析,那么也就谈不上所谓的商业价值了。而数据的存储和分析,自然需要由专业的框架来完成,当然你也可以自己开发一个框架,但这显然是非常困难的,我们也不会这么做。因为在大数据的存储和计算中,存储容量、读写速度、计算效率等等,这些都是需要考虑的。
而幸运的是,Google 的三驾马车:GFS, MapReduce, BigTable 解决了这一点,但 Google 并没有将它们开源,只是发表了相应的技术论文。而 Hadoop 便是 Hadoop 的作者基于 Google 的论文、使用 Java 语言开发的。我们来介绍一下:
- GFS:指的就是 Google 公司的分布式文件系统,HDFS 便是基于 GFS 诞生的,也就是 Hadoop 的分布式文件系统;
- MapReduce:分布式计算处理框架,对应 Hadoop 的 MapReduce,可以将一个作业拆分成多份,然后在多个机器上并行计算;
- BigTable:顾名思义是一张大表,普通量级的数据可以使用关系型数据库的表进行查询,但大数据就没办法了。而 BigTable 则可以很好地解决这一点,它对应着大数据框架中的 HBase。但要清楚 HBase 并不隶属于 Hadoop,HBase 是一个独立的分布式数据库,只是它底层的数据存储依赖于 HDFS,正如 BigTable 底层的数据存储依赖于 GFS 一样;
Hadoop 概述
下面我们就来认识一下Hadoop,看看它的概念是什么?核心组件有哪些?具有哪些优势?发展史等等。
提到 Hadoop,有狭义上的 Hadoop,还有广义上的 Hadoop。
- 狭义 Hadoop:指的就是 Hadoop 本身;
- 广义 Hadoop:围绕着 Hadoop 所构建的生态圈,里面包含了各种各样的框架;
下面我们说的 Hadoop,指的是狭义上的 Hadoop,也就是 Hadoop 本身。
先说一下 Hadoop 这个名字的由来,这个名字没有任何的意义,它是作者的儿子给一个玩具大象起的名字,所以 Hadoop 官网的 logo 也是一个大象。另外这些大数据框架基本上都是Apache的顶级项目,它们的官网都是项目名.apache.org,比如Hadoop的官网就是hadoop.apache.org。
那么 Hadoop 是什么呢?
Hadoop 是一个可靠的、可扩展的、分布式的计算框架,它可以对大量的数据集进行并行处理。我们知道单台机器的能力是不够的,所以可以将多台机器组成一个集群,而集群中每一台机器都叫做一个节点,Hadoop 可以从单机(单节点)扩展到上千个节点,来对数据进行存储和计算。如果存储不够了,怎么办?直接加机器就好了,节点的扩展非常容易。
另外最重要的是,Hadoop 可以部署在廉价的机器上,而不需要使用昂贵的小型机、或者刀片机等等。而且还提供了故障恢复、容错等机制。
Hadoop 主要由以下几个部分组成。
- 分布式文件系统 HDFS:将文件分布式存储在很多的节点上;
- 分布式计算框架 MapReduce:能在很多节点上进行分布式并行计算;
- 分布式资源调度框架 YARN:实现集群资源管理以及作业的调度;
HDFS 和 MapReduce 我们一开始说过了,但是还有一个 YARN,它是用来对集群资源进行管理、以及作业的调度的。还是举之前的例子,如果一个作业所需要的数据不在当前节点上该怎么办?显然有两种做法:
- 1. 将数据从其它节点上传输过来;
- 2. 或者将作业调度到有数据的节点上。
而在大数据领域中是有说法的,移动数据不如移动计算,因为数据的移动成本要比计算的移动成本高很多。所以这个时候就需要 YARN 了,当然 YARN 还用于资源的管理,给每个作业分配相应的资源等等。
Hadoop 的优势
那么 Hadoop 的优势都有哪些呢?
- 高可靠性:Hadoop 底层维护多个数据副本,所以即使某个节点出现故障,也不会导致数据的丢失;
- 高扩展性:在集群间分配任务数据,可方便地扩展数以千计的节点。很好理解,如果容量不够了,直接横向扩展,加机器就行。
- 高效性:在 MapReduce 的思想下,Hadoop 是并行工作的,以加快任务的处理速度。实际上如果学了 Spark,会发现 Hadoop 自己所描述的易用性、高效性实在是不敢恭维。但是 Hadoop 作为大数据生态圈中非常重要的组件,我们是有必要学好的,而且学了 Hadoop 之后,再学 Spark 会轻松很多,而且也会明白为什么 Spark 会比 Hadoop 在效率上高出几十倍、甚至上百倍;
- 高容错性:能够自动将失败的任务重新分配,如果某台机器挂掉了,那么会自动将任务分配到其他的机器上执行;
- 可以部署在廉价机器上,降低成本;
- 成熟的生态圈,里面不仅仅是 Hadoop,里面还有大量的其它框架,后面会说;
Hadoop 生态圈
我们说 Hadoop 分为狭义 Hadoop 和广义 Hadoop。
狭义 Hadoop 指的是:一个适合大数据的分布式存储(HDFS)、分布式计算(MapReduce)和资源调度(YARN)平台,所以狭义 Hadoop 指的就是 Hadoop 框架本身。
广义 Hadoop 指的是:Hadoop 生态系统,这是一个很庞大的概念,Hadoop 框架是其中最重要、也是最基础的一个部分;生态系统中的每一个子系统只解决某一个特定的问题域(甚至可能很窄),不搞统一型的一个全能系统,而是小而精的多个小系统。
Hadoop 生态圈里面的东西还是非常非常多的,囊括了大数据处理的方方面面,并且这是一个成熟的生态圈。像 Flume 做日志采集、Sqoop 做数据的导入导出,还有调度系统以及分布式协调服务等等。我们这里学习的是 Hadoop,并且是狭义上的 Hadoop。
Hadoop 常用的发行版
首先是社区版,也就是 Apache 版本,直接去官网就可以下载。它的特点是纯开源,可以直接在源码的基础上进行二次开发;但是不同版本/框架之间的整合有时会比较麻烦,比如 jar 包的冲突等等。
然后是 CDH 版本,它是 cloudera 公司开发的 Hadoop,并且提供了 cloudera manager,可以通过页面进行可视化操作,比如一键安装各种框架、对框架升级等等,并且还帮你屏蔽了不同框架之间的 jar 包冲突。但是 CDH 不开源,并且与社区版本有点出入。但凡是使用 Hadoop 的公司,有百分之 60~70 使用的都是 CDH,包括笔者以前所在的公司,其信息中心就是采购的 CDH。
最后是 HDP版本,由 Hortonworkds 公司提供,原装 Hadoop、支持 tez;但是企业级安全不开源。
如果是在学习的时候,使用哪种发行版都无所谓,但是在生产环境中最好使用 CDH 或者 HDP,这里我们就使用社区版了。
安装 Hadoop
下面我们就开始安装 Hadoop 了,你可以使用虚拟机,或者云服务器等等。我这里使用的腾讯云服务器,操作系统是 CentOS7。
另外 Hadoop 的运行模式有三种:
- 单机模式:基本不用,不用管;
- 伪分布式:按照完全分布式来进行搭建、配置,但是机器只有一台;
- 完全分布式:真正意义上的多台机器
我们后续使用的是伪分布式,但是在学习的时候,和使用真正意义上的分布式之间是没有太大区别的。
在学习的时候,不建议上来就搭建完全分布式的 Hadoop 集群,Hadoop 还没搞明白就开始搭建分布式集群的话,真的是很容易造成从入门到放弃。一开始完全可以搭建一个单机版的伪集群,因为在学习的时候和真正的集群没有太大区别。
下面就开始安装 Hadoop 了,不过在安装它之前,我们还需要安装 jdk。因为 Hadoop 是 Java 语言编写的,所以我们需要安装 jdk,至于版本请保证在1.8以上。
这里我的软件都安装在 /opt 目录下,其中 jdk 已经安装好了,至于它的安装也很简单,这里就不介绍了。
下面安装 Hadoop,我们直接去官网下载即可,这里我下载的版本是 2.8.4。下载成功之后,直接解压即可。
解压之后的文件目录如上图所示,我们来介绍几个重要的目录。
- bin 目录:和Hadoop客户端相关的脚本文件,比如 hadoop, hdfs, yarn 等等;
- etc 目录:里面有一个 hadoop 目录,该目录存放了所有的配置文件, 我们后面会修改好几个;
- include 目录:这是与 C 语言有关的一些头文件,我们不用管;
- lib 目录:一些本地库,动态库文件;
- libexec 目录:和 lib 目录类似;
- sbin 目录:非常重要的一个目录, 存放了大量的启动文件,比如启动、关闭集群,启动、关闭 yarn 等等;
目录里面的文件我们在用到的时候会说,下面我们来介绍 Hadoop 最重要的三大组件,首先是 HDFS。
分布式文件系统 HDFS
什么是HDFS,它和普通的文件系统之间有什么区别呢?
HDFS(Hadoop Distributed File System)是一个分布式文件系统,用于存储文件,通过目录树来定位文件;其次它是分布式的,可以横跨 N 个机器,由多个节点联合起来实现其功能,集群中的节点有各自的角色。
HDFS 产生背景
随着数据量越来越大,在一台机器上无法存下所有的数据,那么就分配到更多的机器上,但是这样不方便管理和维护。因此迫切需要一种系统来管理多台机器上的文件,这就是分布式文件管理系统,HDFS 只是其中的一种。
HDFS 使用场景
适合一次写入,多次读出的场景,且不支持文件的修改。适合用来做数据分析,并不适合做网盘应用。很好理解,HDFS 的定位就决定了它不适合像关系型数据库那样,可以任意修改数据。而且写入数据,一定是大批量一次性写,至于原因后面会解释。
HDFS 的设计目标
1)解决硬件故障
HDFS 设计出来就是为了存储大量数据,因为 HDFS 可以跨上千个节点,每个节点只存储数据的一部分。但是这样的话,其中任何一个节点故障,都会导致无法读取整个文件,所以 HDFS 还要具备相应的容错性。
首先 HDFS 在接收到一个大文件之后,会将其切分成块(block),每一个 block 的大小默认是 128M。然后将每一个块再拷贝两份(所以总共默认是 3 份,也就是三副本),分别散落在不同的节点上,这样一台节点挂了(或者磁盘坏掉了),还可以从其它节点上找。我们画一张图:
这里我们以 500M 的文件为例,显然它顶多是个小型数据,这里只是举个例子。
因此以上便属于硬件故障,HDFS 要具备快速检测错误的能力,并且能从错误中自动恢复,这才是 HDFS 设计架构的核心目标。不能说一台机器瘫了或者存储坏掉了,整个 HDFS 集群就不能工作了。
2)流式数据访问
运行在 HDFS 上的应用程序会以流的方式访问数据集,这和普通的文件系统不同,因为 HDFS 更多地被应用于批处理,而不是和用户之间进行交互式访问。所以 HDFS 关注的是高吞吐量,而不是数据访问的低延迟,从这一点我们也能看出 HDFS 不适合做实时处理。
3)大型数据集
HDFS 可以管理大型数据集,TB、PB 甚至更高的级别。而 HDFS 不怕文件大,就怕文件小,这是面试的时候经常被问到的点,这背后更深层次的含义我们后面会说。总之一个 HDFS 集群可以有几千个节点,可以管理非常大的文件。
4)移动计算比移动数据更划算
这一点我们之前说过了,假设计算任务在 A 节点上,但是数据在 B 节点上。这个时候要么把计算调度到 B 机器上,要么把数据传输到 A 机器上。而移动计算的成本比移动数据的成本低很多,所以我们应该将计算调度到 B 机器上,HDFS 也支持这一点。
HDFS 的架构
HDFS 有两个核心概念:NameNode、DataNodes,并且是一个主从架构。而 NameNode 就是 master,DataNodes 是 slave。注意:DataNodes 的结尾有一个 s,意味着会有多个 DataNode;但是 NameNode 后面没有 s,因此它只有一个,事实上也确实如此。
HDFS 集群由一个 NameNode 和多个 DataNode 组成,NameNode 负责管理文件系统的 namespace(名字空间,比如文件的目录结构便是 namespace 的一部分),并提供给客户端固定的访问途径。因为客户端需要读写数据,首先经过的就是 NameNode。
除了 NameNode,还有多个 DataNode,DataNode 就是用来存储数据的一个进程,通常一个节点对应一个 DataNode。所以一个 HDFS 集群可以由多个节点组成,其中一个节点负责启动 NameNode 进程,剩余的节点负责启动 DataNode 进程。在内部,一个文件会被拆分多个块,并默认以三副本存储,然后默认存储在多个 DataNode 对应的节点上。
注意:我们说客户端创建、删除文件,都是通过 NameNode,它负责执行文件系统的类似 CRUD 的操作。并且最重要的是,它还决定了 block 和 DataNode 之间的映射关系。
假设由一个 150M 的文件,存储的时候会被切分成两个块,那么问题来了:block1 存在哪个 DataNode 中呢,block2 又存在哪个 DataNode 中呢?其实这一点不用担心,因为我们说 NameNode 会记录每个 block 和 DataNode 的映射关系,这些便是数据的元信息。比如拆分成几个块,每个块都散落在哪些 DataNode 上。
所以客户端获取数据的时候,一定要经过 NameNode,不然这些元信息你拿不到。因此在存储的时候,NameNode 会记录这些元信息,当我们获取的时候 NameNode 会根据元信息找到对应的 DataNode,而这个过程对于用户来说是不可见的。
所以你可以简单地理解为:HDFS 就是一个拆文件、合文件的一个过程。存储的时候拆开,获取的时候合并。
至于数据本身的读写,则是通过 DataNode 来完成的,因为数据存在 DataNode 对应的节点上。
所以我们可以再来总结一遍。
NameNode:就是 master,它是一个主管、管理者
- 管理 hdfs 的名字空间;
- 配置副本策略;
- 管理数据块(block)映射信息;
- 处理客户端读写请求;
DataNode:就是 slave,NameNode 下达命令,DataNode 执行实际的操作
- 存储实际的数据块;
- 执行数据块的读/写操作;
client:就是客户端
- 文件切分,文件上传到 hdfs 的时候,client 将文件切分成一个个的 block,然后上传;
- 与 NameNode 交互,获取文件的位置信息;
- 与 DataNode 交互,读取或者写入数据;
- 客户端提供一些命令来管理 hdfs,比如 NameNode 的格式化;
- 客户端可以通过一些命令来访问 hdfs,比如对 hdfs 的增删改查操作;
还有一个 Secondary NameNode:它不是 NameNode 的替补,当 NameNode 挂掉时,并不能马上替换 NameNode 并提供服务,它的作用如下
- 辅助 NameNode,分担其工作量,比如定期合并Fsimage和Edits,并推送给 NameNode;
- 紧急情况下,可辅助恢复 NameNode,可以恢复一部分;
我们来看几幅漫画,写得非常好,个人给翻译了一遍。
从这里我们便了解了 HDFS 的整体架构,用一张图总结就是:
那么下面再来总结一下 HDFS 的优缺点。
优点:
1)高容错性:数据自动保存多个副本,它通过增加副本的形式,提高容错性;即使某一个副本丢失,也可以自动恢复;
2)适合处理大数据:数据规模大,能够处理 TB、甚至 PB 级别的数据;文件规模大,能够处理百万规模以上的文件,数量相当之大;
3)可构建在廉价的机器之上,通过多副本机制,提高可靠性;
缺点
1)不适合低延时数据访问,如果你想做到毫秒级存储,别想了,做不到的;
2)无法高效地对大量小文件进行存储,存一个 1G 的数据比存 10 个 100MB 的数据要高效很多;至于原因和我们之前说的 HDFS 不怕文件大,就怕文件小是类似的。
NameNode 一般是唯一的,这就意味着空间是有限的。而 NameNode 要记录文件的元数据,不管是 1KB,还是 1GB,都需要 150 字节的空间进行记录。如果全是小文件的话,那是不是很耗费 NameNode 所在机器的空间呢?
而且文件在读取之前需要先确定它位于哪些 DataNode 上,相当于寻址,而文件过小的话,那么寻址时间反而会超过读取时间,这违反了 HDFS 的设计目标。
3)不支持并发写入和文件的随机修改,一个文件只能有一个写,不允许多个线程同时写。仅支持数据的 append,不支持文件的随机修改。
HDFS 块大小的设置
HDFS 中的文件在物理上是分块存储(block),块的大小可以通过配置参数(df.blocksize)指定,默认大小是 128M,老版本是 64M。
那么这个块大小是怎么来的呢?首先我们说读取一个块的时候,需要先经过 NameNode 确定该块在哪些节点上,而这是需要时间的,一般称为寻址时间。而当寻址时间为块读取时间的 1% 时,是最佳状态。
一般寻址时间为 10ms,那么块读取时间为 1s,而目前磁盘的传输速率普遍为 100M/s,因此块大小大概 100M 左右。根据实际情况可能略有不同,但大致是 100多M 左右。
思考:为什么块不能设置太小,也不能设置太大?
- hdfs 块设置太小,会增加寻址时间,程序一直在找块的开始位置;
- hdfs 块设置太大,程序在读取这块数据时会非常慢;
总结:HDFS 块的大小设置主要取决于磁盘的传输速率。
修改配置文件,启动伪集群
下面来启动集群,但需要先修改几个配置文件。我们在介绍 Hadoop 目录结构的时候说过,配置文件都在安装目录的 etc/hadoop 目录中。
修改 hadoop-env.sh
里面配置好 JAVA 的安装路径。
修改 core-site.xml
修改 hdfs-site.xml
修改 slaves
将集群当中的 DataNode 节点都写在里面,而当前只有一个节点,它既是 NameNode 又是 DataNode,所以写一个 localhost 进去就行。但该文件里面默认就是 localhost,因此这个文件当前不需要修改。之所以提一嘴,是想提示在搭建集群的时候,别忘记将 DataNode 所在节点都写在里面。
目前只需要修改以上几个文件,然后启动集群(伪)。但在此之前需要先格式化 NameNode,注意:只需要在第一次启动时格式化。
# 如果你配置了环境变量,可以直接输入 hdfs bin/hdfs namenode -format
如果格式化成功,那么 hadoop.tmp.dir 参数指定的目录会自动创建。
然后启动 Hadoop 集群:
# 如果把 sbin 目录也配置了环境变量 # 那么 sbin/ 也不需要加,关闭集群则是 stop-dfs.sh sbin/start-dfs.sh
执行该命令的时候会依次启动 NameNode, DataNode, SecondaryNameNode,但会要求你三次输入当前用户的密码,并且每次启动、关闭的时候都是如此。在多节点通信的时候,显然不能这样,虽然目前是单节点,但每次输入密码也很麻烦,因此建议配置免密码登录。
启动完毕,并且启动时的日志信息也记录在了文件里,但如果你查看的话会发现并没有什么信息。这是因为显示的文件不对,你只需要把结尾的 out 改成 log 就可以查看日志信息了,这个应该是 Hadoop 内部的问题,不过无关紧要。
那么到底有没有启动成功呢?我们输入 jps 查看一下。
如果出现了黄色方框里的内容,那么就证明启动成功了。另外这是三个独立的进程,可能出现有的启动成功,有的启动失败,比如你发现显示的进程中没有 NameNode,那就证明 NameNode 启动失败了,你就需要去对应的日志文件中查看原因。
PS:如果你发现 NameNode 真的启动失败了,那么很有可能是 9000 端口冲突了。
只有当上面三个进程同时出现,才算启动成功。然后我们在浏览器输入:ip:50070,可以查看 webUI 界面。
页面信息大致如上,在 Summary 中有很多关于节点的信息,可以看一下。另外导航栏中的最后一个 utilities,点击的话会出现一个下拉菜单,里面有一个 Browse the file system。点击的话,可以查看整个文件的目录结构,后面会说。
最后补充一点,我们不可以重复格式化 NameNode。因为该操作会产生新的集群 id,导致 NameNode 和 DataNode 的集群 id 不一致,集群找不到以往的数据。所以格式化 NameNode 的时候,务必要先删除 data 数据和 log 日志,然后再格式化。因为两者需要有一个共同的 id,这样才能交互。
hdfs shell 命令
HDFS 的 shell 命令和 Linux 是非常类似的,比如查看某个目录下的文件,Linux 是 ls,那么 hdfs shell 就是 hdfs dfs -ls,再比如查看文件内容是 hdfs dfs -cat filename。两者非常相似,只不过在 hdfs shell 中需要加上一个横杠。
另外 hdfs dfs 还可以写成 hadoop fs,对于 shell 操作来说两者区别不大,下面就来介绍一些常用的命令。
hdfs dfs -help 某个命令
显然这是查看某个命令使用方法的命令,比如查看 cat 的使用方法。
hdfs dfs -ls 目录路径
查看某个目录有哪些文件,加上 -R 表示递归显示。由于当前没有文件,所以不演示了。
hdfs dfs -mkdir 目录
在 hdfs 上面创建目录,加上 -p 表示递归创建,和 Linux 是一样的。
hdfs dfs -moveFromLocal 本地路径 hdfs路径
将本地文件或目录移动到 hdfs 上面,注意是移动,移完之后本地就没了。
hdfs dfs -cat 文件
查看一个文件的内容。
hdfs dfs -appendToFile 追加的文件 追加到哪个文件
将一个文件的内容追加到另一个文件里面去。
我们将本地 other.py 里的内容追加到了 HDFS 上的 /code.py 文件中。
hdfs dfs [-chgrp、-chmod、-chown]
更改组、更改权限、更改所有者,这个和 Linux 中用法一样。
hdfs dfs -copyFromLocal 本地路径 hdfs路径
将文件从本地拷贝到 HDFS 上面去,这个和刚才的 moveFromLocal 就类似于 Linux 中的 cp 和 mv。
hdfs dfs -copyToLocal hdfs路径 本地路径
将 HDFS 上的文件拷贝到本地,此时是 HDFS 路径在前、本地路径在后。
hdfs dfs -cp 源hdfs路径 目的hdfs路径
上面的拷贝都是针对本地路径和 HDFS 路径,而 -cp 则是在两个 HDFS 路径之间拷贝。
hdfs dfs -mv 源hdfs路径 目的hdfs路径
和 -cp 用法一样,不过 -cp 是拷贝,-mv 是移动。
hdfs dfs -get hdfs路径 本地路径
等同于 copyToLocal。
hdfs dfs -put 本地路径 hdfs路径
等同于 copyFromLocal。
hdfs dfs -getmerge hdfs路径(通配符) 本地路径
将 hdfs 上面的多个文件合并下载到本地。
hdfs dfs -tail 文件名
显示文件的结尾,类似于 Linux 的 tail。
hdfs dfs -rm 文件
删除文件,如果是文件夹需要加上 -r。
hdfs dfs -rmdir 空目录
删除一个空目录,不常用,一般使用 -rm。
hdfs dfs -du 目录
统计目录的大小信息:
- hdfs dfs -du -h /:加上-h人性化显示;
- hdfs dfs -du -h -s / :查看当前目录的总大小;
hdfs dfs -setrep 数值 文件
设置文件的副本数量,比如 hdfs dfs -setrep 5 /file.txt,表示将 file.txt 的副本设置成 5。
使用webUI观察HDFS存储
我们上传一个稍微大一点的文件进去吧,就把 jdk 压缩包上传到 HDFS 上。
这里我们上传到了 HDFS 的根目录,通过 webUI 来查看一下。
我们发现文件已经在里面了,并且这个文件的大小是 180.06M,显然会被切成两个块,每个块的副本系数是 1,因为我们设置的是 1。然后点击一下该文件:
可以看到拆分之后的两个块的信息,分别是 block0 和 block1,而且每个块都有一个 ID,是依次增大的,并且两个 Size 加起来也一定是 jdk 安装包的大小。然后它存在什么地方呢?还记得之前配置的 hadoop.tmp.dir 吗?
最终我们进入到 subdir0 这个目录,层级非常的多,然后我们看看里面都包含了哪些内容。
箭头所指的就是 jdk 压缩包的两个块,并且文件名的结尾就是对应的块id,而且大小也和 webUI 上显示的一样。然后 jdk 压缩包被切分成了两个块,而现在这两个块我们都找到了,如果将它们合并在一起话,那么 tar 命令能不能正常解压呢?我们来试试。
现在你还觉得 HDFS 神奇吗?所以就是之前说的,只是将文件切分成块,然后散落在不同节点的本地存储中。查找的时候,会去 NameNode 获取元信息,找到相应的块再组合起来,就这么简单。因此 HDFS 还是需要依赖本地进行存储的,只不过内部的 NameNode 会帮助我们对块进行管理,但本质上就是文件的拆分与合并的过程。
Python连接HDFS
使用 HDFS SHELL 只是用来临时做测试用,工作中肯定是通过代码来操作的,那么下面来看看如何使用 Python 连接 HDFS,并进行相关操作。
这里为什么要用 Python 呢?因为笔者是 Python 方向的,Java、Scala 一概不懂。
首先 Python 若想操作 HDFS,需要下载一个第三方库,也叫 hdfs,直接 pip install hdfs 即可。
from pprint import pprint import hdfs # 导入相关模块,输入 http://ip:50070,创建客户端 client = hdfs.Client("http://82.157.146.194:50070")
client.list:查看指定目录的内容
pprint(client.list("/")) """ ['code.py', 'code1.py', 'girls', 'jdk-8u221-linux-x64.tar.gz'] """ # status 表示是否显示文件的相关属性, 默认为 False # 指定为 True 的话,会同时返回文件相关属性 # 返回的数据格式为:[("文件名", {相关属性}), ...] pprint(client.list("/", status=True)) """ [('code.py', {'accessTime': 1667875155444, 'blockSize': 134217728, 'childrenNum': 0, 'fileId': 16397, 'group': 'supergroup', 'length': 32, 'modificationTime': 1667875310739, 'owner': 'root', 'pathSuffix': 'code.py', 'permission': '644', 'replication': 1, 'storagePolicy': 0, 'type': 'FILE'}), ..., ] """
client.status:获取指定路径的状态信息
pprint(client.status("/")) """ {'accessTime': 0, 'blockSize': 0, 'childrenNum': 3, 'fileId': 16385, 'group': 'supergroup', 'length': 0, 'modificationTime': 1607194594505, 'owner': 'root', 'pathSuffix': '', 'permission': '755', 'replication': 0, 'storagePolicy': 0, 'type': 'DIRECTORY'} """ # 里面还有一个 strict=True,表示严格模式 # 如果改为 False,那么如果输入的路径不存在就返回 None # 为 True 的话,路径不存在,报错
client.makedirs:创建目录
# 会自动递归创建,如果想创建的时候给目录赋予权限 # 可以使用 permission 参数,默认为 None client.makedirs("/a/b/c", permission=777) pprint(client.list("/a")) # ['b'] pprint(client.list("/a/b")) # ['c']
如果不出意外的话,你在执行的时候应该会报错:hdfs.util.HdfsError: Permission denied:。因为默认情况下我们只能查看 HDFS 上的数据,但是不能写入、修改、删除。
如果你创建目录的话,就会抛出这个异常。至于解决办法也很简单,我们修改 hdfs-site.xml,在里面添加如下内容,然后重启 Hadoop 集群。
<property> <name>dfs.permissions</name> <value>false</value> </property>
但如果你重启集群之后立即执行的话,那么还是会报错,提示:Name node is in safe mode。因为 Hadoop 集群启动之后会进入短暂的安全模式,你需要等待一会才可以创建,关于安全模式一会单独说。
client.write、client.read:往文件里面写内容、读内容
with client.write("/这是一个不存在的文件.txt") as writer: # 需要传入字节 writer.write( bytes("this file not exists", encoding="utf-8") ) with client.read("/这是一个不存在的文件.txt") as reader: # 读取出来也是字节类型 print(reader.read()) # b'this file not exists'
如果你在执行的时候出现了连接错误,那么就在 hosts 文件中增加服务器IP到主机名的映射。
然后注意这里的 write 方法,如果不指定额外的参数,那么要求文件不能存在,否则会报错,提示文件已经存在。如果要对已存在的文件进行操作,那么需要显式的指定参数:overwrite 或者 append,也就是重写或追加,相当于 Python 文件操作的 w 模式和 r 模式。
注意:overwrite 和 append 不能同时出现,否则报错。
client.content:查看目录的汇总情况
比如:当前目录下有多少个子目录、多少文件等等。
pprint(client.content("/", strict=True)) """ {'directoryCount': 7, 'fileCount': 3, 'length': 195094805, 'quota': 9223372036854775807, 'spaceConsumed': 195094805, 'spaceQuota': -1, 'typeQuota': {}} """
client.set_owner:设置所有者
client.set_permission:设置权限
client.set_replication:设置副本系数
client.set_times:设置时间
""" def set_owner(self, hdfs_path, owner=None, group=None): def set_permission(self, hdfs_path, permission): def set_replication(self, hdfs_path, replication): def set_times(self, hdfs_path, access_time=None, modification_time=None): """
client.resolve: 将带有符号的路径,转换成绝对、规范化路径
# 当然并不要求路径真实存在 print( client.resolve("/古明地觉/古明地觉/..") ) # /古明地觉
client.walk:递归遍历目录
# 递归遍历文件,类似于 os.walk,会返回一个生成器,可以进行迭代 # 每一次迭代的内容都是一个三元组,内容如下: # ("路径", ["目录文件", ...], ["文本文件", ...]) for file in client.walk("/"): print(file) """ ('/', ['a', 'girls'], ['code.py', 'code1.py', 'jdk-8u221-li...']) ('/a', ['b'], []) ('/a/b', ['c'], []) ('/a/b/c', [], []) ('/girls', ['koishi', 'satori'], []) ('/girls/koishi', [], []) ('/girls/satori', [], []) """
client.upload:上传文件
print("1.c" in client.list("/")) # False client.upload(hdfs_path="/", local_path="1.c") print("1.c" in client.list("/")) # True
client.download:下载文件
client.download(hdfs_path="/code.py", local_path="code.py") print( open("code.py", "r", encoding="utf-8").read() )
client.checksum:获取文件的校验和
# 获取文件的校验和 pprint(client.checksum("/code.py"))
client.delete:删除文件或目录
# recursive 表示是否递归删除,默认为 False try: client.delete("/girls") except Exception as e: print(e) # `/girls is non empty': Directory is not empty client.delete("/girls", recursive=True)
当然还有一些其它操作,比如重命名:client.rename 等等,这里就不说了。
HDFS元数据管理
再来聊聊 HDFS 的元数据管理,首先元数据包含:文件名、副本系数(或者说副本因子)、块id、以及散落在哪个 DataNode 上。
因此 HDFS 的元数据也就是整个 HDFS 文件系统的层级结构,以及每个文件的 block 信息。
而这些元信息存在于 hadoop.tmp.dir 中,我们来看一下。
然后 current 目录里面存放了很多的文件:
首先 NameNode 启动之后,这些元数据会在内存中,因此为了防止重启之后丢失,肯定要定期写入磁盘。图中的 fs_image 文件便是写入之后的结果,但写入是定期写入的,假设每隔半小时写入一次。
如果 NameNode 宕掉,那么还是会丢失半小时的数据,这也是我们所无法忍受的。因此就像 Redis 一样,在缓存数据的同时还将执行的命令操作缓存起来,记录在 edits 文件中。edits 文件是时刻记录的,因为记录的只是命令而已。
然后根据 edits 中的命令,和 fsimage 综合起来,生成一个新的 fsimage,再把老的fsimage给替换掉,这样就确保了元数据的不丢失。但要注意,这一步并不是交给 NameNode 来做的,因为它还要处理来自客户端的请求,如果合并的工作再交给它,那么 NameNode 的压力就太大了。
那么给谁做呢?没错,正是 SecondaryNameNode。所以 NameNode 和它并不是所谓的主备关系,后者相当于前者的小弟,主要是帮大哥减轻压力的。
以上这个过程,叫做 HDFS 的 CheckPoint。
HDFS的安全模式
在 HDFS 刚启动的时候,会进入到一种特殊的模式:安全模式,这是 Hadoop 的一种自我保护机制,用于保证集群中数据块的安全性。比如:副本系数是 3,但是某个块的数量是 2,这个时候就会再拷贝一份,满足副本系数。
如果 HDFS 是在安全模式下的话,那么客户端不能进行任何文件修改的操作,比如:上传文件,删除文件,重命名,创建目录等操作。当然正常情况下,安全模式会运行一段时间自动退出的,只需要我们稍等一会就行了,到底等多长时间呢,我们可以通过 50070 端口查看安全模式退出的剩余时间。
我这里的安全模式是关闭的,如果你当前处于安全模式的话,那么页面信息会提示你还需多长时间才能结束安全模式。在安全模式下,虽然不能进行修改文件的操作,但是可以浏览目录结构、查看文件内容。
此外我们可以通过命令行,显式地控制安全模式的进入、查看、以及退出等等。
安全模式是 Hadoop 的一种保护机制,在启动时,最好是等待集群自动退出该模式,然后进行文件操作。有的小伙伴在和 Hive, Spark 整合的时候,刚一启动 HDFS,就开始启动 Hive, Spark 写数据,结果发现写的时候报错了,此时应该先等待集群退出安全模式。
分布式计算框架 MapReduce
下面来介绍一下 MapReduce,源自于 Google 在 2004 年 12 月发表的 MapReduce 技术论文,它的思想逻辑很简单,完全可以看成是 Python 中 map 和 reduce 的组合。
另外 B 站上有一位 MIT 大佬的公开课,介绍分布式系统的,里面对 Google 设计 HDFS 和 MapReduce 的流程进行了阐述。强烈推荐去看一下,或者也可以读一读原版论文。
MapReduce 实际上就是 map 和 reduce 两者的组合:
- map:并行处理输入的数据;
- reduce:对 map 阶段得到结果进行汇总;
比如我们要做一个词频统计:
MapReduce 就是将一个作业拆分成多份,在多个机器上并行处理,然后再将处理之后的结果汇总在一起。因此它非常适合海量数据的离线处理,不怕你的数据量大,PB, EB 都无所谓,只要你的节点数量足够即可。并且内部的细节我们不需要关心,只需按照它提供的 API 进行编程,我们便能得到结果。
并且当你的计算资源不足时,你可以通过简单的增加机器来扩展计算能力。并且一个节点挂了,它可以把上面的计算任务转移到另一个节点上运行,不至于这个任务完全失败。而且这个过程不需要人工参与,是由 Hadoop 内部完成的。
但是对比 Spark, Flink 等框架,MapReduce其实是比较鸡肋的。官方说使用 MapReduce 易开发、易运行,只是相对于我们自己实现而言,而使用 Spark, Flink 进行处理要比使用 MapReduce 清蒸的多。
因为 MapReduce 有以下几个缺点:
1)不擅长实时计算,无法像传统的关系型数据库那样,可以在毫秒级或者秒级内返回结果。
2)不擅长流式计算,流式计算输入的数据是动态的,而 MapReduce 的数据必须是静态的,不能动态变化。这是因为 MR 自身的设计特点决定了数据源必须是静态的。
3)不擅长 DAG(有向无环图)计算,多个程序之间存在依赖,后一个应用程序的输入依赖于上一个程序的输出。在这种情况,MR 不是不能做,而是使用后,每个 MR 作业的输出结果都会写入到磁盘,然后再从磁盘中读取,进行下一个操作。显然这样做会造成大量的磁盘 IO,导致性能非常的低下。
关于 MR 的实际操作就不说了,事实上现在基本都不用 MR 编程了,我们有 Spark,它的出现解决了 MR 的效率低下问题。而且还有 Hive,Hive 也是我们后面要学习的一个重要的大数据组件,很多公司都在用,它是将 MR 进行了一个封装,可以让我们通过写 SQL 的方式来操作 HDFS 上的数据,这样就方便多了。
但既然是写 SQL,那么肯定要像传统关系型数据库一样,有表名、字段名、字段信息等等。没错,这些信息在 Hive 中也叫作元信息、或者元数据,它一般存在 MySQL 等关系型数据库中,实际的数据依旧是存储在 HDFS 上,相当于帮你做了一层映射关系。
关于 Hive 我们介绍的时候再说,总之它也是一个非常重要的大数据组件,毕竟使用 SQL 进行编程肯定要比 MR 简单的多,而 Hive 也可以使用 Python 进行连接、执行操作。
资源调度框架 YARN
下面我们来介绍一下资源调度框架 YARN,但是在介绍它之前我们需要先了解为什么会有 YARN,一项技术的诞生必然是有其原因的。
首先我们用的都是 Hadoop 2.x 或 3.x,但在 1.x 的时候 MR 的架构是怎样的呢?当然不管是哪个版本,MR 都是 master/slave 架构。
HDFS 是一个 NameNode 带多个 DataNode,形成主从架构,在 1.x 的 MR 中也是如此,一个 JobTracker 带多个 TaskTracker。JobTracker 用来跟踪一个作业,而我们说一个作业可以被拆分成多个任务,每个任务对应一个 TaskTracker。
当然这里的拆分并不是将计算本身拆分,而是将文件拆分,举个例子:我们有 100G 的文件,分别散落在 10 个节点上。我们要对这 100G 的文件执行两次 Map、一次 Reduce,那么结果就是每个节点分别对 10G 的数据执行两次 Map、一次 Reduce。
另外 TaskTracker 可以和 JobTracker 进行通信,并需要告诉 JobTracker 自己是否存活。所以客户端 Client 提交作业是先提交到 JobTracker 上面,然后再由 TaskScheduler(任务调度器)将任务调度到 TaskTracker 上运行,比如:MapTask, ReduceTask。
此外 TaskTracker 会定期向 JobTracker 会报节点的健康状况、任务的执行状况,以及资源的使用情况等等。
但这个架构是存在问题的,因为只有一个 JobTracker,它要跟踪所有的作业。如果 JobTracker 挂掉了怎么办?要是挂掉了,那么客户端的所有作业都无法提交到集群上运行了。
此外 JobTracker 要负责和 Client 进行通信,还要和 TaskTracker 进行通信,因此它的压力会非常大。在后续集群的扩展时,JobTracker 很容易成为瓶颈。此外最关键的一点,在 Hadoop1.x 的时候,JobTracker 仅仅只能支持 MapReduce 作业,想提交 Spark 作业是不可能的。
并且这种集群的资源利用率也很低,比如我们有 Hadoop 集群,Saprk 集群,不同的集群进行不同的资源分配。有可能 Hadoop 集群处于空闲状态,Spark 集群处于缺资源状态,导致它们没办法充分利用集群的资源。
而解决这一点的办法就是,所有的计算框架都运行在一个集群中,共享一个集群的资源,做到按需分配。而想实现这一点,首先要满足支持不同种类的作业,所以 YARN 便诞生了。
什么是 YARN
YARN(Yet Another Resource Manager,另一种资源管理器),是一个通用的资源管理系统,为上层应用提供统一的资源管理和调度,所以 YARN 支持不同种类作业的调度。在介绍 Hadoop 生态圈的时候,我们贴了一张图,在 HDFS 之上的就是 YARN。而在 YARN 之上可以运行各种作业,像 MapReduce 作业、Spark 作业等等都可以,只需要提交到 YARN 上面就可以了
YARN 就类似于 1.x 里面的 JobTracker,但是它内部包含了两个部分:一个是资源管理,一个是作业调度或监控,这是两个单独的进程。而在 1.x 当中,都是通过 JobTracker 来完成的,所以它压力大。
所以基于这种架构,会有一个全局的 Resource Manager,每一个应用程序都有一个 Application Master,比如:MapReduce 作业会有一个 MapReduce 对应的 AM,Spark 作业会有一个 Spark 对应的 AM。而一个应用程序可以是一个独立 MapReduce 作业,也可以是多个作业组成的 DAG(有向无环图,多个任务之间有依赖)。
我们说 Resource Manager 是全局的,但除了 RM 之外每个节点还有各自的 NodeManager。目前抛出的概念有点多,我们来慢慢介绍。
Resource Manager
- 1)处理客户端请求。客户端想访问集群,比如提交一个作业,要经过 Resource Manager,它是整个资源的管理者,管理整个集群的 CPU、内存、磁盘等资源;
- 2)监控 Node Manager;
- 3)启动或监控 Application Master;
- 4)资源的分配和调度;
Resource Manager
- 1)管理单个节点上的资源,Node Manager 是当前节点资源的管理者,当然它也需要跟 Resource Manager 汇报;
- 2)处理来自 Resource Manager 的命令;
- 3)处理来自 Application Master 的命令;
Application Master
- 1)某个任务的管理者。当任务在 Node Manager 上运行的时候,就是由 Application Master 负责管理,因为每个任务都会对应一个 AM;
- 2)负责数据的切分;
- 3)为应用程序申请资源并分配给内部的任务;
- 4)任务的监控与容错;
Container
- Container 是 YARN 中资源的抽象,它封装了节点上的多维度资源,如内存、CPU、磁盘、网络等等。其实 Container 是为 Application Master 服务的,因为任务在运行的时候,需要的内存、cpu 等资源都被虚拟化到 Container 里面了。
所以整体流程如下:
- 1)客户端向 Resource Manager 提交作业;
- 2)RM 为客户端提交的作业在 Node Manager 上分配一个 Container,来运行作业对应的 Application Master;
- 3)AM 启动之后要注册到 RM 当中,因为 RM 是负责全局的资源管理,而且注册之后客户端可以通过 RM 来查询作业运行的情况。并且在注册之后, 还要向 RM 申请资源;
- 4)申请到资源之后,AM 便要求 NM 启动 Container,在 Container 里面运行 Task;
整个流程并没有那么复杂,并且在运行过程中 AM 是知道每一个 Task 的运行情况的,这便是 MR 在 YARN 上的执行流程。注意:不仅仅是 MapReduce,还有 Spark,它们除了 AM 不同之外,整体的执行流程是没有任何区别的。
YARN环境部署
Hadoop2.x 自带 YARN,如果我们想启动 YARN,还是要先修改配置文件。下面看看 YARN 的单节点部署。
修改 yarn-env.sh
和 hadoop-env.sh 一样,配置 JAVA_HOME。
修改 yarn-site.xml
<!--reducer获取数据的方式--> <property> <name>yarn.nodemanager.aux-services</name> <value>mapreduce_shuffle</value> </property> <!--指定yarn的ResourceManager的地址--> <property> <name>yarn.resourcemanager.hostname</name> <value>主机名</value> </property>
修改 mapred-env.sh
老规矩,遇到 -env.sh 都是配 JAVA_HOME。
修改 mapred-site.xml
你会发现 hadoop 没有提供这个文件,不过有一个 mapred-site.xml.template,我们可以拷贝一份。
<!--指定MR运行在yarn上--> <property> <name>mapreduce.framework.name</name> <value>yarn</value> </property>
修改完毕,下面启动集群。
我们看到多出了两个进程,分别是RM和AM,关于YARN我们还可以通过webUI的方式查看,端口是8088。
里面包含了很多关于节点的信息以及任务的信息,注意图中的 Active Nodes,我们看到目前有一个节点存活。因此在运行任务的时候,可以多通过 webUI 的方式关注一下节点和任务的信息。
但如果你还想查看程序的历史运行情况,那么还需要额外配置一个参数,修改 mapred-site.xml。
<!--指定历史服务端地址--> <property> <name>mapreduce.jobhistory.address</name> <value>主机名:10020</value> </property> <!--历史web端地址--> <property> <name>mapreduce.jobhistory.webapp.address</name> <value>主机名:19888</value> </property>
- 通过 sbin/mr-jobhistory-daemon.sh start historyserver 启动历史服务器;
- 然后通过 http://ip:19888/jobhistory 查看;
最后是提交作业到 YARN 上运行,但这里我们就不说了,因为还是那句话,现在基本上很少直接使用 MR 进行编程了。在后续学习 Spark 的时候,我们会学习如何将作业提交到 YARN 上运行,至于 MapReduce 我们了解一下它的概念即可。
而 YARN,它是一个资源管理器,我们需要掌握它的整体架构,在未来学习 Spark 的时候依旧需要了解 YARN。所以到时候我们再说如何将作业提交到 YARN 上执行吧。
小结
如果想走进大数据的大门,那么 Hadoop 是必须要了解的。在了解完 Hadoop 之后,下一个目标就是 Hive,因为使用 MapReduce 编程其实是很不方便的。而 Hive 则是可以让我们像写 SQL 一样来进行 MapReduce 编程,并且很多公司都在使用 Hive 这个大数据组件,我们以后再介绍。
最后我这里搭建的 Hadoop 集群其实是个伪集群,你也可以多找几个节点搭建一个完全分布式集群。当然在工作中,这一块应该是由专业的大数据团队负责,我们只需要了解它的原理以及使用即可。