Hadoop中的MapReduce框架原理、Shuffle机制、Partition分区、自定义Partitioner步骤、在Job驱动中,设置自定义Partitioner、Partition 分区案例

简介: Hadoop中的MapReduce框架原理、Shuffle机制、Partition分区、自定义Partitioner步骤、在Job驱动中,设置自定义Partitioner、Partition 分区案例

13.MapReduce框架原理

13.2MapReduce工作流程

在这里插入图片描述在这里插入图片描述上面的流程是整个MapReduce最全工作流程,但是Shuffle过程只是从第7步开始到第16步结束,具体Shuffle过程详解,如下:
(1)MapTask收集我们的map()方法输出的kv对,放到内存缓冲区中
(2)从内存缓冲区不断溢出本地磁盘文件,可能会溢出多个文件
(3)多个溢出文件会被合并成大的溢出文件
(4)在溢出过程及合并的过程中,都要调用Partitioner进行分区和针对key进行排序
(5)ReduceTask根据自己的分区号,去各个MapTask机器上取相应的结果分区数据
(6)ReduceTask会抓取到同一个分区的来自不同MapTask的结果文件,ReduceTask
会将这些文件再进行合并(归并排序)
(7)合并成大文件后,Shuffle的过程也就结束了,后面进入ReduceTask的逻辑运算过程(从文件中取出一个一个的键值对Group,调用用户自定义的reduce()方法)
注意:
(1)Shuffle中的缓冲区大小会影响到MapReduce程序的执行效率,原则上说,缓冲区越大,磁盘io的次数越少,执行速度就越快。
(2)缓冲区的大小可以通过参数调整,参数:mapreduce.task.io.sort.mb默认100M。

13.3Shuffle机制

13.3.1Shuffle机制

Map方法之后,Reduce方法之前的数据处理过程称之为Shuffle。
在这里插入图片描述

13.3.2Partition分区

13.3.2.1问题引出

要求将统计结果按照条件输出到不同文件中(分区)。比如:将统计结果按照手机归属地不同省份输出到不同文件中(分区)

13.3.2.2默认Partitioner分区

public class HashPartitioner<K, V> extends Partitioner<K, V> {
    public int getPartition(K key, V value, int numReduceTasks) {
        return (key.hashCode() & Integer.MAX_VALUE) % numReduceTasks; 
    } 
}

默认分区是根据key的hashCode对ReduceTasks个数取模得到的。用户没法控制哪个key存储到哪个分区。

13.3.2.3自定义Partitioner步骤

13.3.2.3.1自定义类继承Partitioner,重写getPartition()方法
public class CustomPartitioner extends Partitioner<Text, FlowBean> {
    @Override
    public int getPartition(Text key, FlowBean value, int numPartitions) {
    // 控制分区代码逻辑
        … …
        return partition; 
    } 
} 
13.3.2.3.2在Job驱动中,设置自定义Partitioner
job.setPartitionerClass(CustomPartitioner.class);
13.3.2.3.3自定义Partition后,要根据自定义Partitioner的逻辑设置相应数量的ReduceTask
job.setNumReduceTasks(5);

13.3.2.4分区总结

(1)如果ReduceTask的数量> getPartition的结果数,则会多产生几个空的输出文件part-r-000xx;
(2)如果1<ReduceTask的数量<getPartition的结果数,则有一部分分区数据无处安放,会Exception;
(3)如 果ReduceTask的数量=1,则不管MapTask端输出多少个分区文件,最终结果都交给这一个ReduceTask,最终也就只会产生一个结果文件 part-r-00000;
(4)分区号必须从零开始,逐一累加。

13.3.2.5案例分析

例如:假设自定义分区数为5,则
(1)job.setNumReduceTasks(1); 会正常运行,只不过会产生一个输出文件
(2)job.setNumReduceTasks(2); 会报错
(3)job.setNumReduceTasks(6);大于5,程序会正常运行,会产生空文件

13.3.3Partition 分区案例实操

13.3.3.1需求

将统计结果按照手机归属地不同省份输出到不同文件中(分区)
(1)输入数据
在这里插入图片描述

