WritableComparable排序(全排序)(全排序)(全排序)
排序分类
●部分排序
MapReduce根据输入记录的键对数据集排序,保证输出的每个文件内部有序。
●全排序
最终输出的结果只有一个文件,且文件内部有序。实现方式是只设置一个ReduceTask。但该方法在处理大型文件时效率极低,因为一台机器处理所以文件,完全丧失了MapReduce所提供的并行架构。
●辅助排序(GroupingComparator分组):
在Reduce端对key进行分组。应用于:在接收的key为bean对象时,想让一个或者几个字段相同(全部字段比较不同)的key进入到同一个reduce方法时,可以采用分组排序
●二次排序
在自定义排序过程中,如果compareTo中的判断条件为两个即为二次排序
WritableComparable排序案例实操
需求
将数据按照总流量(上行流量+下行流量)从大到小排序
输入数据
手机号 上行流量 下行流量 总流量
13470253144 180 180 360 13509468723 7335 110349 117684 13560439638 918 4938 5856 13568436656 3597 25635 29232 。。。
预期结果(按照最后一列排序)
13509468723 7335 110349 117684 13736230513 2481 24681 27162 13956435636 132 1512 1644 13846544121 264 0 264
实体类实现WritableComparable接口
package com.imooc.compareto; import java.io.DataInput; import java.io.DataOutput; import java.io.IOException; import org.apache.hadoop.io.Writable; import org.apache.hadoop.io.WritableComparable; //步骤1 实现WritableComparable接口 public class FlowBean implements WritableComparable<FlowBean> { private long upFlow;// 上行流量 private long downFlow;// 下行流量 private long sumFlow;// 总流量 // 步骤2:无参构造 public FlowBean() { } // 有参构造 public FlowBean(long upFlow, long downFlow) { this.upFlow = upFlow; this.downFlow = downFlow; this.sumFlow = upFlow + downFlow; } // 步骤3:序列化方法 @Override public void write(DataOutput out) throws IOException { out.writeLong(upFlow); out.writeLong(downFlow); out.writeLong(sumFlow); } // 步骤4:反序列化方法 // 反序列化方法读顺序必须和写序列化方法的写顺序必须一致 @Override public void readFields(DataInput in) throws IOException { // 必须与序列化中顺序一致 this.upFlow = in.readLong(); this.downFlow = in.readLong(); this.sumFlow = in.readLong(); } 步骤5:比较 @Override public int compareTo(FlowBean bean) { int result; // 按照总流量大小,倒序排列 if (sumFlow > bean.getSumFlow()) { result = -1; } else if (sumFlow < bean.getSumFlow()) { result = 1; } else { result = 0; } return result; } // 步骤6:为后续方便,重新toString方法 @Override public String toString() { return upFlow + "\t" + downFlow + "\t" + sumFlow; } public long getUpFlow() { return upFlow; } public void setUpFlow(long upFlow) { this.upFlow = upFlow; } public long getDownFlow() { return downFlow; } public void setDownFlow(long downFlow) { this.downFlow = downFlow; } public long getSumFlow() { return sumFlow; } public void setSumFlow(long sumFlow) { this.sumFlow = sumFlow; } }
Mapper
package com.imooc.compareto; import java.io.IOException; import org.apache.hadoop.io.LongWritable; import org.apache.hadoop.io.Text; import org.apache.hadoop.mapreduce.Mapper; public class FlowBeanMapper extends Mapper<LongWritable, Text, FlowBean, Text> { @Override protected void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException { // 1 获取一行 String line = value.toString(); // 2 切割字段 String[] words = line.split("\t"); // 3 封装对象 Text v = new Text(); v.set(words[0]);//取出手机号码 // 取出上行流量和下行流量 long upFlow = Long.parseLong(words[1]); long downFlow = Long.parseLong(words[2]); long sumFlow=Long.parseLong(words[3]); FlowBean k = new FlowBean(upFlow, downFlow); // 4 写出 context.write(k, v); } }
Reducer
1. package com.imooc.compareto; import java.io.IOException; import org.apache.hadoop.io.Text; import org.apache.hadoop.mapreduce.Reducer; public class FlowBeanReducer extends Reducer<FlowBean, Text, Text, FlowBean> { @Override protected void reduce(FlowBean key, Iterable<Text> values, Context context) throws IOException, InterruptedException { for (Text text : values) { context.write(text, key); } } }
Driver
package com.imooc.compareto; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.fs.Path; import org.apache.hadoop.io.Text; import org.apache.hadoop.mapreduce.Job; import org.apache.hadoop.mapreduce.lib.input.FileInputFormat; import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat; public class FlowBeanDriver { public static void main(String[] args) throws Exception { // 输入路径(处理E:\temp\input下的***文件) String inputPath = "E:\\temp\\input"; // 输出路径(output文件夹不能存在,否则报错) String outputPath = "E:\\temp\\output"; Configuration conf = new Configuration(); // 1 获取Job对象 Job job = Job.getInstance(conf); // 2 设置jar存储位置(当前类.class) job.setJarByClass(FlowBeanDriver.class); // 3 关联Map和Reduce类 job.setMapperClass(FlowBeanMapper.class); job.setReducerClass(FlowBeanReducer.class); // 4 设置Mapper阶段输出数据的key和value类型 job.setMapOutputKeyClass(FlowBean.class); job.setMapOutputValueClass(Text.class); // 5 设置最终数据输出的key和value类型 job.setOutputKeyClass(Text.class); job.setOutputValueClass(FlowBean.class); // 6 设置输入路径和输出路径 FileInputFormat.setInputPaths(job, new Path(inputPath)); FileOutputFormat.setOutputPath(job, new Path(outputPath)); // 7 提交job // job.submit(); job.waitForCompletion(true); System.out.println("-------OVER-----------"); } }
WritableComparable排序(分区排序)(分区排序)(分区排序)
需求
要求每个省份(136、137、148、139、其他)手机号输出的文件中按照总流量内部排序。
需求分析:基于前一个需求WritableComparableq排序(全排序),增加自定义分区类,分区按照省份手机号设置。
自定义分区
package com.imooc.compareto_partition; import org.apache.hadoop.io.Text; import org.apache.hadoop.mapreduce.Partitioner; //泛型为map的输出类型,即reduce的输入类型 public class ProvincePartitioner extends Partitioner<FlowBean, Text> { @Override public int getPartition(FlowBean key, Text value, int numPartitions) { // 1 获取电话号码的前三位 String preNum = value.toString().substring(0, 3); int partition = 4; // 2 判断是哪个省 if ("136".equals(preNum)) { partition = 0; } else if ("137".equals(preNum)) { partition = 1; } else if ("138".equals(preNum)) { partition = 2; } else if ("139".equals(preNum)) { partition = 3; } return partition; } }
Driver启动类
核心代码
// 指定自定义数据分区(自定义partition重要配置)(自定义partition重要配置)(自定义partition重要配置) job.setPartitionerClass(ProvincePartitioner.class); // 同时指定相应数量的reducetask(自定义partition重要配置)(自定义partition重要配置)(自定义partition重要配置) job.setNumReduceTasks(5);
package com.imooc.compareto_partition; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.fs.Path; import org.apache.hadoop.io.Text; import org.apache.hadoop.mapreduce.Job; import org.apache.hadoop.mapreduce.lib.input.FileInputFormat; import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat; public class FlowBeanDriver { public static void main(String[] args) throws Exception { // 输入路径(处理E:\temp\input下的***文件) String inputPath = "E:\\temp\\input"; // 输出路径(output文件夹不能存在,否则报错) String outputPath = "E:\\temp\\output"; Configuration conf = new Configuration(); // 1 获取Job对象 Job job = Job.getInstance(conf); // 2 设置jar存储位置(当前类.class) job.setJarByClass(FlowBeanDriver.class); // 3 关联Map和Reduce类 job.setMapperClass(FlowBeanMapper.class); job.setReducerClass(FlowBeanReducer.class); // 4 设置Mapper阶段输出数据的key和value类型 job.setMapOutputKeyClass(FlowBean.class); job.setMapOutputValueClass(Text.class); // 5 设置最终数据输出的key和value类型 job.setOutputKeyClass(Text.class); job.setOutputValueClass(FlowBean.class); // 指定自定义数据分区(自定义partition重要配置)(自定义partition重要配置)(自定义partition重要配置) job.setPartitionerClass(ProvincePartitioner.class); // 同时指定相应数量的reduce // task(自定义partition重要配置)(自定义partition重要配置)(自定义partition重要配置) job.setNumReduceTasks(5); // 6 设置输入路径和输出路径 FileInputFormat.setInputPaths(job, new Path(inputPath)); FileOutputFormat.setOutputPath(job, new Path(outputPath)); // 7 提交job // job.submit(); job.waitForCompletion(true); System.out.println("-------OVER-----------"); } }
Combiner合并
简介
1.Combiner是MR程序中Mapper和Reducer之外的一种组件
2.Combiner组件的父类就是Reducer
3.Combiner和Reducer的区别在于运行的位置(Combiner是在每一个MapTask所在的节点运行)(Reducer是在接收全局所有Mapper的输出结果)
4.Combiner的意义就是对每一个MapTask的输出进行局部徽章,以减少网络传输量。
5.Combiner能够应用的前提是不能影响最终的业务逻辑,而且,Combiner的输出KV应该和Reducer的输入KV类型对应起来
GroupingComparator分组(辅助排序)
在reduce端对key进行分组,应用于:在接收的key为bean对象时,想让一个或几个字段相同(全部字段比较不同)的key进入到同一个reduce方法时,可以采用分组排序
分组排序步骤:
- (1)自定义类继承WritableComparator
- (2)重写compare()方法
@Override public int compare(WritableComparable a, WritableComparable b) { // 比较的业务逻辑 return result; }
- (3)创建一个构造将比较对象的类传给父类
protected OrderGroupingComparator() { super(OrderBean.class, true); }
- (4)在驱动里设置关联
// 8 设置reduce端的分组 job.setGroupingComparatorClass(OrderGroupingComparator.class);
GroupingComparator分组实操:
需求:统计一个订单里价格最高的数据
我认为,GroupingComparator所解决的问题是:当一个entity里有多个属性(id,price,age'等等)需要排序时候,需要将entity作为作为mapper的输出K,这样才能排序,但是因为(id,price,age)只要有一个不同,他们就不同,所以每一个k(entity)都是不同的,而我又想将id相同的但是实际不同的K伪装成相同的放在一个reduce中
比如:
(id,price) A(1,11) B(1,33) C(2,33) 三个数都作为map的输出K是不相同(假设我实体类的compare方法中先按id排序,在按price排序)的,所以要进入3个reduce中,如下图
但是我的需求是id相同的进入一个reduce中,所以出现了GroupingComparator
在通俗一点:在reduce端对key进行分组,应用于:在接收的key为bean对象时,想让一个或几个字段相同(全部字段比较不同)的key进入到同一个reduce方法时,可以采用分组排序
输入数据:
订单号 商品号 价格
0000001 Pdt_01 222.8 0000002 Pdt_05 722.4 0000001 Pdt_02 33.8 0000003 Pdt_06 232.8 0000003 Pdt_02 33.8 0000002 Pdt_03 522.8 0000002 Pdt_04 122.4
期望输出结果:
1. 1 222.8 2. 2 722.4 3. 3 232.8
需求分析:
- 1.利用“订单id和成交金额”作为key,可以将Map阶段读取到的所有订单数据按照id升序排序,如果id相同再按照金额降序排序,发送到Reduce。
- 2.在Reduce端利用groupingComparator将订单id相同的kv聚合成组,然后取第一个即是该订单中最贵商品,
封装的实体类(实现WritableComparable接口)
package com.imooc.GroupingComparator; import java.io.DataInput; import java.io.DataOutput; import java.io.IOException; import org.apache.hadoop.io.WritableComparable; public class OrderBean implements WritableComparable<OrderBean> { private int order_id; private double price; @Override public void readFields(DataInput in) throws IOException { this.order_id = in.readInt(); this.price = in.readDouble(); } @Override public void write(DataOutput out) throws IOException { out.writeInt(order_id); out.writeDouble(price); } // 二次排序 @Override public int compareTo(OrderBean o) { int result; //先按id升序,在按价格降序 if (order_id > o.getOrder_id()) { result = 1; } else if (order_id < o.getOrder_id()) { result = -1; } else { // 价格倒序排序 result = price > o.getPrice() ? -1 : 1; } return result; } public OrderBean() { } public int getOrder_id() { return order_id; } public void setOrder_id(int order_id) { this.order_id = order_id; } public double getPrice() { return price; } public void setPrice(double price) { this.price = price; } @Override public String toString() { return order_id + "\t" + price; } }
Mapper
package com.imooc.GroupingComparator; import java.io.IOException; import org.apache.hadoop.io.LongWritable; import org.apache.hadoop.io.NullWritable; import org.apache.hadoop.io.Text; import org.apache.hadoop.mapreduce.Mapper; public class OrderMapper extends Mapper<LongWritable, Text, OrderBean, NullWritable> { @Override protected void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException { // 1 获取一行 String line = value.toString(); // 2 截取 String[] fields = line.split("\t"); // 3 封装对象 OrderBean k = new OrderBean(); k.setOrder_id(Integer.parseInt(fields[0])); k.setPrice(Double.parseDouble(fields[2])); // 4 写出 context.write(k, NullWritable.get()); } }
Reducer
package com.imooc.GroupingComparator; import java.io.IOException; import org.apache.hadoop.io.NullWritable; import org.apache.hadoop.mapreduce.Reducer; public class OrderReducer extends Reducer<OrderBean, NullWritable, OrderBean, NullWritable> { @Override protected void reduce(OrderBean key, Iterable<NullWritable> values, Context context) throws IOException, InterruptedException { context.write(key, NullWritable.get()); } }
继承GroupingComparator的类
package com.imooc.GroupingComparator; import org.apache.hadoop.io.WritableComparable; import org.apache.hadoop.io.WritableComparator; public class OrderGroupingComparator extends WritableComparator { protected OrderGroupingComparator() { super(OrderBean.class, true); } @Override public int compare(WritableComparable a, WritableComparable b) { OrderBean aBean = (OrderBean) a; OrderBean bBean = (OrderBean) b; int result; if (aBean.getOrder_id() > bBean.getOrder_id()) { result = 1; } else if (aBean.getOrder_id() < bBean.getOrder_id()) { result = -1; } else { result = 0; } return result; } }
Driver驱动类
package com.imooc.GroupingComparator; import java.io.IOException; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.fs.Path; import org.apache.hadoop.io.NullWritable; import org.apache.hadoop.mapreduce.Job; import org.apache.hadoop.mapreduce.lib.input.FileInputFormat; import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat; public class OrderDriver { public static void main(String[] args) throws Exception { // 输入路径(处理E:\temp\input下的***文件) String inputPath = "E:\\temp\\input"; // 输出路径(output文件夹不能存在,否则报错) String outputPath = "E:\\temp\\output"; Configuration conf = new Configuration(); // 1 获取Job对象 Job job = Job.getInstance(conf); // 2 设置jar存储位置(当前类.class) job.setJarByClass(OrderDriver.class); // 3 关联Map和Reduce类 job.setMapperClass(OrderMapper.class); job.setReducerClass(OrderReducer.class); // 4 设置Mapper阶段输出数据的key和value类型 job.setMapOutputKeyClass(OrderBean.class); job.setMapOutputValueClass(NullWritable.class); // 5 设置最终数据输出的key和value类型 job.setOutputKeyClass(OrderBean.class); job.setOutputValueClass(NullWritable.class); // 6 设置输入路径和输出路径 FileInputFormat.setInputPaths(job, new Path(inputPath)); FileOutputFormat.setOutputPath(job, new Path(outputPath)); // 8 设置reduce端的分组 job.setGroupingComparatorClass(OrderGroupingComparator.class); // 7 提交job // job.submit(); job.waitForCompletion(true); System.out.println("-------OVER-----------"); } }
OutputFormat数据输出
OutputFormat接口实现类
OutputFormat是MapReduce输出的基类,所有 实现MapReduce输出都实现了OutputFormat接口。下面我们介绍几种常见的OutputFormat实现类。
1.文本输出TextOutputFormat
默认的输出格式是TextOutputFormat ,它把每条记录写为文本行。它的键和值可以是任意类型,因为TextOutputFormat调用toString()方法把它们转换为字符串。
2. SequenceFileOutputF ormat
将SequenceFileOutputFormat输出作为后续MapReduce任务的输入,这便是一种好的输出格式,因为它的格式紧凑,很客易被压缩。
3.自定义OutputFormat
根据用户需求,自定义实现输出。
自定义OutputFormat步骤
自定义一个类继承FileOutputFormat
改写recordWriter,具体改写输出数据的方法write()
job.setOutputFormatClass(FilterOutputFormat.class);
自定义OutputFormat案例实操
需求:
过滤输入的log日志,包含atguigu的网站输出到e:/atguigu.log,不包含atguigu的网站输出到e:/other.log。
输入数据:
http://www.baidu.com http://www.google.com http://cn.bing.com http://www.atguigu.com http://www.sohu.com http://www.sina.com http://www.sin2a.com http://www.sin2desa.com http://www.sindsafa.com
期望输出数据:
Mapper
单纯的切割和写
package com.imooc.myoutputformat; import java.io.IOException; import org.apache.hadoop.io.LongWritable; import org.apache.hadoop.io.NullWritable; import org.apache.hadoop.io.Text; import org.apache.hadoop.mapreduce.Mapper; public class FilterMapper extends Mapper<LongWritable, Text, Text, NullWritable> { @Override protected void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException { String line = value.toString(); Text k=new Text(line); context.write(k, NullWritable.get()); } }
Reducer
单纯的写,但是要改一下格式,方便在输出文件里看,否则连在一起在一样
package com.imooc.myoutputformat; import java.io.IOException; import org.apache.hadoop.io.NullWritable; import org.apache.hadoop.io.Text; import org.apache.hadoop.mapreduce.Reducer; public class FilterReducer extends Reducer<Text, NullWritable, Text, NullWritable> { @Override protected void reduce(Text key, Iterable<NullWritable> values, Context context) throws IOException, InterruptedException { // 1 获取一行 String line = key.toString(); // 2 拼接 line = line + "\r\n"; // 3 设置key Text k=new Text(); k.set(line); for (NullWritable nullWritable : values) { context.write(k, NullWritable.get()); } } }
FRecordWriter
继承RecordWriter
初始化数据流并且完整业务逻辑
而且在这个类中还可以写入MySQL,Redis等数据库中
package com.imooc.myoutputformat; import java.io.IOException; import org.apache.hadoop.fs.FSDataOutputStream; import org.apache.hadoop.fs.FileSystem; import org.apache.hadoop.fs.Path; import org.apache.hadoop.io.IOUtils; import org.apache.hadoop.io.NullWritable; import org.apache.hadoop.io.Text; import org.apache.hadoop.mapreduce.RecordWriter; import org.apache.hadoop.mapreduce.TaskAttemptContext; public class FRecordWriter extends RecordWriter<Text, NullWritable> { FSDataOutputStream fosatguigu; FSDataOutputStream fosother; // 初始化方法 public FRecordWriter(TaskAttemptContext job){ try { // 1、获取文件系统 FileSystem fs = FileSystem.get(job.getConfiguration()); fosatguigu = fs.create(new Path("e:/atguigu.log")); // 2、创建两个文件输出流 fosother = fs.create(new Path("e:/other.log")); } catch (Exception e) { // TODO Auto-generated catch block e.printStackTrace(); } } // 业务逻辑类 @Override public void write(Text key, NullWritable value) throws IOException, InterruptedException { try { // 判断是否包含“atguigu”输出到不同文件 if (key.toString().contains("atguigu")) { fosatguigu.write(key.toString().getBytes()); } else { fosother.write(key.toString().getBytes()); } } catch (Exception e) { // TODO Auto-generated catch block e.printStackTrace(); } } // close方法 @Override public void close(TaskAttemptContext context) throws IOException, InterruptedException { // 关闭流 IOUtils.closeStream(fosatguigu); IOUtils.closeStream(fosother); } }
FilterOutputFormat
因为上面创建了FRecordWriter类,所在下面的类中直接返回,基本不用改
package com.imooc.myoutputformat; import java.io.IOException; import org.apache.hadoop.io.NullWritable; import org.apache.hadoop.io.Text; import org.apache.hadoop.mapreduce.RecordWriter; import org.apache.hadoop.mapreduce.TaskAttemptContext; import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat; public class FilterOutputFormat extends FileOutputFormat<Text,NullWritable>{ @Override public RecordWriter<Text, NullWritable> getRecordWriter(TaskAttemptContext job) throws IOException, InterruptedException { return new FRecordWriter(job); } }
Driver启动类
核心代码
// 设置(自定义outputFormat重点配置)(自定义outputFormat重点配置)(自定义outputFormat重点配置) job.setOutputFormatClass(FilterOutputFormat.class); // 6 设置输入路径和输出路径 FileInputFormat.setInputPaths(job, new Path(inputPath)); // 虽然我们自定义了outputformat,但是因为我们的outputformat继承fileoutputforma而fileoutputformat要输出一个_SUCCESS文件,所以,在这还得指定一个输出目录 FileOutputFormat.setOutputPath(job, new Path(outputPath));
package com.imooc.myoutputformat; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.fs.Path; import org.apache.hadoop.io.NullWritable; import org.apache.hadoop.io.Text; import org.apache.hadoop.mapreduce.Job; import org.apache.hadoop.mapreduce.lib.input.FileInputFormat; import org.apache.hadoop.mapreduce.lib.input.KeyValueLineRecordReader; import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat; public class FilterDriver { public static void main(String[] args) throws Exception { // 输入路径(处理E:\temp\input下的***文件) String inputPath = "E:\\temp\\input"; // 输出路径(output文件夹不能存在,否则报错) String outputPath = "E:\\temp\\output"; Configuration conf = new Configuration(); // 设置切割符(KeyValueInputFormat重点配置)(KeyValueInputFormat重点配置)(KeyValueInputFormat重点配置) conf.set(KeyValueLineRecordReader.KEY_VALUE_SEPERATOR, " "); // 1 获取Job对象 Job job = Job.getInstance(conf); // 2 设置jar存储位置 job.setJarByClass(FilterDriver.class); // 3 关联Map和Reduce类 job.setMapperClass(FilterMapper.class); job.setReducerClass(FilterReducer.class); // 4 设置Mapper阶段输出数据的key和value类型 job.setMapOutputKeyClass(Text.class); job.setMapOutputValueClass(NullWritable.class); // 5 设置最终(reducer)数据输出的key和value类型 job.setOutputKeyClass(Text.class); job.setOutputValueClass(NullWritable.class); // 设置(自定义outputFormat重点配置)(自定义outputFormat重点配置)(自定义outputFormat重点配置) job.setOutputFormatClass(FilterOutputFormat.class); // 6 设置输入路径和输出路径 FileInputFormat.setInputPaths(job, new Path(inputPath)); // 虽然我们自定义了outputformat,但是因为我们的outputformat继承自fileoutputformat // 而fileoutputformat要输出一个_SUCCESS文件,所以,在这还得指定一个输出目录 FileOutputFormat.setOutputPath(job, new Path(outputPath)); // 7 提交job // job.submit(); job.waitForCompletion(true); System.out.println("-------OVER-----------"); } }
Reduce Join
注意:
一般业务逻辑都在map阶段处理,所以不推荐reduce join ,推荐map join
工作原理
Map端的主要工作:
为来自不同表或文件的key/value对,打标签以区别不同来源的记录。然后用连接字段作为key ,其余部分和新加的标志作为value,最后进行输出。
Reduce端的主要工作:
在Reduce端以连接字段作为key的分组已经完成,我们只需要在每一个分组当中将那些来源于不同文件的记录(在Map阶段已经打标志分开,最后进行合并就ok了。
Reduce Join案例实操
需求、输入数据、输出数据
(select id,pname,amount form t_order,t_product where t_order.pid=t_product.pid )
订单数据表t_order + 商品信息表t_product ====== 最终数据形式
设计思路
1.创建一个(订单id,商品pid,数量amount,商品名称pname,标志位flag)实体类(包含两个表的所有属性)
2.在map阶段中根据切片的信息获取操作文件名称,并且封装对象,如果操作的名称为order.txt,flag="order”,否则flag="product"
3.map的输出 key 为共同的属性 pid ,value为封装的实体类
4.同一个reduce里有两种数据,一种是pid为某某某的订单记录(多条),另一种是pid为某某某的商品记录(一条),而且有fiag标记位,很容易分开,然后就可以拼接成最终的数据形式了
编码
实体类
package com.imooc.reducejoin; import java.io.DataInput; import java.io.DataOutput; import java.io.IOException; import org.apache.hadoop.io.Writable; /** * @author 柴火 * */ public class TableBean implements Writable { private String order_id; // 订单id private String p_id; // 产品id private int amount; // 产品数量 private String pname; // 产品名称 private String flag; // 表的标记 public TableBean() { super(); } @Override public void readFields(DataInput in) throws IOException { this.order_id = in.readUTF(); this.p_id = in.readUTF(); this.amount = in.readInt(); this.pname = in.readUTF(); this.flag = in.readUTF(); } @Override public void write(DataOutput out) throws IOException { out.writeUTF(order_id); out.writeUTF(p_id); out.writeInt(amount); out.writeUTF(pname); out.writeUTF(flag); } @Override public String toString() { return order_id + "\t" + amount + "\t" + pname; } public String getOrder_id() { return order_id; } public void setOrder_id(String order_id) { this.order_id = order_id; } public String getP_id() { return p_id; } public void setP_id(String p_id) { this.p_id = p_id; } public int getAmount() { return amount; } public void setAmount(int amount) { this.amount = amount; } public String getPname() { return pname; } public void setPname(String pname) { this.pname = pname; } public String getFlag() { return flag; } public void setFlag(String flag) { this.flag = flag; } }
mapper
package com.imooc.reducejoin; import java.io.IOException; import org.apache.hadoop.io.LongWritable; import org.apache.hadoop.io.Text; import org.apache.hadoop.mapreduce.Mapper; import org.apache.hadoop.mapreduce.lib.input.FileSplit; public class TableMapper extends Mapper<LongWritable, Text, Text, TableBean> { String name; TableBean bean = new TableBean(); Text k = new Text(); @Override protected void setup(Context context){ // 1 获取输入文件切片 FileSplit split = (FileSplit) context.getInputSplit(); // 2 获取输入文件名称 name = split.getPath().getName(); } @Override protected void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException { // 1 获取输入数据 String line = value.toString(); // 2 不同文件分别处理 if (name.startsWith("order")) {// 订单表处理 // 2.1 切割 String[] fields = line.split("\t"); // 2.2 封装bean对象 bean.setOrder_id(fields[0]); bean.setP_id(fields[1]); bean.setAmount(Integer.parseInt(fields[2])); bean.setPname(""); bean.setFlag("order"); k.set(fields[1]); } else {// 产品表处理 // 2.3 切割 String[] fields = line.split("\t"); // 2.4 封装bean对象 bean.setP_id(fields[0]); bean.setPname(fields[1]); bean.setFlag("pd"); bean.setAmount(0); bean.setOrder_id(""); k.set(fields[0]); } // 3 写出 context.write(k, bean); } }
Reducer
package com.imooc.reducejoin; import java.io.IOException; import java.util.ArrayList; import org.apache.commons.beanutils.BeanUtils; import org.apache.hadoop.io.NullWritable; import org.apache.hadoop.io.Text; import org.apache.hadoop.mapreduce.Reducer; public class TableReducer extends Reducer<Text, TableBean, TableBean, NullWritable> { @Override protected void reduce(Text key, Iterable<TableBean> values, Context context) throws IOException, InterruptedException { // 1准备存储订单的集合 ArrayList<TableBean> orderBeans = new ArrayList<>(); // 2 准备bean对象 TableBean pdBean = new TableBean(); for (TableBean bean : values) { if ("order".equals(bean.getFlag())) {// 订单表 // 拷贝传递过来的每条订单数据到集合中 TableBean orderBean = new TableBean(); try { BeanUtils.copyProperties(orderBean, bean); } catch (Exception e) { e.printStackTrace(); } orderBeans.add(orderBean); } else {// 产品表 try { // 拷贝传递过来的产品表到内存中 BeanUtils.copyProperties(pdBean, bean); } catch (Exception e) { e.printStackTrace(); } } } // 3 表的拼接 for (TableBean bean : orderBeans) { bean.setPname(pdBean.getPname()); System.out.println(bean); // 4 数据写出去 context.write(bean, NullWritable.get()); } } }
Driver驱动类
package com.imooc.reducejoin; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.fs.Path; import org.apache.hadoop.io.NullWritable; import org.apache.hadoop.io.Text; import org.apache.hadoop.mapreduce.Job; import org.apache.hadoop.mapreduce.lib.input.FileInputFormat; import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat; public class TableDriver { public static void main(String[] args) throws Exception { // 0 根据自己电脑路径重新配置 args = new String[] { "e:/temp/input", "e:/temp/output" }; // 1 获取配置信息,或者job对象实例 Configuration configuration = new Configuration(); Job job = Job.getInstance(configuration); // 2 指定本程序的jar包所在的本地路径 job.setJarByClass(TableDriver.class); // 3 指定本业务job要使用的Mapper/Reducer业务类 job.setMapperClass(TableMapper.class); job.setReducerClass(TableReducer.class); // 4 指定Mapper输出数据的kv类型 job.setMapOutputKeyClass(Text.class); job.setMapOutputValueClass(TableBean.class); // 5 指定最终输出的数据的kv类型 job.setOutputKeyClass(TableBean.class); job.setOutputValueClass(NullWritable.class); // 6 指定job的输入原始文件所在目录 FileInputFormat.setInputPaths(job, new Path(args[0])); FileOutputFormat.setOutputPath(job, new Path(args[1])); // 7 将job中配置的相关参数,以及job所用的java类所在的jar包, 提交给yarn去运行 job.waitForCompletion(true); System.out.println("---------OVER------------"); } }
Map Join
使用场景
Map Join适用于一张表十分小、一张表很大的场景。
优点
思考:在Reduce端处理过多的表,非常容易产生数据倾斜。怎么办?
在Map端缓存多张表,提前处理业务逻辑,这样增加Map端业务,减少Reduce端数据的压力,尽可能的减少数据倾斜。
具体办法:采用DistributedCache
(1)在Mapper的setup阶段,将文件读取到缓存集合中。
(2)在驱动函数中加载缓存。
// 缓存普通文件到Task运行节点。
job.addCacheFile(new URI("file://e:/cache/pd.txt"));
MapJoin实操
需求、输入数据、输出数据同上面的ReduceJoin
编码步骤
- 创建Mapper类
- 重新setup方法,读小表
@Override protected void setup(Mapper<LongWritable, Text, Text, NullWritable>.Context context) throws IOException, InterruptedException { // 1 获取缓存的文件 URI[] cacheFiles = context.getCacheFiles(); String path = cacheFiles[0].getPath().toString(); BufferedReader reader = new BufferedReader(new InputStreamReader(new FileInputStream(path), "UTF-8")); String line; while (StringUtils.isNotEmpty(line = reader.readLine())) { // 2 切割 String[] fields = line.split("\t"); // 3 缓存数据到集合 pdMap.put(fields[0], fields[1]); } // 4 关流 reader.close(); }
- 重新map方法,进行join
- 编写Driver驱动类
// 5、设置最终输出数据类型 job.setOutputKeyClass(Text.class); job.setOutputValueClass(NullWritable.class); // 6 加载缓存数据 job.addCacheFile(new URI("file:///e:/temp/product.txt")); // 7 Map端Join的逻辑不需要Reduce阶段,设置reduceTask数量为0 job.setNumReduceTasks(0);
编码
mapper
package com.imooc.mapjoin; import java.io.BufferedReader; import java.io.FileInputStream; import java.io.IOException; import java.io.InputStreamReader; import java.net.URI; import java.util.HashMap; import java.util.Map; import org.apache.commons.lang.StringUtils; import org.apache.hadoop.io.LongWritable; import org.apache.hadoop.io.NullWritable; import org.apache.hadoop.io.Text; import org.apache.hadoop.mapreduce.Mapper; public class DistributedCacheMapper extends Mapper<LongWritable, Text, Text, NullWritable> { Map<String, String> pdMap = new HashMap<>(); @Override protected void setup(Mapper<LongWritable, Text, Text, NullWritable>.Context context) throws IOException, InterruptedException { // 1 获取缓存的文件 URI[] cacheFiles = context.getCacheFiles(); String path = cacheFiles[0].getPath().toString(); BufferedReader reader = new BufferedReader(new InputStreamReader(new FileInputStream(path), "UTF-8")); String line; while (StringUtils.isNotEmpty(line = reader.readLine())) { // 2 切割 String[] fields = line.split("\t"); // 3 缓存数据到集合 pdMap.put(fields[0], fields[1]); } // 4 关流 reader.close(); } Text k = new Text(); @Override protected void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException { // 1 获取一行 String line = value.toString(); // 2 截取 String[] fields = line.split("\t"); // 3 获取产品id String pId = fields[1]; // 4 获取商品名称 String pdName = pdMap.get(pId); // 5 拼接 k.set(line + "\t" + pdName); // 6 写出 context.write(k, NullWritable.get()); } }
Driver
package com.imooc.mapjoin; import java.net.URI; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.fs.Path; import org.apache.hadoop.io.NullWritable; import org.apache.hadoop.io.Text; import org.apache.hadoop.mapreduce.Job; import org.apache.hadoop.mapreduce.lib.input.FileInputFormat; import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat; public class DistributedCacheDriver { public static void main(String[] args) throws Exception { // 0 根据自己电脑路径重新配置 args = new String[] { "e:/temp/input", "e:/temp/output" }; // 1 获取job信息 Configuration configuration = new Configuration(); Job job = Job.getInstance(configuration); // 2 设置加载jar包路径 job.setJarByClass(DistributedCacheDriver.class); // 3 关联map job.setMapperClass(DistributedCacheMapper.class); // 4、设置输入输出路径 FileInputFormat.setInputPaths(job, new Path(args[0])); FileOutputFormat.setOutputPath(job, new Path(args[1])); // 5、设置最终输出数据类型 job.setOutputKeyClass(Text.class); job.setOutputValueClass(NullWritable.class); // 6 加载缓存数据 job.addCacheFile(new URI("file:///e:/temp/product.txt")); // 7 Map端Join的逻辑不需要Reduce阶段,设置reduceTask数量为0 job.setNumReduceTasks(0); // 8 提交 boolean result = job.waitForCompletion(true); System.exit(result ? 0 : 1); } }
倒排索引案例(多job串联)
敲黑板
多job串联不是配置在一起的,而是单独运行的,比如2个job串联就有2个驱动类,而不是一个驱动类
输入数据
期望输出数据
atguigu c.txt-->2 b.txt-->2 a.txt-->3
pingping c.txt-->1 b.txt-->3 a.txt-->1
ss c.txt-->1 b.txt-->1 a.txt-->2
操作步骤
job1
首先将a.txt
1. atguigu pingping 2. atguigu ss 3. atguigu ss
转换为
1. atguigu--a.txt 3 2. ss--a.txt 2 3. .....
切分,然后在setup中获得切片中文件的名称(a.txt),在map方法中将其拼接在atguigu后作为K,传入到reduce中,value为1
在reduce阶段进行累加
job2
job1的输出结果,即为job2的输出结果
atguigu--a.txt 3 atguigu--b.txt 2 atguigu--c.txt 2 pingping--a.txt 1 pingping--b.txt 3 pingping--c.txt 1 ss--a.txt 2 ss--b.txt 1 ss--c.txt 1
在map阶段将 (atguigu--a.txt 3)切分为 key=atguigu value=a.txt 3 ,传给reduce
在reduce阶段将key相同的values(a.txt 3 b.txt 3 c.txt 2 )进行拼接,从而得出预期结果
编码
如果上面看懂的话在这编码已经不是问题了,重要的是思路,思路,思路
项目代码下载
github
Demoo/MapReduce at master · cbeann/Demoo · GitHub
码云
Demoo: SSM,SpringBoot or other demo - Gitee.com
百度网盘
链接:https://pan.baidu.com/s/1zmdNlZen7ZhoMeL-pKbsuA
提取码:eejl
复制这段内容后打开百度网盘手机App,操作更方便哦