Requirement
Data
0000001  Pdt_01  222.8
0000002  Pdt_06  722.4
0000001  Pdt_05  25.8
0000003  Pdt_01  222.8
0000003  Pdt_01  33.8
0000002  Pdt_03  522.8
0000002  Pdt_04  122.4
Columns: order ID, product, price (tab-separated; the product column is not used).
Desired output: partition the records by order ID, and within each partition extract the maximum price.
Partition 1
0000001  222.8
Partition 2
0000002  722.4
Partition 3
0000003  222.8
Approach
Each line of data can be modeled as an OrderBean that implements Hadoop's serialization interface, WritableComparable, with a custom comparator: sort by id first, then by price in descending order. Override toString, which defines the output format. The map output key is the OrderBean; the value can be NullWritable.

Grouping: group by the OrderBean's id. Because the previous step sorted prices in descending order, the first record of every group is the one with the maximum price, which is exactly what we need (see the sketch below).

Partitioning: partition by the order id.

Reduce: merge the groups, emitting the OrderBean as the key with a NullWritable value. A custom main function wires the mapper, grouping comparator, partitioner, and reducer into a job, runs it, and writes the result to the output folder.
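To see why the first record of every group carries the maximum price, here is the intermediate order of the sample data after the shuffle sort (id ascending, then price descending; which reducer each id lands on is ignored here):

0000001  222.8   <- first in the id=1 group: the maximum
0000001  25.8
0000002  722.4   <- first in the id=2 group: the maximum
0000002  522.8
0000002  122.4
0000003  222.8   <- first in the id=3 group: the maximum
0000003  33.8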
Writing the code

Bean
import lombok.Getter;
import lombok.Setter;
import org.apache.hadoop.io.WritableComparable;

import java.io.DataInput;
import java.io.DataOutput;
import java.io.IOException;

@Setter
@Getter
public class OrderBean implements WritableComparable<OrderBean> {

    private int id;
    private double price;

    // Sort by id ascending first, then by price descending, so the first
    // record of every id group carries the maximum price.
    @Override
    public int compareTo(OrderBean o) {
        if (this.id != o.id) {
            return Integer.compare(this.id, o.id);
        }
        // Reversed arguments -> descending price. Double.compare also returns
        // 0 for equal prices, keeping the compareTo contract (the original
        // ternary never returned 0).
        return Double.compare(o.price, this.price);
    }

    @Override
    public void write(DataOutput output) throws IOException {
        output.writeInt(this.id);
        output.writeDouble(this.price);
    }

    @Override
    public void readFields(DataInput input) throws IOException {
        this.id = input.readInt();
        this.price = input.readDouble();
    }

    // Defines the output format: id <TAB> price
    @Override
    public String toString() {
        return this.id + "\t" + this.price;
    }
}
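As a quick local sanity check of the sort order (not part of the job; OrderBeanCheck and its make helper are hypothetical names, and the setters come from Lombok):

import java.util.ArrayList;
import java.util.Collections;
import java.util.List;

public class OrderBeanCheck {
    public static void main(String[] args) {
        List<OrderBean> beans = new ArrayList<>();
        beans.add(make(1, 25.8));
        beans.add(make(2, 722.4));
        beans.add(make(1, 222.8));

        Collections.sort(beans);
        // Prints: 1 222.8, then 1 25.8, then 2 722.4
        beans.forEach(System.out::println);
    }

    private static OrderBean make(int id, double price) {
        OrderBean b = new OrderBean();
        b.setId(id);
        b.setPrice(price);
        return b;
    }
}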
Map
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;

import java.io.IOException;

// Input line: 0000002	Pdt_04	122.4
// MapReduce sorts by key, so the map output types (k2, v2) should be
// OrderBean, NullWritable.
public class OrderMap extends Mapper<LongWritable, Text, OrderBean, NullWritable> {

    // One reusable bean: Hadoop serializes the key on each context.write,
    // so reusing the object is safe and avoids per-record allocation.
    OrderBean bean = new OrderBean();

    @Override
    protected void map(LongWritable key, Text value, Context context)
            throws IOException, InterruptedException {
        String s = value.toString();
        String[] split = s.split("\t");
        bean.setId(Integer.parseInt(split[0]));       // order id
        bean.setPrice(Double.parseDouble(split[2]));  // price; split[1] (product) is ignored
        context.write(bean, NullWritable.get());
    }
}
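To make the field indexing concrete, here is what one sample line splits into (a throwaway snippet, not part of the job):

String[] split = "0000001\tPdt_01\t222.8".split("\t");
// split[0] = "0000001"  -> id (parsed to int 1, so leading zeros are dropped)
// split[1] = "Pdt_01"   -> ignored
// split[2] = "222.8"    -> price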
Grouping comparator (secondary sort)
import org.apache.hadoop.io.WritableComparable;
import org.apache.hadoop.io.WritableComparator;

public class GroupComparator extends WritableComparator {

    public GroupComparator() {
        // Register OrderBean; the second argument (true) tells the parent to
        // create bean instances, so compare() receives deserialized objects.
        super(OrderBean.class, true);
    }

    // Group only by id: beans with the same id go to the same reduce() call,
    // regardless of price.
    @Override
    public int compare(WritableComparable a, WritableComparable b) {
        OrderBean oa = (OrderBean) a;
        OrderBean ob = (OrderBean) b;
        return Integer.compare(oa.getId(), ob.getId());
    }
}
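With the sample data, the sort order plus this grouping comparator mean the framework makes exactly one reduce() call per order id, and the key of each call is the group's max-price bean:

// reduce(OrderBean{id=1, price=222.8}, [null, null])
// reduce(OrderBean{id=2, price=722.4}, [null, null, null])
// reduce(OrderBean{id=3, price=222.8}, [null, null])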
Partition
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.mapreduce.Partitioner;

public class OrderPartition extends Partitioner<OrderBean, NullWritable> {

    @Override
    public int getPartition(OrderBean orderBean, NullWritable nullWritable, int numPartitions) {
        // Mask with Integer.MAX_VALUE to keep the value non-negative, then
        // spread ids across the reducers (numPartitions is 3 in this job,
        // so this matches the original hardcoded % 3).
        return (orderBean.getId() & Integer.MAX_VALUE) % numPartitions;
    }
}
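For the sample ids this works out to:

id 1 -> 1 % 3 = 1  (output file part-r-00001)
id 2 -> 2 % 3 = 2  (output file part-r-00002)
id 3 -> 3 % 3 = 0  (output file part-r-00000)

So "Partition 1/2/3" from the requirement map to the files part-r-00001, part-r-00002, and part-r-00000 respectively.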
Reduce
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.mapreduce.Reducer;

import java.io.IOException;

public class OrderReduce extends Reducer<OrderBean, NullWritable, OrderBean, NullWritable> {

    // Thanks to the grouping comparator, each call receives one whole id
    // group; the key is the group's first record, i.e. the maximum price.
    @Override
    protected void reduce(OrderBean bean, Iterable<NullWritable> values, Context context)
            throws IOException, InterruptedException {
        context.write(bean, NullWritable.get());
    }
}
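One subtlety worth knowing: Hadoop reuses the key object while you iterate over values, advancing it through the group's records in sort order. Because this reducer writes the key without touching the iterator, bean still refers to the group's first record, which the descending price sort guarantees is the maximum.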
Testing the job
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;

import java.io.IOException;

public class MainOrder {
    public static void main(String[] args) throws IOException, ClassNotFoundException, InterruptedException {
        // Hardcoded local paths for testing
        args = new String[]{"F:\\input\\GroupingComparator.txt", "F:\\output\\GroupingComparator4"};

        // Load the configuration
        Configuration conf = new Configuration();
        // Create the job
        Job job = Job.getInstance(conf);
        // Locate the jar by class
        job.setJarByClass(MainOrder.class);

        // Wire up the mapper and its output types: OrderBean, NullWritable
        job.setMapperClass(OrderMap.class);
        job.setMapOutputKeyClass(OrderBean.class);
        job.setMapOutputValueClass(NullWritable.class);

        // Wire up the reducer and the final output types
        job.setReducerClass(OrderReduce.class);
        job.setOutputKeyClass(OrderBean.class);
        job.setOutputValueClass(NullWritable.class);

        // Secondary sort: the grouping comparator
        job.setGroupingComparatorClass(GroupComparator.class);

        // Partitioning: 3 reduce tasks, one output file per partition
        job.setPartitionerClass(OrderPartition.class);
        job.setNumReduceTasks(3);

        // Input and output paths
        FileInputFormat.setInputPaths(job, new Path(args[0]));
        FileOutputFormat.setOutputPath(job, new Path(args[1]));

        job.waitForCompletion(true);
    }
}
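To submit the same job to a cluster instead of running it locally, remove the line that overwrites args, package the classes into a jar (order-job.jar is a hypothetical name), and pass the paths on the command line:

hadoop jar order-job.jar MainOrder /input/GroupingComparator.txt /output/GroupingComparator4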
Checking the results
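Given the sample data and the partitioner above, the three output files should contain (ids print without leading zeros because they were parsed as int):

part-r-00000:  3	222.8
part-r-00001:  1	222.8
part-r-00002:  2	722.4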
Perfect!! Mission accomplished!!