1    13736230513    192.196.100.1    www.baidu.com    2481    24681    200
2    13846544121    192.196.100.2            264    0    200
3     13956435636    192.196.100.3            132    1512    200
4     13966251146    192.168.100.1            240    0    404
5     18271575951    192.168.100.2    www.baidu.com    1527    2106    200
6     84188413    192.168.100.3    www.baidu.com    4116    1432    200
7     13590439668    192.168.100.4            1116    954    200
8     15910133277    192.168.100.5    www.hao123.com    3156    2936    200
9     13729199489    192.168.100.6            240    0    200
10     13630577991    192.168.100.7    www.shouhu.com    6960    690    200
11     15043685818    192.168.100.8    www.baidu.com    3659    3538    200
12     15959002129    192.168.100.9    www.baidu.com    1938    180    500
13     13560439638    192.168.100.10            918    4938    200
14     13470253144    192.168.100.11            180    180    200
15     13682846555    192.168.100.12    www.qq.com    1938    2910    200
16     13992314666    192.168.100.13    www.gaga.com    3008    3720    200
17     13509468723    192.168.100.14    www.qinghua.com    7335    110349    404
18     18390173782    192.168.100.15    www.sogou.com    9531    2412    200
19     13975057813    192.168.100.16    www.baidu.com    11058    48243    200
20     13768778790    192.168.100.17            120    120    200
21     13568436656    192.168.100.18    www.alibaba.com    2481    24681    200
22     13568436656    192.168.100.19            1116    954    200

(2)期望输出数据
手机号 136、137、138、139 开头都分别放到一个独立的 4 个文件中,其他开头的放到一个文件中

13.3.3.2需求分析

1、需求:将统计结果按照手机归属地不同省份输出到不同文件中(分区)

2、数据输入
13630577991 6960 690
13736230513 2481 24681
13846544121 264 0
13956435636 132 1512
13560439638 918 4938

3、期望数据输出
文件1
文件2
文件3
文件4
文件5

4、增加一个ProvincePartitioner分区
136 分区0
137 分区1
138 分区2
139 分区3
其他 分区4

5、Driver驱动类
//指定自定义数据分区
job.setPartitionerClass ( ProvincePartitioner.class) ;
//同时指定相应数量的reduceTask
job.setNumReduceTasks ( 5);

13.3.3.3在案例 2.3 的基础上,增加一个分区类

package com.summer.mapreduce.partitioner;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Partitioner;

public class ProvincePartitioner extends Partitioner<Text, FlowBean> {

    @Override
    public int getPartition(Text text, FlowBean flowBean, int numPartitions) {
        //获取手机号前三位prePhone
        String phone = text.toString();
        String prePhone = phone.substring(0, 3);

        //定义一个分区号变量partition,根据prePhone设置分区号
        int partition;

        if("136".equals(prePhone)){
            partition = 0;
        }else if("137".equals(prePhone)){
            partition = 1;
        }else if("138".equals(prePhone)){
            partition = 2;
        }else if("139".equals(prePhone)){
            partition = 3;
        }else {
            partition = 4;
        }

        //最后返回分区号partition
        return partition;
    }
}

13.3.3.4在驱动函数中增加自定义数据分区设置和ReduceTask设置

package com.summer.mapreduce.partitioner;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import java.io.IOException;

public class FlowDriver {

