Mapreduce概念及流程介绍

简介: Mapreduce概念及流程介绍

Mapreduce框架:


867e1ff7c0d4a4300cea7044ce517ed8.png

b4e2239478a15dbe0ab0b6bbb5926749.png



MapReduce过程


cd6feedf1d20aff062067c279a737220.png

75cdb71122000f274627ddbe30549d6c.png



MapReduce是采用一种分而治之的思想设计出来的分布式计算框架,它由两个阶段组成:map阶段和reduce阶段。


在map阶段中:


首先读取HDFS中的文件,每个文件都以一个个block形式存在,block中的数据会被解析成多个kv对,然后调用map task的map方法;

map方法对接收到的kv对进行分片处理,转换输出成新的kv对;

然后对kv对进行分区和排序压缩,中间会涉及到map的shuffle过程,最后存入本地磁盘,供后续reduce task作为输入参数使用。


在reduce阶段中:


reduce方法将多个map task的输出,按照不同的分区传输到不同的reduce 上进行合并排序,这中间也涉及到reduce的shuffle过程(shuffle就像洗扑克牌一样)

然后reduce方法对输入的键值对汇总计算,输出计算结果

最后把reduce的输出保存在HDFS文件中。


Mapreduce概念


一、MapReduce编程模型

思想:分成两个阶段,map阶段和reduce阶段;


map阶段将大的任务,切分成小任务,分散到不同的服务器中,并行的执行(减少每个服务器的计算负载,同时减少整体的计算时间); reduce阶段,将map阶段的结果做汇总


map阶段的输入、输出都是以kv对的形式;输出结果写入map任务所在服务器的本地磁盘 reduce阶段的输入来自map阶段的输出;reduce的输出结果也是kv对,默认存储到HDFS中


二、MapReduce编程示例

1、写三个主要的代码 自定义的map类 在map方法中,编写业务代码 自定义的reduce类 在reduce方法中,编写业务代码 程序入口main 编写程序入口


2、可以本地运行


3、集群运行


三、combiner combiner的原理,map端本地聚合 其本质是一个reducer类


四、shuffle

shuffle主要指的map端的输出,然后如何传输到reduce task的过程


shuflle中的执行顺序是先分区,然后在溢写之前进行排序,最后溢出的多个磁盘文件会进行合并成一个大文件。


1.map端: 输出是kv对,先写入环形缓冲区(100M),当达到80%后,缓冲区中的数据开始溢出写磁盘文件; 溢出过程中,有以下几个操作: 1、根据reduce个数对kv对做分区;


2、每个分区中,根据kv对的key做排序;


3、若有map端本地聚合combine,则对每个分区中,同组数据(默认指key相同的kv对)的values做聚合 在溢出写的过程中,可以继续向环形缓冲区写入数据;但是若写入速度大于溢出写的速度,当100m占满后,暂停向环形缓冲区中写入的过程;只执行溢出写的过程;直到全部溢出写到磁盘,才恢复向缓冲区写入。


随着不断的溢出写磁盘文件,本地磁盘会生成多个溢出文件,在map task完成之前,溢出文件会被合并成一个已分区、已排序的输出文件


4.reduce端: reduce task会在每个map task运行完成后,获得map task输出中,属于自己的分区数据(许多kv对); 数据先在reduce的jvm内存中,如果数据占空间变大,则合并后溢出写磁盘文件;若指定了combine,在合并时,运行它,减少写入磁盘的数据量。随着溢出文件的增多,会合并文件 所有map task复制完成后,进入“合并阶段”;维持原顺序排序每组合并后的数据,调用一次reduce方法


Linux shell统计文章中出现频次最高的单词:

]# cat 文件名| grep -Eo --color '\w+' | sort | uniq -c | sort -k1 -nr | head -1
解释:-e 使用正则 -o 只打出匹配到的字符 \w+ 匹配字母
uniq 去重 ,uniq -c 打印频次,会显示两列 第一列 频次 ,第二列数据
sort -k1 代表第一列 -n按数字排序 ,默认由小到大 -nr由大到小排序


MR编程


1.wordcount: 针对实际业务场景(一个用户(user_id)的评价量,点击量,收藏量等等)

