大数据技术之HBase2

本文涉及的产品
云原生大数据计算服务MaxCompute,500CU*H 100GB 3个月
云原生大数据计算服务 MaxCompute,5000CU*H 100GB 3个月
简介: 大数据技术之HBase2

3 HBase API

3.1、环境准备

新建项目后在pom.xml 中添加依赖

注意:会报错javax.el 包不存在,是一个测试用的依赖,不影响使用

<dependencies>
    <dependency>
    <groupId>org.apache.hbase</groupId>
    <artifactId>hbase-server</artifactId>
    <version>2.4.11</version>
        <exclusions> 
            <exclusion>
                <groupId>org.glassfish</groupId>
                <artifactId>javax.el</artifactId>
            </exclusion>
        </exclusions>
    </dependency>
    <dependency>
        <groupId>org.glassfish</groupId>
        <artifactId>javax.el</artifactId>
        <version>3.0.1-b06</version>
    </dependency>
</dependencies>

3.2、创建连接

根据官方API 介绍,HBase 的客户端连接由ConnectionFactory 类来创建,用户使用完成之后需要手动关闭连接。同时连接是一个重量级的,推荐一个进程使用一个连接,对HBase 的命令通过连接中的两个属性Admin 和Table 来实现


3.2.1、单线程创建连接

package com.atguigu.hbase;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.client.AsyncConnection; 
import  org.apache.hadoop.hbase.client.Connection; 
import org.apache.hadoop.hbase.client.ConnectionFactory;
import java.io.IOException;
import java.util.concurrent.CompletableFuture;
public class HBaseConnect {
 public static void main(String[] args) throws IOException {
     // 1. 创建配置对象
     Configuration conf = new Configuration();
     // 2. 添加配置参数
    conf.set("hbase.zookeeper.quorum","hadoop102,hadoop103,hadoop104");
     // 3. 创建 hbase 的连接
     // 默认使用同步连接
     Connection connection = ConnectionFactory.createConnection(conf);
     // 可以使用异步连接
     // 主要影响后续的 DML 操作
     CompletableFuture<AsyncConnection> asyncConnection = ConnectionFactory.createAsyncConnection(conf);
     // 4. 使用连接
     System.out.println(connection);
     // 5. 关闭连接
     connection.close();
     }
}


3.2.2、多线程创建连接

使用类单例模式,确保使用一个连接,可以同时用于多个线程。

 package com.atguigu;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.client.AsyncConnection;
import org.apache.hadoop.hbase.client.Connection;
import org.apache.hadoop.hbase.client.ConnectionFactory;
import java.io.IOException;
import java.util.concurrent.CompletableFuture;
public class HBaseConnect {
     // 设置静态属性 hbase 连接
     public static Connection connection = null;
     static {
         // 创建 hbase 的连接
         try {
             // 使用配置文件的方法
             connection = ConnectionFactory.createConnection();
         } catch (IOException e) {
             System.out.println("连接获取失败");
             e.printStackTrace();
         }
     }
     /**
     * 连接关闭方法,用于进程关闭时调用
     * @throws IOException
     */
     public static void closeConnection() throws IOException {
         if (connection != null) {
             connection.close();
         }
     }
}

在 resources 文件夹中创建配置文件 hbase-site.xml,添加以下内容

<?xml version="1.0"?>
<?xml-stylesheet type="text/xsl" href="configuration.xsl"?>
<configuration>
     <property>
         <name>hbase.zookeeper.quorum</name>
         <value>hadoop102,hadoop103,hadoop104</value>
     </property>
</configuration>


3.3 、DDL

创建 HBaseDDL 类,添加静态方法即可作为工具类

public class HBaseDDL {
     // 添加静态属性 connection 指向单例连接
     public static Connection connection = HBaseConnect.connection;
}

3.3.1 、创建命名空间

/**
 * 创建命名空间
 * @param namespace 命名空间名称
 */
 public static void createNamespace(String namespace) throws IOException {
     // 1. 获取 admin
     // 此处的异常先不要抛出 等待方法写完 再统一进行处理
     // admin 的连接是轻量级的 不是线程安全的 不推荐池化或者缓存这个连接
     Admin admin = connection.getAdmin();
     // 2. 调用方法创建命名空间
     // 代码相对 shell 更加底层 所以 shell 能够实现的功能 代码一定能实现
     // 所以需要填写完整的命名空间描述
     // 2.1 创建命令空间描述建造者 => 设计师
     NamespaceDescriptor.Builder builder = NamespaceDescriptor.create(namespace);
     // 2.2 给命令空间添加需求
     builder.addConfiguration("user","atguigu");
     // 2.3 使用 builder 构造出对应的添加完参数的对象 完成创建
     // 创建命名空间出现的问题 都属于本方法自身的问题 不应该抛出
     try {
         admin.createNamespace(builder.build());
     } catch (IOException e) {
         System.out.println("命令空间已经存在");
         e.printStackTrace();
     }
     // 3. 关闭 admin
     admin.close();
 }


