Input data file:
Expected output files:
Program implementation:
Overall program structure:
(1) Writing the FlowMapper
package org.example.Partitioner;

import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;

import java.io.IOException;

public class FlowMapper extends Mapper<LongWritable, Text, Text, FlowBean> {

    private Text outK = new Text();
    private FlowBean outV = new FlowBean();

    @Override
    protected void map(LongWritable key, Text value, Mapper<LongWritable, Text, Text, FlowBean>.Context context)
            throws IOException, InterruptedException {
        // Read one line, e.g.
        // 1	13736230511	192.196.100.1	www.baidu.com	6789	12345	400
        String line = value.toString();

        // Split on tabs:
        // 1,13736230511,192.196.100.1,www.baidu.com,6789,12345,400
        String[] split = line.split("\t");

        // Grab the fields we need:
        // phone number            13736230511
        // up flow and down flow   6789 12345
        String phone = split[1];
        String up = split[split.length - 3];   // third field from the end (index 4 when the line has 7 fields)
        String down = split[split.length - 2]; // second field from the end (index 5 when the line has 7 fields)

        // Package the output key/value
        outK.set(phone);
        outV.setUpFlow(Long.parseLong(up));
        outV.setDownFlow(Long.parseLong(down));
        outV.setSumFlow();

        // Write out
        context.write(outK, outV);
    }
}
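To see exactly what the length-based indexing picks out, here is a minimal local check. It is not part of the MapReduce program; the sample line is taken from the comment in the mapper above and assumed to be tab-separated.

package org.example.Partitioner;

// Standalone check of the field extraction used in FlowMapper.map()
public class SplitCheck {
    public static void main(String[] args) {
        String line = "1\t13736230511\t192.196.100.1\twww.baidu.com\t6789\t12345\t400";
        String[] split = line.split("\t");
        String phone = split[1];                 // 13736230511
        String up    = split[split.length - 3]; // 6789  (third field from the end)
        String down  = split[split.length - 2]; // 12345 (second field from the end)
        System.out.println(phone + " up=" + up + " down=" + down);
    }
}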
(2) Writing the FlowReducer
package org.example.Partitioner;

import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Reducer;

import java.io.IOException;

public class FlowReducer extends Reducer<Text, FlowBean, Text, FlowBean> {

    private FlowBean outV = new FlowBean();

    @Override
    protected void reduce(Text key, Iterable<FlowBean> values, Reducer<Text, FlowBean, Text, FlowBean>.Context context)
            throws IOException, InterruptedException {
        // Iterate over the values and accumulate the totals
        long totalUp = 0;
        long totalDown = 0;
        for (FlowBean value : values) {
            totalUp += value.getUpFlow();
            totalDown += value.getDownFlow();
        }

        // Package outK/outV
        outV.setUpFlow(totalUp);
        outV.setDownFlow(totalDown);
        outV.setSumFlow();

        // Write out
        context.write(key, outV);
    }
}
(3) Writing the FlowBean
package org.example.Partitioner;

/*
 * 1. Define a class that implements the Writable interface
 * 2. Override the serialization and deserialization methods
 * 3. Provide a no-argument constructor
 * 4. Override toString()
 */
import org.apache.hadoop.io.Writable;

import java.io.DataInput;
import java.io.DataOutput;
import java.io.IOException;

public class FlowBean implements Writable {

    // up flow
    private long upFlow;
    // down flow
    private long downFlow;
    // total flow
    private long sumFlow;

    public long getUpFlow() {
        return upFlow;
    }

    public void setUpFlow(long upFlow) {
        this.upFlow = upFlow;
    }

    public long getDownFlow() {
        return downFlow;
    }

    public void setDownFlow(long downFlow) {
        this.downFlow = downFlow;
    }

    public long getSumFlow() {
        return sumFlow;
    }

    public void setSumFlow() {
        this.sumFlow = this.upFlow + this.downFlow;
    }

    // no-argument constructor
    public FlowBean() {
    }

    @Override
    public void write(DataOutput dataOutput) throws IOException {
        dataOutput.writeLong(upFlow);
        dataOutput.writeLong(downFlow);
        dataOutput.writeLong(sumFlow);
    }

    @Override
    public void readFields(DataInput dataInput) throws IOException {
        this.upFlow = dataInput.readLong();
        this.downFlow = dataInput.readLong();
        this.sumFlow = dataInput.readLong();
    }

    @Override
    public String toString() {
        return upFlow + "\t" + downFlow + "\t" + sumFlow;
    }
}
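Because readFields() must consume fields in exactly the order write() produced them, a quick local round-trip check can catch ordering mistakes. The sketch below is only an illustrative assumption, not part of the original case; it just needs the FlowBean class above on the classpath.

package org.example.Partitioner;

import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.DataInputStream;
import java.io.DataOutputStream;
import java.io.IOException;

// Serialize a FlowBean, deserialize it into a fresh instance, and print it.
public class FlowBeanRoundTrip {
    public static void main(String[] args) throws IOException {
        FlowBean in = new FlowBean();
        in.setUpFlow(6789);
        in.setDownFlow(12345);
        in.setSumFlow();

        // serialize
        ByteArrayOutputStream bytes = new ByteArrayOutputStream();
        in.write(new DataOutputStream(bytes));

        // deserialize into a fresh bean
        FlowBean out = new FlowBean();
        out.readFields(new DataInputStream(new ByteArrayInputStream(bytes.toByteArray())));

        System.out.println(out); // expected: 6789	12345	19134
    }
}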
(4) Writing the ProvincePartitioner
package org.example.Partitioner;

import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Partitioner;

public class ProvincePartitioner extends Partitioner<Text, FlowBean> {

    @Override
    public int getPartition(Text text, FlowBean flowBean, int numPartitions) {
        // route records by the first three digits of the phone number
        String phone = text.toString();
        String prePhone = phone.substring(0, 3);

        int partition;
        if ("136".equals(prePhone)) {
            partition = 0;
        } else if ("137".equals(prePhone)) {
            partition = 1;
        } else if ("138".equals(prePhone)) {
            partition = 2;
        } else if ("139".equals(prePhone)) {
            partition = 3;
        } else {
            partition = 4;
        }
        return partition;
    }
}
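As a quick sanity check (not part of the original case), getPartition() can be called directly to confirm where a few prefixes land. The phone numbers below are made up for illustration.

package org.example.Partitioner;

import org.apache.hadoop.io.Text;

// Call the partitioner directly to see which reducer each sample key would go to.
public class PartitionCheck {
    public static void main(String[] args) {
        ProvincePartitioner p = new ProvincePartitioner();
        FlowBean dummy = new FlowBean(); // the value is not used by getPartition()
        String[] phones = {"13612345678", "13712345678", "13812345678", "13912345678", "15012345678"};
        for (String phone : phones) {
            System.out.println(phone + " -> partition " + p.getPartition(new Text(phone), dummy, 5));
        }
        // expected: 0, 1, 2, 3, 4
    }
}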
(5) Writing the FlowDriver
package org.example.Partitioner;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;

import java.io.IOException;

public class FlowDriver {

    public static void main(String[] args) throws IOException, InterruptedException, ClassNotFoundException {
        // 1. Get the job
        Configuration conf = new Configuration();
        Job job = Job.getInstance(conf);

        // 2. Set the jar
        job.setJarByClass(FlowDriver.class);

        // 3. Wire up the mapper and reducer
        job.setMapperClass(FlowMapper.class);
        job.setReducerClass(FlowReducer.class);

        // 4. Set the mapper output key/value types
        job.setMapOutputKeyClass(Text.class);
        job.setMapOutputValueClass(FlowBean.class);

        // Custom partitioner
        job.setPartitionerClass(ProvincePartitioner.class);
        // 5 partitions, so 5 reduce tasks
        job.setNumReduceTasks(5);

        // 5. Set the final output key/value types
        job.setOutputKeyClass(Text.class);
        job.setOutputValueClass(FlowBean.class);

        // 6. Set the input and output paths
        FileInputFormat.setInputPaths(job, new Path("E:\\input\\inputflow"));
        FileOutputFormat.setOutputPath(job, new Path("E:\\output\\ProvincePartitioner"));

        // 7. Submit the job
        boolean result = job.waitForCompletion(true);
        System.exit(result ? 0 : 1);
    }
}
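One inconvenience of the hard-coded paths above is that re-running the job fails if the output directory already exists. A small hypothetical helper like the one below (its name and placement are assumptions, not from the article) can delete the old directory before the job is submitted when experimenting locally.

package org.example.Partitioner;

import java.io.IOException;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;

// Remove the output directory if it exists, so repeated runs do not fail
// with "Output directory ... already exists".
public class OutputCleaner {
    public static void deleteIfExists(Configuration conf, Path out) throws IOException {
        FileSystem fs = FileSystem.get(conf);
        if (fs.exists(out)) {
            fs.delete(out, true); // recursive delete
        }
    }
}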
Data analysis:
Note: since the custom partitioner defines 5 partitions and the number of reduce tasks is set to 5, exactly 5 output files should be produced. As shown in the figure above, this matches the expected result.
// Custom partitioner
job.setPartitionerClass(ProvincePartitioner.class);
// 5 partitions, so 5 reduce tasks
job.setNumReduceTasks(5);
This completes the MapReduce ProvincePartitioner case (serialization case) implementation.