Hbase Java编程
6.1 需求与数据集
某某自来水公司,需要存储大量的缴费明细数据。以下截取了缴费明细的一部分内容。
用户id 姓名 用户地址 性别 缴费时间 表示数(本次) 表示数(上次) 用量(立方) 合计金额 查表日期 最迟缴费日期
4944191 登卫红 贵州省铜仁市德江县7单元267室 男 2020-05-10 308.1 283.1 25 150 2020-04-25 2020-06-09
因为缴费明细的数据记录非常庞大,该公司的信息部门决定使用HBase来存储这些数据。并且,他们希望能够通过Java程序来访问这些数据。
6.2 准备工作
6.2.1 创建IDEA Maven项目
groupId cn.itcast
artifactId hbase_op
6.2.2 导入pom依赖
<repositories><!-- 代码库 --> <repository> <id>aliyun</id> <url>http://maven.aliyun.com/nexus/content/groups/public/</url> <releases> <enabled>true</enabled> </releases> <snapshots> <enabled>false</enabled> <updatePolicy>never</updatePolicy> </snapshots> </repository> </repositories> <dependencies> <dependency> <groupId>org.apache.hbase</groupId> <artifactId>hbase-client</artifactId> <version>2.1.0</version> </dependency> <dependency> <groupId>commons-io</groupId> <artifactId>commons-io</artifactId> <version>2.6</version> </dependency> <dependency> <groupId>junit</groupId> <artifactId>junit</artifactId> <version>4.12</version> <scope>test</scope> </dependency> <dependency> <groupId>org.testng</groupId> <artifactId>testng</artifactId> <version>6.14.3</version> <scope>test</scope> </dependency> </dependencies> <build> <plugins> <plugin> <groupId>org.apache.maven.plugins</groupId> <artifactId>maven-compiler-plugin</artifactId> <version>3.1</version> <configuration> <target>1.8</target> <source>1.8</source> </configuration> </plugin> </plugins> </build>
6.2.3 复制HBase和Hadoop配置文件
将以下三个配置文件复制到resource目录中
hbase-site.xml
从Linux中下载:sz /export/server/hbase-2.1.0/conf/hbase-site.xml
core-site.xml
从Linux中下载:sz /export/server/hadoop-2.7.5/etc/hadoop/core-site.xml
log4j.properties
注意:请确认配置文件中的服务器节点hostname/ip地址配置正确
6.2.4 创建包结构和类
1.在test目录创建 cn.itcast.hbase.admin.api_test 包结构
2.创建TableAmdinTest类
6.2.5 创建Hbase连接以及admin管理对象
要操作Hbase也需要建立Hbase的连接。此处我们仍然使用TestNG来编写测试。使用@BeforeTest初始化HBase连接,创建admin对象、@AfterTest关闭连接。
实现步骤:
1.使用HbaseConfiguration.create()创建Hbase配置
2.使用ConnectionFactory.createConnection()创建Hbase连接
3.要创建表,需要基于Hbase连接获取admin管理对象
4.使用admin.close、connection.close关闭连接
参考代码:
public class TableAmdinTest { private Configuration configuration; private Connection connection; private Admin admin; @BeforeTest public void beforeTest() throws IOException { configuration = HBaseConfiguration.create(); connection = ConnectionFactory.createConnection(configuration); admin = connection.getAdmin(); } @AfterTest public void afterTest() throws IOException { admin.close(); connection.close(); } }
6.3 需求一:使用Java代码创建表
创建一个名为WATER_BILL的表,包含一个列蔟C1。
实现步骤:
1.判断表是否存在
a)存在,则退出
2.使用TableDescriptorBuilder.newBuilder构建表描述构建器
3.使用ColumnFamilyDescriptorBuilder.newBuilder构建列蔟描述构建器
4.构建列蔟描述,构建表描述
5.创建表
参考代码:
// 创建一个名为WATER_BILL的表,包含一个列蔟C1 @Test public void createTableTest() throws IOException { // 表名 String TABLE_NAME = "WATER_BILL"; // 列蔟名 String COLUMN_FAMILY = "C1"; // 1. 判断表是否存在 if(admin.tableExists(TableName.valueOf(TABLE_NAME))) { return; } // 2. 构建表描述构建器 TableDescriptorBuilder tableDescriptorBuilder = TableDescriptorBuilder.newBuilder(TableName.valueOf(TABLE_NAME)); // 3. 构建列蔟描述构建器 ColumnFamilyDescriptorBuilder columnFamilyDescriptorBuilder = ColumnFamilyDescriptorBuilder.newBuilder(Bytes.toBytes(COLUMN_FAMILY)); // 4. 构建列蔟描述 ColumnFamilyDescriptor columnFamilyDescriptor = columnFamilyDescriptorBuilder.build(); // 5. 构建表描述 // 添加列蔟 tableDescriptorBuilder.setColumnFamily(columnFamilyDescriptor); TableDescriptor tableDescriptor = tableDescriptorBuilder.build(); // 6. 创建表 admin.createTable(tableDescriptor); }
6.4 需求三:使用Java代码删除表
实现步骤:
1.判断表是否存在
2.如果存在,则禁用表
3.再删除表
参考代码:
// 删除表 @Test public void dropTable() throws IOException { // 表名 TableName tableName = TableName.valueOf("WATER_BILL"); // 1. 判断表是否存在 if(admin.tableExists(tableName)) { // 2. 禁用表 admin.disableTable(tableName); // 3. 删除表 admin.deleteTable(tableName); } }
6.5 需求二:往表中插入一条数据
6.5.1 创建包
1.在 test 目录中创建 cn.itcast.hbase.data.api_test 包
2.创建DataOpTest类
6.5.2 初始化Hbase连接
在@BeforeTest中初始化HBase连接,在@AfterTest中关闭Hbase连接。
参考代码:
public class DataOpTest { private Configuration configuration; private Connection connection; @BeforeTest public void beforeTest() throws IOException { configuration = HBaseConfiguration.create(); connection = ConnectionFactory.createConnection(configuration); } @AfterTest public void afterTest() throws IOException { connection.close(); } }
6.5.3 插入姓名列数据
在表中插入一个行,该行只包含一个列。
ROWKEY 姓名(列名:NAME) 4944191 登卫红
实现步骤:
1.使用Hbase连接获取Htable
2.构建ROWKEY、列蔟名、列名
3.构建Put对象(对应put命令)
4.添加姓名列
5.使用Htable表对象执行put操作
6.关闭Htable表对象
参考代码:
@Test public void addTest() throws IOException { // 1.使用Hbase连接获取Htable TableName waterBillTableName = TableName.valueOf("WATER_BILL"); Table waterBillTable = connection.getTable(waterBillTableName); // 2.构建ROWKEY、列蔟名、列名 String rowkey = "4944191"; String cfName = "C1"; String colName = "NAME"; // 3.构建Put对象(对应put命令) Put put = new Put(Bytes.toBytes(rowkey)); // 4.添加姓名列 put.addColumn(Bytes.toBytes(cfName) , Bytes.toBytes(colName) , Bytes.toBytes("登卫红")); // 5.使用Htable表对象执行put操作 waterBillTable.put(put); // 6. 关闭表 waterBillTable.close(); }
6.5.4 查看HBase中的数据
get 'WATER_BILL','4944191',{FORMATTER => 'toString'}
6.5.5 插入其他列
列名 说明 值
ADDRESS 用户地址 贵州省铜仁市德江县7单元267室
SEX 性别 男
PAY_DATE 缴费时间 2020-05-10
NUM_CURRENT 表示数(本次) 308.1
NUM_PREVIOUS 表示数(上次) 283.1
NUM_USAGE 用量(立方) 25
TOTAL_MONEY 合计金额 150
RECORD_DATE 查表日期 2020-04-25
LATEST_DATE 最迟缴费日期 2020-06-09
参考代码:
@Test public void addTest() throws IOException { // 1.使用Hbase连接获取Htable TableName waterBillTableName = TableName.valueOf("WATER_BILL"); Table waterBillTable = connection.getTable(waterBillTableName); // 2.构建ROWKEY、列蔟名、列名 String rowkey = "4944191"; String cfName = "C1"; String colName = "NAME"; String colADDRESS = "ADDRESS"; String colSEX = "SEX"; String colPAY_DATE = "PAY_DATE"; String colNUM_CURRENT = "NUM_CURRENT"; String colNUM_PREVIOUS = "NUM_PREVIOUS"; String colNUM_USAGE = "NUM_USAGE"; String colTOTAL_MONEY = "TOTAL_MONEY"; String colRECORD_DATE = "RECORD_DATE"; String colLATEST_DATE = "LATEST_DATE"; // 3.构建Put对象(对应put命令) Put put = new Put(Bytes.toBytes(rowkey)); // 4.添加姓名列 put.addColumn(Bytes.toBytes(cfName) , Bytes.toBytes(colName) , Bytes.toBytes("登卫红")); put.addColumn(Bytes.toBytes(cfName) , Bytes.toBytes(colADDRESS) , Bytes.toBytes("贵州省铜仁市德江县7单元267室")); put.addColumn(Bytes.toBytes(cfName) , Bytes.toBytes(colSEX) , Bytes.toBytes("男")); put.addColumn(Bytes.toBytes(cfName) , Bytes.toBytes(colPAY_DATE) , Bytes.toBytes("2020-05-10")); put.addColumn(Bytes.toBytes(cfName) , Bytes.toBytes(colNUM_CURRENT) , Bytes.toBytes("308.1")); put.addColumn(Bytes.toBytes(cfName) , Bytes.toBytes(colNUM_PREVIOUS) , Bytes.toBytes("283.1")); put.addColumn(Bytes.toBytes(cfName) , Bytes.toBytes(colNUM_USAGE) , Bytes.toBytes("25")); put.addColumn(Bytes.toBytes(cfName) , Bytes.toBytes(colTOTAL_MONEY) , Bytes.toBytes("150")); put.addColumn(Bytes.toBytes(cfName) , Bytes.toBytes(colRECORD_DATE) , Bytes.toBytes("2020-04-25")); put.addColumn(Bytes.toBytes(cfName) , Bytes.toBytes(colLATEST_DATE) , Bytes.toBytes("2020-06-09")); // 5.使用Htable表对象执行put操作 waterBillTable.put(put); // 6. 关闭表 waterBillTable.close(); }
6.6 需求三:查看一条数据
查询rowkey为4944191的所有列的数据,并打印出来。
实现步骤:
1.获取HTable
2.使用rowkey构建Get对象
3.执行get请求
4.获取所有单元格
5.打印rowkey
6.迭代单元格列表
7.关闭表
参考代码:
@Test public void getOneTest() throws IOException { // 1. 获取HTable TableName waterBillTableName = TableName.valueOf("WATER_BILL"); Table waterBilltable = connection.getTable(waterBillTableName); // 2. 使用rowkey构建Get对象 Get get = new Get(Bytes.toBytes("4944191")); // 3. 执行get请求 Result result = waterBilltable.get(get); // 4. 获取所有单元格 List<Cell> cellList = result.listCells(); // 打印rowkey System.out.println("rowkey => " + Bytes.toString(result.getRow())); // 5. 迭代单元格列表 for (Cell cell : cellList) { // 打印列蔟名 System.out.print(Bytes.toString(cell.getQualifierArray(), cell.getQualifierOffset(), cell.getQualifierLength())); System.out.println(" => " + Bytes.toString(cell.getValueArray(), cell.getValueOffset(), cell.getValueLength())); } // 6. 关闭表 waterBilltable.close(); }
6.7 需求四:删除一条数据
删除rowkey为4944191的整条数据。
实现步骤:
1.获取HTable对象
2.根据rowkey构建delete对象
3.执行delete请求
4.关闭表
参考代码:
// 删除rowkey为4944191的整条数据 @Test public void deleteOneTest() throws IOException { // 1. 获取HTable对象 Table waterBillTable = connection.getTable(TableName.valueOf("WATER_BILL")); // 2. 根据rowkey构建delete对象 Delete delete = new Delete(Bytes.toBytes("4944191")); // 3. 执行delete请求 waterBillTable.delete(delete); // 4. 关闭表 waterBillTable.close(); }
6.8 需求五:导入数据
6.8.1 需求
在资料中,有一份10W的抄表数据文件,我们需要将这里面的数据导入到HBase中。
6.8.2 Import JOB
在HBase中,有一个Import的MapReduce作业,可以专门用来将数据文件导入到HBase中。
用法
hbase org.apache.hadoop.hbase.mapreduce.Import 表名 HDFS数据文件路径
6.8.3 导入数据
1.将资料中数据文件上传到Linux中
2.再将文件上传到hdfs中
hadoop fs -mkdir -p /water_bill/output_ept_10W hadoop fs -put part-m-00000_10w /water_bill/output_ept_10W
3.启动YARN集群
start-yarn.sh
4.使用以下方式来进行数据导入
hbase org.apache.hadoop.hbase.mapreduce.Import WATER_BILL /water_bill/output_ept_10W
6.8.4 导出数据
hbase org.apache.hadoop.hbase.mapreduce.Export WATER_BILL /water_bill/output_ept_10W_export
6.9 需求六:查询2020年6月份所有用户的用水量
6.9.1 需求分析
在Java API中,我们也是使用scan + filter来实现过滤查询。2020年6月份其实就是从2020年6月1日到2020年6月30日的所有抄表数据。
6.9.2 准备工作
1.在cn.itcast.hbase.data.api_test包下创建ScanFilterTest类
2.使用@BeforeTest、@AfterTest构建HBase连接、以及关闭HBase连接
6.9.3 实现
实现步骤:
1.获取表
2.构建scan请求对象
3.构建两个过滤器
a)构建两个日期范围过滤器(注意此处请使用RECORD_DATE——抄表日期比较
b)构建过滤器列表
4.执行scan扫描请求
5.迭代打印result
6.迭代单元格列表
7.关闭ResultScanner(这玩意把转换成一个个的类似get的操作,注意要关闭释放资源)
8.关闭表
参考代码:
// 查询2020年6月份所有用户的用水量数据 @Test public void queryTest1() throws IOException { // 1. 获取表 Table waterBillTable = connection.getTable(TableName.valueOf("WATER_BILL")); // 2. 构建scan请求对象 Scan scan = new Scan(); // 3. 构建两个过滤器 // 3.1 构建日期范围过滤器(注意此处请使用RECORD_DATE——抄表日期比较 SingleColumnValueFilter startDateFilter = new SingleColumnValueFilter(Bytes.toBytes("C1") , Bytes.toBytes("RECORD_DATE") , CompareOperator.GREATER_OR_EQUAL , Bytes.toBytes("2020-06-01")); SingleColumnValueFilter endDateFilter = new SingleColumnValueFilter(Bytes.toBytes("C1") , Bytes.toBytes("RECORD_DATE") , CompareOperator.LESS_OR_EQUAL , Bytes.toBytes("2020-06-30")); // 3.2 构建过滤器列表 FilterList filterList = new FilterList(FilterList.Operator.MUST_PASS_ALL , startDateFilter , endDateFilter); scan.setFilter(filterList); // 4. 执行scan扫描请求 ResultScanner resultScan = waterBillTable.getScanner(scan); // 5. 迭代打印result for (Result result : resultScan) { System.out.println("rowkey -> " + Bytes.toString(result.getRow())); System.out.println("------"); List<Cell> cellList = result.listCells(); // 6. 迭代单元格列表 for (Cell cell : cellList) { // 打印列蔟名 System.out.print(Bytes.toString(cell.getQualifierArray(), cell.getQualifierOffset(), cell.getQualifierLength())); System.out.println(" => " + Bytes.toString(cell.getValueArray(), cell.getValueOffset(), cell.getValueLength())); } System.out.println("------"); } resultScanner.close(); // 7. 关闭表 waterBillTable.close(); }
6.9.4 解决乱码问题
因为前面我们的代码,在打印所有的列时,都是使用字符串打印的,Hbase中如果存储的是int、double,那么有可能就会乱码了。
System.out.print(Bytes.toString(cell.getQualifierArray(), cell.getQualifierOffset(), cell.getQualifierLength())); System.out.println(" => " + Bytes.toString(cell.getValueArray(), cell.getValueOffset(), cell.getValueLength()));
要解决的话,我们可以根据列来判断,使用哪种方式转换字节码。如下:
1.NUM_CURRENT
2.NUM_PREVIOUS
3.NUM_USAGE
4.TOTAL_MONEY
这4列使用double类型展示,其他的使用string类型展示。
参考代码:
String colName = Bytes.toString(cell.getQualifierArray(), cell.getQualifierOffset(), cell.getQualifierLength()); System.out.print(colName); if(colName.equals("NUM_CURRENT") || colName.equals("NUM_PREVIOUS") || colName.equals("NUM_USAGE") || colName.equals("TOTAL_MONEY")) { System.out.println(" => " + Bytes.toDouble(cell.getValueArray(), cell.getValueOffset())); } else { System.out.println(" => " + Bytes.toString(cell.getValueArray(), cell.getValueOffset(), cell.getValueLength())); }