MapReduce【自定义分区Partitioner】

简介: MapReduce【自定义分区Partitioner】

实际开发中我们可能根据需求需要将MapReduce的运行结果生成多个不同的文件,比如上一个案例【MapReduce计算广州2022年每月最高温度】,我们需要将前半年和后半年的数据分开写到两个文件中。

默认分区

默认MapReduce只能写出一个文件:

因为我们在提交job的时候未设置reduceTask的个数,所以默认reduceTask的个数为1,结果也就只能输出一个文件,下面是数据写入文件时,默认调用的HashPartitioner类的getPartition方法:

public int getPartition(K key, V value, int numReduceTasks) {
        return (key.hashCode() & 2147483647) % numReduceTasks;
    }

因为reduceTask数量为1,任何数1取余结果为0,所以文件只有一个且文件名位 "part-r-00000"

自定义分区

1.继承Partitioner类

像HashPartitioner类一样继承Partitioner类

2.指定泛型

因为分区出现在map方法之后,所以两个泛型参数分别为map方法输出的key和value的类型

3.继承getPartition方法

  • 继承后需要重写分区逻辑,也就是说如何根据输入的键或值来确定该行数据的分区。
  • 设置自定义分区后,因为我们的输入格式是默认的TextInputFormat,所以每行数据都会通过getPartitioner方法来选择分区。
public class MyPartitioner extends Partitioner<Text, IntWritable> {
    @Override
    public int getPartition(Text text, IntWritable intWritable, int partitioner) {
        //text 是月份
        String month = text.toString();
        int month_num = Integer.parseInt(month);
        //注意:分区号必须从0开始!
        if (month_num<=6)   //前半年的数据放到分区0
            partitioner = 0;
        else                //后半年的数据放到分区1
            partitioner = 1;
        return partitioner;//返回分区号
    }
}

4.在job中设置分区

  • 我们不仅要指定分区的类型还要指定reduceTask的数量,因为我们最后是根据分区的数量来分配给不同的reduceTask去执行。

这里我们只需要增加下面的语句即可,其余部分代码不需要改动。

//绑定自定义分区
        job.setPartitionerClass(MyPartitioner.class);
        //设置reduceTask个数
        job.setNumReduceTasks(2);

输出结果

注意点-reduceTask的数量

reduceTask数量 > 分区的数量

这种情况会造成CPU资源的浪费,因为会多开启 reduceTask ,它没有任务还空占大约1G内存。

1< reduceTask数量< 分区的数量

会抛出异常,因为有多余的分区剩下来没有reduceTask拉取处理。

reduceTask数量 =1

自定义分区失效,按照默认的分区规则(HashPartitioner的getPartitioner方法)执行,仍然只生成一个结果文件。

分区号必须从0开始

向下面这段代码一样,共划分了5个分区(0~4),结果会有5个结果文件产生。

if (条件1)   
     partitioner = 0;    //分区0
else if(条件2)                
     partitioner = 1;    //分区1
else if(条件3)                
     partitioner = 2;    //分区2
else if(条件4)                
     partitioner = 3;    //分区3
else                 
     partitioner = 4;    //分区4


相关文章
|
1月前
|
分布式计算 Hadoop Java
MapReduce编程:自定义分区和自定义计数器
MapReduce编程:自定义分区和自定义计数器
37 0
|
8天前
|
存储 分布式计算 Hadoop
MapReduce编程模型——自定义序列化类实现多指标统计
MapReduce编程模型——自定义序列化类实现多指标统计
9 0
|
1月前
|
存储 分布式计算 关系型数据库
MapReduce【自定义OutputFormat】
MapReduce【自定义OutputFormat】
|
1月前
|
分布式计算
MapReduce【自定义InputFormat】
MapReduce【自定义InputFormat】
|
1月前
|
分布式计算 Hadoop
Hadoop系列 mapreduce 原理分析
Hadoop系列 mapreduce 原理分析
47 1
|
1月前
|
存储 分布式计算 负载均衡
【大数据技术Hadoop+Spark】MapReduce概要、思想、编程模型组件、工作原理详解(超详细)
【大数据技术Hadoop+Spark】MapReduce概要、思想、编程模型组件、工作原理详解(超详细)
117 0
|
16天前
|
分布式计算 Hadoop Java
Hadoop MapReduce编程
该教程指导编写Hadoop MapReduce程序处理天气数据。任务包括计算每个城市ID的最高、最低气温、气温出现次数和平均气温。在读取数据时需忽略表头,且数据应为整数。教程中提供了环境变量设置、Java编译、jar包创建及MapReduce执行的步骤说明,但假设读者已具备基础操作技能。此外,还提到一个扩展练习,通过分区功能将具有相同尾数的数字分组到不同文件。
17 1
|
23天前
|
数据采集 SQL 分布式计算
|
1月前
|
分布式计算 Hadoop Java
Hadoop MapReduce 调优参数
对于 Hadoop v3.1.3,针对三台4核4G服务器的MapReduce调优参数包括:`mapreduce.reduce.shuffle.parallelcopies`设为10以加速Shuffle,`mapreduce.reduce.shuffle.input.buffer.percent`和`mapreduce.reduce.shuffle.merge.percent`分别设为0.8以减少磁盘IO。
|
15天前
|
存储 分布式计算 Hadoop
Hadoop生态系统详解:HDFS与MapReduce编程
Apache Hadoop是大数据处理的关键,其核心包括HDFS(分布式文件系统)和MapReduce(并行计算框架)。HDFS为大数据存储提供高容错性和高吞吐量,采用主从结构,通过数据复制保证可靠性。MapReduce将任务分解为Map和Reduce阶段,适合大规模数据集的处理。通过代码示例展示了如何使用MapReduce实现Word Count功能。HDFS和MapReduce的结合,加上YARN的资源管理,构成处理和分析大数据的强大力量。了解和掌握这些基础对于有效管理大数据至关重要。【6月更文挑战第12天】
35 0