The beginning:
Let's start with the problem. After a few days of working through practice questions, we once again found a new question type in a certain question bank, and it successfully knocked us flat. So yesterday and today we did exactly one thing: get back up.
What's the problem?
Use Java MapReduce to clean the data in Hive; after the cleaning, take a screenshot of the Driver code and submit it.
Pitfall #1:
The data previously extracted by Spark was in .parquet format, which is not very friendly to MapReduce, so I decided to re-extract it, still with Spark, but with a different file format.
Pitfall #2:
When sinking with the new approach, I just created the table with LIKE from another existing table's structure, and afterwards Hive's field delimiters were a complete mess. Let's take a look!
Requirements:
1. Use Scala + Spark to extract data from MySQL into Hive
2. Use Java + MapReduce to clean the Hive data
Problems encountered:
- MapReduce couldn't read the Hive data properly
- MapReduce couldn't sink the results back into Hive properly
Solution path: start with Spark.
To get around the fact that Spark writes data into Hive as .parquet files, we first need to create the table ourselves. The reason we don't rely on automatic table creation is that a table auto-created by Spark stores its data in .parquet format.
hive> CREATE TABLE `ods.region2`(
    >   `regionkey` string,
    >   `name` string,
    >   `comment` string)
    > PARTITIONED BY (
    >   `etldate` string
    > )
    > row format delimited
    > fields terminated by '|';
OK
Time taken: 0.055 seconds
Part of the Spark code that sinks into Hive:
spark.sql("select *, '20220616' as etldate from data")
  .write
  .partitionBy("etldate")
  .mode(saveMode = SaveMode.Overwrite)
  .format("hive")
  .option("delimiter", "|")
  .insertInto("ods.region2")
The key lines are these two:
.format("hive")
.insertInto("ods.region2")
Let's take a look at the data that was written:
hdfs dfs -cat /user/hive/warehouse/ods.db/region2/etldate=20220616/*
3|EUROPE|ly final courts cajole furiously final excuse
4|MIDDLE EAST|uickly special accounts cajole carefully blithely close requests. carefully final asymptotes haggle furiousl
0|AFRICA|lar deposits. blithely final packages cajole. regular waters are final requests. regular accounts are according to
1|AMERICA|hs use ironic, even requests. s
2|ASIA|ges. thinly even pinto beans ca
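Each record is now a plain text line with fields separated by '|', which is exactly what a hand-written mapper can work with (TextInputFormat, the default, hands every line to the mapper as a Text value). A tiny sketch of that parsing step, with variable names that are purely illustrative:

// Split one '|'-delimited line from the region2 table.
// "\\|" is needed because '|' is a regex metacharacter,
// and the -1 limit keeps trailing empty fields.
String line = "3|EUROPE|ly final courts cajole furiously final excuse";
String[] fields = line.split("\\|", -1);
String regionKey = fields[0];   // "3"
String name      = fields[1];   // "EUROPE"
String comment   = fields[2];   // the free-text comment column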
Now we can write and run the Java MapReduce code without problems.
I won't paste all of the code one by one; here is the Driver (a rough sketch of the Mapper and Reducer follows below).
<groupId>org.li</groupId>
<artifactId>mapreduce_06-21</artifactId>
<version>1.0</version>

<properties>
    <maven.compiler.source>8</maven.compiler.source>
    <maven.compiler.target>8</maven.compiler.target>
    <parquet.version>1.8.1</parquet.version>
    <!-- JDateTime dependency -->
    <jodd.version>3.3.8</jodd.version>
</properties>

<dependencies>
    <!-- https://mvnrepository.com/artifact/org.apache.hadoop/hadoop-common -->
    <dependency>
        <groupId>org.apache.hadoop</groupId>
        <artifactId>hadoop-common</artifactId>
        <version>2.7.6</version>
    </dependency>
    <!-- https://mvnrepository.com/artifact/org.apache.hadoop/hadoop-client -->
    <dependency>
        <groupId>org.apache.hadoop</groupId>
        <artifactId>hadoop-client</artifactId>
        <version>2.7.6</version>
    </dependency>
    <!-- https://mvnrepository.com/artifact/org.apache.hadoop/hadoop-hdfs -->
    <dependency>
        <groupId>org.apache.hadoop</groupId>
        <artifactId>hadoop-hdfs</artifactId>
        <version>2.7.6</version>
    </dependency>
    <!-- parquet-hadoop -->
    <dependency>
        <groupId>org.apache.parquet</groupId>
        <artifactId>parquet-hadoop</artifactId>
        <version>${parquet.version}</version>
    </dependency>
    <!-- jodd -->
    <dependency>
        <groupId>org.jodd</groupId>
        <artifactId>jodd</artifactId>
        <version>${jodd.version}</version>
    </dependency>
</dependencies>
package com.li.mapreduce;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.parquet.hadoop.ParquetInputFormat;

import java.io.File;
import java.io.IOException;
import java.net.URISyntaxException;

public class HiveDriver {
    public static void main(String[] args) throws IOException, InterruptedException, ClassNotFoundException, URISyntaxException {
        System.setProperty("HADOOP_USER_NAME", "root");
        System.out.println("Delete local output dir: " + new File("/home/rjxy/output").delete());

        // Hadoop configuration
        Configuration configuration = new Configuration();
        configuration.set("dfs.client.use.datanode.hostname", "true");

        // Get a Job instance
        Job instance = Job.getInstance(configuration);

        // Associate the driver class
        instance.setJarByClass(HiveDriver.class);

        // Associate the mapper and reducer
        instance.setMapperClass(HiveMapper.class);
        instance.setReducerClass(HiveReduce.class);

        // Set the map output key/value types
        instance.setMapOutputKeyClass(LongWritable.class);
        instance.setMapOutputValueClass(Text.class);

        // Set the final output key/value types
        instance.setOutputKeyClass(NullWritable.class);
        instance.setOutputValueClass(Text.class);

        // Parquet input is no longer needed now that the table is delimited text
        // instance.setInputFormatClass(ParquetInputFormat.class);

        // Set the input and output paths
        FileInputFormat.setInputPaths(instance,
                new Path("hdfs://master:9000/user/hive/warehouse/ods.db/" + "region2" + "/*/*"));
        Path outputDir = new Path("hdfs://master:9000/test4");
        // Path outputDir = new Path("/home/rjxy/output");
        FileOutputFormat.setOutputPath(instance, outputDir);

        // Submit the job
        boolean result = instance.waitForCompletion(true);
        System.exit(result ? 0 : 1);
    }
}
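The HiveMapper and HiveReduce classes referenced by the Driver aren't pasted in the post, so below is a minimal sketch that is merely type-compatible with the Driver's settings (map output LongWritable/Text, final output NullWritable/Text). The cleaning rule in map() is a placeholder of my own, not the actual exam requirement, so swap it for whatever the question really asks.

// HiveMapper.java -- pass-through mapper with a placeholder cleaning rule
package com.li.mapreduce;

import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;

import java.io.IOException;

public class HiveMapper extends Mapper<LongWritable, Text, LongWritable, Text> {
    @Override
    protected void map(LongWritable key, Text value, Context context)
            throws IOException, InterruptedException {
        String line = value.toString();
        // Placeholder cleaning rule: skip blank or malformed rows.
        if (line.isEmpty() || line.split("\\|", -1).length < 3) {
            return;
        }
        context.write(key, value);
    }
}

// HiveReduce.java -- writes surviving lines with a NullWritable key, so the
// output files hold only the '|'-delimited records and can be LOADed into Hive
package com.li.mapreduce;

import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Reducer;

import java.io.IOException;

public class HiveReduce extends Reducer<LongWritable, Text, NullWritable, Text> {
    @Override
    protected void reduce(LongWritable key, Iterable<Text> values, Context context)
            throws IOException, InterruptedException {
        for (Text value : values) {
            context.write(NullWritable.get(), value);
        }
    }
}

With the default single reducer this simply funnels every surviving line through untouched and lets the Driver's output path collect them for the LOAD step below.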
This code already covers my source data information and the sink location.
Now let's see how to LOAD the HDFS data into the Hive table.
First, the target table needs to exist (same DDL as above):
hive> CREATE TABLE `ods.region2`(
    >   `regionkey` string,
    >   `name` string,
    >   `comment` string)
    > PARTITIONED BY (
    >   `etldate` string
    > )
    > row format delimited
    > fields terminated by '|';
OK
Time taken: 0.055 seconds
LOAD DATA INPATH '/test2' INTO TABLE ods.region2 partition(etldate="20220622");
Check the cleaned data in Hive:
hive (default)> select * from ods.region2 where etldate="20220622";
OK
region2.regionkey	region2.name	region2.comment	region2.etldate
3	EUROPE	ly final courts cajole furiously final excuse	20220622
3	EUROPE	ly final courts cajole furiously final excuse	20220622
4	MIDDLE EAST	uickly special accounts cajole carefully blithely close requests. carefully final asymptotes haggle furiousl	20220622
4	MIDDLE EAST	uickly special accounts cajole carefully blithely close requests. carefully final asymptotes haggle furiousl	20220622
0	AFRICA	lar deposits. blithely final packages cajole. regular waters are final requests. regular accounts are according to	20220622
0	AFRICA	lar deposits. blithely final packages cajole. regular waters are final requests. regular accounts are according to	20220622
1	AMERICA	hs use ironic, even requests. s	20220622
1	AMERICA	hs use ironic, even requests. s	20220622
2	ASIA	ges. thinly even pinto beans ca	20220622
2	ASIA	ges. thinly even pinto beans ca	20220622
Time taken: 0.194 seconds, Fetched: 10 row(s)