For the given dataset, use MapReduce to implement data cleaning and preprocessing, write programs to store the data in the HBase database and expose create/read/update/delete (CRUD) interfaces, and use Hive to perform basic statistics on the data that MapReduce has processed.
Design requirements:
- Based on the characteristics of the data, design a task scenario and implement data cleaning and preprocessing with MapReduce. (10 points)
- Use the HDFS Java API to write a program that uploads both the raw data and the preprocessed data to the distributed file system.
Dataset:
Link: https://pan.baidu.com/s/1rnUJn5ld45HpLhzbwYIM1A
Extraction code: 7bsd
```java
package com.company.HDFS;

import java.io.IOException;
import java.net.URI;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;

/**
 * step0: MapReduce job that cleans the raw data by removing duplicate records.
 * The mapper emits each input line as the key; the reducer writes each distinct key once.
 */
public class step0 {

    static final String INPUT_PATH = "hdfs://192.168.88.100/data";
    static final String OUTPUT_PATH = "hdfs://192.168.88.100/output";

    public static void main(String[] args) throws Exception {
        Configuration configuration = new Configuration();

        // Delete the output directory if it already exists, otherwise the job fails.
        FileSystem fileSystem = FileSystem.get(new URI(INPUT_PATH), configuration);
        if (fileSystem.exists(new Path(OUTPUT_PATH))) {
            fileSystem.delete(new Path(OUTPUT_PATH), true);
        }

        Job job = Job.getInstance(configuration, "step0");
        job.setJarByClass(step0.class);

        FileInputFormat.setInputPaths(job, new Path(INPUT_PATH));
        FileOutputFormat.setOutputPath(job, new Path(OUTPUT_PATH));

        job.setMapperClass(ReMapper.class);
        job.setMapOutputKeyClass(Text.class);
        job.setMapOutputValueClass(NullWritable.class);

        job.setReducerClass(ReReducer.class);
        job.setOutputKeyClass(Text.class);
        job.setOutputValueClass(NullWritable.class);

        System.exit(job.waitForCompletion(true) ? 0 : 1);
    }

    /** Emits every input line as the key, so duplicate lines collapse onto the same key. */
    public static class ReMapper extends Mapper<LongWritable, Text, Text, NullWritable> {
        @Override
        public void map(LongWritable key, Text value, Context context)
                throws IOException, InterruptedException {
            context.write(value, NullWritable.get());
        }
    }

    /** Writes each distinct key exactly once, discarding the duplicates. */
    public static class ReReducer extends Reducer<Text, NullWritable, Text, NullWritable> {
        @Override
        protected void reduce(Text key, Iterable<NullWritable> values, Context context)
                throws IOException, InterruptedException {
            context.write(key, NullWritable.get());
        }
    }
}
```
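The step0 job above only removes exact duplicate lines. If stricter cleaning is needed (for example, dropping blank or malformed records in iris.data), the mapper can validate each record before emitting it. The sketch below is only an illustration: the class name, the five-field comma-separated schema, and the numeric check are assumptions about the data, not part of the original code.

```java
package com.company.HDFS;

import java.io.IOException;

import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;

/**
 * Hypothetical cleaning mapper: drops blank or malformed iris records.
 * Assumes each record looks like "sepal_len,sepal_wid,petal_len,petal_wid,species".
 */
public class CleanMapper extends Mapper<LongWritable, Text, Text, NullWritable> {

    @Override
    public void map(LongWritable key, Text value, Context context)
            throws IOException, InterruptedException {
        String line = value.toString().trim();
        if (line.isEmpty()) {
            return; // skip blank lines
        }
        String[] fields = line.split(",");
        if (fields.length != 5) {
            return; // skip records with an unexpected field count (assumed schema)
        }
        try {
            // The first four fields are expected to be numeric features.
            for (int i = 0; i < 4; i++) {
                Double.parseDouble(fields[i]);
            }
        } catch (NumberFormatException e) {
            return; // skip records with non-numeric feature values
        }
        context.write(new Text(line), NullWritable.get());
    }
}
```

Swapping `ReMapper` for this class in `job.setMapperClass(...)` would keep the de-duplicating reducer unchanged while also filtering invalid rows.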
```java
package com.company.HDFS;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.*;
import org.junit.jupiter.api.Test;

import java.io.IOException;
import java.net.URI;
import java.net.URISyntaxException;

public class step1 {

    /**
     * List every file in HDFS together with its block locations.
     */
    @Test
    public void demo_03() {
        try {
            // 1. Get the file system handle
            Configuration configuration = new Configuration();
            FileSystem fs = FileSystem.get(new URI("hdfs://192.168.88.100:8020"), configuration, "root");

            // 2. Recursively list the file details under the root directory
            RemoteIterator<LocatedFileStatus> listFiles = fs.listFiles(new Path("/"), true);
            while (listFiles.hasNext()) {
                LocatedFileStatus status = listFiles.next();
                // File name
                System.out.println(status.getPath().getName());
                // Length in bytes
                System.out.println(status.getLen());
                // Permissions
                System.out.println(status.getPermission());
                // Group
                System.out.println(status.getGroup());
                // Block locations: print the host of every replica
                BlockLocation[] blockLocations = status.getBlockLocations();
                for (BlockLocation blockLocation : blockLocations) {
                    for (String host : blockLocation.getHosts()) {
                        System.out.println(host);
                    }
                }
                System.out.println("-----------separator----------");
            }

            // 3. Release resources
            fs.close();
        } catch (Exception ex) {
            ex.printStackTrace();
        }
    }

    /**
     * Upload the raw dataset from the local file system to HDFS.
     */
    @Test
    public void testCopyFromLocalFile() throws IOException, InterruptedException, URISyntaxException {
        // 1. Get the file system handle, requesting a replication factor of 2
        Configuration configuration = new Configuration();
        configuration.set("dfs.replication", "2");
        FileSystem fs = FileSystem.get(new URI("hdfs://192.168.88.100:8020"), configuration, "root");

        // 2. Upload the local iris.data file to /input on HDFS
        fs.copyFromLocalFile(
                new Path("J:\\the_efforts_paid_offf\\HDFS_HBase_HiveApi\\src\\main\\java\\com\\company\\datas\\iris.data"),
                new Path("hdfs://192.168.88.100/input"));

        // 3. Release resources
        fs.close();
        System.out.println("over");
    }
}
```
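The requirement asks for both the raw data and the preprocessed data to end up on the distributed file system. The raw file is uploaded by testCopyFromLocalFile, and the step0 job already writes its cleaned output to hdfs://192.168.88.100/output, so a simple check is to read the job output back through the same FileSystem API. The sketch below is only an illustration: the class name and the part-r-00000 file name (the default output file of a single-reducer job) are assumptions.

```java
package com.company.HDFS;

import java.io.BufferedReader;
import java.io.InputStreamReader;
import java.net.URI;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FSDataInputStream;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.junit.jupiter.api.Test;

/**
 * Hypothetical verification test: prints the first lines of the cleaned
 * MapReduce output so the preprocessed data on HDFS can be inspected.
 */
public class step1Verify {

    @Test
    public void readCleanedOutput() throws Exception {
        Configuration configuration = new Configuration();
        FileSystem fs = FileSystem.get(new URI("hdfs://192.168.88.100:8020"), configuration, "root");

        // part-r-00000 is the default single-reducer output file (assumed layout)
        try (FSDataInputStream in = fs.open(new Path("/output/part-r-00000"));
             BufferedReader reader = new BufferedReader(new InputStreamReader(in))) {
            String line;
            int printed = 0;
            while ((line = reader.readLine()) != null && printed < 5) {
                System.out.println(line);
                printed++;
            }
        }
        fs.close();
    }
}
```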
```java
package com.company.HDFS;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.HColumnDescriptor;
import org.apache.hadoop.hbase.HTableDescriptor;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.client.*;
import org.apache.hadoop.hbase.util.Bytes;
import org.junit.Test;

import java.io.IOException;

/**
 * @author ChinaManor
 * #Description HBase Java API: create table, put, scan and delete
 * #Date: 2021/12/19 18:10
 */
public class step2 {

    /**
     * createTable(): create the "demo" table with a single column family "info".
     */
    @Test
    public void createTable() throws IOException {
        Configuration conf = HBaseConfiguration.create();
        conf.set("hbase.zookeeper.quorum", "node1:2181");
        // Open the connection
        Connection conn = ConnectionFactory.createConnection(conf);
        // Get the admin client
        Admin admin = conn.getAdmin();
        // Define the table
        HTableDescriptor hTableDescriptor = new HTableDescriptor(TableName.valueOf("demo"));
        // Define the column family
        HColumnDescriptor hColumnDescriptor = new HColumnDescriptor("info");
        // Attach the column family to the table
        hTableDescriptor.addFamily(hColumnDescriptor);
        // Create the table
        admin.createTable(hTableDescriptor);
        admin.close();
        conn.close();
    }

    /**
     * put(): insert (or update) a row in the "demo" table.
     */
    @Test
    public void put() {
        Configuration conf = HBaseConfiguration.create();
        conf.set("hbase.zookeeper.quorum", "node1:2181");
        try {
            // Open the connection
            Connection conn = ConnectionFactory.createConnection(conf);
            // Get the table
            Table table = conn.getTable(TableName.valueOf("demo"));
            // Instantiate a Put with the row key
            Put put = new Put("rk001".getBytes());
            // Column family, column qualifier and value
            put.addColumn("info".getBytes(), "name".getBytes(), "zhangsan".getBytes());
            table.put(put);
            table.close();
            conn.close();
        } catch (IOException e) {
            e.printStackTrace();
        }
    }

    /**
     * scan(): query all rows of the table for the info:name column.
     */
    @Test
    public void scan() throws IOException {
        Configuration conf = HBaseConfiguration.create();
        conf.set("hbase.zookeeper.quorum", "node1:2181");
        // Open the connection
        Connection conn = ConnectionFactory.createConnection(conf);
        // Get the table
        Table table = conn.getTable(TableName.valueOf("demo"));
        // Initialise the Scan
        Scan scan = new Scan();
        // Restrict the scan to the info:name column
        scan.addColumn("info".getBytes(), "name".getBytes());
        // Run the scan
        ResultScanner rss = table.getScanner(scan);
        // Iterate over the results
        for (Result rs : rss) {
            String valStr = Bytes.toString(rs.getValue("info".getBytes(), "name".getBytes()));
            System.out.println(valStr);
        }
        // Release resources
        table.close();
        conn.close();
    }

    /**
     * delete(): delete a row from the table by its row key.
     */
    @Test
    public void delete() throws IOException {
        Configuration conf = HBaseConfiguration.create();
        conf.set("hbase.zookeeper.quorum", "node1:2181");
        // Open the connection
        Connection conn = ConnectionFactory.createConnection(conf);
        // Get the table
        Table table = conn.getTable(TableName.valueOf("demo"));
        // Instantiate a Delete with the row key (matching the row inserted by put())
        Delete del = new Delete("rk001".getBytes());
        // Execute the delete
        table.delete(del);
        // Release resources
        table.close();
        conn.close();
    }
}
```
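The assignment also calls for basic statistics with Hive on the data cleaned by MapReduce, which the code above does not cover. Below is a minimal sketch using the Hive JDBC driver, assuming HiveServer2 is reachable at node1:10000 and that the cleaned /output data has been loaded into a Hive table named iris_clean with columns (sepal_len, sepal_wid, petal_len, petal_wid, species); the host, table name, and column names are all assumptions for illustration.

```java
package com.company.HDFS;

import java.sql.Connection;
import java.sql.DriverManager;
import java.sql.ResultSet;
import java.sql.Statement;

/**
 * Hypothetical Hive statistics client: counts records and averages petal length per species.
 * Assumes HiveServer2 at node1:10000 and a table iris_clean built from the cleaned /output data.
 */
public class step3 {

    public static void main(String[] args) throws Exception {
        // Load the Hive JDBC driver
        Class.forName("org.apache.hive.jdbc.HiveDriver");

        try (Connection conn = DriverManager.getConnection("jdbc:hive2://node1:10000/default", "root", "");
             Statement stmt = conn.createStatement()) {

            // Basic statistics: record count and average petal length grouped by species
            ResultSet rs = stmt.executeQuery(
                    "SELECT species, COUNT(*) AS cnt, AVG(petal_len) AS avg_petal_len "
                            + "FROM iris_clean GROUP BY species");
            while (rs.next()) {
                System.out.println(rs.getString(1) + "\t"
                        + rs.getLong(2) + "\t"
                        + rs.getDouble(3));
            }
        }
    }
}
```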