MapReduce之二次排序案例详解!

简介: 笔记

一、需求分析


MR的二次排序的需求说明:

mapreduce操作时,shuffle阶段会多次根据key值排序。但是在shuffle分组后,相同key值的values序列的顺序是不确定的(如下图)。如果想要此时value值也是排序好的,这种需求就是二次排序。

23.png

  原始数据    无二次排序 有二次排序
  a 12      a 12      a 12
  b 34    b 34    b 13
  c 90    b 23      b 23
  b 23    b 13      b 34
  b 13    c 90      c 90

根据案例分析,我们要将下面数据key按照abc,value按照大小排序,这也就是一个典型的MR的二次排序的案例,准备原始数据:

a 20
b 20
a 5
c 10
c 8
b 15
a 10
b 18
c 29
b 52

我们想要得到的结果:

a       5
a       10
a       20
b       15
b       18
b       20
b       52
c       8
c       10
c       29


二、方案一实现


先看方案一的实现思路:

input -> map -><a,20> -> shuffle -> <a,list(10, 5, 20)> -> reduce -> <a,5>
         <b,20>       <b,list(52, 18, 15, 20)>         <a,10>
           <a,5>        <c,list(29, 8, 10)>            <a,20>
           <c,10>                          <b,15>
           ...                             <b,18>
                                         <b,20>
                                          ...

直接在reduce端对分组后的values进行排序

示例代码:

package com.kfk.hadoop.mr.sort;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.conf.Configured;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.util.Tool;
import org.apache.hadoop.util.ToolRunner;
import java.io.IOException;
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
/**
 * @author : 蔡政洁
 * @email :caizhengjie888@icloud.com
 * @date : 2020/10/9
 * @time : 7:07 下午
 */
