HADOOP MapReduce 处理 Spark 抽取的 Hive 数据【解决方案一】

简介: 开端:今天咱先说问题,经过几天测试题的练习,我们有从某题库中找到了新题型,并且成功把我们干趴下,昨天今天就干了一件事,站起来。沙问题?java mapeduce 清洗 hive 中的数据 ,清晰之后将driver代码 进行截图提交。

开端:

今天咱先说问题,经过几天测试题的练习,我们有从某题库中找到了新题型,并且成功把我们干趴下,昨天今天就干了一件事,站起来。

沙问题?

java mapeduce 清洗 hive 中的数据 ,清晰之后将driver代码 进行截图提交。

坑号1: spark之前抽取的数据是.parquet格式的, 对 mapreduce 不太友好,我决定从新抽取, 还是用spark技术,换一种文件格式

坑号2: 使用新方法进行sink的时候我是直接like别的现成表结构折磨干的,后来hive分割字段都TM乱套啦,赞看看!42.png

需求:

1.使用scala+spark技术实现抽取mysql到Hive中

2.使用java+ Mapeduce 技术实现清洗Hive数据

问题产生:

  • Mapeduce 无法 正常读取Hive数据
  • Mapeduce 无法 正常将结果sink到Hive中

解决路线:首先从spark入手

为了解决spark写入hive后文件格式为 .parquet 问题

首先我们需要创建一个表,至于为什么不用自动建表,是因为自动建表 spark使用的是.parquet文件格式存储的

hive>  CREATE TABLE `ods.region2`(
       >   `regionkey` string,
       >    `name` string,
       >    `comment` string)
       >    PARTITIONED BY (
       >   `etldate` string
       >    )
       >   row format delimited
       >   fields terminated by '|' ;
    OK
Time taken: 0.055 seconds

spark sink hive 部分代码

    spark.sql("select *,'20220616' as etldate from data ")
      .write
      .partitionBy("etldate")
      .mode(saveMode = SaveMode.Overwrite)
      .format("hive")
      .option("delimiter","|")
      .insertInto("ods.region2")

重点是这两条

.format("hive")

.insertInto("ods.region2")

我们看一下写好的数据

hdfs dfs -cat /user/hive/warehouse/ods.db/region2/etldate=20220616/*

3|EUROPE|ly final courts cajole furiously final excuse
4|MIDDLE EAST|uickly special accounts cajole carefully blithely close requests. carefully final asymptotes haggle furiousl
0|AFRICA|lar deposits. blithely final packages cajole. regular waters are final requests. regular accounts are according to 
1|AMERICA|hs use ironic, even requests. s
2|ASIA|ges. thinly even pinto beans ca

可以正常编写和运行java mapReduce 代码啦

代码不再一一贴出,放一个driver把

  <groupId>org.li</groupId>
    <artifactId>mapreduce_06-21</artifactId>
    <version>1.0</version>
   <properties>
        <maven.compiler.source>8</maven.compiler.source>
        <maven.compiler.target>8</maven.compiler.target>
        <parquet.version>1.8.1</parquet.version>
        <!-- JDateTime 依赖 -->
        <jodd.version>3.3.8</jodd.version>
    </properties>    
<dependencies>
        <!-- https://mvnrepository.com/artifact/org.apache.hadoop/hadoop-common -->
        <dependency>
            <groupId>org.apache.hadoop</groupId>
            <artifactId>hadoop-common</artifactId>
            <version>2.7.6</version>
        </dependency>
        <!-- https://mvnrepository.com/artifact/org.apache.hadoop/hadoop-client -->
        <dependency>
            <groupId>org.apache.hadoop</groupId>
            <artifactId>hadoop-client</artifactId>
            <version>2.7.6</version>
        </dependency>
        <!-- https://mvnrepository.com/artifact/org.apache.hadoop/hadoop-hdfs -->
        <dependency>
            <groupId>org.apache.hadoop</groupId>
            <artifactId>hadoop-hdfs</artifactId>
            <version>2.7.6</version>
        </dependency>
        <!-- parquet-hadoop -->
        <dependency>
            <groupId>org.apache.parquet</groupId>
            <artifactId>parquet-hadoop</artifactId>
            <version>${parquet.version}</version>
        </dependency>
        <!-- jodd -->
        <dependency>
            <groupId>org.jodd</groupId>
            <artifactId>jodd</artifactId>
            <version>${jodd.version}</version>
        </dependency>
    </dependencies>
package com.li.mapreduce;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.db.DBInputFormat;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.parquet.hadoop.ParquetInputFormat;
//import org.apache.parquet.hadoop.ParquetInputFormat;
import java.io.File;
import java.io.IOException;
import java.net.URI;
import java.net.URISyntaxException;
public class HiveDriver{
    public static void main(String[] args) throws IOException, InterruptedException, ClassNotFoundException, URISyntaxException {
        System.setProperty("HADOOP_USER_NAME","root");
        System.out.println("删除本地目录" + new File("/home/rjxy/output").delete());
        Configuration configuration = new Configuration();
        configuration.set("dfs.client.use.datanode.hostname","true");
        //hadoop配值文件
        //获取i工作势力
        Job instance = Job.getInstance(configuration);
        //关联driver
        instance.setJarByClass(HiveDriver.class);
        //关联mapper reduce
        instance.setMapperClass(HiveMapper.class);
        instance.setReducerClass(HiveReduce.class);
        //设置map输出的kv类型
        instance.setMapOutputKeyClass(LongWritable.class);
        instance.setMapOutputValueClass(Text.class);
        //设置最终的输入输出类型
        instance.setOutputKeyClass(NullWritable.class);
        instance.setOutputValueClass(Text.class);
        //Parquet
//        instance.setInputFormatClass();
//        instance.setInputFormatClass(ParquetInputFormat.class);
        //设置输入输出路径
        FileInputFormat.setInputPaths(instance,new Path("hdfs://master:9000/user/hive/warehouse/ods.db/" + "region2" + "/*/*"));
        Path outputDir = new Path("hdfs://master:9000/test4");
