MapReduce之输出结果排序

简介: 前面的案例中我们介绍了统计出每个用户的上行流量,下行流量及总流量,现在我们想要将输出的结果按照总流量倒序排序。


 前面的案例中我们介绍了统计出每个用户的上行流量,下行流量及总流量,现在我们想要将输出的结果按照总流量倒序排序。

image.png

实现思路

 MR程序在处理数据的过程中会对数据排序(map输出的kv对传输到reduce之前会排序),排序的依据是map输出的key。所以我们如果要实现自己需要的排序规则,则可以考虑将排序因素放到key中,让key实现接口:WritableComparable,然后重写key的compareTo方法来指定比较规则

实现步骤

1.自定义Bean

import java.io.DataInput;
import java.io.DataOutput;
import java.io.IOException;
import org.apache.hadoop.io.Writable;
import org.apache.hadoop.io.WritableComparable;
/**
 * 存储流量相关数据
 * @author 波波烤鸭
 *
 */
public class Flow implements WritableComparable<Flow> {
  // 上下流量
  private long upFlow;
  // 下行流量
  private long downFlow;
  // 总流量
  private long sumFlow;
  /**
   * 比较Flow对象的总流量
   */
  @Override
  public int compareTo(Flow o) {
    // TODO Auto-generated method stub
    return -(int)(this.sumFlow - o.getSumFlow());
  }
  public long getUpFlow() {
    return upFlow;
  }
  public void setUpFlow(long upFlow) {
    this.upFlow = upFlow;
  }
  public long getDownFlow() {
    return downFlow;
  }
  public void setDownFlow(long downFlow) {
    this.downFlow = downFlow;
  }
  public Flow(long upFlow, long downFlow) {
    super();
    this.upFlow = upFlow;
    this.downFlow = downFlow;
    this.sumFlow = upFlow + downFlow;
  }
  /**
   * 无参构造方法必须要有 反射的时候需要用到
   */
  public Flow() {
    super();
  }
  /**
   * 序列化方法
   */
  @Override
  public void write(DataOutput out) throws IOException {
    // TODO Auto-generated method stub
    out.writeLong(upFlow);
    out.writeLong(downFlow);
    out.writeLong(sumFlow);
  }
  @Override
  public String toString() {
    return upFlow + "\t" + downFlow + "\t" + sumFlow;
  }
  /**
   * 反序列化 反序列化的顺序和序列化的顺序一致
   */
  @Override
  public void readFields(DataInput in) throws IOException {
    // TODO Auto-generated method stub
    upFlow = in.readLong();
    downFlow = in.readLong();
    sumFlow = in.readLong();
  }
  public long getSumFlow() {
    return sumFlow;
  }
  public void setSumFlow(long sumFlow) {
    this.sumFlow = sumFlow;
  }
}

2.Map阶段

public class FlowCountMap extends Mapper<LongWritable, Text, Flow, Text> {
  @Override
  protected void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException {
    // 将一行数据转换为String
    String line = value.toString();
    // 切分字段
    String[] fields = line.split("\t");
    // 取出手机号
    String phoneNum = fields[0];
    // 取出上行流量下行流量
    long upFlow = Long.parseLong(fields[fields.length - 3]);
    long downFlow = Long.parseLong(fields[fields.length - 2]);
    Flow flow = new Flow(upFlow, downFlow);
    context.write(flow, new Text(phoneNum));
  }
}

3.Reduce阶段

public class FlowCountReducer extends Reducer<Flow, Text, Text, Flow>{
  @Override
  protected void reduce(Flow flow, Iterable<Text> values, Context context)
      throws IOException, InterruptedException {
    String phone = values.iterator().next().toString();
    // 输出结果
    context.write(new Text(phone), flow);
  }
}

4.启动类

public static void main(String[] args) throws Exception {
    Configuration conf = new Configuration(true);
    conf.set("mapreduce.framework.name", "local");
    // 输出到HDFS文件系统中
    // conf.set("fs.defaultFS", "hdfs://hadoop-node01:9000");
    // 输出到本地文件系统
    conf.set("fs.defaultFS", "file:///");
    Job job = Job.getInstance(conf);
    job.setJarByClass(FlowTest.class);
    // 指定本job要使用的map/reduce的工具类
    job.setMapperClass(FlowCountMap.class);
    job.setReducerClass(FlowCountReducer.class);
    // 指定mapper输出kv的类型
    job.setMapOutputKeyClass(Flow.class);
    job.setMapOutputValueClass(Text.class);
    job.setOutputKeyClass(Text.class);
    job.setOutputValueClass(Flow.class);
    // 指定job的原始文件输入目录
    // 6.设置输出输出类
    FileInputFormat.setInputPaths(job, new Path("c:/tools/bigdata/mr/sort/input/"));
    FileOutputFormat.setOutputPath(job, new Path("c:/tools/bigdata/mr/sort/output/"));
    //将job中配置的相关参数,以及job所用的jar包提交给yarn运行
    //job.submit();  waitForCompletion等待执行完成
    boolean flag = job.waitForCompletion(true);
    System.exit(flag?0:1);
  }
}

5.输出结果

image.png

image.png

image.png

成功倒序输出

本案例的目的有两个:

   实现对输出结果排序我们可以在自定义对象的compareTo方法中指定

   如果一次MapReduce任务获取不到我们需要的结果我们可以对输出的结果做多次MapReduce任务。


相关文章
|
分布式计算 Java 大数据
MapReduce基础编程之按日期统计及按日期排序(下)
MapReduce基础编程之按日期统计及按日期排序(下)
354 0
MapReduce基础编程之按日期统计及按日期排序(下)
|
存储 分布式计算 Hadoop
MapReduce排序
MapTask和ReduceTask均会对数据按照key进行排序。该操作属于Hadoop的默认行为。任何应用程序中的数
|
分布式计算 算法 Java
MapReduce入门编程-成绩求和排序
MapReduce入门编程-成绩求和排序
MapReduce入门编程-成绩求和排序
|
分布式计算
mapreduce辅助排序和序列化的实例练习
mapreduce辅助排序和序列化的实例练习
108 0
mapreduce辅助排序和序列化的实例练习
|
分布式计算 Java Hadoop
MapReduce基础编程之按日期统计及按日期排序(上)
MapReduce基础编程之按日期统计及按日期排序(上)
275 0
MapReduce基础编程之按日期统计及按日期排序(上)
|
分布式计算 搜索推荐 Java
一脸懵逼学习Hadoop中的序列化机制——流量求和统计MapReduce的程序开发案例——流量求和统计排序
一:序列化概念 序列化(Serialization)是指把结构化对象转化为字节流。反序列化(Deserialization)是序列化的逆过程。即把字节流转回结构化对象。Java序列化(java.io.Serializable) 二:Hadoop序列化的特点 (1):序列化格式特点:  紧凑:高效使用存储空间。
1507 0