骚戴理解: admin.createNamespace(builder.build());的异常是同try-catch处理的,为什么不直接抛出去呢?这是因为直接抛出去的话,假如出现了异常,那么异常代码后面的代码就都不会执行了,如果是try-catch捕获处理了异常,那么异常代码后的代码都会继续正常的执行


3.3.2 、判断表格是否存在

/**
 * 判断表格是否存在
 * @param namespace 命名空间名称
 * @param tableName 表格名称
 * @return ture 表示存在
 */
 public static boolean isTableExists(String namespace,String tableName) throws IOException {
     // 1. 获取 admin
     Admin admin = connection.getAdmin();
     // 2. 使用方法判断表格是否存在
     boolean b = false;
     try {
         b = admin.tableExists(TableName.valueOf(namespace, tableName));
     } catch (IOException e) {
         e.printStackTrace();
     }
     // 3. 关闭 admin
     admin.close();
     // 3. 返回结果
     return b;
     // 后面的代码不能生效
 }

骚戴理解:这里的admin.close();要写在 return b;的前面,因为return表示方法结束了,就直接退出方法了,那写在后面的话是不会执行的!!获取TableName对象的时候不能直接通过new来获取,因为这类的构造方法是私有的,所以通过静态方法TableName.valueOf(namespace, tableName)来获取TableName对象

3.3.3 、创建表

/**
 * 创建表格
 * @param namespace 命名空间名称
 * @param tableName 表格名称
 * @param columnFamilies 列族名称 可以有多个
 */
 public static void createTable(String namespace , String tableName , String...columnFamilies) throws IOException {
     // 判断是否有至少一个列族
     if (columnFamilies.length == 0){
         System.out.println("创建表格至少有一个列族");
         return;
     }
     // 判断表格是否存在
     if (isTableExists(namespace,tableName)){
         System.out.println("表格已经存在");
         return;
     }
     // 1.获取 admin
     Admin admin = connection.getAdmin();
     // 2. 调用方法创建表格
     // 2.1 创建表格描述的建造者
     TableDescriptorBuilder tableDescriptorBuilder = 
     TableDescriptorBuilder.newBuilder(TableName.valueOf(namespace, tableName));
     // 2.2 添加参数
     for (String columnFamily : columnFamilies) {
     // 2.3 创建列族描述的建造者
         ColumnFamilyDescriptorBuilder columnFamilyDescriptorBuilder = ColumnFamilyDescriptorBuilder.newBuilder(Bytes.toBytes(columnFamily));
         // 2.4 对应当前的列族添加参数
         // 添加版本参数
         columnFamilyDescriptorBuilder.setMaxVersions(5);
         // 2.5 创建添加完参数的列族描述
        tableDescriptorBuilder.setColumnFamily(columnFamilyDescriptorBuilder.build());
     }
     // 2.6 创建对应的表格描述
     try {
         admin.createTable(tableDescriptorBuilder.build());
     } catch (IOException e) {
         e.printStackTrace();
     }
     // 3. 关闭 admin
     admin.close();
 }

骚戴理解: String...columnFamilies是可变参数,可以理解为其实就是一个数组,可以传入多个值

3.3.4、 修改表

/**
 * 修改表格中一个列族的版本
 * @param namespace 命名空间名称
 * @param tableName 表格名称
 * @param columnFamily 列族名称
 * @param version 版本
 */
 public static void modifyTable(String namespace ,String tableName,String columnFamily,int version) throws IOException {
     // 判断表格是否存在
     if (!isTableExists(namespace,tableName)){
         System.out.println("表格不存在无法修改");
         return;
     }
     // 1. 获取 admin
     Admin admin = connection.getAdmin();
     try {
         // 2. 调用方法修改表格
         // 2.0 获取之前的表格描述
         TableDescriptor descriptor = 
        admin.getDescriptor(TableName.valueOf(namespace, tableName));
         // 2.1 创建一个表格描述建造者
         // 如果使用填写 tableName 的方法 相当于创建了一个新的表格描述建造
        者 没有之前的信息
         // 如果想要修改之前的信息 必须调用方法填写一个旧的表格描述
         TableDescriptorBuilder tableDescriptorBuilder = 
        TableDescriptorBuilder.newBuilder(descriptor);
         // 2.2 对应建造者进行表格数据的修改
         ColumnFamilyDescriptor columnFamily1 = 
        descriptor.getColumnFamily(Bytes.toBytes(columnFamily));
         // 创建列族描述建造者
         // 需要填写旧的列族描述
         ColumnFamilyDescriptorBuilder columnFamilyDescriptorBuilder = 
        ColumnFamilyDescriptorBuilder.newBuilder(columnFamily1);
         // 修改对应的版本
         columnFamilyDescriptorBuilder.setMaxVersions(version);
         // 此处修改的时候 如果填写的新创建 那么别的参数会初始化
        tableDescriptorBuilder.modifyColumnFamily(columnFamilyDescriptorB
        uilder.build());
        admin.modifyTable(tableDescriptorBuilder.build());
     } catch (IOException e) {
         e.printStackTrace();
     }
     // 3. 关闭 admin
     admin.close();
 }