public class SortMR extends Configured implements Tool {
    /**
     * map
     * TODO
     */
    public static class TemplateMapper extends Mapper<LongWritable, Text,Text, IntWritable>{
        // 创建map输出的对象
        private static final Text mapOutKey = new Text();
        private static final IntWritable mapOutValue = new IntWritable();
        @Override
        public void setup(Context context) throws IOException, InterruptedException {
            // TODO
        }
        @Override
        public void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException {
            // 将每一行数据按空格拆开
            String[] values = value.toString().split(" ");
            // 数据预处理,将数组超过2的过滤掉
            if (values.length != 2){
                return;
            }
            mapOutKey.set(values[0]);
            mapOutValue.set(Integer.valueOf(values[1]));
            context.write(mapOutKey,mapOutValue);
        }
        @Override
        public void cleanup(Context context) throws IOException, InterruptedException {
            // TODO
        }
    }
    /**
     * reduce
     * TODO
     */
    public static class TemplateReducer extends Reducer<Text,IntWritable,Text,IntWritable>{
        // 创建reduceout端的对象
        private static final IntWritable outputValue = new IntWritable();
        @Override
        public void setup(Context context) throws IOException, InterruptedException {
            // TODO
        }
        @Override
        public void reduce(Text key, Iterable<IntWritable> values, Context context) throws IOException, InterruptedException {
            List<Integer> valueList = new ArrayList<Integer>();
            // 取出value
            for (IntWritable value:values){
                valueList.add(value.get());
            }
      // 打印出reduce输入的key和valueList
            System.out.println("Reduce in == KeyIn: "+key+"   ValueIn: "+valueList);
            // 进行排序
            Collections.sort(valueList);
      /*
                valueList:表示上面已经排序好的列表,即需要遍历列表中的值作为reduce的输出
                key不需要改变,即可作为reduce的输出
             */
            for (Integer value : valueList){
                outputValue.set(value);
                context.write(key,outputValue);
            }
        }
        @Override
        public void cleanup(Context context) throws IOException, InterruptedException {
            // TODO
        }
    }
    /**
     * run
     * @param args
     * @return
     * @throws IOException
     * @throws ClassNotFoundException
     * @throws InterruptedException
     */
    public int run(String[] args) throws IOException, ClassNotFoundException, InterruptedException {
        // 1) get conf
        Configuration configuration = this.getConf();
        // 2) create job
        Job job = Job.getInstance(configuration,this.getClass().getSimpleName());
        job.setJarByClass(this.getClass());
        // 3.1) input,指定job的输入
        Path path = new Path(args[0]);
        FileInputFormat.addInputPath(job,path);
        // 3.2) map,指定job的mapper和输出的类型
        job.setMapperClass(TemplateMapper.class);
        job.setMapOutputKeyClass(Text.class);
        job.setMapOutputValueClass(IntWritable.class);
        // 1.分区
        // job.setPartitionerClass();
        // 2.排序
        // job.setSortComparatorClass();
        // 3.combiner -可选项
        // job.setCombinerClass(WordCountCombiner.class);
        // 4.compress -可配置
        // configuration.set("mapreduce.map.output.compress","true");
        // 使用的SnappyCodec压缩算法
        // configuration.set("mapreduce.map.output.compress.codec","org.apache.hadoop.io.compress.SnappyCodec");
        // 5.分组
        // job.setGroupingComparatorClass();
        // 6.设置reduce的数量
        // job.setNumReduceTasks(2);
        // 3.3) reduce,指定job的reducer和输出类型
        job.setReducerClass(TemplateReducer.class);
        job.setOutputKeyClass(Text.class);
        job.setOutputValueClass(IntWritable.class);
        // 3.4) output,指定job的输出
        Path outpath = new Path(args[1]);
        FileOutputFormat.setOutputPath(job,outpath);
        // 4) commit,执行job
        boolean isSuccess = job.waitForCompletion(true);
        // 如果正常执行返回0,否则返回1
        return (isSuccess) ? 0 : 1;
    }
    public static void main(String[] args) {
        // 添加输入,输入参数
        args = new String[]{
            "hdfs://bigdata-pro-m01:9000/user/caizhengjie/datas/secondSort",
            "hdfs://bigdata-pro-m01:9000/user/caizhengjie/mr/output"
        };
//        WordCountUpMR wordCountUpMR = new WordCountUpMR();
        Configuration configuration = new Configuration();
        try {
            // 判断输出的文件存不存在,如果存在就将它删除
            Path fileOutPath = new Path(args[1]);
            FileSystem fileSystem = FileSystem.get(configuration);
            if (fileSystem.exists(fileOutPath)){
                fileSystem.delete(fileOutPath,true);
            }
            // 调用run方法
            int status = ToolRunner.run(configuration,new SortMR(),args);
            // 退出程序
            System.exit(status);
        } catch (IOException e) {
            e.printStackTrace();
        } catch (ClassNotFoundException e) {
            e.printStackTrace();
        } catch (InterruptedException e) {
            e.printStackTrace();
        } catch (Exception e) {
            e.printStackTrace();
        }
    }
}

运行结果:

a       5
a       10
a       20
b       15
b       18
b       20
b       52
c       8
c       10
c       29

很容易发现,这样把排序工作都放到reduce端完成,当values序列长度非常大的时候,会对CPU和内存造成极大的负载。


注意的地方(容易被“坑”)

在reduce端对values进行迭代的时候,不要直接存储value值或者key值,因为reduce方法会反复执行多次,但key和value相关的对象只有两个,reduce会反复重用这两个对象。需要用相应的数据类型.get()取出后再存储。


三、方案二实现


方案二的解决思路:

  原始数据      自定义newkey 在shuffle中排序  reduce输入            reduce输出
  a 12        a#12,12    a#12,12
  b 34      b#34,34    b#13,13
  c 90 -> map ->  c#90,90    b#23,23       b#,List(13,23,34)-> reduce ->  b,13 b,23 b,34
  b 23      b#23,23    b#34,34  
  b 13      b#13,13    c#90,90 

我们可以把key和value联合起来作为新的key,记作newkey。这时,newkey含有两个字段,假设分别是k,v。这里的k和v是原来的key和value。原来的value还是不变。这样,value就同时在newkey和value的位置。我们再实现newkey的比较规则,先按照key排序,在key相同的基础上再按照value排序。在分组时,再按照原来的key进行分组,就不会影响原有的分组逻辑了。最后在输出的时候,只把原有的key、value输出,就可以变通的实现了二次排序的需求。


