基于WordCount详解MapReduce编程模型!

简介: 笔记

一、MapReduce编程模型概述


MR是分布式计算模型

MR整个并行计算过程中会抽象出两个函数:


map():它是对我们独立元素中的每一个元素进行并行计算操作的函数

reduce():它是对我们独立元素中的数据进行合并

一个简单 的MR程序,我们只需要指定map() reduce() input output,剩下的事情交个我们的框架来完成


关于MR的数据处理流程:


数据处理的阶段 input -> map -> reduce ->output

任务Job = Map+Reduce

Map的输出是Reduce的输入

数据处理过程中,MR要求的数据格式是以

是Map的输入,是Map的输出。

是Reduce的输入,是Reduce的输出。

MapReduce编程模型图:

image.png

  • k2 = k3,v3是一个集合,v3的元素就是v2,表示为v3=list(v2)
  • 所有的输入和输出的数据类型必须是Hadoop的数据类型
  • MapReduce处理的数据都是HDFS的数据(或HBase)

一个小案例,解释上面的流程

数据源:

a good beginning is half the battle
where there is a will there is a way

map阶段:

输入是,其中k1默认是指偏移量,类型是LongWritable,v1是指一行的文本内容,类型是Text。因为要统计单词的数量,所以k2是指单词,类型是Text。v2用数字1表示,即一个单词记一数,类型是IntWritable。

并行读取文本,获取,进行map操作,输出

读取第一行:

a good beginning is half the battle

在用户自己实现的map函数中,取到的v1即这一行的文本,以空格为分隔符对字符串进行拆分,获取到每个单词,并以的格式输出,得到:

<a,1> <good,1> <beginning,1> <is,1> <half,1> <the,1> <battle,1>

读取第二行:

where there is a will there is a way

得到:

<where,1> <there,1> <is,1> <a,1> <will,1> <there,1> <is,1> <a,1> <way,1>

Reduce阶段:

按MapReduce的理论得知,在Reduce阶段,输入,k3与k2一致,表示单词,类型是Text。v3是v2的集合,即List(v2)。同时k3默认已经按字典顺序排序,因此是:

<a,(1,1,1)> <battle,(1)> <beginning,(1)> <good,(1)> <half,(1)> <is,(1,1,1)> <the,(1)> <there,(1,1)> <way,(1)> <where,(1)> <will,(1)>

在用户实现自己的reduce函数中,读入并统计v3集合中每个单词的总数,并将输出,得到:

<a,3> <battle,1> <beginning,1> <good,1> <half,1> <is,3> <the,1> <there,2> <way,1> <where,1> <will,1>

整个MR的大致过程如下:

2.png


二、WordCount案例


统计一批英文文章中,每个单词出现的总次数。

现在有一个输入文件"Gone With The Wind",这个文件有三个block:block1, block2, block3。三个block的内容依次如下图。MR原理图

3.png

Map阶段


每一个block对应一个分片split (默认split与block一一对应)。

每一个split对应一个map任务(map task)。所以这里三个block将会对应三个map task(map1, map2, map3),这3个任务的逻辑完全一样。

以map1为例。map1会读取block1的数据,一次读取block1的一行数据,然后会产生一个kv对(其中,key是当前所读行的行首相对于当前block开始处的字节偏移量;value是当前行的内容;如假设当前所读行是第一行,那么当前行的内容是"Dear Bear River",则kv对是(0, “Dear Bear River”)),作为map()的参数传入,调用map()。

map()方法。将value当前行内容按空格切分,得到三个单词Dear|Bear|River,然后将每个单词变成键值对(Dear, 1)|(Bear, 1)|(River, 1),最终结果输出为文件,写入map任务所在节点的本地磁盘中(其中还有一个Shuffle的过程)。

block的第一行数据被处理完后,接着处理第二行,当map任务将当前block所有的数据全部处理完后,此map任务即运行结束。

Reduce阶段


reduce任务(reduce task)的个数由用户程序指定,main()内的job.setNumReduceTask(4)指定reduce任务是4个(reduce1, reduce2, reduce3, reduce4)。

以reduce1为例。reduce1通过网络,连接到map1,将map1输出结果中属于reduce1的分区的数据通过网络获取到reduce1端(拷贝阶段)。同样地,也会连接到map2,map3获取数据。最终reduce1端获得4个(Dear, 1)键值对;由于key键相同,它们分到同一个组。4个(Dear, 1)键值对,转换成[Dear, Iterable(1, 1, 1, )],作为两个参数传入(其中还有一个Shuffle的过程,下文会详细讲解),调用reduce()。

reduce()方法。计算Dear的总数为4,并将(Dear, 4)作为键值对输出,最终结果输出成文件,写入HDFS。


三、MapReduce基于WordCount编程


(1)MapReduce基于本地调试

