开发者社区> 代码的天敌是懒惰> 正文
阿里云
为了无法计算的价值
打开APP
阿里云APP内打开

MapReduce自定义分区

简介: MapReduce自定义分区
+关注继续查看

自定义分区很简单,主要需要以下三步:


 **(1)创建一个类继承抽象类 Partitioner,然后重写`getPartition()`方法。** 具体实现如下所示:

public class TelePartitioner extends Partitioner<FlowBeanSort, Text> {

    // 重写自定义分区方法
    @Override
    public int getPartition(FlowBeanSort key, Text value, int numPartitions) {
        // (1)获取手机号的前3位
        String preNum = value.toString().substring(0, 3);

        // (2)定义分区号,从0开始,最大的分区号为6
        int partition = 6;

        // (3)使用if...else判断,将手机号的前3位和分区号进行对应
        if ("134".equals(preNum)) {
            partition = 0;
        } else if ("135".equals(preNum)) {
            partition = 1;
        } else if ("136".equals(preNum)) {
            partition = 2;
        } else if ("137".equals(preNum)) {
            partition = 3;
        } else if ("138".equals(preNum)) {
            partition = 4;
        }else if("139".equals(preNum)) {
            partition = 5;
        }
        // (4)返回分区号
        return partition;
    }
}

 **参数解析:**


- **key**:对应 MapTask 的输出 key,所以是封装并实现了自定义排序的流量信息类 FlowBeanSort

- **value**:对应 MapTask 的输出 value,所以是手机号

- **numPartitions**:指的是设置的 ReduceTask 的数量,默认值是1。


 **(2)另外,还要在 Driver 端给任务设置需要执行的分区类:**

job.setPartitionerClass(TelePartitioner.class);

**(3)自定义分区后,要根据自定义的分区逻辑设置相应数量的ReduceTask:**

job.setNumReduceTasks(7);

 **注意:**


- 如果 ReduceTask 的数量 > getPartition 的结果数,则会多产生几个空的输出文件 part-r-000xx;

- 如果1 < ReduceTask 的数量 < getPartition 的结果数,则有一部分分区数据无处安放,会 Exception;

- 如果 ReduceTask 的数量 = 1,则不管 MapTask 端输出多少个分区文件,最终结果都交给这一个 ReduceTask,最终也就只会产生一个结果文件 part-r-00000。


 例如:假设自定义分区数为 7,则:

 (1)`job.setNumReduceTasks(1);` # 会正常运行,只不过会产生一个输出文件


 (2)`job.setNumReduceTasks(2);` # 会报错


 (3)`job.setNumReduceTasks(8);` # 大于7,程序会正常运行,会产生空文件


 Map 端和 Reduce 端程序不用任何改动,使用 FlowSumSortMapper 和 FlowSumSortReducer。Driver 端添加设置自定义分区类和设置 ReduceTask 个数的代码即可。完整代码如下所示:


public class FlowSumSortDemo {
    /**
     * (2)得出上题结果的基础之上再加一个需求:将统计结果按照总流量倒序排序;
     */
    public static void main(String[] args) throws IOException, ClassNotFoundException, InterruptedException {
        // (1)获取配置信息类
        Configuration conf = new Configuration();
        // 指定mapreduce程序运行的hdfs的相关运行参数
        conf.set("fs.defaultFS", "hdfs://localhost:9000");

        // (2)新建一个Job对象
        Job job = Job.getInstance(conf);

        // (3)将 job 所用到的那些类(class)文件,打成jar包 (打成jar包在集群运行必须写)
        job.setJarByClass(FlowSumSortDemo.class);

        // (4)指定 Mapper 类和 Reducer 类
        job.setMapperClass(FlowSumSortMapper.class);
        job.setReducerClass(FlowSumSortReducer.class);
        // 指定自定义分区类
        job.setPartitionerClass(TelePartitioner.class);
        // 设置 ReduceTask的个数
        job.setNumReduceTasks(7);

        // (5)指定 MapTask 的输出key-value类型
        job.setMapOutputKeyClass(FlowBeanSort.class);
        job.setMapOutputValueClass(Text.class);

        // (6)指定 ReduceTask 的输出key-value类型
        job.setOutputKeyClass(Text.class);
        job.setOutputValueClass(FlowBeanSort.class);

        // (7)指定该 mapreduce 程序数据的输入和输出路径
        Path inPath = new Path("/flow/output_sum");
        Path outPath = new Path("/flow/output_partition");

        // 获取fs对象
        FileSystem fs = FileSystem.get(conf);
        if (fs.exists(outPath)) {
            fs.delete(outPath, true);
        }

        FileInputFormat.setInputPaths(job, inPath);
        FileOutputFormat.setOutputPath(job, outPath);

        // (8)最后给YARN来运行,等着集群运行完成返回反馈信息,客户端退出
        boolean waitForCompletion = job.waitForCompletion(true);
        System.exit(waitForCompletion ? 0 : 1);
    }
}















版权声明:本文内容由阿里云实名注册用户自发贡献,版权归原作者所有,阿里云开发者社区不拥有其著作权,亦不承担相应法律责任。具体规则请查看《阿里云开发者社区用户服务协议》和《阿里云开发者社区知识产权保护指引》。如果您发现本社区中有涉嫌抄袭的内容,填写侵权投诉表单进行举报,一经查实,本社区将立刻删除涉嫌侵权内容。

相关文章
Spark 创建RDD、DataFrame各种情况的默认分区数
Spark 创建RDD、DataFrame各种情况的默认分区数
0 0
Hadoop中的MapReduce框架原理、Shuffle机制、Partition分区、自定义Partitioner步骤、在Job驱动中,设置自定义Partitioner、Partition 分区案例
Hadoop中的MapReduce框架原理、Shuffle机制、Partition分区、自定义Partitioner步骤、在Job驱动中,设置自定义Partitioner、Partition 分区案例
0 0
【Hive】(五)Hive 中动态分区与静态分区详解
【Hive】(五)Hive 中动态分区与静态分区详解
0 0
Hive动态分区详解
Hive动态分区注意事项是什么?
0 0
Hive动态分区
Hive动态分区 一)hive中支持两种类型的分区: 静态分区SP(static partition) 动态分区DP(dynamic partition) 静态分区与动态分区的主要区别在于静态分区是手动指定,而动态分区是通过数据来进行判断。
4664 0
hive动态分区
设置如下参数开启动态分区: hive.exec.dynamic.partition=true 默认值:false 描述:是否允许动态分区 hive.exec.dynamic.partition.
642 0
MapReduce编程实例之自定义分区
任务描述: 一组数据,按照年份的不同将其分别存放在不同的文件里 example Data: 2013 1 2013 5 2014 5 2014 8 2015 9 2015 4 Code: package mrTest; import java.
491 0
MapReduce框架Partitioner分区方法
前言:对于二次排序相信大家也是似懂非懂,我也是一样,对其中的很多方法都不理解诶,所有只有暂时放在一边,当你接触到其他的函数,你知道的越多时你对二次排序的理解也就更深入了,同时建议大家对wordcount的流程好好分析一下,要真正的知道每一步都是干什么的。
522 0
在hadoop作业中自定义分区和归约
版权声明:本文为博主原创文章,未经博主允许不得转载。 https://blog.csdn.net/qq1010885678/article/details/43735703 当...
538 0
+关注
代码的天敌是懒惰
拒绝摆烂,寻寻渐进持续学习
文章
问答
文章排行榜
最热
最新
相关电子书
更多
HBase2.0重新定义小对象实时存取
立即下载
低代码开发师(初级)实战教程
立即下载
阿里巴巴DevOps 最佳实践手册
立即下载