1. Requirements Analysis
MapReduce supports table join operations, including map-side join, reduce-side join, and semi-join. Here we focus on the map-side join, in which the data sets are merged before the records reach the map() function. It is generally far more efficient than a reduce-side join, because a reduce-side join pushes all of the data through the shuffle, which is very expensive in network and disk resources.
Case study:
An e-commerce site keeps two tables in its backend data (they can be treated as two files):
Customer table: user ID, user name, telephone
Order table: order ID, user ID, product price, product name
We want to join the two tables into: user ID, user name, telephone, order ID, price, product name, and output the result in the required form.
In a relational database this is a simple SQL statement:
**customer**: cid, cname, telephone
**order**: oid, cid, price, pname

joined result (cid, cname, telephone, pname, price) == (select * from customer a, order b where a.cid = b.cid)
So how do we implement this in MapReduce? Different scenarios call for different approaches; the methods are described in detail below.
First, prepare the data sets:

customer data set:

```
1,jone,13423459976
2,ben,15099871134
3,henry,13599187709
4,tony,13399008876
```

order data set:

```
100,1,45.50,product-1
200,1,23,product-2
300,1,50,product-3
400,1,99,product-4
102,2,19.9,product-5
103,2,33,product-6
104,3,44,product-7
105,4,1009,product-8
106,5,22,product-9
```

In MR, the key to joining multiple data sets is choosing the join KEY. There are two ways to implement the join:
2. Map Join Implementation
The idea behind a map-side join: it is an optimization for the case where, of the two tables to be joined, one is very large and the other is small enough to fit entirely in memory. We can replicate the small table so that every map task holds a copy in memory (for example in a hash table) and then scan only the large table: for each key/value record of the large table, look up the key in the hash table and, if it is found, emit the joined record.
Application scenario: one of the two tables is very large and the other very small. In the e-commerce example, one user can have many orders (a one-to-many relationship), so the customer table is small and the order table is large. We load the customer table into an in-memory structure (a List or Map) and feed the order table to the map side; map() matches each input record against the in-memory data and, on a match, emits the combined record. A minimal sketch of this idea follows the notation below.
Load the smaller data set into memory (List/Map):
Map<key,value> == Map<cid,customerInfo>
- map() matches each input record against the in-memory data; on a match, it emits the combined record
read order files -> <cid,orderInfo> -> find in Map
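The following is a minimal sketch of that lookup logic in plain Java, independent of Hadoop; the class and variable names (`MapJoinSketch`, `customerLines`, `orderLines`) are illustrative and not part of the original code:

```java
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Minimal sketch of the map-side join idea: the small customer table is
// loaded into a HashMap keyed by cid, and the large order table is streamed
// and probed against it record by record.
public class MapJoinSketch {

    public static void join(List<String> customerLines, List<String> orderLines) {
        // build the in-memory lookup table: cid -> full customer record
        Map<String, String> customerMap = new HashMap<>();
        for (String line : customerLines) {
            customerMap.put(line.split(",")[0], line);
        }

        // stream the big table; in an order record the cid is field 1
        for (String order : orderLines) {
            String cid = order.split(",")[1];
            String customer = customerMap.get(cid);
            if (customer != null) {
                // emit the joined record (stdout stands in for context.write)
                System.out.println(cid + "\t" + customer + "," + order);
            }
        }
    }
}
```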
Implementation code:
```java
package com.kfk.hadoop.mr.join;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.conf.Configured;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.util.Tool;
import org.apache.hadoop.util.ToolRunner;

import java.io.BufferedReader;
import java.io.IOException;
import java.io.InputStream;
import java.io.InputStreamReader;
import java.net.URI;
import java.net.URISyntaxException;
import java.util.HashMap;
import java.util.Map;

/**
 * @author : 蔡政洁
 * @email :caizhengjie888@icloud.com
 * @date : 2020/10/9
 * @time : 7:07 PM
 */
public class DistributedCache extends Configured implements Tool {

    static Map<String,String> customerMap = new HashMap<String, String>();

    /**
     * map
     * TODO
     */
    public static class TemplateMapper extends Mapper<LongWritable, Text, Text, Text> {

        private static final Text outputKey = new Text();
        private static final Text outputValue = new Text();

        /**
         * Cache the small data set (customer) in memory
         * @param context
         * @throws IOException
         * @throws InterruptedException
         */
        @Override
        public void setup(Context context) throws IOException, InterruptedException {

            // get the configuration
            Configuration configuration = context.getConfiguration();

            // get the path of the file that should be cached in memory
            URI[] uri = Job.getInstance(configuration).getCacheFiles();
            Path path = new Path(uri[0]);
            FileSystem fileSystem = FileSystem.get(configuration);

            // open an input stream on the cached file
            InputStream inputStream = fileSystem.open(path);
            InputStreamReader inputStreamReader = new InputStreamReader(inputStream);
            BufferedReader bufferedReader = new BufferedReader(inputStreamReader);

            // read the file line by line
            String line = null;
            while ((line = bufferedReader.readLine()) != null){
                // if the line is not empty, put it into the map
                if (line.trim().length() > 0){
                    // split on ",", use the first field (cid) as key and the whole line as value
                    customerMap.put(line.split(",")[0], line);
                }
            }

            // close the streams
            bufferedReader.close();
            inputStream.close();
            inputStreamReader.close();
        }

        @Override
        public void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException {

            // convert the map input value to a String
            String linevalue = value.toString();

            // split the line on ","
            String[] lines = linevalue.split(",");

            /*
             * Parse the data: split the line into an array and take the second element (the cid),
             * then look it up with customerMap.get(). If the returned value is not null, the map
             * output key is that cid and the map output value is the customerMap value concatenated
             * directly with the map input line (no separator is inserted).
             * e.g. key 1: (1,jone,13423459976)+(400,1,99,product-4)
             */
            if (customerMap.get(lines[1]) != null){
                // outputKey is the cid
                outputKey.set(lines[1]);
                // outputValue is customerInfo + orderInfo
                outputValue.set(customerMap.get(lines[1]) + linevalue);
                System.out.println(outputKey + "---" + outputValue);
                context.write(outputKey, outputValue);
            }
        }

        @Override
        public void cleanup(Context context) throws IOException, InterruptedException {
            // TODO
        }
    }

    /**
     * reduce
     * TODO
     */
    public static class TemplateReducer extends Reducer<Text, IntWritable, Text, IntWritable> {

        @Override
        public void setup(Context context) throws IOException, InterruptedException {
            // TODO
        }

        @Override
        public void reduce(Text key, Iterable<IntWritable> values, Context context) throws IOException, InterruptedException {
            // TODO
        }

        @Override
        public void cleanup(Context context) throws IOException, InterruptedException {
            // TODO
        }
    }

    /**
     * run
     * @param args
     * @return
     * @throws IOException
     * @throws ClassNotFoundException
     * @throws InterruptedException
     */
    public int run(String[] args) throws IOException, ClassNotFoundException, InterruptedException, URISyntaxException {

        // 1) get conf
        Configuration configuration = this.getConf();

        // 2) create job
        Job job = Job.getInstance(configuration, this.getClass().getSimpleName());
        job.setJarByClass(this.getClass());

        // 3.1) input: set the job input path
        Path path = new Path(args[0]);
        FileInputFormat.addInputPath(job, path);

        // 3.2) map: set the mapper and its output types
        job.setMapperClass(TemplateMapper.class);
        job.setMapOutputKeyClass(Text.class);
        job.setMapOutputValueClass(Text.class);

        // 1. partitioner
        // job.setPartitionerClass();

        // 2. sort
        // job.setSortComparatorClass();

        // 3. combiner - optional
        // job.setCombinerClass(WordCountCombiner.class);

        // 4. compress - configurable
        // configuration.set("mapreduce.map.output.compress","true");
        // use the SnappyCodec compression codec
        // configuration.set("mapreduce.map.output.compress.codec","org.apache.hadoop.io.compress.SnappyCodec");

        // 5. grouping
        // job.setGroupingComparatorClass();

        // 6. number of reduce tasks
        // job.setNumReduceTasks(2);

        // 3.3) reduce: set the reducer and its output types (not needed for the map-side join)
        // job.setReducerClass(TemplateReducer.class);
        // job.setOutputKeyClass(Text.class);
        // job.setOutputValueClass(IntWritable.class);

        // path of the file to be cached in memory
        URI uri = new URI(args[2]);
        // register the cache file with the job
        job.addCacheFile(uri);

        // 3.4) output: set the job output path
        Path outpath = new Path(args[1]);
        FileOutputFormat.setOutputPath(job, outpath);

        // 4) commit: run the job
        boolean isSuccess = job.waitForCompletion(true);

        // return 0 on success, otherwise 1
        return (isSuccess) ? 0 : 1;
    }

    public static void main(String[] args) {

        // input/output arguments
        args = new String[]{
                "hdfs://bigdata-pro-m01:9000/user/caizhengjie/datas/order.txt",
                "hdfs://bigdata-pro-m01:9000/user/caizhengjie/mr/output",
                "hdfs://bigdata-pro-m01:9000/user/caizhengjie/datas/customer.txt"
        };

        // WordCountUpMR wordCountUpMR = new WordCountUpMR();
        Configuration configuration = new Configuration();

        try {
            // if the output directory already exists, delete it
            Path fileOutPath = new Path(args[1]);
            FileSystem fileSystem = FileSystem.get(configuration);
            if (fileSystem.exists(fileOutPath)){
                fileSystem.delete(fileOutPath, true);
            }

            // call the run() method via ToolRunner
            int status = ToolRunner.run(configuration, new DistributedCache(), args);

            // exit
            System.exit(status);

        } catch (IOException e) {
            e.printStackTrace();
        } catch (ClassNotFoundException e) {
            e.printStackTrace();
        } catch (InterruptedException e) {
            e.printStackTrace();
        } catch (Exception e) {
            e.printStackTrace();
        }
    }
}
```
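Because the reduce section of run() is left commented out, the joined records are already complete when they leave map(); in principle the shuffle could be skipped entirely by calling job.setNumReduceTasks(0), so that the map output is written straight to the output directory.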
Run result (note that the customer record and the order record are concatenated directly, with no separator, exactly as the mapper builds the output value):
```
1 1,jone,13423459976400,1,99,product-4
1 1,jone,13423459976300,1,50,product-3
1 1,jone,13423459976200,1,23,product-2
1 1,jone,13423459976100,1,45.50,product-1
2 2,ben,15099871134103,2,33,product-6
2 2,ben,15099871134102,2,19.9,product-5
3 3,henry,13599187709104,3,44,product-7
4 4,tony,13399008876105,4,1009,product-8
```
3. Reduce Join Implementation
The idea behind a reduce-side join: values with the same key are brought together, and both data sets are fed to the map stage as input. In the map stage, map() reads both customer.txt and order.txt; to distinguish the two sources, every record is tagged, e.g. tag:customer for records from customer.txt and tag:order for records from order.txt. In other words, the map stage's main job is to tag the records by source file. In the reduce stage, reduce() receives, for each key, the value list coming from both customer.txt and order.txt and joins the customer data with the order data for that key (a Cartesian product). That is, the actual join happens in the reduce stage. A minimal sketch of the per-key reduce logic follows the notation below.
Map side:

**customer**: <cid,customerInfo> -> <cid,customerInfo(tag:customer data:cInfo)>
**order**: <cid,orderInfo> -> <cid,orderInfo(tag:order data:oInfo)>
Reduce side:
<cid,List(customerInfo + orderInfo)>
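The core of the reduce step can be sketched for a single key in plain Java as follows; the `TaggedValue` helper class is an illustrative stand-in for the `DataJoinWritable` type defined later, not part of the original code:

```java
import java.util.ArrayList;
import java.util.List;

// Minimal sketch of the reduce-side logic for a single key (cid): the grouped
// values arrive as (tag, data) pairs, the single customer record is separated
// from the order records, and the two sides are cross-joined.
public class ReduceJoinSketch {

    static class TaggedValue {
        final String tag;   // "customer" or "order"
        final String data;  // the payload without the join key
        TaggedValue(String tag, String data) { this.tag = tag; this.data = data; }
    }

    public static List<String> joinOneKey(String cid, Iterable<TaggedValue> values) {
        String customerInfo = null;
        List<String> orderList = new ArrayList<>();

        // split the grouped values by their tag
        for (TaggedValue v : values) {
            if ("customer".equals(v.tag)) {
                customerInfo = v.data;
            } else if ("order".equals(v.tag)) {
                orderList.add(v.data);
            }
        }

        // cross join: every order of this cid is combined with its customer record;
        // keys with no customer record produce no output
        List<String> joined = new ArrayList<>();
        if (customerInfo != null) {
            for (String orderInfo : orderList) {
                joined.add(cid + "," + customerInfo + "," + orderInfo);
            }
        }
        return joined;
    }
}
```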
Computation flow: map tags each record by its source, the shuffle groups the records by cid, and reduce performs the join.
Custom data type code:
```java
package com.kfk.hadoop.mr.join;

import org.apache.hadoop.io.Writable;

import java.io.DataInput;
import java.io.DataOutput;
import java.io.IOException;

/**
 * @author : 蔡政洁
 * @email :caizhengjie888@icloud.com
 * @date : 2020/10/17
 * @time : 4:50 PM
 */
public class DataJoinWritable implements Writable {

    // tag marks the source table ("customer" or "order"); data holds the payload
    private String tag;
    private String data;

    public DataJoinWritable() {
    }

    public DataJoinWritable(String tag, String data) {
        this.set(tag, data);
    }

    public void set(String tag, String data) {
        this.tag = tag;
        this.data = data;
    }

    public void write(DataOutput out) throws IOException {
        out.writeUTF(this.getTag());
        out.writeUTF(this.getData());
    }

    public void readFields(DataInput in) throws IOException {
        this.tag = in.readUTF();
        this.data = in.readUTF();
    }

    public String getTag() {
        return tag;
    }

    public void setTag(String tag) {
        this.tag = tag;
    }

    public String getData() {
        return data;
    }

    public void setData(String data) {
        this.data = data;
    }

    @Override
    public String toString() {
        return "DataJoinWritable{" +
                "tag='" + tag + '\'' +
                ", data='" + data + '\'' +
                '}';
    }
}
```
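As a quick local sanity check of the serialization logic, the following sketch (a hypothetical `DataJoinWritableTest` class, not part of the original code) writes a `DataJoinWritable` to a byte stream the way the shuffle would and reads it back:

```java
package com.kfk.hadoop.mr.join;

import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.DataInputStream;
import java.io.DataOutputStream;
import java.io.IOException;

// Round-trip check for DataJoinWritable: serialize an instance to bytes,
// then deserialize it into a fresh object and print it.
public class DataJoinWritableTest {

    public static void main(String[] args) throws IOException {
        DataJoinWritable out = new DataJoinWritable("customer", "jone,13423459976");

        // serialize
        ByteArrayOutputStream buffer = new ByteArrayOutputStream();
        out.write(new DataOutputStream(buffer));

        // deserialize into a fresh instance
        DataJoinWritable in = new DataJoinWritable();
        in.readFields(new DataInputStream(new ByteArrayInputStream(buffer.toByteArray())));

        // expected: DataJoinWritable{tag='customer', data='jone,13423459976'}
        System.out.println(in);
    }
}
```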
```java
package com.kfk.hadoop.mr.join;

/**
 * @author : 蔡政洁
 * @email :caizhengjie888@icloud.com
 * @date : 2020/10/17
 * @time : 5:04 PM
 */
public class DataCommon {

    // tag constants used to mark which file a record came from
    public final static String CUSTOMER = "customer";
    public final static String ORDER = "order";
}
```
Implementation code:
```java
package com.kfk.hadoop.mr.join;

import com.google.common.collect.Lists;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.conf.Configured;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.util.Tool;
import org.apache.hadoop.util.ToolRunner;

import java.io.IOException;
import java.util.ArrayList;
import java.util.List;

/**
 * @author : 蔡政洁
 * @email :caizhengjie888@icloud.com
 * @date : 2020/10/9
 * @time : 7:07 PM
 */
public class DataJoinMR extends Configured implements Tool {

    /**
     * map
     * TODO
     */
    public static class TemplateMapper extends Mapper<LongWritable, Text, Text, DataJoinWritable> {

        // map output key/value objects
        private static final Text outputKey = new Text();
        private static final DataJoinWritable outputValue = new DataJoinWritable();

        @Override
        public void setup(Context context) throws IOException, InterruptedException {
            // TODO
        }

        @Override
        public void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException {

            // split the map input value into fields
            String[] values = value.toString().split(",");

            // pre-filter: skip lines that are neither customer (3 fields) nor order (4 fields) records
            if ((values.length != 3) && (values.length != 4)){
                return;
            }

            // customer record
            if (values.length == 3){
                String cid = values[0];
                String name = values[1];
                String telephone = values[2];

                // the cid is the join key
                outputKey.set(cid);
                // tag the value with DataCommon.CUSTOMER
                outputValue.set(DataCommon.CUSTOMER, name + "," + telephone);
            }

            // order record
            if (values.length == 4){
                String cid = values[1];
                String price = values[2];
                String productName = values[3];

                // the cid is the join key
                outputKey.set(cid);
                // tag the value with DataCommon.ORDER
                outputValue.set(DataCommon.ORDER, price + "," + productName);
            }

            context.write(outputKey, outputValue);

            // print outputKey/outputValue for local debugging
            System.out.println(outputKey + "," + outputValue);
        }

        @Override
        public void cleanup(Context context) throws IOException, InterruptedException {
            // TODO
        }
    }

    /**
     * reduce
     * TODO
     */
    public static class TemplateReducer extends Reducer<Text, DataJoinWritable, NullWritable, Text> {

        // reduce output value object (Text)
        private static final Text outputValue = new Text();

        @Override
        public void setup(Context context) throws IOException, InterruptedException {
            // TODO
        }

        @Override
        public void reduce(Text key, Iterable<DataJoinWritable> values, Context context) throws IOException, InterruptedException {

            // print the reduce input kv list, for local testing
            // List<DataJoinWritable> list = Lists.newArrayList(values);
            // System.out.println("Reduce in == KeyIn: " + key + " ValueIn: " + list);

            // reduce input format: <cid, List(customerInfo, orderInfo, orderInfo, ...)>
            String customerInfo = null;

            // orderList holds every orderInfo for this cid
            List<String> orderList = new ArrayList<String>();

            // separate the grouped values by tag
            for (DataJoinWritable dataJoinWritable : values){
                // customer payload
                if (DataCommon.CUSTOMER.equals(dataJoinWritable.getTag())){
                    customerInfo = dataJoinWritable.getData();
                }
                // order payload
                else if (DataCommon.ORDER.equals(dataJoinWritable.getTag())){
                    orderList.add(dataJoinWritable.getData());
                }
            }

            // emit the joined records
            for (String orderInfo : orderList){
                // skip keys that have no customer record
                if (customerInfo == null) {
                    continue;
                }
                // build the reduce output value
                outputValue.set(key.toString() + "," + customerInfo + "," + orderInfo);
                // the output key is NullWritable
                context.write(NullWritable.get(), outputValue);
            }
        }

        @Override
        public void cleanup(Context context) throws IOException, InterruptedException {
            // TODO
        }
    }

    /**
     * run
     * @param args
     * @return
     * @throws IOException
     * @throws ClassNotFoundException
     * @throws InterruptedException
     */
    public int run(String[] args) throws IOException, ClassNotFoundException, InterruptedException {

        // 1) get conf
        Configuration configuration = this.getConf();

        // 2) create job
        Job job = Job.getInstance(configuration, this.getClass().getSimpleName());
        job.setJarByClass(this.getClass());

        // 3.1) input: set the job input path
        Path path = new Path(args[0]);
        FileInputFormat.addInputPath(job, path);

        // 3.2) map: set the mapper and its output types
        job.setMapperClass(TemplateMapper.class);
        job.setMapOutputKeyClass(Text.class);
        job.setMapOutputValueClass(DataJoinWritable.class);

        // 1. partitioner
        // job.setPartitionerClass();

        // 2. sort
        // job.setSortComparatorClass();

        // 3. combiner - optional
        // job.setCombinerClass(WordCountCombiner.class);

        // 4. compress - configurable
        // configuration.set("mapreduce.map.output.compress","true");
        // use the SnappyCodec compression codec
        // configuration.set("mapreduce.map.output.compress.codec","org.apache.hadoop.io.compress.SnappyCodec");

        // 5. grouping
        // job.setGroupingComparatorClass();

        // 6. number of reduce tasks
        // job.setNumReduceTasks(2);

        // 3.3) reduce: set the reducer and its output types
        job.setReducerClass(TemplateReducer.class);
        job.setOutputKeyClass(NullWritable.class);
        job.setOutputValueClass(Text.class);

        // 3.4) output: set the job output path
        Path outpath = new Path(args[1]);
        FileOutputFormat.setOutputPath(job, outpath);

        // 4) commit: run the job
        boolean isSuccess = job.waitForCompletion(true);

        // return 0 on success, otherwise 1
        return (isSuccess) ? 0 : 1;
    }

    public static void main(String[] args) {

        // input/output arguments
        args = new String[]{
                "hdfs://bigdata-pro-m01:9000/user/caizhengjie/datas/join/",
                "hdfs://bigdata-pro-m01:9000/user/caizhengjie/mr/output"
        };

        // WordCountUpMR wordCountUpMR = new WordCountUpMR();
        Configuration configuration = new Configuration();

        try {
            // if the output directory already exists, delete it
            Path fileOutPath = new Path(args[1]);
            FileSystem fileSystem = FileSystem.get(configuration);
            if (fileSystem.exists(fileOutPath)){
                fileSystem.delete(fileOutPath, true);
            }

            // call the run() method via ToolRunner
            int status = ToolRunner.run(configuration, new DataJoinMR(), args);

            // exit
            System.exit(status);

        } catch (IOException e) {
            e.printStackTrace();
        } catch (ClassNotFoundException e) {
            e.printStackTrace();
        } catch (InterruptedException e) {
            e.printStackTrace();
        } catch (Exception e) {
            e.printStackTrace();
        }
    }
}
```
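Note that the `customerInfo == null` check makes this an inner join: orders whose cid has no matching customer record (for example order 106 with cid 5 in the sample data) are dropped, which is why they do not appear in the result below. Emitting such orders anyway, with placeholder customer fields, would turn this into an outer join.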
Run result:
```
1,jone,13423459976,99,product-4
1,jone,13423459976,50,product-3
1,jone,13423459976,23,product-2
1,jone,13423459976,45.50,product-1
2,ben,15099871134,33,product-6
2,ben,15099871134,19.9,product-5
3,henry,13599187709,44,product-7
4,tony,13399008876,1009,product-8
```
Summary: this post introduced two join strategies. They suit different scenarios and differ considerably in efficiency and resource consumption, the main factor being network transfer: the map-side join avoids shuffling the large table, while the reduce-side join sends every record through the shuffle.