对给定的数据利用MapReduce编程实现数据的清洗和预处理,编程实现数据存储到HBase数据库,实现数据的增删改查操作接口

本文涉及的产品
服务治理 MSE Sentinel/OpenSergo,Agent数量 不受限
云原生网关 MSE Higress,422元/月
注册配置 MSE Nacos/ZooKeeper,118元/月
简介: 对给定的数据利用MapReduce编程实现数据的清洗和预处理,编程实现数据存储到HBase数据库,实现数据的增删改查操作接口


对给定的数据利用MapReduce编程实现数据的清洗和预处理,编程实现数据存储到HBase数据库,实现数据的增删改查操作接口,同时对MapReduce处理好的数据利用Hive实现数据的基本统计。

设计要求:

  1. 根据数据特征,设计一个任务场景,利用MapReduce编程实现数据的清洗和预处理。(10分)
  2. 利用HDFS的JavaAPI编写程序将原始数据和预处理后的数据上传到分布式文件系统

数据集:

链接:https://pan.baidu.com/s/1rnUJn5ld45HpLhzbwYIM1A

提取码:7bsd

package com.company.HDFS;
import java.io.IOException;
import java.net.URI;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
public class step0 {
  final static String INPUT_PATH="hdfs://192.168.88.100/data";
  final static String OUTPUT_PATH="hdfs://192.168.88.100/output";
  public static void main(String[] args) throws Exception {
    // TODO Auto-generated method stub
    Configuration configuration = new Configuration();
    FileSystem fileSystem =FileSystem.get(new URI(INPUT_PATH),configuration);
    if (fileSystem.exists(new Path(OUTPUT_PATH))) {
      fileSystem.delete(new Path(OUTPUT_PATH),true);
    }
    Job job = new Job(configuration,"step0");
    FileInputFormat.setInputPaths(job, INPUT_PATH);
    FileOutputFormat.setOutputPath(job,new Path(OUTPUT_PATH));
    job.setJarByClass(step0.class);
    job.setMapperClass(ReMapper.class);
    job.setMapOutputKeyClass(Text.class);
    job.setMapOutputValueClass(NullWritable.class);
    job.setReducerClass(ReReducer.class);
    job.setOutputKeyClass(Text.class);
    job.setOutputValueClass(NullWritable.class);
    job.waitForCompletion(true);
  }
    public static class ReReducer extends Reducer<Text,NullWritable, Text,NullWritable> {
        private IntWritable result = new IntWritable();
        public ReReducer() {
        }
        protected void reduce(Text key2, Iterable<NullWritable> value2, Reducer<Text,NullWritable, Text,NullWritable>.Context context) throws IOException, InterruptedException {
            context.write(key2,NullWritable.get());
        }
    }
    public static class ReMapper extends Mapper<LongWritable, Text, Text,NullWritable> {
        private static final int FAIL_DATA=9999;
        public void map(LongWritable key, Text value, Mapper<LongWritable, Text, Text, NullWritable>.Context context) throws IOException, InterruptedException {
          context.write(value,NullWritable.get());
        }
    }
}
• 1
• 2
• 3
• 4
• 5
• 6
• 7
• 8
• 9
• 10
• 11
• 12
• 13
• 14
• 15
• 16
• 17
• 18
• 19
• 20
• 21
• 22
• 23
• 24
• 25
• 26
• 27
• 28
• 29
• 30
• 31
• 32
• 33
• 34
• 35
• 36
• 37
• 38
• 39
• 40
• 41
• 42
• 43
• 44
• 45
• 46
• 47
• 48
• 49
• 50
• 51
• 52
• 53
• 54
• 55
• 56
• 57
• 58
• 59
• 60
• 61
• 62
• 63
• 64
• 65
• 66
• 67
package com.company.HDFS;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.*;
import org.junit.jupiter.api.Test;
import java.io.IOException;
import java.net.URI;
import java.net.URISyntaxException;
public class step1 {
    /**
     * 查看   所有文件
     */
    @Test
    public void demo_03() {
        try {
            //1 获取文件系统
            Configuration configuration = new Configuration();
            FileSystem fs = FileSystem.get(new URI("hdfs://192.168.88.100:8020"), configuration, "root");
            // 2 获取文件详情
            RemoteIterator<LocatedFileStatus> listFiles = fs.listFiles(new Path("/"), true);
            while (listFiles.hasNext()) {
                LocatedFileStatus status = listFiles.next();
                // 输出详情
                // 文件名称
                System.out.println(status.getPath().getName());
                // 长度
                System.out.println(status.getLen());
                // 权限
                System.out.println(status.getPermission());
                // 分组
                System.out.println(status.getGroup());
                // 获取存储的块信息
                BlockLocation[] blockLocations = status.getBlockLocations();
                for (BlockLocation blockLocation : blockLocations) {
                    // 获取块存储的主机节点
                    String[] hosts = blockLocation.getHosts();
                    for (String host : hosts) {
                        System.out.println(host);
                    }
                }
                System.out.println("-----------分割线----------");
            }
            // 3 关闭资源
            fs.close();
        } catch (Exception ex) {
        }
    }
    /**
     * 上传
     */
    @Test
    public void testCopyFromLocalFile() throws IOException, InterruptedException, URISyntaxException {
        // 1 获取文件系统
        Configuration configuration = new Configuration();
        configuration.set("dfs.replication", "2");
        FileSystem fs = FileSystem.get(new URI("hdfs://192.168.88.100:8020"), configuration, "root");
        // 2 上传文件
        fs.copyFromLocalFile(new Path("J:\\the_efforts_paid_offf\\HDFS_HBase_HiveApi\\src\\main\\java\\com\\company\\datas\\iris.data"), new Path("hdfs://192.168.88.100/input"));
        // 3 关闭资源
        fs.close();
        System.out.println("over");
    }
}
• 1
• 2
• 3
• 4
• 5
• 6
• 7
• 8
• 9
• 10
• 11
• 12
• 13
• 14
• 15
• 16
• 17
• 18
• 19
• 20
• 21
• 22
• 23
• 24
• 25
• 26
• 27
• 28
• 29
• 30
• 31
• 32
• 33
• 34
• 35
• 36
• 37
• 38
• 39
• 40
• 41
• 42
• 43
• 44
• 45
• 46
• 47
• 48
• 49
• 50
• 51
• 52
• 53
• 54
• 55
• 56
• 57
• 58
• 59
• 60
• 61
• 62
• 63
• 64
• 65
• 66
• 67
• 68
• 69
• 70
• 71
• 72
• 73
• 74
• 75
• 76
• 77
• 78
• 79
• 80
• 81
• 82
• 83
• 84
• 85
package com.company.HDFS;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.HColumnDescriptor;
import org.apache.hadoop.hbase.HTableDescriptor;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.client.*;
import org.apache.hadoop.hbase.util.Bytes;
import org.junit.Test;
import org.testng.annotations.AfterTest;
import org.testng.annotations.BeforeTest;
import java.io.IOException;
/**
 * @author ChinaManor
 * #Description hbase的javaAPI
 * #Date: 2021/12/19 18:10
 */
