MapReduce排序

简介: MapReduce排序

概念

MapTask和ReduceTask均会对数据按照key进行排序。该操作属于Hadoop的默认行为。任何应用程序中的数

据均会被排序,而不管逻辑上是否需要。

默认排序是按照字典顺序排序,且实现该排序的方法是快速排序

对于MapTask,它会将处理的结果暂时放到环形缓冲区中,当环形缓冲区使用率达到一定阈值后,再对缓冲区中的数据进行一次快速排序,并将这些有序数据溢写到磁盘上,而当数据处理完毕后,它会对磁盘上所有文件进行归并排序。

对于ReduceTask,它从每个MapTask上远程拷贝相应的数据文件,如果文件大小超过一定阈值,则溢写磁盘上,否则存储在内存中。如果磁盘上文件数目达到一定阈值,则进行一次归并排序以生成一个更大文件;如果内存中文件大小或者数目超过一定阈值,则对于ReduceTask,它从每个MapTask上远程拷贝相应的数据文件,如果文件大小超过一定阈值,则溢写磁盘上,否则存储在内存中。如果磁盘上文件数目达到一定阈值,则进行一次归并排序以生成一个更大文件;如果内存中文件大小或者数目超过一定阈值,则

MapReduce根据输入记录的键对数据集排序。保证输出的每个文件内部有序

2.全排序

最终输出结果只有一个文件,且文件内部有序。实现方式是只设置一个ReduceTask。但该方法在处理大型文件时效率极低,因为一台机器处理所有文件,完全丧失了MapReduce所提供的并行架构。(比如wordcount最终将所有数据放到一个文件内再总的进行一次全排序)

3.二次排序

在自定义排序过程中,如果compareTo中的判断条件为两个即为二次排序。(比如先按照总流量使用降序,如果两个用户总流量相同的情况下再根据上行流量进行降序排列...)。

自定义排序WritableComparable

全排序

需求

根据上次的【流量统计案例】的结果,要求结果按照总流量进行降序排列。(默认是按照key(手机号)进行排序)。

输入数据

13480253104 180 180 360
13502468823 7335  110349  117684
13560436666 1116  954 2070
13560439658 2034  5892  7926
13602846565 1938  2910  4848
13660577991 6960  690 7650
13719199419 240 0 240
13726230503 2481  24681 27162
13726238888 2481  24681 27162
13760778710 120 120 240
13826544101 264 0 264
13922314466 3008  3720  6728
13925057413 11058 48243 59301
13926251106 240 0 240
13926435656 132 1512  1644
15013685858 3659  3538  7197
15920133257 3156  2936  6092
15989002119 1938  180 2118
18211575961 1527  2106  3633
18320173382 9531  2412  11943
84138413  4116  1432  5548

思路

让FlowBean来作为key(因为hadoop框架中的key必须支持可排序,如果让手机号作为key,结果仍然是按照手机号的字典序进行排序)

实现

1.修改FlowBean继承Writablepmparable接口,泛型的类型为需要排序的类

public class FlowBean implements WritableComparable<FlowBean> {
    //...
}

2.重写compareTo方法

@Override
    public int compareTo(FlowBean o) {
        //按照总流量进行降序排列
        if (this.sumFlow > o.sumFlow){
            return -1;  //降序
        }else if (this.sumFlow < o.sumFlow){
            return 1;   //升序
        }else {
            return 0;
        }
    }

3.Mapper

package com.lyh.mapreduce.writableComparable;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.util.StringUtils;
import java.io.IOException;
public class FlowMapper extends Mapper<LongWritable, Text,FlowBean,Text> {
    private FlowBean outKey = new FlowBean();
    private Text outValue = new Text();
    @Override
    protected void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException {
        //1.获取一行数据
        String line = value.toString();
        //2.切割
        String[] words = StringUtils.split(line, '\t');
        //3.封装value
        outValue.set(words[0]);
        outKey.setUpFlow(Long.parseLong(words[1]));//对上一次的运行结果进行排序
        outKey.setDownFlow(Long.parseLong(words[2]));
        outKey.setSumFlow();
        //4.写出
        context.write(outKey,outValue);
    }
}

4.Reducer

public class FlowReducer extends Reducer<FlowBean, Text,Text,FlowBean> {
    @Override
    protected void reduce(FlowBean key, Iterable<Text> values, Context context) throws IOException, InterruptedException {
        for (Text value : values) {
            context.write(value,key);
        }
    }
}

5.Runner类

主要修改MapOutputKeyClass 和 MapOutputValueClass。

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.conf.Configured;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.util.Tool;
import org.apache.hadoop.util.ToolRunner;
public class FlowRunner extends Configured implements Tool {
    public static void main(String[] args) throws Exception {
        ToolRunner.run(new Configuration(),new FlowRunner(),args);
    }
    @Override
    public int run(String[] args) throws Exception {
        //1.获取job
        Configuration conf = new Configuration();
        Job job = Job.getInstance(conf, "flow compu");
        //2.配置jar包路径
        job.setJarByClass(FlowRunner.class);
        //3.关联mapper和reducer
        job.setMapperClass(FlowMapper.class);
        job.setReducerClass(FlowReducer.class);
        //4.设置map、reduce输出的k、v类型
        job.setMapOutputKeyClass(FlowBean.class);
        job.setMapOutputValueClass(Text.class);
        job.setOutputKeyClass(Text.class);
        job.setOutputValueClass(FlowBean.class);
        //5.设置数据输入的路径
        FileInputFormat.setInputPaths(job,new Path("D:\\MapReduce_Data_Test\\writable\\output1"));
        //6.设置输出路径-输出目录不可存在
        FileOutputFormat.setOutputPath(job,new Path("D:\\MapReduce_Data_Test\\writable\\output2"));
        //7.提交job
        return job.waitForCompletion(true) ? 0 : 1;//verbose:是否监控并打印job的信息
    }
}