骚戴理解:如果想要修改之前的信息 必须调用方法填写一个旧的表格描述,也就是不能像上面创建表描述一样直接新建一个TableDescriptorBuilder表格描述对象,而是通过下面的语句来获取旧的表描述

  TableDescriptor descriptor =admin.getDescriptor(TableName.valueOf(namespace,tableName));

同样,修改列值也要用原来旧的表描述的列族,通过下面的语句来获取旧的列族描述

 ColumnFamilyDescriptor columnFamily1 = descriptor.getColumnFamily(Bytes.toBytes(columnFamily));

3.3.5 、删除表

/**
 * 删除表格
 * @param namespace 命名空间名称
 * @param tableName 表格名称
 * @return true 表示删除成功
 */
 public static boolean deleteTable(String namespace ,String tableName) throws IOException {
     // 1. 判断表格是否存在
     if (!isTableExists(namespace,tableName)){
         System.out.println("表格不存在 无法删除");
         return false;
     }
     // 2. 获取 admin
     Admin admin = connection.getAdmin();
     // 3. 调用相关的方法删除表格
     try {
         // HBase 删除表格之前 一定要先标记表格为不可以
         TableName tableName1 = TableName.valueOf(namespace, tableName);
         admin.disableTable(tableName1);
         admin.deleteTable(tableName1);
     } catch (IOException e) {
         e.printStackTrace();
     }
     // 4. 关闭 admin
     admin.close();
     return true;
 }


骚戴理解: HBase 删除表格之前 一定要先标记表格为不可以,通过admin.disableTable(tableName1);语句来设置表格为不可用

3.4 、DML

创建类 HBaseDML

public class HBaseDML {
 // 添加静态属性 connection 指向单例连接
 public static Connection connection = HBaseConnect.connection;
}


3.4.1、 插入数据

/**
 * 插入数据
 * @param namespace 命名空间名称
 * @param tableName 表格名称
 * @param rowKey 主键
 * @param columnFamily 列族名称
 * @param columnName 列名
 * @param value 值
 */
 public static void putCell(String namespace,String tableName,String rowKey, String columnFamily,String columnName,String value) throws IOException {
     // 1. 获取 table
     Table table = connection.getTable(TableName.valueOf(namespace, tableName));
     // 2. 调用相关方法插入数据
     // 2.1 创建 put 对象
     Put put = new Put(Bytes.toBytes(rowKey));
     // 2.2. 给 put 对象添加数据
    put.addColumn(Bytes.toBytes(columnFamily),Bytes.toBytes(columnName),Bytes.toBytes(value));
     // 2.3 将对象写入对应的方法
     try {
         table.put(put);
     } catch (IOException e) {
         e.printStackTrace();
     }
     // 3. 关闭 table
     table.close();
 }

3.4.2 、读取数据(读取对应的一行中的某一列)

/**
 * 读取数据 读取对应的一行中的某一列
 *
 * @param namespace 命名空间名称
 * @param tableName 表格名称
 * @param rowKey 主键
 * @param columnFamily 列族名称
 * @param columnName 列名
 */
 public static void getCells(String namespace, String tableName, String rowKey, String columnFamily, String columnName) throws IOException {
     // 1. 获取 table
     Table table = connection.getTable(TableName.valueOf(namespace, tableName));
     // 2. 创建 get 对象
     Get get = new Get(Bytes.toBytes(rowKey));
     // 如果直接调用 get 方法读取数据 此时读一整行数据
     // 如果想读取某一列的数据 需要添加对应的参数
     get.addColumn(Bytes.toBytes(columnFamily), Bytes.toBytes(columnName));
     // 设置读取数据的版本
     get.readAllVersions();
     try {
     // 读取数据 得到 result 对象
     Result result = table.get(get);
     // 处理数据
     Cell[] cells = result.rawCells();
     // 测试方法: 直接把读取的数据打印到控制台
     // 如果是实际开发 需要再额外写方法 对应处理数据
     for (Cell cell : cells) {
         // cell 存储数据比较底层
         String value = new String(CellUtil.cloneValue(cell));
         System.out.println(value);
         }
     } catch (IOException e) {
         e.printStackTrace();
     }
     // 关闭 table
     table.close();
 }


