原理阐述
适用于关联表中有小表的情形;
可以将小表分发到所有的map节点
,这样,map节点就可以在本地对自己所读到的大表数据进行join
并输出最终结果,可以大大提高join操作的并发度,加快处理速度。
实现示例
1.在mapper类中预先定义好小表,进行join
2.引入实际场景中的解决方案:一次加载数据库或者用distributedcache
。
public class TestDistributedCache { static class TestDistributedCacheMapper extends Mapper<LongWritable, Text, Text, Text>{ FileReader in = null; BufferedReader reader = null; HashMap<String,String> b_tab = new HashMap<String, String>(); String localpath =null; String uirpath = null; //是在map任务初始化的时候调用一次 @Override protected void setup(Context context) throws IOException, InterruptedException { //通过这几句代码可以获取到cache file的本地绝对路径,测试验证用 Path[] files = context.getLocalCacheFiles(); localpath = files[0].toString(); URI[] cacheFiles = context.getCacheFiles(); //缓存文件的用法——直接用本地IO来读取 //这里读的数据是map task所在机器本地工作目录中的一个小文件 in = new FileReader("b.txt"); reader =new BufferedReader(in); String line =null; while(null!=(line=reader.readLine())){ String[] fields = line.split(","); b_tab.put(fields[0],fields[1]); } IOUtils.closeStream(reader); IOUtils.closeStream(in); } @Override protected void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException { //这里读的是这个map task所负责的那一个切片数据(在hdfs上) String[] fields = value.toString().split("\t"); String a_itemid = fields[0]; String a_amount = fields[1]; String b_name = b_tab.get(a_itemid); // 输出结果 1001 98.9 banan context.write(new Text(a_itemid), new Text(a_amount + "\t" + ":" + localpath + "\t" +b_name )); } } public static void main(String[] args) throws Exception { Configuration conf = new Configuration(); Job job = Job.getInstance(conf); job.setJarByClass(TestDistributedCache.class); job.setMapperClass(TestDistributedCacheMapper.class); job.setOutputKeyClass(Text.class); job.setOutputValueClass(LongWritable.class); //这里是我们正常的需要处理的数据所在路径 FileInputFormat.setInputPaths(job, new Path(args[0])); FileOutputFormat.setOutputPath(job, new Path(args[1])); //不需要reducer job.setNumReduceTasks(0); //分发一个文件到task进程的工作目录 job.addCacheFile(new URI("hdfs://hadoop-server01:9000/cachefile/b.txt")); //分发一个归档文件到task进程的工作目录 // job.addArchiveToClassPath(archive); //分发jar包到task节点的classpath下 // job.addFileToClassPath(jarfile); job.waitForCompletion(true); } }