需要自定义的地方

 1.自定义数据类型实现组合key

   实现方式:继承WritableComparable

 2.自定义partioner,形成newKey后保持分区规则任然按照key进行。保证不打乱原来的分区。

   实现方式:继承Partitioner

 3.自定义分组,保持分组规则任然按照key进行。不打乱原来的分组

   实现方式:继承RawComparator


自定义数据类型代码:

package com.kfk.hadoop.mr.secondsort;
import org.apache.hadoop.io.WritableComparable;
import java.io.DataInput;
import java.io.DataOutput;
import java.io.IOException;
/**
 * @author : 蔡政洁
 * @email :caizhengjie888@icloud.com
 * @date : 2020/10/15
 * @time : 6:16 下午
 */
public class PairWritable implements WritableComparable<PairWritable> {
    // 组合key:a#12,12
    private String first;
    private int second;
    public PairWritable() {
    }
    public PairWritable(String first, int second) {
        this.set(first,second);
    }
    /**
     * 方便设置字段
     */
    public void set(String first, int second){
        this.first = first;
        this.second = second;
    }
    public String getFirst() {
        return first;
    }
    public void setFirst(String first) {
        this.first = first;
    }
    public int getSecond() {
        return second;
    }
    public void setSecond(int second) {
        this.second = second;
    }
    /**
     * 重写比较器
     */
    public int compareTo(PairWritable o) {
        int comp = this.getFirst().compareTo(o.getFirst());
        if (0 == comp){
            // 若第一个字段相等,则比较第二个字段
            return Integer.valueOf(this.getSecond()).compareTo(o.getSecond());
        }
        return comp;
    }
    /**
     * 序列化
     */
    public void write(DataOutput out) throws IOException {
        out.writeUTF(first);
        out.writeInt(second);
    }
    /**
     * 反序列化
     */
    public void readFields(DataInput in) throws IOException {
        this.first = in.readUTF();
        this.second = in.readInt();
    }
    @Override
    public String toString() {
        return "PairWritable{" +
                "first='" + first + '\'' +
                ", second=" + second +
                '}';
    }
    @Override
    public boolean equals(Object o) {
        if (this == o) return true;
        if (o == null || getClass() != o.getClass()) return false;
        PairWritable that = (PairWritable) o;
        if (second != that.second) return false;
        return first != null ? first.equals(that.first) : that.first == null;
    }
    @Override
    public int hashCode() {
        int result = first != null ? first.hashCode() : 0;
        result = 31 * result + second;
        return result;
    }
}

自定义分区代码:

package com.kfk.hadoop.mr.secondsort;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.mapreduce.Partitioner;
/**
 * @author : 蔡政洁
 * @email :caizhengjie888@icloud.com
 * @date : 2020/10/15
 * @time : 7:09 下午
 */
public class FristPartitioner extends Partitioner<PairWritable, IntWritable> {
    public int getPartition(PairWritable key, IntWritable intWritable, int numPartitions) {
       /*
        * 默认的实现 (key.hashCode() & Integer.MAX_VALUE) % numPartitions
        * 让key中first字段作为分区依据
        */
        return (key.getFirst().hashCode() & Integer.MAX_VALUE) % numPartitions;
    }
}

自定义分组比较器代码:

package com.kfk.hadoop.mr.secondsort;
import org.apache.hadoop.io.RawComparator;
import org.apache.hadoop.io.WritableComparator;
/**
 * @author : 蔡政洁
 * @email :caizhengjie888@icloud.com
 * @date : 2020/10/15
 * @time : 6:59 下午
 */
public class FristGrouping implements RawComparator<PairWritable> {
    /*
     * 字节比较
     * bytes1,bytes2为要比较的两个字节数组
     * i,i1表示第一个字节数组要进行比较的收尾位置,i2,i3表示第二个
     * 从第一个字节比到组合key中second的前一个字节,因为second为int型,所以长度为4
     */
    public int compare(byte[] bytes1, int i, int i1, byte[] bytes2, int i2, int i3) {
        return WritableComparator.compareBytes(bytes1,0,i1-4,bytes2,0,i3-4);
    }
    /*
     * 对象比较
     */
    public int compare(PairWritable o1, PairWritable o2) {
        return o1.getFirst().compareTo(o2.getFirst());
    }
}

