Hadoop学习:MapReduce实现两张表合并

简介: Hadoop学习:MapReduce实现两张表合并

一、✌题目要求

record表:

ID 城市编号 空气指数
001 03 245
002 02 655
003 05 743
004 04 246
005 02 956
006 01 637
007 05 831
008 03 683
009 02 349

city表:

城市编号 城市名称
01 长沙
02 株洲
03 湘潭
04 怀化
05 岳阳

目标表:

ID 城市名称 空气指数
001 湘潭 245
002 株洲 655
003 岳阳 743
004 怀化 246
005 株洲 956
006 长沙 637
007 岳阳 831
008 湘潭 683
009 株洲 349

二、✌实现思想

我们将重复字段作为Map的Key,其他属性封装在Bean中作为Value

经过Map后,文件的格式为:

城市编号 ID 城市名称 空气指数 文件类型
1 006 637 record
1 长沙 city
2 002 655 record
2 005 956 record
2 009 349 record
2 株洲 city
3 001 245 record
3 008 683 record
3 湘潭 city
4 004 246 record
4 怀化 city
5 003 743 record
5 007 831 record
5 岳阳 city

1.将所有属性封装成一个对象,同时实现序列化

2.Map的输入格式应为LongWritable,Text

3.Map的输出格式为Text,Bean

4.Reduce输出格式为Bean,NullWritable

三、✌代码实现

1.✌Bean类

import org.apache.hadoop.io.Writable;
import java.io.DataInput;
import java.io.DataOutput;
import java.io.IOException;
public class Bean implements Writable {
  //定义属性
    private String id;
    private String pid;
    private int amount;
    private String pname;
    private String type;
  //定义空参构造,为后面反射使用
    public Bean() {
        super();
    }
  //有参构造
    public Bean(String id, String pid, int amount, String pname, String type) {
        this.id = id;
        this.pid = pid;
        this.amount = amount;
        this.pname = pname;
        this.type = type;
    }
  //重写toString方法
    @Override
    public String toString() {
        return id + "\t" + pname + "\t\t" + amount;
    }
    public String getId() {
        return id;
    }
    public void setId(String id) {
        this.id = id;
    }
    public String getPid() {
        return pid;
    }
    public void setPid(String pid) {
        this.pid = pid;
    }
    public int getAmount() {
        return amount;
    }
    public void setAmount(int amount) {
        this.amount = amount;
    }
    public String getPname() {
        return pname;
    }
    public void setPname(String pname) {
        this.pname = pname;
    }
    public String getType() {
        return type;
    }
    public void setType(String type) {
        this.type = type;
    }
  //重写序列化方法
    @Override
    public void write(DataOutput out) throws IOException {
        out.writeUTF(id);
        out.writeUTF(pid);
        out.writeInt(amount);
        out.writeUTF(pname);
        out.writeUTF(type);
    }
  //反序列化方法
    @Override
    public void readFields(DataInput in) throws IOException {
        id = in.readUTF();
        pid = in.readUTF();
        amount = in.readInt();
        pname = in.readUTF();
        type = in.readUTF();
    }
}

2.✌Map类

import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.lib.input.FileSplit;
import java.io.IOException;
public class Map extends Mapper<LongWritable, Text, Text, Bean> {
    String fileName;
  //获得文件的名称,因为在同一目录,方便再map阶段对不同表做不同操作
    @Override
    protected void setup(Context context) throws IOException, InterruptedException {
        FileSplit split = (FileSplit) context.getInputSplit();
        fileName = split.getPath().getName();
    }
  //map阶段,将文章内容封装为Bean对象
    @Override
    protected void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException {
        String line = value.toString();
        if (fileName.startsWith("record")) {
            String[] words = line.split("\t");
            context.write(new Text(words[1]), new Bean(words[0], words[1], Integer.parseInt(words[2]), "", "record"));
        } else {
            String[] words = line.split("\t");
            context.write(new Text(words[0]), new Bean("", words[0], 0, words[1], "city"));
        }
    }
}

3.✌Reduce类

import org.apache.commons.beanutils.BeanUtils;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Reducer;
import java.io.IOException;
import java.lang.reflect.InvocationTargetException;
import java.util.ArrayList;
public class Reduce extends Reducer<Text, Bean, Bean, NullWritable> {
    @Override
    protected void reduce(Text key, Iterable<Bean> values, Context context) throws IOException, InterruptedException {
    //存取多个Bean对象
        ArrayList<Bean> list = new ArrayList<>();
        Bean pd = new Bean();
    //对不同表做不同操作,设置pname
        for (Bean value : values) {
            if ("record".equals(value.getType())) {
                Bean temp = new Bean();
                try {
                    BeanUtils.copyProperties(temp, value);
                } catch (IllegalAccessException e) {
                    e.printStackTrace();
                } catch (InvocationTargetException e) {
                    e.printStackTrace();
                }
                list.add(temp);
            } else {
                try {
                    BeanUtils.copyProperties(pd, value);
                } catch (IllegalAccessException e) {
                    e.printStackTrace();
                } catch (InvocationTargetException e) {
                    e.printStackTrace();
                }
            }
        }
        for (Bean bean : list) {
            bean.setPname(pd.getPname());
            context.write(bean, NullWritable.get());
        }
    }
}

