Hadoop基础学习---6、MapReduce框架原理(二)

简介: Hadoop基础学习---6、MapReduce框架原理(二)

1.3 Shuffle机制

1.3.1 Shuffle机制

Map方法之后,Reduce方法之前的数据处理过程称之为Shuffle。

9b1b00c0877145c48b7f9b08f52f835e.png

1.3.2 Partition

1、问题引出

要求将统计结果按照条件输出到不同文件中(分区)。比如:将统计结果按照收集归属地不同省份输出到不同文件中。

2、默认Partitioner分区

cb71a88f1f534ffc9702d5d4c90892b9.png

默认分区时根据key的hashCode对ReduceTasks个数取模得到的。用户没法控制哪个key储存到哪个分区

3、自定义Partitioner步骤

(1)自定义类继承Partitioner,重写getPartition()方法

public class CustomPartitioner extends Partitioner<Text, FlowBean> {
@Override
public int getPartition(Text key, FlowBean value, int numPartitions) {
// 控制分区代码逻辑
… …
return partition;
  }
}

(2)在驱动中,设置自定义Partitioner

job.setPartitionerClass(CustomPartitioner.class);

(3)自定义Partition后,要根据自定义Partitioner的逻辑设置相对应数量的ReduceTask

job.setNumReduceTasks(5); //我的逻辑分区数量是5

(4)分区总结

(a)如果ReduceTask的数量大于getPartition的结果数,则会多产生几个空的输出文件part-r-000xx;

(b)如果1 小于 ReduceTask的数量 小于 getPartition的结果数,则有一部分分区数据无处安放,会Exception;

(c)如 果ReduceTask的数量=1,则不管MapTask端输出多少个分区文件,最终结果都交给这一个

ReduceTask,最终也就只会产生一个结果文件 part-r-00000;

(d)分区号必须从零开始,逐一累加。

1.3.3 Partition 分区案例实操

1、需求

将统计结果按照手机归属地不同省份输出到不同文件中(分区)

(1)输入数据文件(百度网盘自取数据文件)

链接:https://pan.baidu.com/s/1i2FdQTWFijkrr29n9xAj8Q

提取码:zhm6

(2)手机号 136、137、138、139 开头都分别放到一个独立的 4 个文件中,其他开头的放到

一个文件中。

2、需求分析

22f2da2cb3274a82b3f6b5adff92d367.png

3、在序列化实操的基础上,增加一个分区类

package org.example.fenqu;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Partitioner;
/**
 * @ClassName MyPartition
 * @Description TODO
 * @Author Zouhuiming
 * @Date 2023/5/21 11:29
 * @Version 1.0
 */
public class MyPartition extends Partitioner<Text,FlowBean> {
    @Override
    public int getPartition(Text text, FlowBean flowBean, int i) {
        String phone=text.toString();
        String prePhone=phone.substring(0,3);
        //定义分区编号
        int partition;
        if ("136".equals(prePhone)){
            partition=0;
        }else if ("137".equals(prePhone)){
            partition=1;
        }else if ("138".equals(prePhone)){
            partition=2;
        }else if ("139".equals(prePhone)){
            partition=3;
        }
        else {
            partition=4;
        }
        return partition;
    }
}

(4)在驱动函数中增加自定义数据分区设置和ReduceTask设置

package org.example.fenqu;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Partitioner;
/**
 * @ClassName MyPartition
 * @Description TODO
 * @Author Zouhuiming
 * @Date 2023/5/21 11:29
 * @Version 1.0
 */
public class MyPartition extends Partitioner<Text,FlowBean> {
    @Override
    public int getPartition(Text text, FlowBean flowBean, int i) {
        String phone=text.toString();
        String prePhone=phone.substring(0,3);
        //定义分区编号
        int partition;
        if ("136".equals(prePhone)){
            partition=0;
        }else if ("137".equals(prePhone)){
            partition=1;
        }else if ("138".equals(prePhone)){
            partition=2;
        }else if ("139".equals(prePhone)){
            partition=3;
        }
        else {
            partition=4;
        }
        return partition;
    }
}
1.3.4 WritableComparable排序

