hadoop之Map join和Reduce join (13)

简介: hadoop之Map join和Reduce join (13)

Map join(Distributedcache分布式缓存)


使用场景


一张表十分小、一张表很大。


解决方案


在map端缓存多张表,提前处理业务逻辑,这样增加map端业务,减少reduce端数据的压力,尽可能的减少数据倾斜。


具体办法: 采用distributedcache


1)在mapper的setup阶段,将文件读取到缓存集合中。

2)在驱动函数中加载缓存。

job.addCacheFile(new URI(“file:/e:/mapjoincache/pd.txt”));// 缓存普通文件到task运行节点


数据


order.txt


201801  01  1
201802  02  2
201803  03  3
201804  01  4
201805  02  5
201806  03  6


pd.txt


01  小米
02  华为
03  格力


实例:


目标文件:


import org.apache.commons.lang.StringUtils;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;
import java.io.BufferedReader;
import java.io.FileInputStream;
import java.io.IOException;
import java.io.InputStreamReader;
import java.util.HashMap;
/*
01  小米
02  华为
03  格力
*/
/**
 * 201801 01  1
 * 201802 02  2
 * 201803 03  3
 * 201804 01  4
 * 201805 02  5
 * 201806 03  6
 */
public class MapJoin extends Mapper<LongWritable, Text, Text, NullWritable> {
    HashMap hashMap = new HashMap<String, String>();
    @Override
    protected void setup(Context context) throws IOException, InterruptedException {
        //获取缓存的文件(产品表)
        BufferedReader reader = new BufferedReader(new InputStreamReader(new FileInputStream("F:\\input\\pd.txt"), "UTF-8"));
        //获取缓存的文件(产品表)
        //一行一行读取数据
        String line;
        while (StringUtils.isNotEmpty(line = reader.readLine())) {
            //切分  01  小米
            String[] split = line.split("\t");
            //数据存储进集合
            hashMap.put(split[0], split[1]);
        }
    }
    @Override
    protected void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException {
        //获取大表(定单表 order.txt)
        String line = value.toString();
        //切分 201901 01  1
        String[] fileds = line.split("\t");
        String pid = fileds[1];
        //进行量表按照pid关联
        if (hashMap.containsKey(pid)) {
            context.write(new Text(fileds[0] + "\t" + hashMap.get(pid) + "\t" + fileds[2]), NullWritable.get());
        }
    }
}


public class  Drive {
    /**
     * 主类
     * @param object 主类
     * @param mymap map类
     * @param mymapkey map输入key
     * @param mymapvalue map输出value
     * @param args1 FileInputFormat输入路径
     * @param args2 FileOutputFormat输出路径
     * @param num reduce个数
     * @param args3 加载缓存的路径
     *            * */
    public static void run(Class<?> object,Class<? extends Mapper> mymap,Class<?> mymapkey,Class<?> mymapvalue,int num,String args1,String args2,String args3) throws IOException, ClassNotFoundException, InterruptedException, URISyntaxException {
        // 1 获取job信息
        Configuration conf = new Configuration();
        Job job = Job.getInstance(conf);
        // 2 加载jar包
        job.setJarByClass(object);
        // 3 关联map和reduce
        job.setMapperClass(mymap);
        // 4 设置最终输出类型
        job.setMapOutputKeyClass(mymapkey);
        job.setMapOutputValueClass(mymapvalue);
        //缓存小表的数据
        job.addCacheArchive(new URI(args3));
        // 设置reducetask个数为0
        job.setNumReduceTasks(num);
        //判断输出路径是否存在
        Path path = new Path(args2);
        FileSystem fs = FileSystem.get(conf);
        if(fs.exists(path)) {
            fs.delete(path, true);
        }
        // 5 设置输入和输出路径
        FileInputFormat.setInputPaths(job, new Path(args1));
        FileOutputFormat.setOutputPath(job, new Path(args2));
        // 6 提交
        job.waitForCompletion(true);
    }
}
import com.hfl.driver.Drive;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
import java.io.IOException;
import java.net.URISyntaxException;
public class MapJoinMain {
    public static void main(String[] args) throws ClassNotFoundException, URISyntaxException, InterruptedException, IOException {
        args = new String[]{"F:\\input\\order.txt", "F:\\input\\mapper", "file:///F:/input/pd.txt"};
        Drive.run(MapJoinMain.class, MapJoin.class, Text.class, NullWritable.class, 0, args[0], args[1], args[2]);
    }
}


