Hadoop MapReduce计算框架

简介:

1、MapReduce理论

1.1、MapReduce是什么?

MapReduce用于处理海量数据的分布式计算框架,是Hadoop生态中的核心之一(MapReduce用于计算海量数据,HDFS用于存储海量数据);MapReduce是谷歌公司在研究如何处理海量数据所提出的一种面向大规模数据处理的并行计算模型和方法。

1.2、MapReduce概述

MapReduce是一个计算框架,用于对大数据进行处理,它的主要思想就是“分而治之”;整个MapReduce计算过程可以分为Map(映射)阶段和Reduce(缩减阶段);一个Map/Reduce 作业(job) 通常会把输入的数据集切分为若干独立的数据块,由 map任务(task)以完全并行的方式处理它们。框架会对Map的输出先进行排序, 然后把结果输入给Reduce任务。通常作业的输入和输出都会被存储在文件系统(HDFS)中。 整个框架负责任务的调度和监控,以及重新执行已经失败的任务。

Map/Reduce框架由一个单独的master JobTracker 和每个集群节点一个slave TaskTracker共同组成。master负责调度构成一个作业的所有任务,这些任务分布在不同的slave上,master监控它们的执行,重新执行已经失败的任务。而slave仅负责执行由master指派的任务。

应用程序至少应该指明输入/输出的位置(路径),并通过实现合适的接口或抽象类提供map和reduce函数。再加上其他作业的参数,就构成了作业配置(job configuration)。然后,Hadoop的 job client提交作业(jar包/可执行程序等)和配置信息给JobTracker,后者负责分发这些软件和配置信息给slave、调度任务并监控它们的执行,同时提供状态和诊断信息给job-client。

一个Map/Reduce 作业的输入和输出类型如下所示:

(input) <k1, v1> -> map -> <k2, v2> -> combine -> <k2, v2> -> reduce -> <k3, v3> (output)

虽然Hadoop框架是用JavaTM实现的,但Map/Reduce应用程序则不一定要用 Java来写 。

  • Hadoop Streaming是一种运行作业的实用工具,它允许用户创建和运行任何可执行程序 (例如:Shell工具)来做为mapper和reducer。
  • Hadoop Pipes是一个与SWIG兼容的C++ API (没有基于JNITM技术),它也可用于实现Map/Reduce应用程序。

1.3、MapReduce核心功能描述

mapreduce

mapreduce-datanode

1.3.1、MapReduce执行流程

  • Step 1:从本地HDFS读取文件输入的内容,每个输入文件被切分成一定大小的数据块Block(1.0版本默认64M);

  • Step 2:每个被切分的数据块Block会产生对应的Map任务,用户可以定义map函数,对被切分的数据使用map函数解析成一个key/value格式的数据;

  • Step 3:对每个map任务得到的key/value格式的数据按照不同的分区,通过网络传输到不同的Reduce节点;

  • Step 4:对多个map任务输出的结果进行排序、合并,然后通过reduce函数进程处理得到最新的key/value结果;

  • Step 5:将Reduce输出的结果保存到文件系统中。

1.3.2、Map和Reduce的任务数量

在整个hadoop的MapReduce计算过程中,需要多少个map和reduce呢?

  • map的任务数量

Map的任务数量通常是由输入数据的大小决定的,一般就是所有输入文件的总块(block)数。

Map正常的并行规模大致是每个节点(node)大约10到100个map,对于CPU 消耗较小的map任务可以设到300个左右。由于每个任务初始化需要一定的时间,因此,比较合理的情况是map执行的时间至少超过1分钟;

例如:输入10TB的数据,每个块(block)的大小是128MB,你将需要大约82,000个map来完成任务,通过 setNumMapTasks(int)可以修改map数值。

  • reduce的任务数量

Reduce的任务数量建议是0.95或1.75乘以 (<no. of nodes> * mapred.tasktracker.reduce.tasks.maximum)。

用0.95,所有reduce可以在maps一完成时就立刻启动,开始传输map的输出结果。用1.75,速度快的节点可以在完成第一轮reduce任务后,可以开始第二轮,这样可以得到比较好的负载均衡的效果。

增加reduce的数目会增加整个框架的开销,但可以改善负载均衡,降低由于执行失败带来的负面影响。

2、MapReduce案例

