【大数据】HBase入门学习 3

本文涉及的产品
云原生网关 MSE Higress,422元/月
云原生大数据计算服务 MaxCompute,5000CU*H 100GB 3个月
云原生大数据计算服务MaxCompute,500CU*H 100GB 3个月
简介: 【大数据】HBase入门学习

HBase优化

查询优化

设置Scan缓存

在HBase中,可以通过设置Scan对象的setCaching()方法来调整Scan缓存的大小。Scan缓存用于指定每次扫描操作从RegionServer返回给客户端的行数。通过调整缓存大小,可以在一定程度上控制数据的读取性能和网络传输的开销。

以下是设置Scan缓存的示例代码:

Scan scan = new Scan();
scan.setCaching(500); // 设置缓存大小为500行
ResultScanner scanner = table.getScanner(scan);
for (Result result : scanner) {
    // 处理扫描结果
}
scanner.close();

在上述示例中,setCaching()方法将缓存大小设置为500行。可以根据实际需求调整这个值,需要根据数据大小、网络带宽和性能要求进行权衡。较大的缓存大小可以减少客户端与RegionServer之间的通信次数,提高读取性能,但同时也会增加内存消耗。较小的缓存大小可以减少内存消耗,但可能会增加通信次数和网络传输开销。

需要注意的是,setCaching()方法设置的是每次扫描的缓存大小,并不是全局的设置。如果需要对整个表的扫描操作生效,需要在每次扫描时都设置缓存大小。

此外,还可以通过调整HBase的配置参数来全局设置缓存大小。在hbase-site.xml配置文件中添加以下参数可以设置默认的缓存大小:

<property>
  <name>hbase.client.scanner.caching</name>
  <value>500</value> <!-- 设置默认的缓存大小为500行 -->
</property>

以上是通过代码和配置文件来设置Scan缓存大小的方法,根据具体的应用场景和需求,可以选择适当的方式进行设置。

显示指定列

当使用Scan或者GET获取大量的行时,最好指定所需要的列,因为服务端通过网络传输到客户端,数据量太大可能是瓶颈。如果能有效过滤部分数据,能很大程度的减少网络I/O的花费。

在HBase中,可以使用ScanGet操作来显示指定的列。下面分别介绍两种方式的用法:

  1. 使用Scan操作显示指定列:
Scan scan = new Scan();
scan.addColumn(Bytes.toBytes("cf"), Bytes.toBytes("col1")); // 指定列族(cf)和列(col1)
ResultScanner scanner = table.getScanner(scan);
for (Result result : scanner) {
    byte[] value = result.getValue(Bytes.toBytes("cf"), Bytes.toBytes("col1"));
    // 处理列(col1)的值
}
scanner.close();

在上述示例中,使用scan.addColumn()方法来指定要显示的列族和列。在for循环中,通过result.getValue()方法获取指定列的值。

  1. 使用Get操作显示指定列:
Get get = new Get(Bytes.toBytes("row1")); // 指定行键(row1)
get.addColumn(Bytes.toBytes("cf"), Bytes.toBytes("col1")); // 指定列族(cf)和列(col1)
Result result = table.get(get);
byte[] value = result.getValue(Bytes.toBytes("cf"), Bytes.toBytes("col1"));
// 处理列(col1)的值

在上述示例中,使用get.addColumn()方法来指定要显示的列族和列。通过table.get()方法获取行数据,并通过result.getValue()方法获取指定列的值。

无论是使用Scan还是Get,都可以通过addColumn()方法来指定要显示的列族和列。可以根据具体的需求,多次调用addColumn()方法来显示多个列。

需要注意的是,HBase中的列是以字节数组(byte[])形式表示的,因此在使用addColumn()getValue()方法时,需要将列族和列名转换为字节数组。

禁用块缓存

如果批量进行全表扫描,默认是有缓存的,如果此时有缓存,会降低扫描的效率。

在HBase中,可以通过设置Scan对象的setCacheBlocks()方法来禁用块缓存。块缓存是HBase中的一种缓存机制,用于加快数据的读取操作。然而,在某些情况下,禁用块缓存可能是有益的,例如对于某些热点数据或者需要立即获取最新数据的场景。

以下是禁用Scan块缓存的示例代码:

Scan scan = new Scan();
scan.setCacheBlocks(false); // 禁用块缓存
ResultScanner scanner = table.getScanner(scan);
for (Result result : scanner) {
    // 处理扫描结果
}
scanner.close();

在上述示例中,setCacheBlocks(false)方法将禁用Scan操作的块缓存。

需要注意的是,禁用块缓存可能会增加对HBase存储的实际磁盘读取次数,并且在一些场景下可能导致性能下降。因此,在禁用块缓存之前,建议仔细评估应用需求和场景,确保禁用块缓存的决策是合理的。

对于经常读到的数据,建议使用默认值,开启块缓存。

写入优化

设置AutoFlush

Htable有一个属性是AutoFlush,该属性用于支持客户端的批量更新,默认是true,当客户端每收到一条数据,立刻发送到服务端,如果设置为false,当客户端提交put请求时候,先将该请求在客户端缓存,到达阈值的时候或者执行hbase.flushcommits(),才向RegionServer提交请求。

在HBase中,可以通过设置Table对象的setAutoFlush()方法来控制自动刷新(AutoFlush)行为。AutoFlush决定了在何时将数据从客户端发送到RegionServer并写入到存储中。

以下是设置AutoFlush的示例代码:

// 创建HBase配置对象
Configuration conf = HBaseConfiguration.create();
// 创建HBase连接
Connection connection = ConnectionFactory.createConnection(conf);
// 获取表对象
TableName tableName = TableName.valueOf("your_table_name");
Table table = connection.getTable(tableName);
// 设置AutoFlush
table.setAutoFlush(false);  // 关闭AutoFlush
// 执行写入操作
Put put = new Put(Bytes.toBytes("row1"));
put.addColumn(Bytes.toBytes("cf"), Bytes.toBytes("col1"), Bytes.toBytes("value1"));
table.put(put);
// 手动刷新数据
table.flushCommits();  // 手动刷新数据到RegionServer
// 关闭表和连接
table.close();
connection.close();

在上述示例中,table.setAutoFlush(false)方法将关闭AutoFlush。这意味着在执行写操作时,数据不会立即被刷新到RegionServer和存储中,而是先缓存在客户端的内存中。只有当调用table.flushCommits()方法时,数据才会被手动刷新到RegionServer。

需要注意的是,关闭AutoFlush可以提高写入性能,尤其是在批量写入或者频繁写入的场景中。但是,关闭AutoFlush也会增加数据在客户端内存中的暂存时间,并增加了数据丢失的风险。因此,在关闭AutoFlush时,需要在适当的时机手动调用flushCommits()方法来确保数据的持久性。

同时,还可以通过设置table.setWriteBufferSize()方法来指定客户端写缓冲区的大小。这可以帮助在缓存中存储更多的数据,减少刷新到RegionServer的次数,提高写入性能。例如:

table.setWriteBufferSize(1024 * 1024); // 设置写缓冲区大小为1MB

在上述示例中,将写缓冲区大小设置为1MB。

总之,通过设置table.setAutoFlush(false)table.setWriteBufferSize()方法,可以控制AutoFlush行为和客户端写缓冲区大小,以优化写入性能和数据刷新的策略。根据具体的应用需求和场景,可以进行适当的配置调整。

参数优化

Zookeeper 会话超时时间

属性:zookeeper.session.timeout

解释:默认值为 90000 毫秒(90s)。当某个 RegionServer 挂掉,90s 之后 Master 才能察觉到。可适当减小此值,尽可能快地检测 regionserver 故障,可调整至 20-30s。看你能有都能忍耐超时,同时可以调整重试时间和重试次数

hbase.client.pause(默认值 100ms)

hbase.client.retries.number(默认 15 次)

设置 RPC 监听数量

属性:hbase.regionserver.handler.count

解释:默认值为 30,用于指定 RPC 监听的数量,可以根据客户端的请求数进行调整,读写请求较多时,增加此值。

手动控制 Major Compaction

属性:hbase.hregion.majorcompaction

解释:默认值:604800000 秒(7 天), Major Compaction 的周期,若关闭自动 Major Compaction,可将其设为 0。如果关闭一定记得自己手动合并,因为大合并非常有意义。

优化 HStore 文件大小

