MapReduce —— 历久而弥新(2)

简介: MapReduce —— 历久而弥新(2)

其他细节

除了 Mapper 和 Reducer 这两个最基本的源语,该系统还提供了一些其他的后来事实上也成为标配的扩展:Partitioner,Combiner 和 Reader/Writer


作者:木鸟杂记,转载请注明出处:https://www.qtmuniao.com/2019/05/26/gfs/


Partitioner

默认来说,对 Map 输出的中间结果进行划分会使用类似于 hash(key) mod R 这种应用无关的划分算法。但是有时候用户有需求将特定的一些 keys 路由到同一个 Reduce Task,比如说中间结果的 key 是 URL, 用户想按网站 host 进行汇总处理。这时候就需要将系统的这部分路由功能开放给用户,以满足用户的定制需求。

Combiner

如果该 Job 针对所有中间结果的 reduce 的操作满足结合律,那么指定 Combiner 会很能提高效率。拿的 Word Count 来说,数值的加法无疑满足结合律,也就是说,同一个单词的频次,在 Map Task 输出后进行加和(在 Map Work 上),还是在 Reduce Task 中进行加和(在 Reduce Worker上),结果保持一致;而这样一来,由于一些中间结果对进行了 combine,Map Task 到 Reduce Task 间的传输数据量会小很多,从而提高整个 Job 的效率。

也可以看出,combine 函数一般和 reduce 函数是一样的,因为他们本质上是对 value set 执行了同一种操作,只不过执行时,执行的地点不一样,结合的顺序不一样。目的是为了减少中间结果传输量,加速任务执行过程。

Reader/Writer

如果不将定制输入输出的能力开放给用户,那么系统显然只能处理有限几种默认约定的格式。因此,readerwriter接口本质上是系统和现实繁杂的业务之间的适配器(Adaptor)。它们让用户可以自行指定数据的来源和去处按需要理解输入内容自由定制输出格式

有了这两个 Adaptor,系统才能适配更多的业务。一般来说,系统会内置提供一些常见的 Reader 和 Writer 的实现;包括按行读文本文件,读文件中键值,读数据库等等。然后用户可以实现这两个接口,进行更具体的定制。系统常通过类似这种常用脚手架+进一步定制能力来提供API,下面的 Counter 也是如此。

副作用

有些用户实现的 map/reduce 函数会有一些副作用,比如说在执行任务中间输出一些文件、写一些数据库条目等等。一般来说这些副作用的原子性和幂等性需要用户自己来处理。因为如果输出介质不纳入 MapReduce 系统,系统是没有办法保证这些输出的幂等性和原子性的。不过有的系统就这么干的,提供一些某种类型/介质的状态或者数据存储,纳入系统中,并且提供一些容错和幂等的性质。好像 MillWheel 有类似的做法。但这样会大大加重系统的复杂性。

跳过坏记录

如果用户代码有 bug 或者某些输入有问题,会导致 Map 或者 Reduce 任务在运行时崩溃。当然这些 bug 或者输入能修则修,但是有些情况由于第三方库或者输入的原因,不能够进行修复。而在某些类型的任务,比如说训练数据集清洗、大型统计任务,丢几个是可以容忍的。针对这种情况,系统会提供一种模式,在这种模式中会跳过这些 Record 记录的执行。

具体实现上来说,也比较简单。可以给每个输入 Record 给个唯一编号(单次任务内唯一就行);如果某个 Record 处理时挂掉了,就将其编号汇报给 Master。如果 Master 收到了某个 Record 超过一次的处理失败信息,就将其跳过。做的再细一点,还可以记下错误类型和信息进行比对,来确定这是否是一个确定性(deterministic)的错误,进而决定是否将其跳过。

单机执行

众所周知,分布式系统很难跟踪、调试;因为一个 Job 可能同时分散在数千台机器上进行执行。因此系统提供了本地运行 Job 的能力。可以针对小数据集输入对代码的正确性进行测试。由于在单机运行,就可以比较方便通过调试工具进行断点追踪