2.1、使用shell命令演示“mapper”和“reducer”

对一个英文文本的单词进行统计,shell命令行处理思路如下:

  1. 打开文件:cat file
  2. 将文本的空格替换成换行符:tr ' ' '\n'
  3. 将替换空格文本的单词进行排序: sort -k 1
  4. 将排序后的文本显示每行出现单词的数字:uniq -c
  5. 将文件中的数字(value)和单词(key)互换位置:awk '{print $2"\t"$1}'
  6. 将替换键值的文本相同的词进行汇总并按大小进行排序:sort -k2 -nr
  7. 将结果保存为新的文件:newfile

以上步骤中,第2-5步相当于mapper,第6步为reducer。

对文本The_Man_of_Property的单词进行统计;

$ cat The_Man_of_Property |tr ' ' '\n'|sort -k 1|uniq -c|awk '{print $2 "\t"$1}'|sort -k2 -nr|head -n 20
the 5144
of  3407
to  2782
and 2573
a   2543
he  2139
his 1912
was 1702
in  1694
had 1526
that    1273
with    1029
her 1020931
at  815
for 765
not 723
she 711
He  695
it  689

以上显示了前20行的结果,需要将计算的结果保存下来,追加到文件即可

$ cat The_Man_of_Property |tr ' ' '\n'|sort -k 1|uniq -c|awk '{print $2 "\t"$1}'|sort -k2 -nr > wordcountretult.txt

2.2、使用Streaming运行MapReducer程序

使用python分别编写mapper和reducer,通过Hadoop Streaming运行程序;

2.2.1 Hadoop Streaming

Hadoop streaming是Hadoop的一个工具, 它帮助用户创建和运行一类特殊的map/reduce作业, 这些特殊的map/reduce作业是由一些可执行文件或脚本文件充当mapper或者reducer。例如:

$HADOOP_HOME/bin/hadoop  jar $HADOOP_HOME/hadoop-streaming.jar \
    -input myInputDirs \
    -output myOutputDir \
    -mapper /bin/cat \
    -reducer /bin/wc
    -file /bin/cat
    -file /bin/cat
  • -input:输入文件的路径,从HDFS上读取文件
  • -output:计算结果的输出路径,保存到HDFS上
  • -mapper:可执行的程序或脚本
  • -reducer:可执行的程序或脚本

2.2.2 wordcount by Python

使用Python编写“mapper.py”和“reducer.py”:

  • mapper.py从标准输入读取文件;
  • mapper.py将文本的单词处理成key/value的形式;
  • reducer.py的标准输入就是mapper.py的输出;
  • reducer.py将读入的单词进行归并整理,最终梳理为所有单词的汇总输出到文件;

注:以下的程序运行在hadoop 1.2.1上;

2.2.2.1、mapper by python

vim mapper.py

#!/usr/bin/env python

import sys

for line in sys.stdin:
    line = line.strip()
    words = line.split()
    for word in words:
        print "%s\t%s" %(word, 1)

从标准输入读入文件,遍历每一行字符;并对每一行字符去除特殊字符,然后以空格作为分隔符进行遍历输出word:1的key/value的格式;

文本The_Man_of_Property执行maper.py程序,显示前10行:

$ cat The_Man_of_Property |python mapper.py |head 
Preface 1
“The    1
Forsyte 1
Saga”   1
was 1
the 1
title   1
originally  1
destined    1
for 1

2.2.2.2、reducer by python

mapper输出作为reducer的输入,因此,reducer需要对word:1的格式进行处理,排序、相同单词归并,最终输出每个单词的汇总数据;

vim reducer.py

#!/usr/bin/env python

import sys

current_word = None
word_sum = 0

for line in sys.stdin:
    word_list = line.strip().split('\t')
    if len(word_list) < 2:
        continue
    word = word_list[0].strip()
    word_value = word_list[1].strip()

    if current_word == None:
        current_word = word
    if current_word != word:
        print "%s\t%s" %(current_word, str(word_sum))
        current_word = word
        word_sum = 0
    word_sum += int(word_value)

print "%s\t%s" %(current_word, str(word_sum))
word_sum = 0

reducer从mapper标准输出读入数据,对相同单词进行汇总;

2.2.2.3、本地测试mapper和reducer

在本地上先测试的流程方法:cat inputfile|mapper.py|sort|reducer.py