4.✌Driver类

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.log4j.BasicConfigurator;
import java.io.IOException;
public class Driver {
    public static void main(String[] args) throws IOException, ClassNotFoundException, InterruptedException {
    //设置本地输入输出路径
        args = new String[]{"D:/input", "D:/output"};
        BasicConfigurator.configure();
    //配置信息
        Configuration conf = new Configuration();
    //获取job对象
        Job job = Job.getInstance(conf);
    //关联相关类
        job.setJarByClass(Driver.class);
        job.setMapperClass(Map.class);
        job.setReducerClass(Reduce.class);
    //设置map输出类型
        job.setMapOutputKeyClass(Text.class);
        job.setMapOutputValueClass(Bean.class);
    //设置最终输出类型
        job.setOutputKeyClass(Bean.class);
        job.setOutputValueClass(NullWritable.class);
    //设置文件路径
        FileInputFormat.setInputPaths(job, new Path(args[0]));
        FileOutputFormat.setOutputPath(job, new Path(args[1]));
    //提交任务
        boolean result = job.waitForCompletion(true);
        System.exit(result ? 0 : 1);
    }
}


目录
相关文章
|
2月前
|
分布式计算 资源调度 Hadoop
大数据-80 Spark 简要概述 系统架构 部署模式 与Hadoop MapReduce对比
大数据-80 Spark 简要概述 系统架构 部署模式 与Hadoop MapReduce对比
73 2
|
11天前
|
数据采集 分布式计算 Hadoop
使用Hadoop MapReduce进行大规模数据爬取
使用Hadoop MapReduce进行大规模数据爬取
|
2月前
|
分布式计算 Hadoop 大数据
大数据体系知识学习(一):PySpark和Hadoop环境的搭建与测试
这篇文章是关于大数据体系知识学习的,主要介绍了Apache Spark的基本概念、特点、组件,以及如何安装配置Java、PySpark和Hadoop环境。文章还提供了详细的安装步骤和测试代码,帮助读者搭建和测试大数据环境。
69 1
|
2月前
|
SQL 分布式计算 Hadoop
Hadoop-14-Hive HQL学习与测试 表连接查询 HDFS数据导入导出等操作 逻辑运算 函数查询 全表查询 WHERE GROUP BY ORDER BY(一)
Hadoop-14-Hive HQL学习与测试 表连接查询 HDFS数据导入导出等操作 逻辑运算 函数查询 全表查询 WHERE GROUP BY ORDER BY(一)
50 4
|
2月前
|
分布式计算 资源调度 Hadoop
Hadoop-10-HDFS集群 Java实现MapReduce WordCount计算 Hadoop序列化 编写Mapper和Reducer和Driver 附带POM 详细代码 图文等内容
Hadoop-10-HDFS集群 Java实现MapReduce WordCount计算 Hadoop序列化 编写Mapper和Reducer和Driver 附带POM 详细代码 图文等内容
100 3
|
2月前
|
SQL
Hadoop-14-Hive HQL学习与测试 表连接查询 HDFS数据导入导出等操作 逻辑运算 函数查询 全表查询 WHERE GROUP BY ORDER BY(二)
Hadoop-14-Hive HQL学习与测试 表连接查询 HDFS数据导入导出等操作 逻辑运算 函数查询 全表查询 WHERE GROUP BY ORDER BY(二)
41 2
|
2月前
|
SQL 分布式计算 关系型数据库
Hadoop-24 Sqoop迁移 MySQL到Hive 与 Hive到MySQL SQL生成数据 HDFS集群 Sqoop import jdbc ETL MapReduce
Hadoop-24 Sqoop迁移 MySQL到Hive 与 Hive到MySQL SQL生成数据 HDFS集群 Sqoop import jdbc ETL MapReduce
96 0
|
2月前
|
SQL 分布式计算 关系型数据库
Hadoop-23 Sqoop 数据MySQL到HDFS(部分) SQL生成数据 HDFS集群 Sqoop import jdbc ETL MapReduce
Hadoop-23 Sqoop 数据MySQL到HDFS(部分) SQL生成数据 HDFS集群 Sqoop import jdbc ETL MapReduce
43 0
|
2月前
|
SQL 分布式计算 关系型数据库
Hadoop-22 Sqoop 数据MySQL到HDFS(全量) SQL生成数据 HDFS集群 Sqoop import jdbc ETL MapReduce
Hadoop-22 Sqoop 数据MySQL到HDFS(全量) SQL生成数据 HDFS集群 Sqoop import jdbc ETL MapReduce
56 0
|
6月前
|
分布式计算 Hadoop Java
Hadoop MapReduce编程
该教程指导编写Hadoop MapReduce程序处理天气数据。任务包括计算每个城市ID的最高、最低气温、气温出现次数和平均气温。在读取数据时需忽略表头,且数据应为整数。教程中提供了环境变量设置、Java编译、jar包创建及MapReduce执行的步骤说明,但假设读者已具备基础操作技能。此外,还提到一个扩展练习,通过分区功能将具有相同尾数的数字分组到不同文件。
65 1

相关实验场景

更多