Hadoop-11-MapReduce JOIN 操作的Java实现 Driver Mapper Reducer具体实现逻辑 模拟SQL进行联表操作

简介: Hadoop-11-MapReduce JOIN 操作的Java实现 Driver Mapper Reducer具体实现逻辑 模拟SQL进行联表操作

章节内容

上一节我们完成了:


MapReduce的介绍

Hadoop序列化介绍

Mapper编写规范

Reducer编写规范

Driver编写规范

WordCount功能开发

WordCount本地测试

背景介绍

这里是三台公网云服务器,每台 2C4G,搭建一个Hadoop的学习环境,供我学习。

之前已经在 VM 虚拟机上搭建过一次,但是没留下笔记,这次趁着前几天薅羊毛的3台机器,赶紧尝试在公网上搭建体验一下。


注意,如果你和我一样,打算用公网部署,那一定要做好防火墙策略,避免不必要的麻烦!!!

请大家都以学习为目的,也请不要对我的服务进行嗅探或者攻击!!!


但是有一台公网服务器我还运行着别的服务,比如前几天发的:autodl-keeper 自己写的小工具,防止AutoDL机器过期的。还跑着别的Web服务,所以只能挤出一台 2C2G 的机器。那我的配置如下了:


2C4G 编号 h121

2C4G 编号 h122

2C2G 编号 h123

业务需求

平常我们在业务上,有很多时候表都是分开的,通过一些 id 或者 code 来进行关联。

在大数据的情况下,也有很多这种情况,我们需要进行联表操作。

表1

项目编码projectCode 项目名projectName
• 1

表2

项目编码projectCode 项目类型projectType 项目分类projectFrom

SQL 中,可以通过 LEFT JOIN 来实现字段补齐。大数据下,也需要进行这样的操作,我们需要借助 MapReduce

表1测试

"8aea9ba2-435c-48bd-9751-1cbd4c344d4e"  "社区项目1"
"02d9c090-e467-42b6-9c14-52cacd72a4a8"  "社区项目2"
"244dcaca-0778-4eec-b3a2-403f8fac1dfb"  "智慧社区"
"94befb97-d1af-43f2-b5d5-6df9ce5b9393"  "公交站点"
"f44c8d10-bc92-4398-ad9b-8c11dd48ad7c"  "街道布建"
"2e556d83-bb56-45b1-8d6e-00510902c464"  "街道公交站点"
"3ba00542-eac9-4399-9c2b-3b06e671f4c9"  "未命名项目1"
"5a5982d7-7257-422f-822a-a0c2f31c28d1"  "未命名项目2"

表2测试

"8aea9ba2-435c-48bd-9751-1cbd4c344d4e"  "重要类型"  "种类1"
"02d9c090-e467-42b6-9c14-52cacd72a4a8"  "重要类型"  "种类1"
"244dcaca-0778-4eec-b3a2-403f8fac1dfb"  "重要类型"  "种类1"
"94befb97-d1af-43f2-b5d5-6df9ce5b9393"  "普通类型"  "种类1"
"f44c8d10-bc92-4398-ad9b-8c11dd48ad7c"  "普通类型"  "种类2"
"2e556d83-bb56-45b1-8d6e-00510902c464"  "普通类型"  "种类2"
"3ba00542-eac9-4399-9c2b-3b06e671f4c9"  "一般类型"  "种类2"
"5a5982d7-7257-422f-822a-a0c2f31c28d1"  "一般类型"  "种类2"

SQL连表

假设我们使用SQL的方式联表:

SELECT
  *
FROM
  t_project
LEFT JOIN
  t_project_info
ON
  t_project.projectCode=t_project_info.projectCode

Reduce JOIN

有时候,表可能过大,无法支持我们使用 SQL 进行连表查询。

这里我们编写一个程序来完成操作。

ProjectBean

这里是最终的Bean类,里边是两个表把字段补齐的结果,一会儿我们将使用这个类进行表的连接。

package icu.wzk.demo03;

