MapReduce(二)

简介: MapReduce(二)

WritableComparable排序(全排序)(全排序)(全排序)


排序分类


●部分排序

MapReduce根据输入记录的键对数据集排序,保证输出的每个文件内部有序。


●全排序

最终输出的结果只有一个文件,且文件内部有序。实现方式是只设置一个ReduceTask。但该方法在处理大型文件时效率极低,因为一台机器处理所以文件,完全丧失了MapReduce所提供的并行架构。


●辅助排序(GroupingComparator分组):

在Reduce端对key进行分组。应用于:在接收的key为bean对象时,想让一个或者几个字段相同(全部字段比较不同)的key进入到同一个reduce方法时,可以采用分组排序


●二次排序


在自定义排序过程中,如果compareTo中的判断条件为两个即为二次排序


WritableComparable排序案例实操


需求


将数据按照总流量(上行流量+下行流量)从大到小排序


输入数据


手机号                  上行流量     下行流量       总流量      

 

13470253144 180 180 360
13509468723 7335  110349  117684
13560439638 918 4938  5856
13568436656 3597  25635 29232
。。。


预期结果(按照最后一列排序)


13509468723 7335  110349  117684
13736230513 2481  24681 27162
13956435636 132 1512    1644
13846544121 264 0 264


实体类实现WritableComparable接口


package com.imooc.compareto;
import java.io.DataInput;
import java.io.DataOutput;
import java.io.IOException;
import org.apache.hadoop.io.Writable;
import org.apache.hadoop.io.WritableComparable;
//步骤1 实现WritableComparable接口
public class FlowBean implements WritableComparable<FlowBean> {
  private long upFlow;// 上行流量
  private long downFlow;// 下行流量
  private long sumFlow;// 总流量
  // 步骤2:无参构造
  public FlowBean() {
  }
  // 有参构造
  public FlowBean(long upFlow, long downFlow) {
    this.upFlow = upFlow;
    this.downFlow = downFlow;
    this.sumFlow = upFlow + downFlow;
  }
  // 步骤3:序列化方法
  @Override
  public void write(DataOutput out) throws IOException {
    out.writeLong(upFlow);
    out.writeLong(downFlow);
    out.writeLong(sumFlow);
  }
  // 步骤4:反序列化方法
  // 反序列化方法读顺序必须和写序列化方法的写顺序必须一致
  @Override
  public void readFields(DataInput in) throws IOException {
    // 必须与序列化中顺序一致
    this.upFlow = in.readLong();
    this.downFlow = in.readLong();
    this.sumFlow = in.readLong();
  }
   步骤5:比较
  @Override
  public int compareTo(FlowBean bean) {
    int result;
    // 按照总流量大小,倒序排列
    if (sumFlow > bean.getSumFlow()) {
      result = -1;
    } else if (sumFlow < bean.getSumFlow()) {
      result = 1;
    } else {
      result = 0;
    }
    return result;
  }
  // 步骤6:为后续方便,重新toString方法
  @Override
  public String toString() {
    return upFlow + "\t" + downFlow + "\t" + sumFlow;
  }
  public long getUpFlow() {
    return upFlow;
  }
  public void setUpFlow(long upFlow) {
    this.upFlow = upFlow;
  }
  public long getDownFlow() {
    return downFlow;
  }
  public void setDownFlow(long downFlow) {
    this.downFlow = downFlow;
  }
  public long getSumFlow() {
    return sumFlow;
  }
  public void setSumFlow(long sumFlow) {
    this.sumFlow = sumFlow;
  }
}


Mapper


package com.imooc.compareto;
import java.io.IOException;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;
public class FlowBeanMapper extends Mapper<LongWritable, Text, FlowBean, Text> {
  @Override
  protected void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException {
     // 1 获取一行
     String line = value.toString();
     // 2 切割字段
     String[] words = line.split("\t");
     // 3 封装对象
     Text v = new Text();
     v.set(words[0]);//取出手机号码
     // 取出上行流量和下行流量
     long upFlow = Long.parseLong(words[1]);
     long downFlow = Long.parseLong(words[2]);
     long sumFlow=Long.parseLong(words[3]);
     FlowBean k = new FlowBean(upFlow, downFlow);
     // 4 写出
     context.write(k, v);
  }
}


Reducer


1. package com.imooc.compareto;
import java.io.IOException;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Reducer;
public class FlowBeanReducer extends Reducer<FlowBean, Text, Text, FlowBean> {
  @Override
  protected void reduce(FlowBean key, Iterable<Text> values, Context context)
      throws IOException, InterruptedException {
    for (Text text : values) {
      context.write(text, key);
    }
  }
}


Driver


