1.HBase-client和HBase是如何连接的?
这个问题实际上在我之前的文章 深入HBase读写 中介绍过。
当HBase-client第一次请求读写的时候,需要三步走:
1)HBase-client从zk中获取保存meta table的位置信息,知道meta table保存在了哪个region server,然后缓存这个位置信息;
2)HBase-client会查询这个保存meta table的特定的region server,查询meta table信息,在table中获取自己想要访问的row key所在的region在哪个region server上。
3)客户端直接访问目标region server,获取对应的row
所以,我们知道hbase-client实际上包含三部分连接:
- 跟zk连接,获取相关元信息
- 跟HMaster连接,做相关DDL操作
- 直接跟各个region server进行连接,进行增删改查
2.HBase客户端连接原理
常规一次请求的写法是这样的
Connection connection = ConnectionFactory.createConnection(conf); try { Table table = connection.getTable(TableName.valueOf("tablename”)); // 插入数据 Put put = new Put(Bytes.toBytes("row")); put.addColumn(Bytes.toBytes("family"), Bytes.toBytes("qualifier"), Bytes.toBytes("value")); table.put(put); // 单行读取 Get get = new Get(Bytes.toBytes("row")); Result res = table.get(get); // 删除一行数据 Delete delete = new Delete(Bytes.toBytes("row")); table.delete(delete); }catch (IOException e) { //..... } finally { table.close(); connection.close(); }
我们不禁有这样的疑问:
1)HBase没有连接池吗?
2)connection表示的是一个连接吗?
3)connection每个线程都得创建吗?线程安全吗?
4)table每个线程都得创建吗?线程安全吗?
下面一一解答。
首先,Connection是线程安全的,而Table和Admin则不是线程安全的。
因此正确的做法是一个进程(或服务)使用一个Connection对象,而在不同的线程中使用单独的Table和Admin对象。
Connection持有RpcClient,RpcClient管理了一个连接池poolMap
protected final PoolMap<ConnectionId, T> connections; //…. this.connections = new PoolMap<>(getPoolType(conf), getPoolSize(conf));
通过AbstractRpcClient的getConnection看到,连接T继承RpcConnection,使用了NettyRpcConnection。
这里顺便通过getPoolType和getPoolSize看了下线程池的大小和类型。
在枚举类PoolType中有三种线程池类型Reusable, ThreadLocal, RoundRobin,用户可以用hbase.client.ipc.pool.type指定线程池类型,通过hbase.client.ipc.pool.size指定线程池大小(默认是1)。
3.优化实践
搞清楚上面的原理后,下面就可以开始优化我们的HBase管理平台了。
只需要对每个HBase集群的connection使用Map保存下来,每次请求的时候拿出对应的connection进去相关操作即可。然后需要注意在系统退出的时候关闭所有的connection。
上代码:
public class ConnectionManager { private Map<String, Connection> connectionMap = new ConcurrentHashMap<>(); public Connection getConnection(String resourceId, Configuration configuration) { ResourceInfo resourceInfo = ResourceInfoCache.getResourceInfoByCache(resourceId); if (resourceInfo == null) { throw new IllegalArgumentException("error resourceid: " + resourceId); } String key = getClusterKey(resourceInfo); if (connectionMap.containsKey(key)) { return connectionMap.get(key); } synchronized (this) { //做个DCL检查 if (connectionMap.containsKey(key)) { return connectionMap.get(key); } Connection connection = null; try { connection = ConnectionFactory.createConnection(configuration); } catch (IOException e) { return null; } connectionMap.put(key, connection); return connection; } } @PreDestroy public void doDestroy() { for (Map.Entry<String, Connection> entry : connectionMap.entrySet()) { Connection connection = entry.getValue(); if (connection != null) { try { connection.close(); } catch (IOException e) { //。。。。 } } } } }
这里有几个注意点:
- 将ConnectionManager注册为bean,交给spring容器管理生命周期,同时保证单例。
- 使用@PreDestroy保证应用关闭时,能正确释放所有连接,避免连接泄漏
- connectionMap使用ConcurrentHashMap保证线程安全
- DCL检查,避免重复创建同一个connection,浪费资源;同时避免重复创建connection后,无法关闭导致连接泄漏。
在需要查询时,只需要通过getConnection获取已经存在的connection即可。
当然,如果是普通的应用使用HBase-client,一般只需要对一个HBase的集群创建全局唯一的一个Connection即可(一般交给spring容器管理),每次请求的时候,创建对应的Table进行CRUD。