开发者学堂课程【HBase 入门与实战:基于 HBase 的大数据在线分析】学习笔记,与课程紧密联系,让用户快速学习知识。
课程地址:https://developer.aliyun.com/learning/course/808/detail/13897
基于HBase的大数据在线分析
内容介绍
一.背景介绍
二.在线交互式分析
三.离线及流式分析
四.总结
一、背景介绍
1、课程预览
完成课程后整个 App 数据分析的架构图如下
可以看到在线分析、离线分析、流式分析三部分
HBase 作为开源分布式数据库,具有高性能,高可用,海量无线扩展的特点。但 HBase 的查询接口较简单,LOK、GET 复制组件执行,没办法满足丰富的数据分析需求,HBase 可以加时间。如果要获取一段时间的交易量,查询用客户端实现,组表扫描。因为要扫出所有数据,在过滤时间段的交易订单加交易量这时比较复杂。虽然 HBase 的客户端比较简单但伤害比较强大,有大量的开源组件对 HBase 进行数据分析。
这门课程会结合很多开源的组件,手把手教用户怎样进入 HBase 大数据生态构建 HBase 数据分析系统。
What ?
基于 HBase 存储的应用数据,构建一套大数据处理框架,使用不同方式对 HBase 数据进行分析。
Why ?
HBase 作为开源分布式数据库,具备高性能,高可用,海量无限扩展的特点。然而HBase 查询接口较简单,无法满足数据分析的需求,但 HBase 具有丰富数据生态,可以结合大量开源的组件对 HBase 进行数据分析。
Who ?
大数据开发,希望了解 HBase 数据分析及大数据生态的用户。
How?
本课程将从原理到代码编写,手把手教用户构建基于 HBase 的大数据分析系统
2、HBase 数据访问概览
HBase 整体架构图如下:
HBase 的访问图最基本的是通过 APi 访问,APi 是简单的,主要是 Put、Get、Scan三个接口,对于数据过滤、集合稍微复杂的需求可以用 fifter+coprocessor 结合 APi 处理。
第二个因为 HBase 是存储计算分离的架构,数据文件是储存在 HDFS 上的,可以直接用 HBase 提供的 Reader/Writer 两个公式直接读取 APi 上数据文件,或者直接写文件通过 Bulkload 到 HBase 表里。
第三个,HBase 提供了 Map Reduce 框架的包装,提供了几种常见的工具类。首先是 TableInputFormat、TableOutputFormat这两个底层获取访问 HBase 数据是通过 API 的方式。因为需要分任务底层会按照 HBase 分区自动分成多个并发,每个分区并发通过 API 访问数据。
HFile 封装也是一样的,按读写分区,可以直接读写 HFile。练习时可以直接用这几个 Format 来实现 Map Reduce 作业。
3、准备工作
首先买一个 HBase 实例,课程主要用 Lindorm 单机版做,它是完全借用了 HBase 可以在云上买单机版,可以用来做验证。买完后在控制台添加白名单,将访问 HBase 所在的服务器 IP加到白名单里。 第二准备客户端在数据库连接,宽表引擎,Lindorm shell 下载,在 HBase 下载到 Lindorm shell,在控制台获取配置后对 HBase 进行访问或执行操作。
最后准备好开发环境,整个课程详细操作步骤和代码提供在 demo 工程。后续可下载学习
二、在线交互式分析
在线交互式分析指在不动 HBase 数据的情况下直接访问数据进行分析。
1、在线分析 HBase 数据-Spark
Spark+HBase 架构
Spark 专门是为大规模数据处理而设计的快速通用的计算引擎,有 Spark RDD、Spark SQL引擎可以使用。
Spark怎样连 HBase,有两种,一种通过HBaseClient 直接访问 HBase,第二种有很多开源的 Spark HBase Connector,通过 Spark 引入 HBase Connector 后直接操作分析数据。
流程概览
1.准备 Spark
.下载安装 Spark
2. 开发
.配置开发环境
.下载 demo 工程
.开发 Spark 作业
3.打包运行
.打包项目
.提交 Spark 作业
首先准备 Spark,如果已经有 Spark 环境可以直接用,提供简单安装的操作。第二部分开发,需要开发 Spark 作业。第三个打包运行,提交 Spark 作业。 HadoopRDD-InputFormat
public static void main(String[] args) throws I0Exception{
//设置spark访问入口
SparkConf conf=new SparkConf().setAppName("spark rdd on hbase demo")
.set("spark.serializer","org.apache.spark.serializer.KryoSerializer"); SparkContext sparkContext=new SparkContext(conf);
//hbase 侧的表名,需要在 hbase 侧提前创建。hbase 表创建可参考:https://helpaliyun.com/docu String hbaseTable ="hbase_xunlianying_table"; // 获取 HbaseRDD
//HBase 集群配置文件名。使用时请把配置文件放到 spark 的 conf 目录,配置文件内容从 hbase 控制台获取
string hbaseConfiq=arqs[B];
Configuration config=HBaseConfiguration,create(): config.addResource(hbaseConfig): config.set(TableInputFormat.INPUT_TABLE,hbaseTable)
Job job=Job.getInstance(config);
RDD
rdd=sparkContext.newAPIHadoopRDD(job.getConfiguration(), TableInputFormat.class
ImmutableBytesWritable.class,
Result.class):
System.out.println("count result:"+rdd.count());
sparkContext.stop();
}
引入 HBase configuration 之后,RDD 不支持使用引入 configuration,依赖HBase 客户端,底层用 TableInputFarmat,底层 Spark 作业自动帮任务切分成微讯数量的并发,对每个微讯做微型计算,获取最终植入。
System.out.println("count result:"+rdd.count());
sparkContext.stop();
}
只是简单的做 count 的植入。
Spark Connector-SparkSQL
public static void nain(string[] args){
SparkSession sparkSession=SparkSession
.builder()
.appName("spark sql on HBase deno")
.getorCreate();
sparkSession.sal(sqIText:"show tables").show;
//Spark侧的表名。
string sparkTableName="spark_sql_hbase";
//hbase侧的表名,需要在hbase侧提前创建,
hbase表创建可参考:httos://helo.aliyun,com/document detail/174651,htnl String hbaseTableName="hbase_xunlianying_table";
//HBase集群配置文件名。使用时请把配置文件放到spark的conf目录,配置文件内容从hbase控制台获取
string hbaseConfig=args[8]:
//1.创建Spark表
string createCmd ="CREATE TABLE"+ sparkTableName +" USING org.apache.hadoop.hbase.spark\n”+
OPTIONS('catalog'= \n"+
"{\"table\":{\"nanespace\":\"default\",\"name\":\"" + hbaseTableName + “\"},\"rowkey\":\"rowkey1\", \n' \"coluans!":{\n"+
\"col0\":{\"cf\":\"rowkey\",\"col":\"rowkey1\",\"typel":\"string\"},\n"+
\"col1\":{\"cf\":\"cf\",\"col\":\"col1\",\"type\":\"string\"}}}',\n" + "hbase.spark.use.hbasecontext’= 'false',n"+ "hbase.spark.config.location'=’"+hbaseConfig+\n"+
)"; System.out.println("create table cad:\n"+create Cnd): sparkSession.sal(createCnd);
//2.counthbase表数据
string querySql="select count(*)fron"+spark TableNane; sparkSession.sql(querySql).show();
sparkSession.stop():
}
用 Spark SQL 访问 HBase,Spark SQL 是依赖 HBase Spark Connector。代码建 Spark 表,直接映射到 HBase,配置 HBase 字段 rowkey 对应到 Spark 表里的字段类型。
传到 HBase 配置。Spark 表建立后可执行 select count 访问 HBase 数据。
直接使用HBase客户端读写数据
valsc=newSparkContext("local","test")
valconfig=newHBaseConfiguration()
valhbaseContext=newHBaseContext(sc,config)
valssc=newStreamingContext(sc,Milliseconds(200))
valrdd1=...
valrdd2=...
valqueue=mutable.Queue[RDD(Array[Byte],Array{(Array[Byte],
Array[Byte],Array[Byte])])]]()
queue+=rdd1
queue+=rdd2
valdstream=ssc.queueStream(queue)
dstream.hbaseBulkPut(
hbaseContext,
TableName.valueOf(tableName),
(putRecord)=>{
valput=newPut(putRecord._1)
putRecord._2.foreach((putValue)=>put.addColumn(putValue._1,
putValue._2,putvalue._3)
put
})
也可以在 Spark 代码直接使用 HBase 客户端读写数据。
2、在线分析 HBase 数据 -Hive Hive+HBase 架构
Hive 是基于 HBase 的数据仓库工具,用于对大规模数据进行处理。底层是将对应的 sql 转化成 Map Reduce 任务来执行,依赖 HDFS 作为 MetaStore。整个架构打起来。通过 sql 来执行 HBase 数据分析。
准备工作
1.安装 hadoop
.下载安装 Hadoop
.设置环境变量
.配置 Hadoop
.启动 hdfs 和 yarn
.验证
2.安装 mysql
.安装 MySql
.准备好 Hive 使用 MySql
.用户和数据库
3.安装配置 Hive
.下载安装 Hive
.配置 Hive
.添加和替换 hbase 依赖
.添加 Hbase 配置
.验证
准备 Hive 环境,Hive 依赖 mysql,安装 mysql,添加和替换 hbase 的依赖。circle 的 driver 添加 hbase 集群配置。
1.建HBase外表:
CREATE EXTERNAL TABLE hbase_hive_table (key int,value string) STOREDBY'org.apache.hadoop.hivehbase.HBaseStorageHandler’ WITHSERDEPROPERTIES(“hbase.columnsmapping”= “:keycf1:val”)TBLPROPERTIES(“hbase.table.name”=
“hbase_hive_table”,“hbase.mapred.output.output.table”= “hbase_hive_table”);
2.SQL 查询
select count(*)from hbase_hive_table
配置好后可以简单创建 HBase 外表,与 Spark 很像将 Hive 配置到 HBase 里。 “:keycf1:val”)TBLPROPERTIES(“hbase.table.name”=
为字段公式 表建好后可以用 select 直接访问,对 HBase 做数据分析 截图为运行测试样例,有Map Reduce 的进度,HBase 会转化成 Map Reduce 作业并且会到依赖的 依赖的 Hive上去。具体在https://cwikiapache.org/confluence/display/Hive/ HBaseIntegr ation上详细介绍。
在线分析弊端
.影响在线:占用HBase在线服务资源影响应用对HBase的正常访问
.存储格式:相对列存,HBase存储格式对分析不友好,分析性能差
.数据变化:分析过程中数据一直在变化 在线分析数据虽然很方便但有些副作用,当用 Hive Spark 分析 HBase 数据时,底层会调用 HBase API,此操作占用 HBase 在线服务资源,如果任务工作量很大直接影响应用对 HBase 的正常访问。集团内业务跑起来占用在线服务资源,影响正常访问。相对列存,HBase 存储格式对分析并不友好,分析性能差,相对列存是最好的方式。在分析过程中应用一直在读取 HBase 数据一直在变化,要想获取某一时间点的值,应用一直在读取数据,后面的数据又更新了,数据又不一样,支持一定时间段内的数据,不是某一时间点的数据。
三、离线及流式分析
1、HBase 数据离线处理分析
完整步骤
1.全量导出 Hbase 数据
使用 LTS 将 Hbase 数据全量导出到 HDFS,存储为 Parquet 格式
2.开发 Spark 作业 开发 Spark 作业分析导出的 Parquet 文件数据
3.提交Spark任务
离线处理分析首先把 HBase 数据全量导出到 HDFS,存储为 Parquet 格式后打开 Spark 工作处理导出的 Parquet 文件数据。
https://help.aliyun.com/document detail/156428.html# title-ofz-4ah-45z
运用 LTS 工具,LTS 是阿里云自研的移动 Service,是HBase 数据链路,可以直接创建任务。
从某个 HBase 集群的表导出到目标集群,可参考此文档。
使用 Spark 分析 Parque 文件
publicstaticvoidmain(String[]args){
Systen.out.printin(args);
SparkConfconf=newSparkConf().setAppName("SparkParquetDemo”)
;
SparkSessionsparkSession=SparkSession
.builder()
.config(conf)
.getorCreate();
//parquet文件所在HDFS目录,格式:
hdfs://naneservice/parquetfilediar
StringhdfsDir=args[8];
StringsparkTable="spark_parquet_hbase";
Stringsql="droptableifexists"+sparkTable;
sparkSession.sql(sql);
//创建spark表,表数据路径在HDFS上。
sql=
"createtable"+sparkTable+"(\n"+
rowkeybinary,\n"+
versionlong,\n"+
opint,\n"+
fanilybinary,\n"+
qualifierbinary,\n"+
valbinary\n"+
)usingparquet"+
location'"+hdfsDir+"";
Systen.out.println("createcad:In"+sol):
sparkSession.sql(sql);
//查询数据
sql="select*from"+sparkTable;
sparkSession.sql(sol).show(();
sparkSession.stop();
}
将数据打到 HDFS 上,传好后写一个 Spark 作业来分析 Parquet 文件。代码运用 sql 建一张表关联到映射的 Parquet 文件目录,传入hdfs目录后访问一张表一样,用 sql 分析数据,比较方便。
离线分析的弊端
.实时性差:全量导出代价大数据量越大数据产出越困难
.存储冗余: 数据在离线重复存储
.重复导出:识别增量较难,历史数据重复导出
实时性差,数据是在不断增长的,HBase 一直在实时的使用数据,数据一直在增长,每天定期全量导出代价很大。因为数据量越大数据产出越困难,每天产出的越来越多。
需要将数据存储在离线,存储需要冗余。
每天导出,理想状态是每天导出增量数据,访问方式限制识别增量很难。除非 rowkey 占时间而且不会更新数据,一般情况很难识别,今天最新更新的。
2、HBase 数据流式处理分析
完整步骤
1.增量订阅 HBase 到 Kakfa
使用 LTS 增量订阅 Hbase 数据,写入到 Kakfa
2.开发 Spark 作业对接 kafka
使用 Sparkstreaming 对接 Kafka,进行流式计算
3.计算结果写入 HBase
可以实时订阅数据,用流计算实时统计需要的指标。通过 LTS 实时将数据同步到 kafka 上,用 Spark streaming 对接 kafka 做实时的计算。
第一步增加订阅 HBase 到 kafka,第二步开发 Spark 作业对接 kafka,第三步计算结果写入 HBase 中。
导出到 kafka
引用阿里云 LTS 产品,此模块可以订阅文章或某些表格数据,写到对应的消息存储中,配置 kafka。增强配置可以通过文档导出。当任务进载后,HBase 数据会实时的导出到 kafka 里。底层实现原理是订阅 HBase 日志,看那些日志是新增的,一直会 care 正在写的日志,有新增的导不出去,临时会在两级甚至更低。
使用 Spark 对接 Kafka 进行实时计算
publicstaticvoidmain(String[]args)throwsI0Exception{
Stringbrokers=args[8];//kafka的broke,格式为:
ip:port,ip:port,ip:port
Stringtopic=args[1];//kafa的topic
StringgroupId=args[2];//kafkaGroupID
intbatchSize=18://SparkStreaming批处理的时间问题
SparkConfsparkConf=newSparkConf().setAppName(“SparkStreamin
g0nKafkaToHBaseDemo");
JavaStreamingContextssc=newJavaStreamingContext(sparkConf,
newDuration(millis:batchSize*1000));
List<String>topicList=newArrayList<>():
topicList.add(topic);//HBase集群配置文件名。使用时请把配置文件放到 spark 的conf 目录,配置文件内容从 hbase 控制台获取
String hbaseConfLocation=args[3];
//设置kafko多数
Map<String,Object>kafkaParans=newHashMap(); kafkaParans.put("bootstrap.servers"brokers): kafkaParans.put("value.deserializer","org.apache.kafka.comnon.serialization.StringDeserializer"); kafkaParans.put("key.deserializer",“org,apache.kafka.comnon.serialization.StringDeserializer"); kafkaParans.put(“group.id"groupId): LocationStrategylocationStrategy=LocationStrategies.PreferConsistent(); ConsunerStrategyconsumerStrategy=ConsunerStrategies.Subscribe(topicListkafkaParans);
//从 Kafka 接收数据并创建对应的 DStrean. JavaInputostream<ConsumerRecrd<String,String>>messages=KafkaUtils.createdirectStream(ssc,
locationStrategy, consumerStrategy);
String hbaseiableName ="hbase_xunlianying_table":
//解析LTS同步的HBase数据 JavadStream<String[]>wordsamessages.nap(newFunction<ConsumerRecord<String,String>,String[]>() {
Override publicString[]call(ConsumerRecord<String,String>stringStringConsumerRecord) throws Exception { JSONObjectjsonValue=JSONObject.parseObject(stringStringConsumerRecord.value()); JSONArraykeys=jsonValue.getJSoNArray(key:"keys"); JSONArrayvalues=jsonValue.getSONArray(key:“data); String[] result =new String[keys.size()+values.size()];
int retIndex = 8;
for (inti=8;i<keys.size():i++){
JSONObiect key=keys.getSONObiect(1):
result[retIndex++] = key.getString(key:"value");
数据增量订阅到 kafka 上后,Spark 作业增量订阅 HBase 数据,实时统计。底层用 Spark kafka,收到数据后需要 HBase 导出的数据用 keys 解析,解析完后进行计算。
四、总结
1、HBase 大数据分析
HBase 数据生态
有很多开源产品,首先 HBase 通过 App、数据库、消息队列这些高并发写入 HBase 后,在线通过 Flink、hadoop、presto 可以直接读取 HBase 数据做分析,同时结果批量/Bulkload 写入 HBase。 通过同步工具 LTS DataX、Kettle 将 HBase 数据离线分析到 Aliyun DLA ClickHouse MaxCompute Parquet 里去。增量订阅 LTS Replication 对接消息队列 kafka MQ DataHub 下一步对接流式处理分析 Apache Flink Spark。
Replication 开源没有特别流行,增加队列的产品。Replication 是另类基于选择的,伪装成一个目的 HBase 集群,收到 HBase 数据在做对应的操作。LTS 是主动读取 HBase。
2、下一代产品:云原生多模数据库 Lindorm
indorm 基于 HBase 有多种访问方式,底层还支持时序、搜索、文件、宽表、键值。能力上云原生弹性,扩展性强,多模之间像宽表可以直接对接到搜索互通融合,内部可直接融合在一起,低成本可以海量存储,数据生态强大。