import org.apache.hadoop.io.Writable;

import java.io.DataInput;
import java.io.DataOutput;
import java.io.IOException;

public class ProjectBean implements Writable {

    private String projectCode;

    private String projectName;

    private String projectType;

    private String projectFrom;

    private String flag;

    @Override
    public void write(DataOutput dataOutput) throws IOException {
        dataOutput.writeUTF(projectCode);
        dataOutput.writeUTF(projectName);
        dataOutput.writeUTF(projectType);
        dataOutput.writeUTF(projectFrom);
        dataOutput.writeUTF(flag);
    }

    @Override
    public void readFields(DataInput dataInput) throws IOException {
        this.projectCode = dataInput.readUTF();
        this.projectName = dataInput.readUTF();
        this.projectType = dataInput.readUTF();
        this.projectFrom = dataInput.readUTF();
        this.flag = dataInput.readUTF();
    }

    public ProjectBean(String projectCode, String projectName, String projectType, String projectFrom, String flag) {
        this.projectCode = projectCode;
        this.projectName = projectName;
        this.projectType = projectType;
        this.projectFrom = projectFrom;
        this.flag = flag;
    }

    public ProjectBean() {

    }

    @Override
    public String toString() {
        return "ProjectBean{" +
                "projectCode='" + projectCode + '\'' +
                ", projectName='" + projectName + '\'' +
                ", projectType='" + projectType + '\'' +
                ", projectFrom='" + projectFrom + '\'' +
                ", flag=" + flag + '\'' +
                '}';
    }

    public String getProjectCode() {
        return projectCode;
    }

    public void setProjectCode(String projectCode) {
        this.projectCode = projectCode;
    }

    public String getProjectName() {
        return projectName;
    }

    public void setProjectName(String projectName) {
        this.projectName = projectName;
    }

    public String getProjectType() {
        return projectType;
    }

    public void setProjectType(String projectType) {
        this.projectType = projectType;
    }

    public String getProjectFrom() {
        return projectFrom;
    }

    public void setProjectFrom(String projectFrom) {
        this.projectFrom = projectFrom;
    }

    public String getFlag() {
        return flag;
    }

    public void setFlag(String flag) {
        this.flag = flag;
    }
}

Reduce Driver

package icu.wzk.demo03;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;

import java.io.IOException;

public class ReducerJoinDriver {

    public static void main(String[] args) throws IOException, InterruptedException, ClassNotFoundException {

        // String inputPath = args[0];
        // String outputPath = args[1];

        // === 测试环境 ===
        String inputPath = "project_test";
        String outputPath = "project_test_output";
        // === ===

        Configuration configuration = new Configuration();
        Job job = Job.getInstance(configuration, "ReducerJoinDriver");
        job.setJarByClass(ReducerJoinDriver.class);

        job.setMapperClass(ReducerJoinMapper.class);
        job.setReducerClass(ReducerJoinReducer.class);

        job.setMapOutputKeyClass(Text.class);
        job.setMapOutputValueClass(ProjectBean.class);

        job.setOutputKeyClass(ProjectBean.class);
        job.setOutputValueClass(NullWritable.class);

        FileInputFormat.setInputPaths(job, new Path(inputPath));
        FileOutputFormat.setOutputPath(job, new Path(outputPath));

        boolean result = job.waitForCompletion(true);
        System.exit(result ? 0 : 1);
    }

}

ReduceMapper

package icu.wzk.demo03;

import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;

import java.io.IOException;

public class ReducerJoinMapper extends Mapper<LongWritable, Text, Text, ProjectBean> {

    String name;
    ProjectBean projectBean = new ProjectBean();
    Text k = new Text();

    @Override
    protected void setup(Mapper<LongWritable, Text, Text, ProjectBean>.Context context) throws IOException, InterruptedException {
        // 获取路径信息
        name = context.getInputSplit().toString();
    }

