HBase表数据的读、写操作与综合操作

简介: HBase表数据的读、写操作与综合操作


HBase表数据的读、写操作与综合操作

一、实验目标

  1. 熟练掌握通过HBase shell命令来设计HBase表结构实例
  2. 掌握使用HBase编程创建HBase表、删除HBase表、修改HBase表和查看HBase表和表结构。
  3. 掌握通过HBase 编程实现HBase表数据的读、写操作

二、实验要求及注意事项

  1. 给出每个实验的主要实验步骤、实现代码和测试效果截图。
  2. 对本次实验工作进行全面的总结分析。
  3. 建议工程名,类名、包名或表名显示个人学号或者姓名

三、实验内容及步骤

实验任务1:使用MapReduce批量将HBase表中数据导入到HDFS上。表名和表中数据自拟。

主要实现步骤和运行效果图:

完整程序

WjwReadMapper:

package hbase;
import org.apache.hadoop.hbase.mapreduce.TableMapper;
import org.apache.hadoop.hbase.*;
import org.apache.hadoop.hbase.io.*;
import org.apache.hadoop.hbase.client.*;
import org.apache.hadoop.hbase.util.*;
import org.apache.hadoop.io.*;
public class WjwReadMapper extends TableMapper<Writable, Writable> {
  private Text k=new Text();
  private Text v=new Text();
  public static final String F1="\u0001";
  protected void setup(Context c){
  }
  public void map(ImmutableBytesWritable row,Result r,Context c){
    String value=null;
    String rk=new String(row.get());
    byte[] family=null;
    byte[] column=null;
    long ts=0L;
    try{
      for(KeyValue kv:r.list()){
      value=Bytes.toStringBinary(kv.getValue());
      family=kv.getFamily();
      column=kv.getQualifier();
      ts=kv.getTimestamp();
      k.set(rk);
      v.set(Bytes.toString(family)+F1+Bytes.toString(column)+F1+value+F1+ts);
      c.write(k, v);
      }
    }catch(Exception e){
      e.printStackTrace();
      System.err.println();
    }
  }
}

WjwReadMain:

package hbase;
import java.io.IOException;
import org.apache.hadoop.io.*;
import org.apache.commons.logging.*;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.*;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.client.*;
import org.apache.hadoop.hbase.mapreduce.TableMapReduceUtil;
import org.apache.hadoop.hbase.util.*;
import org.apache.hadoop.mapreduce.*;
import org.apache.hadoop.mapreduce.lib.output.*;
public class WjwReadMain {
  public static final Log LOG = LogFactory.getLog(WjwMain.class);
  public static final String NAME = "Member Test1";
  public static final String TEMP_INDEX_PATH = "hdfs://master:9000/tmp/tb_wjw";
  public static String inputTable = "tb_wjw";
  public static void main(String[] args) throws IOException, ClassNotFoundException, InterruptedException{
    Configuration conf = HBaseConfiguration.create();
    Scan scan = new Scan();
    scan.setBatch(0);
    scan.setCaching(10000);
    scan.setMaxVersions();
    scan.setTimeRange(System.currentTimeMillis() - 3*24*3600*1000L, System.currentTimeMillis());
    scan.addColumn(Bytes.toBytes("cf1"), Bytes.toBytes("keyword"));
    conf.setBoolean("mapred.map.tasks.speculative.execution", false);
    conf.setBoolean("mapred.reduce.tasks.speculative.execution", false);
    Path tmpIndexPath = new Path(TEMP_INDEX_PATH);
    FileSystem fs = FileSystem.get(conf);
    if(fs.exists(tmpIndexPath)){
      fs.delete(tmpIndexPath, true);
    }
    Job job = new Job(conf, NAME);
    job.setJarByClass(WjwMain.class);
    TableMapReduceUtil.initTableMapperJob(inputTable, scan, WjwMapper.class, Text.class, Text.class, job);
     job.setNumReduceTasks(0);
     job.setOutputFormatClass(TextOutputFormat.class);
     FileOutputFormat.setOutputPath(job, tmpIndexPath);
     boolean success = job.waitForCompletion(true);
     System.exit(success?0:1);
  }
}

运行结果

创建表,用于等会将数据传入hadoop里

运行map程序将表数据导入hadoop,并查看是否导入成功

实验任务2:使用MapReduce批量将HDFS上的数据导入到HBase表中。表名和数据自拟,建议体现个人学号或姓名。使用Java编程创建表和删除表,表名和列族自拟。

主要实现步骤和运行效果图:

完整程序

WjwWriteMapper:

package hbase;
import java.io.*;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.io.*;
import org.apache.hadoop.hbase.client.*;
import org.apache.hadoop.hbase.io.*;
public class WjwWriteMapper extends Mapper<LongWritable, Text, ImmutableBytesWritable, Put> {
  private byte[] family=null;
  private byte[] qualifier=null;
  private byte[] val=null;
  private String rk=null;
  private long ts=System.currentTimeMillis();
  protected void map(LongWritable key,Text value,Context context) throws InterruptedException, IOException{
    try{
      String line=value.toString();
      String[] arr=line.split("\t",-1);
      if(arr.length==2){
         rk=arr[0];
          String[] vals=arr[1].split("\u0001",-1);
          if(vals.length==4){
              family=vals[0].getBytes();
            qualifier=vals[1].getBytes();
            val=vals[2].getBytes();
            ts=Long.parseLong(vals[3]);
            Put put=new Put(rk.getBytes(),ts);
            put.add(family,qualifier,val);
              context.write(new ImmutableBytesWritable(rk.getBytes()), put);
          }
      }
    }catch(Exception e){
        e.printStackTrace();
    }
  }
}

WjwWriteMain:

package hbase;
import org.apache.hadoop.util.*;
import org.apache.hadoop.mapreduce.*;
import org.apache.hadoop.mapreduce.lib.input.*;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.conf.Configured;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hbase.*;
import org.apache.hadoop.hbase.io.*;
import org.apache.hadoop.hbase.mapreduce.*;
import java.io.IOException;
import org.apache.commons.logging.*;
public class WjwWriteMain extends Configured implements Tool{
  static final Log LOG=LogFactory.getLog(WjwWriteMain.class);
  public int run(String[] args)throws Exception{
    if(args.length!=2){
        LOG.info("2 parameters needed!");
    }
    String input="hdfs://master:9000/tmp/tb_wjw/part-m-00000";
    String table="tb_wjw01";
    Configuration conf=HBaseConfiguration.create();
    Job job=new Job(conf,"Input from file "+input+" into table "+table);
    job.setJarByClass(WjwWriteMain.class);
    job.setMapperClass(WjwWriteMapper.class);
    job.setJarByClass(WjwWriteMain.class);
    job.setMapperClass(WjwWriteMapper.class);
    job.setOutputFormatClass(TableOutputFormat.class);
    job.getConfiguration().set(TableOutputFormat.OUTPUT_TABLE,table);
    job.setOutputKeyClass(ImmutableBytesWritable.class);
    job.setOutputValueClass(Waitable.class);
    job.setNumReduceTasks(0);
    FileInputFormat.addInputPath(job, new Path(input));
    return job.waitForCompletion(true)?0:1;
  }
  public static void main(String[] args) throws IOException {
      Configuration conf=new Configuration();
     String[] otherArgs=new GenericOptionsParser(conf,args).getRemainingArgs();
    try {
        System.out.println(ToolRunner.run(conf, new WjwWriteMain(),otherArgs));
    }catch(Exception e) {
        e.printStackTrace();
    }
  }
}

运行结果

创建一个空表tb_wjw01,用于等会将tb_wjw的数据导入tb_wjw01

配置yarn,并运行map程序

查看hadoop里的表tb_wjw

将hadoop里tb_wjw的数据导入hbase里的tb_wjw01里面

实验任务3:在实验任务1和实验任务2的基础上,通过HBase编程,实现创建HBase表,修改HBase表(包括增加列族和删除列族),向HBase表中写入数据,读取HBase表中数据,查看HBase数据库中所有表和表结构功能,建议在一个类中定义多个方法实现上述功能,并进行验证。表名和数据自拟。

主要实现步骤和运行效果图:

完整程序