属性:hbase.hregion.max.filesize

解释:默认值 10737418240(10GB),如果需要运行 HBase 的 MR 任务,可以减小此值,因为一个 region 对应一个 map 任务,如果单个 region 过大,会导致 map 任务执行时间。过长。该值的意思就是,如果 HFile 的大小达到这个数值,则这个 region 会被切分为两个 Hfile。

优化 HBase 客户端缓存

属性:hbase.client.write.buffer

解释:默认值 2097152bytes(2M)用于指定 HBase 客户端缓存,增大该值可以减少 RPC调用次数,但是会消耗更多内存,反之则反之。一般我们需要设定一定的缓存大小,以达到减少 RPC 次数的目的。

指定 scan.next 扫描 HBase 所获取的行数

属性:hbase.client.scanner.caching

解释:用于指定 scan.next 方法获取的默认行数,值越大,消耗内存越大。

SpringBoot中使用HBase

添加 Maven 依赖:

<!-- HBase 2.4.3 依赖 -->
<dependency>
    <groupId>org.apache.hbase</groupId>
    <artifactId>hbase-client</artifactId>
    <version>2.4.3</version>
</dependency>

配置 HBase 连接:

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.client.Connection;
import org.apache.hadoop.hbase.client.ConnectionFactory;
@Configuration
public class HBaseConfig {
    @Bean
    public Connection hbaseConnection() throws IOException {
        Configuration config = HBaseConfiguration.create();
        config.set("hbase.zookeeper.quorum", "localhost");  // HBase ZooKeeper 地址
        config.set("hbase.zookeeper.property.clientPort", "2181");  // HBase ZooKeeper 端口
        return ConnectionFactory.createConnection(config);
    }
}

编写增删改查代码:

import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.client.*;
import org.apache.hadoop.hbase.util.Bytes;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Service;
@Service
public class HBaseService {
    @Autowired
    private Connection hbaseConnection;
    //添加数据
    public void putData(String tableName, String rowKey, String columnFamily, String column, String value) throws IOException {
        Table table = hbaseConnection.getTable(TableName.valueOf(tableName));
        Put put = new Put(Bytes.toBytes(rowKey));
        put.addColumn(Bytes.toBytes(columnFamily), Bytes.toBytes(column), Bytes.toBytes(value));
        table.put(put);
        table.close();
    }
    //删除数据
    public void deleteData(String tableName, String rowKey) throws IOException {
        Table table = hbaseConnection.getTable(TableName.valueOf(tableName));
        Delete delete = new Delete(Bytes.toBytes(rowKey));
        table.delete(delete);
        table.close();
    }
    //获取数据
    public String getData(String tableName, String rowKey, String columnFamily, String column) throws IOException {
        Table table = hbaseConnection.getTable(TableName.valueOf(tableName));
        Get get = new Get(Bytes.toBytes(rowKey));
        Result result = table.get(get);
        byte[] valueBytes = result.getValue(Bytes.toBytes(columnFamily), Bytes.toBytes(column));
        table.close();
        return Bytes.toString(valueBytes);
    }
}

在上述代码中,HBaseConfig 类配置了 HBase 连接,通过 hbaseConnection() 方法创建 HBase 连接。HBaseService 类提供了 putData()deleteData()getData() 方法,分别用于插入数据、删除数据和获取数据。

Scan

以下是使用Scan 操作的示例代码:

import org.apache.hadoop.hbase.*;
import org.apache.hadoop.hbase.client.*;
import org.apache.hadoop.hbase.util.Bytes;
import java.io.IOException;
public class HBaseScanExample {
    public static void main(String[] args) throws IOException {
        // 创建 HBase 配置对象
        Configuration conf = HBaseConfiguration.create();
        // 创建 HBase 连接
        Connection connection = ConnectionFactory.createConnection(conf);
        // 获取表对象
        TableName tableName = TableName.valueOf("your_table_name");
        Table table = connection.getTable(tableName);
        // 创建 Scan 对象
        Scan scan = new Scan();
        scan.addColumn(Bytes.toBytes("cf"), Bytes.toBytes("col1")); // 指定要查询的列族和列
        // 执行 Scan 操作
        ResultScanner scanner = table.getScanner(scan);
        for (Result result : scanner) {
            // 处理每一行数据
            byte[] row = result.getRow();
            byte[] value = result.getValue(Bytes.toBytes("cf"), Bytes.toBytes("col1"));
            System.out.println("Row key: " + Bytes.toString(row) + ", Value: " + Bytes.toString(value));
        }
        // 关闭资源
        scanner.close();
        table.close();
        connection.close();
    }
}