实现一个本地 mock 系统,一般来说比较简单。因为不需要考虑网络间状态通信,代码多节点分发,多机调度等一系列分布式系统的问题。但却能极大方便代码调试,性能测试和小数据集测试。

状态信息

对于分布式执行的 Job,一个任务进度等信息可视化界面(给系统集成一个 HTTP 服务,实时拉取系统信息进行展示)有时候是至关重要的,它是系统易用性的关键。如果系统用户不能够很方便的实时监控任务的运行进度、执行速度、资源用量、输出位置,出错信息以及其他一些任务的元信息,就不能对任务的执行状况有个感性的把握。尤其是如果写 MapReduce 程序的人和跑这些程序的不是一个人时,会更为依赖这些状态的实时呈现。

因此,对于分布式系统来说,其易用性有一大半落在一个良好的系统信息呈现上。使用者需要据此来预测任务的完成时间、资源的吃紧程度等等,从而做出相应决策。

此外,对与集群机器状态信息,也需要进行跟踪,因为机器的负载信息、故障信息、网络状况等等对用户任务的执行也有不同程度的影响。给出这些机器状态信息,有助于对用户代码甚至系统代码进行出错诊断。

全局计数器

系统提供了一种计数服务,以统计某种事件的发生频次。比如用户想统计 Word Count 示例中所处理的全大写单词的总数:

Counter* uppercase;
uppercase = GetCounter("uppercase");
map(String name, String contents):
  for each word w in contents:
    if (IsCapitalized(w)):
      uppercase->Increment();
    EmitIntermediate(w, "1");

从代码可以大致猜测其实现:定义的时候,需要给 Counter 指定一个 Id。然后在 Map/Reudce 代码中可以通过该 Id 获取该 Counter 然后进行计数。每个 worker 机器上的计数信息会汇总到 Master 上,然后按 Counter 的 ID 进行加和,并且最终返回给用户。同时,前述展示状态信息页面也会将这些计数器进行可视化(比如说打折线图)。其中有个点需要注意,就是多次对重试的任务(由于机器死掉或者避免长尾进行的重试)的计次进行去重;可以按照 Map/Reduce ID 来进行去重,即我们假定同一输入的重试任务共享一个 Task ID(事实上为了满足重试需求和任务管理需求,分布式系统肯定会对所有任务进行唯一编号的),针对具有相同 Task ID 内部的 Counter 的计次,Master 只保留第一次成功的那一份;但是如果计数需要在页面上实时显示,可能就需要做适当信息保留,并且在该 Task 重试时进行计数回退之类的操作。

系统会自动维持一些计数器,比如说所有已经处理的键值对的数量和所有已经产生的键值对数量。全局计数操作对于某些应用用处很大,比如说有的应用要求所有输入键值对和输出键值对的数量一样,如果没有全局计数,就无从验证;或者统计一些数据的全局比例等等。

重排(shuffle) 操作

自 Spark 成名之后,shuffle 这个 MapReduce 中的语义得到了很多研究和实践。这是一个多机传输的耗时操作,其实现的高效性对系统的性能有着至关重要的作用,因此单独拿出一节来聊聊。

在 MapReduce 中就是指 Map Task 分片输出到 Reduce Task 按需拉取的这么一个过程。还拿 Word Count 为例,你想统计某个单词在所有文档中的总频次,但是这些单词分布在不同机器上的不同的 Map Task 输出里;而只有将所有同样单词的频次对聚集到同一台机器上,才能对其加和。这种将机器和子数据集对应关系按key打乱重组的操作,我们姑且称之为 shuffle。