package hbase;
import java.io.IOException;
import java.util.ArrayList;
import java.util.List;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.*;
import org.apache.hadoop.hbase.client.*;
import org.apache.hadoop.hbase.util.Bytes;
public class WjwHbase{
    private static Configuration conf = HBaseConfiguration.create();
    public static void createTable(String tableName, String[] families)
            throws IOException {
        Connection conn = ConnectionFactory.createConnection(conf);
        Admin admin = conn.getAdmin();
        if (admin.tableExists(TableName.valueOf(tableName))) {
            System.out.println("Table already exists!");
        } else {
            HTableDescriptor tableDesc = new HTableDescriptor(TableName.valueOf(tableName));
            for (String family : families) {
                tableDesc.addFamily(new HColumnDescriptor(family));
            }
            admin.createTable(tableDesc);
            System.out.println("Table created successfully!");
        }
        admin.close();
        conn.close();
    }
    public static void addRecord(String tableName, String rowKey,
                                  String family, String qualifier, String value) throws IOException {
        Connection conn = ConnectionFactory.createConnection(conf);
        Table table = conn.getTable(TableName.valueOf(tableName));
        Put put = new Put(Bytes.toBytes(rowKey));
        put.addColumn(Bytes.toBytes(family), Bytes.toBytes(qualifier), Bytes.toBytes(value));
        table.put(put);
        System.out.println("Record added successfully!");
        table.close();
        conn.close();
    }
    public static void deleteRecord(String tableName, String rowKey,
                                     String family, String qualifier) throws IOException {
        Connection conn = ConnectionFactory.createConnection(conf);
        Table table = conn.getTable(TableName.valueOf(tableName));
        Delete delete = new Delete(Bytes.toBytes(rowKey));
        delete.addColumn(Bytes.toBytes(family), Bytes.toBytes(qualifier));
        table.delete(delete);
        System.out.println("Record deleted successfully!");
        table.close();
        conn.close();
    }
    public static void deleteTable(String tableName) throws IOException {
        Connection conn = ConnectionFactory.createConnection(conf);
        Admin admin = conn.getAdmin();
        if (admin.tableExists(TableName.valueOf(tableName))) {
            admin.disableTable(TableName.valueOf(tableName));
            admin.deleteTable(TableName.valueOf(tableName));
            System.out.println("Table deleted successfully!");
        } else {
            System.out.println("Table does not exist!");
        }
        admin.close();
        conn.close();
    }
    public static void addColumnFamily(String tableName, String columnFamily) throws IOException {
        Connection conn = ConnectionFactory.createConnection(conf);
        Admin admin = conn.getAdmin();
        if (admin.tableExists(TableName.valueOf(tableName))) {
            HColumnDescriptor columnDesc = new HColumnDescriptor(columnFamily);
            admin.addColumn(TableName.valueOf(tableName), columnDesc);
            System.out.println("Column family added successfully!");
        } else {
            System.out.println("Table does not exist!");
        }
        admin.close();
        conn.close();
    }
    public static void deleteColumnFamily(String tableName, String columnFamily) throws IOException {
        Connection conn = ConnectionFactory.createConnection(conf);
        Admin admin = conn.getAdmin();
        if (admin.tableExists(TableName.valueOf(tableName))) {
            admin.deleteColumn(TableName.valueOf(tableName), Bytes.toBytes(columnFamily));
            System.out.println("Column family deleted successfully!");
        } else {
            System.out.println("Table does not exist!");
        }
        admin.close();
        conn.close();
    }
    public static void getRecord(String tableName, String rowKey,
                                  String family, String qualifier) throws IOException {
        Connection conn = ConnectionFactory.createConnection(conf);
        Table table = conn.getTable(TableName.valueOf(tableName));
        Get get = new Get(Bytes.toBytes(rowKey));
        Result result = table.get(get);
        byte[] value = result.getValue(Bytes.toBytes(family), Bytes.toBytes(qualifier));
        System.out.println("Result: " + Bytes.toString(value));
        table.close();
        conn.close();
    }
    public static void scanTable(String tableName) throws IOException {
        Connection conn = ConnectionFactory.createConnection(conf);
        Table table = conn.getTable(TableName.valueOf(tableName));
        Scan scan = new Scan();
        ResultScanner scanner = table.getScanner(scan);
        for (Result result : scanner) {
            System.out.println("Result: " + result);
        }
        table.close();
        conn.close();
    }
    public static void listTables() throws IOException {
        Connection conn = ConnectionFactory.createConnection(conf);
        Admin admin = conn.getAdmin();
        HTableDescriptor[] tableDescs = admin.listTables();
        List<String> tableNames = new ArrayList<String>();
        for (HTableDescriptor tableDesc : tableDescs) {
            tableNames.add(tableDesc.getNameAsString());
        }
        System.out.println("Tables: " + tableNames);
        admin.close();
        conn.close();
    }
    public static void describeTable(String tableName) throws IOException {
        Connection conn = ConnectionFactory.createConnection(conf);
        Admin admin = conn.getAdmin();
        HTableDescriptor tableDesc = admin.getTableDescriptor(TableName.valueOf(tableName));
        System.out.println("Table structure: " + tableDesc);
        admin.close();
        conn.close();
    }
    public static void main(String[] args) throws IOException {
        String tableName = "wjwtest";
        String rowKey = "row1";
        String family = "cf1";
        String qualifier = "q1";
        String value = "this is wjw!";
        String columnFamily = "cf2";
        String[] families = {family};
        createTable(tableName, families);
        addRecord(tableName, rowKey, family, qualifier, value);
        getRecord(tableName, rowKey, family, qualifier);
        scanTable(tableName);
        addColumnFamily(tableName, columnFamily);
        describeTable(tableName);
        deleteColumnFamily(tableName, columnFamily);
        deleteRecord(tableName, rowKey, family, qualifier);
        deleteTable(tableName);
        listTables();
    }
}