骚戴理解:String value = new String(CellUtil.cloneValue(cell));这句是为了防止乱码,通过这句代码把底层的cell转成字符串来方便打印出来

3.4.3 、扫描数据

/**
 * 扫描数据
 *
 * @param namespace 命名空间
 * @param tableName 表格名称
 * @param startRow 开始的 row 包含的
 * @param stopRow 结束的 row 不包含
 */
 public static void scanRows(String namespace, String tableName, String startRow, String stopRow) throws IOException {
     // 1. 获取 table
     Table table = connection.getTable(TableName.valueOf(namespace, tableName));
     // 2. 创建 scan 对象
     Scan scan = new Scan();
     // 如果此时直接调用 会直接扫描整张表
     // 添加参数 来控制扫描的数据
     // 默认包含
     scan.withStartRow(Bytes.toBytes(startRow));
     // 默认不包含
     scan.withStopRow(Bytes.toBytes(stopRow));
     try {
     // 读取多行数据 获得 scanner
     ResultScanner scanner = table.getScanner(scan);
     // result 来记录一行数据 cell 数组
     // ResultScanner 来记录多行数据 result 的数组
     for (Result result : scanner) {
        Cell[] cells = result.rawCells();
     for (Cell cell : cells) {
         System.out.print (new 
            String(CellUtil.cloneRow(cell)) + "-" + new 
            String(CellUtil.cloneFamily(cell)) + "-" + new 
            String(CellUtil.cloneQualifier(cell)) + "-" + new 
            String(CellUtil.cloneValue(cell)) + "\t");
             }
         System.out.println();
         }
     } catch (IOException e) {
         e.printStackTrace();
     }
     // 3. 关闭 table
     table.close();
 }

骚戴理解:result 来记录一行数据 cell 数组, ResultScanner 来记录多行数据 result 的数组这两句话说的非常精准,这里要理解什么是cell才能理解那个双重for循环


Cell由{rowkey, column Family:column Qualifier, timestamp} 唯一确定的单元。cell 中的数据全部是字节码形式存贮。下面就是一个cell,对照下面的图片和代码比较一下

   // result 来记录一行数据 cell 数组
     // ResultScanner 来记录多行数据 result 的数组
     for (Result result : scanner) {
        Cell[] cells = result.rawCells();
     for (Cell cell : cells) {
         System.out.print (new 
            String(CellUtil.cloneRow(cell)) + "-" + new 
            String(CellUtil.cloneFamily(cell)) + "-" + new 
            String(CellUtil.cloneQualifier(cell)) + "-" + new 
            String(CellUtil.cloneValue(cell)) + "\t");
             }
         System.out.println();
         }

对比就可以知道这个Cell[]数组其实里面就是每一行的每一个列值,而不是所有的行!

看下面的打印结果就能知道了