在 Spark 中,基本上继承了该语义,并且更通用化了。一个常见的例子是 join,即将两个 Table 间具有相同 key 的记录路由到同一台机器上,从而在所有机器上按 key 分片进行并行 join,从而大幅提高效率。类似于 join 这样的高阶操作,会使得底层的 Partition 不能继续在本机运行而不与其他 Partition 发生联系,因此 shuffle 也是 Spark 中划分 Stage 的一个分水岭。

对于 MapReduce 系统来说,使用的 shuffle 策略类似于 Spark 中基于排序的 shuffle。Map 首先将中间结果写到内存中,然后定期刷盘,刷盘时进行归并排序。然后 Reducer 端按需拉取,从多个 Mapper 端拉取数据后,再次进行归并排序,然后传给 Reduce 函数。这样的好处在于可以进行大规模数据处理,因为可以做外部排序,也可以做迭代惰性加载。对于 Hadoop 的实现来说,将包含 shuffle 的整个流程分为了明显的几个阶段:map(), spill, merge, shuffle, sort, reduce() 等。

其他的一些点

一些缺点:通过 MapReduce 的系统设计可以看出,它是一个高吞吐,但是也高延迟的批量处理系统。并且不支持迭代。这也是后续 Spark,Flink 这样系统火热的动机。

文件系统: MapReduce 只有和 GFS 这样支持分块、多进程并发写的大文件系统配合才能发挥出更大的优势,优化输入和输出的性能。此外,这种分布式文件系统还会屏蔽底层节点故障。

组织形式: MapReduce 是一个系统,需要部署到集群上,但它同时又是一个库,让用户代码和分布式集群进行交互而不太用关心分布式环境中的问题的一个库。每个任务需要写任务描述(MapReduceSpecification),然后提交给系统——这是库常用的一种提交任务的手段。

代码分发:谷歌的 MapReduce 具体实现不得而知。猜测可以有两种方式:一是将 MapReduce 库代码 + 用户代码整体在不同机器上 fork ,然后根据角色不同来执行不同分支。二是将各个机器上的服务先启动起来,然后执行任务时只会将用户自定义函数序列化后传输到不同机器。

名词释义

Intermediate result:map 函数产生的中间结果,以键值对形式组织。

Map Task :这个应该都是指的 Worker 机器上,执行 map 函数的工作进程。

Map Worker:Map Task 所运行的 Worker 机器。所有 Worker 应该是没有角色标记的,既可以执行 Map Task,也可以执行 Reduce Task,以充分的利用机器性能。

参考文献

[1] Jeffrey Dean and Sanjay Ghemawat, MapReduce: Simplified Data Processing on Large Clusters

[2] Alexey Grishchenko, Spark Architecture: Shuffle

[3] JerryLead, Spark internals

相关文章
|
分布式计算 数据处理
38 MAPREDUCE中的其他应用
38 MAPREDUCE中的其他应用
62 0
|
分布式计算
37 MAPREDUCE中的DistributedCache应用
37 MAPREDUCE中的DistributedCache应用
44 0
|
存储 分布式计算 监控
19 为什么要MAPREDUCE?
19 为什么要MAPREDUCE?
72 0
|
数据采集 机器学习/深度学习 存储
E-MapReduce
E-MapReduce(简称EMR)是阿里云提供的一项大数据处理服务,它基于开源的 Apache Hadoop 和 Apache Spark 构建,并提供了易于使用的 Web 界面和 API 接口,方便用户快速创建、调度和管理大数据处理作业。
266 2
|
分布式计算 并行计算 大数据
初识MapReduce
初识MapReduce
95 0
|
缓存 分布式计算 NoSQL
MapReduce(二)
MapReduce(二)
110 0
MapReduce(二)
|
存储 分布式计算 资源调度
|
存储 分布式计算 资源调度
|
存储 缓存 分布式计算
MapReduce —— 历久而弥新(1)
MapReduce —— 历久而弥新(1)
184 0
MapReduce —— 历久而弥新(1)
|
分布式计算 Hadoop Java
MapReduce使用
MapReduce使用
115 0
MapReduce使用