Hbase Java编程

简介: Hbase Java编程

Hbase Java编程

6.1 需求与数据集

某某自来水公司,需要存储大量的缴费明细数据。以下截取了缴费明细的一部分内容。

用户id 姓名 用户地址 性别 缴费时间 表示数(本次) 表示数(上次) 用量(立方) 合计金额 查表日期 最迟缴费日期

4944191 登卫红 贵州省铜仁市德江县7单元267室 男 2020-05-10 308.1 283.1 25 150 2020-04-25 2020-06-09

因为缴费明细的数据记录非常庞大,该公司的信息部门决定使用HBase来存储这些数据。并且,他们希望能够通过Java程序来访问这些数据。

6.2 准备工作

6.2.1 创建IDEA Maven项目

groupId cn.itcast

artifactId hbase_op

6.2.2 导入pom依赖

<repositories><!-- 代码库 -->
    <repository>
        <id>aliyun</id>
        <url>http://maven.aliyun.com/nexus/content/groups/public/</url>
        <releases>
            <enabled>true</enabled>
        </releases>
        <snapshots>
            <enabled>false</enabled>
            <updatePolicy>never</updatePolicy>
        </snapshots>
    </repository>
</repositories>
<dependencies>
    <dependency>
        <groupId>org.apache.hbase</groupId>
        <artifactId>hbase-client</artifactId>
        <version>2.1.0</version>
    </dependency>
    <dependency>
        <groupId>commons-io</groupId>
        <artifactId>commons-io</artifactId>
        <version>2.6</version>
    </dependency>
    <dependency>
        <groupId>junit</groupId>
        <artifactId>junit</artifactId>
        <version>4.12</version>
        <scope>test</scope>
    </dependency>
    <dependency>
        <groupId>org.testng</groupId>
        <artifactId>testng</artifactId>
        <version>6.14.3</version>
        <scope>test</scope>
    </dependency>
</dependencies>
<build>
    <plugins>
        <plugin>
            <groupId>org.apache.maven.plugins</groupId>
            <artifactId>maven-compiler-plugin</artifactId>
            <version>3.1</version>
            <configuration>
                <target>1.8</target>
                <source>1.8</source>
            </configuration>
        </plugin>
    </plugins>
</build>

6.2.3 复制HBase和Hadoop配置文件

将以下三个配置文件复制到resource目录中

hbase-site.xml

从Linux中下载:sz /export/server/hbase-2.1.0/conf/hbase-site.xml

core-site.xml

从Linux中下载:sz /export/server/hadoop-2.7.5/etc/hadoop/core-site.xml

log4j.properties

注意:请确认配置文件中的服务器节点hostname/ip地址配置正确

6.2.4 创建包结构和类

1.在test目录创建 cn.itcast.hbase.admin.api_test 包结构

2.创建TableAmdinTest类

6.2.5 创建Hbase连接以及admin管理对象

要操作Hbase也需要建立Hbase的连接。此处我们仍然使用TestNG来编写测试。使用@BeforeTest初始化HBase连接,创建admin对象、@AfterTest关闭连接。

实现步骤:

1.使用HbaseConfiguration.create()创建Hbase配置

2.使用ConnectionFactory.createConnection()创建Hbase连接

3.要创建表,需要基于Hbase连接获取admin管理对象

4.使用admin.close、connection.close关闭连接

参考代码:

public class TableAmdinTest {
    private Configuration configuration;
    private Connection connection;
    private Admin admin;
    @BeforeTest
    public void beforeTest() throws IOException {
        configuration = HBaseConfiguration.create();
        connection = ConnectionFactory.createConnection(configuration);
        admin = connection.getAdmin();
    }
    @AfterTest
    public void afterTest() throws IOException {
        admin.close();
        connection.close();
    }
}

6.3 需求一:使用Java代码创建表

创建一个名为WATER_BILL的表,包含一个列蔟C1。

实现步骤:

1.判断表是否存在

a)存在,则退出

2.使用TableDescriptorBuilder.newBuilder构建表描述构建器

3.使用ColumnFamilyDescriptorBuilder.newBuilder构建列蔟描述构建器

4.构建列蔟描述,构建表描述

5.创建表

参考代码:

