引子
MapReduce 是谷歌 2004 年(Google 内部是从03年写出第一个版本)发表的论文里提出的一个概念。虽然已经过去15 年了,但现在回顾这个大数据时代始祖级别概念的背景、原理和实现,仍能获得对分布式系统的很多直觉性的启发,所谓温故而知新。
在Google 的语境里,MapReduce 既是一种编程模型,也是支持该模型的一种分布式系统实现。它的提出,让没有分布式系统背景的开发者,也能较轻松的利用大规模集群以高吞吐量的方式来处理海量数据。其解决问题思路很值得借鉴:找到需求的痛点(如海量索引如何维护,更新和排名),对处理关键流程进行高阶抽象(分片Map,按需Reduce),以进行高效的系统实现(所谓量体裁衣)。这其中,如何找到一个合适的计算抽象,是最难的部分,既要对需求有直觉般的了解,又要具有极高的计算机科学素养。当然,并且可能更为接近现实的是,该抽象是在根据需求不断试错后进化出的海水之上的冰山一角。
需求
谷歌当时作为互联网的最大入口,维护着世界全网索引,最早触到了数据量的天花板。即,哪怕针对很简单的业务逻辑:如从爬虫数据中生成倒排索引、将图状网页集合用不同方式组织、计算每个主机爬取的网页数量、给定日期的高频查询词汇等等,在全球互联网数据的尺度的加成下,也变的异常复杂。
这些复杂性包括:输入数据分散在非常多的主机上、计算耗资源太多单机难以完成、输出数据需要跨主机进行重新组织。为此,不得不针对每个需求重复构造专用系统,并耗费大量代码在分发数据和代码、调度和并行任务、应对机器故障和处理通信失败等问题上。
抽象
map 和 reduce 的抽象灵感来自于函数式编程语言 Lisp,为什么选定这两个概念呢?这来源于谷歌人对其业务的高度提炼:首先输入可以切分成一个个逻辑的记录 (record);然后对其每个 record 执行某种*映射 (map) 操作,生成一些键值对组成的中间结果(为什么要分键和值呢?为最后一步做铺垫,允许用户将中间结果以任意指定的方式——键,来进行组织规约);最后在具有相同键的中间结果子集上执行规约(reduce* ,包括排序,统计,提取最值等等)操作。
函数式模型的另一个特点在于对 map 操作实现的约束,即规定用户应提供一个无副作用的 map 操作(相关概念有纯函数,确定性,幂等性等等,当然他们的概念并不一样,后面小结会详细讨论)。如此限制,好处有二,可以进行大规模并行执行,可以通过换地儿重试来屏蔽主机故障。
具体到落地上,map 和 reduce 都是用户自定义函数。map 函数接受一个 Record,不过为了灵活,一般也组织为键值对;然后产生 List[key, value],reduce 函数接受一个 key 和该 key 对应的所有中间结果 List[value]。即:
map (k1,v1) -→ list(k2,v2) reduce (k2,list(v2)) -→ list(v2)
拿由谷歌这篇论文提出,后来成为大数据处理界的 hello world 级别示例程序 Word Count (对一堆文档中的单词计数)来说,map 和 reduce 的实现长这样:
map(String key, String value): // key: document name // value: document contents for each word w in value: EmitIntermediate(w, "1"); reduce(String key, Iterator values): // key: a word // values: a list of counts int result = 0; for each v in values: result += ParseInt(v); Emit(AsString(result));
这里有两个有意思的点:
- 中间变量需要网络传输,必然会涉及到序列化。MapReduce 的最初版本选择是一切解释权归运行时的用户代码所有,我只是傻傻的传 string。即,规定用户在 map 中将任何输出的中间结果对象都转换为 string,然后在 reduce 函数中接收该 Iterator[string] 后,自行解析为自己想要的格式。当然,在后来的模仿框架如 Hadoop 中,序列化和解序列化部分被拿了出来,可以由用户来自定义实现来满足功能或性能上的需求。没去查证,谷歌后来相比对此也做了优化。这是一种很自然的系统演化思路,初期设计尽量简单粗暴以求快速实现可用原型;在积累使用经验后再对某些不便模块进行扩展设计。
- reduce 接受 value 集合被组织为了迭代器(Iterator)。相信用过 Python 的同学应该对迭代器不陌生,它是一个很简单的接口,包括 next 和 stop 两个语义。配合 for loop ,构成一个很强大的抽象。不管你底层是一个内存中的 List、还是文件内容、还是网络 IO 流,只要能在运行时知道如何得到下一条记录,什么时候时候停止,都能被 for 循环来利用,进行逐一处理。迭代器抽象的一个好处在于,不必将待迭代的内容一次加载到内存,可以对数据增量式的惰性加载。MapReduce 框架的此处实现也正是利用了该特性。
实现概览
抽象定了,那么实现自然可以有不同,这也是接口和实现分离的意义所在。前者的抽象是一种思想,谷歌已经给你做了;后者的实现,完全可以根据自己的生产环境进行量体裁衣的来定制实现。谷歌在 paper 中给了一种内部经典版,Hadoop 也提供了一套通用版,当然我们也可以根据自己的业务需求和场景约束来实现一个合身版。
谷歌发布论文时 实现 MapReduce 所面对的系统环境长这样:
- 单机双核 x86 架构,2~4G内存,Linux 系统
- 通用网络硬件,百兆或者千兆以太网
- 集群含数百或者数千台机器,因此机器故障是常态
- 用的廉价 IDE 接口硬盘,但是人家有 GFS 来提供容错
- 多租户支持,因此需要做 Job 级别抽象和资源约束
流程概览
输入由用户指定切分大小后,切分成 M 份,然后分散到不同机器上(由于 GFS 的存在,也可能该输入 Block 本来就在那台机器上)。每个机器上会并行的跑用户定义的 map 。map 输出的中间结果,亦由用户指定,按 key 值范围切分为 R 份,对于每个中间结果,通过 node label = hash(key) mod R 决定其去处。下面是流程概览图:
- 首先把输入切分成 M 份,通常每份 16 ~ 64M,这个粒度用户按需进行把握;然后把这些分片分散到不同机器上(如果有GFS这种分布式文件系统配合的话,可能本来就分散着),然后在每个机器上 fork 一份用户的代码。对于用户代码分发,是一个有意思的话题,对于不同语言可能有不同实现,比如谷歌的C++,可能传输的是动态链接库;比如 Hadoop 的 Java 传的可能是 jar 包 (当然, 所有依赖也得传);如果是 PySpark 的 Python 呢,可能用的就是神奇的 cloudpickle;总之,不同语言需要考虑的传输机制是不一样的,比如说动态语言和静态语言;此外,全局变量和外部依赖也是需要考虑的点,谷歌虽然在此一笔带过,但是不同语言需要面对的情况可能差别很大。
- Master 的这份程序拷贝是不一样的,是负责安排活的。会选空闲的 worker 来安排这 M 个 map 任务和 R 个 reduce 任务。这需要考虑的是,worker 执行每个用户代码是单独启动一个进程,还是插入到系统 loop 中去执行。
- 执行 map 任务的 Worker,会读取被分配到的输入切片,解析出键值对流,送给用户定义的 map 函数。map 后产生的临时结果首先会缓存在内存中。虽然论文中没有展开,但可以预见的是,如何切片,如何解析出键值对流,不同用户对于同样的输入可能有不同的关注点,因此必然存在定制化解析的需求。这一部分(FileSplit 和 RecordReader)在稍后随着承载业务的增多,估计也会开放出来给用户自定义,事实上 Hadoop 就是这么干的。
- 缓存的中间结果会被定期在执行 Map Task 的机器本地进行刷盘(这也算一个本地性的优化,但是也有后果,就是一旦该机器故障,容错会稍微麻烦点,后面会说),并且按用户指定的 Partition 函数拆分成 R 个块,然后将这些位置信息发给 Master 节点。Master 负责通知相应的 Reduce Worker 以拉取对应数据。
- Reduce Worker 收到这些中间结果的位置信息后,会通过 RPC 来拉取其对应的 Partition 的数据。对于某个 Reduce Worker 来说,待所有数据拉取完成后,会将其按照 key 来进行排序。这样一来,所有具有同样 key 的数据便挨到了一块。第 4 步和第 5 步的过程合起来就是 shuffle,涉及到外部排序、多机数据传输等极其耗时操作;当数据量比较小时,如何实现都不成问题。但是当数据量大到一定程度,这里很容易成为性能瓶颈,因此也有一些优化方法,稍后会针对 shuffle 做详细展开,此处按下不表。
- 待中间数据排好序之后,Reduce Woker 会对其进行扫描,然后将一个个 key 和其对应的值集合,即 传给用户定制的 reduce 函数,然后将生成的结果追加到最终输出文件。对于谷歌来说,一般来说就是支持并行 append 的文件系统 GFS,好处在于可以多进程同时写结果。
- 当所有 reduce 任务完成后,master 会唤醒用户进程,即在用户代码的视角,MapReduce 任务是阻塞的。
一般而言,用户无需将最终结果的 R 个 Partition 进行合并,而是将其直接作为下一个 MapReduce 任务的输入。Spark RDD 的partition 就是将这一特点概念化了,并且将每一步 MapReduce 输出也放内存中,不进行落盘,以降低连续 MapReduce 任务的延迟。
局部性
计算机科学中常用的一个原理,叫做局部性原理 (locality reference,这里特指空间局部性),说的是程序在顺序执行时,访问了一块数据,接下来大概率会访问该数据(物理位置上)旁边的一块数据。很朴素的断言,却是一切 cache 发挥作用的基础,计算机存储因此也形成了由慢到快,由贱到贵,由大到小的存储层次体系(硬盘-> 内存 -> 缓存 -> 寄存器)。在分布式环境中,这个层次体系至少还要再罩上一层——网络IO。
在 MapReduce 系统中,我们也会充分利用输入数据的 locality。只不过这次,不是将数据加载过来,而是将程序调度过去(Moving Computation is Cheaper than Moving Data)。如果输入存在 GFS 上,表现形式将为一系列的逻辑 Block,每个 Block 可能会有几个(一般是三个)物理副本。对于输入每个逻辑 Block,我们可以在其某个物理副本所在机器上运行 Map Task(如果失败,就再换一个副本),由此来尽量减小网络数据传输。从而降低了延迟,节约了带宽。
Master 的数据结构
谷歌的 MapReduce 实现是有作业(Job)级别的封装的,每个 Job 包含一系列任务(Task),即 Map Task 和 Reduce Task。那么,我们要维护一个正在运行的 Job 的元信息,就势必要保存所有正在执行的 Task 的状态,其所在的机器 ID 等等。而且,Master 事实上充当了 Map Task 输出到 Reduce Task 输入的一个”管道“。每一个 Map Task 结束时,会将其输出的中间结果的位置信息通知 Master,Master 再将其转给对应的 Reduce Task,Reduce Task 再去对应位置拉取对应 size 的数据。注意,由于 Map Task 的结束时间不统一,这个通知->转发-> 拉取 的过程是增量的。那么不难推测出,reduce 侧对中间数据排序的应该是一个不断 merge 的过程,不大可能是等所有数据就位了再全局排序。
在分布式系统中,一个比较忌讳的问题就是单点。因为是牵一发而动全身,而 Master 就是这样一个单点。当然单个机器的统计平均故障率并不高,但是一旦故障,那么整个集群都将不可用。但同时,有一个 Leader 节点会大大简化分布式系统的的设计;因此采用单点 Master 的系统反而是主流,那势必需要开发一些其他手段来强化 master 的容错能力,比如说记 log + snapshot、比如说主从备份、比如说每次从 worker 心跳进行状态重建、比如说用其他实现了分布式一致性协议的系统来保存元信息等等。
容错
集群中有 Master 和 Worker 两种机器角色。
Worker 由于数量大,有机器故障概率较大。在分布式系统中,Master 获取 Workers 的信息,最常见便是心跳,既可以是 master ping worker,也可以反过来,也可以兼而有之。master 通过心跳发现某些 worker 不可到达后(可能是 worker 死掉了,也可能是网络出问题了等),就会将该 Worker 打个故障(failed)的标记。
之前已经调度到该故障 Worker 上的任务(Task) 很显然有两种类型: Map Task 和 Reduce Task。对于 Map Task(以下所提的 Task,肯定是从属于未结束的 Job) ,不管成功与否,我们都要进行重试,因为一旦该 Worker 变为不可达,存于其上的中间结果也随之无法被 Reduce Task 获取。当然,我们可以在 Master 中多记点状态来减少对已完成的 Map Task 进行重试的概率。比如记下某个 Map Task 的输出是否已经都被 Reduce Task 拉取,以决定要不要对正常完成的 Map Task 进行重试,但无疑会提高系统复杂度。工程往往会对环境做某些假设, 以简化某些实现;我们假设 worker 失败率不是那么高,或者重试所有 Map Task 的代价可以忍,那么就可以简化一点实现,以保持系统的简约,减少可能的 bug。对于 Reduce Task,未完成的无疑要进行重试,已经完成的,由于其输出结果我们假设会写到全局分布式系统文件中(即某些机器挂了也不影响),就不会重试。
具体重试的方法,可以标记需要重试的 Task 的状态为 idle,以告诉调度器,该 Task 可以重新被调度。当然,也可以实现为从一个(工作/完成)队列倒腾到另一个(就绪)队列,本质上是一样的,都是合理实现一个 Task 的状态机。
至于 master 的故障恢复,上一节稍有提到。如果在实践中 Master 确实很少死掉,并且偶尔死掉造成所有正在运行的任务失败的后果也可以接受,那么就可以粗暴的实现为如果 Master 死掉,就简单通知所有正在运行的任务的用户代码任务失败了(比如返回非 0 值),然后有用户代码自行决定丢弃任务还是待集群重启后进行重试:
MapReduceResult result; if (!MapReduce(spec, &result)) abort();
如果业务对于宕机时间有要求,或者大面积任务失败不可以忍受,那么就需要增强 Master 的容错性。常用的方法上节有提到,这里展开一下:
- snapshot + log:将 Master 的内存数据结构定期做快照(snapshot)同步到一个持久存储,可以写到外部存储,可以同步到其他节点,可以一份也可以多份。但是只做快照的话,时间间隔不好选择:间隔太长容易造成恢复时状态丢失过多;间隔过短会加重系统负载。因此常用的辅助手段是记 log,即对每个会改变系统的状态的操作进行记录。这样就可以选择长一点的快照间隔,恢复时,先加载快照,再叠加上快照点之后的日志。
- 主从备份。比如 Hadoop 原先的 secondary namenode,用属于不同容错阈两台机器都作为 Master,只不过一个用来响应请求,另一个用来实时同步状态。等某台机器故障发生时,立即将流量切换到另一个机器上。至于其同步机制,则是另一个可以考量的点。
- 状态外存。如果元数据量不大,可以用 Zookeeper 或者 etcd 这种实现了分布式一致性协议的集群来保存。由于这些集群本身具有容错能力,因此可以认为避免了单点故障。
- 心跳恢复:重新启动一个 Master 后,利用所有 Worker 报上来的信息进行 Master 数据结构的重建。
还值得一提的是,容错也需要用户侧代码做配合。因为框架会对不成功的 map/reduce 用户代码进行重试。这就要求,用户提供的 map/reduce 逻辑符合确定性(Deterministic):即函数的输出依赖且仅依赖于输入,而不取决任何其他隐形的输入或者状态。当然,这个蕴含了幂等性(Idempotency):多次执行和一次执行效果一样;但是幂等性并不能推出确定性;假设有这么一个函数,它第一次执行造成了一些状态改变(比如某些释放资源的 dispose 函数),而后续发现状态已经改变过了就不再改变该状态,那么它符合幂等性;但是由于其含有隐式状态输入,不是确定性的。
如果 map/reudce 函数是确定性的,那么框架就可以放心大胆重试了。某些条件下,幂等性也可以接受,比如保存隐式状态的地方很牢靠。举个栗子,我们依赖于一个文件锁做判断某个函数执行了一次或多次,如果该文件锁所依赖的文件系统很稳定,并且提供分布式一致性,那么就完全可以。如果是用 nfs 的一个文件做锁,来实现的所谓幂等性就值得商榷了。
如果 map/reduce 函数是确定性的,框架会通过其输出提交的原子性来进行幂等性保证。即,即使重试了多次,也和只执行了一次一样。具体来说,对于 Map Task,会产生 R 个临时文件,并在结束时将其位置发送给 Master;Master 在收到多次同一分片(split) 的位置信息时,如果该分片前面某次结果来源仍可用或者已经被消费,那么就忽略掉该请求后面的所有请求。对于 Reduce Task,其生成的结果也会先写成临时文件,然后依赖于底层文件系统的原子性的改名操作(原子性改名也是一个多进程竞争的经典的操作,因为生成文件过程比较长,不容易做成原子的,但是判断具有某名字的文件是否存在并改名却很容易做成原子的),在处理完成时改变为目的文件名。如果发现已经有一个具有该目的文件名的文件了,就放弃改名操作,从而保证了该 Reduce Task只有一个成功输出的最终文件。
任务粒度
一个 MapReduce Job 中会产生 M+R 个 Task,具体 M 和 R 的值在运行之前可以由人进行配置。不同的系统实现可能会有发挥出最佳系统性能的不同配比;但是同时要兼顾业务需求,比如输入大小,输出文件个数等等。
备份任务
在实际业务中,由于某些主机原因常会出现长尾效应,即少数几个 Map/Reduce Task 总是会巨慢的拖到最后,甚至拖得时间甚至是其他任务的几倍。造成这些主机拖后腿的原因可以举出很多,如:某个机器硬盘老化,读写速度几十倍的变慢;又比如调度器调度的有问题,导致某些机器负载太高,具有大量的 CPU、内存、硬盘和网络带宽的争抢;又比如软件 bug 导致某些主机变慢等等。
只要确定这些问题只发生在少数某些主机上,那么解决方法也很简单。在任务接近尾声的时候(比如统计剩余task的占比小于一个阈值时),对于每个仍然在跑的任务,分别额外调度一份到其他主机上,那么大概率会让这些任务提前完成,同一任务跑多次的处理逻辑,和容错重试造成跑多次是一致的,可以复用。
此外,我们可以通过实际业务检验来微调该阈值(包括怎么判定任务结尾,启用几个备份任务),在耗费额外资源代价和减少任务总用时之前取得一个平衡。