//        Path outputDir = new Path("/home/rjxy/output");
        FileOutputFormat.setOutputPath(instance, outputDir);
        //7 提交job
        boolean result = instance.waitForCompletion(true);
        System.exit(result?0:1);
    }
}

这些代码就包含啦我的resource 数据信息 sink 位置

现在看一下怎么将hdfs数据进行load进hive表中

先建好表

hive>  CREATE TABLE `ods.region2`(
       >   `regionkey` string,
       >    `name` string,
       >    `comment` string)
       >    PARTITIONED BY (
       >   `etldate` string
       >    )
       >   row format delimited
       >   fields terminated by '|' ;
    OK
Time taken: 0.055 seconds
LOAD DATA INPATH '/test2' INTO TABLE ods.region2 partition(etldate="20220622");

查看hive清洗后的数据

hive (default)> select * from ods.region2 where etldate="20220622";
OK
region2.regionkey region2.name  region2.comment region2.etldate
3 EUROPE  ly final courts cajole furiously final excuse 20220622
3 EUROPE  ly final courts cajole furiously final excuse 20220622
4 MIDDLE EAST uickly special accounts cajole carefully blithely close requests. carefully final asymptotes haggle furiousl  20220622
4 MIDDLE EAST uickly special accounts cajole carefully blithely close requests. carefully final asymptotes haggle furiousl  20220622
0 AFRICA  lar deposits. blithely final packages cajole. regular waters are final requests. regular accounts are according to  20220622
0 AFRICA  lar deposits. blithely final packages cajole. regular waters are final requests. regular accounts are according to  20220622
1 AMERICA hs use ironic, even requests. s 20220622
1 AMERICA hs use ironic, even requests. s 20220622
2 ASIA  ges. thinly even pinto beans ca 20220622
2 ASIA  ges. thinly even pinto beans ca 20220622
Time taken: 0.194 seconds, Fetched: 10 row(s)
目录
相关文章
|
2月前
|
分布式计算 Kubernetes Hadoop
大数据-82 Spark 集群模式启动、集群架构、集群管理器 Spark的HelloWorld + Hadoop + HDFS
大数据-82 Spark 集群模式启动、集群架构、集群管理器 Spark的HelloWorld + Hadoop + HDFS
192 6
|
2月前
|
分布式计算 资源调度 Hadoop
大数据-80 Spark 简要概述 系统架构 部署模式 与Hadoop MapReduce对比
大数据-80 Spark 简要概述 系统架构 部署模式 与Hadoop MapReduce对比
83 2
|
21天前
|
数据采集 分布式计算 Hadoop
使用Hadoop MapReduce进行大规模数据爬取
使用Hadoop MapReduce进行大规模数据爬取
|
1月前
|
存储 分布式计算 Hadoop
数据湖技术:Hadoop与Spark在大数据处理中的协同作用
【10月更文挑战第27天】在大数据时代,数据湖技术凭借其灵活性和成本效益成为企业存储和分析大规模异构数据的首选。Hadoop和Spark作为数据湖技术的核心组件,通过HDFS存储数据和Spark进行高效计算,实现了数据处理的优化。本文探讨了Hadoop与Spark的最佳实践,包括数据存储、处理、安全和可视化等方面,展示了它们在实际应用中的协同效应。
115 2
|
1月前
|
存储 分布式计算 Hadoop
数据湖技术:Hadoop与Spark在大数据处理中的协同作用
【10月更文挑战第26天】本文详细探讨了Hadoop与Spark在大数据处理中的协同作用,通过具体案例展示了两者的最佳实践。Hadoop的HDFS和MapReduce负责数据存储和预处理,确保高可靠性和容错性;Spark则凭借其高性能和丰富的API,进行深度分析和机器学习,实现高效的批处理和实时处理。
84 1
|
2月前
|
分布式计算 Java Hadoop
Hadoop-30 ZooKeeper集群 JavaAPI 客户端 POM Java操作ZK 监听节点 监听数据变化 创建节点 删除节点
Hadoop-30 ZooKeeper集群 JavaAPI 客户端 POM Java操作ZK 监听节点 监听数据变化 创建节点 删除节点
72 1
|
2月前
|
SQL 分布式计算 Java
大数据-96 Spark 集群 SparkSQL Scala编写SQL操作SparkSQL的数据源:JSON、CSV、JDBC、Hive
大数据-96 Spark 集群 SparkSQL Scala编写SQL操作SparkSQL的数据源:JSON、CSV、JDBC、Hive
67 0
|
2月前
|
SQL 分布式计算 关系型数据库
Hadoop-24 Sqoop迁移 MySQL到Hive 与 Hive到MySQL SQL生成数据 HDFS集群 Sqoop import jdbc ETL MapReduce
Hadoop-24 Sqoop迁移 MySQL到Hive 与 Hive到MySQL SQL生成数据 HDFS集群 Sqoop import jdbc ETL MapReduce
110 0
|
数据采集 分布式计算 搜索推荐
Hadoop学习---7、OutputFormat数据输出、MapReduce内核源码解析、Join应用、数据清洗、MapReduce开发总结(一)
Hadoop学习---7、OutputFormat数据输出、MapReduce内核源码解析、Join应用、数据清洗、MapReduce开发总结(一)
|
存储 分布式计算 Hadoop
Hadoop基础学习---6、MapReduce框架原理(一)
Hadoop基础学习---6、MapReduce框架原理(一)

相关实验场景

更多