Map Join (DistributedCache)
Use case
One table is very small and the other is very large.
Solution
Cache the small table(s) on the map side and handle the join logic there. This shifts work into the map phase, reduces the amount of data the reduce phase has to handle, and minimizes data skew as far as possible.
Concrete approach: use DistributedCache.
1) In the Mapper's setup phase, read the cached file into an in-memory collection (a cluster-side sketch of this step follows this list).
2) In the driver, add the file to the distributed cache:
job.addCacheFile(new URI("file:/e:/mapjoincache/pd.txt")); // cache an ordinary file onto the task nodes
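The local-mode example below simply opens pd.txt from a hard-coded Windows path in setup(). On a cluster you would normally resolve the file through the cache URIs instead; here is a minimal sketch of such a setup(), assuming the file was registered with job.addCacheFile as above (it additionally needs org.apache.hadoop.fs.Path and java.net.URI among the mapper's imports):

    @Override
    protected void setup(Context context) throws IOException, InterruptedException {
        // URIs registered in the driver via job.addCacheFile(...)
        URI[] cacheFiles = context.getCacheFiles();
        // the cached file is localized into the task's working directory under its own name
        String fileName = new Path(cacheFiles[0].getPath()).getName();
        BufferedReader reader = new BufferedReader(
                new InputStreamReader(new FileInputStream(fileName), "UTF-8"));
        String line;
        while (StringUtils.isNotEmpty(line = reader.readLine())) {
            String[] split = line.split("\t");   // e.g. "01\t小米"
            hashMap.put(split[0], split[1]);     // pid -> product name
        }
        reader.close();
    }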
Data
order.txt
201801	01	1
201802	02	2
201803	03	3
201804	01	4
201805	02	5
201806	03	6
pd.txt
01	小米
02	华为
03	格力
Example:
MapJoin (Mapper):
import org.apache.commons.lang.StringUtils;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;

import java.io.BufferedReader;
import java.io.FileInputStream;
import java.io.IOException;
import java.io.InputStreamReader;
import java.util.HashMap;

/*
 * pd.txt
 * 01 小米
 * 02 华为
 * 03 格力
 */
/*
 * order.txt
 * 201801 01 1
 * 201802 02 2
 * 201803 03 3
 * 201804 01 4
 * 201805 02 5
 * 201806 03 6
 */
public class MapJoin extends Mapper<LongWritable, Text, Text, NullWritable> {

    private HashMap<String, String> hashMap = new HashMap<>();

    @Override
    protected void setup(Context context) throws IOException, InterruptedException {
        // read the cached file (product table) line by line
        BufferedReader reader = new BufferedReader(
                new InputStreamReader(new FileInputStream("F:\\input\\pd.txt"), "UTF-8"));
        String line;
        while (StringUtils.isNotEmpty(line = reader.readLine())) {
            // split e.g. "01\t小米"
            String[] split = line.split("\t");
            // store pid -> product name in the map
            hashMap.put(split[0], split[1]);
        }
        reader.close();
    }

    @Override
    protected void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException {
        // each record comes from the big table (order table, order.txt)
        String line = value.toString();
        // split e.g. "201801\t01\t1"
        String[] fields = line.split("\t");
        String pid = fields[1];
        // join the two tables on pid
        if (hashMap.containsKey(pid)) {
            context.write(new Text(fields[0] + "\t" + hashMap.get(pid) + "\t" + fields[2]), NullWritable.get());
        }
    }
}
Drive (generic driver):

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;

import java.io.IOException;
import java.net.URI;
import java.net.URISyntaxException;

public class Drive {
    /**
     * Generic map-only driver.
     * @param object     main class (for setJarByClass)
     * @param mymap      Mapper class
     * @param mymapkey   map output key class
     * @param mymapvalue map output value class
     * @param num        number of reduce tasks
     * @param args1      FileInputFormat input path
     * @param args2      FileOutputFormat output path
     * @param args3      URI of the file to put in the distributed cache
     */
    public static void run(Class<?> object, Class<? extends Mapper> mymap, Class<?> mymapkey, Class<?> mymapvalue,
                           int num, String args1, String args2, String args3)
            throws IOException, ClassNotFoundException, InterruptedException, URISyntaxException {
        // 1 get the job
        Configuration conf = new Configuration();
        Job job = Job.getInstance(conf);
        // 2 set the jar
        job.setJarByClass(object);
        // 3 set the mapper
        job.setMapperClass(mymap);
        // 4 set the map output types
        job.setMapOutputKeyClass(mymapkey);
        job.setMapOutputValueClass(mymapvalue);
        // cache the small table (addCacheFile, not addCacheArchive, since pd.txt is a plain file)
        job.addCacheFile(new URI(args3));
        // map-only job: the number of reduce tasks is set to 0
        job.setNumReduceTasks(num);
        // delete the output path if it already exists
        Path path = new Path(args2);
        FileSystem fs = FileSystem.get(conf);
        if (fs.exists(path)) {
            fs.delete(path, true);
        }
        // 5 set input and output paths
        FileInputFormat.setInputPaths(job, new Path(args1));
        FileOutputFormat.setOutputPath(job, new Path(args2));
        // 6 submit
        job.waitForCompletion(true);
    }
}
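The ReduceJoin example later in this article calls an overloaded Drive.run(...) that also takes a reducer class and the final output key/value types. That overload is not shown in the original code, so the following is only a sketch of what it presumably looks like inside the same Drive class, with the signature inferred from the call site (it additionally needs an import of org.apache.hadoop.mapreduce.Reducer):

    // Sketch of the reduce-join overload of run(); inferred, not taken from the original article.
    public static void run(Class<?> object, Class<? extends Mapper> mymap, Class<?> mymapkey, Class<?> mymapvalue,
                           Class<? extends Reducer> myreduce, Class<?> outkey, Class<?> outvalue,
                           String args1, String args2)
            throws IOException, ClassNotFoundException, InterruptedException {
        Configuration conf = new Configuration();
        Job job = Job.getInstance(conf);
        job.setJarByClass(object);
        job.setMapperClass(mymap);
        job.setReducerClass(myreduce);
        // map output types differ from the final output types in the reduce join
        job.setMapOutputKeyClass(mymapkey);
        job.setMapOutputValueClass(mymapvalue);
        job.setOutputKeyClass(outkey);
        job.setOutputValueClass(outvalue);
        // delete the output path if it already exists
        Path path = new Path(args2);
        FileSystem fs = FileSystem.get(conf);
        if (fs.exists(path)) {
            fs.delete(path, true);
        }
        FileInputFormat.setInputPaths(job, new Path(args1));
        FileOutputFormat.setOutputPath(job, new Path(args2));
        job.waitForCompletion(true);
    }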
MapJoinMain (main class):

import com.hfl.driver.Drive;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;

import java.io.IOException;
import java.net.URISyntaxException;

public class MapJoinMain {
    public static void main(String[] args) throws ClassNotFoundException, URISyntaxException, InterruptedException, IOException {
        // local test paths: input order table, output directory, and the cached product table
        args = new String[]{"F:\\input\\order.txt", "F:\\input\\mapper", "file:///F:/input/pd.txt"};
        Drive.run(MapJoinMain.class, MapJoin.class, Text.class, NullWritable.class, 0, args[0], args[1], args[2]);
    }
}
Run result:
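Worked through by hand from the sample data above (each order row keeps its order id and quantity and picks up the product name for its pid), the map-only join should emit something like:

201801	小米	1
201802	华为	2
201803	格力	3
201804	小米	4
201805	华为	5
201806	格力	6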
Reduce Join
1) How it works
The map side's main job: tag each key/value pair with the table (file) it came from so that records from different sources can be told apart, then emit the join column as the key and the remaining fields plus the tag as the value.
The reduce side's main job: grouping by the join column has already happened by the time reduce runs, so within each group we only need to separate the records that came from different files (using the tags added in the map phase) and then merge them.
2) Drawbacks
The obvious drawback is that a large amount of data has to be transferred between the map and reduce sides, i.e. through the shuffle phase, which makes this approach very inefficient.
TableBean
import lombok.Getter;
import lombok.Setter;
import org.apache.hadoop.io.Writable;

import java.io.DataInput;
import java.io.DataOutput;
import java.io.IOException;

/*
 * order.txt
 * 201801 01 1
 * 201802 02 2
 * 201803 03 3
 * 201804 01 4
 * 201805 02 5
 * 201806 03 6
 */
/*
 * pd.txt
 * 01 小米
 * 02 华为
 * 03 格力
 */
@Setter
@Getter
public class TableBean implements Writable {
    // order id
    private String order_id;
    // product id
    private String p_id;
    // quantity ordered
    private int amount;
    // product name
    private String pname;
    // flag marking which table the record came from
    private String flag;

    @Override
    public void write(DataOutput dataOutput) throws IOException {
        dataOutput.writeUTF(order_id);
        dataOutput.writeUTF(p_id);
        dataOutput.writeInt(amount);
        dataOutput.writeUTF(pname);
        dataOutput.writeUTF(flag);
    }

    @Override
    public void readFields(DataInput dataInput) throws IOException {
        this.order_id = dataInput.readUTF();
        this.p_id = dataInput.readUTF();
        this.amount = dataInput.readInt();
        this.pname = dataInput.readUTF();
        this.flag = dataInput.readUTF();
    }

    @Override
    public String toString() {
        return this.order_id + "\t" + this.pname + "\t" + this.amount;
    }
}
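The Writable contract requires write() and readFields() to handle the fields in exactly the same order. A quick, self-contained round-trip check of the bean above (hypothetical test code, not part of the original article):

import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.DataInputStream;
import java.io.DataOutputStream;
import java.io.IOException;

public class TableBeanRoundTrip {
    public static void main(String[] args) throws IOException {
        TableBean in = new TableBean();
        in.setOrder_id("201801");
        in.setP_id("01");
        in.setAmount(1);
        in.setPname("小米");
        in.setFlag("0");

        // serialize with write(), then deserialize into a fresh bean with readFields()
        ByteArrayOutputStream bytes = new ByteArrayOutputStream();
        in.write(new DataOutputStream(bytes));
        TableBean out = new TableBean();
        out.readFields(new DataInputStream(new ByteArrayInputStream(bytes.toByteArray())));

        System.out.println(out); // expected: 201801	小米	1
    }
}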
ReduceJoinMap (Mapper):

import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.lib.input.FileSplit;

import java.io.IOException;

public class ReduceJoinMap extends Mapper<LongWritable, Text, Text, TableBean> {

    TableBean tableBean = new TableBean();
    Text t = new Text();

    @Override
    protected void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException {
        // get the split this record came from
        FileSplit fileSplit = (FileSplit) context.getInputSplit();
        // name of the file being read
        String name = fileSplit.getPath().getName();
        // the record itself
        String line = value.toString();
        // tag each record according to the file it came from
        if (name.equals("order.txt")) {
            String[] fields = line.split("\t");
            // order record: order_id, p_id, amount
            tableBean.setOrder_id(fields[0]);
            tableBean.setP_id(fields[1]);
            tableBean.setAmount(Integer.parseInt(fields[2]));
            tableBean.setFlag("0");
            tableBean.setPname("");
            t.set(fields[1]);
        } else {
            String[] fields = line.split("\t");
            // product record: p_id, pname
            tableBean.setP_id(fields[0]);
            tableBean.setPname(fields[1]);
            tableBean.setFlag("1");
            tableBean.setOrder_id("");
            tableBean.setAmount(0);
            t.set(fields[0]);
        }
        // the join key is the product id
        context.write(t, tableBean);
    }
}
ReduceJoinReduce (Reducer):

import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Reducer;

import java.io.IOException;
import java.lang.reflect.InvocationTargetException;
import java.util.ArrayList;

import static org.apache.commons.beanutils.BeanUtils.copyProperties;

public class ReduceJoinReduce extends Reducer<Text, TableBean, TableBean, NullWritable> {

    @Override
    protected void reduce(Text key, Iterable<TableBean> values, Context context) throws IOException, InterruptedException {
        // holds the order records for this product id
        ArrayList<TableBean> orderbeans = new ArrayList<TableBean>();
        // holds the single product record for this product id
        TableBean pdbean = new TableBean();
        for (TableBean bean : values) {
            // check whether the record came from the order table
            if ("0".equals(bean.getFlag())) {
                // Hadoop reuses the value object, so copy each order record before keeping it
                TableBean orderbean = new TableBean();
                try {
                    copyProperties(orderbean, bean);
                } catch (IllegalAccessException | InvocationTargetException e) {
                    e.printStackTrace();
                }
                orderbeans.add(orderbean);
            } else {
                // copy the product record so its name survives the iteration
                try {
                    copyProperties(pdbean, bean);
                } catch (IllegalAccessException | InvocationTargetException e) {
                    e.printStackTrace();
                }
            }
        }
        for (TableBean bean2 : orderbeans) {
            // fill in the product name on every order record and emit it
            bean2.setPname(pdbean.getPname());
            context.write(bean2, NullWritable.get());
        }
    }
}
ReduceJoin (main class):

import com.hfl.driver.Drive;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;

import java.io.IOException;
import java.net.URISyntaxException;

public class ReduceJoin {
    public static void main(String[] args) throws ClassNotFoundException, URISyntaxException, InterruptedException, IOException {
        // local test paths: input directory containing order.txt and pd.txt, and the output directory
        args = new String[]{"F:\\input\\reduce", "F:\\input\\reduce\\red"};
        Drive.run(ReduceJoin.class, ReduceJoinMap.class, Text.class, TableBean.class,
                ReduceJoinReduce.class, TableBean.class, NullWritable.class, args[0], args[1]);
    }
}
Run result:
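Worked out from the same sample data, the reduce join should produce the same six joined rows (order id, product name, amount), grouped by product id since that is the join key; the order within each group is not guaranteed:

201801	小米	1
201804	小米	4
201802	华为	2
201805	华为	5
201803	格力	3
201806	格力	6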