运行结果:


1dc618a0ed9580ce8bfa6facb208c08f.png

5d4c6812c8535adbb050f4ddf2e1bce8.png


reduce join


46a9d80a6e05e4e3b19d57a0ee70bcdf.png


1)原理:


Map端的主要工作:为来自不同表(文件)的key/value对打标签以区别不同来源的记录。然后用连接字段作为key,其余部分和新加的标志作为value,最后进行输出。

Reduce端的主要工作:在reduce端以连接字段作为key的分组已经完成,我们只需要在每一个分组当中将那些来源于不同文件的记录(在map阶段已经打标志)分开,最后进行合并就ok了。


2)该方法的缺点


这种方式的缺点很明显就是会造成map和reduce端也就是shuffle阶段出现大量的数据传输,效率很低。

66ba272a0bfc97be54a5fa679e3d5482.png


TableBean


import lombok.Getter;
import lombok.Setter;
import org.apache.hadoop.io.Writable;
import java.io.DataInput;
import java.io.DataOutput;
import java.io.IOException;
/**
 * order.txt
 *
 201801 01  1
 201802 02  2
 201803 03  3
 201804 01  4
 201805 02  5
 201806 03  6
 */
/**
 * pd.txt
 01 小米
 02 华为
 03 格力
 */
@Setter
@Getter
public class TableBean implements Writable {
    //订单id
    private String order_id;
    //产品id
    private String p_id;
    //产品数量
    private int amonut;
    //产品名称
    private String pname;
    //标记
    private String flag;
    @Override
    public void write(DataOutput dataOutput) throws IOException {
        dataOutput.writeUTF(order_id);
        dataOutput.writeUTF(p_id);
        dataOutput.writeInt(amonut);
        dataOutput.writeUTF(pname);
        dataOutput.writeUTF(flag);
    }
    @Override
    public void readFields(DataInput dataInput) throws IOException {
        this.order_id = dataInput.readUTF();
        this.p_id = dataInput.readUTF();
        this.amonut = dataInput.readInt();
        this.pname = dataInput.readUTF();
        this.flag = dataInput.readUTF();
    }
    @Override
    public String toString() {
        return this.order_id+"\t"+this.pname+"\t"+this.amonut;
    }
}


import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.lib.input.FileSplit;
import java.io.IOException;
public class ReduceJoinMap extends Mapper<LongWritable,Text,Text,TableBean> {
    TableBean tableBean = new TableBean();
    Text t = new Text();
    @Override
    protected void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException {
        //获取文件的路径
        FileSplit fileSplit = (FileSplit) context.getInputSplit();
        //每个文件的名字
        String name = fileSplit.getPath().getName();
        //获取数据
        String line = value.toString();
        //判断,根据文件的名字不同添加标记
        if (name.equals("order.txt")){
            String[] fields = line.split("\t");
            //封装
            tableBean.setOrder_id(fields[0]);
            tableBean.setP_id(fields[1]);
            tableBean.setAmonut(Integer.parseInt(fields[2]));
            tableBean.setFlag("0");
            tableBean.setPname("");
            t.set(fields[1]);
        }else{
            String[] fields = line.split("\t");
            //封装
            tableBean.setP_id(fields[0]);
            tableBean.setPname(fields[1]);
            tableBean.setFlag("1");
            tableBean.setOrder_id("");
            tableBean.setAmonut(0);
            t.set(fields[0]);
        }
        context.write(t,tableBean);
    }
}


import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Reducer;
import java.io.IOException;
import java.lang.reflect.InvocationTargetException;
import java.util.ArrayList;
import static org.apache.commons.beanutils.BeanUtils.copyProperties;
public class ReduceJoinReduce extends Reducer<Text,TableBean, TableBean, NullWritable>  {
    @Override
    protected void reduce(Text key, Iterable<TableBean> values, Context context) throws IOException, InterruptedException {
        //创建集合,存储订单表的对象
        ArrayList<TableBean> orderbeans = new ArrayList<TableBean>();
        //存储产品表对象
        TableBean pdbean  = new TableBean();
        for (TableBean bean:values) {
            //判断是否是订单表
            if("0".equals(bean.getFlag())){
                //定义一个存储order.txt的对象
                TableBean orderbean = new TableBean();
                try {
                    copyProperties(orderbean,bean);
                } catch (IllegalAccessException e) {
                    e.printStackTrace();
                } catch (InvocationTargetException e) {
                    e.printStackTrace();
                }
                orderbeans.add(orderbean);
            }else {
                //拷贝传递过来的产品表到内存
                try {
                    copyProperties(pdbean,bean);
                } catch (IllegalAccessException e) {
                    e.printStackTrace();
                } catch (InvocationTargetException e) {
                    e.printStackTrace();
                }
            }
            System.out.println("a");
        }
        System.out.println("==========分隔符========");
        for (TableBean bean2:orderbeans) {
            //将产品表里面名字传到定点表里面
            bean2.setPname(pdbean.getPname());
            //数据输出
            context.write(bean2,NullWritable.get());
        }
    }
}