1·、排序概述

排序是MapReduce框架中最重要的操作之一。

MapTask和ReduceTask均会对数据按照key进行排序。该操作属于Hadoop默认行为。任何应用程序中的数据均会被排序,而不管逻辑上是否需要。

默认排序是按照字典顺序排序,且实现该排序的方法是快速排序

对于MapTask,它将会处理的结果暂时存放到环形缓冲区中,当环形缓冲区使用率达到一定阙值后,再对缓冲区中的数据进行一次快速排序,并将这些有序数据溢写到磁盘上,而当数据处理完毕后,它会对磁盘上所以文件进行归并排序。

对于ReduceTask,它从每个MapTask上远程拷贝相应的数据文件,如果文件大小超过一定阙值,则溢写到磁盘上,否者储存在内存中。如果磁盘上文件数目达到一定阙值,则进行一次归并排序以生成一个更大的文件;如果内存中文件大小或者数目超过一定的阙值,则进行一次合并后将数据溢写到磁盘上。当所以数据拷贝完毕后,ReduceTask统一对内存和磁盘上的所有数据进行一次归并排序。

2、排序的分类

(1)部分排序

MapReduce根据输入记录的键值对数据集进行排序。保证输出的每个文件内部有序。

(2)全排序

最终输出结果只有一个文件,且文件内部有序。实现方式是只设置一个ReduceTask。但该方法在处理大型文件时效率极低,因为一台机器处理所有文件,完全丧失了MapReduce所提供的并行架构。

(3)辅助排序(GroupingComparator)

在Reduce端对key进行分组。应用于:在接受的key为bean对象的时候,想让一个或几个这段相同(全部字段比较不相同)的key进入到同一个Reduce方法,可以采用分组排序。

(4)二次排序

在自定义排序过程中,如果compareTo中的判断条件为两个即为二次排序。

3、自定义排序WritableComparable 原理分析

bean对象作为key传输,需要实现WritableComparable接口重写compareTo()方法,就可以实现排序

@Override
public int compareTo(FlowBean bean) {
int result;
// 按照总流量大小,倒序排列
if (this.sumFlow > bean.getSumFlow()) {
result = -1;
}else if (this.sumFlow < bean.getSumFlow()) {
result = 1;
}else {
result = 0;
}
return result;
}
1.3.5 WritableComparable排序案例实操(全排序)

1、需求

根据序列化案例(上一篇文章最后一个案例)产生的结果再次对总流量进行倒序排序。

2、需求分析

53abf65dc7c9403ab4badb79cdafefcb.png

3、代码实现

1、FlowBean对象在需求1基础上增加了比较功能

package org.example.paixu;
import org.apache.hadoop.io.Writable;
import org.apache.hadoop.io.WritableComparable;
import java.io.DataInput;
import java.io.DataOutput;
import java.io.IOException;
/**
 * @ClassName FlowBean
 * @Description TODO
 * @Author Zouhuiming
 * @Date 2023/5/20 10:13
 * @Version 1.0
 */
public class FlowBean implements  WritableComparable<FlowBean> {
    private Long  upFlow;
    private Long downFlow;
    private long sumFlow;
    public FlowBean() {
    }
    public Long getUpFlow() {
        return upFlow;
    }
    public void setUpFlow(Long upFlow) {
        this.upFlow = upFlow;
    }
    public Long  getDownFlow() {
        return downFlow;
    }
    public void setDownFlow(Long  downFlow) {
        this.downFlow = downFlow;
    }
    public void setSumFlow() {
        this.sumFlow = this.upFlow+this.downFlow;
    }
    //实现序列化
    @Override
    public void write(DataOutput dataOutput) throws IOException {
        dataOutput.writeLong(upFlow);
        dataOutput.writeLong(downFlow);
        dataOutput.writeLong(sumFlow);
    }
    //实现反序列化
    @Override
    public void readFields(DataInput dataInput) throws IOException {
        this.upFlow=dataInput.readLong();
        this.downFlow=dataInput.readLong();
        this.sumFlow=dataInput.readLong();
    }
    @Override
    public String toString() {
        return upFlow+"\t"+downFlow+"\t"+sumFlow;
    }
    @Override
    public int compareTo(FlowBean o) {
        //倒序排列
        return this.sumFlow>o.sumFlow?-1:1;
    }
}