二次排序实现代码:

package com.kfk.hadoop.mr.secondsort;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.conf.Configured;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.util.Tool;
import org.apache.hadoop.util.ToolRunner;
import java.io.IOException;
/**
 * @author : 蔡政洁
 * @email :caizhengjie888@icloud.com
 * @date : 2020/10/9
 * @time : 7:07 下午
 */
public class SecondSortMR extends Configured implements Tool {
    /**
     * map
     * TODO
     */
    public static class TemplateMapper extends Mapper<LongWritable, Text,PairWritable, IntWritable>{
        // 创建map输出的对象
        private static final PairWritable mapOutKey = new PairWritable();
        private static final IntWritable mapOutValue = new IntWritable();
        @Override
        public void setup(Context context) {
            // TODO
        }
        @Override
        public void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException {
            // 将每一行数据按空格拆开
            String[] values = value.toString().split(" ");
            // 数据预处理,将数组超过2的过滤掉
            if (values.length != 2){
                return;
            }
            mapOutKey.set(values[0],Integer.parseInt(values[1]));
            mapOutValue.set(Integer.parseInt(values[1]));
            context.write(mapOutKey,mapOutValue);
            System.out.println("Map out == KeyOut: "+mapOutKey+"   ValueOut: "+mapOutValue);
        }
        @Override
        public void cleanup(Context context) {
            // TODO
        }
    }
    /**
     * reduce
     * TODO
     */
    public static class TemplateReducer extends Reducer<PairWritable,IntWritable,Text,IntWritable>{
        // 创建reduce output端的对象
        private static final IntWritable outputValue = new IntWritable();
        private static final Text outputKey = new Text();
        @Override
        public void setup(Context context) throws IOException, InterruptedException {
            // TODO
        }
        @Override
        public void reduce(PairWritable key, Iterable<IntWritable> values, Context context) throws IOException, InterruptedException {
      /*
                values表示reduce端输入已经排序好的列表,即需要遍历values每一个值作为reduce输出即可
                key表示为自定义的key(newkey),即需要取出newkey的第一部分,也就是原始的key,作为reduce的输出
             */
            for (IntWritable value:values){
                outputKey.set(key.getFirst());
                context.write(outputKey,value);
            }
        }
        @Override
        public void cleanup(Context context) throws IOException, InterruptedException {
            // TODO
        }
    }
    /**
     * run
     * @param args
     * @return
     * @throws IOException
     * @throws ClassNotFoundException
     * @throws InterruptedException
     */
    public int run(String[] args) throws IOException, ClassNotFoundException, InterruptedException {
        // 1) get conf
        Configuration configuration = this.getConf();
        // 2) create job
        Job job = Job.getInstance(configuration,this.getClass().getSimpleName());
        job.setJarByClass(this.getClass());
        // 3.1) input,指定job的输入
        Path path = new Path(args[0]);
        FileInputFormat.addInputPath(job,path);
        // 3.2) map,指定job的mapper和输出的类型
        job.setMapperClass(TemplateMapper.class);
        job.setMapOutputKeyClass(PairWritable.class);
        job.setMapOutputValueClass(IntWritable.class);
        // 1.分区
        job.setPartitionerClass(FristPartitioner.class);
        // 2.排序
        // job.setSortComparatorClass();
        // 3.combiner -可选项
        // job.setCombinerClass(WordCountCombiner.class);
        // 4.compress -可配置
        // configuration.set("mapreduce.map.output.compress","true");
        // 使用的SnappyCodec压缩算法
        // configuration.set("mapreduce.map.output.compress.codec","org.apache.hadoop.io.compress.SnappyCodec");
        // 5.分组
        job.setGroupingComparatorClass(FristGrouping.class);
        // 6.设置reduce的数量
        // job.setNumReduceTasks(2);
        // 3.3) reduce,指定job的reducer和输出类型
        job.setReducerClass(TemplateReducer.class);
        job.setOutputKeyClass(Text.class);
        job.setOutputValueClass(IntWritable.class);
        // 3.4) output,指定job的输出
        Path outpath = new Path(args[1]);
        FileOutputFormat.setOutputPath(job,outpath);
        // 4) commit,执行job
        boolean isSuccess = job.waitForCompletion(true);
        // 如果正常执行返回0,否则返回1
        return (isSuccess) ? 0 : 1;
    }
    public static void main(String[] args) {
        // 添加输入,输入参数
        args = new String[]{
            "hdfs://bigdata-pro-m01:9000/user/caizhengjie/datas/secondSort",
            "hdfs://bigdata-pro-m01:9000/user/caizhengjie/mr/output"
        };
//        WordCountUpMR wordCountUpMR = new WordCountUpMR();
        Configuration configuration = new Configuration();
        try {
            // 判断输出的文件存不存在,如果存在就将它删除
            Path fileOutPath = new Path(args[1]);
            FileSystem fileSystem = FileSystem.get(configuration);
            if (fileSystem.exists(fileOutPath)){
                fileSystem.delete(fileOutPath,true);
            }
            // 调用run方法
            int status = ToolRunner.run(configuration,new SecondSortMR(),args);
            // 退出程序
            System.exit(status);
        } catch (IOException e) {
            e.printStackTrace();
        } catch (ClassNotFoundException e) {
            e.printStackTrace();
        } catch (InterruptedException e) {
            e.printStackTrace();
        } catch (Exception e) {
            e.printStackTrace();
        }
    }
}

