基于WordCount详解MapReduce编程模型!

简介: 笔记

一、MapReduce编程模型概述


MR是分布式计算模型

MR整个并行计算过程中会抽象出两个函数:


map():它是对我们独立元素中的每一个元素进行并行计算操作的函数

reduce():它是对我们独立元素中的数据进行合并

一个简单 的MR程序,我们只需要指定map() reduce() input output,剩下的事情交个我们的框架来完成


关于MR的数据处理流程:


数据处理的阶段 input -> map -> reduce ->output

任务Job = Map+Reduce

Map的输出是Reduce的输入

数据处理过程中,MR要求的数据格式是以

是Map的输入,是Map的输出。

是Reduce的输入,是Reduce的输出。

MapReduce编程模型图:

image.png

  • k2 = k3,v3是一个集合,v3的元素就是v2,表示为v3=list(v2)
  • 所有的输入和输出的数据类型必须是Hadoop的数据类型
  • MapReduce处理的数据都是HDFS的数据(或HBase)

一个小案例,解释上面的流程

数据源:

a good beginning is half the battle
where there is a will there is a way

map阶段:

输入是,其中k1默认是指偏移量,类型是LongWritable,v1是指一行的文本内容,类型是Text。因为要统计单词的数量,所以k2是指单词,类型是Text。v2用数字1表示,即一个单词记一数,类型是IntWritable。

并行读取文本,获取,进行map操作,输出

读取第一行:

a good beginning is half the battle

在用户自己实现的map函数中,取到的v1即这一行的文本,以空格为分隔符对字符串进行拆分,获取到每个单词,并以的格式输出,得到:

<a,1> <good,1> <beginning,1> <is,1> <half,1> <the,1> <battle,1>

读取第二行:

where there is a will there is a way

得到:

<where,1> <there,1> <is,1> <a,1> <will,1> <there,1> <is,1> <a,1> <way,1>

Reduce阶段:

按MapReduce的理论得知,在Reduce阶段,输入,k3与k2一致,表示单词,类型是Text。v3是v2的集合,即List(v2)。同时k3默认已经按字典顺序排序,因此是:

<a,(1,1,1)> <battle,(1)> <beginning,(1)> <good,(1)> <half,(1)> <is,(1,1,1)> <the,(1)> <there,(1,1)> <way,(1)> <where,(1)> <will,(1)>

在用户实现自己的reduce函数中,读入并统计v3集合中每个单词的总数,并将输出,得到:

<a,3> <battle,1> <beginning,1> <good,1> <half,1> <is,3> <the,1> <there,2> <way,1> <where,1> <will,1>

整个MR的大致过程如下:

2.png


二、WordCount案例


统计一批英文文章中,每个单词出现的总次数。

现在有一个输入文件"Gone With The Wind",这个文件有三个block:block1, block2, block3。三个block的内容依次如下图。MR原理图

3.png

Map阶段


每一个block对应一个分片split (默认split与block一一对应)。

每一个split对应一个map任务(map task)。所以这里三个block将会对应三个map task(map1, map2, map3),这3个任务的逻辑完全一样。

以map1为例。map1会读取block1的数据,一次读取block1的一行数据,然后会产生一个kv对(其中,key是当前所读行的行首相对于当前block开始处的字节偏移量;value是当前行的内容;如假设当前所读行是第一行,那么当前行的内容是"Dear Bear River",则kv对是(0, “Dear Bear River”)),作为map()的参数传入,调用map()。

map()方法。将value当前行内容按空格切分,得到三个单词Dear|Bear|River,然后将每个单词变成键值对(Dear, 1)|(Bear, 1)|(River, 1),最终结果输出为文件,写入map任务所在节点的本地磁盘中(其中还有一个Shuffle的过程)。

block的第一行数据被处理完后,接着处理第二行,当map任务将当前block所有的数据全部处理完后,此map任务即运行结束。

Reduce阶段


reduce任务(reduce task)的个数由用户程序指定,main()内的job.setNumReduceTask(4)指定reduce任务是4个(reduce1, reduce2, reduce3, reduce4)。