2、编写Mapper类

package org.example.paixu;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;
import java.io.IOException;
/**
 * @ClassName FlowMapper
 * @Description TODO
 * @Author Zouhuiming
 * @Date 2023/5/20 10:21
 * @Version 1.0
 */
public class FlowMapper extends Mapper<LongWritable,Text,FlowBean,Text> {
    private Text outV=new Text();
    private FlowBean outK=new FlowBean();
    @Override
    protected void map(LongWritable key, Text value, Mapper<LongWritable, Text, FlowBean, Text>.Context context) throws IOException, InterruptedException {
        String s = value.toString();
        String[] split = s.split("\t");
        String phone=split[0];
        outV.set(phone);
        outK.setUpFlow(Long.parseLong(split[1]));
        outK.setDownFlow(Long.parseLong(split[2]));
        outK.setSumFlow();
        context.write(outK,outV);
    }
}

3、编写Reducer类

package org.example.paixu;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Reducer;
import java.io.IOException;
/**
 * @ClassName FlowReducer
 * @Description TODO
 * @Author Zouhuiming
 * @Date 2023/5/20 10:29
 * @Version 1.0
 */
public class FlowReducer extends Reducer <FlowBean, Text,Text,FlowBean>{
    @Override
    protected void reduce(FlowBean key, Iterable<Text> values, Reducer<FlowBean, Text, Text, FlowBean>.Context context) throws IOException, InterruptedException {
        for (Text value : values) {
            context.write(value,key);
        }
    }
}

4、编写Driver类

package org.example.paixu;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import java.io.IOException;
/**
 * @ClassName FlowDriver
 * @Description TODO
 * @Author Zouhuiming
 * @Date 2023/5/20 10:38
 * @Version 1.0
 */
public class FlowDriver {
    public static void main(String[] args) throws IOException, InterruptedException, ClassNotFoundException {
        //1、获取job对象
        Configuration configuration=new Configuration();
        Job job=Job.getInstance(configuration);
        //2、关联本Driver类
        job.setJarByClass(FlowDriver.class);
        //3、关联Mapper和Reducer
        job.setMapperClass(FlowMapper.class);
        job.setReducerClass(FlowReducer.class);
        //4、设置Map端输出KV类型
        job.setMapOutputKeyClass(FlowBean.class);
        job.setMapOutputValueClass(Text.class);
        //5、设置程序最终输出的KV类型
        job.setOutputKeyClass(Text.class);
        job.setOutputValueClass(FlowBean.class)
        //6、设置程序的输入和输出路径
        FileInputFormat.addInputPath(job,new Path("E:\\test\\output1"));
        FileOutputFormat.setOutputPath(job,new Path("E:\\test\\output2"));
        //7、提交Job
        System.exit(job.waitForCompletion(true)?0:1);
    }
}
1.3.6 WritableComparable排序案例实操(区内排序)

前提:这个案例和上一个案例区别不大,就是增加了分区而已,所以只需要增加自定义分区类和修改一个Driver类就行,像Mapper类和Reducer类是不用修改的。

(1)增加自定义分区类

package org.example.paixu;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Partitioner;
/**
 * @ClassName MyPartition
 * @Description TODO
 * @Author Zouhuiming
 * @Date 2023/5/21 11:29
 * @Version 1.0
 */
public class MyPartition extends Partitioner< FlowBean,Text> {
    @Override
    public int getPartition(FlowBean flowBean, Text text, int i) {
        String phone=text.toString();
        String prePhone=phone.substring(0,3);
        //定义分区编号
        int partition;
        if ("136".equals(prePhone)){
            partition=0;
        }else if ("137".equals(prePhone)){
            partition=1;
        }else if ("138".equals(prePhone)){
            partition=2;
        }else if ("139".equals(prePhone)){
            partition=3;
        }
        else {
            partition=4;
        }
        return partition;
    }
}

(2)修改Driver