// 创建一个名为WATER_BILL的表,包含一个列蔟C1
@Test
public void createTableTest() throws IOException {
    // 表名
    String TABLE_NAME = "WATER_BILL";
    // 列蔟名
    String COLUMN_FAMILY = "C1";
    // 1. 判断表是否存在
    if(admin.tableExists(TableName.valueOf(TABLE_NAME))) {
        return;
    }
    // 2. 构建表描述构建器
    TableDescriptorBuilder tableDescriptorBuilder = TableDescriptorBuilder.newBuilder(TableName.valueOf(TABLE_NAME));
    // 3. 构建列蔟描述构建器
    ColumnFamilyDescriptorBuilder columnFamilyDescriptorBuilder = ColumnFamilyDescriptorBuilder.newBuilder(Bytes.toBytes(COLUMN_FAMILY));
    // 4. 构建列蔟描述
    ColumnFamilyDescriptor columnFamilyDescriptor = columnFamilyDescriptorBuilder.build();
    // 5. 构建表描述
    // 添加列蔟
    tableDescriptorBuilder.setColumnFamily(columnFamilyDescriptor);
    TableDescriptor tableDescriptor = tableDescriptorBuilder.build();
    // 6. 创建表
    admin.createTable(tableDescriptor);
}

6.4 需求三:使用Java代码删除表

实现步骤:

1.判断表是否存在

2.如果存在,则禁用表

3.再删除表

参考代码:

// 删除表
@Test
public void dropTable() throws IOException {
    // 表名
    TableName tableName = TableName.valueOf("WATER_BILL");
    // 1. 判断表是否存在
    if(admin.tableExists(tableName)) {
        // 2. 禁用表
        admin.disableTable(tableName);
        // 3. 删除表
        admin.deleteTable(tableName);
    }
}

6.5 需求二:往表中插入一条数据

6.5.1 创建包

1.在 test 目录中创建 cn.itcast.hbase.data.api_test 包

2.创建DataOpTest类

6.5.2 初始化Hbase连接

在@BeforeTest中初始化HBase连接,在@AfterTest中关闭Hbase连接。

参考代码:

public class DataOpTest {
    private Configuration configuration;
    private Connection connection;
    @BeforeTest
    public void beforeTest() throws IOException {
        configuration = HBaseConfiguration.create();
        connection = ConnectionFactory.createConnection(configuration);
    }
    @AfterTest
    public void afterTest() throws IOException {
        connection.close();
    }
}

6.5.3 插入姓名列数据

在表中插入一个行,该行只包含一个列。

ROWKEY  姓名(列名:NAME)
4944191 登卫红

实现步骤:

1.使用Hbase连接获取Htable

2.构建ROWKEY、列蔟名、列名

3.构建Put对象(对应put命令)

4.添加姓名列

5.使用Htable表对象执行put操作

6.关闭Htable表对象

参考代码:

@Test
public void addTest() throws IOException {
    // 1.使用Hbase连接获取Htable
    TableName waterBillTableName = TableName.valueOf("WATER_BILL");
    Table waterBillTable = connection.getTable(waterBillTableName);
    // 2.构建ROWKEY、列蔟名、列名
    String rowkey = "4944191";
    String cfName = "C1";
    String colName = "NAME";
    // 3.构建Put对象(对应put命令)
    Put put = new Put(Bytes.toBytes(rowkey));
    // 4.添加姓名列
    put.addColumn(Bytes.toBytes(cfName)
        , Bytes.toBytes(colName)
        , Bytes.toBytes("登卫红"));
    // 5.使用Htable表对象执行put操作
    waterBillTable.put(put);
    // 6. 关闭表
    waterBillTable.close();
}

6.5.4 查看HBase中的数据

get 'WATER_BILL','4944191',{FORMATTER => 'toString'}

6.5.5 插入其他列

列名 说明 值

ADDRESS 用户地址 贵州省铜仁市德江县7单元267室

SEX 性别 男

PAY_DATE 缴费时间 2020-05-10

NUM_CURRENT 表示数(本次) 308.1

NUM_PREVIOUS 表示数(上次) 283.1

NUM_USAGE 用量(立方) 25

TOTAL_MONEY 合计金额 150