以reduce1为例。reduce1通过网络,连接到map1,将map1输出结果中属于reduce1的分区的数据通过网络获取到reduce1端(拷贝阶段)。同样地,也会连接到map2,map3获取数据。最终reduce1端获得4个(Dear, 1)键值对;由于key键相同,它们分到同一个组。4个(Dear, 1)键值对,转换成[Dear, Iterable(1, 1, 1, )],作为两个参数传入(其中还有一个Shuffle的过程,下文会详细讲解),调用reduce()。

reduce()方法。计算Dear的总数为4,并将(Dear, 4)作为键值对输出,最终结果输出成文件,写入HDFS。


三、MapReduce基于WordCount编程


(1)MapReduce基于本地调试

准备数据源

java python hadoop
spark spark java hadoop
hive hbase hbase hive
spark java python hadoop
linux unix java python
spark spark storm flink
java flink hue kafka kafka
java python zookeeper zookeeper
spark hive hbase hive

基于WordCount业务逻辑编写:

Map数据处理:

<0,hadoop  spark>    ->   split  ->   <hadoop,1>   <spark,1>

Reduce数据处理:

<hadoop,List(1)>
   <spark,List(1,1,1,1)>
   <java,List(1,1)

总结:k1进map k2出,k3进reduce k4出


基于本地测试,去掉core-site.xml中 “hadoop.tmp.dir”配置项

<property>
  <name>hadoop.tmp.dir</name>
  <value>/opt/modules/hadoop/tmp</value>
</property>

示例代码:

package com.kfk.hadoop.mr;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import java.io.IOException;
/**
 * @author : 蔡政洁
 * @email :caizhengjie888@icloud.com
 * @date : 2020/10/9
 * @time : 7:07 下午
 */
