HBase优化
查询优化
设置Scan缓存
在HBase中,可以通过设置Scan
对象的setCaching()
方法来调整Scan
缓存的大小。Scan
缓存用于指定每次扫描操作从RegionServer返回给客户端的行数。通过调整缓存大小,可以在一定程度上控制数据的读取性能和网络传输的开销。
以下是设置Scan
缓存的示例代码:
Scan scan = new Scan(); scan.setCaching(500); // 设置缓存大小为500行 ResultScanner scanner = table.getScanner(scan); for (Result result : scanner) { // 处理扫描结果 } scanner.close();
在上述示例中,setCaching()
方法将缓存大小设置为500行。可以根据实际需求调整这个值,需要根据数据大小、网络带宽和性能要求进行权衡。较大的缓存大小可以减少客户端与RegionServer之间的通信次数,提高读取性能,但同时也会增加内存消耗。较小的缓存大小可以减少内存消耗,但可能会增加通信次数和网络传输开销。
需要注意的是,setCaching()
方法设置的是每次扫描的缓存大小,并不是全局的设置。如果需要对整个表的扫描操作生效,需要在每次扫描时都设置缓存大小。
此外,还可以通过调整HBase的配置参数来全局设置缓存大小。在hbase-site.xml
配置文件中添加以下参数可以设置默认的缓存大小:
<property> <name>hbase.client.scanner.caching</name> <value>500</value> <!-- 设置默认的缓存大小为500行 --> </property>
以上是通过代码和配置文件来设置Scan
缓存大小的方法,根据具体的应用场景和需求,可以选择适当的方式进行设置。
显示指定列
当使用Scan或者GET获取大量的行时,最好指定所需要的列,因为服务端通过网络传输到客户端,数据量太大可能是瓶颈。如果能有效过滤部分数据,能很大程度的减少网络I/O的花费。
在HBase中,可以使用Scan
或Get
操作来显示指定的列。下面分别介绍两种方式的用法:
- 使用
Scan
操作显示指定列:
Scan scan = new Scan(); scan.addColumn(Bytes.toBytes("cf"), Bytes.toBytes("col1")); // 指定列族(cf)和列(col1) ResultScanner scanner = table.getScanner(scan); for (Result result : scanner) { byte[] value = result.getValue(Bytes.toBytes("cf"), Bytes.toBytes("col1")); // 处理列(col1)的值 } scanner.close();
在上述示例中,使用scan.addColumn()
方法来指定要显示的列族和列。在for
循环中,通过result.getValue()
方法获取指定列的值。
- 使用
Get
操作显示指定列:
Get get = new Get(Bytes.toBytes("row1")); // 指定行键(row1) get.addColumn(Bytes.toBytes("cf"), Bytes.toBytes("col1")); // 指定列族(cf)和列(col1) Result result = table.get(get); byte[] value = result.getValue(Bytes.toBytes("cf"), Bytes.toBytes("col1")); // 处理列(col1)的值
在上述示例中,使用get.addColumn()
方法来指定要显示的列族和列。通过table.get()
方法获取行数据,并通过result.getValue()
方法获取指定列的值。
无论是使用Scan
还是Get
,都可以通过addColumn()
方法来指定要显示的列族和列。可以根据具体的需求,多次调用addColumn()
方法来显示多个列。
需要注意的是,HBase中的列是以字节数组(byte[]
)形式表示的,因此在使用addColumn()
和getValue()
方法时,需要将列族和列名转换为字节数组。
禁用块缓存
如果批量进行全表扫描,默认是有缓存的,如果此时有缓存,会降低扫描的效率。
在HBase中,可以通过设置Scan
对象的setCacheBlocks()
方法来禁用块缓存。块缓存是HBase中的一种缓存机制,用于加快数据的读取操作。然而,在某些情况下,禁用块缓存可能是有益的,例如对于某些热点数据或者需要立即获取最新数据的场景。
以下是禁用Scan
块缓存的示例代码:
Scan scan = new Scan(); scan.setCacheBlocks(false); // 禁用块缓存 ResultScanner scanner = table.getScanner(scan); for (Result result : scanner) { // 处理扫描结果 } scanner.close();
在上述示例中,setCacheBlocks(false)
方法将禁用Scan
操作的块缓存。
需要注意的是,禁用块缓存可能会增加对HBase存储的实际磁盘读取次数,并且在一些场景下可能导致性能下降。因此,在禁用块缓存之前,建议仔细评估应用需求和场景,确保禁用块缓存的决策是合理的。
对于经常读到的数据,建议使用默认值,开启块缓存。
写入优化
设置AutoFlush
Htable有一个属性是AutoFlush,该属性用于支持客户端的批量更新,默认是true,当客户端每收到一条数据,立刻发送到服务端,如果设置为false,当客户端提交put请求时候,先将该请求在客户端缓存,到达阈值的时候或者执行hbase.flushcommits(),才向RegionServer提交请求。
在HBase中,可以通过设置Table
对象的setAutoFlush()
方法来控制自动刷新(AutoFlush)行为。AutoFlush决定了在何时将数据从客户端发送到RegionServer并写入到存储中。
以下是设置AutoFlush的示例代码:
// 创建HBase配置对象 Configuration conf = HBaseConfiguration.create(); // 创建HBase连接 Connection connection = ConnectionFactory.createConnection(conf); // 获取表对象 TableName tableName = TableName.valueOf("your_table_name"); Table table = connection.getTable(tableName); // 设置AutoFlush table.setAutoFlush(false); // 关闭AutoFlush // 执行写入操作 Put put = new Put(Bytes.toBytes("row1")); put.addColumn(Bytes.toBytes("cf"), Bytes.toBytes("col1"), Bytes.toBytes("value1")); table.put(put); // 手动刷新数据 table.flushCommits(); // 手动刷新数据到RegionServer // 关闭表和连接 table.close(); connection.close();
在上述示例中,table.setAutoFlush(false)
方法将关闭AutoFlush。这意味着在执行写操作时,数据不会立即被刷新到RegionServer和存储中,而是先缓存在客户端的内存中。只有当调用table.flushCommits()
方法时,数据才会被手动刷新到RegionServer。
需要注意的是,关闭AutoFlush可以提高写入性能,尤其是在批量写入或者频繁写入的场景中。但是,关闭AutoFlush也会增加数据在客户端内存中的暂存时间,并增加了数据丢失的风险。因此,在关闭AutoFlush时,需要在适当的时机手动调用flushCommits()
方法来确保数据的持久性。
同时,还可以通过设置table.setWriteBufferSize()
方法来指定客户端写缓冲区的大小。这可以帮助在缓存中存储更多的数据,减少刷新到RegionServer的次数,提高写入性能。例如:
table.setWriteBufferSize(1024 * 1024); // 设置写缓冲区大小为1MB
在上述示例中,将写缓冲区大小设置为1MB。
总之,通过设置table.setAutoFlush(false)
和table.setWriteBufferSize()
方法,可以控制AutoFlush行为和客户端写缓冲区大小,以优化写入性能和数据刷新的策略。根据具体的应用需求和场景,可以进行适当的配置调整。
参数优化
Zookeeper 会话超时时间
属性:zookeeper.session.timeout
解释:默认值为 90000 毫秒(90s)。当某个 RegionServer 挂掉,90s 之后 Master 才能察觉到。可适当减小此值,尽可能快地检测 regionserver 故障,可调整至 20-30s。看你能有都能忍耐超时,同时可以调整重试时间和重试次数
hbase.client.pause(默认值 100ms)
hbase.client.retries.number(默认 15 次)
设置 RPC 监听数量
属性:hbase.regionserver.handler.count
解释:默认值为 30,用于指定 RPC 监听的数量,可以根据客户端的请求数进行调整,读写请求较多时,增加此值。
手动控制 Major Compaction
属性:hbase.hregion.majorcompaction
解释:默认值:604800000 秒(7 天), Major Compaction 的周期,若关闭自动 Major Compaction,可将其设为 0。如果关闭一定记得自己手动合并,因为大合并非常有意义。
优化 HStore 文件大小
属性:hbase.hregion.max.filesize
解释:默认值 10737418240(10GB),如果需要运行 HBase 的 MR 任务,可以减小此值,因为一个 region 对应一个 map 任务,如果单个 region 过大,会导致 map 任务执行时间。过长。该值的意思就是,如果 HFile 的大小达到这个数值,则这个 region 会被切分为两个 Hfile。
优化 HBase 客户端缓存
属性:hbase.client.write.buffer
解释:默认值 2097152bytes(2M)用于指定 HBase 客户端缓存,增大该值可以减少 RPC调用次数,但是会消耗更多内存,反之则反之。一般我们需要设定一定的缓存大小,以达到减少 RPC 次数的目的。
指定 scan.next 扫描 HBase 所获取的行数
属性:hbase.client.scanner.caching
解释:用于指定 scan.next 方法获取的默认行数,值越大,消耗内存越大。
SpringBoot中使用HBase
添加 Maven 依赖:
<!-- HBase 2.4.3 依赖 --> <dependency> <groupId>org.apache.hbase</groupId> <artifactId>hbase-client</artifactId> <version>2.4.3</version> </dependency>
配置 HBase 连接:
import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.hbase.HBaseConfiguration; import org.apache.hadoop.hbase.client.Connection; import org.apache.hadoop.hbase.client.ConnectionFactory; @Configuration public class HBaseConfig { @Bean public Connection hbaseConnection() throws IOException { Configuration config = HBaseConfiguration.create(); config.set("hbase.zookeeper.quorum", "localhost"); // HBase ZooKeeper 地址 config.set("hbase.zookeeper.property.clientPort", "2181"); // HBase ZooKeeper 端口 return ConnectionFactory.createConnection(config); } }
编写增删改查代码:
import org.apache.hadoop.hbase.TableName; import org.apache.hadoop.hbase.client.*; import org.apache.hadoop.hbase.util.Bytes; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.stereotype.Service; @Service public class HBaseService { @Autowired private Connection hbaseConnection; //添加数据 public void putData(String tableName, String rowKey, String columnFamily, String column, String value) throws IOException { Table table = hbaseConnection.getTable(TableName.valueOf(tableName)); Put put = new Put(Bytes.toBytes(rowKey)); put.addColumn(Bytes.toBytes(columnFamily), Bytes.toBytes(column), Bytes.toBytes(value)); table.put(put); table.close(); } //删除数据 public void deleteData(String tableName, String rowKey) throws IOException { Table table = hbaseConnection.getTable(TableName.valueOf(tableName)); Delete delete = new Delete(Bytes.toBytes(rowKey)); table.delete(delete); table.close(); } //获取数据 public String getData(String tableName, String rowKey, String columnFamily, String column) throws IOException { Table table = hbaseConnection.getTable(TableName.valueOf(tableName)); Get get = new Get(Bytes.toBytes(rowKey)); Result result = table.get(get); byte[] valueBytes = result.getValue(Bytes.toBytes(columnFamily), Bytes.toBytes(column)); table.close(); return Bytes.toString(valueBytes); } }
在上述代码中,HBaseConfig
类配置了 HBase 连接,通过 hbaseConnection()
方法创建 HBase 连接。HBaseService
类提供了 putData()
、deleteData()
和 getData()
方法,分别用于插入数据、删除数据和获取数据。
Scan
以下是使用Scan 操作的示例代码:
import org.apache.hadoop.hbase.*; import org.apache.hadoop.hbase.client.*; import org.apache.hadoop.hbase.util.Bytes; import java.io.IOException; public class HBaseScanExample { public static void main(String[] args) throws IOException { // 创建 HBase 配置对象 Configuration conf = HBaseConfiguration.create(); // 创建 HBase 连接 Connection connection = ConnectionFactory.createConnection(conf); // 获取表对象 TableName tableName = TableName.valueOf("your_table_name"); Table table = connection.getTable(tableName); // 创建 Scan 对象 Scan scan = new Scan(); scan.addColumn(Bytes.toBytes("cf"), Bytes.toBytes("col1")); // 指定要查询的列族和列 // 执行 Scan 操作 ResultScanner scanner = table.getScanner(scan); for (Result result : scanner) { // 处理每一行数据 byte[] row = result.getRow(); byte[] value = result.getValue(Bytes.toBytes("cf"), Bytes.toBytes("col1")); System.out.println("Row key: " + Bytes.toString(row) + ", Value: " + Bytes.toString(value)); } // 关闭资源 scanner.close(); table.close(); connection.close(); } }
在上述代码中,首先创建 HBase 配置对象 Configuration
,然后通过 ConnectionFactory
创建 HBase 连接 Connection
。接下来,通过连接获取表对象 Table
,指定要进行 Scan 操作的表名。然后创建 Scan
对象,并使用 addColumn
方法指定要查询的列族和列。最后,使用 getScanner
方法执行 Scan 操作,并遍历 ResultScanner
获取每一行的数据,并进行处理。
Phoenix
Phoenix是一个开源的基于Apache HBase的关系型数据库引擎,它提供了SQL接口来访问HBase中存储的数据。它在HBase的基础上添加了SQL查询和事务功能,使得使用HBase的开发者可以使用熟悉的SQL语言进行数据操作和查询。
Phoenix在HBase中的主要用途包括:
- SQL查询:Phoenix允许开发者使用标准的SQL语句来查询和操作HBase中的数据,无需编写复杂的HBase API代码。这简化了开发过程,降低了使用HBase进行数据访问的门槛。
- 索引支持:Phoenix提供了对HBase数据的二级索引支持,开发者可以使用SQL语句创建索引,从而加快查询速度。索引在数据查询和过滤中起到重要的作用,提高了数据的检索效率。
- 事务支持:Phoenix引入了基于MVCC(多版本并发控制)的事务机制,使得在HBase中进行复杂的事务操作成为可能。开发者可以通过Phoenix的事务功能来保证数据的一致性和可靠性。
- SQL函数和聚合:Phoenix支持各种内置的SQL函数和聚合函数,如SUM、COUNT、MAX、MIN等,使得在HBase上进行数据统计和分析变得更加方便。
要在HBase中使用Phoenix,需要先安装并配置好Phoenix。以下是一个在HBase中使用Phoenix的示例代码:
- 添加 Maven 依赖: 在 Maven 项目的
pom.xml
文件中添加以下依赖:
<!-- Phoenix 依赖 --> <dependency> <groupId>org.apache.phoenix</groupId> <artifactId>phoenix-core</artifactId> <version>4.16.0-HBase-2.4</version> </dependency>
- 创建 Phoenix 表: 在 HBase 中创建 Phoenix 表。可以使用 Phoenix 提供的 SQL 语法创建表和定义模式。例如,创建一个名为
users
的表:
CREATE TABLE users ( id BIGINT PRIMARY KEY, name VARCHAR, age INTEGER );
- 使用 Phoenix 进行操作: 在 Java 代码中,可以使用 Phoenix 提供的
PhoenixConnection
和PhoenixStatement
来执行 SQL 操作。
import java.sql.*; public class PhoenixExample { public static void main(String[] args) throws SQLException { // 创建 Phoenix 连接 String url = "jdbc:phoenix:<HBase ZooKeeper Quorum>:<HBase ZooKeeper Port>"; Connection connection = DriverManager.getConnection(url); // 执行 SQL 查询 String query = "SELECT * FROM users"; Statement statement = connection.createStatement(); ResultSet resultSet = statement.executeQuery(query); // 处理查询结果 while (resultSet.next()) { long id = resultSet.getLong("ID"); String name = resultSet.getString("NAME"); int age = resultSet.getInt("AGE"); System.out.println("ID: " + id + ", Name: " + name + ", Age: " + age); } // 关闭资源 resultSet.close(); statement.close(); connection.close(); } }
在上述代码中,需要将 <HBase ZooKeeper Quorum>
和 <HBase ZooKeeper Port>
替换为你的 HBase ZooKeeper 地址和端口。
通过创建 PhoenixConnection
并传递正确的 JDBC URL,可以获得连接对象。接下来,可以使用 createStatement()
方法创建 PhoenixStatement
对象,并使用 executeQuery()
方法执行 SQL 查询。
然后,可以使用 ResultSet
对象遍历查询结果,并提取所需的字段。在此示例中,遍历了 users
表的结果,并打印了每行的 ID、Name 和 Age。