RECORD_DATE 查表日期 2020-04-25

LATEST_DATE 最迟缴费日期 2020-06-09

参考代码:

@Test
public void addTest() throws IOException {
    // 1.使用Hbase连接获取Htable
    TableName waterBillTableName = TableName.valueOf("WATER_BILL");
    Table waterBillTable = connection.getTable(waterBillTableName);
    // 2.构建ROWKEY、列蔟名、列名
    String rowkey = "4944191";
    String cfName = "C1";
    String colName = "NAME";
    String colADDRESS = "ADDRESS";
    String colSEX = "SEX";
    String colPAY_DATE = "PAY_DATE";
    String colNUM_CURRENT = "NUM_CURRENT";
    String colNUM_PREVIOUS = "NUM_PREVIOUS";
    String colNUM_USAGE = "NUM_USAGE";
    String colTOTAL_MONEY = "TOTAL_MONEY";
    String colRECORD_DATE = "RECORD_DATE";
    String colLATEST_DATE = "LATEST_DATE";
    // 3.构建Put对象(对应put命令)
    Put put = new Put(Bytes.toBytes(rowkey));
    // 4.添加姓名列
    put.addColumn(Bytes.toBytes(cfName)
            , Bytes.toBytes(colName)
            , Bytes.toBytes("登卫红"));
    put.addColumn(Bytes.toBytes(cfName)
            , Bytes.toBytes(colADDRESS)
            , Bytes.toBytes("贵州省铜仁市德江县7单元267室"));
    put.addColumn(Bytes.toBytes(cfName)
            , Bytes.toBytes(colSEX)
            , Bytes.toBytes("男"));
    put.addColumn(Bytes.toBytes(cfName)
            , Bytes.toBytes(colPAY_DATE)
            , Bytes.toBytes("2020-05-10"));
    put.addColumn(Bytes.toBytes(cfName)
            , Bytes.toBytes(colNUM_CURRENT)
            , Bytes.toBytes("308.1"));
    put.addColumn(Bytes.toBytes(cfName)
            , Bytes.toBytes(colNUM_PREVIOUS)
            , Bytes.toBytes("283.1"));
    put.addColumn(Bytes.toBytes(cfName)
            , Bytes.toBytes(colNUM_USAGE)
            , Bytes.toBytes("25"));
    put.addColumn(Bytes.toBytes(cfName)
            , Bytes.toBytes(colTOTAL_MONEY)
            , Bytes.toBytes("150"));
    put.addColumn(Bytes.toBytes(cfName)
            , Bytes.toBytes(colRECORD_DATE)
            , Bytes.toBytes("2020-04-25"));
    put.addColumn(Bytes.toBytes(cfName)
            , Bytes.toBytes(colLATEST_DATE)
            , Bytes.toBytes("2020-06-09"));
    // 5.使用Htable表对象执行put操作
    waterBillTable.put(put);
    // 6. 关闭表
    waterBillTable.close();
}

6.6 需求三:查看一条数据

查询rowkey为4944191的所有列的数据,并打印出来。

实现步骤:

1.获取HTable

2.使用rowkey构建Get对象

3.执行get请求

4.获取所有单元格

5.打印rowkey

6.迭代单元格列表

7.关闭表

参考代码:

@Test
public void getOneTest() throws IOException {
    // 1. 获取HTable
    TableName waterBillTableName = TableName.valueOf("WATER_BILL");
    Table waterBilltable = connection.getTable(waterBillTableName);
    // 2. 使用rowkey构建Get对象
    Get get = new Get(Bytes.toBytes("4944191"));
    // 3. 执行get请求
    Result result = waterBilltable.get(get);
    // 4. 获取所有单元格
    List<Cell> cellList = result.listCells();
    // 打印rowkey
    System.out.println("rowkey => " + Bytes.toString(result.getRow()));
    // 5. 迭代单元格列表
    for (Cell cell : cellList) {
        // 打印列蔟名
        System.out.print(Bytes.toString(cell.getQualifierArray(), cell.getQualifierOffset(), cell.getQualifierLength()));
        System.out.println(" => " + Bytes.toString(cell.getValueArray(), cell.getValueOffset(), cell.getValueLength()));
    }
    // 6. 关闭表
    waterBilltable.close();
}