准备数据源

java python hadoop
spark spark java hadoop
hive hbase hbase hive
spark java python hadoop
linux unix java python
spark spark storm flink
java flink hue kafka kafka
java python zookeeper zookeeper
spark hive hbase hive

基于WordCount业务逻辑编写:

Map数据处理:

<0,hadoop  spark>    ->   split  ->   <hadoop,1>   <spark,1>

Reduce数据处理:

<hadoop,List(1)>
   <spark,List(1,1,1,1)>
   <java,List(1,1)

总结:k1进map k2出,k3进reduce k4出


基于本地测试,去掉core-site.xml中 “hadoop.tmp.dir”配置项

<property>
  <name>hadoop.tmp.dir</name>
  <value>/opt/modules/hadoop/tmp</value>
</property>

示例代码:

package com.kfk.hadoop.mr;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import java.io.IOException;
/**
 * @author : 蔡政洁
 * @email :caizhengjie888@icloud.com
 * @date : 2020/10/9
 * @time : 7:07 下午
 */
public class WordCountMR {
    // 1.map
    public static class WordCountMapper extends Mapper<LongWritable, Text,Text, IntWritable>{
        // 创建k2,v2对象
        private final Text mapOutputKey = new Text();
        private final IntWritable mapOutputValue = new IntWritable(1);
        @Override
        protected void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException {
            // 输出<k1,v1>,k1就是偏移量,v1就是一行行数据
            System.out.println("KeyIn: "+key+"   ValueIn: "+value);
            // 每一行数据
            String lineValue = value.toString();
            // 将数据按照空格分开
            String[] strs = lineValue.split(" ");
            // 输出<k2,v2>
            for (String str:strs){
                mapOutputKey.set(str);
                context.write(mapOutputKey,mapOutputValue);
            }
        }
    }
    // 2.reduce
    public static class WordCountReducer extends Reducer<Text,IntWritable,Text,IntWritable>{
        // 创建v4对象
        private final IntWritable outputValue = new IntWritable();
        @Override
        protected void reduce(Text key, Iterable<IntWritable> values, Context context) throws IOException, InterruptedException {
            // 对v3求和
            int sum = 0;
            for (IntWritable value:values){
                sum += value.get();
            }
            outputValue.set(sum);
            // 输出<k4,v4>
            context.write(key,outputValue);
        }
    }
    // driver
    public int run(String[] args) throws IOException, ClassNotFoundException, InterruptedException {
        // 1) get conf
        Configuration configuration = new Configuration();
        // 2) create job
        Job job = Job.getInstance(configuration,this.getClass().getSimpleName());
        job.setJarByClass(this.getClass());
        // 3.1) input,指定job的输入
        Path path = new Path(args[0]);
        FileInputFormat.addInputPath(job,path);
        // 3.2) map,指定job的mapper和输出的类型
        job.setMapperClass(WordCountMapper.class);
        job.setMapOutputKeyClass(Text.class);
        job.setMapOutputValueClass(IntWritable.class);
        // 3.3) reduce,指定job的reducer和输出类型
        job.setReducerClass(WordCountReducer.class);
        job.setOutputKeyClass(Text.class);
        job.setOutputValueClass(IntWritable.class);
        // 3.4) output,指定job的输出
        Path outpath = new Path(args[1]);
        FileOutputFormat.setOutputPath(job,outpath);
        // 4) commit,执行job
        boolean isSuccess = job.waitForCompletion(true);
        // 如果正常执行返回0,否则返回1
        return (isSuccess) ? 0 : 1;
    }
    public static void main(String[] args) {
        // 添加输入,输入参数
        args = new String[]{
            "hdfs://bigdata-pro-m01:9000/user/caizhengjie/datas/wordcount.txt",
            "hdfs://bigdata-pro-m01:9000/user/caizhengjie/mr/output"
        };
        WordCountMR wordCountMR = new WordCountMR();
        try {
            // 判断输出的文件存不存在,如果存在就将它删除
            Path fileOutPath = new Path(args[1]);
            FileSystem fileSystem = FileSystem.get(new Configuration());
            if (fileSystem.exists(fileOutPath)){
                fileSystem.delete(fileOutPath,true);
            }
            // 调用run方法
            int status = wordCountMR.run(args);
            // 退出程序
            System.exit(status);
        } catch (IOException e) {
            e.printStackTrace();
        } catch (ClassNotFoundException e) {
            e.printStackTrace();
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
    }
}

将数据源上传到hdfs上:

bin/hdfs dfs -mkdir /user/caizhengjie/datas
bin/hdfs dfs -put /opt/datas/wordcount.txt  /user/caizhengjie/datas

创建输出的文件路径

bin/hdfs dfs -mkdir /user/caizhengjie/mr

这是测试Hadoop中自带的mapreduce程序

bin/hadoop jar share/hadoop/mapreduce/hadoop-mapreduce-examples-2.6.0-cdh5.9.3.jar wordcount /user/caizhengjie/datas/wordcou

查看结果

bin/hdfs dfs -text /user/caizhengjie/mr/output/par*


where applicable
        1
flink   2
hadoop  3
hbase   3
hive    4
hue     1
java    6
kafka   2
linux   1
python  4
spark   6
storm   1
unix    1
zookeeper       2


(2)MapReduce基于YARN调试

将编写好的程序,上传到虚拟机,并测试:

bin/hadoop jar /opt/jars/bigdata_study.jar /user/caizhengjie/datas/wordcount.txt /user/caizhengjie/mr/output
bin/hdfs dfs -text /user/caizhengjie/mr/output/par*

会出现上面的同样的结果。


四、MapReduce模板优化


根据上面代码,我们可以对MapReduce的编程模板进行优化,成为通用编程模版:

package com.kfk.hadoop.mr;
import com.google.common.collect.Lists;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.conf.Configured;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.util.Tool;
import org.apache.hadoop.util.ToolRunner;
import java.io.IOException;
import java.util.List;
/**
 * @author : 蔡政洁
 * @email :caizhengjie888@icloud.com
 * @date : 2020/10/9
 * @time : 7:07 下午
 */
public class WordCountUpMR extends Configured implements Tool {
    /**
     * map
     */
    public static class WordCountMapper extends Mapper<LongWritable, Text,Text, IntWritable>{
        // 创建k2,v2对象
        private final Text mapOutputKey = new Text();
        private final IntWritable mapOutputValue = new IntWritable(1);
        @Override
        protected void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException {
            // 输出<k1,v1>,k1就是偏移量,v1就是一行行数据
            System.out.println("Map in == KeyIn: "+key+"   ValueIn: "+value);
            // 每一行数据
            String lineValue = value.toString();
            // 将数据按照空格分开
            String[] strs = lineValue.split(" ");
            // 输出<k2,v2>
            for (String str:strs){
                mapOutputKey.set(str);
                context.write(mapOutputKey,mapOutputValue);
                // 打印出<k2,v2>
                System.out.println("Map out == KeyOut: "+mapOutputKey+"   ValueOut: "+mapOutputValue);
            }
        }
    }
    /**
     * reduce
     */
    public static class WordCountReducer extends Reducer<Text,IntWritable,Text,IntWritable>{
        // 创建v4对象
        private final IntWritable outputValue = new IntWritable();
        @Override
        protected void reduce(Text key, Iterable<IntWritable> values, Context context) throws IOException, InterruptedException {
            // 打印出k3,v3集合,用于本机测试
            List<IntWritable> list = Lists.newArrayList(values);
            System.out.println("Reduce in == KeyIn: "+key+"   ValueIn: "+list);
            // 对v3求和
            int sum = 0;
            for (IntWritable value:list){
                sum += value.get();
            }
            outputValue.set(sum);
            // 输出<k4,v4>
            context.write(key,outputValue);
            // 打印出k4,v4的值
            System.out.println("Reduce out == KeyOut: "+key+"   ValueOut: "+outputValue);
        }
    }
    /**
     * run
     * @param args
     * @return
     * @throws IOException
     * @throws ClassNotFoundException
     * @throws InterruptedException
     */
    public int run(String[] args) throws IOException, ClassNotFoundException, InterruptedException {
        // 1) get conf
        Configuration configuration = this.getConf();
        // 2) create job
        Job job = Job.getInstance(configuration,this.getClass().getSimpleName());
        job.setJarByClass(this.getClass());
        // 3.1) input,指定job的输入
        Path path = new Path(args[0]);
        FileInputFormat.addInputPath(job,path);
        // 3.2) map,指定job的mapper和输出的类型
        job.setMapperClass(WordCountMapper.class);
        job.setMapOutputKeyClass(Text.class);
        job.setMapOutputValueClass(IntWritable.class);
        // 3.3) reduce,指定job的reducer和输出类型
        job.setReducerClass(WordCountReducer.class);
        job.setOutputKeyClass(Text.class);
        job.setOutputValueClass(IntWritable.class);
        // 3.4) output,指定job的输出
        Path outpath = new Path(args[1]);
        FileOutputFormat.setOutputPath(job,outpath);
        // 4) commit,执行job
        boolean isSuccess = job.waitForCompletion(true);
        // 如果正常执行返回0,否则返回1
        return (isSuccess) ? 0 : 1;
    }
    public static void main(String[] args) {
        // 添加输入,输入参数
        args = new String[]{
            "hdfs://bigdata-pro-m01:9000/user/caizhengjie/datas/wordcount.txt",
            "hdfs://bigdata-pro-m01:9000/user/caizhengjie/mr/output"
        };
//        WordCountUpMR wordCountUpMR = new WordCountUpMR();
        Configuration configuration = new Configuration();
        try {
            // 判断输出的文件存不存在,如果存在就将它删除
            Path fileOutPath = new Path(args[1]);
            FileSystem fileSystem = FileSystem.get(configuration);
            if (fileSystem.exists(fileOutPath)){
                fileSystem.delete(fileOutPath,true);
            }
            // 调用run方法
            int status = ToolRunner.run(configuration,new WordCountUpMR(),args);
            // 退出程序
            System.exit(status);
        } catch (IOException e) {
            e.printStackTrace();
        } catch (ClassNotFoundException e) {
            e.printStackTrace();
        } catch (InterruptedException e) {
            e.printStackTrace();
        } catch (Exception e) {
            e.printStackTrace();
        }
    }
}

运行结果:

4.png5.png


五、总结


WordCount程序MR运行流程:

input  ->  map  -> reduce  ->  output

每个阶段的数据类型

map输入  ->  <0,hadoop spark>
map输出  ->  <hadoop,1><spark,1>
reduce输入 ->   <hadoop,List(1,1)>
reduce输出 ->   <hadoop,2>

要知道的几个知识点:

  • 第一点:默认情况下,Map输入对格式
key:偏移量
value:文本中的一行值
  • 第二点:map与reduce之间的shuffle过程
map  ->   partition  ->  sort  ->  group  ->  reduce
    *分区:
        决定map输出的<key,value>,发送给哪个reduce去处理
    *排序:
        map -> 默认按单词首字母排序
    *分组:
        将相同key的value组合在一起,放在List中
        eg:  (keyIn:spark    ValueIn:[1, 1, 1, 1])
  • 第三点:reduce的输出结果,默认情况下,key和value作为一行数据输出key和value之间的分隔符为制表符 \t
相关文章
|
23天前
|
分布式计算 Hadoop Java
MapReduce编程:自定义分区和自定义计数器
MapReduce编程:自定义分区和自定义计数器
32 0
|
23天前
|
存储 分布式计算 负载均衡
【大数据技术Hadoop+Spark】MapReduce概要、思想、编程模型组件、工作原理详解(超详细)
【大数据技术Hadoop+Spark】MapReduce概要、思想、编程模型组件、工作原理详解(超详细)
85 0
|
23天前
|
存储 分布式计算 算法
【底层服务/编程功底系列】「大数据算法体系」带你深入分析MapReduce算法 — Shuffle的执行过程
【底层服务/编程功底系列】「大数据算法体系」带你深入分析MapReduce算法 — Shuffle的执行过程
33 0
|
17天前
|
分布式计算 资源调度 Hadoop
MapReduce分布式编程
MapReduce分布式编程
21 1
|
23天前
|
分布式计算 数据可视化 Hadoop
大数据实战——基于Hadoop的Mapreduce编程实践案例的设计与实现
大数据实战——基于Hadoop的Mapreduce编程实践案例的设计与实现
|
23天前
|
分布式计算 并行计算 Java
【分布式计算框架】 MapReduce编程初级实践
【分布式计算框架】 MapReduce编程初级实践
20 2
|
23天前
|
机器学习/深度学习 分布式计算 监控
面经:MapReduce编程模型与优化策略详解
【4月更文挑战第10天】本文是关于MapReduce在大数据处理中的关键作用的博客摘要。作者分享了面试经验,强调了MapReduce的基本原理、Hadoop API、优化策略和应用场景。MapReduce包含Map和Reduce两个主要阶段,Map阶段处理输入数据生成中间键值对,Reduce阶段进行聚合计算。面试重点包括理解MapReduce工作流程、使用Hadoop API编写Map/Reduce函数、选择优化策略(如分区、Combiner和序列化)以及应用场景,如日志分析和机器学习。
25 2
|
23天前
|
分布式计算 监控 Hadoop
Hadoop【基础知识 02】【分布式计算框架MapReduce核心概念+编程模型+combiner&partitioner+词频统计案例解析与进阶+作业的生命周期】(图片来源于网络)
【4月更文挑战第3天】Hadoop【基础知识 02】【分布式计算框架MapReduce核心概念+编程模型+combiner&partitioner+词频统计案例解析与进阶+作业的生命周期】(图片来源于网络)
146 0
|
23天前
|
分布式计算 Java Hadoop
MapReduce编程:检索特定群体搜索记录和定义分片操作
MapReduce编程:检索特定群体搜索记录和定义分片操作
36 0
|
23天前
|
分布式计算 Java Hadoop
MapReduce编程:数据过滤保存、UID 去重
MapReduce编程:数据过滤保存、UID 去重
59 0