Partitioner与自定义Partitioner

简介: Partitioner与自定义Partitioner

通过前面的学习我们知道Mapper最终处理的键值对<key, value>,是需要送到Reducer去合并的,合并的时候,有相同key的键/值对会送到同一个Reducer节点中进行归并。哪个key到哪个Reducer的分配过程,是由Partitioner规定的。在一些集群应用中,例如分布式缓存集群中,缓存的数据大多都是靠哈希函数来进行数据的均匀分布的,在Hadoop中也不例外。


Hadoop内置Partitioner


MapReduce的使用者通常会指定Reduce任务和Reduce任务输出文件的数量(R)。用户在中间key上使用分区函数来对数据进行分区,之后在输入到后续任务执行进程。一个默认的分区函数式使用hash方法(比如常见的:hash(key) mod R)进行分区。hash方法能够产生非常平衡的分区,鉴于此,Hadoop中自带了一个默认的分区类HashPartitioner,它继承了Partitioner类,提供了一个getPartition的方法,它的定义如下所示:

/** Partition keys by their {@link Object#hashCode()}. */
public class HashPartitioner<K, V> extends Partitioner<K, V> {
  /** Use {@link Object#hashCode()} to partition. */
  public int getPartition(K key, V value,
                          int numReduceTasks) {
    return (key.hashCode() & Integer.MAX_VALUE) % numReduceTasks;
  }
}


现在我们来看看HashPartitoner所做的事情,其关键代码就一句:(key.hashCode() & Integer.MAX_VALUE) % numReduceTasks;

这段代码实现的目的是将key均匀分布在Reduce Tasks上,例如:如果Key为Text的话,Text的hashcode方法跟String的基本一致,都是采用的Horner公式计算,得到一个int整数。但是,如果string太大的话这个int整数值可能会溢出变成负数,所以和整数的上限值Integer.MAX_VALUE(即0111111111111111)进行与运算,然后再对reduce任务个数取余,这样就可以让key均匀分布在reduce上。


image.png


自己定制Partitioner

流量汇总程序开发

这里添加了新需求,要求流量汇总统计并按省份区分。也就是说不但计算每个用户(手机号)的上行流量,下行流量,总流量外,还要按照每个手机号所属不同的省份来将计算结果写到不同的文件中(假如共4个省份,那么需要将输出结果写到4个文件中,也就是说有4个分区每个分区对应一个reduce task)。

public class Flowcount {
    /**
     * KEYIN:默认情况下,是mr框架所读到的一行文本的起始偏移量,Long,但是在hadoop中有自己的
     * 更精简的序列化接口(Seria会将类结构都序列化,而实际我们只需要序列化数据),所以不直接用Long,而用LongWritable
     * VALUEIN:默认情况下,是mr框架所读到的一行文本的内容,String,同上,用Text
     * KEYOUT:是用户自定义逻辑处理完成之后输出数据中的key
     * VALUEOUT:是用户自定义逻辑处理完成之后输出数据中的value
     * @author 12706
     *
     */
    static class FlowcountMapper extends Mapper<LongWritable, Text, Text, FlowBean>{
        @Override
        protected void map(LongWritable key, Text value, Context context)
                throws IOException, InterruptedException {
            //输入为1234    23455   33333   33333(中间是制表符)
            //第二列为手机号,倒数第二列为下行流量,倒数第三列为上行流量
            String line = value.toString();
            String[] values = line.split("\t");
            //获取手机号
            String phoneNum = values[1];
            //获取上行流量下行流量
            long upFlow = new Long(values[values.length-3]);
            long downFlow = new Long(values[values.length-2]);
            //封装好后写出到输出收集器
            context.write(new Text(phoneNum), new FlowBean(upFlow,downFlow));
        }
    }
    /**
     * KEYIN VALUEIN对应mapper输出的KEYOUT KEYOUT类型对应
     * KEYOUT,VALUEOUT:是自定义reduce逻辑处理结果的输出数据类型
     * KEYOUT
     * VALUEOUT
     * @author 12706
     *
     */
    static class FlowcountReducer extends Reducer<Text, FlowBean, Text, FlowBean>{
        @Override
        protected void reduce(Text key, Iterable<FlowBean> beans,Context context)
                throws IOException, InterruptedException {
            //传进来的实例<13345677654,beans>,即多个该电话的键值对
            //取出values获得上下行和总流量求和
            long upFlow = 0;
            long downFlow = 0;
            for (FlowBean flowBean : beans) {
                upFlow += flowBean.getUpFlow();
                downFlow += flowBean.getDownFlow();
            }
            context.write(key, new FlowBean(upFlow,downFlow));
        }
    }
    /**
     * 相当于一个yarn集群的客户端
     * 需要在此封装mr程序的相关运行参数,指定jar包
     * 最后提交给yarn
     * @author 12706
     * @param args
     * @throws Exception
     *
     */
    public static void main(String[] args) throws Exception {
        Configuration conf = new Configuration();
        Job job = Job.getInstance(conf);
        //指定Partitioner
        job.setPartitionerClass(FlowPartitioner.class);
        //设置reduce task数量
        job.setNumReduceTasks(5);
        job.setJarByClass(Flowcount.class);
        //指定本业务job要使用的mapper,reducer业务类
        job.setMapperClass(FlowcountMapper.class);
        job.setReducerClass(FlowcountReducer.class);
        //虽然指定了泛型,以防框架使用第三方的类型
                //指定mapper输出数据的kv类型
        job.setMapOutputKeyClass(Text.class);
        job.setMapOutputValueClass(FlowBean.class);
        //指定最终输出的数据的kv类型
        job.setOutputKeyClass(Text.class);
        job.setOutputValueClass(FlowBean.class);
        //指定job输入原始文件所在位置
        FileInputFormat.setInputPaths(job, new Path(args[0]));
        //指定job输入原始文件所在位置
        FileOutputFormat.setOutputPath(job,new Path(args[1]));
        //将job中配置的相关参数以及job所用的java类所在的jar包,提交给yarn去运行
        boolean b = job.waitForCompletion(true);
        System.exit(b?0:1);
    }
}