6.7 需求四:删除一条数据

删除rowkey为4944191的整条数据。

实现步骤:

1.获取HTable对象

2.根据rowkey构建delete对象

3.执行delete请求

4.关闭表

参考代码:

// 删除rowkey为4944191的整条数据
@Test
public void deleteOneTest() throws IOException {
    // 1. 获取HTable对象
    Table waterBillTable = connection.getTable(TableName.valueOf("WATER_BILL"));
    // 2. 根据rowkey构建delete对象
    Delete delete = new Delete(Bytes.toBytes("4944191"));
    // 3. 执行delete请求
    waterBillTable.delete(delete);
    // 4. 关闭表
    waterBillTable.close();
}

6.8 需求五:导入数据

6.8.1 需求

在资料中,有一份10W的抄表数据文件,我们需要将这里面的数据导入到HBase中。

6.8.2 Import JOB

在HBase中,有一个Import的MapReduce作业,可以专门用来将数据文件导入到HBase中。

用法

hbase org.apache.hadoop.hbase.mapreduce.Import 表名 HDFS数据文件路径

6.8.3 导入数据

1.将资料中数据文件上传到Linux中

2.再将文件上传到hdfs中

hadoop fs -mkdir -p /water_bill/output_ept_10W
hadoop fs -put part-m-00000_10w /water_bill/output_ept_10W

3.启动YARN集群

start-yarn.sh

4.使用以下方式来进行数据导入

hbase org.apache.hadoop.hbase.mapreduce.Import WATER_BILL /water_bill/output_ept_10W

6.8.4 导出数据

hbase org.apache.hadoop.hbase.mapreduce.Export WATER_BILL /water_bill/output_ept_10W_export

6.9 需求六:查询2020年6月份所有用户的用水量

6.9.1 需求分析

在Java API中,我们也是使用scan + filter来实现过滤查询。2020年6月份其实就是从2020年6月1日到2020年6月30日的所有抄表数据。

6.9.2 准备工作

1.在cn.itcast.hbase.data.api_test包下创建ScanFilterTest类

2.使用@BeforeTest、@AfterTest构建HBase连接、以及关闭HBase连接

6.9.3 实现

实现步骤:

1.获取表

2.构建scan请求对象

3.构建两个过滤器

a)构建两个日期范围过滤器(注意此处请使用RECORD_DATE——抄表日期比较

b)构建过滤器列表

4.执行scan扫描请求

5.迭代打印result

6.迭代单元格列表

7.关闭ResultScanner(这玩意把转换成一个个的类似get的操作,注意要关闭释放资源)

8.关闭表

参考代码:

// 查询2020年6月份所有用户的用水量数据
@Test
public void queryTest1() throws IOException {
    // 1. 获取表
    Table waterBillTable = connection.getTable(TableName.valueOf("WATER_BILL"));
    // 2. 构建scan请求对象
    Scan scan = new Scan();
    // 3. 构建两个过滤器
    // 3.1 构建日期范围过滤器(注意此处请使用RECORD_DATE——抄表日期比较
    SingleColumnValueFilter startDateFilter = new SingleColumnValueFilter(Bytes.toBytes("C1")
            , Bytes.toBytes("RECORD_DATE")
            , CompareOperator.GREATER_OR_EQUAL
            , Bytes.toBytes("2020-06-01"));
    SingleColumnValueFilter endDateFilter = new SingleColumnValueFilter(Bytes.toBytes("C1")
            , Bytes.toBytes("RECORD_DATE")
            , CompareOperator.LESS_OR_EQUAL
            , Bytes.toBytes("2020-06-30"));
    // 3.2 构建过滤器列表
    FilterList filterList = new FilterList(FilterList.Operator.MUST_PASS_ALL
            , startDateFilter
            , endDateFilter);
    scan.setFilter(filterList);
    // 4. 执行scan扫描请求
    ResultScanner resultScan = waterBillTable.getScanner(scan);
    // 5. 迭代打印result
    for (Result result : resultScan) {
        System.out.println("rowkey -> " + Bytes.toString(result.getRow()));
        System.out.println("------");
        List<Cell> cellList = result.listCells();
        // 6. 迭代单元格列表
        for (Cell cell : cellList) {
            // 打印列蔟名
            System.out.print(Bytes.toString(cell.getQualifierArray(), cell.getQualifierOffset(), cell.getQualifierLength()));
            System.out.println(" => " + Bytes.toString(cell.getValueArray(), cell.getValueOffset(), cell.getValueLength()));
        }
        System.out.println("------");
    }
resultScanner.close();
    // 7. 关闭表
    waterBillTable.close();
}