在上述代码中,首先创建 HBase 配置对象 Configuration,然后通过 ConnectionFactory 创建 HBase 连接 Connection。接下来,通过连接获取表对象 Table,指定要进行 Scan 操作的表名。然后创建 Scan 对象,并使用 addColumn 方法指定要查询的列族和列。最后,使用 getScanner 方法执行 Scan 操作,并遍历 ResultScanner 获取每一行的数据,并进行处理。

Phoenix

Phoenix是一个开源的基于Apache HBase的关系型数据库引擎,它提供了SQL接口来访问HBase中存储的数据。它在HBase的基础上添加了SQL查询和事务功能,使得使用HBase的开发者可以使用熟悉的SQL语言进行数据操作和查询

Phoenix在HBase中的主要用途包括:

  1. SQL查询:Phoenix允许开发者使用标准的SQL语句来查询和操作HBase中的数据,无需编写复杂的HBase API代码。这简化了开发过程,降低了使用HBase进行数据访问的门槛。
  2. 索引支持:Phoenix提供了对HBase数据的二级索引支持,开发者可以使用SQL语句创建索引,从而加快查询速度。索引在数据查询和过滤中起到重要的作用,提高了数据的检索效率。
  3. 事务支持:Phoenix引入了基于MVCC(多版本并发控制)的事务机制,使得在HBase中进行复杂的事务操作成为可能。开发者可以通过Phoenix的事务功能来保证数据的一致性和可靠性。
  4. SQL函数和聚合:Phoenix支持各种内置的SQL函数和聚合函数,如SUM、COUNT、MAX、MIN等,使得在HBase上进行数据统计和分析变得更加方便。

要在HBase中使用Phoenix,需要先安装并配置好Phoenix。以下是一个在HBase中使用Phoenix的示例代码:

  1. 添加 Maven 依赖: 在 Maven 项目的 pom.xml 文件中添加以下依赖:
<!-- Phoenix 依赖 -->
<dependency>
    <groupId>org.apache.phoenix</groupId>
    <artifactId>phoenix-core</artifactId>
    <version>4.16.0-HBase-2.4</version>
</dependency>
  1. 创建 Phoenix 表: 在 HBase 中创建 Phoenix 表。可以使用 Phoenix 提供的 SQL 语法创建表和定义模式。例如,创建一个名为 users 的表:
CREATE TABLE users (
    id BIGINT PRIMARY KEY,
    name VARCHAR,
    age INTEGER
);
  1. 使用 Phoenix 进行操作: 在 Java 代码中,可以使用 Phoenix 提供的 PhoenixConnectionPhoenixStatement 来执行 SQL 操作。
import java.sql.*;
public class PhoenixExample {
    public static void main(String[] args) throws SQLException {
        // 创建 Phoenix 连接
        String url = "jdbc:phoenix:<HBase ZooKeeper Quorum>:<HBase ZooKeeper Port>";
        Connection connection = DriverManager.getConnection(url);
        // 执行 SQL 查询
        String query = "SELECT * FROM users";
        Statement statement = connection.createStatement();
        ResultSet resultSet = statement.executeQuery(query);
        // 处理查询结果
        while (resultSet.next()) {
            long id = resultSet.getLong("ID");
            String name = resultSet.getString("NAME");
            int age = resultSet.getInt("AGE");
            System.out.println("ID: " + id + ", Name: " + name + ", Age: " + age);
        }
        // 关闭资源
        resultSet.close();
        statement.close();
        connection.close();
    }
}

在上述代码中,需要将 <HBase ZooKeeper Quorum><HBase ZooKeeper Port> 替换为你的 HBase ZooKeeper 地址和端口。

通过创建 PhoenixConnection 并传递正确的 JDBC URL,可以获得连接对象。接下来,可以使用 createStatement() 方法创建 PhoenixStatement 对象,并使用 executeQuery() 方法执行 SQL 查询。