cat The_Man_of_Property |python mapper.py |sort -k1|python reducer.py

2.2.2.4、hadoop(hadoop-1.2.1)上测试

hadoop fs -mkdir /user/input
hadoop fs -mkdir /user/output
hadoop fs -copyFromLocal The_Man_of_Property /user/input/
chmod +x mapper.py reducer.py
  • 编写运行streaming的shell脚本:
    vim run.sh
#!/bin/bash

HADOOP_CMD="/home/hadoop/app/hadoop/hadoop-1.2.1/bin/hadoop"
STREAM_JAR_PATH="/home/hadoop/app/hadoop/hadoop-1.2.1/contrib/streaming/hadoop-streaming-1.2.1.jar"

INPUT_FILE_PATH="/user/input/The_Man_of_Property"
OUTPUT_FILE_PATH="/user/output/output00"

# 
$HADOOP_CMD jar $STREAM_JAR_PATH \
                -input $INPUT_FILE_PATH \
                -output $OUTPUT_FILE_PATH \
                -mapper "python mapper.py" \
                -reducer "python reducer.py" \
                -file ./mapper.py \
                -file ./reducer.py
  • run.sh增加执行权限,运行程序:
chmod +x run.sh
./run.sh
packageJobJar: [./mapper.py, ./reducer.py, /home/hadoop/app/hadoop/hadoop-1.2.1/tmp/hadoop-unjar8638446041851116131/] [] /tmp/streamjob3031434578661633957.jar tmpDir=null
18/01/16 13:49:17 INFO util.NativeCodeLoader: Loaded the native-hadoop library
18/01/16 13:49:17 WARN snappy.LoadSnappy: Snappy native library not loaded
18/01/16 13:49:17 INFO mapred.FileInputFormat: Total input paths to process : 1
18/01/16 13:49:17 INFO streaming.StreamJob: getLocalDirs(): [/home/hadoop/app/hadoop/hadoop-1.2.1/tmp/mapred/local]
18/01/16 13:49:17 INFO streaming.StreamJob: Running job: job_201801150959_0008
18/01/16 13:49:17 INFO streaming.StreamJob: To kill this job, run:
18/01/16 13:49:17 INFO streaming.StreamJob: /home/hadoop/app/hadoop/hadoop-1.2.1/libexec/../bin/hadoop job  -Dmapred.job.tracker=http://192.168.30.50:9001 -kill job_201801150959_0008
18/01/16 13:49:17 INFO streaming.StreamJob: Tracking URL: http://hadoop-master:50030/jobdetails.jsp?jobid=job_201801150959_0008
18/01/16 13:49:18 INFO streaming.StreamJob:  map 0%  reduce 0%
18/01/16 13:49:25 INFO streaming.StreamJob:  map 100%  reduce 0%
18/01/16 13:49:33 INFO streaming.StreamJob:  map 100%  reduce 33%
18/01/16 13:49:36 INFO streaming.StreamJob:  map 100%  reduce 100%
18/01/16 13:49:38 INFO streaming.StreamJob: Job complete: job_201801150959_0008
18/01/16 13:49:38 INFO streaming.StreamJob: Output: /user/output/output00
  • 查看输出结果
    hadoop fs -ls /user/output/output00
    Found 3 items
    -rw-r--r--   2 hadoop supergroup          0 2018-01-16 13:49 /user/output/output00/_SUCCESS
    drwxr-xr-x   - hadoop supergroup          0 2018-01-16 13:49 /user/output/output00/_logs
    -rw-r--r--   2 hadoop supergroup     181530 2018-01-16 13:49 /user/output/output00/part-00000

    计算的结果输出到文件:/user/output/output00/part-00000,可以将文件下载到本地,和使用shell命令处理的结果是相同的。

    $ hadoop fs -get /user/output/output00/part-00000 ./
    $ cat part-00000 |sort -k2 -nr|head
    the 5144
    of  3407
    to  2782
    and 2573
    a   2543
    he  2139
    his 1912
    was 1702
    in  1694
    had 1526
$ head wordcountretult.txt 
the 5144
of  3407
to  2782
and 2573
a   2543
he  2139
his 1912
was 1702
in  1694
had 1526

3、遇到的问题

使用streaming运行python编写的mapper和reducer的是出现了如下错误:

ERROR streaming.StreamJob: Job not successful. Error: # of failed Map Tasks exceeded allowed limit. FailedCount: 1. LastFailedTask

因为本地验证正常,可以确定的是mapper.py和reducer.py程序应该没有问题;使用 -file 指定mapper和reducer的路径(相对路径或者绝对路径都可以)再一次运行,没有报错了。



本文转自 巴利奇 51CTO博客,原文链接:http://blog.51cto.com/balich/2061545

相关文章
|
1月前
|
分布式计算 资源调度 Hadoop
大数据-80 Spark 简要概述 系统架构 部署模式 与Hadoop MapReduce对比
大数据-80 Spark 简要概述 系统架构 部署模式 与Hadoop MapReduce对比
64 2
|
1月前
|
分布式计算 资源调度 Hadoop
Hadoop-10-HDFS集群 Java实现MapReduce WordCount计算 Hadoop序列化 编写Mapper和Reducer和Driver 附带POM 详细代码 图文等内容
Hadoop-10-HDFS集群 Java实现MapReduce WordCount计算 Hadoop序列化 编写Mapper和Reducer和Driver 附带POM 详细代码 图文等内容
89 3
|
1月前
|
分布式计算 Hadoop
Hadoop-27 ZooKeeper集群 集群配置启动 3台云服务器 myid集群 zoo.cfg多节点配置 分布式协调框架 Leader Follower Observer
Hadoop-27 ZooKeeper集群 集群配置启动 3台云服务器 myid集群 zoo.cfg多节点配置 分布式协调框架 Leader Follower Observer
47 1
|
1月前
|
分布式计算 资源调度 数据可视化
Hadoop-06-Hadoop集群 历史服务器配置 超详细 执行任务记录 JobHistoryServer MapReduce执行记录 日志聚合结果可视化查看
Hadoop-06-Hadoop集群 历史服务器配置 超详细 执行任务记录 JobHistoryServer MapReduce执行记录 日志聚合结果可视化查看
37 1
|
1月前
|
分布式计算 资源调度 Hadoop
Hadoop-05-Hadoop集群 集群WordCount 超详细 真正的分布式计算 上传HDFS MapReduce计算 YRAN查看任务 上传计算下载查看
Hadoop-05-Hadoop集群 集群WordCount 超详细 真正的分布式计算 上传HDFS MapReduce计算 YRAN查看任务 上传计算下载查看
47 1
|
1月前
|
存储 SQL 消息中间件
Hadoop-26 ZooKeeper集群 3台云服务器 基础概念简介与环境的配置使用 架构组成 分布式协调框架 Leader Follower Observer
Hadoop-26 ZooKeeper集群 3台云服务器 基础概念简介与环境的配置使用 架构组成 分布式协调框架 Leader Follower Observer
47 0
|
1月前
|
SQL 分布式计算 关系型数据库
Hadoop-24 Sqoop迁移 MySQL到Hive 与 Hive到MySQL SQL生成数据 HDFS集群 Sqoop import jdbc ETL MapReduce
Hadoop-24 Sqoop迁移 MySQL到Hive 与 Hive到MySQL SQL生成数据 HDFS集群 Sqoop import jdbc ETL MapReduce
85 0
|
1月前
|
SQL 分布式计算 关系型数据库
Hadoop-23 Sqoop 数据MySQL到HDFS(部分) SQL生成数据 HDFS集群 Sqoop import jdbc ETL MapReduce
Hadoop-23 Sqoop 数据MySQL到HDFS(部分) SQL生成数据 HDFS集群 Sqoop import jdbc ETL MapReduce
37 0
|
1月前
|
SQL 分布式计算 关系型数据库
Hadoop-22 Sqoop 数据MySQL到HDFS(全量) SQL生成数据 HDFS集群 Sqoop import jdbc ETL MapReduce
Hadoop-22 Sqoop 数据MySQL到HDFS(全量) SQL生成数据 HDFS集群 Sqoop import jdbc ETL MapReduce
46 0
|
2月前
|
分布式计算 资源调度 Hadoop
在YARN集群上运行部署MapReduce分布式计算框架
主要介绍了如何在YARN集群上配置和运行MapReduce分布式计算框架,包括准备数据、运行MapReduce任务、查看任务日志,并启动HistoryServer服务以便于日志查看。
63 0

相关实验场景

更多