package com.imooc.compareto;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
public class FlowBeanDriver {
  public static void main(String[] args) throws Exception {
    // 输入路径(处理E:\temp\input下的***文件)
    String inputPath = "E:\\temp\\input";
    // 输出路径(output文件夹不能存在,否则报错)
    String outputPath = "E:\\temp\\output";
    Configuration conf = new Configuration();
    // 1 获取Job对象
    Job job = Job.getInstance(conf);
    // 2 设置jar存储位置(当前类.class)
    job.setJarByClass(FlowBeanDriver.class);
    // 3 关联Map和Reduce类
    job.setMapperClass(FlowBeanMapper.class);
    job.setReducerClass(FlowBeanReducer.class);
    // 4 设置Mapper阶段输出数据的key和value类型
    job.setMapOutputKeyClass(FlowBean.class);
    job.setMapOutputValueClass(Text.class);
    // 5 设置最终数据输出的key和value类型
    job.setOutputKeyClass(Text.class);
    job.setOutputValueClass(FlowBean.class);
    // 6 设置输入路径和输出路径
    FileInputFormat.setInputPaths(job, new Path(inputPath));
    FileOutputFormat.setOutputPath(job, new Path(outputPath));
    // 7 提交job
    // job.submit();
    job.waitForCompletion(true);
    System.out.println("-------OVER-----------");
  }
}


WritableComparable排序(分区排序)(分区排序)(分区排序)


需求


要求每个省份(136、137、148、139、其他)手机号输出的文件中按照总流量内部排序。


需求分析:基于前一个需求WritableComparableq排序(全排序),增加自定义分区类,分区按照省份手机号设置。


自定义分区


package com.imooc.compareto_partition;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Partitioner;
//泛型为map的输出类型,即reduce的输入类型
public class ProvincePartitioner extends Partitioner<FlowBean, Text> {
  @Override
  public int getPartition(FlowBean key, Text value, int numPartitions) {
    // 1 获取电话号码的前三位
    String preNum = value.toString().substring(0, 3);
    int partition = 4;
    // 2 判断是哪个省
    if ("136".equals(preNum)) {
      partition = 0;
    } else if ("137".equals(preNum)) {
      partition = 1;
    } else if ("138".equals(preNum)) {
      partition = 2;
    } else if ("139".equals(preNum)) {
      partition = 3;
    }
    return partition;
  }
}


Driver启动类


核心代码


// 指定自定义数据分区(自定义partition重要配置)(自定义partition重要配置)(自定义partition重要配置)
job.setPartitionerClass(ProvincePartitioner.class);
// 同时指定相应数量的reducetask(自定义partition重要配置)(自定义partition重要配置)(自定义partition重要配置)
job.setNumReduceTasks(5);


package com.imooc.compareto_partition;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
public class FlowBeanDriver {
  public static void main(String[] args) throws Exception {
    // 输入路径(处理E:\temp\input下的***文件)
    String inputPath = "E:\\temp\\input";
    // 输出路径(output文件夹不能存在,否则报错)
    String outputPath = "E:\\temp\\output";
    Configuration conf = new Configuration();
    // 1 获取Job对象
    Job job = Job.getInstance(conf);
    // 2 设置jar存储位置(当前类.class)
    job.setJarByClass(FlowBeanDriver.class);
    // 3 关联Map和Reduce类
    job.setMapperClass(FlowBeanMapper.class);
    job.setReducerClass(FlowBeanReducer.class);
    // 4 设置Mapper阶段输出数据的key和value类型
    job.setMapOutputKeyClass(FlowBean.class);
    job.setMapOutputValueClass(Text.class);
    // 5 设置最终数据输出的key和value类型
    job.setOutputKeyClass(Text.class);
    job.setOutputValueClass(FlowBean.class);
    // 指定自定义数据分区(自定义partition重要配置)(自定义partition重要配置)(自定义partition重要配置)
    job.setPartitionerClass(ProvincePartitioner.class);
    // 同时指定相应数量的reduce
    // task(自定义partition重要配置)(自定义partition重要配置)(自定义partition重要配置)
    job.setNumReduceTasks(5);
    // 6 设置输入路径和输出路径
    FileInputFormat.setInputPaths(job, new Path(inputPath));
    FileOutputFormat.setOutputPath(job, new Path(outputPath));
    // 7 提交job
    // job.submit();
    job.waitForCompletion(true);
    System.out.println("-------OVER-----------");
  }
}


Combiner合并


简介


1.Combiner是MR程序中Mapper和Reducer之外的一种组件

2.Combiner组件的父类就是Reducer

3.Combiner和Reducer的区别在于运行的位置(Combiner是在每一个MapTask所在的节点运行)(Reducer是在接收全局所有Mapper的输出结果)

4.Combiner的意义就是对每一个MapTask的输出进行局部徽章,以减少网络传输量。

5.Combiner能够应用的前提是不能影响最终的业务逻辑,而且,Combiner的输出KV应该和Reducer的输入KV类型对应起来


GroupingComparator分组(辅助排序)