    public static void main(String[] args) throws IOException, ClassNotFoundException, InterruptedException {

        //1 获取job对象
        Configuration conf = new Configuration();
        Job job = Job.getInstance(conf);

        //2 关联本Driver类
        job.setJarByClass(FlowDriver.class);

        //3 关联Mapper和Reducer
        job.setMapperClass(FlowMapper.class);
        job.setReducerClass(FlowReducer.class);

        //4 设置Map端输出数据的KV类型
        job.setMapOutputKeyClass(Text.class);
        job.setMapOutputValueClass(FlowBean.class);

        //5 设置程序最终输出的KV类型
        job.setOutputKeyClass(Text.class);
        job.setOutputValueClass(FlowBean.class);

        //8 指定自定义分区器
        job.setPartitionerClass(ProvincePartitioner.class);

        //9 同时指定相应数量的ReduceTask
        job.setNumReduceTasks(5);

        //6 设置输入输出路径
        FileInputFormat.setInputPaths(job, new Path("D:\\inputflow"));
        FileOutputFormat.setOutputPath(job, new Path("D\\partitionout"));

        //7 提交Job
        boolean b = job.waitForCompletion(true);
        System.exit(b ? 0 : 1);
    }
}
相关文章
|
1月前
|
分布式计算 资源调度 Hadoop
大数据-80 Spark 简要概述 系统架构 部署模式 与Hadoop MapReduce对比
大数据-80 Spark 简要概述 系统架构 部署模式 与Hadoop MapReduce对比
64 2
|
4月前
|
数据采集 分布式计算 资源调度
|
1月前
|
分布式计算 资源调度 Hadoop
Hadoop-10-HDFS集群 Java实现MapReduce WordCount计算 Hadoop序列化 编写Mapper和Reducer和Driver 附带POM 详细代码 图文等内容
Hadoop-10-HDFS集群 Java实现MapReduce WordCount计算 Hadoop序列化 编写Mapper和Reducer和Driver 附带POM 详细代码 图文等内容
88 3
|
1月前
|
分布式计算 监控 Hadoop
Hadoop-29 ZooKeeper集群 Watcher机制 工作原理 与 ZK基本命令 测试集群效果 3台公网云服务器
Hadoop-29 ZooKeeper集群 Watcher机制 工作原理 与 ZK基本命令 测试集群效果 3台公网云服务器
39 1
|
1月前
|
分布式计算 Hadoop Unix
Hadoop-28 ZooKeeper集群 ZNode简介概念和测试 数据结构与监听机制 持久性节点 持久顺序节点 事务ID Watcher机制
Hadoop-28 ZooKeeper集群 ZNode简介概念和测试 数据结构与监听机制 持久性节点 持久顺序节点 事务ID Watcher机制
41 1
|
1月前
|
分布式计算 资源调度 数据可视化
Hadoop-06-Hadoop集群 历史服务器配置 超详细 执行任务记录 JobHistoryServer MapReduce执行记录 日志聚合结果可视化查看
Hadoop-06-Hadoop集群 历史服务器配置 超详细 执行任务记录 JobHistoryServer MapReduce执行记录 日志聚合结果可视化查看
36 1
|
1月前
|
分布式计算 资源调度 Hadoop
Hadoop-05-Hadoop集群 集群WordCount 超详细 真正的分布式计算 上传HDFS MapReduce计算 YRAN查看任务 上传计算下载查看
Hadoop-05-Hadoop集群 集群WordCount 超详细 真正的分布式计算 上传HDFS MapReduce计算 YRAN查看任务 上传计算下载查看
47 1
|
1月前
|
SQL 分布式计算 关系型数据库
Hadoop-24 Sqoop迁移 MySQL到Hive 与 Hive到MySQL SQL生成数据 HDFS集群 Sqoop import jdbc ETL MapReduce
Hadoop-24 Sqoop迁移 MySQL到Hive 与 Hive到MySQL SQL生成数据 HDFS集群 Sqoop import jdbc ETL MapReduce
82 0
|
1月前
|
SQL 分布式计算 关系型数据库
Hadoop-23 Sqoop 数据MySQL到HDFS(部分) SQL生成数据 HDFS集群 Sqoop import jdbc ETL MapReduce
Hadoop-23 Sqoop 数据MySQL到HDFS(部分) SQL生成数据 HDFS集群 Sqoop import jdbc ETL MapReduce
37 0
|
1月前
|
SQL 分布式计算 关系型数据库
Hadoop-22 Sqoop 数据MySQL到HDFS(全量) SQL生成数据 HDFS集群 Sqoop import jdbc ETL MapReduce
Hadoop-22 Sqoop 数据MySQL到HDFS(全量) SQL生成数据 HDFS集群 Sqoop import jdbc ETL MapReduce
46 0

热门文章

最新文章

相关实验场景

更多