public class WordCountMR {
    // 1.map
    public static class WordCountMapper extends Mapper<LongWritable, Text,Text, IntWritable>{
        // 创建k2,v2对象
        private final Text mapOutputKey = new Text();
        private final IntWritable mapOutputValue = new IntWritable(1);
        @Override
        protected void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException {
            // 输出<k1,v1>,k1就是偏移量,v1就是一行行数据
            System.out.println("KeyIn: "+key+"   ValueIn: "+value);
            // 每一行数据
            String lineValue = value.toString();
            // 将数据按照空格分开
            String[] strs = lineValue.split(" ");
            // 输出<k2,v2>
            for (String str:strs){
                mapOutputKey.set(str);
                context.write(mapOutputKey,mapOutputValue);
            }
        }
    }
    // 2.reduce
    public static class WordCountReducer extends Reducer<Text,IntWritable,Text,IntWritable>{
        // 创建v4对象
        private final IntWritable outputValue = new IntWritable();
        @Override
        protected void reduce(Text key, Iterable<IntWritable> values, Context context) throws IOException, InterruptedException {
            // 对v3求和
            int sum = 0;
            for (IntWritable value:values){
                sum += value.get();
            }
            outputValue.set(sum);
            // 输出<k4,v4>
            context.write(key,outputValue);
        }
    }
    // driver
    public int run(String[] args) throws IOException, ClassNotFoundException, InterruptedException {
        // 1) get conf
        Configuration configuration = new Configuration();
        // 2) create job
        Job job = Job.getInstance(configuration,this.getClass().getSimpleName());
        job.setJarByClass(this.getClass());
        // 3.1) input,指定job的输入
        Path path = new Path(args[0]);
        FileInputFormat.addInputPath(job,path);
        // 3.2) map,指定job的mapper和输出的类型
        job.setMapperClass(WordCountMapper.class);
        job.setMapOutputKeyClass(Text.class);
        job.setMapOutputValueClass(IntWritable.class);
        // 3.3) reduce,指定job的reducer和输出类型
        job.setReducerClass(WordCountReducer.class);
        job.setOutputKeyClass(Text.class);
        job.setOutputValueClass(IntWritable.class);
        // 3.4) output,指定job的输出
        Path outpath = new Path(args[1]);
        FileOutputFormat.setOutputPath(job,outpath);
        // 4) commit,执行job
        boolean isSuccess = job.waitForCompletion(true);
        // 如果正常执行返回0,否则返回1
        return (isSuccess) ? 0 : 1;
    }
    public static void main(String[] args) {
        // 添加输入,输入参数
        args = new String[]{
            "hdfs://bigdata-pro-m01:9000/user/caizhengjie/datas/wordcount.txt",
            "hdfs://bigdata-pro-m01:9000/user/caizhengjie/mr/output"
        };
        WordCountMR wordCountMR = new WordCountMR();
        try {
            // 判断输出的文件存不存在,如果存在就将它删除
            Path fileOutPath = new Path(args[1]);
            FileSystem fileSystem = FileSystem.get(new Configuration());
            if (fileSystem.exists(fileOutPath)){
                fileSystem.delete(fileOutPath,true);
            }
            // 调用run方法
            int status = wordCountMR.run(args);
            // 退出程序
            System.exit(status);
        } catch (IOException e) {
            e.printStackTrace();
        } catch (ClassNotFoundException e) {
            e.printStackTrace();
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
    }
}

将数据源上传到hdfs上:

bin/hdfs dfs -mkdir /user/caizhengjie/datas
bin/hdfs dfs -put /opt/datas/wordcount.txt  /user/caizhengjie/datas

创建输出的文件路径

bin/hdfs dfs -mkdir /user/caizhengjie/mr

这是测试Hadoop中自带的mapreduce程序

bin/hadoop jar share/hadoop/mapreduce/hadoop-mapreduce-examples-2.6.0-cdh5.9.3.jar wordcount /user/caizhengjie/datas/wordcou

查看结果

bin/hdfs dfs -text /user/caizhengjie/mr/output/par*


where applicable
        1
flink   2
hadoop  3
hbase   3
hive    4
hue     1
java    6
kafka   2
linux   1
python  4
spark   6
storm   1
unix    1
zookeeper       2


(2)MapReduce基于YARN调试

将编写好的程序,上传到虚拟机,并测试:

bin/hadoop jar /opt/jars/bigdata_study.jar /user/caizhengjie/datas/wordcount.txt /user/caizhengjie/mr/output
bin/hdfs dfs -text /user/caizhengjie/mr/output/par*

会出现上面的同样的结果。


四、MapReduce模板优化


根据上面代码,我们可以对MapReduce的编程模板进行优化,成为通用编程模版:

package com.kfk.hadoop.mr;
import com.google.common.collect.Lists;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.conf.Configured;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.util.Tool;
import org.apache.hadoop.util.ToolRunner;
import java.io.IOException;
import java.util.List;
/**
 * @author : 蔡政洁
 * @email :caizhengjie888@icloud.com
 * @date : 2020/10/9
 * @time : 7:07 下午
 */
public class WordCountUpMR extends Configured implements Tool {
    /**
     * map
     */
    public static class WordCountMapper extends Mapper<LongWritable, Text,Text, IntWritable>{
        // 创建k2,v2对象
        private final Text mapOutputKey = new Text();
        private final IntWritable mapOutputValue = new IntWritable(1);
        @Override
        protected void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException {
            // 输出<k1,v1>,k1就是偏移量,v1就是一行行数据
            System.out.println("Map in == KeyIn: "+key+"   ValueIn: "+value);
            // 每一行数据
            String lineValue = value.toString();
            // 将数据按照空格分开
            String[] strs = lineValue.split(" ");
            // 输出<k2,v2>
            for (String str:strs){
                mapOutputKey.set(str);
                context.write(mapOutputKey,mapOutputValue);
                // 打印出<k2,v2>
                System.out.println("Map out == KeyOut: "+mapOutputKey+"   ValueOut: "+mapOutputValue);
            }
        }
    }
    /**
     * reduce
     */
    public static class WordCountReducer extends Reducer<Text,IntWritable,Text,IntWritable>{
        // 创建v4对象
        private final IntWritable outputValue = new IntWritable();
        @Override
        protected void reduce(Text key, Iterable<IntWritable> values, Context context) throws IOException, InterruptedException {
            // 打印出k3,v3集合,用于本机测试
            List<IntWritable> list = Lists.newArrayList(values);
            System.out.println("Reduce in == KeyIn: "+key+"   ValueIn: "+list);
            // 对v3求和
            int sum = 0;
            for (IntWritable value:list){
                sum += value.get();
            }
            outputValue.set(sum);
            // 输出<k4,v4>
            context.write(key,outputValue);
            // 打印出k4,v4的值
            System.out.println("Reduce out == KeyOut: "+key+"   ValueOut: "+outputValue);
        }
    }
    /**
     * run
     * @param args
     * @return
     * @throws IOException
     * @throws ClassNotFoundException
     * @throws InterruptedException
     */
    public int run(String[] args) throws IOException, ClassNotFoundException, InterruptedException {
        // 1) get conf
        Configuration configuration = this.getConf();
        // 2) create job
        Job job = Job.getInstance(configuration,this.getClass().getSimpleName());
        job.setJarByClass(this.getClass());
        // 3.1) input,指定job的输入
        Path path = new Path(args[0]);
        FileInputFormat.addInputPath(job,path);
        // 3.2) map,指定job的mapper和输出的类型
        job.setMapperClass(WordCountMapper.class);
        job.setMapOutputKeyClass(Text.class);
        job.setMapOutputValueClass(IntWritable.class);
        // 3.3) reduce,指定job的reducer和输出类型
        job.setReducerClass(WordCountReducer.class);
        job.setOutputKeyClass(Text.class);
        job.setOutputValueClass(IntWritable.class);
        // 3.4) output,指定job的输出
        Path outpath = new Path(args[1]);
        FileOutputFormat.setOutputPath(job,outpath);
        // 4) commit,执行job
        boolean isSuccess = job.waitForCompletion(true);
        // 如果正常执行返回0,否则返回1
        return (isSuccess) ? 0 : 1;
    }
    public static void main(String[] args) {
        // 添加输入,输入参数
        args = new String[]{
            "hdfs://bigdata-pro-m01:9000/user/caizhengjie/datas/wordcount.txt",
            "hdfs://bigdata-pro-m01:9000/user/caizhengjie/mr/output"
        };
//        WordCountUpMR wordCountUpMR = new WordCountUpMR();
        Configuration configuration = new Configuration();
        try {
            // 判断输出的文件存不存在,如果存在就将它删除
            Path fileOutPath = new Path(args[1]);
            FileSystem fileSystem = FileSystem.get(configuration);
            if (fileSystem.exists(fileOutPath)){
                fileSystem.delete(fileOutPath,true);
            }
            // 调用run方法
            int status = ToolRunner.run(configuration,new WordCountUpMR(),args);
            // 退出程序
            System.exit(status);
        } catch (IOException e) {
            e.printStackTrace();
        } catch (ClassNotFoundException e) {
            e.printStackTrace();
        } catch (InterruptedException e) {
            e.printStackTrace();
        } catch (Exception e) {
            e.printStackTrace();
        }
    }
}

运行结果:

4.png5.png


五、总结


WordCount程序MR运行流程:

input  ->  map  -> reduce  ->  output

每个阶段的数据类型

map输入  ->  <0,hadoop spark>
map输出  ->  <hadoop,1><spark,1>
reduce输入 ->   <hadoop,List(1,1)>
reduce输出 ->   <hadoop,2>

要知道的几个知识点:

  • 第一点:默认情况下,Map输入对格式
key:偏移量
value:文本中的一行值
  • 第二点:map与reduce之间的shuffle过程
map  ->   partition  ->  sort  ->  group  ->  reduce
    *分区:
        决定map输出的<key,value>,发送给哪个reduce去处理
    *排序:
        map -> 默认按单词首字母排序
    *分组:
        将相同key的value组合在一起,放在List中
        eg:  (keyIn:spark    ValueIn:[1, 1, 1, 1])
  • 第三点:reduce的输出结果,默认情况下,key和value作为一行数据输出key和value之间的分隔符为制表符 \t
相关文章
|
3月前
|
分布式计算 资源调度 Hadoop
Hadoop-10-HDFS集群 Java实现MapReduce WordCount计算 Hadoop序列化 编写Mapper和Reducer和Driver 附带POM 详细代码 图文等内容
Hadoop-10-HDFS集群 Java实现MapReduce WordCount计算 Hadoop序列化 编写Mapper和Reducer和Driver 附带POM 详细代码 图文等内容
129 3
|
3月前
|
分布式计算 资源调度 Hadoop
Hadoop-05-Hadoop集群 集群WordCount 超详细 真正的分布式计算 上传HDFS MapReduce计算 YRAN查看任务 上传计算下载查看
Hadoop-05-Hadoop集群 集群WordCount 超详细 真正的分布式计算 上传HDFS MapReduce计算 YRAN查看任务 上传计算下载查看
67 1
|
5月前
|
分布式计算 大数据 Hadoop
揭秘MapReduce背后的魔法:从基础类型到高级格式,带你深入理解这一大数据处理利器的奥秘与实战技巧,让你从此不再是编程门外汉!
【8月更文挑战第17天】MapReduce作为分布式计算模型,是大数据处理的基石。它通过Map和Reduce函数处理大规模数据集,简化编程模型,使开发者聚焦业务逻辑。MapReduce分单阶段和多阶段,支持多种输入输出格式如`TextInputFormat`和`SequenceFileInputFormat`。例如,简单的单词计数程序利用`TextInputFormat`读取文本行并计数;而`SequenceFileInputFormat`适用于高效处理二进制序列文件。合理选择类型和格式可有效解决大数据问题。
81 1
|
5月前
|
分布式计算 Hadoop Java
Hadoop_MapReduce中的WordCount运行详解
MapReduce的WordCount程序在分布式系统中计算大数据集中单词出现的频率时,提供了一个可以复用和可伸缩的解决方案。它体现了MapReduce编程模型的强大之处:简单、可靠且将任务自动分布到一个集群中去执行。它首先运行一系列的Map任务来处理原始数据,然后通过Shuffle和Sort机制来组织结果,最后通过运行Reduce任务来完成最终计算。因此,即便数据量非常大,通过该模型也可以高效地进行处理。
120 1
|
7月前
|
分布式计算 Hadoop Java
MapReduce编程模型——在idea里面邂逅CDH MapReduce
MapReduce编程模型——在idea里面邂逅CDH MapReduce
104 15
|
7月前
|
分布式计算 资源调度 数据处理
YARN支持哪些非基于MapReduce的计算模型?
【6月更文挑战第19天】YARN支持哪些非基于MapReduce的计算模型?
79 11
|
7月前
|
分布式计算 Hadoop Java
Hadoop MapReduce编程
该教程指导编写Hadoop MapReduce程序处理天气数据。任务包括计算每个城市ID的最高、最低气温、气温出现次数和平均气温。在读取数据时需忽略表头,且数据应为整数。教程中提供了环境变量设置、Java编译、jar包创建及MapReduce执行的步骤说明,但假设读者已具备基础操作技能。此外,还提到一个扩展练习,通过分区功能将具有相同尾数的数字分组到不同文件。
70 1
|
7月前
|
存储 分布式计算 Hadoop
MapReduce编程模型——自定义序列化类实现多指标统计
MapReduce编程模型——自定义序列化类实现多指标统计
57 0
|
7月前
|
机器学习/深度学习 分布式计算 并行计算
MapReduce是一种用于并行计算的编程模型和处理大规模数据集的实现
MapReduce是一种用于并行计算的编程模型和处理大规模数据集的实现
104 0
|
7月前
|
存储 分布式计算 Hadoop
Hadoop生态系统详解:HDFS与MapReduce编程
Apache Hadoop是大数据处理的关键,其核心包括HDFS(分布式文件系统)和MapReduce(并行计算框架)。HDFS为大数据存储提供高容错性和高吞吐量,采用主从结构,通过数据复制保证可靠性。MapReduce将任务分解为Map和Reduce阶段,适合大规模数据集的处理。通过代码示例展示了如何使用MapReduce实现Word Count功能。HDFS和MapReduce的结合,加上YARN的资源管理,构成处理和分析大数据的强大力量。了解和掌握这些基础对于有效管理大数据至关重要。【6月更文挑战第12天】
300 0