- 上次将了HBase搭建完成后,一些Shell的使用,现在我们将了解一下HBase的代码控制,我使用的是java8以及hbase2.1.1版本
- 首先你的集群已经是搭建完成了,然后这次我使用的是IDEA,我们需要将集群上的
hbase-site.xml
和hdfs-site.xml
以及日志配置文件下载下来,放入根目录,如下
-
准备好这些,还需要你配置好机器的maven环境,下面是用到的pom文件
<dependencies> <dependency> <groupId>junit</groupId> <artifactId>junit</artifactId> <version>4.12</version> </dependency> <dependency> <groupId>org.apache.hbase</groupId> <artifactId>hbase-client</artifactId> <version>2.1.1</version> </dependency> </dependencies>
- 完成了这些,还需要注意一点就是你的配置文件中是你Linux主机的hostname,而你现在的机器如果没有配置Linux的ip映射,就会发生错误,那么两种办法可以解决,我的是win的系统,所以如果你把配置文件中的hostname配置到你的windows主机上,就请打开
C:\Windows\System32\drivers\etc\host
,输入你的{ip} {hostname}
配置映射即可,第二种办法,就是你直接更改你的配置文件中的hostname为Linux的ip就可以了 - 在完成昨天的学习后,我们的HBase中已经没有任何我们自己建的表,那么我们就使用api来建立HBase表
建表
-
对于我这个版本来说,参考书上的代码已经是部分过期不推荐使用了,而是换成了xxxBuilder来更灵活的创建对象,这次我们创建test表,列族依旧是cf,如下
public void createTable() throws IOException { //使用HBase配置创建新连接 Connection connection = ConnectionFactory.createConnection(); //或者HBase集群的管理员实例,实例不是线程安全的 Admin admin = connection.getAdmin(); //创建表名描述 TableName tableName = TableName.valueOf("default:test"); //创建表描述,并赋予表名 TableDescriptorBuilder tableBuilder = TableDescriptorBuilder.newBuilder(tableName); //Builder构建列族信息 ColumnFamilyDescriptor cf = ColumnFamilyDescriptorBuilder.newBuilder("cf".getBytes()).build(); //将列族信息也赋予表描述对象 TableDescriptor table = tableBuilder.setColumnFamily(cf).build(); //创建表 admin.createTable(table); //关闭 admin.close(); //关闭 connection.close(); }
- 如果控制台没有报错,那么就是创建成功了,然后我们在控制台的输出上可以看到许多有关会话相关的环境信息的打印以及会话地址与会话和连接关闭动作等
- 这里需要说一点,上面的
TableName.valueOf("default:test");
中的default,就类似RDBMS中的库的概念,我们昨天在shell操作中,并没有指定哪个库,那么默认就会有一个default库供我们使用,库的名次在HBase中叫做namespace - 有了表,我们可以来修改表的信息,就像昨天在HBase shell中查看表属性一样,我们一样来修改VERSIONS属性,提前确定一下你目前的test表的
VERSIONS => '1'
修改列族属性
-
修改cf列族的VERSIONS属性为5
public void modifyCF() throws IOException { Connection connection = ConnectionFactory.createConnection(); Admin admin = connection.getAdmin(); TableName tableName = TableName.valueOf("default:test"); //修改列族的信息,肯定要得到列族描述对象 //需要注意的是,由于我们不指定VERSIONS的话,默认最大版本和最小版本都是1 //所以如果你像下面在修改VERSIONS的时候,需要遵循MinVersions < MaxVersions //最终修改到HBase中去的值是最大值 ColumnFamilyDescriptor cf = ColumnFamilyDescriptorBuilder.newBuilder("cf".getBytes()) .setMinVersions(4).setMaxVersions(5).build(); //执行修改 admin.modifyColumnFamily(tableName,cf); connection.close(); admin.close(); }
- 那么现在在shell窗口
desc 'test'
就会看到VERSIONS => '5'
,上面的修改操作是sync的,也有一个异步修改的方法,只是方法名不一样而已modifyColumnFamilyAsync(x,x)
,会修改了VERSIONS,那么修改别的属性只是在找方法名而已 - 好了,我们将列族内的属性修改了之后,他会为我们保留五个历史cell,所以下面我们就开始添加数据啦~
put添加数据
-
put没什么特别需要注意的,如下代码
public void putData() throws IOException { Connection connection = ConnectionFactory.createConnection(); Table table = connection.getTable(TableName.valueOf("default:test")); //创建put对象,一个put肯定要对应一行的,所以参数是必须的,但是有重载的参数,自己可以去看一下 Put putObj = new Put("row1".getBytes()); //上面行指定完后,就需要指定列族,列,值,这样才是完整的 putObj.addColumn("cf".getBytes(), "name".getBytes(), "wangziqiang".getBytes()); //执行~ table.put(putObj); connection.close(); table.close(); }
- 我们发现一直到现在我们写过的api中,都是
xx.getBytes()
显得很麻烦,那么有什么办法解决吗?还真..没有,既然不能解决,HBase就自带了一个Bytes类,使用也很简单就是Bytes.toXXX(ss)
就可以了,这也说明了HBase中所有数据都是bytes数组,一切可以序列化为bytes的对象都可以作为rowkey,所以你就可以使用任何的序列化工具来保存在HBase中,比如Avro之类的 -
Put类每次调用addColumn都会返回Put,所以我们就可以使用Builder模式来写代码了,如下
putObj.addColumn(xx) .addColumn(xx) .addColumn(xx);
- 但是如果在你读出数据之后和修改数据中间这段时间,如果有别人也修改了这个数据,就会发生数据不一致的问题,因为你后添加的数据将会更新掉人家添加的那个数据,那么我们怎么确保这期间不会有人插足,成为第三者呢?
-
checkAndMutate方法只是把检查和写入这两个步骤合二为一了.checkAndMutate方法在写入 前会先比较目前存在的数据是否与你传入的数据一致,如果一致则进行 put操作,并返回true.如果不一致,则返回false,但不写入数据(原子操作).那我们试试,首先HBase中的数据我并没有改,依旧是上面插入的wangziqiang,然后我们编写代码
public void checkAndPut() throws IOException { Connection connection = ConnectionFactory.createConnection(); Table table = connection.getTable(TableName.valueOf("test")); Put putObj = new Put("row1".getBytes()); putObj.addColumn("cf".getBytes(), "name".getBytes(), "wzq".getBytes()); //如果你对链式编程很熟悉,那么下面会很简单 //猜测thenPut方法是一个"触发方法",因为java的stream就是这样的 //指定行和列族 boolean b = table.checkAndMutate("row1".getBytes(), "cf".getBytes()) //指定列 .qualifier("name".getBytes()) //判断是否与这个串相等 .ifEquals("wangziqiang".getBytes()) //相等就put返回true,否则不put返回false .thenPut(putObj); System.out.println(b); table.close(); connection.close(); }
-
在跑之前,我们将name值用shell改掉,看看他是否还是会put
hbase(main):022:0> scan 'test' ROW COLUMN+CELL row1 column=cf:name, timestamp=1543680836035, value=xgl 1 row(s) Took 0.0130 seconds
-
执行结果就是false,当我们在手动改回去的时候,再次执行,就会返回true,不仅仅只能判断是否相等,还可以有比较器啊,或者删除操作,那么比较器是怎么用呢?
boolean b = table.checkAndMutate("row1".getBytes(), "cf".getBytes()) //指定列 .qualifier("name".getBytes()) //判断是否比1小 .ifMatches(CompareOperator.LESS, Bytes.toBytes(1)) //相等就put返回true,否则不put返回false .thenPut(putObj);
-
对于枚举类可选参数有如下
/** less than */ LESS, /** less than or equal to */ LESS_OR_EQUAL, /** equals */ EQUAL, /** not equal */ NOT_EQUAL, /** greater than or equal to */ GREATER_OR_EQUAL, /** greater than */ GREATER, /** no operation */ NO_OP,
append追加数据
-
就是在你指定位置数据上追加一段数据,比如我们在name上随意追加一串字符,如下
public void appendData() throws IOException { Connection connection = ConnectionFactory.createConnection(); Table table = connection.getTable(TableName.valueOf("default:test")); Append append = new Append("row1".getBytes()); //指定列族列 append.addColumn("cf".getBytes(),"name".getBytes(),"NP".getBytes()); table.append(append); table.close(); connection.close(); }
- 没什么好说的
get获取数据
-
get方法跟shell作用的一样的,那么我们就可以得到一个版本的cell,或者全部版本的cell,如下
public void getData() throws IOException { Connection connection = ConnectionFactory.createConnection(); Table table = connection.getTable(TableName.valueOf("default:test")); //指定行 Get getObj = new Get("row1".getBytes()); //根据行查询出行中信息,封装为result对象 Result result = table.get(getObj); for (Cell cell : result.listCells()) { System.out.println(Bytes.toString(cell.getRowArray())); System.out.println("family:" + Bytes.toString(cell.getFamilyArray(), cell.getFamilyOffset(), cell.getFamilyLength())); System.out.println("qualifier:" + Bytes.toString(cell.getQualifierArray(), cell.getQualifierOffset(), cell.getQualifierLength())); System.out.println("value:" + Bytes.toString(cell.getValueArray(), cell.getValueOffset(), cell.getValueLength())); System.out.println("Timestamp:" + cell.getTimestamp()); } table.close(); connection.close(); }
-
执行结果
row1cfage gl��20 family:cf qualifier:age value:20 Timestamp:1543706228633 row1cfname glj;wangziqiang family:cf qualifier:name value:wangziqiang Timestamp:1543706208827
- 你没看错,结果就有的是类似乱码,但这只是编码显示不了而已,我在这卡主了半天,硬是没想到偏移量,在查看帮助后才修改过来,所以自学是真的有点苦逼的哈哈 ,我们看上面的输出语句和对照下面的输出就能看到这个get是怎么用的,我猜那些个乱码应该是标识下一个数据长度的这么一个东西,不过只是猜测,之后肯定会有一个HBase的File或者Cell结构,我们就清楚了,现在不用了解很多,而且我也不会哈哈
- 网上有人说str.toBytes可能会有风险,我也不清楚,不过写在这里也是告诉大家一下,毕竟HBase提供了工具类Bytes
-
当然上面的代码中getObj也可以添加列条件,比如
... Get getObj = new Get("row1".getBytes()); getObj.addColumn(Bytes.toBytes("cf"),Bytes.toBytes("name")); ...
-
那么执行结果就只是有name的cell了,到这我们只是返回了一个版本的cell,我们之前可是put了很多的name的,那么如何查看这些信息呢?
//代码几乎是一致的,只是将get对象设置一个属性即可 public void getData() throws IOException { .... //指定行 Get getObj = new Get("row1".getBytes()); getObj.readVersions(5); //变化!!!! //根据行查询出行中信息,封装为result对象 Result result = table.get(getObj); for (Cell cell : result.getColumnCells(Bytes.toBytes("cf"),Bytes.toBytes("name"))) { System.out.println(Bytes.toString(cell.getValueArray())); System.out.println(Bytes.toString(CellUtil.cloneValue(cell))); } ... }
- 好了这就是查看历史cell的方法,如果你没有运行出结果来,而仅仅是一条,那么你就应该想到你的表是否多次添加过相同cell的数据,并且他的VERSIONS属性是设置正确的
-
上面有两个输出,他们是这样的
row1cfname glK�wangziqiang wangziqiang
- 第一个输出是直接getXXvalue的方法获取的,他将完整的信息全部都拿到了,而下面的clone方法,仅仅是我们需要的数据,所以在使用cell上优先使用第二种
increment
-
一看名字就知道,他肯定是原子性的并且是数字操作,好了我的表中现在我是20岁,我需要将它加十岁
public void incrementData() throws IOException { Connection connection = ConnectionFactory.createConnection(); Table table = connection.getTable(TableName.valueOf("default:test")); Increment increment = new Increment(Bytes.toBytes("row1")); increment.addColumn(Bytes.toBytes("cf"),Bytes.toBytes("age"),10L); table.increment(increment); table.close(); connection.close(); }
-
需要注意的是,当你查看HBase中的Long型数据的时候,他并不能直接看到,我们需要用到上面get方法才能取得我们能看得懂的数据,我找了一下没有发现shell中直接查看这个数据的内置命令,只能通过这个
//以后在找到了方便的在更新吧,如果您知道怎么做,请您不吝啬的抛给我哈哈 org.apache.hadoop.hbase.util.Bytes.toLong("\x00\x00\x00\x00\x00\x00\x00\x1E".to_java_bytes)
- 当然了传入-10L,就是减法操作了
删除数据
-
也不一定是数据,还可以是行啊或者列族
public void delete() throws IOException { Connection connection = ConnectionFactory.createConnection(); Table table = connection.getTable(TableName.valueOf("default:test")); Delete del = new Delete(Bytes.toBytes("row1")); del.addColumn(Bytes.toBytes("cf"),Bytes.toBytes("name")); del.addColumns(Bytes.toBytes("cf"),Bytes.toBytes("age")); table.delete(del); table.close(); connection.close(); }
- 注意使用方法的变化
addColumns
和addColumn
一个只删除最新版本,一个是全部版本都删除 -
delete也有一个类似put方法的check版本,其使用的方法都是一样的,就是
checkAndMutate
,之后后续调用的是thenDelete而已public void checkAndDelete() throws IOException { Connection connection = ConnectionFactory.createConnection(); Table table = connection.getTable(TableName.valueOf("default:test")); Delete del = new Delete(Bytes.toBytes("row1")); del.addColumns(Bytes.toBytes("cf"),Bytes.toBytes("name")); table.checkAndMutate(Bytes.toBytes("row1"),Bytes.toBytes("name")) .ifEquals(Bytes.toBytes("wangziqiang")) .thenDelete(del); table.close(); connection.close(); }
RowMutations
- 当你想在一行中添加一列的时候同时删除另一列,我们现在就可以做到,就是分开使用put和delete,但是
RowMutations
类可以让你完成这个想法 -
这个类的继承关系
public class RowMutations implements Row {} public interface Row extends Comparable<Row> {} public abstract class Mutation extends OperationWithAttributes implements Row, CellScannable,HeapSize {} 而RowMutations,Append,Delete,Put,Increment,Get等都是这个类的实现,既然是他们的共同父类,所以就可以达到一起使用了
- 既然是一起执行,那么肯定就是原子性的了
-
使用
public void test() throws IOException { Connection connection = ConnectionFactory.createConnection(); Table table = connection.getTable(TableName.valueOf("default:test")); //将name全部版本cell删除 Delete delete = new Delete(Bytes.toBytes("row1")) .addColumns(Bytes.toBytes("cf"), Bytes.toBytes("name")); //添加addr:beijing Put put = new Put(Bytes.toBytes("row1")) .addColumn(Bytes.toBytes("cf"),Bytes.toBytes("addr"),Bytes.toBytes("beijing")); List<Mutation> list = new ArrayList<>(); list.add(delete); list.add(put); RowMutations rowMutations = new RowMutations(Bytes.toBytes("row1")); rowMutations.add(list); //执行 table.mutateRow(rowMutations); table.close(); connection.close(); }
- 这个方法也存在check方法,都是使用与put和delete一样的方法
checkAndMutation
,具体自己可以试一下,就不贴出来了 - 看到这,如果我们现在需要put很多数据呢?难道用for吗,那不可能,每次put执行都会以rpc请求的方式去执行,这样就太慢了,所以HBase给出了批量操作的api
认识批量操作
-
批量操作的api依旧是对表的操作,所以还是在我们的Table对象上用批量操作
default void batch(final List<? extends Row> actions, final Object[] results)..{}
- 我们可以看到List的泛型为Row,而一般数据操作都是Row的子类,所以说batch可以混杂的批量执行很多命令,而第二个参数是执行结果
- 需要注意的是,虽然可以混杂执行但是最好不要把针对同一个单元格的Put和Delete放到同一个actions列 表里面,因为HBase不一定是顺序地执行这些操作的,你可能会得到 意想不到的结果
- results可能出现的结果类型
类型 | 说明 |
---|---|
null | 操作与服务器端的通信失败 |
EmptyResult | put和delete操作成功之后的返回结果 |
Result | Get操作成功之后的返回结果,如果没有匹配get的条件,那么就返回空的Result |
Throwable | 操作在服务器端出现异常了,服务端会将错误返回回来 |
- 了解了个大概之后我们来看一下具体的批量操作是怎么用的和需要注意的地方
批量put
- 需要注意的是当一部分数 据插入成功,但是另一部分数据插入失败,比如某个RegionServer服务 器出现了问题,这时会返回一个IOException,操作会被放弃.不过插 入成功的数据不会被回滚,还是成功插入了
- 对于插入失败的数据,服务器会再次尝试插入或者换一个RegionServer,如果尝试次数比定义的次数多了,会出现
RetriesExhaustedWithDetailsException
,这个异常包含了有多少操作失败了以及失败的原因之类的详细信息 - 插入失败的数据会继续被放到本地的写缓冲区,并在下次插入的时 候重试,不过本文不介绍,因为我参考的资料以后讲哈哈
-
下面就只剩下代码实现了
public void test() throws IOException, InterruptedException { Connection connection = ConnectionFactory.createConnection(); Table table = connection.getTable(TableName.valueOf("default:test")); Put put = new Put(Bytes.toBytes("row1")); put.addColumn(Bytes.toBytes("cf"),Bytes.toBytes("name"),Bytes.toBytes("wangziqiang")); put.addColumn(Bytes.toBytes("cf"),Bytes.toBytes("addr"),Bytes.toBytes("hebei")); put.addColumn(Bytes.toBytes("cf"),Bytes.toBytes("heigth"),Bytes.toBytes(182)); put.addColumn(Bytes.toBytes("cf"),Bytes.toBytes("garden"),Bytes.toBytes("nan")); ArrayList<Row> list = new ArrayList<>(); list.add(put); Object[] result = new Object[list.size()]; table.batch(list,result); table.close(); connection.close(); }
批量get和批量delete
- 代码将不再贴出来了,因为只是将Put更改为Get而已,并且他会返回一个Result,那么这个Result的处理与单条get返回的是一样的处理方式
- 说明一下:对于批量操作的失败处理,我只是照搬参考资料,因为我还没有办法模拟出半路出错,所以大家看到需要谨慎对待一下,下面我还是贴出资料的对于失败get和失败delete的说明
-
对于失败get
- 如果查询失败,整个get方法都会失败并抛出异常,一点返 回结果都没有.所以如果你想即使失败了也返回一部分数据,那么建议 你使用batch方法
-对于失败delete - 如果删除失败了,这个操作还是会保留在delete的传参deletes列 表中,并且还同时会抛出一个异常,所以你可以捕捉异常后,再检查删 除失败后剩下的这些Delete操作,然后再对这些剩下的Delete对象进行下一步的操作
- 如果查询失败,整个get方法都会失败并抛出异常,一点返 回结果都没有.所以如果你想即使失败了也返回一部分数据,那么建议 你使用batch方法
关于setAutoFlush
- 这一小节可以过眼了解一下就可以,setAutoFlush在老版本的Hbase中,是用来进行客户端缓存的,也就说,你执行的操作可以先缓存到客户端,然后批量提交到服务器,这个方法就是这个作用
- 已经弃用了,在网上一些文章中说的现在也过时了,因为他们是基于HBase0.9版本左右的,而我现在用的是2.1.1了已经不是一个时代了
-
弃用理由
- 客户端会维护一个HTablePool,这是一个存放HTable实例的线程 池.
- HTable实例不会每次都创建新的,而是从HTablePool中尝试获取实例,获取不到再打开连接.
- 每一个HTable都有一个写缓冲区,用来加速批量操作
- 在老版本中,HTable的生命周期都很长,所以创建也是耗时耗力的,不应该一个HTable对象就带一个缓冲区,因为这样会造成耗费内存,而且这个对象不是线程安全的,并且HTablePool也废弃了,自己理解就是不安全且是重量级的对象
- 而现在的HTable更推荐的做法是用完就释放,每个HTable是一个轻量级对象
-
所以综上所述:setAutoFlush被废弃了,每个表自带的 writeBuffer也被废弃了,但是客户端写缓冲区还是存在的,只是结构 和调用方式并不是之前那样了,而是转而使用BufferedMutator对象
Connection connection = ConnectionFactory.createConnection(); BufferedMutator bufferedMutator = connection.getBufferedMutator(TableName.valueOf("test")); bufferedMutator.mutate(put); //用此对象提交put操作 //然后调用flush或者close将请求批量交给服务器
- 需要注意的是过这个类更多地是被HBase内部调用,所以不推荐直接使用,
- 推荐的做法:就是啥也不做,按照上面batch批量api使用方法使用就行,在这些批量方法的内部调用的也是 BufferedMutator接口,客户端已经默认帮你调用了写缓存