运行结果

相关实践学习
lindorm多模间数据无缝流转
展现了Lindorm多模融合能力——用kafka API写入,无缝流转在各引擎内进行数据存储和计算的实验。
云数据库HBase版使用教程
&nbsp; 相关的阿里云产品:云数据库 HBase 版 面向大数据领域的一站式NoSQL服务,100%兼容开源HBase并深度扩展,支持海量数据下的实时存储、高并发吞吐、轻SQL分析、全文检索、时序时空查询等能力,是风控、推荐、广告、物联网、车联网、Feeds流、数据大屏等场景首选数据库,是为淘宝、支付宝、菜鸟等众多阿里核心业务提供关键支撑的数据库。 了解产品详情:&nbsp;https://cn.aliyun.com/product/hbase &nbsp; ------------------------------------------------------------------------- 阿里云数据库体验:数据库上云实战 开发者云会免费提供一台带自建MySQL的源数据库&nbsp;ECS 实例和一台目标数据库&nbsp;RDS实例。跟着指引,您可以一步步实现将ECS自建数据库迁移到目标数据库RDS。 点击下方链接,领取免费ECS&amp;RDS资源,30分钟完成数据库上云实战!https://developer.aliyun.com/adc/scenario/51eefbd1894e42f6bb9acacadd3f9121?spm=a2c6h.13788135.J_3257954370.9.4ba85f24utseFl
目录
相关文章
|
7月前
|
Java Shell 分布式数据库
【大数据技术Hadoop+Spark】HBase数据模型、Shell操作、Java API示例程序讲解(附源码 超详细)
【大数据技术Hadoop+Spark】HBase数据模型、Shell操作、Java API示例程序讲解(附源码 超详细)
163 0
|
Shell 分布式数据库 Apache
Hbase常用shell操作
Hbase常用shell操作
432 1
|
Java Shell 分布式数据库
HBase高级操作
HBase高级操作
302 0
|
7月前
|
分布式计算 Hadoop Shell
熟悉常用的HBase操作
熟悉常用的HBase操作
168 3
熟悉常用的HBase操作
|
7月前
|
分布式计算 Hadoop Shell
|
5月前
|
DataWorks 数据管理 大数据
DataWorks操作报错合集之在连接HBase时出现超时问题,该怎么解决
DataWorks是阿里云提供的一站式大数据开发与治理平台,支持数据集成、数据开发、数据服务、数据质量管理、数据安全管理等全流程数据处理。在使用DataWorks过程中,可能会遇到各种操作报错。以下是一些常见的报错情况及其可能的原因和解决方法。
|
6月前
|
Java 大数据 API
【大数据】HDFS、HBase操作教程(含指令和JAVA API)
【大数据】HDFS、HBase操作教程(含指令和JAVA API)
156 0
【大数据】HDFS、HBase操作教程(含指令和JAVA API)
|
7月前
|
消息中间件 关系型数据库 MySQL
实时计算 Flink版操作报错合集之使用 Event Time Temporal Join 关联多个 HBase 后,Kafka 数据的某个字段变为 null 是什么原因导致的
在使用实时计算Flink版过程中,可能会遇到各种错误,了解这些错误的原因及解决方法对于高效排错至关重要。针对具体问题,查看Flink的日志是关键,它们通常会提供更详细的错误信息和堆栈跟踪,有助于定位问题。此外,Flink社区文档和官方论坛也是寻求帮助的好去处。以下是一些常见的操作报错及其可能的原因与解决策略。
143 0
|
7月前
|
SQL 消息中间件 Kafka
实时计算 Flink版操作报错合集之使用 Event Time Temporal Join 关联多个 HBase 后,Kafka 数据的某个字段变为 null 是什么原因导致的
在使用实时计算Flink版过程中,可能会遇到各种错误,了解这些错误的原因及解决方法对于高效排错至关重要。针对具体问题,查看Flink的日志是关键,它们通常会提供更详细的错误信息和堆栈跟踪,有助于定位问题。此外,Flink社区文档和官方论坛也是寻求帮助的好去处。以下是一些常见的操作报错及其可能的原因与解决策略。
105 0
|
7月前
|
存储 分布式计算 分布式数据库
对给定的数据利用MapReduce编程实现数据的清洗和预处理,编程实现数据存储到HBase数据库,实现数据的增删改查操作接口
对给定的数据利用MapReduce编程实现数据的清洗和预处理,编程实现数据存储到HBase数据库,实现数据的增删改查操作接口
64 0