package org.example.paixu;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import java.io.IOException;
/**
 * @ClassName FlowDriver
 * @Description TODO
 * @Author Zouhuiming
 * @Date 2023/5/20 10:38
 * @Version 1.0
 */
public class FlowDriver {
    public static void main(String[] args) throws IOException, InterruptedException, ClassNotFoundException {
        //1、获取job对象
        Configuration configuration=new Configuration();
        Job job=Job.getInstance(configuration);
        //2、关联本Driver类
        job.setJarByClass(FlowDriver.class);
        //3、关联Mapper和Reducer
        job.setMapperClass(FlowMapper.class);
        job.setReducerClass(FlowReducer.class);
        //4、设置Map端输出KV类型
        job.setMapOutputKeyClass(FlowBean.class);
        job.setMapOutputValueClass(Text.class);
        //5、设置程序最终输出的KV类型
        job.setOutputKeyClass(Text.class);
        job.setOutputValueClass(FlowBean.class);
        //8、指定自定义分区器
        job.setPartitionerClass(MyPartition.class);
        //9、同时指定相应数量的ReduceTask
        job.setNumReduceTasks(5);
        //6、设置程序的输入和输出路径
        FileInputFormat.addInputPath(job,new Path("E:\\test\\output1"));
        FileOutputFormat.setOutputPath(job,new Path("E:\\test\\output2"));
        //7、提交Job
        System.exit(job.waitForCompletion(true)?0:1);
    }
}

1.3.7 Combiner 合并

Combiner是MR程序中Mapper和Reducer之外的一种组件

Combiner组件的父类就是Reducer

Combiner和Reducer的区别就是运行位置

Combiner是在每一个MapTask所在的节点运行

Reducer是接受全局所以Mapper的输出结果

Combiner的意义就是对每一个MapTask的输出进行局部汇总,以减少网络传输量

Combiner能够应用的前提是不能影响最终的业务逻辑,而且,Combine的输出kv应该跟Reducer的输入kv类型要对应起来

a63a77fdc55d42749db86f8ae2c10c5c.png

1、自定义Combiner实现步骤

(1)自定义一个Combiner继承Reducer,重写Reduce()方法

package org.example._07Combiner;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Reducer;
import java.io.IOException;
/**
 * @ClassName CombinerClass
 * @Description TODO
 * @Author Zouhuiming
 * @Date 2023/5/22 9:11
 * @Version 1.0
 */
public class CombinerClass extends Reducer<Text, IntWritable,Text, IntWritable> {
    private IntWritable outV=new IntWritable();
    @Override
    protected void reduce(Text key, Iterable<IntWritable> values, Reducer<Text, IntWritable, Text, IntWritable>.Context context) throws IOException, InterruptedException {
        int sum=0;
        for (IntWritable value : values) {
            sum+=value.get();
        }
        outV.set(sum);
        context.write(key,outV);
    }
}

(2)在Job驱动类中设置

job.setCombinerClass(WordCountCombiner.class);
1.3.8 合并案例实操

1、需求(我的还是单词统计的代码)

统计过程中对每一个 MapTask 的输出进行局部汇总,以减小网络传输量即采用Combiner 功能。

(1)数据输入

自己创造一个就行了吧(到这应该会自己创造自己需要用的数据)

(2)期望数据输出

自己算吧

2、案例实操

(1)增加一个类继承Reducer

package org.example._07Combiner;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Reducer;
import java.io.IOException;
/**
 * @ClassName CombinerClass
 * @Description TODO
 * @Author Zouhuiming
 * @Date 2023/5/22 9:11
 * @Version 1.0
 */
public class CombinerClass extends Reducer<Text, IntWritable,Text, IntWritable> {
    private IntWritable outV=new IntWritable();
    @Override
    protected void reduce(Text key, Iterable<IntWritable> values, Reducer<Text, IntWritable, Text, IntWritable>.Context context) throws IOException, InterruptedException {
        int sum=0;
        for (IntWritable value : values) {
            sum+=value.get();
        }
        outV.set(sum);
        context.write(key,outV);
    }
}

(2)在驱动类中指定Combiner