相关实践学习
基于MaxCompute的热门话题分析
本实验围绕社交用户发布的文章做了详尽的分析,通过分析能得到用户群体年龄分布,性别分布,地理位置分布,以及热门话题的热度。
SaaS 模式云数据仓库必修课
本课程由阿里云开发者社区和阿里云大数据团队共同出品,是SaaS模式云原生数据仓库领导者MaxCompute核心课程。本课程由阿里云资深产品和技术专家们从概念到方法,从场景到实践,体系化的将阿里巴巴飞天大数据平台10多年的经过验证的方法与实践深入浅出的讲给开发者们。帮助大数据开发者快速了解并掌握SaaS模式的云原生的数据仓库,助力开发者学习了解先进的技术栈,并能在实际业务中敏捷的进行大数据分析,赋能企业业务。 通过本课程可以了解SaaS模式云原生数据仓库领导者MaxCompute核心功能及典型适用场景,可应用MaxCompute实现数仓搭建,快速进行大数据分析。适合大数据工程师、大数据分析师 大量数据需要处理、存储和管理,需要搭建数据仓库?学它! 没有足够人员和经验来运维大数据平台,不想自建IDC买机器,需要免运维的大数据平台?会SQL就等于会大数据?学它! 想知道大数据用得对不对,想用更少的钱得到持续演进的数仓能力?获得极致弹性的计算资源和更好的性能,以及持续保护数据安全的生产环境?学它! 想要获得灵活的分析能力,快速洞察数据规律特征?想要兼得数据湖的灵活性与数据仓库的成长性?学它! 出品人:阿里云大数据产品及研发团队专家 产品 MaxCompute 官网 https://www.aliyun.com/product/odps&nbsp;
目录
相关文章
|
2月前
|
存储 机器学习/深度学习 SQL
大数据处理与分析技术
大数据处理与分析技术
153 2
|
2月前
|
存储 分布式计算 NoSQL
【赵渝强老师】大数据技术的理论基础
本文介绍了大数据平台的核心思想,包括Google的三篇重要论文:Google文件系统(GFS)、MapReduce分布式计算模型和BigTable大表。这些论文奠定了大数据生态圈的技术基础,进而发展出了Hadoop、Spark和Flink等生态系统。文章详细解释了GFS的架构、MapReduce的计算过程以及BigTable的思想和HBase的实现。
113 0
|
18天前
|
分布式计算 大数据 数据处理
技术评测:MaxCompute MaxFrame——阿里云自研分布式计算框架的Python编程接口
随着大数据和人工智能技术的发展,数据处理的需求日益增长。阿里云推出的MaxCompute MaxFrame(简称“MaxFrame”)是一个专为Python开发者设计的分布式计算框架,它不仅支持Python编程接口,还能直接利用MaxCompute的云原生大数据计算资源和服务。本文将通过一系列最佳实践测评,探讨MaxFrame在分布式Pandas处理以及大语言模型数据处理场景中的表现,并分析其在实际工作中的应用潜力。
57 2
|
1月前
|
SQL 运维 大数据
轻量级的大数据处理技术
现代大数据应用架构中,数据中心作为核心,连接数据源与应用,承担着数据处理与服务的重要角色。然而,随着数据量的激增,数据中心面临运维复杂、体系封闭及应用间耦合性高等挑战。为缓解这些问题,一种轻量级的解决方案——esProc SPL应运而生。esProc SPL通过集成性、开放性、高性能、数据路由和敏捷性等特性,有效解决了现有架构的不足,实现了灵活高效的数据处理,特别适用于应用端的前置计算,降低了整体成本和复杂度。
|
2月前
|
机器学习/深度学习 存储 大数据
在大数据时代,高维数据处理成为难题,主成分分析(PCA)作为一种有效的数据降维技术,通过线性变换将数据投影到新的坐标系
在大数据时代,高维数据处理成为难题,主成分分析(PCA)作为一种有效的数据降维技术,通过线性变换将数据投影到新的坐标系,保留最大方差信息,实现数据压缩、去噪及可视化。本文详解PCA原理、步骤及其Python实现,探讨其在图像压缩、特征提取等领域的应用,并指出使用时的注意事项,旨在帮助读者掌握这一强大工具。
99 4
|
2月前
|
机器学习/深度学习 存储 大数据
云计算与大数据技术的融合应用
云计算与大数据技术的融合应用
|
2月前
|
SQL 存储 大数据
单机顶集群的大数据技术来了
大数据时代,分布式数仓如MPP成为热门技术,但其高昂的成本让人望而却步。对于多数任务,数据量并未达到PB级,单体数据库即可胜任。然而,由于SQL语法的局限性和计算任务的复杂性,分布式解决方案显得更为必要。esProc SPL作为一种开源轻量级计算引擎,通过高效的算法和存储机制,实现了单机性能超越集群的效果,为低成本、高效能的数据处理提供了新选择。
|
2月前
|
SQL 存储 算法
比 SQL 快出数量级的大数据计算技术
SQL 是大数据计算中最常用的工具,但在实际应用中,SQL 经常跑得很慢,浪费大量硬件资源。例如,某银行的反洗钱计算在 11 节点的 Vertica 集群上跑了 1.5 小时,而用 SPL 重写后,单机只需 26 秒。类似地,电商漏斗运算和时空碰撞任务在使用 SPL 后,性能也大幅提升。这是因为 SQL 无法写出低复杂度的算法,而 SPL 提供了更强大的数据类型和基础运算,能够实现高效计算。
|
2月前
|
存储 大数据 定位技术
大数据 数据索引技术
【10月更文挑战第26天】
66 3
|
2月前
|
存储 大数据 OLAP
大数据数据分区技术
【10月更文挑战第26天】
85 2

热门文章

最新文章