Purpose: to serve as my own HBase notebook.
HBase Table Structure
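Logically, an HBase table is a sparse, sorted map: every row is identified by a rowkey, columns are grouped into column families (declared when the table is created), each family can hold any number of column qualifiers, and every cell value is versioned by a timestamp. All of the shell and API operations below address data as rowkey / columnFamily:column / value.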
Shell Commands
Enter the HBase client command line:
./bin/hbase shell
List the tables in the current instance:
list
Create a table
create "table_name","column_family"
create "student20190727","info"
Insert data
put "table_name","rowkey","columnFamily:column","value"
put "student20190727","1001","info:name","Tom"
put "student20190727","1001","info:age","18"
put "student20190727","1001","info:sex","male"
Read data (an entire row, or a specified column)
get "student20190727","1001"
get "student20190727","1001","info:name"
Delete data
delete "student20190727","1001","info:sex"
deleteall "student20190727","1001"
Clear (truncate) a table's data
1. disable "student20190727"
2. truncate "student20190727"
(Note: the shell's truncate command disables the table by itself, so step 1 is usually redundant.)
Drop a table
1. disable "student20190727"
2. drop "student20190727"
Commonly Used API Classes
HBaseAdmin (Admin): manages tables (create, delete)
HTableDescriptor: table descriptor, used when creating a table
HColumnDescriptor: column descriptor, used to build column families
Table: operates on the data in a table
Put: wraps data to be stored
Delete: wraps data to be deleted
Get: retrieves one specific piece of data
Scan: holds the configuration for scanning a table
ResultScanner: the scanner instance obtained from a configured Scan, used to scan the table
Result: each instance corresponds to the data stored under a single rowkey
Cell: wraps the data in one cell under a rowkey (rowKey, columnFamily, column, value)
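To show how these classes fit together, here is a minimal sketch (the class name ApiOverviewDemo is made up for illustration; it assumes a reachable cluster, the student20190727 table created above, and a placeholder quorum address that must be adjusted to your environment):

package com.imooc.demo;

import java.io.IOException;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.Cell;
import org.apache.hadoop.hbase.CellUtil;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.client.Connection;
import org.apache.hadoop.hbase.client.ConnectionFactory;
import org.apache.hadoop.hbase.client.Get;
import org.apache.hadoop.hbase.client.Put;
import org.apache.hadoop.hbase.client.Result;
import org.apache.hadoop.hbase.client.ResultScanner;
import org.apache.hadoop.hbase.client.Scan;
import org.apache.hadoop.hbase.client.Table;
import org.apache.hadoop.hbase.util.Bytes;

public class ApiOverviewDemo {
    public static void main(String[] args) throws IOException {
        Configuration conf = HBaseConfiguration.create();
        conf.set("hbase.zookeeper.quorum", "192.168.1.16"); // placeholder address, use your own
        try (Connection connection = ConnectionFactory.createConnection(conf);
             Table table = connection.getTable(TableName.valueOf("student20190727"))) {
            // Put: wrap one cell of data and store it
            Put put = new Put(Bytes.toBytes("1001"));
            put.addColumn(Bytes.toBytes("info"), Bytes.toBytes("name"), Bytes.toBytes("Tom"));
            table.put(put);
            // Get + Result: read the row back; each Cell carries family/qualifier/value
            Result result = table.get(new Get(Bytes.toBytes("1001")));
            for (Cell cell : result.rawCells()) {
                System.out.println(Bytes.toString(CellUtil.cloneQualifier(cell)) + " = "
                        + Bytes.toString(CellUtil.cloneValue(cell)));
            }
            // Scan + ResultScanner: iterate over the whole table
            try (ResultScanner scanner = table.getScanner(new Scan())) {
                for (Result r : scanner) {
                    System.out.println("row: " + Bytes.toString(r.getRow()));
                }
            }
        }
    }
}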
Operating HBase from Java
pom.xml
<dependencies>
    <dependency>
        <groupId>org.apache.hbase</groupId>
        <artifactId>hbase-server</artifactId>
        <version>1.3.1</version>
    </dependency>
    <dependency>
        <groupId>org.apache.hbase</groupId>
        <artifactId>hbase-client</artifactId>
        <version>1.3.1</version>
    </dependency>
    <dependency>
        <groupId>jdk.tools</groupId>
        <artifactId>jdk.tools</artifactId>
        <version>1.6</version>
        <scope>system</scope>
        <systemPath>${JAVA_HOME}/lib/tools.jar</systemPath>
    </dependency>
</dependencies>
ZookeeperDemo
package com.imooc.demo;

import java.util.concurrent.CountDownLatch;

import org.apache.zookeeper.WatchedEvent;
import org.apache.zookeeper.Watcher;
import org.apache.zookeeper.Watcher.Event.EventType;
import org.apache.zookeeper.Watcher.Event.KeeperState;
import org.apache.zookeeper.ZooKeeper;

/**
 * Tests whether a connection to ZooKeeper can be established.
 */
public class ZookeeperDemo {

    /** ZooKeeper address */
    static final String CONNECT_ADDR = "192.168.1.16:2181";
    /** Session timeout in ms */
    static final int SESSION_OUTTIME = 2000;
    /** Latch that blocks the main thread until the connection succeeds */
    static final CountDownLatch connectedSemaphore = new CountDownLatch(1);

    public static void main(String[] args) throws Exception {
        ZooKeeper zk = new ZooKeeper(CONNECT_ADDR, SESSION_OUTTIME, new Watcher() {
            @Override
            public void process(WatchedEvent event) {
                // Read the event's state and type
                KeeperState keeperState = event.getState();
                EventType eventType = event.getType();
                // If the connection has been established...
                if (KeeperState.SyncConnected == keeperState) {
                    if (EventType.None == eventType) {
                        // ...release the latch so the blocked main thread can continue
                        System.out.println("zk connection established");
                        connectedSemaphore.countDown();
                    }
                }
            }
        });

        // Block until connected
        connectedSemaphore.await();
        System.out.println("..");

        // Create a parent node
        // zk.create("/testRoot", "testRoot".getBytes(), Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT);
        // Create a child node
        // zk.create("/testRoot/children", "children data".getBytes(), Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT);
        // Read node data
        // byte[] data = zk.getData("/testRoot", false, null);
        // System.out.println(new String(data));
        // System.out.println(zk.getChildren("/testRoot", false));
        // Modify a node's value
        // zk.setData("/testRoot", "modify data root".getBytes(), -1);
        // byte[] data = zk.getData("/testRoot", false, null);
        // System.out.println(new String(data));
        // Check whether a node exists
        // System.out.println(zk.exists("/testRoot/children", false));
        // Delete a node
        // zk.delete("/testRoot/children", -1);
        // System.out.println(zk.exists("/testRoot/children", false));

        zk.close();
    }
}
HBaseDemo
package com.imooc.demo;

import java.io.IOException;
import java.util.ArrayList;
import java.util.List;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.Cell;
import org.apache.hadoop.hbase.CellUtil;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.HColumnDescriptor;
import org.apache.hadoop.hbase.HTableDescriptor;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.client.Connection;
import org.apache.hadoop.hbase.client.ConnectionFactory;
import org.apache.hadoop.hbase.client.Delete;
import org.apache.hadoop.hbase.client.Get;
import org.apache.hadoop.hbase.client.HBaseAdmin;
import org.apache.hadoop.hbase.client.HTable;
import org.apache.hadoop.hbase.client.Put;
import org.apache.hadoop.hbase.client.Result;
import org.apache.hadoop.hbase.client.ResultScanner;
import org.apache.hadoop.hbase.client.Scan;
import org.apache.hadoop.hbase.util.Bytes;

public class HBaseDemo {

    public static Configuration conf;

    static {
        // Instantiate the configuration via HBaseConfiguration's factory method
        conf = HBaseConfiguration.create();
        conf.set("hbase.zookeeper.quorum", "192.168.1");
        conf.set("hbase.zookeeper.property.clientPort", "2181");
    }

    public static void main(String[] args) throws Exception {
        // createTable("student", "info");
        // System.out.println(isTableExist("student"));
        // dropTable("student2019-5-9-9-15");
        // addRowData("student", "1001", "info", "name", "zhangsan");
        // addRowData("student", "1002", "info", "name", "lisi");
        // deleteOneRow("student", "1001");
        // deleteMultiRow("student", "1001", "1002");
        // getAllRows("student");
        // getRowQualifier("student", "1001", "info", "name");
    }

    /**
     * Gets the data of one row for the given "family:qualifier".
     */
    public static void getRowQualifier(String tableName, String rowKey, String family, String qualifier)
            throws IOException {
        Connection connection = ConnectionFactory.createConnection(conf);
        HTable hTable = (HTable) connection.getTable(TableName.valueOf(tableName));
        Get get = new Get(Bytes.toBytes(rowKey));
        get.addColumn(Bytes.toBytes(family), Bytes.toBytes(qualifier));
        Result result = hTable.get(get);
        for (Cell cell : result.rawCells()) {
            System.out.println("rowkey: " + Bytes.toString(result.getRow()));
            System.out.println("column family: " + Bytes.toString(CellUtil.cloneFamily(cell)));
            System.out.println("column: " + Bytes.toString(CellUtil.cloneQualifier(cell)));
            System.out.println("value: " + Bytes.toString(CellUtil.cloneValue(cell)));
        }
        hTable.close();
        connection.close();
    }

    /**
     * Gets all the data in a table.
     */
    public static void getAllRows(String tableName) throws IOException {
        Connection connection = ConnectionFactory.createConnection(conf);
        HTable hTable = (HTable) connection.getTable(TableName.valueOf(tableName));
        // The Scan object configures the region scan
        Scan scan = new Scan();
        // Use the HTable to obtain a ResultScanner instance
        ResultScanner resultScanner = hTable.getScanner(scan);
        for (Result result : resultScanner) {
            Cell[] cells = result.rawCells();
            for (Cell cell : cells) {
                System.out.println("rowkey: " + Bytes.toString(CellUtil.cloneRow(cell)));
                System.out.println("column family: " + Bytes.toString(CellUtil.cloneFamily(cell)));
                System.out.println("column: " + Bytes.toString(CellUtil.cloneQualifier(cell)));
                System.out.println("value: " + Bytes.toString(CellUtil.cloneValue(cell)));
            }
        }
        resultScanner.close();
        hTable.close();
        connection.close();
    }

    /**
     * Deletes one row.
     */
    public static void deleteOneRow(String tableName, String rowKey) throws IOException {
        Connection connection = ConnectionFactory.createConnection(conf);
        HTable hTable = (HTable) connection.getTable(TableName.valueOf(tableName));
        Delete delete = new Delete(Bytes.toBytes(rowKey));
        hTable.delete(delete);
        hTable.close();
        connection.close();
    }

    /**
     * Deletes multiple rows.
     */
    public static void deleteMultiRow(String tableName, String... rows) throws IOException {
        Connection connection = ConnectionFactory.createConnection(conf);
        HTable hTable = (HTable) connection.getTable(TableName.valueOf(tableName));
        List<Delete> deleteList = new ArrayList<Delete>();
        for (String row : rows) {
            deleteList.add(new Delete(Bytes.toBytes(row)));
        }
        hTable.delete(deleteList);
        hTable.close();
        connection.close();
    }

    /**
     * Adds one row of data.
     */
    public static void addRowData(String tableName, String rowKey, String columnFamily, String column,
            String value) throws Exception {
        Connection connection = ConnectionFactory.createConnection(conf);
        HTable hTable = (HTable) connection.getTable(TableName.valueOf(tableName));
        // Assemble the data into a Put object: family, column, value
        Put put = new Put(Bytes.toBytes(rowKey));
        put.addColumn(Bytes.toBytes(columnFamily), Bytes.toBytes(column), Bytes.toBytes(value));
        hTable.put(put);
        hTable.close();
        connection.close();
        System.out.println("Data inserted successfully");
    }

    /**
     * Drops a table.
     */
    public static void dropTable(String tableName) throws Exception {
        Connection connection = ConnectionFactory.createConnection(conf);
        HBaseAdmin admin = (HBaseAdmin) connection.getAdmin();
        if (isTableExist(tableName)) {
            // The table must be disabled first, otherwise the delete fails
            admin.disableTable(tableName);
            admin.deleteTable(tableName);
            System.out.println("Table " + tableName + " dropped!");
        } else {
            System.out.println("Table " + tableName + " does not exist!");
        }
        admin.close();
        connection.close();
    }

    /**
     * Creates a table.
     */
    public static void createTable(String tableName, String... columnFamily) throws Exception {
        Connection connection = ConnectionFactory.createConnection(conf);
        HBaseAdmin admin = (HBaseAdmin) connection.getAdmin();
        // Check whether the table already exists
        if (isTableExist(tableName)) {
            System.out.println("Table " + tableName + " already exists");
        } else {
            // Build the table descriptor; the table name must be converted to a TableName
            HTableDescriptor descriptor = new HTableDescriptor(TableName.valueOf(tableName));
            // Add the column families
            for (String cf : columnFamily) {
                descriptor.addFamily(new HColumnDescriptor(cf));
            }
            // Create the table from the descriptor
            admin.createTable(descriptor);
            System.out.println("Table " + tableName + " created!");
        }
        admin.close();
        connection.close();
    }

    /**
     * Checks whether a table exists.
     */
    public static boolean isTableExist(String tableName) throws Exception {
        // Managing and accessing tables in HBase requires an HBaseAdmin (Admin) object
        Connection connection = ConnectionFactory.createConnection(conf);
        HBaseAdmin admin = (HBaseAdmin) connection.getAdmin();
        boolean tableExists = admin.tableExists(tableName);
        admin.close();
        connection.close();
        return tableExists;
    }
}
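One caveat about the demo above: every method opens a new Connection, and the original code never closed several of them. Connection setup is heavyweight in the HBase client, so in practice you would share one Connection and open/close a Table per operation. A minimal sketch of that pattern (the method name addRowDataShared is hypothetical, and it uses the Table interface rather than HTable):

public static void addRowDataShared(Connection connection, String tableName, String rowKey,
        String columnFamily, String column, String value) throws IOException {
    // The caller creates the Connection once and passes it in; only the Table is per-call
    try (Table table = connection.getTable(TableName.valueOf(tableName))) {
        Put put = new Put(Bytes.toBytes(rowKey));
        put.addColumn(Bytes.toBytes(columnFamily), Bytes.toBytes(column), Bytes.toBytes(value));
        table.put(put);
    }
}

// Caller side:
// try (Connection connection = ConnectionFactory.createConnection(conf)) {
//     addRowDataShared(connection, "student", "1003", "info", "name", "wangwu");
// }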
Writing Data from HDFS into an HBase Table
Mapper
package com.imooc.hdfstohbase;

import java.io.IOException;

import org.apache.hadoop.hbase.client.Put;
import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;

public class HDFStoHBaseMapper extends Mapper<LongWritable, Text, ImmutableBytesWritable, Put> {

    @Override
    protected void map(LongWritable key, Text value, Context context)
            throws IOException, InterruptedException {
        // One line read from HDFS
        String lineValue = value.toString();
        // Each line is tab-separated; split it into a String array
        String[] values = lineValue.split("\t");
        // Fields by position: rowkey, name, color
        String rowKey = values[0];
        String name = values[1];
        String color = values[2];
        // Build the output key from the rowkey
        ImmutableBytesWritable rowKeyWritable = new ImmutableBytesWritable(Bytes.toBytes(rowKey));
        // Build the Put for this row
        Put put = new Put(Bytes.toBytes(rowKey));
        // Arguments: column family, column, value
        put.addColumn(Bytes.toBytes("info"), Bytes.toBytes("name"), Bytes.toBytes(name));
        put.addColumn(Bytes.toBytes("info"), Bytes.toBytes("color"), Bytes.toBytes(color));
        context.write(rowKeyWritable, put);
    }
}
Reducer
package com.imooc.hdfstohbase;

import java.io.IOException;

import org.apache.hadoop.hbase.client.Put;
import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
import org.apache.hadoop.hbase.mapreduce.TableReducer;
import org.apache.hadoop.io.NullWritable;

public class HDFStoHBaseReducer extends TableReducer<ImmutableBytesWritable, Put, NullWritable> {

    @Override
    protected void reduce(ImmutableBytesWritable key, Iterable<Put> values, Context context)
            throws IOException, InterruptedException {
        // Identity pass-through: write each Put into the target table (fruit_mr, configured in the driver)
        for (Put put : values) {
            context.write(NullWritable.get(), put);
        }
    }
}
Driver
package com.imooc.hdfstohbase;

import java.io.IOException;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.client.Put;
import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
import org.apache.hadoop.hbase.mapreduce.TableMapReduceUtil;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.util.Tool;
import org.apache.hadoop.util.ToolRunner;

public class HDFStoHBaseDriver implements Tool {

    private Configuration conf = null;

    public static void main(String[] args) throws Exception {
        Configuration conf = HBaseConfiguration.create();
        int status = ToolRunner.run(conf, new HDFStoHBaseDriver(), args);
        System.exit(status);
    }

    @Override
    public int run(String[] args) throws Exception {
        // Get the Configuration
        Configuration conf = this.getConf();
        // Create the Job
        Job job = Job.getInstance(conf, this.getClass().getSimpleName());
        job.setJarByClass(HDFStoHBaseDriver.class);
        // Input path on HDFS
        Path inPath = new Path("hdfs://192.168.1:8020/input_fruit/fruit.tsv");
        FileInputFormat.addInputPath(job, inPath);
        // Set the Mapper
        job.setMapperClass(HDFStoHBaseMapper.class);
        job.setMapOutputKeyClass(ImmutableBytesWritable.class);
        job.setMapOutputValueClass(Put.class);
        // Set the Reducer; this also wires the job's output to the fruit_mr table
        TableMapReduceUtil.initTableReducerJob("fruit_mr", HDFStoHBaseReducer.class, job);
        // At least one reduce task
        job.setNumReduceTasks(1);
        boolean isSuccess = job.waitForCompletion(true);
        if (!isSuccess) {
            throw new IOException("Job running with error");
        }
        return isSuccess ? 0 : 1;
    }

    @Override
    public Configuration getConf() {
        return this.conf;
    }

    @Override
    public void setConf(Configuration conf) {
        this.conf = HBaseConfiguration.create(conf);
    }
}
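Note that the driver assumes its inputs already exist: fruit.tsv must have been uploaded to /input_fruit/ on HDFS beforehand, and the target table fruit_mr must already be created with the info column family the mapper writes to, e.g. in the shell: create "fruit_mr","info". Otherwise the job fails when the reducer tries to write.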
Pitfalls Encountered
Problem 1:
2019-05-08 21:53:21,253 INFO [org.apache.zookeeper.ZooKeeper] - Initiating client connection, connectString=47.105.132.96:2181 sessionTimeout=90000 watcher=org.apache.hadoop.hbase.zookeeper.PendingWatcher@5a4041cc
2019-05-08 21:53:22,077 INFO [org.apache.zookeeper.ClientCnxn] - Opening socket connection to server 47.105.132.96/47.105.132.96:2181. Will not attempt to authenticate using SASL (unknown error)
2019-05-08 21:53:43,081 WARN [org.apache.zookeeper.ClientCnxn] - Session 0x0 for server null, unexpected error, closing socket connection and attempting reconnect
java.net.ConnectException: Connection timed out: no further information
    at sun.nio.ch.SocketChannelImpl.checkConnect(Native Method)
    at sun.nio.ch.SocketChannelImpl.finishConnect(Unknown Source)
    at org.apache.zookeeper.ClientCnxnSocketNIO.doTransport(ClientCnxnSocketNIO.java:361)
    at org.apache.zookeeper.ClientCnxn$SendThread.run(ClientCnxn.java:1081)
2019-05-08 21:53:44,311 INFO [org.apache.zookeeper.ClientCnxn] - Opening socket connection to server 47.105.132.96/47.105.132.96:2181. Will not attempt to authenticate using SASL (unknown error)
2019-05-08 21:54:05,314 WARN [org.apache.zookeeper.ClientCnxn] - Session 0x0 for server null, unexpected error, closing socket connection and attempting reconnect
java.net.ConnectException: Connection timed out: no further information
    at sun.nio.ch.SocketChannelImpl.checkConnect(Native Method)
    at sun.nio.ch.SocketChannelImpl.finishConnect(Unknown Source)
    at org.apache.zookeeper.ClientCnxnSocketNIO.doTransport(ClientCnxnSocketNIO.java:361)
    at org.apache.zookeeper.ClientCnxn$SendThread.run(ClientCnxn.java:1081)
Possible causes and fixes:
1) The ZooKeeper service is not running.
2) Port 2181 is not open.
3) The Linux host mapping is missing; add it to the hosts file under C:\Windows\System32\drivers\etc.
Problem 2:
2019-05-08 22:44:45,358 INFO [org.apache.zookeeper.ClientCnxn] - Socket connection established to 47.105.132.96/47.105.132.96:2181, initiating session
2019-05-08 22:44:45,418 INFO [org.apache.zookeeper.ClientCnxn] - Session establishment complete on server 47.105.132.96/47.105.132.96:2181, sessionid = 0x16a97cb36b40008, negotiated timeout = 40000
Exception in thread "main" java.io.IOException: Failed to get result within timeout, timeout=60000ms
    at org.apache.hadoop.hbase.client.ScannerCallableWithReplicas.call(ScannerCallableWithReplicas.java:206)
    at org.apache.hadoop.hbase.client.ScannerCallableWithReplicas.call(ScannerCallableWithReplicas.java:60)
    at org.apache.hadoop.hbase.client.RpcRetryingCaller.callWithoutRetries(RpcRetryingCaller.java:212)
    at org.apache.hadoop.hbase.client.ClientScanner.call(ClientScanner.java:314)
    at org.apache.hadoop.hbase.client.ClientScanner.nextScanner(ClientScanner.java:289)
    at org.apache.hadoop.hbase.client.ClientScanner.initializeScannerInConstruction(ClientScanner.java:164)
    at org.apache.hadoop.hbase.client.ClientScanner.<init>(ClientScanner.java:159)
    at org.apache.hadoop.hbase.client.HTable.getScanner(HTable.java:796)
    at org.apache.hadoop.hbase.MetaTableAccessor.fullScan(MetaTableAccessor.java:602)
    at org.apache.hadoop.hbase.MetaTableAccessor.tableExists(MetaTableAccessor.java:366)
    at org.apache.hadoop.hbase.client.HBaseAdmin.tableExists(HBaseAdmin.java:408)
    at org.apache.hadoop.hbase.client.HBaseAdmin.tableExists(HBaseAdmin.java:418)
    at com.imooc.demo.HBaseDemo.isTableExist(HBaseDemo.java:62)
    at com.imooc.demo.HBaseDemo.main(HBaseDemo.java:23)
Solution:
If all the configuration is correct, the cause is that the four HBase ports below are not open; open them (the values come from hbase-site.xml):
<property>
    <name>hbase.master.port</name>
    <value>16000</value>
</property>
<property>
    <name>hbase.master.info.port</name>
    <value>16010</value>
</property>
<property>
    <name>hbase.regionserver.port</name>
    <value>16201</value>
</property>
<property>
    <name>hbase.regionserver.info.port</name>
    <value>16301</value>
</property>