Reading, Writing, and Comprehensive Operations on HBase Table Data
1. Experiment Objectives
- Become proficient in designing HBase table schemas with HBase shell commands.
- Master creating, deleting, and modifying HBase tables, and viewing tables and table structures, through HBase programming.
- Master reading and writing HBase table data through HBase programming.
2. Requirements and Notes
- For each task, give the main implementation steps, the implementation code, and screenshots of the test results.
- Give a comprehensive summary and analysis of this experiment.
- It is recommended that project, class, package, and table names include your student ID or name.
3. Experiment Content and Steps
Task 1: Use MapReduce to bulk-export the data in an HBase table to HDFS. Choose your own table name and table data.
Main implementation steps and screenshots:
Complete program
WjwReadMapper:
package hbase;

import org.apache.hadoop.hbase.KeyValue;
import org.apache.hadoop.hbase.client.Result;
import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
import org.apache.hadoop.hbase.mapreduce.TableMapper;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.io.Writable;

// Emits one line per cell: "rowkey TAB family\u0001qualifier\u0001value\u0001timestamp".
public class WjwReadMapper extends TableMapper<Writable, Writable> {

    private Text k = new Text();
    private Text v = new Text();
    public static final String F1 = "\u0001"; // field separator inside the output value

    @Override
    public void map(ImmutableBytesWritable row, Result r, Context c) {
        String rk = new String(row.get()); // row key of the current row
        try {
            for (KeyValue kv : r.list()) { // iterate over every cell of the row
                String value = Bytes.toStringBinary(kv.getValue());
                byte[] family = kv.getFamily();
                byte[] column = kv.getQualifier();
                long ts = kv.getTimestamp();
                k.set(rk);
                v.set(Bytes.toString(family) + F1 + Bytes.toString(column) + F1 + value + F1 + ts);
                c.write(k, v);
            }
        } catch (Exception e) {
            e.printStackTrace();
        }
    }
}
WjwReadMain:
package hbase;

import java.io.IOException;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.client.Scan;
import org.apache.hadoop.hbase.mapreduce.TableMapReduceUtil;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.mapreduce.lib.output.TextOutputFormat;

public class WjwReadMain {

    public static final Log LOG = LogFactory.getLog(WjwReadMain.class);
    public static final String NAME = "Member Test1";
    // HDFS directory that receives the exported table data
    public static final String TEMP_INDEX_PATH = "hdfs://master:9000/tmp/tb_wjw";
    public static String inputTable = "tb_wjw";

    public static void main(String[] args) throws IOException, ClassNotFoundException, InterruptedException {
        Configuration conf = HBaseConfiguration.create();

        // Scan the last three days of cf1:keyword, keeping all versions
        Scan scan = new Scan();
        scan.setBatch(0);
        scan.setCaching(10000);
        scan.setMaxVersions();
        scan.setTimeRange(System.currentTimeMillis() - 3 * 24 * 3600 * 1000L,
                System.currentTimeMillis());
        scan.addColumn(Bytes.toBytes("cf1"), Bytes.toBytes("keyword"));

        // Speculative execution could produce duplicate output files, so disable it
        conf.setBoolean("mapred.map.tasks.speculative.execution", false);
        conf.setBoolean("mapred.reduce.tasks.speculative.execution", false);

        // Remove stale output from any previous run
        Path tmpIndexPath = new Path(TEMP_INDEX_PATH);
        FileSystem fs = FileSystem.get(conf);
        if (fs.exists(tmpIndexPath)) {
            fs.delete(tmpIndexPath, true);
        }

        Job job = new Job(conf, NAME);
        job.setJarByClass(WjwReadMain.class);
        TableMapReduceUtil.initTableMapperJob(inputTable, scan, WjwReadMapper.class,
                Text.class, Text.class, job);
        job.setNumReduceTasks(0); // map-only job
        job.setOutputFormatClass(TextOutputFormat.class);
        FileOutputFormat.setOutputPath(job, tmpIndexPath);

        boolean success = job.waitForCompletion(true);
        System.exit(success ? 0 : 1);
    }
}
Run results
Create the table whose data will later be exported to HDFS.
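A minimal HBase shell sketch of this step. The table name and column family match what WjwReadMain scans; the sample rows are assumptions:

create 'tb_wjw', 'cf1'
put 'tb_wjw', 'row1', 'cf1:keyword', 'hadoop'
put 'tb_wjw', 'row2', 'cf1:keyword', 'hbase'
scan 'tb_wjw'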
Run the map job to export the table data to HDFS, and check that the export succeeded.
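Assuming the two classes above are packaged into a jar (the jar name below is hypothetical), the export can be run and checked roughly like this; the part-m-00000 path matches the input hardcoded in WjwWriteMain:

hadoop jar wjw-hbase.jar hbase.WjwReadMain
hdfs dfs -ls /tmp/tb_wjw
hdfs dfs -cat /tmp/tb_wjw/part-m-00000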
Task 2: Use MapReduce to bulk-import data from HDFS into an HBase table. Choose your own table name and data, preferably reflecting your student ID or name. Create and delete the table with Java code; choose your own table name and column families.
Main implementation steps and screenshots:
Complete program
WjwWriteMapper:
package hbase;

import java.io.IOException;
import org.apache.hadoop.hbase.client.Put;
import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;

// Parses lines of the form "rowkey TAB family\u0001qualifier\u0001value\u0001timestamp"
// (the format produced by WjwReadMapper) and turns each one into an HBase Put.
public class WjwWriteMapper extends Mapper<LongWritable, Text, ImmutableBytesWritable, Put> {

    private byte[] family = null;
    private byte[] qualifier = null;
    private byte[] val = null;
    private String rk = null;
    private long ts = System.currentTimeMillis();

    @Override
    protected void map(LongWritable key, Text value, Context context)
            throws IOException, InterruptedException {
        try {
            String line = value.toString();
            String[] arr = line.split("\t", -1);
            if (arr.length == 2) {
                rk = arr[0]; // row key
                String[] vals = arr[1].split("\u0001", -1);
                if (vals.length == 4) { // family, qualifier, value, timestamp
                    family = vals[0].getBytes();
                    qualifier = vals[1].getBytes();
                    val = vals[2].getBytes();
                    ts = Long.parseLong(vals[3]);
                    Put put = new Put(rk.getBytes(), ts);
                    put.add(family, qualifier, val);
                    context.write(new ImmutableBytesWritable(rk.getBytes()), put);
                }
            }
        } catch (Exception e) {
            e.printStackTrace();
        }
    }
}
WjwWriteMain:
package hbase;

import java.io.IOException;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.conf.Configured;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.client.Put;
import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
import org.apache.hadoop.hbase.mapreduce.TableOutputFormat;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.util.GenericOptionsParser;
import org.apache.hadoop.util.Tool;
import org.apache.hadoop.util.ToolRunner;

public class WjwWriteMain extends Configured implements Tool {

    static final Log LOG = LogFactory.getLog(WjwWriteMain.class);

    public int run(String[] args) throws Exception {
        if (args.length != 2) {
            LOG.info("2 parameters needed!");
        }
        // Input file exported by task 1, and the target HBase table
        String input = "hdfs://master:9000/tmp/tb_wjw/part-m-00000";
        String table = "tb_wjw01";

        Configuration conf = HBaseConfiguration.create();
        Job job = new Job(conf, "Input from file " + input + " into table " + table);
        job.setJarByClass(WjwWriteMain.class);
        job.setMapperClass(WjwWriteMapper.class);
        job.setOutputFormatClass(TableOutputFormat.class);
        job.getConfiguration().set(TableOutputFormat.OUTPUT_TABLE, table);
        job.setOutputKeyClass(ImmutableBytesWritable.class);
        job.setOutputValueClass(Put.class); // the mapper emits Put objects
        job.setNumReduceTasks(0); // map-only job
        FileInputFormat.addInputPath(job, new Path(input));
        return job.waitForCompletion(true) ? 0 : 1;
    }

    public static void main(String[] args) throws IOException {
        Configuration conf = new Configuration();
        String[] otherArgs = new GenericOptionsParser(conf, args).getRemainingArgs();
        try {
            System.out.println(ToolRunner.run(conf, new WjwWriteMain(), otherArgs));
        } catch (Exception e) {
            e.printStackTrace();
        }
    }
}
Run results
Create an empty table tb_wjw01, into which the data from tb_wjw will later be imported.
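A minimal shell sketch; the column family must match the cf1 family recorded in the exported file:

create 'tb_wjw01', 'cf1'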
Configure YARN and run the map job.
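Assuming YARN is already configured in yarn-site.xml and mapred-site.xml, this step amounts to roughly the following (the jar name is hypothetical):

start-yarn.sh
hadoop jar wjw-hbase.jar hbase.WjwWriteMain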
View the exported tb_wjw data in HDFS.
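For example, the exported file (the job's input) can be inspected with:

hdfs dfs -cat /tmp/tb_wjw/part-m-00000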
Import the tb_wjw data from HDFS into the tb_wjw01 table in HBase.
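Once the job finishes, the import can be verified in the HBase shell:

scan 'tb_wjw01'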
Task 3: Building on tasks 1 and 2, use HBase programming to create an HBase table, modify it (including adding and deleting column families), write data into it, read data from it, and list all tables and table structures in the HBase database. It is recommended to implement these functions as separate methods of a single class and to verify each of them. Choose your own table name and data.
Main implementation steps and screenshots:
Complete program
package hbase;

import java.io.IOException;
import java.util.ArrayList;
import java.util.List;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.HColumnDescriptor;
import org.apache.hadoop.hbase.HTableDescriptor;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.client.Admin;
import org.apache.hadoop.hbase.client.Connection;
import org.apache.hadoop.hbase.client.ConnectionFactory;
import org.apache.hadoop.hbase.client.Delete;
import org.apache.hadoop.hbase.client.Get;
import org.apache.hadoop.hbase.client.Put;
import org.apache.hadoop.hbase.client.Result;
import org.apache.hadoop.hbase.client.ResultScanner;
import org.apache.hadoop.hbase.client.Scan;
import org.apache.hadoop.hbase.client.Table;
import org.apache.hadoop.hbase.util.Bytes;

// One class collecting all table-management and read/write operations required by task 3.
public class WjwHbase {

    private static Configuration conf = HBaseConfiguration.create();

    // Create a table with the given column families.
    public static void createTable(String tableName, String[] families) throws IOException {
        Connection conn = ConnectionFactory.createConnection(conf);
        Admin admin = conn.getAdmin();
        if (admin.tableExists(TableName.valueOf(tableName))) {
            System.out.println("Table already exists!");
        } else {
            HTableDescriptor tableDesc = new HTableDescriptor(TableName.valueOf(tableName));
            for (String family : families) {
                tableDesc.addFamily(new HColumnDescriptor(family));
            }
            admin.createTable(tableDesc);
            System.out.println("Table created successfully!");
        }
        admin.close();
        conn.close();
    }

    // Write a single cell.
    public static void addRecord(String tableName, String rowKey, String family,
            String qualifier, String value) throws IOException {
        Connection conn = ConnectionFactory.createConnection(conf);
        Table table = conn.getTable(TableName.valueOf(tableName));
        Put put = new Put(Bytes.toBytes(rowKey));
        put.addColumn(Bytes.toBytes(family), Bytes.toBytes(qualifier), Bytes.toBytes(value));
        table.put(put);
        System.out.println("Record added successfully!");
        table.close();
        conn.close();
    }

    // Delete a single cell.
    public static void deleteRecord(String tableName, String rowKey, String family,
            String qualifier) throws IOException {
        Connection conn = ConnectionFactory.createConnection(conf);
        Table table = conn.getTable(TableName.valueOf(tableName));
        Delete delete = new Delete(Bytes.toBytes(rowKey));
        delete.addColumn(Bytes.toBytes(family), Bytes.toBytes(qualifier));
        table.delete(delete);
        System.out.println("Record deleted successfully!");
        table.close();
        conn.close();
    }

    // Disable and drop a table.
    public static void deleteTable(String tableName) throws IOException {
        Connection conn = ConnectionFactory.createConnection(conf);
        Admin admin = conn.getAdmin();
        if (admin.tableExists(TableName.valueOf(tableName))) {
            admin.disableTable(TableName.valueOf(tableName));
            admin.deleteTable(TableName.valueOf(tableName));
            System.out.println("Table deleted successfully!");
        } else {
            System.out.println("Table does not exist!");
        }
        admin.close();
        conn.close();
    }

    // Add a column family to an existing table.
    public static void addColumnFamily(String tableName, String columnFamily) throws IOException {
        Connection conn = ConnectionFactory.createConnection(conf);
        Admin admin = conn.getAdmin();
        if (admin.tableExists(TableName.valueOf(tableName))) {
            HColumnDescriptor columnDesc = new HColumnDescriptor(columnFamily);
            admin.addColumn(TableName.valueOf(tableName), columnDesc);
            System.out.println("Column family added successfully!");
        } else {
            System.out.println("Table does not exist!");
        }
        admin.close();
        conn.close();
    }

    // Remove a column family from an existing table.
    public static void deleteColumnFamily(String tableName, String columnFamily) throws IOException {
        Connection conn = ConnectionFactory.createConnection(conf);
        Admin admin = conn.getAdmin();
        if (admin.tableExists(TableName.valueOf(tableName))) {
            admin.deleteColumn(TableName.valueOf(tableName), Bytes.toBytes(columnFamily));
            System.out.println("Column family deleted successfully!");
        } else {
            System.out.println("Table does not exist!");
        }
        admin.close();
        conn.close();
    }

    // Read a single cell.
    public static void getRecord(String tableName, String rowKey, String family,
            String qualifier) throws IOException {
        Connection conn = ConnectionFactory.createConnection(conf);
        Table table = conn.getTable(TableName.valueOf(tableName));
        Get get = new Get(Bytes.toBytes(rowKey));
        Result result = table.get(get);
        byte[] value = result.getValue(Bytes.toBytes(family), Bytes.toBytes(qualifier));
        System.out.println("Result: " + Bytes.toString(value));
        table.close();
        conn.close();
    }

    // Scan and print the whole table.
    public static void scanTable(String tableName) throws IOException {
        Connection conn = ConnectionFactory.createConnection(conf);
        Table table = conn.getTable(TableName.valueOf(tableName));
        Scan scan = new Scan();
        ResultScanner scanner = table.getScanner(scan);
        for (Result result : scanner) {
            System.out.println("Result: " + result);
        }
        table.close();
        conn.close();
    }

    // List the names of all tables in the database.
    public static void listTables() throws IOException {
        Connection conn = ConnectionFactory.createConnection(conf);
        Admin admin = conn.getAdmin();
        HTableDescriptor[] tableDescs = admin.listTables();
        List<String> tableNames = new ArrayList<String>();
        for (HTableDescriptor tableDesc : tableDescs) {
            tableNames.add(tableDesc.getNameAsString());
        }
        System.out.println("Tables: " + tableNames);
        admin.close();
        conn.close();
    }

    // Print a table's structure (its column families and their attributes).
    public static void describeTable(String tableName) throws IOException {
        Connection conn = ConnectionFactory.createConnection(conf);
        Admin admin = conn.getAdmin();
        HTableDescriptor tableDesc = admin.getTableDescriptor(TableName.valueOf(tableName));
        System.out.println("Table structure: " + tableDesc);
        admin.close();
        conn.close();
    }

    // Exercise every operation once, in order.
    public static void main(String[] args) throws IOException {
        String tableName = "wjwtest";
        String rowKey = "row1";
        String family = "cf1";
        String qualifier = "q1";
        String value = "this is wjw!";
        String columnFamily = "cf2";
        String[] families = {family};
        createTable(tableName, families);
        addRecord(tableName, rowKey, family, qualifier, value);
        getRecord(tableName, rowKey, family, qualifier);
        scanTable(tableName);
        addColumnFamily(tableName, columnFamily);
        describeTable(tableName);
        deleteColumnFamily(tableName, columnFamily);
        deleteRecord(tableName, rowKey, family, qualifier);
        deleteTable(tableName);
        listTables();
    }
}
Run results
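Based on the println statements in main(), the console output should look roughly like the following; the scan and describe lines are abbreviated here, since their exact form depends on the HBase version:

Table created successfully!
Record added successfully!
Result: this is wjw!
Result: keyvalues={row1/cf1:q1/...}
Column family added successfully!
Table structure: 'wjwtest', {NAME => 'cf1', ...}, {NAME => 'cf2', ...}
Column family deleted successfully!
Record deleted successfully!
Table deleted successfully!
Tables: [...]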