MapReduce自定义分区

简介: MapReduce自定义分区

自定义分区很简单,主要需要以下三步:


 **(1)创建一个类继承抽象类 Partitioner,然后重写`getPartition()`方法。** 具体实现如下所示:

publicclassTelePartitionerextendsPartitioner<FlowBeanSort, Text> {
// 重写自定义分区方法@OverridepublicintgetPartition(FlowBeanSortkey, Textvalue, intnumPartitions) {
// (1)获取手机号的前3位StringpreNum=value.toString().substring(0, 3);
// (2)定义分区号,从0开始,最大的分区号为6intpartition=6;
// (3)使用if...else判断,将手机号的前3位和分区号进行对应if ("134".equals(preNum)) {
partition=0;
        } elseif ("135".equals(preNum)) {
partition=1;
        } elseif ("136".equals(preNum)) {
partition=2;
        } elseif ("137".equals(preNum)) {
partition=3;
        } elseif ("138".equals(preNum)) {
partition=4;
        }elseif("139".equals(preNum)) {
partition=5;
        }
// (4)返回分区号returnpartition;
    }
}

 **参数解析:**


- **key**:对应 MapTask 的输出 key,所以是封装并实现了自定义排序的流量信息类 FlowBeanSort

- **value**:对应 MapTask 的输出 value,所以是手机号

- **numPartitions**:指的是设置的 ReduceTask 的数量,默认值是1。


 **(2)另外,还要在 Driver 端给任务设置需要执行的分区类:**

job.setPartitionerClass(TelePartitioner.class);

**(3)自定义分区后,要根据自定义的分区逻辑设置相应数量的ReduceTask:**

job.setNumReduceTasks(7);

 **注意:**


- 如果 ReduceTask 的数量 > getPartition 的结果数,则会多产生几个空的输出文件 part-r-000xx;

- 如果1 < ReduceTask 的数量 < getPartition 的结果数,则有一部分分区数据无处安放,会 Exception;

- 如果 ReduceTask 的数量 = 1,则不管 MapTask 端输出多少个分区文件,最终结果都交给这一个 ReduceTask,最终也就只会产生一个结果文件 part-r-00000。


 例如:假设自定义分区数为 7,则:

(1)`job.setNumReduceTasks(1);` # 会正常运行,只不过会产生一个输出文件


 (2)`job.setNumReduceTasks(2);` # 会报错


 (3)`job.setNumReduceTasks(8);` # 大于7,程序会正常运行,会产生空文件


 Map 端和 Reduce 端程序不用任何改动,使用 FlowSumSortMapper 和 FlowSumSortReducer。Driver 端添加设置自定义分区类和设置 ReduceTask 个数的代码即可。完整代码如下所示:


publicclassFlowSumSortDemo {
/*** (2)得出上题结果的基础之上再加一个需求:将统计结果按照总流量倒序排序;*/publicstaticvoidmain(String[] args) throwsIOException, ClassNotFoundException, InterruptedException {
// (1)获取配置信息类Configurationconf=newConfiguration();
// 指定mapreduce程序运行的hdfs的相关运行参数conf.set("fs.defaultFS", "hdfs://localhost:9000");
// (2)新建一个Job对象Jobjob=Job.getInstance(conf);
// (3)将 job 所用到的那些类(class)文件,打成jar包 (打成jar包在集群运行必须写)job.setJarByClass(FlowSumSortDemo.class);
// (4)指定 Mapper 类和 Reducer 类job.setMapperClass(FlowSumSortMapper.class);
job.setReducerClass(FlowSumSortReducer.class);
// 指定自定义分区类job.setPartitionerClass(TelePartitioner.class);
// 设置 ReduceTask的个数job.setNumReduceTasks(7);
// (5)指定 MapTask 的输出key-value类型job.setMapOutputKeyClass(FlowBeanSort.class);
job.setMapOutputValueClass(Text.class);
// (6)指定 ReduceTask 的输出key-value类型job.setOutputKeyClass(Text.class);
job.setOutputValueClass(FlowBeanSort.class);
// (7)指定该 mapreduce 程序数据的输入和输出路径PathinPath=newPath("/flow/output_sum");
PathoutPath=newPath("/flow/output_partition");
// 获取fs对象FileSystemfs=FileSystem.get(conf);
if (fs.exists(outPath)) {
fs.delete(outPath, true);
        }
FileInputFormat.setInputPaths(job, inPath);
FileOutputFormat.setOutputPath(job, outPath);
// (8)最后给YARN来运行,等着集群运行完成返回反馈信息,客户端退出booleanwaitForCompletion=job.waitForCompletion(true);
System.exit(waitForCompletion?0 : 1);
    }
}















相关文章
|
1月前
|
分布式计算 Hadoop Java
MapReduce编程:自定义分区和自定义计数器
MapReduce编程:自定义分区和自定义计数器
36 0
|
7月前
|
分布式计算 数据库
35 MAPREDUCE自定义outputFormat
35 MAPREDUCE自定义outputFormat
19 0
|
7月前
|
分布式计算
29 MAPREDUCE中的分区Partitioner
29 MAPREDUCE中的分区Partitioner
37 0
|
5天前
|
存储 分布式计算 Hadoop
MapReduce编程模型——自定义序列化类实现多指标统计
MapReduce编程模型——自定义序列化类实现多指标统计
7 0
|
7月前
|
分布式计算 DataWorks
DataWorks想在mapreduce中指定两个表的所有分区
DataWorks想在mapreduce中指定两个表的所有分区,
28 1
|
1月前
|
存储 分布式计算 关系型数据库
MapReduce【自定义OutputFormat】
MapReduce【自定义OutputFormat】
|
1月前
|
分布式计算
MapReduce【自定义InputFormat】
MapReduce【自定义InputFormat】
|
1月前
|
分布式计算
MapReduce【自定义分区Partitioner】
MapReduce【自定义分区Partitioner】
|
7月前
|
分布式计算
36 MAPREDUCE自定义GroupingComparator
36 MAPREDUCE自定义GroupingComparator
27 0
|
7月前
|
数据采集 分布式计算
34 MAPREDUCE自定义inputFormat
34 MAPREDUCE自定义inputFormat
21 0