前情回顾
MapReduce
1.MapReduce 编程模型
1.1 基础
1.1.1 是什么?
是谷歌开源的一种大数据并行计算编程模型,它降低了并行计算应用开发的门槛。
1.1.2 设计目标
1) 分而治之。采用分布式并行计算,将计算任务进行拆分,由主节点下的各个子节点共同完成,最后汇总各子节点的计算结果,得出最终计算结果。
2) 降低分布式并行编程的使用门槛。
1.1.3 特点
1) 简单,容易使用。
只需要简单的实现一些接口,就可以完成一个分布式程序。
2) 扩展性强。
算力不足,加机器,就是这么简单粗暴。
3) 高容错性。
某个节点挂掉,
Hadoop
可以自动将计算任务转移到另外一个节点上进行,不会使作业因为某个节点挂掉而整体失败。更多精彩文章请关注公众号『Pythonnote』或者『全栈技术精选』
4) 可离线计算 PB
量级的数据。
不适合实时处理,延迟较高。
1.1.4 避免以下场景使用
1) 实时计算。
因为延迟高,不适用实时计算。
2) 流式计算。
MapReduce
输入数据集要求是静态的,而流式计算则要求动态数据。
3) 有向图计算。
有向图计算就是多个应用程序存在依赖关系,后一个应用程序的输入为前一个应用程序的输出。如果这种情况下使用
MapReduce
,会造成大量中间结果的磁盘IO
,影响性能。
1.2 编程模型
1.2.1 概述
MapReduce
可分为 Map
和 Reduce
两阶段,我们需要实现这两个函数来完成分布式程序设计。
1) map()
函数输入值为键值对,输出值为新的一组键值对。而 MapReduce
框架会自动将这些输出结果按照键 key
进行聚集(默认策略,也可以自定义聚集策略),键相同的数据被统一交给 reduce
函数。
2) reduce()
函数输入值为聚集后的键值对(键值对类似于 key: [value1, value2, value3 ...]
),输出值为一组新的键值对。最后将最终结果写入 HDFS
。
1.2.2 示例 - 统计词频
需求:统计文件中每个单词出现的次数。
map()
函数接收键值对(文件名: 文本内容),然后将文本内容中的单词都以新键值对输出(类似于 hadoop: 1
这种形式,遇到一个单词就将其转换成这样的键值对)。最终结果可能是这样的:
<hadoop: 1><mapreduce: 1><hdfs: 1>...
然后 MapReduce
框架合并相同键的键值对,就变成了如下的样子:
<hadoop: [1, 1, 1, 1]><mapreduce: [1, 1]>...
reduce()
函数接收一个键值对(<hadoop: [1, 1, 1, 1]>
),并将其值(数组)进行累加,然后将结果 <hadoop: 4>
新的键值对输出,从而得出词频。更多精彩文章请关注公众号『Pythonnote』或者『全栈技术精选』
部署步骤:
1) 上传程序与测试文件数据
2) 提交 MapReduce
作业到集群中运行
3) 查看作业输出结果
2.MapReduce 组件
2.1 Combiner
Hadoop
框架一般使用 Mapper
将数据处理成键值对,然后在网络节点间对其进行整理,最后使用 Reducer
处理数据并进行最终输出。Combiner
可以有效的减少 maptask
和 reducetask
之间传输的数据量(减少网络带宽),减轻 Reducer
上的计算负载。
简单的来说,就是在 Reducer
之前进行预处理,减轻它的压力。
注意:Combiner
是在每个 maptask
所在的节点运行,而 Reducer
是接收全部的 Mapper
输出结果。Combiner
的输出结果需要与 Reducer
的输入结果类型等对应。Combiner
的使用原则是有或者没有都不影响业务逻辑。更多精彩文章请关注公众号『Pythonnote』或者『全栈技术精选』
2.2 Partitioner
它是负责划分数据的类,可以将不同的数据分配给不同 reducetask
执行。MapReduce
默认的 Partitioner
是 Hash Partitioner
,也就是先计算 key
的散列值(一般为 md5
值),然后通过 Reducer
个数进行取模运算。这样既可以随机的将整个 key
空间平均分配给每个 Reducer
,还可以确保不同的 Mapper
产生的相同的 key
能被分配到同一个 Reducer
。公式为:
hashcode%reducetask # hashcode 是散列值 # reducetask 是 reducer 个数
2.3 自定义 Record Reader
Record Reader
表示从分片中读取一条记录的方式。每读取一条记录都会调用一次 Record Reader
类。系统默认的类是 Line Record Reader
,它以每行的偏移量作为 map
输入的键,以每行的内容作为 map
输入的值。这种情况就满足不了大多数情况,比如我们前面统计词频的例子需要以文件名为键,这时就需要自定义类。
3.MapReduce 高级应用
3.1 join
我们可以借助 Hive
、Spark SQL
等框架来实现 join
操作。(不用怀疑,它就是 SQL
里面实现的那个 join
)那么如何自己实现呢?
MapReduce Map
端 join
实现原理:
1) Map
端读取所有的文件,并在输出的内容里加上标识(代表数据是从哪个文件里来的)。
2) 在 reduce
处理函数中,按照标识对数据进行处理。
3) 根据 key
用 join
来求出结果直接输出。
3.2 排序
在 MapReduce
中默认可以进行排序。
原理:key
为数字类型时,按照数字大小对 key
进行排序;key
为字符串类型时,按照字典顺序对字符串排序。
3.3 二次排序
默认情况是对 key
进行排序,但有时还需要对 value
进行排序,这就是二次排序。比如在键相同的情况下,按值进行排序。更多精彩文章请关注公众号『Pythonnote』或者『全栈技术精选』
3.4 小文件合并
前面我们说过小文件十分占用 HDFS
的存储空间,所以我们需要将小文件进行合并,输出为 Sequence File
。