public class FlowBean implements Writable{
    private long upFlow;//上行流量
    private long downFlow;//下行流量
    private long totalFlow;//总流量
    //序列化时需要无参构造方法
    public FlowBean() {
    }
    public FlowBean(long upFlow, long downFlow) {
        this.upFlow = upFlow;
        this.downFlow = downFlow;
        this.totalFlow = upFlow + downFlow;
    }
    //序列化方法 hadoop的序列化很简单,要传递的数据写出去即可
    public void write(DataOutput out) throws IOException {
        out.writeLong(upFlow);
        out.writeLong(downFlow);
        out.writeLong(totalFlow);
    }
    //反序列化方法 注意:反序列化的顺序跟序列化的顺序完全一致
    public void readFields(DataInput in) throws IOException {
        this.upFlow = in.readLong();
        this.downFlow = in.readLong();
        this.totalFlow = in.readLong();
    }
    //重写toString以便展示
    @Override
    public String toString() {
        return upFlow + "\t" + downFlow + "\t" + totalFlow;
    }
    get,set方法
}

/**
 * Mapreduce中会将map输出的kv对,按照相同key分组,然后分发给不同的reducetask
 *默认的分发规则为:根据key的hashcode%reducetask数来分发
 *所以:如果要按照我们自己的需求进行分组,则需要改写数据分发(分组)组件Partitioner
 *自定义一个CustomPartitioner继承抽象类:Partitioner
 *然后在job对象中,设置自定义partitioner: job.setPartitionerClass(CustomPartitioner.class)
 * @author 12706
 *
 */
public class FlowPartitioner extends Partitioner<Text, FlowBean>{
    private static HashMap<String, Integer> map = new HashMap<String, Integer>();
    static {
        //模拟手机号归属地 0:北京,1:上海,2:广州,3:深圳,4:其它
        map.put("135", 0);
        map.put("136", 1);
        map.put("137", 2);
        map.put("138", 3);
    }
    //返回分区号
    @Override
    public int getPartition(Text key, FlowBean value, int numPartitions) {
        //进来的数据是<13567898766,flowbean1>,flowbean1中封装了上下行流量,总流量
        String phoneNum = key.toString();
        //截取手机号前3位
        String num = phoneNum.substring(0, 3);
        //获取对应的省
        Integer provinceId = map.get(num);
        return provinceId==null?4:provinceId;
    }
}

测试程序

将工程打jar包到本地,上传到linux,启动hadoop集群

数据以及在hdfs下的文件均使用流量汇总程序中的。使用以下命令