运行结果

全部按照总流量(最后一列)排序

13502468823 7335  110349  117684
13925057413 11058 48243 59301
13726230503 2481  24681 27162
13726238888 2481  24681 27162
18320173382 9531  2412  11943
13560439658 2034  5892  7926
13660577991 6960  690 7650
15013685858 3659  3538  7197
13922314466 3008  3720  6728
15920133257 3156  2936  6092
84138413  4116  1432  5548
13602846565 1938  2910  4848
18211575961 1527  2106  3633
15989002119 1938  180 2118
13560436666 1116  954 2070
13926435656 132 1512  1644
13480253104 180 180 360
13826544101 264 0 264
13719199419 240 0 240
13760778710 120 120 240
13926251106 240 0 240

二次排序

我们发现,刚才的输出文件中有三位用户的总流量相同,我们希望如果用户总流量相同的情况下,我们再根据上行流量去降序排列。

13719199419 240 0 240
13760778710 120 120 240
13926251106 240 0 240

只需要再FlowBean中修改compareTo方法

@Override
    public int compareTo(FlowBean o) {
        //按照总流量进行降序排列
        if (this.sumFlow > o.sumFlow){
            return -1;  //降序
        }else if (this.sumFlow < o.sumFlow){
            return 1;   //升序
        }else {//如果总流量相同
            //按照上行流量降序排列
            if (this.upFlow > o.upFlow){
                return -1;
            }else if (this.upFlow < o.upFlow){
                return 1;
            }else {
                return 0;
            }
        }
    }

运行结果

...
13719199419 240 0 240
13926251106 240 0 240
13760778710 120 120 240

区内排序

需求:根据手机号(前三位数)进行分区,并要求每个分区内的

输入:

13502468823 7335  110349  117684
13925057413 11058 48243 59301
13726230503 2481  24681 27162
13726238888 2481  24681 27162
13820173382 9531  2412  11943
13560439658 2034  5892  7926
13760577991 6960  690 7650
13813685858 3659  3538  7197
13922314466 3008  3720  6728
13920133257 3156  2936  6092
84138413  4116  1432  5548
13502846565 1938  2910  4848
13911575961 1527  2106  3633
13789002119 1938  180 2118
13560436666 1116  954 2070
13926435656 132 1512  1644
13880253104 180 180 360
13826544101 264 0 264
13719199419 240 0 240
13926251106 240 0 240
13760778710 120 120 240

只需要重写一个分区类

import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Partitioner;
public class MyPartitioner extends Partitioner<FlowBean,Text> {
    @Override
    public int getPartition(FlowBean flowBean, Text text, int partitioner) {
        //获取手机号
        String line = text.toString();
        String phone = line.substring(0,3);
        if (phone.equals("135")){
            partitioner = 0;
        }else if (phone.equals("137")){
            partitioner = 1;
        }else if (phone.equals("138")){
            partitioner = 2;
        }else if (phone.equals("139")){
            partitioner = 3;
        }else {
            partitioner = 0;
        }
        return partitioner;
    }
}

运行结果

生成4个分区,分别存储135、137、138、139开头的手机号。

相关文章
|
分布式计算 Hadoop 大数据
MapReduce 案例之数据去重
MapReduce 案例之数据去重
200 0
|
7月前
|
分布式计算
28 MAPREDUCE中的排序初步
28 MAPREDUCE中的排序初步
17 0
|
7月前
|
分布式计算
24 MAPREDUCE中的Combiner
24 MAPREDUCE中的Combiner
28 0
|
存储 分布式计算 Hadoop
MapReduce排序
MapTask和ReduceTask均会对数据按照key进行排序。该操作属于Hadoop的默认行为。任何应用程序中的数
|
存储 分布式计算 Hadoop
MapReduce 实验:二次排序
MapReduce 实验:二次排序
MapReduce 实验:二次排序
|
分布式计算 Hadoop
Hadoop学习:MapReduce不使用Reduce将表合并提高效率
Hadoop学习:MapReduce不使用Reduce将表合并提高效率
118 0
|
缓存 分布式计算 Hadoop
利用采样器实现mapreduce任务输出全排序
采样器是hadoop内自带的一个可以对目标文件部分数据进行提取的工具类,以方便我们对这些采样的数据做一些参考或者处理。hadoop提供了多种采样器供我们使用,以满足不同的需求。另外,采样器不同于普通mapreduce操作。
1098 0
|
分布式计算
MapReduce之输出结果排序
前面的案例中我们介绍了统计出每个用户的上行流量,下行流量及总流量,现在我们想要将输出的结果按照总流量倒序排序。
MapReduce之输出结果排序
|
SQL 大数据 Shell
HIVE TopN shuffle 原理
TopN 问题是排序中的一个经典问题。对于一个长度为 m 的数组,取其最大的 n (n
2193 0

热门文章

最新文章