1、MapReduce理论
1.1、MapReduce是什么?
MapReduce用于处理海量数据的分布式计算框架,是Hadoop生态中的核心之一(MapReduce用于计算海量数据,HDFS用于存储海量数据);MapReduce是谷歌公司在研究如何处理海量数据所提出的一种面向大规模数据处理的并行计算模型和方法。
1.2、MapReduce概述
MapReduce是一个计算框架,用于对大数据进行处理,它的主要思想就是“分而治之”;整个MapReduce计算过程可以分为Map(映射)阶段和Reduce(缩减阶段);一个Map/Reduce 作业(job) 通常会把输入的数据集切分为若干独立的数据块,由 map任务(task)以完全并行的方式处理它们。框架会对Map的输出先进行排序, 然后把结果输入给Reduce任务。通常作业的输入和输出都会被存储在文件系统(HDFS)中。 整个框架负责任务的调度和监控,以及重新执行已经失败的任务。
Map/Reduce框架由一个单独的master JobTracker 和每个集群节点一个slave TaskTracker共同组成。master负责调度构成一个作业的所有任务,这些任务分布在不同的slave上,master监控它们的执行,重新执行已经失败的任务。而slave仅负责执行由master指派的任务。
应用程序至少应该指明输入/输出的位置(路径),并通过实现合适的接口或抽象类提供map和reduce函数。再加上其他作业的参数,就构成了作业配置(job configuration)。然后,Hadoop的 job client提交作业(jar包/可执行程序等)和配置信息给JobTracker,后者负责分发这些软件和配置信息给slave、调度任务并监控它们的执行,同时提供状态和诊断信息给job-client。
一个Map/Reduce 作业的输入和输出类型如下所示:
(input) <k1, v1> -> map -> <k2, v2> -> combine -> <k2, v2> -> reduce -> <k3, v3> (output)
虽然Hadoop框架是用JavaTM实现的,但Map/Reduce应用程序则不一定要用 Java来写 。
- Hadoop Streaming是一种运行作业的实用工具,它允许用户创建和运行任何可执行程序 (例如:Shell工具)来做为mapper和reducer。
- Hadoop Pipes是一个与SWIG兼容的C++ API (没有基于JNITM技术),它也可用于实现Map/Reduce应用程序。
1.3、MapReduce核心功能描述
1.3.1、MapReduce执行流程
Step 1:从本地HDFS读取文件输入的内容,每个输入文件被切分成一定大小的数据块Block(1.0版本默认64M);
Step 2:每个被切分的数据块Block会产生对应的Map任务,用户可以定义map函数,对被切分的数据使用map函数解析成一个key/value格式的数据;
Step 3:对每个map任务得到的key/value格式的数据按照不同的分区,通过网络传输到不同的Reduce节点;
Step 4:对多个map任务输出的结果进行排序、合并,然后通过reduce函数进程处理得到最新的key/value结果;
- Step 5:将Reduce输出的结果保存到文件系统中。
1.3.2、Map和Reduce的任务数量
在整个hadoop的MapReduce计算过程中,需要多少个map和reduce呢?
- map的任务数量
Map的任务数量通常是由输入数据的大小决定的,一般就是所有输入文件的总块(block)数。
Map正常的并行规模大致是每个节点(node)大约10到100个map,对于CPU 消耗较小的map任务可以设到300个左右。由于每个任务初始化需要一定的时间,因此,比较合理的情况是map执行的时间至少超过1分钟;
例如:输入10TB的数据,每个块(block)的大小是128MB,你将需要大约82,000个map来完成任务,通过 setNumMapTasks(int)可以修改map数值。
- reduce的任务数量
Reduce的任务数量建议是0.95或1.75乘以 (<no. of nodes> * mapred.tasktracker.reduce.tasks.maximum)。
用0.95,所有reduce可以在maps一完成时就立刻启动,开始传输map的输出结果。用1.75,速度快的节点可以在完成第一轮reduce任务后,可以开始第二轮,这样可以得到比较好的负载均衡的效果。
增加reduce的数目会增加整个框架的开销,但可以改善负载均衡,降低由于执行失败带来的负面影响。
2、MapReduce案例
2.1、使用shell命令演示“mapper”和“reducer”
对一个英文文本的单词进行统计,shell命令行处理思路如下:
- 打开文件:
cat file
- 将文本的空格替换成换行符:
tr ' ' '\n'
- 将替换空格文本的单词进行排序:
sort -k 1
- 将排序后的文本显示每行出现单词的数字:
uniq -c
- 将文件中的数字(value)和单词(key)互换位置:
awk '{print $2"\t"$1}'
: - 将替换键值的文本相同的词进行汇总并按大小进行排序:
sort -k2 -nr
- 将结果保存为新的文件:
newfile
以上步骤中,第2-5步相当于mapper,第6步为reducer。
对文本The_Man_of_Property的单词进行统计;
$ cat The_Man_of_Property |tr ' ' '\n'|sort -k 1|uniq -c|awk '{print $2 "\t"$1}'|sort -k2 -nr|head -n 20
the 5144
of 3407
to 2782
and 2573
a 2543
he 2139
his 1912
was 1702
in 1694
had 1526
that 1273
with 1029
her 1020
— 931
at 815
for 765
not 723
she 711
He 695
it 689
以上显示了前20行的结果,需要将计算的结果保存下来,追加到文件即可
$ cat The_Man_of_Property |tr ' ' '\n'|sort -k 1|uniq -c|awk '{print $2 "\t"$1}'|sort -k2 -nr > wordcountretult.txt
2.2、使用Streaming运行MapReducer程序
使用python分别编写mapper和reducer,通过Hadoop Streaming运行程序;
2.2.1 Hadoop Streaming
Hadoop streaming是Hadoop的一个工具, 它帮助用户创建和运行一类特殊的map/reduce作业, 这些特殊的map/reduce作业是由一些可执行文件或脚本文件充当mapper或者reducer。例如:
$HADOOP_HOME/bin/hadoop jar $HADOOP_HOME/hadoop-streaming.jar \
-input myInputDirs \
-output myOutputDir \
-mapper /bin/cat \
-reducer /bin/wc
-file /bin/cat
-file /bin/cat
- -input:输入文件的路径,从HDFS上读取文件
- -output:计算结果的输出路径,保存到HDFS上
- -mapper:可执行的程序或脚本
- -reducer:可执行的程序或脚本
2.2.2 wordcount by Python
使用Python编写“mapper.py”和“reducer.py”:
- mapper.py从标准输入读取文件;
- mapper.py将文本的单词处理成key/value的形式;
- reducer.py的标准输入就是mapper.py的输出;
- reducer.py将读入的单词进行归并整理,最终梳理为所有单词的汇总输出到文件;
注:以下的程序运行在hadoop 1.2.1上;
2.2.2.1、mapper by python
vim mapper.py
#!/usr/bin/env python
import sys
for line in sys.stdin:
line = line.strip()
words = line.split()
for word in words:
print "%s\t%s" %(word, 1)
从标准输入读入文件,遍历每一行字符;并对每一行字符去除特殊字符,然后以空格作为分隔符进行遍历输出word:1
的key/value的格式;
文本The_Man_of_Property执行maper.py程序,显示前10行:
$ cat The_Man_of_Property |python mapper.py |head
Preface 1
“The 1
Forsyte 1
Saga” 1
was 1
the 1
title 1
originally 1
destined 1
for 1
2.2.2.2、reducer by python
mapper输出作为reducer的输入,因此,reducer需要对word:1
的格式进行处理,排序、相同单词归并,最终输出每个单词的汇总数据;
vim reducer.py
#!/usr/bin/env python
import sys
current_word = None
word_sum = 0
for line in sys.stdin:
word_list = line.strip().split('\t')
if len(word_list) < 2:
continue
word = word_list[0].strip()
word_value = word_list[1].strip()
if current_word == None:
current_word = word
if current_word != word:
print "%s\t%s" %(current_word, str(word_sum))
current_word = word
word_sum = 0
word_sum += int(word_value)
print "%s\t%s" %(current_word, str(word_sum))
word_sum = 0
reducer从mapper标准输出读入数据,对相同单词进行汇总;
2.2.2.3、本地测试mapper和reducer
在本地上先测试的流程方法:cat inputfile|mapper.py|sort|reducer.py
cat The_Man_of_Property |python mapper.py |sort -k1|python reducer.py
2.2.2.4、hadoop(hadoop-1.2.1)上测试
- 文本The_Man_of_Property上传到HDFS上
- mapper.py和reducer.py增加执行权限
hadoop fs -mkdir /user/input
hadoop fs -mkdir /user/output
hadoop fs -copyFromLocal The_Man_of_Property /user/input/
chmod +x mapper.py reducer.py
- 编写运行streaming的shell脚本:
vim run.sh
#!/bin/bash
HADOOP_CMD="/home/hadoop/app/hadoop/hadoop-1.2.1/bin/hadoop"
STREAM_JAR_PATH="/home/hadoop/app/hadoop/hadoop-1.2.1/contrib/streaming/hadoop-streaming-1.2.1.jar"
INPUT_FILE_PATH="/user/input/The_Man_of_Property"
OUTPUT_FILE_PATH="/user/output/output00"
#
$HADOOP_CMD jar $STREAM_JAR_PATH \
-input $INPUT_FILE_PATH \
-output $OUTPUT_FILE_PATH \
-mapper "python mapper.py" \
-reducer "python reducer.py" \
-file ./mapper.py \
-file ./reducer.py
- 给
run.sh
增加执行权限,运行程序:
chmod +x run.sh
./run.sh
packageJobJar: [./mapper.py, ./reducer.py, /home/hadoop/app/hadoop/hadoop-1.2.1/tmp/hadoop-unjar8638446041851116131/] [] /tmp/streamjob3031434578661633957.jar tmpDir=null
18/01/16 13:49:17 INFO util.NativeCodeLoader: Loaded the native-hadoop library
18/01/16 13:49:17 WARN snappy.LoadSnappy: Snappy native library not loaded
18/01/16 13:49:17 INFO mapred.FileInputFormat: Total input paths to process : 1
18/01/16 13:49:17 INFO streaming.StreamJob: getLocalDirs(): [/home/hadoop/app/hadoop/hadoop-1.2.1/tmp/mapred/local]
18/01/16 13:49:17 INFO streaming.StreamJob: Running job: job_201801150959_0008
18/01/16 13:49:17 INFO streaming.StreamJob: To kill this job, run:
18/01/16 13:49:17 INFO streaming.StreamJob: /home/hadoop/app/hadoop/hadoop-1.2.1/libexec/../bin/hadoop job -Dmapred.job.tracker=http://192.168.30.50:9001 -kill job_201801150959_0008
18/01/16 13:49:17 INFO streaming.StreamJob: Tracking URL: http://hadoop-master:50030/jobdetails.jsp?jobid=job_201801150959_0008
18/01/16 13:49:18 INFO streaming.StreamJob: map 0% reduce 0%
18/01/16 13:49:25 INFO streaming.StreamJob: map 100% reduce 0%
18/01/16 13:49:33 INFO streaming.StreamJob: map 100% reduce 33%
18/01/16 13:49:36 INFO streaming.StreamJob: map 100% reduce 100%
18/01/16 13:49:38 INFO streaming.StreamJob: Job complete: job_201801150959_0008
18/01/16 13:49:38 INFO streaming.StreamJob: Output: /user/output/output00
- 查看输出结果
hadoop fs -ls /user/output/output00 Found 3 items -rw-r--r-- 2 hadoop supergroup 0 2018-01-16 13:49 /user/output/output00/_SUCCESS drwxr-xr-x - hadoop supergroup 0 2018-01-16 13:49 /user/output/output00/_logs -rw-r--r-- 2 hadoop supergroup 181530 2018-01-16 13:49 /user/output/output00/part-00000
计算的结果输出到文件:/user/output/output00/part-00000,可以将文件下载到本地,和使用shell命令处理的结果是相同的。
$ hadoop fs -get /user/output/output00/part-00000 ./ $ cat part-00000 |sort -k2 -nr|head the 5144 of 3407 to 2782 and 2573 a 2543 he 2139 his 1912 was 1702 in 1694 had 1526
$ head wordcountretult.txt
the 5144
of 3407
to 2782
and 2573
a 2543
he 2139
his 1912
was 1702
in 1694
had 1526
3、遇到的问题
使用streaming运行python编写的mapper和reducer的是出现了如下错误:
ERROR streaming.StreamJob: Job not successful. Error: # of failed Map Tasks exceeded allowed limit. FailedCount: 1. LastFailedTask
因为本地验证正常,可以确定的是mapper.py和reducer.py程序应该没有问题;使用 -file 指定mapper和reducer的路径(相对路径或者绝对路径都可以)再一次运行,没有报错了。
本文转自 巴利奇 51CTO博客,原文链接:http://blog.51cto.com/balich/2061545