6.9.4 解决乱码问题

因为前面我们的代码,在打印所有的列时,都是使用字符串打印的,Hbase中如果存储的是int、double,那么有可能就会乱码了。

System.out.print(Bytes.toString(cell.getQualifierArray(), cell.getQualifierOffset(), cell.getQualifierLength()));
System.out.println(" => " + Bytes.toString(cell.getValueArray(), cell.getValueOffset(), cell.getValueLength()));

要解决的话,我们可以根据列来判断,使用哪种方式转换字节码。如下:

1.NUM_CURRENT

2.NUM_PREVIOUS

3.NUM_USAGE

4.TOTAL_MONEY

这4列使用double类型展示,其他的使用string类型展示。

参考代码:

String colName = Bytes.toString(cell.getQualifierArray(), cell.getQualifierOffset(), cell.getQualifierLength());
System.out.print(colName);
if(colName.equals("NUM_CURRENT")
        || colName.equals("NUM_PREVIOUS")
        || colName.equals("NUM_USAGE")
        || colName.equals("TOTAL_MONEY")) {
    System.out.println(" => " + Bytes.toDouble(cell.getValueArray(), cell.getValueOffset()));
}
else {
    System.out.println(" => " + Bytes.toString(cell.getValueArray(), cell.getValueOffset(), cell.getValueLength()));
}