运行结果:

24.png

a       5
a       10
a       20
b       15
b       18
b       20
b       52
c       8
c       10
c       29
相关文章
|
8月前
|
存储 分布式计算 监控
Hadoop【基础知识 01+02】【分布式文件系统HDFS设计原理+特点+存储原理】(部分图片来源于网络)【分布式计算框架MapReduce核心概念+编程模型+combiner&partitioner+词频统计案例解析与进阶+作业的生命周期】(图片来源于网络)
【4月更文挑战第3天】【分布式文件系统HDFS设计原理+特点+存储原理】(部分图片来源于网络)【分布式计算框架MapReduce核心概念+编程模型+combiner&partitioner+词频统计案例解析与进阶+作业的生命周期】(图片来源于网络)
347 2
|
8月前
|
分布式计算 数据可视化 Hadoop
大数据实战——基于Hadoop的Mapreduce编程实践案例的设计与实现
大数据实战——基于Hadoop的Mapreduce编程实践案例的设计与实现
|
8月前
|
分布式计算 监控 Hadoop
Hadoop【基础知识 02】【分布式计算框架MapReduce核心概念+编程模型+combiner&partitioner+词频统计案例解析与进阶+作业的生命周期】(图片来源于网络)
【4月更文挑战第3天】Hadoop【基础知识 02】【分布式计算框架MapReduce核心概念+编程模型+combiner&partitioner+词频统计案例解析与进阶+作业的生命周期】(图片来源于网络)
333 0
|
分布式计算 Hadoop 大数据
MapReduce 案例之数据去重
MapReduce 案例之数据去重
322 0
|
存储 分布式计算 搜索推荐
MapReduce 案例之倒排索引
MapReduce 案例之倒排索引
237 0
MapReduce 案例之倒排索引
|
分布式计算 资源调度 监控
YARN On Mapreduce搭建与wordCount案例实现
YARN On Mapreduce搭建与wordCount案例实现
|
分布式计算 数据处理 索引
MapReduce 案例之Top N
MapReduce 案例之Top N
161 0
|
存储 分布式计算 Hadoop
MapReduce 实验:二次排序
MapReduce 实验:二次排序
MapReduce 实验:二次排序
|
分布式计算 监控 Java
3-网站日志分析案例-MapReduce执行日志清洗
文章目录 3-网站日志分析案例-MapReduce执行日志清洗 准备环境: 1.数据介绍 2.基于IDEA创建Maven工程 3.日志清洗
3-网站日志分析案例-MapReduce执行日志清洗
|
存储 分布式计算 Hadoop
Hadoop快速入门——第三章、MapReduce案例(字符统计)(2)
Hadoop快速入门——第三章、MapReduce案例(字符统计)
141 0
Hadoop快速入门——第三章、MapReduce案例(字符统计)(2)