    @Override
    protected void map(LongWritable key, Text value, Mapper<LongWritable, Text, Text, ProjectBean>.Context context) throws IOException, InterruptedException {
        String line = value.toString();
        if (name.contains("layout_project")) {
            // layout_project
            String[] fields = line.split("\t");
            projectBean.setProjectCode(fields[0]);
            projectBean.setProjectName(fields[1]);
            projectBean.setProjectType("");
            projectBean.setProjectFrom("");
            projectBean.setFlag("layout_project");
            // projectCode 关联
            k.set(fields[0]);
        } else {
            // project_info
            String[] fields = line.split("\t");
            projectBean.setProjectCode(fields[0]);
            projectBean.setProjectName("");
            projectBean.setProjectType(fields[1]);
            projectBean.setProjectFrom(fields[2]);
            projectBean.setFlag("project_info");
            // projectCode 关联
            k.set(fields[0]);
        }
        context.write(k, projectBean);
    }
}

ReduceReducer

package icu.wzk.demo03;

import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Reducer;

import java.io.IOException;
import java.util.ArrayList;
import java.util.List;

public class ReducerJoinReducer extends Reducer<Text, ProjectBean, ProjectBean, NullWritable> {

    @Override
    protected void reduce(Text key, Iterable<ProjectBean> values, Reducer<Text, ProjectBean, ProjectBean, NullWritable>.Context context) throws IOException, InterruptedException {
        List<ProjectBean> dataList = new ArrayList<>();
        ProjectBean deviceProjectBean = new ProjectBean();
        for (ProjectBean pb : values) {
            if ("layout_project".equals(pb.getFlag())) {
                // layout_project
                ProjectBean projectProjectBean = new ProjectBean(
                        pb.getProjectCode(),
                        pb.getProjectName(),
                        pb.getProjectType(),
                        pb.getProjectFrom(),
                        pb.getFlag()
                );
                dataList.add(projectProjectBean);
            } else {
                // project_info
                deviceProjectBean = new ProjectBean(
                        pb.getProjectCode(),
                        pb.getProjectName(),
                        pb.getProjectType(),
                        pb.getProjectFrom(),
                        pb.getFlag()
                );
            }
        }

        for (ProjectBean pb : dataList) {
            pb.setProjectType(deviceProjectBean.getProjectType());
            pb.setProjectFrom(deviceProjectBean.getProjectFrom());
            context.write(pb, NullWritable.get());
        }
    }
}

运行结果

