HBase实现简单聚合计算

简介:

本文主要记录如何通过打补丁的方式将“hbase中实现简单聚合计算”的特性引入hbase源代码中,并介绍通过命令行和java代码的使用方法。

支持的简单聚合计算,包括:

  • rowcount
  • min
  • max
  • sum
  • std
  • avg
  • median

1、 下载并编译hbase源代码

我这里使用的HBase源代码版本是:cdh4-0.94.6_4.3.0,如果你使用其他版本,有可能patch打不上。

2、 引入patch

基于提交日志add-aggregate-support-in-hbase-shell生成patch文件,然后打patch,或者也可以使用其他方法:

$ git apply add-aggregate-in-hbase-shell.patch

该patch所做修改包括如下文件:

src/main/java/org/apache/hadoop/hbase/client/coprocessor/AbstractDoubleColumnInterpreter.java
src/main/java/org/apache/hadoop/hbase/client/coprocessor/AbstractLongColumnInterpreter.java
src/main/java/org/apache/hadoop/hbase/client/coprocessor/CompositeDoubleStrColumnInterpreter.java
src/main/java/org/apache/hadoop/hbase/client/coprocessor/CompositeLongStrColumnInterpreter.java
src/main/java/org/apache/hadoop/hbase/client/coprocessor/DoubleColumnInterpreter.java
src/main/java/org/apache/hadoop/hbase/client/coprocessor/DoubleStrColumnInterpreter.java
src/main/java/org/apache/hadoop/hbase/client/coprocessor/LongColumnInterpreter.java
src/main/java/org/apache/hadoop/hbase/client/coprocessor/LongStrColumnInterpreter.java
src/main/ruby/hbase.rb
src/main/ruby/hbase/coprocessor.rb
src/main/ruby/hbase/hbase.rb
src/main/ruby/shell.rb
src/main/ruby/shell/commands.rb
src/main/ruby/shell/commands/aggregate.rb

3、 编译源代码

为了使编译花费时间不会太长,请运行如下命令编译代码,你也可以自己修改下面命令:

$ MAVEN_OPTS="-Xmx2g" mvn clean install  -DskipTests -Prelease,security -Drat.numUnapprovedLicenses=200 -Dhadoop.profile=2.0

4、测试

然后将target目录下生成的jar包拷贝到集群中每个hbase节点的/usr/lib/hbase目录。

修改hbase-site.xml配置文件,添加如下配置:

<property>
  <name>hbase.coprocessor.region.classes</name>
  <value>org.apache.hadoop.hbase.coprocessor.AggregateImplementation</value>
</property>

重启hbase服务:

$ /etc/init.d/hbase-master restart
$ /etc/init.d/hbase-regionserver restart

a)运行hbase shell进行测试

首先创建表并插入几条记录:

create 't','f'
 
put 't','1','f:id','1'
put 't','2','f:id','2'
put 't','2','f:id','3'
put 't','3','f:id','4'

在hbase shell命令行中输入agg并按tab键自动提示:

hbase(main):004:0> aggregate

什么参数不输入,提示如下:

hbase(main):004:0> aggregate
 
ERROR: wrong number of arguments (0 for 2)
 
Here is some help for this command:
Execute a Coprocessor aggregation function; pass aggregation function name, table name, column name, column interpreter and optionally a dictionary of aggregation specifications. Aggregation specifications may include STARTROW, STOPROW or FILTER. For a cross-site big table, if no clusters are specified, all clusters will be counted for aggregation.
Usage: aggregate 'subcommand','table','column',[{COLUMN_INTERPRETER => org.apache.hadoop.hbase.client.coprocessor.LongColumnInterpreter.new, STARTROW => 'abc', STOPROW => 'def', FILTER => org.apache.hadoop.hbase.filter.ColumnPaginationFilter.new(1, 0)}]
Available subcommands:
rowcount
min
max
sum
std
avg
median
Available COLUMN_INTERPRETER:
org.apache.hadoop.hbase.client.coprocessor.LongColumnInterpreter.new
org.apache.hadoop.hbase.client.coprocessor.LongStrColumnInterpreter.new
org.apache.hadoop.hbase.client.coprocessor.CompositeLongStrColumnInterpreter.new(",", 0)
The default COLUMN_INTERPRETER is org.apache.hadoop.hbase.client.coprocessor.LongStrColumnInterpreter.new.
 
Some examples:
 
hbase> aggregate 'min','t1','f1:c1'
hbase> aggregate 'sum','t1','f1:c1','f1:c2'
hbase> aggregate 'rowcount','t1','f1:c1' ,{COLUMN_INTERPRETER => org.apache.hadoop.hbase.client.coprocessor.CompositeLongStrColumnInterpreter.new(",", 0)}
hbase> aggregate 'min','t1','f1:c1',{STARTROW => 'abc', STOPROW => 'def'}

从上可以看到aggregate的帮助说明。

接下来进行测试,例如求id列的最小值:

hbase(main):005:0> aggregate 'min','t','f:id'
The result of min for table t is 1
0 row(s) in 0.0170 seconds
 
hbase(main):006:0> aggregate 'avg','t','f:id'
The result of avg for table t is 2.5
0 row(s) in 0.0170 seconds

正确输出结果,表明测试成功。

b)通过hbase client测试