import com.hfl.driver.Drive;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
import java.io.IOException;
import java.net.URISyntaxException;
public class ReduceJoin {
    public static void main(String[] args) throws ClassNotFoundException, URISyntaxException, InterruptedException, IOException {
        args = new String[]{"F:\\input\\reduce", "F:\\input\\reduce\\red"};
        Drive.run(ReduceJoin.class, ReduceJoinMap.class, Text.class, TableBean.class, ReduceJoinReduce.class,
                TableBean.class, NullWritable.class, args[0], args[1]);
    }
}

运行结果:

46a9d80a6e05e4e3b19d57a0ee70bcdf.png66ba272a0bfc97be54a5fa679e3d5482.png

相关文章
|
8月前
|
JavaScript 前端开发
解释 JavaScript 中的`map()`、`filter()`和`reduce()`方法的用途。
解释 JavaScript 中的`map()`、`filter()`和`reduce()`方法的用途。
76 1
|
8月前
|
开发者 Python
Python中的函数式编程:理解map、filter和reduce
【2月更文挑战第13天】 本文深入探讨了Python中函数式编程的三个主要工具:map、filter和reduce。我们将详细解释这些函数的工作原理,并通过实例来展示它们如何使代码更简洁、更易读。我们还将讨论一些常见的误解和陷阱,以及如何避免它们。无论你是Python新手还是有经验的开发者,本文都将帮助你更好地理解和使用这些强大的函数。
|
8月前
|
分布式计算 JavaScript 前端开发
JS中数组22种常用API总结,slice、splice、map、reduce、shift、filter、indexOf......
JS中数组22种常用API总结,slice、splice、map、reduce、shift、filter、indexOf......
|
4月前
|
索引
ES5常见的数组方法:forEach ,map ,filter ,some ,every ,reduce (除了forEach,其他都有回调,都有return)
ES5常见的数组方法:forEach ,map ,filter ,some ,every ,reduce (除了forEach,其他都有回调,都有return)
|
7月前
|
Python
高阶函数如`map`, `filter`, `reduce`和`functools.partial`在Python中用于函数操作
【6月更文挑战第20天】高阶函数如`map`, `filter`, `reduce`和`functools.partial`在Python中用于函数操作。装饰器如`@timer`接收或返回函数,用于扩展功能,如记录执行时间。`timer`装饰器通过包裹函数并计算执行间隙展示时间消耗,如`my_function(2)`执行耗时2秒。
41 3
|
3月前
|
SQL 分布式计算 Java
Hadoop-11-MapReduce JOIN 操作的Java实现 Driver Mapper Reducer具体实现逻辑 模拟SQL进行联表操作
Hadoop-11-MapReduce JOIN 操作的Java实现 Driver Mapper Reducer具体实现逻辑 模拟SQL进行联表操作
61 3
|
4月前
|
JavaScript 前端开发
js map和reduce
js map和reduce
|
6月前
|
人工智能 算法 大数据
算法金 | 推导式、生成器、向量化、map、filter、reduce、itertools,再见 for 循环
这篇内容介绍了编程中避免使用 for 循环的一些方法,特别是针对 Python 语言。它强调了 for 循环在处理大数据或复杂逻辑时可能导致的性能、可读性和复杂度问题。
64 6
算法金 | 推导式、生成器、向量化、map、filter、reduce、itertools,再见 for 循环
|
5月前
|
分布式计算 Python
【python笔记】高阶函数map、filter、reduce
【python笔记】高阶函数map、filter、reduce
|
6月前
|
JavaScript API
js【最佳实践】遍历数组的八种方法(含数组遍历 API 的对比)for,forEach,for of,map,filter,reduce,every,some
js【最佳实践】遍历数组的八种方法(含数组遍历 API 的对比)for,forEach,for of,map,filter,reduce,every,some
125 1

相关实验场景

更多