ProjectBean{projectCode='"02d9c090-e467-42b6-9c14-52cacd72a4a8"', projectName='"社区项目2"', projectType='"重要类型"', projectFrom='"种类1"', flag=layout_project'}
ProjectBean{projectCode='"244dcaca-0778-4eec-b3a2-403f8fac1dfb"', projectName='"智慧社区"', projectType='"重要类型"', projectFrom='"种类1"', flag=layout_project'}
ProjectBean{projectCode='"2e556d83-bb56-45b1-8d6e-00510902c464"', projectName='"街道公交站点"', projectType='"普通类型"', projectFrom='"种类2"', flag=layout_project'}
ProjectBean{projectCode='"3ba00542-eac9-4399-9c2b-3b06e671f4c9"', projectName='"未命名项目1"', projectType='"一般类型"', projectFrom='"种类2"', flag=layout_project'}
ProjectBean{projectCode='"5a5982d7-7257-422f-822a-a0c2f31c28d1"', projectName='"未命名项目2"', projectType='"一般类型"', projectFrom='"种类2"', flag=layout_project'}
ProjectBean{projectCode='"8aea9ba2-435c-48bd-9751-1cbd4c344d4e"', projectName='"社区项目1"', projectType='"重要类型"', projectFrom='"种类1"', flag=layout_project'}
ProjectBean{projectCode='"94befb97-d1af-43f2-b5d5-6df9ce5b9393"', projectName='"公交站点"', projectType='"普通类型"', projectFrom='"种类1"', flag=layout_project'}
ProjectBean{projectCode='"f44c8d10-bc92-4398-ad9b-8c11dd48ad7c"', projectName='"街道布建"', projectType='"普通类型"', projectFrom='"种类2"', flag=layout_project'}

方案缺点

JOIN 操作是在 reduce 阶段完成的,reduce端处理压力过大map节点的运算负载很低,资源利用不高

目录
打赏
0
3
3
0
103
分享
相关文章
|
2月前
|
Java使用sql查询mongodb
通过MongoDB Atlas Data Lake或Apache Drill,可以在Java中使用SQL语法查询MongoDB数据。这两种方法都需要适当的配置和依赖库的支持。希望本文提供的示例和说明能够帮助开发者实现这一目标。
70 17
【潜意识Java】MyBatis中的动态SQL灵活、高效的数据库查询以及深度总结
本文详细介绍了MyBatis中的动态SQL功能,涵盖其背景、应用场景及实现方式。
186 6
如何在 Java 代码中使用 JSqlParser 解析复杂的 SQL 语句?
大家好,我是 V 哥。JSqlParser 是一个用于解析 SQL 语句的 Java 库,可将 SQL 解析为 Java 对象树,支持多种 SQL 类型(如 `SELECT`、`INSERT` 等)。它适用于 SQL 分析、修改、生成和验证等场景。通过 Maven 或 Gradle 安装后,可以方便地在 Java 代码中使用。
540 11
如何用 Java 校验 SQL 语句的合法性?
本文介绍了五种校验 SQL 语句合法性的方案:1) 使用 JDBC API 的 `execute()` 方法,通过捕获异常判断合法性;2) 使用 JSqlParser 库解析 SQL 语句为 Java 对象;3) 使用正则表达式检查 SQL 语句格式;4) 使用 ANTLR 生成 SQL 解析器;5) 使用 Apache Calcite 解析 SQL。每种方法各有优劣,具体选择取决于需求和个人偏好。需要注意的是,这些方法仅能校验语法合法性,无法保证语义正确性,仍需防范 SQL 注入攻击。
基于Java的Hadoop文件处理系统:高效分布式数据解析与存储
本文介绍了如何借鉴Hadoop的设计思想,使用Java实现其核心功能MapReduce,解决海量数据处理问题。通过类比图书馆管理系统,详细解释了Hadoop的两大组件:HDFS(分布式文件系统)和MapReduce(分布式计算模型)。具体实现了单词统计任务,并扩展支持CSV和JSON格式的数据解析。为了提升性能,引入了Combiner减少中间数据传输,以及自定义Partitioner解决数据倾斜问题。最后总结了Hadoop在大数据处理中的重要性,鼓励Java开发者学习Hadoop以拓展技术边界。
81 7
【潜意识Java】深入理解MyBatis的Mapper层,以及让数据访问更高效的详细分析
深入理解MyBatis的Mapper层,以及让数据访问更高效的详细分析
124 1
|
3月前
|
Java使用sql查询mongodb
通过使用 MongoDB Connector for BI 和 JDBC,开发者可以在 Java 中使用 SQL 语法查询 MongoDB 数据库。这种方法对于熟悉 SQL 的团队非常有帮助,能够快速实现对 MongoDB 数据的操作。同时,也需要注意到这种方法的性能和功能限制,根据具体应用场景进行选择和优化。
164 9
|
4月前
|
使用java在未知表字段情况下通过sql查询信息
使用java在未知表字段情况下通过sql查询信息
62 8
Hadoop-12-Hive 基本介绍 下载安装配置 MariaDB安装 3台云服务Hadoop集群 架构图 对比SQL HQL
Hadoop-12-Hive 基本介绍 下载安装配置 MariaDB安装 3台云服务Hadoop集群 架构图 对比SQL HQL
167 3
Hadoop-32 ZooKeeper 分布式锁问题 分布式锁Java实现 附带案例和实现思路代码
Hadoop-32 ZooKeeper 分布式锁问题 分布式锁Java实现 附带案例和实现思路代码
94 2