在reduce端对key进行分组,应用于:在接收的key为bean对象时,想让一个或几个字段相同(全部字段比较不同)的key进入到同一个reduce方法时,可以采用分组排序


分组排序步骤:


  • (1)自定义类继承WritableComparator
  • (2)重写compare()方法


@Override
public int compare(WritableComparable a, WritableComparable b) {
// 比较的业务逻辑
return result;
}


  • (3)创建一个构造将比较对象的类传给父类


protected OrderGroupingComparator() {
super(OrderBean.class, true);
}


  • (4)在驱动里设置关联


// 8 设置reduce端的分组
job.setGroupingComparatorClass(OrderGroupingComparator.class);


GroupingComparator分组实操:


需求:统计一个订单里价格最高的数据


我认为,GroupingComparator所解决的问题是:当一个entity里有多个属性(id,price,age'等等)需要排序时候,需要将entity作为作为mapper的输出K,这样才能排序,但是因为(id,price,age)只要有一个不同,他们就不同,所以每一个k(entity)都是不同的,而我又想将id相同的但是实际不同的K伪装成相同的放在一个reduce中


比如:


(id,price)     A(1,11)   B(1,33)   C(2,33) 三个数都作为map的输出K是不相同(假设我实体类的compare方法中先按id排序,在按price排序)的,所以要进入3个reduce中,如下图


10.png


但是我的需求是id相同的进入一个reduce中,所以出现了GroupingComparator


11.png


在通俗一点:在reduce端对key进行分组,应用于:在接收的key为bean对象时,想让一个或几个字段相同(全部字段比较不同)的key进入到同一个reduce方法时,可以采用分组排序


输入数据:


订单号         商品号         价格


0000001 Pdt_01  222.8
0000002 Pdt_05  722.4
0000001 Pdt_02  33.8
0000003 Pdt_06  232.8
0000003 Pdt_02  33.8
0000002 Pdt_03  522.8
0000002 Pdt_04  122.4


期望输出结果:


1. 1  222.8
2. 2  722.4
3. 3  232.8


需求分析:


  1. 1.利用“订单id和成交金额”作为key,可以将Map阶段读取到的所有订单数据按照id升序排序,如果id相同再按照金额降序排序,发送到Reduce。
  2. 2.在Reduce端利用groupingComparator将订单id相同的kv聚合成组,然后取第一个即是该订单中最贵商品,


封装的实体类(实现WritableComparable接口)


package com.imooc.GroupingComparator;
import java.io.DataInput;
import java.io.DataOutput;
import java.io.IOException;
import org.apache.hadoop.io.WritableComparable;
public class OrderBean implements WritableComparable<OrderBean> {
  private int order_id;
  private double price;
  @Override
  public void readFields(DataInput in) throws IOException {
    this.order_id = in.readInt();
    this.price = in.readDouble();
  }
  @Override
  public void write(DataOutput out) throws IOException {
    out.writeInt(order_id);
    out.writeDouble(price);
  }
  // 二次排序
  @Override
  public int compareTo(OrderBean o) {
    int result;
        //先按id升序,在按价格降序
    if (order_id > o.getOrder_id()) {
      result = 1;
    } else if (order_id < o.getOrder_id()) {
      result = -1;
    } else {
      // 价格倒序排序
      result = price > o.getPrice() ? -1 : 1;
    }
    return result;
  }
  public OrderBean() {
  }
  public int getOrder_id() {
    return order_id;
  }
  public void setOrder_id(int order_id) {
    this.order_id = order_id;
  }
  public double getPrice() {
    return price;
  }
  public void setPrice(double price) {
    this.price = price;
  }
  @Override
  public String toString() {
    return order_id + "\t" + price;
  }
}


Mapper


package com.imooc.GroupingComparator;
import java.io.IOException;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;
public class OrderMapper extends Mapper<LongWritable, Text, OrderBean, NullWritable> {
  @Override
  protected void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException {
    // 1 获取一行
    String line = value.toString();
    // 2 截取
    String[] fields = line.split("\t");
    // 3 封装对象
    OrderBean k = new OrderBean();
    k.setOrder_id(Integer.parseInt(fields[0]));
    k.setPrice(Double.parseDouble(fields[2]));
    // 4 写出
    context.write(k, NullWritable.get());
  }
}


Reducer


package com.imooc.GroupingComparator;
import java.io.IOException;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.mapreduce.Reducer;
public class OrderReducer extends Reducer<OrderBean, NullWritable, OrderBean, NullWritable> {
  @Override
  protected void reduce(OrderBean key, Iterable<NullWritable> values, Context context)
      throws IOException, InterruptedException {
    context.write(key, NullWritable.get());
  }
}


继承GroupingComparator的类


package com.imooc.GroupingComparator;
import org.apache.hadoop.io.WritableComparable;
import org.apache.hadoop.io.WritableComparator;
public class OrderGroupingComparator extends WritableComparator {
  protected OrderGroupingComparator() {
    super(OrderBean.class, true);
  }
  @Override
  public int compare(WritableComparable a, WritableComparable b) {
    OrderBean aBean = (OrderBean) a;
    OrderBean bBean = (OrderBean) b;
    int result;
    if (aBean.getOrder_id() > bBean.getOrder_id()) {
      result = 1;
    } else if (aBean.getOrder_id() < bBean.getOrder_id()) {
      result = -1;
    } else {
      result = 0;
    }
    return result;
  }
}


Driver驱动类


package com.imooc.GroupingComparator;
import java.io.IOException;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
public class OrderDriver {
  public static void main(String[] args) throws Exception {
    // 输入路径(处理E:\temp\input下的***文件)
    String inputPath = "E:\\temp\\input";
    // 输出路径(output文件夹不能存在,否则报错)
    String outputPath = "E:\\temp\\output";
    Configuration conf = new Configuration();
    // 1 获取Job对象
    Job job = Job.getInstance(conf);
    // 2 设置jar存储位置(当前类.class)
    job.setJarByClass(OrderDriver.class);
    // 3 关联Map和Reduce类
    job.setMapperClass(OrderMapper.class);
    job.setReducerClass(OrderReducer.class);
    // 4 设置Mapper阶段输出数据的key和value类型
    job.setMapOutputKeyClass(OrderBean.class);
    job.setMapOutputValueClass(NullWritable.class);
    // 5 设置最终数据输出的key和value类型
    job.setOutputKeyClass(OrderBean.class);
    job.setOutputValueClass(NullWritable.class);
    // 6 设置输入路径和输出路径
    FileInputFormat.setInputPaths(job, new Path(inputPath));
    FileOutputFormat.setOutputPath(job, new Path(outputPath));
    // 8 设置reduce端的分组
    job.setGroupingComparatorClass(OrderGroupingComparator.class);
    // 7 提交job
    // job.submit();
    job.waitForCompletion(true);
    System.out.println("-------OVER-----------");
  }
}


OutputFormat数据输出


OutputFormat接口实现类


OutputFormat是MapReduce输出的基类,所有 实现MapReduce输出都实现了OutputFormat接口。下面我们介绍几种常见的OutputFormat实现类。


1.文本输出TextOutputFormat

默认的输出格式是TextOutputFormat ,它把每条记录写为文本行。它的键和值可以是任意类型,因为TextOutputFormat调用toString()方法把它们转换为字符串。


2. SequenceFileOutputF ormat

将SequenceFileOutputFormat输出作为后续MapReduce任务的输入,这便是一种好的输出格式,因为它的格式紧凑,很客易被压缩。


3.自定义OutputFormat

根据用户需求,自定义实现输出。


自定义OutputFormat步骤


自定义一个类继承FileOutputFormat

改写recordWriter,具体改写输出数据的方法write()

job.setOutputFormatClass(FilterOutputFormat.class);


自定义OutputFormat案例实操


需求:


过滤输入的log日志,包含atguigu的网站输出到e:/atguigu.log,不包含atguigu的网站输出到e:/other.log。


输入数据:


http://www.baidu.com
http://www.google.com
http://cn.bing.com
http://www.atguigu.com
http://www.sohu.com
http://www.sina.com
http://www.sin2a.com
http://www.sin2desa.com
http://www.sindsafa.com


期望输出数据:


12.png


Mapper


单纯的切割和写


package com.imooc.myoutputformat;
import java.io.IOException;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;
public class FilterMapper extends Mapper<LongWritable, Text, Text, NullWritable> {
  @Override
  protected void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException {
    String line = value.toString();
    Text k=new Text(line);
    context.write(k, NullWritable.get());
  }
}


Reducer


单纯的写,但是要改一下格式,方便在输出文件里看,否则连在一起在一样


package com.imooc.myoutputformat;
import java.io.IOException;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Reducer;
public class FilterReducer extends Reducer<Text, NullWritable, Text, NullWritable> {
  @Override
  protected void reduce(Text key, Iterable<NullWritable> values, Context context)
      throws IOException, InterruptedException {
    // 1 获取一行
    String line = key.toString();
    // 2 拼接
    line = line + "\r\n";
    // 3 设置key
    Text k=new Text();
    k.set(line);
    for (NullWritable nullWritable : values) {
      context.write(k, NullWritable.get());
    }
  }
}


FRecordWriter


继承RecordWriter

初始化数据流并且完整业务逻辑

而且在这个类中还可以写入MySQL,Redis等数据库中


package com.imooc.myoutputformat;
import java.io.IOException;
import org.apache.hadoop.fs.FSDataOutputStream;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IOUtils;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.RecordWriter;
import org.apache.hadoop.mapreduce.TaskAttemptContext;
public class FRecordWriter extends RecordWriter<Text, NullWritable> {
  FSDataOutputStream fosatguigu;
  FSDataOutputStream fosother;
  // 初始化方法
  public FRecordWriter(TaskAttemptContext job){
    try {
      // 1、获取文件系统
      FileSystem fs = FileSystem.get(job.getConfiguration());
      fosatguigu = fs.create(new Path("e:/atguigu.log"));
      // 2、创建两个文件输出流
      fosother = fs.create(new Path("e:/other.log"));
    } catch (Exception e) {
      // TODO Auto-generated catch block
      e.printStackTrace();
    }
  }
  // 业务逻辑类
  @Override
  public void write(Text key, NullWritable value) throws IOException, InterruptedException {
    try {
      // 判断是否包含“atguigu”输出到不同文件
      if (key.toString().contains("atguigu")) {
        fosatguigu.write(key.toString().getBytes());
      } else {
        fosother.write(key.toString().getBytes());
      }
    } catch (Exception e) {
      // TODO Auto-generated catch block
      e.printStackTrace();
    }
  }
  // close方法
  @Override
  public void close(TaskAttemptContext context) throws IOException, InterruptedException {
    // 关闭流
    IOUtils.closeStream(fosatguigu);
    IOUtils.closeStream(fosother);
  }
}


FilterOutputFormat


因为上面创建了FRecordWriter类,所在下面的类中直接返回,基本不用改


package com.imooc.myoutputformat;
import java.io.IOException;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.RecordWriter;
import org.apache.hadoop.mapreduce.TaskAttemptContext;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
public class FilterOutputFormat extends FileOutputFormat<Text,NullWritable>{
  @Override
  public RecordWriter<Text, NullWritable> getRecordWriter(TaskAttemptContext job)
      throws IOException, InterruptedException {
    return new FRecordWriter(job);
  }
}


Driver启动类


核心代码


// 设置(自定义outputFormat重点配置)(自定义outputFormat重点配置)(自定义outputFormat重点配置)
job.setOutputFormatClass(FilterOutputFormat.class);
// 6 设置输入路径和输出路径
FileInputFormat.setInputPaths(job, new Path(inputPath));
// 虽然我们自定义了outputformat,但是因为我们的outputformat继承fileoutputforma而fileoutputformat要输出一个_SUCCESS文件,所以,在这还得指定一个输出目录
FileOutputFormat.setOutputPath(job, new Path(outputPath));


package com.imooc.myoutputformat;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.input.KeyValueLineRecordReader;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
public class FilterDriver {
  public static void main(String[] args) throws Exception {
    // 输入路径(处理E:\temp\input下的***文件)
    String inputPath = "E:\\temp\\input";
    // 输出路径(output文件夹不能存在,否则报错)
    String outputPath = "E:\\temp\\output";
    Configuration conf = new Configuration();
    // 设置切割符(KeyValueInputFormat重点配置)(KeyValueInputFormat重点配置)(KeyValueInputFormat重点配置)
    conf.set(KeyValueLineRecordReader.KEY_VALUE_SEPERATOR, " ");
    // 1 获取Job对象
    Job job = Job.getInstance(conf);
    // 2 设置jar存储位置
    job.setJarByClass(FilterDriver.class);
    // 3 关联Map和Reduce类
    job.setMapperClass(FilterMapper.class);
    job.setReducerClass(FilterReducer.class);
    // 4 设置Mapper阶段输出数据的key和value类型
    job.setMapOutputKeyClass(Text.class);
    job.setMapOutputValueClass(NullWritable.class);
    // 5 设置最终(reducer)数据输出的key和value类型
    job.setOutputKeyClass(Text.class);
    job.setOutputValueClass(NullWritable.class);
    // 设置(自定义outputFormat重点配置)(自定义outputFormat重点配置)(自定义outputFormat重点配置)
    job.setOutputFormatClass(FilterOutputFormat.class);
    // 6 设置输入路径和输出路径
    FileInputFormat.setInputPaths(job, new Path(inputPath));
    // 虽然我们自定义了outputformat,但是因为我们的outputformat继承自fileoutputformat
    // 而fileoutputformat要输出一个_SUCCESS文件,所以,在这还得指定一个输出目录
    FileOutputFormat.setOutputPath(job, new Path(outputPath));
    // 7 提交job
    // job.submit();
    job.waitForCompletion(true);
    System.out.println("-------OVER-----------");
  }
}


Reduce Join


注意:


一般业务逻辑都在map阶段处理,所以不推荐reduce join ,推荐map join


工作原理


     Map端的主要工作:


为来自不同表或文件的key/value对,打标签以区别不同来源的记录。然后用连接字段作为key ,其余部分和新加的标志作为value,最后进行输出。


     Reduce端的主要工作:


在Reduce端以连接字段作为key的分组已经完成,我们只需要在每一个分组当中将那些来源于不同文件的记录(在Map阶段已经打标志分开,最后进行合并就ok了。


Reduce Join案例实操


需求、输入数据、输出数据


(select id,pname,amount form t_order,t_product where t_order.pid=t_product.pid )


订单数据表t_order             +        商品信息表t_product       ======                                           最终数据形式        


微信截图_20230222163258.png


设计思路


1.创建一个(订单id,商品pid,数量amount,商品名称pname,标志位flag)实体类(包含两个表的所有属性)

2.在map阶段中根据切片的信息获取操作文件名称,并且封装对象,如果操作的名称为order.txt,flag="order”,否则flag="product"

3.map的输出 key 为共同的属性 pid ,value为封装的实体类

4.同一个reduce里有两种数据,一种是pid为某某某的订单记录(多条),另一种是pid为某某某的商品记录(一条),而且有fiag标记位,很容易分开,然后就可以拼接成最终的数据形式了


编码


实体类


package com.imooc.reducejoin;
import java.io.DataInput;
import java.io.DataOutput;
import java.io.IOException;
import org.apache.hadoop.io.Writable;
/**
 * @author 柴火
 *
 */
public class TableBean implements Writable {
  private String order_id; // 订单id
  private String p_id; // 产品id
  private int amount; // 产品数量
  private String pname; // 产品名称
  private String flag; // 表的标记
  public TableBean() {
    super();
  }
  @Override
  public void readFields(DataInput in) throws IOException {
    this.order_id = in.readUTF();
    this.p_id = in.readUTF();
    this.amount = in.readInt();
    this.pname = in.readUTF();
    this.flag = in.readUTF();
  }
  @Override
  public void write(DataOutput out) throws IOException {
    out.writeUTF(order_id);
    out.writeUTF(p_id);
    out.writeInt(amount);
    out.writeUTF(pname);
    out.writeUTF(flag);
  }
  @Override
  public String toString() {
    return order_id + "\t" + amount + "\t" + pname;
  }
  public String getOrder_id() {
    return order_id;
  }
  public void setOrder_id(String order_id) {
    this.order_id = order_id;
  }
  public String getP_id() {
    return p_id;
  }
  public void setP_id(String p_id) {
    this.p_id = p_id;
  }
  public int getAmount() {
    return amount;
  }
  public void setAmount(int amount) {
    this.amount = amount;
  }
  public String getPname() {
    return pname;
  }
  public void setPname(String pname) {
    this.pname = pname;
  }
  public String getFlag() {
    return flag;
  }
  public void setFlag(String flag) {
    this.flag = flag;
  }
}


mapper


package com.imooc.reducejoin;
import java.io.IOException;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.lib.input.FileSplit;
public class TableMapper extends Mapper<LongWritable, Text, Text, TableBean> {
  String name;
  TableBean bean = new TableBean();
  Text k = new Text();
  @Override
  protected void setup(Context context){
    // 1 获取输入文件切片
    FileSplit split = (FileSplit) context.getInputSplit();
    // 2 获取输入文件名称
    name = split.getPath().getName();
  }
  @Override
  protected void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException {
    // 1 获取输入数据
    String line = value.toString();
    // 2 不同文件分别处理
    if (name.startsWith("order")) {// 订单表处理
      // 2.1 切割
      String[] fields = line.split("\t");
      // 2.2 封装bean对象
      bean.setOrder_id(fields[0]);
      bean.setP_id(fields[1]);
      bean.setAmount(Integer.parseInt(fields[2]));
      bean.setPname("");
      bean.setFlag("order");
      k.set(fields[1]);
    } else {// 产品表处理
      // 2.3 切割
      String[] fields = line.split("\t");
      // 2.4 封装bean对象
      bean.setP_id(fields[0]);
      bean.setPname(fields[1]);
      bean.setFlag("pd");
      bean.setAmount(0);
      bean.setOrder_id("");
      k.set(fields[0]);
    }
    // 3 写出
    context.write(k, bean);
  }
}


Reducer


package com.imooc.reducejoin;
import java.io.IOException;
import java.util.ArrayList;
import org.apache.commons.beanutils.BeanUtils;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Reducer;
public class TableReducer extends Reducer<Text, TableBean, TableBean, NullWritable> {
  @Override
  protected void reduce(Text key, Iterable<TableBean> values, Context context)
      throws IOException, InterruptedException {
    // 1准备存储订单的集合
    ArrayList<TableBean> orderBeans = new ArrayList<>();
    // 2 准备bean对象
    TableBean pdBean = new TableBean();
    for (TableBean bean : values) {
      if ("order".equals(bean.getFlag())) {// 订单表
        // 拷贝传递过来的每条订单数据到集合中
        TableBean orderBean = new TableBean();
        try {
          BeanUtils.copyProperties(orderBean, bean);
        } catch (Exception e) {
          e.printStackTrace();
        }
        orderBeans.add(orderBean);
      } else {// 产品表
        try {
          // 拷贝传递过来的产品表到内存中
          BeanUtils.copyProperties(pdBean, bean);
        } catch (Exception e) {
          e.printStackTrace();
        }
      }
    }
    // 3 表的拼接
    for (TableBean bean : orderBeans) {
      bean.setPname(pdBean.getPname());
      System.out.println(bean);
      // 4 数据写出去
      context.write(bean, NullWritable.get());
    }
  }
}


Driver驱动类


package com.imooc.reducejoin;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
public class TableDriver {
  public static void main(String[] args) throws Exception {
    // 0 根据自己电脑路径重新配置
    args = new String[] { "e:/temp/input", "e:/temp/output" };
    // 1 获取配置信息,或者job对象实例
    Configuration configuration = new Configuration();
    Job job = Job.getInstance(configuration);
    // 2 指定本程序的jar包所在的本地路径
    job.setJarByClass(TableDriver.class);
    // 3 指定本业务job要使用的Mapper/Reducer业务类
    job.setMapperClass(TableMapper.class);
    job.setReducerClass(TableReducer.class);
    // 4 指定Mapper输出数据的kv类型
    job.setMapOutputKeyClass(Text.class);
    job.setMapOutputValueClass(TableBean.class);
    // 5 指定最终输出的数据的kv类型
    job.setOutputKeyClass(TableBean.class);
    job.setOutputValueClass(NullWritable.class);
    // 6 指定job的输入原始文件所在目录
    FileInputFormat.setInputPaths(job, new Path(args[0]));
    FileOutputFormat.setOutputPath(job, new Path(args[1]));
    // 7 将job中配置的相关参数,以及job所用的java类所在的jar包, 提交给yarn去运行
    job.waitForCompletion(true);
    System.out.println("---------OVER------------");
  }
}


Map Join


使用场景


Map Join适用于一张表十分小、一张表很大的场景。


优点


思考:在Reduce端处理过多的表,非常容易产生数据倾斜。怎么办?

在Map端缓存多张表,提前处理业务逻辑,这样增加Map端业务,减少Reduce端数据的压力,尽可能的减少数据倾斜。


具体办法:采用DistributedCache


(1)在Mapper的setup阶段,将文件读取到缓存集合中。

(2)在驱动函数中加载缓存。

// 缓存普通文件到Task运行节点。

job.addCacheFile(new URI("file://e:/cache/pd.txt"));


MapJoin实操


需求、输入数据、输出数据同上面的ReduceJoin


编码步骤


  • 创建Mapper类
  • 重新setup方法,读小表


@Override
  protected void setup(Mapper<LongWritable, Text, Text, NullWritable>.Context context)
      throws IOException, InterruptedException {
    // 1 获取缓存的文件
    URI[] cacheFiles = context.getCacheFiles();
    String path = cacheFiles[0].getPath().toString();
    BufferedReader reader = new BufferedReader(new InputStreamReader(new FileInputStream(path), "UTF-8"));
    String line;
    while (StringUtils.isNotEmpty(line = reader.readLine())) {
      // 2 切割
      String[] fields = line.split("\t");
      // 3 缓存数据到集合
      pdMap.put(fields[0], fields[1]);
    }
    // 4 关流
    reader.close();
  }


  • 重新map方法,进行join
  • 编写Driver驱动类

// 5、设置最终输出数据类型
job.setOutputKeyClass(Text.class);
job.setOutputValueClass(NullWritable.class);
// 6 加载缓存数据
job.addCacheFile(new URI("file:///e:/temp/product.txt"));
// 7 Map端Join的逻辑不需要Reduce阶段,设置reduceTask数量为0
job.setNumReduceTasks(0);


编码


mapper


package com.imooc.mapjoin;
import java.io.BufferedReader;
import java.io.FileInputStream;
import java.io.IOException;
import java.io.InputStreamReader;
import java.net.URI;
import java.util.HashMap;
import java.util.Map;
import org.apache.commons.lang.StringUtils;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;
public class DistributedCacheMapper extends Mapper<LongWritable, Text, Text, NullWritable> {
  Map<String, String> pdMap = new HashMap<>();
  @Override
  protected void setup(Mapper<LongWritable, Text, Text, NullWritable>.Context context)
      throws IOException, InterruptedException {
    // 1 获取缓存的文件
    URI[] cacheFiles = context.getCacheFiles();
    String path = cacheFiles[0].getPath().toString();
    BufferedReader reader = new BufferedReader(new InputStreamReader(new FileInputStream(path), "UTF-8"));
    String line;
    while (StringUtils.isNotEmpty(line = reader.readLine())) {
      // 2 切割
      String[] fields = line.split("\t");
      // 3 缓存数据到集合
      pdMap.put(fields[0], fields[1]);
    }
    // 4 关流
    reader.close();
  }
  Text k = new Text();
  @Override
  protected void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException {
    // 1 获取一行
    String line = value.toString();
    // 2 截取
    String[] fields = line.split("\t");
    // 3 获取产品id
    String pId = fields[1];
    // 4 获取商品名称
    String pdName = pdMap.get(pId);
    // 5 拼接
    k.set(line + "\t" + pdName);
    // 6 写出
    context.write(k, NullWritable.get());
  }
}


Driver


package com.imooc.mapjoin;
import java.net.URI;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
public class DistributedCacheDriver {
  public static void main(String[] args) throws Exception {
    // 0 根据自己电脑路径重新配置
    args = new String[] { "e:/temp/input", "e:/temp/output" };
    // 1 获取job信息
    Configuration configuration = new Configuration();
    Job job = Job.getInstance(configuration);
    // 2 设置加载jar包路径
    job.setJarByClass(DistributedCacheDriver.class);
    // 3 关联map
    job.setMapperClass(DistributedCacheMapper.class);
    // 4、设置输入输出路径
    FileInputFormat.setInputPaths(job, new Path(args[0]));
    FileOutputFormat.setOutputPath(job, new Path(args[1]));
    // 5、设置最终输出数据类型
    job.setOutputKeyClass(Text.class);
    job.setOutputValueClass(NullWritable.class);
    // 6 加载缓存数据
    job.addCacheFile(new URI("file:///e:/temp/product.txt"));
    // 7 Map端Join的逻辑不需要Reduce阶段,设置reduceTask数量为0
    job.setNumReduceTasks(0);
    // 8 提交
    boolean result = job.waitForCompletion(true);
    System.exit(result ? 0 : 1);
  }
}


倒排索引案例(多job串联)


敲黑板


多job串联不是配置在一起的,而是单独运行的,比如2个job串联就有2个驱动类,而不是一个驱动类


输入数据


13.png


期望输出数据


atguigu c.txt-->2 b.txt-->2 a.txt-->3

pingping c.txt-->1 b.txt-->3 a.txt-->1

ss c.txt-->1 b.txt-->1 a.txt-->2


操作步骤


job1


首先将a.txt


1. atguigu pingping
2. atguigu ss
3. atguigu ss


转换为


1. atguigu--a.txt 3
2. ss--a.txt 2
3. .....


切分,然后在setup中获得切片中文件的名称(a.txt),在map方法中将其拼接在atguigu后作为K,传入到reduce中,value为1


在reduce阶段进行累加


job2


job1的输出结果,即为job2的输出结果


atguigu--a.txt  3
atguigu--b.txt  2
atguigu--c.txt  2
pingping--a.txt 1
pingping--b.txt 3
pingping--c.txt 1
ss--a.txt 2
ss--b.txt 1
ss--c.txt 1


在map阶段将 (atguigu--a.txt  3)切分为  key=atguigu          value=a.txt  3        ,传给reduce


在reduce阶段将key相同的values(a.txt   3      b.txt   3   c.txt   2       )进行拼接,从而得出预期结果


编码


如果上面看懂的话在这编码已经不是问题了,重要的是思路,思路,思路


项目代码下载


github


 Demoo/MapReduce at master · cbeann/Demoo · GitHub


码云


Demoo: SSM,SpringBoot or other demo - Gitee.com


百度网盘


链接:https://pan.baidu.com/s/1zmdNlZen7ZhoMeL-pKbsuA

提取码:eejl

复制这段内容后打开百度网盘手机App,操作更方便哦


目录
相关文章
|
分布式计算 数据处理
38 MAPREDUCE中的其他应用
38 MAPREDUCE中的其他应用
51 0
|
存储 分布式计算 监控
19 为什么要MAPREDUCE?
19 为什么要MAPREDUCE?
70 0
|
分布式计算
37 MAPREDUCE中的DistributedCache应用
37 MAPREDUCE中的DistributedCache应用
40 0
|
数据采集 机器学习/深度学习 存储
E-MapReduce
E-MapReduce(简称EMR)是阿里云提供的一项大数据处理服务,它基于开源的 Apache Hadoop 和 Apache Spark 构建,并提供了易于使用的 Web 界面和 API 接口,方便用户快速创建、调度和管理大数据处理作业。
249 2
|
分布式计算 并行计算 大数据
初识MapReduce
初识MapReduce
82 0
|
存储 分布式计算 资源调度
|
存储 分布式计算 资源调度
|
存储 缓存 分布式计算
MapReduce —— 历久而弥新(1)
MapReduce —— 历久而弥新(1)
177 0
MapReduce —— 历久而弥新(1)
|
分布式计算 Hadoop Java
MapReduce使用
MapReduce使用
108 0
MapReduce使用
|
存储 分布式计算 监控
MapReduce —— 历久而弥新(2)
MapReduce —— 历久而弥新(2)
146 0