创建AggregateTest.java类并添加如下代码:

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.client.Scan;
import org.apache.hadoop.hbase.client.coprocessor.AggregationClient;
import org.apache.hadoop.hbase.client.coprocessor.LongStrColumnInterpreter;
import org.apache.hadoop.hbase.coprocessor.ColumnInterpreter;
import org.apache.hadoop.hbase.util.Bytes;
public class AggregateTest {
  public static void main(String[] args) {
    Configuration conf = HBaseConfiguration.create();
    conf.setInt("hbase.client.retries.number", 1);
    conf.setInt("ipc.client.connect.max.retries", 1);
     
    byte[] table = Bytes.toBytes("t");
    Scan scan = new Scan();
    scan.addColumn(Bytes.toBytes("f"), Bytes.toBytes("id"));
    final ColumnInterpreter<Long, Long> columnInterpreter = new LongStrColumnInterpreter();
    try {
      AggregationClient aClient = new AggregationClient(conf);
      Long rowCount = aClient.min(table, columnInterpreter, scan);
      System.out.println("The result is " + rowCount);
    } catch (Throwable e) {
      e.printStackTrace();
    }
  }
}

运行该类并查看输出结果。

以上源代码及所做的修改我已提交到我github仓库上hbase的cdh4-0.94.15_4.7.0分支,见提交日志add-aggregate-support-in-hbase-shell

相关实践学习
云数据库HBase版使用教程
&nbsp; 相关的阿里云产品:云数据库 HBase 版 面向大数据领域的一站式NoSQL服务,100%兼容开源HBase并深度扩展,支持海量数据下的实时存储、高并发吞吐、轻SQL分析、全文检索、时序时空查询等能力,是风控、推荐、广告、物联网、车联网、Feeds流、数据大屏等场景首选数据库,是为淘宝、支付宝、菜鸟等众多阿里核心业务提供关键支撑的数据库。 了解产品详情:&nbsp;https://cn.aliyun.com/product/hbase &nbsp; ------------------------------------------------------------------------- 阿里云数据库体验:数据库上云实战 开发者云会免费提供一台带自建MySQL的源数据库&nbsp;ECS 实例和一台目标数据库&nbsp;RDS实例。跟着指引,您可以一步步实现将ECS自建数据库迁移到目标数据库RDS。 点击下方链接,领取免费ECS&amp;RDS资源,30分钟完成数据库上云实战!https://developer.aliyun.com/adc/scenario/51eefbd1894e42f6bb9acacadd3f9121?spm=a2c6h.13788135.J_3257954370.9.4ba85f24utseFl
目录
相关文章
|
6月前
|
SQL 存储 分布式数据库
【通过Hive清洗、处理和计算原始数据,Hive清洗处理后的结果,将存入Hbase,海量数据随机查询场景从HBase查询数据 】
【通过Hive清洗、处理和计算原始数据,Hive清洗处理后的结果,将存入Hbase,海量数据随机查询场景从HBase查询数据 】
|
SQL 分布式计算 监控
云HBase X-Pack解决传统数据仓库瓶颈,赋能客户计算分析业务
某游戏公司随着业务快速发展,用户行为日志快速增长,需要从海量的点击流日志和激活日志中挖掘数据的价值,比如广告转化率、激活率,每日安装用户成本等等。原来使用GreenPlum做实时计算和统计分析遇到一些瓶颈,最终使用阿里云HBase X-Pack构建了满足业务需求的数据处理平台。
8544 0
|
分布式数据库 Hbase
HBase计算表的总count
不解释,看代码去吧,很简单。 private long findBatterHisDataCount(BatteryHisDto dto) throws Exception{ long totalSize=0; Table table=runner.
974 0
|
4月前
|
Java Shell 分布式数据库
【大数据技术Hadoop+Spark】HBase数据模型、Shell操作、Java API示例程序讲解(附源码 超详细)
【大数据技术Hadoop+Spark】HBase数据模型、Shell操作、Java API示例程序讲解(附源码 超详细)
82 0
|
8月前
|
SQL 分布式计算 Hadoop
Hadoop集群hbase的安装
Hadoop集群hbase的安装
140 0
|
4月前
|
分布式计算 Hadoop 关系型数据库
Hadoop任务scan Hbase 导出数据量变小分析
Hadoop任务scan Hbase 导出数据量变小分析
53 0
|
3月前
|
存储 分布式计算 Hadoop
Hadoop中的HBase是什么?请解释其作用和用途。
Hadoop中的HBase是什么?请解释其作用和用途。
40 0
|
4月前
|
SQL 分布式计算 Hadoop
Hadoop学习笔记(HDP)-Part.16 安装HBase
01 关于HDP 02 核心组件原理 03 资源规划 04 基础环境配置 05 Yum源配置 06 安装OracleJDK 07 安装MySQL 08 部署Ambari集群 09 安装OpenLDAP 10 创建集群 11 安装Kerberos 12 安装HDFS 13 安装Ranger 14 安装YARN+MR 15 安装HIVE 16 安装HBase 17 安装Spark2 18 安装Flink 19 安装Kafka 20 安装Flume
82 1
Hadoop学习笔记(HDP)-Part.16 安装HBase
|
8月前
|
分布式计算 Hadoop 分布式数据库
开机时监听Hadoop和Zookpeer启动之后再启动Hbase
开机时监听Hadoop和Zookpeer启动之后再启动Hbase
|
8月前
|
存储 分布式计算 Hadoop
Hadoop之Hbase安装和配置
Hadoop之Hbase安装和配置
714 0