public class step2 {
    /**
     * @Description: createTable():创建表的方法
     * @Param: 0
     * @return: 0
     */
    @Test
    public void createTable() throws IOException {
        Configuration conf = HBaseConfiguration.create();
        conf.set("hbase.zookeeper.quorum", "node1:2181");
        //建立连接
        Connection conn = ConnectionFactory.createConnection(conf);
        //获取表的管理类
        Admin admin = conn.getAdmin();
        //定义表
        HTableDescriptor hTableDescriptor=new HTableDescriptor(TableName.valueOf("demo"));
        //定义列簇
        HColumnDescriptor hColumnDescriptor =new HColumnDescriptor("info");
        //讲列簇定义到表中
        hTableDescriptor.addFamily(hColumnDescriptor);
        //执行建表操作
        admin.createTable(hTableDescriptor);
        admin.close();
        conn.close();
    }
    /**
     * @Description: 向Hbase中插入数据的方法
     * @Param: null
     * @return: null
     */
    @Test
    public void put(){
        Configuration conf = HBaseConfiguration.create();
        conf.set("hbase.zookeeper.quorum","node1:2181");
        try {
            //建立连接
            Connection conn= ConnectionFactory.createConnection(conf);
            //获取表
            Table table=conn.getTable(TableName.valueOf("demo"));
            //用行键实例化put
            Put put= new Put("rk001".getBytes());
            //指定列簇名,列名,和值
            put.addColumn("info".getBytes(),"name".getBytes(),"zhangsan".getBytes());
            table.put(put);
            table.close();
            conn.close();
        } catch (IOException e) {
            e.printStackTrace();
        }
    }
    /**
     * @Description: scan()查询一个表的所有信息
     * @Param: 1
     * @return: 1
     */
    @Test
    public  void scan() throws IOException {
        Configuration conf=HBaseConfiguration.create();
        conf.set("hbase.zookeeper.quorum", "node1:2181");
        //建立连接
        Connection conn=ConnectionFactory.createConnection(conf);
        //获取表
        Table table=conn.getTable(TableName.valueOf("demo"));
        //初始化Scan实例
        Scan scan=new Scan();
        //增加过滤条件
        scan.addColumn("info".getBytes(), "name".getBytes());
        //返回结果
        ResultScanner rss=table.getScanner(scan);
        //迭代并取出结果
        for(Result rs:rss){
            String valStr=Bytes.toString(rs.getValue("info".getBytes(), "name".getBytes()));
            System.out.println(valStr);
        }
        //关闭连接
        table.close();
        conn.close();
    }
    /**
     * @Description: delete()删除表中的信息
     * @Param: 1
     * @return: 1
     */
    @Test
    public  void delete() throws IOException {
        Configuration conf=HBaseConfiguration.create();
        conf.set("hbase.zookeeper.quorum", "node1:2181");
        //建立连接
        Connection conn=ConnectionFactory.createConnection(conf);
        //获取表
        Table table=conn.getTable(TableName.valueOf("demo"));
        // 用行键来实例化Delete实例
        Delete del = new Delete("rk0001".getBytes());
        // 执行删除
        table.delete(del);
        //关闭连接
        table.close();
        conn.close();
    }
}
• 1
• 2
• 3
• 4
• 5
• 6
• 7
• 8
• 9
• 10
• 11
• 12
• 13
• 14
• 15
• 16
• 17
• 18
• 19
• 20
• 21
• 22
• 23
• 24
• 25
• 26
• 27
• 28
• 29
• 30
• 31
• 32
• 33
• 34
• 35
• 36
• 37
• 38
• 39
• 40
• 41
• 42
• 43
• 44
• 45
• 46
• 47
• 48
• 49
• 50
• 51
• 52
• 53
• 54
• 55
• 56
• 57
• 58
• 59
• 60
• 61
• 62
• 63
• 64
• 65
• 66
• 67
• 68
• 69
• 70
• 71
• 72
• 73
• 74
• 75
• 76
• 77
• 78
• 79
• 80
• 81
• 82
• 83
• 84
• 85
• 86
• 87
• 88
• 89
• 90
• 91
• 92
• 93
• 94
• 95
• 96
• 97
• 98
• 99
• 100
• 101
• 102
• 103
• 104
• 105
• 106
• 107
• 108
• 109
• 110
• 111
• 112
• 113
• 114
• 115
• 116
• 117
• 118
• 119
• 120
• 121
• 122
• 123
• 124
• 125
• 126
• 127
• 128
• 129
• 130
• 131
• 132
• 133
• 134
• 135
• 136
• 137
• 138
• 139
• 140
• 141
• 142
• 143
• 144