#!/usr/bin/env python
import sys
for line in sys.stdin:   
    line = line.strip()   
    words = line.split()   
    for word in words:     
        print "%s\t%s" % (word, 1)
#!/usr/bin/env python
from operator import itemgetter
import syscurrent_word = Nonecurrent_count = 0
word = None
for line in sys.stdin:  
    line = line.strip()  
        word, count = line.split('\t', 1) 
        try:    
            count = int(count)  
        except ValueError:    
            continue  
        if current_word == word:   
             current_count += count  
        else:    if current_word:      
            print "%s\t%s" % (current_word, current_count)    
            current_count = count    
            current_word = word 
        if word == current_word:
             print "%s\t%s" % (current_word, current_count)


2.mr实现单词idf统计

map.py
import sys
for line in sys.stdin: 
 ss = line.strip().split('\t')  
if len(ss) != 2:   
continue  
file_name, file_content = ss  
word_list = file_content.strip().split(' ')  
word_set = set(word_list)  
for word in word_set:    
print '\t'.join([word, '1'])
red.py
[root@master tfidf_test]# cat red.py
import sysimport math
current_word = Nonesum = 0
docs_cnt = 508
for line in sys.stdin:  
    ss = line.strip().split('\t')  
    if len(ss) != 2:    
        continue  
        word, val = ss  
    if current_word == None:    
        current_word = word  
    if current_word != word:   
        idf = math.log(float(docs_cnt) / (float(sum) + 1.0))   
        print '\t'.join([current_word, str(idf)])   
        current_word = word    
        sum = 0   
        sum += int(val)
        idf = math.log(float(docs_cnt) / (float(sum) + 1.0))
        print '\t'.join([current_word, str(idf)])输出当前单词+idf分
目录
相关文章
|
7月前
|
机器学习/深度学习 分布式计算 Hadoop
通过比喻理解-MapReduce的数据处理流程
通过比喻理解-MapReduce的数据处理流程
86 0
|
7月前
|
存储 分布式计算 数据处理
请描述一下MapReduce的工作流程。
请描述一下MapReduce的工作流程。
51 0
|
7月前
|
存储 分布式计算 资源调度
干翻Hadoop系列文章【03】:MapReduce概念详解
干翻Hadoop系列文章【03】:MapReduce概念详解
|
缓存 分布式计算 监控
架构师带你细细的捋一遍MapReduce全流程【附调优指南】
架构师带你细细的捋一遍MapReduce全流程【附调优指南】
|
存储 分布式计算 监控
Hadoop, Hadoop涉及到的一些常见概念(分布式与集群、HDFS、MapReduce等),Hadoop怎么用?
Hadoop, Hadoop涉及到的一些常见概念(分布式与集群、HDFS、MapReduce等),Hadoop怎么用?
512 0
|
分布式计算
MapReduce 的原理、流程【重要】
MapReduce 的原理、流程【重要】
192 0
|
分布式计算 资源调度 Hadoop
大数据技术之Hadoop(MapReduce核心思想和工作流程)
大数据技术之Hadoop(MapReduce核心思想和工作流程)
338 0
大数据技术之Hadoop(MapReduce核心思想和工作流程)
|
存储 XML 缓存
Hadoop中的MapReduce框架原理、Job提交流程源码断点在哪断并且介绍相关源码、切片与MapTask并行度决定机制、MapTask并行度决定机制
Hadoop中的MapReduce框架原理、Job提交流程源码断点在哪断并且介绍相关源码、切片与MapTask并行度决定机制、MapTask并行度决定机制
Hadoop中的MapReduce框架原理、Job提交流程源码断点在哪断并且介绍相关源码、切片与MapTask并行度决定机制、MapTask并行度决定机制
|
分布式计算 Hadoop 开发者
MapReduce 工作流程(面试重点)| 学习笔记
快速学习 MapReduce 工作流程(面试重点)
181 0
MapReduce 工作流程(面试重点)| 学习笔记
|
分布式计算 资源调度 Hadoop
Hadoop之MapReduce03【wc案例流程分析】
Hadoop之MapReduce03【wc案例流程分析】
Hadoop之MapReduce03【wc案例流程分析】