MapReduce之分区器(Partitioner)

简介: Partitioner 组件可以对 MapTask后的数据按Key进行分区,从而将不同分区的Key交由不同的Reduce处理。这个也是我们经常会用到的功能


Partitactioner

 Partitioner 组件可以对 MapTask后的数据按Key进行分区,从而将不同分区的Key交由不同的Reduce处理。这个也是我们经常会用到的功能。

1.使用场景

 比如上个案例中我们统计出来了每个用户的流量数据,那么我们接下来想把统计的用户数据根据不同的手机号输出到不同的文件中,那么这时使用分区器就非常合适了。

2.HashPartitioner

 在一般的 MapReduce 过程中,我们知道可以通过 job.setNumReduceTasks(N) 来创建多个 ReducerTask 进行处理任务。可是这种情况下,系统会调用默认的Partitioner也就是 HashPartitioner来对Map的 key 进行分区。进入 Hadoop 的源码,可以看到 HashPartitioner 的实现其实很简单。如下:

public class HashPartitioner<K2, V2> implements Partitioner<K2, V2> {
  public void configure(JobConf job) {}
  /** Use {@link Object#hashCode()} to partition. */
  public int getPartition(K2 key, V2 value,
                          int numReduceTasks) {
    // key的hash值与integer的最大值取与然后对ReduceTask的个数取余
    return (key.hashCode() & Integer.MAX_VALUE) % numReduceTasks;
  }
}

 hash的好处是可以很key的分布更加随机,但是这样会将一些不同的key放在同一个分区中,这并不是我们所期望的。

3.自定义Partitioner

 面对HashPartitioner所具有的局限,我们可以通过自定义Partitioner来解决,如下:

3.1 实现自定义分区器

import java.util.HashMap;
import java.util.Map;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Partitioner;
/**
 * 自定义分区器
 * @author 波波烤鸭
 *
 */
public class CustomPartitioner extends Partitioner< Text, Flow>{
  private static Map<String, Integer> map = new HashMap<>();
  // 此处我们将数据写死,实际开发中我们应该从对应的数据源中获取数据然后存储在缓存中(Redis)
  static{
    map.put("138", 0);
    map.put("139", 1);
    map.put("158", 2);
    map.put("159", 3);
  }
  /**
   * 根据key获取对应的分区号
   * @param key 就是用的手机号码
   * @param value 统计的用户的信息
   */
  @Override
  public int getPartition(Text key, Flow value, int numPartitions) {
    // 获取手机号码的前3位 138
    String prefix = key.toString().substring(0, 3);
    return map.containsKey(prefix)?map.get(prefix):4;
  }
}

3.2 启动类设置

image.png

  public static void main(String[] args) throws Exception {
    Configuration conf = new Configuration(true);
    conf.set("mapreduce.framework.name", "local");
    // 输出到HDFS文件系统中
    // conf.set("fs.defaultFS", "hdfs://hadoop-node01:9000");
    // 输出到本地文件系统
    conf.set("fs.defaultFS", "file:///");
    Job job = Job.getInstance(conf);
    job.setJarByClass(FlowTest.class);
    // 设置ReduceTask的个数
    job.setNumReduceTasks(5);
    // 设置自定义的分区器
    job.setPartitionerClass(CustomPartitioner.class);
    // 指定本job要使用的map/reduce的工具类
    job.setMapperClass(MyMapTask.class);
    job.setReducerClass(MyReduceTask.class);
    // 指定mapper输出kv的类型
    job.setMapOutputKeyClass(Text.class);
    job.setMapOutputValueClass(Flow.class);
    job.setOutputKeyClass(Text.class);
    job.setOutputValueClass(Flow.class);
    // 指定job的原始文件输入目录
    // 6.设置输出输出类
    FileInputFormat.setInputPaths(job, new Path("c:/tools/bigdata/mr/flow/input/"));
    FileOutputFormat.setOutputPath(job, new Path("c:/tools/bigdata/mr/flow/output/"));
    //将job中配置的相关参数,以及job所用的jar包提交给yarn运行
    //job.submit();  waitForCompletion等待执行完成
    boolean flag = job.waitForCompletion(true);
    System.exit(flag?0:1);
  }

 MapTask和ReduceTask的代码内容不需要改变,可以参考上篇内容。

image.png

Ok ~ partitioner的作用就是用来对Map之后的数据做分区处理操作!


相关文章
|
8月前
|
分布式计算 Hadoop Java
MapReduce编程:自定义分区和自定义计数器
MapReduce编程:自定义分区和自定义计数器
114 0
|
分布式计算
29 MAPREDUCE中的分区Partitioner
29 MAPREDUCE中的分区Partitioner
94 0
|
5月前
|
分布式计算 负载均衡 Hadoop
MapReduce 分区器的作用与重要性
【8月更文挑战第31天】
83 1
|
8月前
|
数据采集 分布式计算 DataWorks
DataWorks产品使用合集之在DataWorks中,在MapReduce作业中指定两个表的所有分区如何解决
DataWorks作为一站式的数据开发与治理平台,提供了从数据采集、清洗、开发、调度、服务化、质量监控到安全管理的全套解决方案,帮助企业构建高效、规范、安全的大数据处理体系。以下是对DataWorks产品使用合集的概述,涵盖数据处理的各个环节。
97 0
|
分布式计算 DataWorks
DataWorks想在mapreduce中指定两个表的所有分区
DataWorks想在mapreduce中指定两个表的所有分区,
50 1
|
8月前
|
分布式计算
MapReduce【自定义分区Partitioner】
MapReduce【自定义分区Partitioner】
|
存储 分布式计算 Hadoop
Hadoop中的MapReduce框架原理、Shuffle机制、Partition分区、自定义Partitioner步骤、在Job驱动中,设置自定义Partitioner、Partition 分区案例
Hadoop中的MapReduce框架原理、Shuffle机制、Partition分区、自定义Partitioner步骤、在Job驱动中,设置自定义Partitioner、Partition 分区案例
Hadoop中的MapReduce框架原理、Shuffle机制、Partition分区、自定义Partitioner步骤、在Job驱动中,设置自定义Partitioner、Partition 分区案例
|
分布式计算
MapReduce自定义分区
MapReduce自定义分区
|
分布式计算 负载均衡 Hadoop
MapReduce 默认分区介绍
MapReduce 默认分区介绍