ReduceJoin Requirement
Much like a join in MySQL, we want to merge the two tables into one: replace the pid column of order.txt with the corresponding pname from pd.txt (roughly the equivalent of SELECT o.id, p.pname, o.amount FROM order o JOIN pd p ON o.pid = p.pid).
Input
order.txt
id      pid     amounts
1001    01      1
1002    02      2
1003    03      3
1004    01      4
1005    02      5
1006    03      6
pd.txt
pid     pname
01      小米
02      华为
03      格力
Output
id      pname   amounts
1004    小米    4
1001    小米    1
1005    华为    5
1002    华为    2
1006    格力    6
1003    格力    3
Code
TableBean
After the Mapper stage we want the output value to be a TableBean object, so that its fields can be read conveniently in the Reducer. The class has to implement the serialization and deserialization methods (Writable), because in real development the records may be shipped across nodes. (A quick round-trip check appears after the class below.)
package com.lyh.mapreduce.reduceJoin;

import org.apache.hadoop.io.Writable;

import java.io.DataInput;
import java.io.DataOutput;
import java.io.IOException;

public class TableBean implements Writable {

    private String id;      // order id
    private String pid;     // product id
    private int amount;     // product quantity
    private String pname;   // product name
    private String flag;    // marks whether the record comes from the order table or the pd table

    // Hadoop instantiates the bean via reflection during deserialization,
    // so a public no-arg constructor is required
    public TableBean() {
    }

    public String getId() { return id; }
    public void setId(String id) { this.id = id; }

    public String getPid() { return pid; }
    public void setPid(String pid) { this.pid = pid; }

    public int getAmount() { return amount; }
    public void setAmount(int amount) { this.amount = amount; }

    public String getPname() { return pname; }
    public void setPname(String pname) { this.pname = pname; }

    public String getFlag() { return flag; }
    public void setFlag(String flag) { this.flag = flag; }

    // serialization
    @Override
    public void write(DataOutput out) throws IOException {
        out.writeUTF(id);
        out.writeUTF(pid);
        out.writeInt(amount);
        out.writeUTF(pname);
        out.writeUTF(flag);
    }

    // deserialization (fields must be read in the same order they were written)
    @Override
    public void readFields(DataInput in) throws IOException {
        this.id = in.readUTF();
        this.pid = in.readUTF();
        this.amount = in.readInt();
        this.pname = in.readUTF();
        this.flag = in.readUTF();
    }

    @Override
    public String toString() {
        return id + "\t" + pname + "\t" + amount;
    }
}
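As a quick local sanity check (this snippet is not part of the original code; the class name TableBeanRoundTrip is made up for illustration), the Writable contract can be exercised by writing a bean into a byte stream and reading it back:

import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.DataInputStream;
import java.io.DataOutputStream;
import java.io.IOException;

public class TableBeanRoundTrip {
    public static void main(String[] args) throws IOException {
        TableBean in = new TableBean();
        in.setId("1001");
        in.setPid("01");
        in.setAmount(1);
        in.setPname("小米");
        in.setFlag("order");

        // serialize: the same path the bean takes when it leaves the Mapper
        ByteArrayOutputStream bytes = new ByteArrayOutputStream();
        in.write(new DataOutputStream(bytes));

        // deserialize into a fresh instance, as the Reducer side would
        TableBean out = new TableBean();
        out.readFields(new DataInputStream(new ByteArrayInputStream(bytes.toByteArray())));

        System.out.println(out); // prints: 1001    小米    1
    }
}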
TableMapper
In the setup() phase we need to find out which file the current split comes from, so we can tell whether we are reading order.txt or pd.txt and emit a different record for each.
After the Mapper stage, the output looks like:
pid     id      amounts pname   flag
01      1001    1       ""      order
01      1004    4       ""      order
01      ""      0       小米    pd
02      1002    2       ""      order
02      1005    5       ""      order
02      ""      0       华为    pd
03      1003    3       ""      order
03      1006    6       ""      order
03      ""      0       格力    pd
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.lib.input.FileSplit;

import java.io.IOException;

public class TableMapper extends Mapper<LongWritable, Text, Text, TableBean> {

    private String fileName;
    private Text OUT_KEY = new Text();
    private TableBean OUT_VALUE = new TableBean();

    @Override
    protected void setup(Context context) throws IOException, InterruptedException {
        // get the name of the file this split belongs to
        FileSplit split = (FileSplit) context.getInputSplit();
        fileName = split.getPath().getName();
    }

    @Override
    protected void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException {
        // read one line of text and split it on tabs
        String line = value.toString();
        String[] split = line.split("\t");
        if (fileName.contains("order")) { // order table
            // input : id  pid  amount
            // output: pid -> (id, pid, amount, "", "order")
            OUT_KEY.set(split[1]);
            OUT_VALUE.setId(split[0]);
            OUT_VALUE.setPid(split[1]);
            OUT_VALUE.setAmount(Integer.parseInt(split[2]));
            OUT_VALUE.setPname("");
            OUT_VALUE.setFlag("order");
        } else { // product table
            // input : pid  pname
            // output: pid -> ("", pid, 0, pname, "pd")
            OUT_KEY.set(split[0]);
            OUT_VALUE.setId("");
            OUT_VALUE.setPid(split[0]);
            OUT_VALUE.setAmount(0);
            OUT_VALUE.setPname(split[1]);
            OUT_VALUE.setFlag("pd");
        }
        context.write(OUT_KEY, OUT_VALUE);
    }
}
TableReducer
When we iterate over the reduce method's values parameter and add objects to a list, Hadoop reuses a single value object for the whole iteration, so the list would end up holding several references to that one reused object instead of independent copies. We therefore need an intermediate variable of type TableBean: copy each value into a fresh object with BeanUtils.copyProperties(tmp, value) and add that copy to the list (a short sketch of the problem follows the reducer code).
import org.apache.commons.beanutils.BeanUtils;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Reducer;

import java.io.IOException;
import java.lang.reflect.InvocationTargetException;
import java.util.ArrayList;
import java.util.List;

public class TableReducer extends Reducer<Text, TableBean, TableBean, NullWritable> {

    @Override
    protected void reduce(Text key, Iterable<TableBean> values, Context context) throws IOException, InterruptedException {
        // all values sharing the same key (pid) arrive together, e.g.
        // pid  id    amount  pname  flag
        // 01   1001  1       ""     order
        // 01   1004  4       ""     order
        // 01   ""    0       小米    pd
        List<TableBean> list = new ArrayList<>();
        TableBean pdBean = new TableBean();
        for (TableBean tableBean : values) {
            if (tableBean.getFlag().contains("order")) { // order record
                // Hadoop reuses the value object across iterations, so copy it
                // into a fresh TableBean before adding it to the list
                TableBean tmp = new TableBean();
                try {
                    BeanUtils.copyProperties(tmp, tableBean);
                } catch (IllegalAccessException | InvocationTargetException e) {
                    e.printStackTrace();
                }
                list.add(tmp);
            } else { // product record
                try {
                    BeanUtils.copyProperties(pdBean, tableBean);
                } catch (IllegalAccessException | InvocationTargetException e) {
                    e.printStackTrace();
                }
            }
        }
        // fill in the product name for every buffered order record and emit it
        for (TableBean tableBean : list) {
            tableBean.setPname(pdBean.getPname());
            context.write(tableBean, NullWritable.get());
        }
    }
}
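To make the pitfall described above concrete, here is a minimal stand-alone sketch (not from the original post; ObjectReuseDemo is a made-up name) that simulates how Hadoop refills one value object during iteration. Without the BeanUtils.copyProperties copy, every list entry ends up showing the last record:

import java.util.ArrayList;
import java.util.List;

public class ObjectReuseDemo {
    public static void main(String[] args) {
        TableBean reused = new TableBean();          // the single reused instance
        List<TableBean> list = new ArrayList<>();

        for (String[] row : new String[][]{{"1001", "1"}, {"1004", "4"}}) {
            reused.setId(row[0]);
            reused.setAmount(Integer.parseInt(row[1]));
            list.add(reused);                        // WRONG: stores the same reference twice
        }

        // both entries now print the last record: 1004  4
        list.forEach(b -> System.out.println(b.getId() + "\t" + b.getAmount()));
    }
}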
TableDriver
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;

import java.io.IOException;

public class TableDriver {
    public static void main(String[] args) throws IOException, ClassNotFoundException, InterruptedException {
        Job job = Job.getInstance(new Configuration());
        job.setJarByClass(TableDriver.class);
        job.setMapperClass(TableMapper.class);
        job.setReducerClass(TableReducer.class);

        job.setMapOutputKeyClass(Text.class);
        job.setMapOutputValueClass(TableBean.class);

        job.setOutputKeyClass(TableBean.class);
        job.setOutputValueClass(NullWritable.class);

        FileInputFormat.setInputPaths(job, new Path("D:\\MapReduce_Data_Test\\reducejoin\\input"));
        FileOutputFormat.setOutputPath(job, new Path("D:\\MapReduce_Data_Test\\reducejoin\\output"));

        boolean b = job.waitForCompletion(true);
        System.exit(b ? 0 : 1);
    }
}
MapJoin Requirement
MapJoin is suited to joining a small table with a large table, where the small table fits entirely in memory. With an ordinary (reduce-side) join, the large table has to be scanned in every task and shuffled across the network, which causes a lot of disk I/O and network transfer and therefore poor performance. With MapJoin the small table is loaded into memory, and the large table is scanned only once and joined against it on the map side, which greatly improves performance.
Here pd.txt is exactly such a small table and order.txt is the large one; in real development the large table might hold millions of rows, so we load pd.txt into memory.
In the reduce join above, a single ReduceTask processed all of the data, which would clearly be slow on a large data set. Here we do not need a ReduceTask at all: one MapTask is started per input split (128 MB by default), and the Mapper output is already the final joined result.
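Stripped of the Hadoop plumbing, the map-side join is just a hash lookup: build a map from the small table once, then resolve each row of the large table with a single get(). A minimal plain-Java sketch of the idea (illustrative only; HashJoinSketch is a made-up name):

import java.util.Arrays;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

public class HashJoinSketch {
    public static void main(String[] args) {
        // small table pd: pid -> pname, held entirely in memory
        Map<String, String> pd = new HashMap<>();
        pd.put("01", "小米");
        pd.put("02", "华为");
        pd.put("03", "格力");

        // a few rows of the large table order: id, pid, amount
        List<String[]> orders = Arrays.asList(
                new String[]{"1001", "01", "1"},
                new String[]{"1002", "02", "2"},
                new String[]{"1003", "03", "3"});

        // the "map" step: one in-memory lookup per order row, no shuffle needed
        for (String[] o : orders) {
            System.out.println(o[0] + "\t" + pd.get(o[1]) + "\t" + o[2]);
        }
    }
}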
MapJoinMapper
Here we read the cache file registered by the Job (it holds the small table pd.txt).
import org.apache.commons.lang3.StringUtils;
import org.apache.hadoop.fs.FSDataInputStream;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IOUtils;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;

import java.io.BufferedReader;
import java.io.IOException;
import java.io.InputStreamReader;
import java.net.URI;
import java.util.HashMap;

public class MapJoinMapper extends Mapper<LongWritable, Text, Text, NullWritable> {

    // pid -> pname
    private HashMap<String, String> pdMap = new HashMap<>();
    private Text outK = new Text();

    @Override
    protected void setup(Context context) throws IOException, InterruptedException {
        // open the cached small table (pd.txt)
        URI[] cacheFiles = context.getCacheFiles();
        FileSystem fs = FileSystem.get(context.getConfiguration());
        FSDataInputStream fis = fs.open(new Path(cacheFiles[0]));

        // read it line by line
        BufferedReader reader = new BufferedReader(new InputStreamReader(fis, "UTF-8"));
        String line;
        while (StringUtils.isNoneEmpty(line = reader.readLine())) {
            // split the line and cache pid -> pname
            String[] fields = line.split("\t");
            pdMap.put(fields[0], fields[1]);
        }
        // close the stream
        IOUtils.closeStream(reader);
    }

    @Override
    protected void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException {
        // each input line comes from order.txt: id  pid  amount
        String line = value.toString();
        String[] fields = line.split("\t");
        String id = fields[0];
        // look up the product name in the cached pd map
        String pname = pdMap.get(fields[1]);
        String amount = fields[2];
        // emit: id  pname  amount
        outK.set(id + "\t" + pname + "\t" + amount);
        context.write(outK, NullWritable.get());
    }
}
MapJoinDriver
Note that the input directory must contain only order.txt, the number of ReduceTasks must be set to 0, and the small table (pd.txt) has to be registered as a cache file.
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;

import java.io.IOException;
import java.net.URI;
import java.net.URISyntaxException;

/**
 * Note: pd.txt must be registered as a cache file; when the MapReduce job starts it is loaded into memory.
 * The input directory may contain only order.txt.
 */
public class MapJoinDriver {
    public static void main(String[] args) throws IOException, URISyntaxException, ClassNotFoundException, InterruptedException {
        Configuration conf = new Configuration();
        Job job = Job.getInstance(conf);
        job.setJarByClass(MapJoinDriver.class);
        job.setMapperClass(MapJoinMapper.class);

        job.setMapOutputKeyClass(Text.class);
        job.setMapOutputValueClass(NullWritable.class);
        job.setOutputKeyClass(Text.class);
        job.setOutputValueClass(NullWritable.class);

        // register the cached small table from the local file system
        job.addCacheFile(new URI("file:///D:/MapReduce_Data_Test/mapjoin/cache/pd.txt"));
        // or register it from HDFS
        // job.addCacheFile(new URI("hdfs://hadoop102:8020/cache/pd.txt"));

        // the map-side join needs no Reduce phase, so set the number of ReduceTasks to 0
        job.setNumReduceTasks(0);

        // the input directory contains only order.txt (same data as the reduce join)
        FileInputFormat.setInputPaths(job, new Path("D:\\MapReduce_Data_Test\\mapjoin\\input"));
        FileOutputFormat.setOutputPath(job, new Path("D:\\MapReduce_Data_Test\\mapjoin\\output"));

        boolean b = job.waitForCompletion(true);
        System.exit(b ? 0 : 1);
    }
}
Run Result
1004    小米    4
1001    小米    1
1005    华为    5
1002    华为    2
1006    格力    6
1003    格力    3