然后,可以使用 ResultSet 对象遍历查询结果,并提取所需的字段。在此示例中,遍历了 users 表的结果,并打印了每行的 ID、Name 和 Age。



相关实践学习
lindorm多模间数据无缝流转
展现了Lindorm多模融合能力——用kafka API写入,无缝流转在各引擎内进行数据存储和计算的实验。
云数据库HBase版使用教程
&nbsp; 相关的阿里云产品:云数据库 HBase 版 面向大数据领域的一站式NoSQL服务,100%兼容开源HBase并深度扩展,支持海量数据下的实时存储、高并发吞吐、轻SQL分析、全文检索、时序时空查询等能力,是风控、推荐、广告、物联网、车联网、Feeds流、数据大屏等场景首选数据库,是为淘宝、支付宝、菜鸟等众多阿里核心业务提供关键支撑的数据库。 了解产品详情:&nbsp;https://cn.aliyun.com/product/hbase &nbsp; ------------------------------------------------------------------------- 阿里云数据库体验:数据库上云实战 开发者云会免费提供一台带自建MySQL的源数据库&nbsp;ECS 实例和一台目标数据库&nbsp;RDS实例。跟着指引,您可以一步步实现将ECS自建数据库迁移到目标数据库RDS。 点击下方链接,领取免费ECS&amp;RDS资源,30分钟完成数据库上云实战!https://developer.aliyun.com/adc/scenario/51eefbd1894e42f6bb9acacadd3f9121?spm=a2c6h.13788135.J_3257954370.9.4ba85f24utseFl
目录
相关文章
|
1月前
|
分布式计算 关系型数据库 MySQL
大数据-88 Spark 集群 案例学习 Spark Scala 案例 SuperWordCount 计算结果数据写入MySQL
大数据-88 Spark 集群 案例学习 Spark Scala 案例 SuperWordCount 计算结果数据写入MySQL
48 3
|
1月前
|
存储 分布式计算 算法
大数据-106 Spark Graph X 计算学习 案例:1图的基本计算、2连通图算法、3寻找相同的用户
大数据-106 Spark Graph X 计算学习 案例:1图的基本计算、2连通图算法、3寻找相同的用户
60 0
|
1月前
|
数据采集 数据可视化 大数据
大数据体系知识学习(三):数据清洗_箱线图的概念以及代码实现
这篇文章介绍了如何使用Python中的matplotlib和numpy库来创建箱线图,以检测和处理数据集中的异常值。
42 1
大数据体系知识学习(三):数据清洗_箱线图的概念以及代码实现
|
1月前
|
存储 大数据 关系型数据库
HBase系列学习:基础知识
HBase系列学习:基础知识
HBase系列学习:基础知识
|
25天前
|
存储 SQL 分布式计算
大数据学习
【10月更文挑战第15天】
30 1
|
1月前
|
分布式计算 大数据 Linux
大数据体系知识学习(二):WordCount案例实现及错误总结
这篇文章介绍了如何使用PySpark进行WordCount操作,包括环境配置、代码实现、运行结果和遇到的错误。作者在运行过程中遇到了Py4JJavaError和JAVA_HOME未设置的问题,并通过导入findspark初始化和设置环境变量解决了这些问题。文章还讨论了groupByKey和reduceByKey的区别。
26 1
|
1月前
|
分布式计算 Hadoop 大数据
大数据体系知识学习(一):PySpark和Hadoop环境的搭建与测试
这篇文章是关于大数据体系知识学习的,主要介绍了Apache Spark的基本概念、特点、组件,以及如何安装配置Java、PySpark和Hadoop环境。文章还提供了详细的安装步骤和测试代码,帮助读者搭建和测试大数据环境。
53 1
|
1月前
|
存储 机器学习/深度学习 分布式计算
大数据技术——解锁数据的力量,引领未来趋势
【10月更文挑战第5天】大数据技术——解锁数据的力量,引领未来趋势
|
3天前
|
存储 分布式计算 数据挖掘
数据架构 ODPS 是什么?
数据架构 ODPS 是什么?
42 7
|
3天前
|
存储 分布式计算 大数据
大数据 优化数据读取
【11月更文挑战第4天】
14 2