[root@mini2 ~]# hadoop jar flowcount.jar com.scu.hadoop.partitioner.Flowcount /flowcount/input /flowcount/output
17/10/09 10:47:49 INFO mapreduce.JobSubmitter: number of splits:1
17/10/09 10:47:49 INFO mapreduce.JobSubmitter: Submitting tokens for job: job_1507516839481_0001
17/10/09 10:47:50 INFO impl.YarnClientImpl: Submitted application application_1507516839481_0001
17/10/09 10:47:50 INFO mapreduce.Job: The url to track the job: http://mini1:8088/proxy/application_1507516839481_0001/
17/10/09 10:47:50 INFO mapreduce.Job: Running job: job_1507516839481_0001
17/10/09 10:47:58 INFO mapreduce.Job: Job job_1507516839481_0001 running in uber mode : false
17/10/09 10:47:58 INFO mapreduce.Job:  map 0% reduce 0%
17/10/09 10:48:03 INFO mapreduce.Job:  map 100% reduce 0%
17/10/09 10:48:13 INFO mapreduce.Job:  map 100% reduce 20%
17/10/09 10:48:14 INFO mapreduce.Job:  map 100% reduce 40%
17/10/09 10:48:19 INFO mapreduce.Job:  map 100% reduce 100%
17/10/09 10:48:20 INFO mapreduce.Job: Job job_1507516839481_0001 completed successfully
17/10/09 10:48:21 INFO mapreduce.Job: Counters: 50
        File System Counters
                FILE: Number of bytes read=863
                FILE: Number of bytes written=642893
                FILE: Number of read operations=0
                FILE: Number of large read operations=0
                FILE: Number of write operations=0
                HDFS: Number of bytes read=2278
                HDFS: Number of bytes written=551
                HDFS: Number of read operations=18
                HDFS: Number of large read operations=0
                HDFS: Number of write operations=10
        Job Counters 
                Killed reduce tasks=1
                Launched map tasks=1
                Launched reduce tasks=5
                Data-local map tasks=1
    ...

从打印信息可以看到切片splits为1,即一个maptask从Job Counters可以看出map tasks=1,reduce tasks=5所以输出文件应该也有5个。

查看输出

[root@mini2 ~]# hadoop fs -ls /flowcount/output
-rw-r--r--   2 root supergroup          0 2017-10-09 10:48 /flowcount/output/_SUCCESS
-rw-r--r--   2 root supergroup         84 2017-10-09 10:48 /flowcount/output/part-r-00000
-rw-r--r--   2 root supergroup         53 2017-10-09 10:48 /flowcount/output/part-r-00001
-rw-r--r--   2 root supergroup        104 2017-10-09 10:48 /flowcount/output/part-r-00002
-rw-r--r--   2 root supergroup         22 2017-10-09 10:48 /flowcount/output/part-r-00003
-rw-r--r--   2 root supergroup        288 2017-10-09 10:48 /flowcount/output/part-r-00004

确实是5个文件

查看每个文件内容

[root@mini2 ~]# hadoop fs -cat /flowcount/output/part-r-00000
13502468823     7335    110349  117684
13560436666     1116    954     2070
13560439658     2034    5892    7926
[root@mini2 ~]# hadoop fs -cat /flowcount/output/part-r-00001
13602846565     1938    2910    4848
13660577991     6960    690     7650
...

按照省份划分了5个文件,每个文件里面有对应省份手机号与计算出的上行流量,下行流量,总流量。

目录
相关文章
|
10月前
|
分布式计算
29 MAPREDUCE中的分区Partitioner
29 MAPREDUCE中的分区Partitioner
63 0
|
分布式计算 Spark
通过spark.default.parallelism谈Spark并行度
本篇文章首先通过大家熟知的一个参数spark.default.parallelism为引,聊一聊Spark并行度都由哪些因素决定?
通过spark.default.parallelism谈Spark并行度
|
4月前
|
分布式计算
MapReduce【自定义分区Partitioner】
MapReduce【自定义分区Partitioner】
|
10月前
|
分布式计算
36 MAPREDUCE自定义GroupingComparator
36 MAPREDUCE自定义GroupingComparator
35 0
|
SQL 存储 缓存
让你真正理解什么是SparkContext, SQLContext 和HiveContext
让你真正理解什么是SparkContext, SQLContext 和HiveContext
265 0
让你真正理解什么是SparkContext, SQLContext 和HiveContext
|
SQL 分布式计算 HIVE
spark2.2 SparkSession思考与总结1
spark2.2 SparkSession思考与总结1
99 0
spark2.2 SparkSession思考与总结1
|
SQL 缓存 分布式计算
spark2的SparkSession思考与总结2:SparkSession有哪些函数及作用是什么
spark2的SparkSession思考与总结2:SparkSession有哪些函数及作用是什么
265 0
|
存储 分布式计算 Scala
Spark - 一文搞懂 Partitioner
spark 处理 RDD 时提供了 foreachPartition 和 mapPartition 的方法对 partition 进行处理,一个 partition 内可能包含一个文件或者多个文件的内容,Partitioner 可以基于 pairRDD 的 key 实现自定义partition 的内容。
275 0
Spark - 一文搞懂 Partitioner
|
分布式计算 算法 大数据
Spark - RangePartitioner rangeBounds 生成 源码分析 & 实践
本文主要探索RangePartitioner 源码中rangeBounds 的生成,rangeBounds 用于对 key 进行范围分区,通过源码可以学习到如何在分布式大数据下采样并获取近似均分的范围。
173 0
Spark - RangePartitioner rangeBounds 生成 源码分析 & 实践
|
分布式计算 Spark Java
Spark2.4.0 SparkSession 源码分析
创建SparkContext new SparkSession
3267 0