job.setCombinerClass(CombinerClass.class);

说明:其实这里也可以直接填以下代码

job.setCombinerClass(WordCountReduce.class);


相关文章
|
2月前
|
分布式计算 Hadoop 大数据
大数据体系知识学习(一):PySpark和Hadoop环境的搭建与测试
这篇文章是关于大数据体系知识学习的,主要介绍了Apache Spark的基本概念、特点、组件,以及如何安装配置Java、PySpark和Hadoop环境。文章还提供了详细的安装步骤和测试代码,帮助读者搭建和测试大数据环境。
72 1
|
2月前
|
SQL 分布式计算 Hadoop
Hadoop-14-Hive HQL学习与测试 表连接查询 HDFS数据导入导出等操作 逻辑运算 函数查询 全表查询 WHERE GROUP BY ORDER BY(一)
Hadoop-14-Hive HQL学习与测试 表连接查询 HDFS数据导入导出等操作 逻辑运算 函数查询 全表查询 WHERE GROUP BY ORDER BY(一)
56 4
|
2月前
|
分布式计算 资源调度 Hadoop
Hadoop-10-HDFS集群 Java实现MapReduce WordCount计算 Hadoop序列化 编写Mapper和Reducer和Driver 附带POM 详细代码 图文等内容
Hadoop-10-HDFS集群 Java实现MapReduce WordCount计算 Hadoop序列化 编写Mapper和Reducer和Driver 附带POM 详细代码 图文等内容
104 3
|
2月前
|
SQL
Hadoop-14-Hive HQL学习与测试 表连接查询 HDFS数据导入导出等操作 逻辑运算 函数查询 全表查询 WHERE GROUP BY ORDER BY(二)
Hadoop-14-Hive HQL学习与测试 表连接查询 HDFS数据导入导出等操作 逻辑运算 函数查询 全表查询 WHERE GROUP BY ORDER BY(二)
42 2
|
2月前
|
分布式计算 负载均衡 算法
Hadoop-31 ZooKeeper 内部原理 简述Leader选举 ZAB协议 一致性
Hadoop-31 ZooKeeper 内部原理 简述Leader选举 ZAB协议 一致性
31 1
|
2月前
|
分布式计算 监控 Hadoop
Hadoop-29 ZooKeeper集群 Watcher机制 工作原理 与 ZK基本命令 测试集群效果 3台公网云服务器
Hadoop-29 ZooKeeper集群 Watcher机制 工作原理 与 ZK基本命令 测试集群效果 3台公网云服务器
45 1
|
2月前
|
分布式计算 Hadoop
Hadoop-27 ZooKeeper集群 集群配置启动 3台云服务器 myid集群 zoo.cfg多节点配置 分布式协调框架 Leader Follower Observer
Hadoop-27 ZooKeeper集群 集群配置启动 3台云服务器 myid集群 zoo.cfg多节点配置 分布式协调框架 Leader Follower Observer
52 1
|
2月前
|
分布式计算 Hadoop 网络安全
Hadoop-08-HDFS集群 基础知识 命令行上机实操 hadoop fs 分布式文件系统 读写原理 读流程与写流程 基本语法上传下载拷贝移动文件
Hadoop-08-HDFS集群 基础知识 命令行上机实操 hadoop fs 分布式文件系统 读写原理 读流程与写流程 基本语法上传下载拷贝移动文件
41 1
|
2月前
|
存储 机器学习/深度学习 缓存
Hadoop-07-HDFS集群 基础知识 分布式文件系统 读写原理 读流程与写流程 基本语法上传下载拷贝移动文件
Hadoop-07-HDFS集群 基础知识 分布式文件系统 读写原理 读流程与写流程 基本语法上传下载拷贝移动文件
53 1
|
2月前
|
存储 SQL 消息中间件
Hadoop-26 ZooKeeper集群 3台云服务器 基础概念简介与环境的配置使用 架构组成 分布式协调框架 Leader Follower Observer
Hadoop-26 ZooKeeper集群 3台云服务器 基础概念简介与环境的配置使用 架构组成 分布式协调框架 Leader Follower Observer
52 0