相关实践学习
lindorm多模间数据无缝流转
展现了Lindorm多模融合能力——用kafka API写入,无缝流转在各引擎内进行数据存储和计算的实验。
云数据库HBase版使用教程
&nbsp; 相关的阿里云产品:云数据库 HBase 版 面向大数据领域的一站式NoSQL服务,100%兼容开源HBase并深度扩展,支持海量数据下的实时存储、高并发吞吐、轻SQL分析、全文检索、时序时空查询等能力,是风控、推荐、广告、物联网、车联网、Feeds流、数据大屏等场景首选数据库,是为淘宝、支付宝、菜鸟等众多阿里核心业务提供关键支撑的数据库。 了解产品详情:&nbsp;https://cn.aliyun.com/product/hbase &nbsp; ------------------------------------------------------------------------- 阿里云数据库体验:数据库上云实战 开发者云会免费提供一台带自建MySQL的源数据库&nbsp;ECS 实例和一台目标数据库&nbsp;RDS实例。跟着指引,您可以一步步实现将ECS自建数据库迁移到目标数据库RDS。 点击下方链接,领取免费ECS&amp;RDS资源,30分钟完成数据库上云实战!https://developer.aliyun.com/adc/scenario/51eefbd1894e42f6bb9acacadd3f9121?spm=a2c6h.13788135.J_3257954370.9.4ba85f24utseFl
目录
相关文章
|
6天前
|
SQL Java 数据库连接
数据库常用接口
ODBC(Open Database Connectivity):开放数据库互连技术为访问不同的SQL数据库提供了一个共同的接口。ODBC使用SQL作为访问数据的标准。这一接口提供了最大限度的互操作性,一个应用程序可以通过共同的一组代码访问不同的SQL数据库管理系统(DBMS)。 一个基于ODBC的应用程序对数据库的操作不依赖任何DBMS,不直接与DBMS打交道,所有的数据库操作由对应的DBMS的ODBC驱动程序完成。也就是说,不论是Access,MySQL还是Oracle数据库,均可用ODBC API进行访问。由此可见,ODBC的最大优点是能以统一的方式处理所有的数据库。
|
25天前
|
SQL JavaScript 关系型数据库
node博客小项目:接口开发、连接mysql数据库
【10月更文挑战第14天】node博客小项目:接口开发、连接mysql数据库
|
1月前
|
SQL 分布式计算 Hadoop
Hadoop-37 HBase集群 JavaAPI 操作3台云服务器 POM 实现增删改查调用操作 列族信息 扫描全表
Hadoop-37 HBase集群 JavaAPI 操作3台云服务器 POM 实现增删改查调用操作 列族信息 扫描全表
32 3
|
1月前
|
分布式计算 Hadoop Shell
Hadoop-36 HBase 3节点云服务器集群 HBase Shell 增删改查 全程多图详细 列族 row key value filter
Hadoop-36 HBase 3节点云服务器集群 HBase Shell 增删改查 全程多图详细 列族 row key value filter
57 3
|
2月前
|
存储 数据库 Python
python的对象数据库ZODB的使用(python3经典编程案例)
该文章介绍了如何使用Python的对象数据库ZODB来进行数据存储,包括ZODB的基本操作如创建数据库、存储和检索对象等,并提供了示例代码。
39 0
|
2月前
|
JSON NoSQL 数据库
和SQLite数据库对应的NoSQL数据库:TinyDB的详细使用(python3经典编程案例)
该文章详细介绍了TinyDB这一轻量级NoSQL数据库的使用方法,包括如何在Python3环境中安装、创建数据库、插入数据、查询、更新以及删除记录等操作,并提供了多个编程案例。
111 0
|
3月前
|
存储 SQL 数据库
|
3月前
|
SQL 数据库 索引
SQL 编程最佳实践简直太牛啦!带你编写高效又可维护的 SQL 代码,轻松应对数据库挑战!
【8月更文挑战第31天】在SQL编程中,高效与可维护的代码至关重要,不仅能提升数据库性能,还降低维护成本。本文通过案例分析探讨SQL最佳实践:避免全表扫描,利用索引加速查询;合理使用JOIN,避免性能问题;避免使用`SELECT *`,减少不必要的数据传输;使用`COMMIT`和`ROLLBACK`确保事务一致性;添加注释提高代码可读性。遵循这些实践,不仅提升性能,还便于后期维护和扩展。应根据具体情况选择合适方法并持续优化SQL代码。
53 0
|
3月前
|
存储 缓存 关系型数据库
Django后端架构开发:缓存机制,接口缓存、文件缓存、数据库缓存与Memcached缓存
Django后端架构开发:缓存机制,接口缓存、文件缓存、数据库缓存与Memcached缓存
66 0
|
3月前
|
SQL 关系型数据库 MySQL
"Python与MySQL的浪漫邂逅:一键掌握增删改查,开启你的数据库编程之旅!"
【8月更文挑战第21天】Python因其简洁的语法和强大的库支持,成为连接数据库的首选工具。本文介绍如何使用Python连接MySQL数据库并执行基本操作。首先需安装`mysql-connector-python`库。通过配置连接信息建立数据库连接后,可利用`cursor.execute()`执行SQL语句进行数据的增删改查,并通过`commit()`提交更改。查询时使用`fetchall()`或`fetchone()`获取结果。记得处理异常及关闭连接以释放资源。掌握这些基础,有助于高效进行数据库编程。
62 0

热门文章

最新文章