相关实践学习
lindorm多模间数据无缝流转
展现了Lindorm多模融合能力——用kafka API写入,无缝流转在各引擎内进行数据存储和计算的实验。
云数据库HBase版使用教程
&nbsp; 相关的阿里云产品:云数据库 HBase 版 面向大数据领域的一站式NoSQL服务,100%兼容开源HBase并深度扩展,支持海量数据下的实时存储、高并发吞吐、轻SQL分析、全文检索、时序时空查询等能力,是风控、推荐、广告、物联网、车联网、Feeds流、数据大屏等场景首选数据库,是为淘宝、支付宝、菜鸟等众多阿里核心业务提供关键支撑的数据库。 了解产品详情:&nbsp;https://cn.aliyun.com/product/hbase &nbsp; ------------------------------------------------------------------------- 阿里云数据库体验:数据库上云实战 开发者云会免费提供一台带自建MySQL的源数据库&nbsp;ECS 实例和一台目标数据库&nbsp;RDS实例。跟着指引,您可以一步步实现将ECS自建数据库迁移到目标数据库RDS。 点击下方链接,领取免费ECS&amp;RDS资源,30分钟完成数据库上云实战!https://developer.aliyun.com/adc/scenario/51eefbd1894e42f6bb9acacadd3f9121?spm=a2c6h.13788135.J_3257954370.9.4ba85f24utseFl
目录
相关文章
|
2月前
|
Java 开发者
Java多线程编程中的常见误区与最佳实践####
本文深入剖析了Java多线程编程中开发者常遇到的几个典型误区,如对`start()`与`run()`方法的混淆使用、忽视线程安全问题、错误处理未同步的共享变量等,并针对这些问题提出了具体的解决方案和最佳实践。通过实例代码对比,直观展示了正确与错误的实现方式,旨在帮助读者构建更加健壮、高效的多线程应用程序。 ####
|
2月前
|
安全 Java UED
深入浅出Java多线程编程
【10月更文挑战第40天】在Java的世界中,多线程是提升应用性能和响应能力的关键。本文将通过浅显易懂的方式介绍Java中的多线程编程,从基础概念到高级特性,再到实际应用案例,带你一步步深入了解如何在Java中高效地使用多线程。文章不仅涵盖了理论知识,还提供了实用的代码示例,帮助你在实际开发中更好地应用多线程技术。
74 5
|
1月前
|
Java 程序员
Java编程中的异常处理:从基础到高级
在Java的世界中,异常处理是代码健壮性的守护神。本文将带你从异常的基本概念出发,逐步深入到高级用法,探索如何优雅地处理程序中的错误和异常情况。通过实际案例,我们将一起学习如何编写更可靠、更易于维护的Java代码。准备好了吗?让我们一起踏上这段旅程,解锁Java异常处理的秘密!
|
30天前
|
存储 缓存 Java
Java 并发编程——volatile 关键字解析
本文介绍了Java线程中的`volatile`关键字及其与`synchronized`锁的区别。`volatile`保证了变量的可见性和一定的有序性,但不能保证原子性。它通过内存屏障实现,避免指令重排序,确保线程间数据一致。相比`synchronized`,`volatile`性能更优,适用于简单状态标记和某些特定场景,如单例模式中的双重检查锁定。文中还解释了Java内存模型的基本概念,包括主内存、工作内存及并发编程中的原子性、可见性和有序性。
Java 并发编程——volatile 关键字解析
|
1月前
|
算法 Java 调度
java并发编程中Monitor里的waitSet和EntryList都是做什么的
在Java并发编程中,Monitor内部包含两个重要队列:等待集(Wait Set)和入口列表(Entry List)。Wait Set用于线程的条件等待和协作,线程调用`wait()`后进入此集合,通过`notify()`或`notifyAll()`唤醒。Entry List则管理锁的竞争,未能获取锁的线程在此排队,等待锁释放后重新竞争。理解两者区别有助于设计高效的多线程程序。 - **Wait Set**:线程调用`wait()`后进入,等待条件满足被唤醒,需重新竞争锁。 - **Entry List**:多个线程竞争锁时,未获锁的线程在此排队,等待锁释放后获取锁继续执行。
67 12
|
1月前
|
存储 安全 Java
Java多线程编程秘籍:各种方案一网打尽,不要错过!
Java 中实现多线程的方式主要有四种:继承 Thread 类、实现 Runnable 接口、实现 Callable 接口和使用线程池。每种方式各有优缺点,适用于不同的场景。继承 Thread 类最简单,实现 Runnable 接口更灵活,Callable 接口支持返回结果,线程池则便于管理和复用线程。实际应用中可根据需求选择合适的方式。此外,还介绍了多线程相关的常见面试问题及答案,涵盖线程概念、线程安全、线程池等知识点。
167 2
|
2月前
|
设计模式 Java 开发者
Java多线程编程的陷阱与解决方案####
本文深入探讨了Java多线程编程中常见的问题及其解决策略。通过分析竞态条件、死锁、活锁等典型场景,并结合代码示例和实用技巧,帮助开发者有效避免这些陷阱,提升并发程序的稳定性和性能。 ####
|
2月前
|
缓存 Java 开发者
Java多线程编程的陷阱与最佳实践####
本文深入探讨了Java多线程编程中常见的陷阱,如竞态条件、死锁和内存一致性错误,并提供了实用的避免策略。通过分析典型错误案例,本文旨在帮助开发者更好地理解和掌握多线程环境下的编程技巧,从而提升并发程序的稳定性和性能。 ####
|
1月前
|
安全 算法 Java
Java多线程编程中的陷阱与最佳实践####
本文探讨了Java多线程编程中常见的陷阱,并介绍了如何通过最佳实践来避免这些问题。我们将从基础概念入手,逐步深入到具体的代码示例,帮助开发者更好地理解和应用多线程技术。无论是初学者还是有经验的开发者,都能从中获得有价值的见解和建议。 ####
|
1月前
|
Java 调度
Java中的多线程编程与并发控制
本文深入探讨了Java编程语言中多线程编程的基础知识和并发控制机制。文章首先介绍了多线程的基本概念,包括线程的定义、生命周期以及在Java中创建和管理线程的方法。接着,详细讲解了Java提供的同步机制,如synchronized关键字、wait()和notify()方法等,以及如何通过这些机制实现线程间的协调与通信。最后,本文还讨论了一些常见的并发问题,例如死锁、竞态条件等,并提供了相应的解决策略。
66 3

热门文章

最新文章