自定义分区很简单,主要需要以下三步:
**(1)创建一个类继承抽象类 Partitioner,然后重写`getPartition()`方法。** 具体实现如下所示:
publicclassTelePartitionerextendsPartitioner<FlowBeanSort, Text> { // 重写自定义分区方法publicintgetPartition(FlowBeanSortkey, Textvalue, intnumPartitions) { // (1)获取手机号的前3位StringpreNum=value.toString().substring(0, 3); // (2)定义分区号,从0开始,最大的分区号为6intpartition=6; // (3)使用if...else判断,将手机号的前3位和分区号进行对应if ("134".equals(preNum)) { partition=0; } elseif ("135".equals(preNum)) { partition=1; } elseif ("136".equals(preNum)) { partition=2; } elseif ("137".equals(preNum)) { partition=3; } elseif ("138".equals(preNum)) { partition=4; }elseif("139".equals(preNum)) { partition=5; } // (4)返回分区号returnpartition; } }
**参数解析:**
- **key**:对应 MapTask 的输出 key,所以是封装并实现了自定义排序的流量信息类 FlowBeanSort
- **value**:对应 MapTask 的输出 value,所以是手机号
- **numPartitions**:指的是设置的 ReduceTask 的数量,默认值是1。
**(2)另外,还要在 Driver 端给任务设置需要执行的分区类:**
job.setPartitionerClass(TelePartitioner.class);
**(3)自定义分区后,要根据自定义的分区逻辑设置相应数量的ReduceTask:**
job.setNumReduceTasks(7);
**注意:**
- 如果 ReduceTask 的数量 > getPartition 的结果数,则会多产生几个空的输出文件 part-r-000xx;
- 如果1 < ReduceTask 的数量 < getPartition 的结果数,则有一部分分区数据无处安放,会 Exception;
- 如果 ReduceTask 的数量 = 1,则不管 MapTask 端输出多少个分区文件,最终结果都交给这一个 ReduceTask,最终也就只会产生一个结果文件 part-r-00000。
例如:假设自定义分区数为 7,则:
(1)`job.setNumReduceTasks(1);` # 会正常运行,只不过会产生一个输出文件
(2)`job.setNumReduceTasks(2);` # 会报错
(3)`job.setNumReduceTasks(8);` # 大于7,程序会正常运行,会产生空文件
Map 端和 Reduce 端程序不用任何改动,使用 FlowSumSortMapper 和 FlowSumSortReducer。Driver 端添加设置自定义分区类和设置 ReduceTask 个数的代码即可。完整代码如下所示:
publicclassFlowSumSortDemo { /*** (2)得出上题结果的基础之上再加一个需求:将统计结果按照总流量倒序排序;*/publicstaticvoidmain(String[] args) throwsIOException, ClassNotFoundException, InterruptedException { // (1)获取配置信息类Configurationconf=newConfiguration(); // 指定mapreduce程序运行的hdfs的相关运行参数conf.set("fs.defaultFS", "hdfs://localhost:9000"); // (2)新建一个Job对象Jobjob=Job.getInstance(conf); // (3)将 job 所用到的那些类(class)文件,打成jar包 (打成jar包在集群运行必须写)job.setJarByClass(FlowSumSortDemo.class); // (4)指定 Mapper 类和 Reducer 类job.setMapperClass(FlowSumSortMapper.class); job.setReducerClass(FlowSumSortReducer.class); // 指定自定义分区类job.setPartitionerClass(TelePartitioner.class); // 设置 ReduceTask的个数job.setNumReduceTasks(7); // (5)指定 MapTask 的输出key-value类型job.setMapOutputKeyClass(FlowBeanSort.class); job.setMapOutputValueClass(Text.class); // (6)指定 ReduceTask 的输出key-value类型job.setOutputKeyClass(Text.class); job.setOutputValueClass(FlowBeanSort.class); // (7)指定该 mapreduce 程序数据的输入和输出路径PathinPath=newPath("/flow/output_sum"); PathoutPath=newPath("/flow/output_partition"); // 获取fs对象FileSystemfs=FileSystem.get(conf); if (fs.exists(outPath)) { fs.delete(outPath, true); } FileInputFormat.setInputPaths(job, inPath); FileOutputFormat.setOutputPath(job, outPath); // (8)最后给YARN来运行,等着集群运行完成返回反馈信息,客户端退出booleanwaitForCompletion=job.waitForCompletion(true); System.exit(waitForCompletion?0 : 1); } }