基于 HBase 的大数据在线分析|学习笔记

本文涉及的产品
云数据库 RDS MySQL,集群系列 2核4GB
推荐场景:
搭建个人博客
RDS MySQL Serverless 基础系列,0.5-2RCU 50GB
云原生大数据计算服务MaxCompute,500CU*H 100GB 3个月
简介: 快速学习基于 HBase 的大数据在线分析

开发者学堂课程【HBase 入门与实战基于 HBase 的大数据在线分析】学习笔记,与课程紧密联系,让用户快速学习知识。

课程地址https://developer.aliyun.com/learning/course/808/detail/13897


基于HBase的大数据在线分析


内容介绍

一.背景介绍  

二.在线交互式分析

三.离线及流式分析

四.总结  

一、背景介绍

1、课程预览

完成课程后整个 App 数据分析的架构图如下

可以看到在线分析、离线分析、流式分析三部分

image.png

HBase 作为开源分布式数据库,具有高性能,高可用,海量无线扩展的特点。但 HBase 的查询接口较简单,LOK、GET 复制组件执行,没办法满足丰富的数据分析需求,HBase 可以加时间。如果要获取一段时间的交易量,查询用客户端实现,组表扫描。因为要扫出所有数据,在过滤时间段的交易订单加交易量这时比较复杂。虽然 HBase 的客户端比较简单但伤害比较强大,有大量的开源组件对 HBase 进行数据分析。

这门课程会结合很多开源的组件,手把手教用户怎样进入 HBase 大数据生态构建 HBase 数据分析系统。

What ?

基于 HBase 存储的应用数据,构建一套大数据处理框架,使用不同方式对 HBase 数据进行分析。

Why ?

HBase 作为开源分布式数据库,具备高性能,高可用,海量无限扩展的特点。然而HBase 查询接口较简单,无法满足数据分析的需求,但 HBase 具有丰富数据生态,可以结合大量开源的组件对 HBase 进行数据分析。

Who ?

大数据开发,希望了解 HBase 数据分析及大数据生态的用户。

How?

本课程将从原理到代码编写,手把手教用户构建基于 HBase 的大数据分析系统

2、HBase 数据访问概览

HBase 整体架构图如下:

image.png

HBase 的访问图最基本的是通过 APi 访问,APi 是简单的,主要是 Put、Get、Scan三个接口,对于数据过滤、集合稍微复杂的需求可以用 fifter+coprocessor 结合 APi 处理。

第二个因为 HBase 是存储计算分离的架构,数据文件是储存在 HDFS 上的,可以直接用 HBase 提供的 Reader/Writer 两个公式直接读取 APi 上数据文件,或者直接写文件通过 Bulkload 到 HBase 表里。

第三个,HBase 提供了 Map Reduce 框架的包装,提供了几种常见的工具类。首先是 TableInputFormat、TableOutputFormat这两个底层获取访问 HBase 数据是通过 API 的方式。因为需要分任务底层会按照 HBase 分区自动分成多个并发,每个分区并发通过 API 访问数据。

HFile 封装也是一样的,按读写分区,可以直接读写 HFile。练习时可以直接用这几个 Format 来实现 Map Reduce 作业。

3、准备工作

image.png

首先买一个 HBase 实例,课程主要用 Lindorm 单机版做,它是完全借用了 HBase 可以在云上买单机版,可以用来做验证。买完后在控制台添加白名单,将访问 HBase 所在的服务器 IP加到白名单里。 第二准备客户端在数据库连接,宽表引擎,Lindorm shell 下载,在 HBase 下载到 Lindorm shell,在控制台获取配置后对 HBase 进行访问或执行操作。

最后准备好开发环境,整个课程详细操作步骤和代码提供在 demo 工程。后续可下载学习  


二、在线交互式分析

在线交互式分析指在不动 HBase 数据的情况下直接访问数据进行分析。

1、在线分析 HBase 数据-Spark

Spark+HBase 架构

image.png

Spark 专门是为大规模数据处理而设计的快速通用的计算引擎,有 Spark RDD、Spark SQL引擎可以使用。

Spark怎样连 HBase,有两种,一种通过HBaseClient 直接访问 HBase,第二种有很多开源的 Spark HBase Connector,通过 Spark 引入 HBase Connector 后直接操作分析数据。

流程概览

1.准备 Spark  

.下载安装 Spark

2. 开发  

.配置开发环境

.下载 demo 工程

.开发 Spark 作业

3.打包运行

.打包项目

.提交 Spark 作业

首先准备 Spark,如果已经有 Spark 环境可以直接用,提供简单安装的操作。第二部分开发,需要开发 Spark 作业。第三个打包运行,提交 Spark 作业。 HadoopRDD-InputFormat

public static void main(String[] args) throws I0Exception{

//设置spark访问入口

SparkConf conf=new SparkConf().setAppName("spark rdd on hbase demo") .set("spark.serializer","org.apache.spark.serializer.KryoSerializer"); SparkContext sparkContext=new SparkContext(conf);

//hbase 侧的表名,需要在 hbase 侧提前创建。hbase 表创建可参考:https://helpaliyun.com/docu String hbaseTable ="hbase_xunlianying_table"; // 获取 HbaseRDD

//HBase 集群配置文件名。使用时请把配置文件放到 spark 的 conf 目录,配置文件内容从 hbase 控制台获取  

string hbaseConfiq=arqs[B];

Configuration config=HBaseConfiguration,create(): config.addResource(hbaseConfig): config.set(TableInputFormat.INPUT_TABLE,hbaseTable)

Job job=Job.getInstance(config);

RDD

rdd=sparkContext.newAPIHadoopRDD(job.getConfiguration(), TableInputFormat.class

ImmutableBytesWritable.class,

Result.class):

System.out.println("count result:"+rdd.count());

sparkContext.stop();

}

引入 HBase configuration 之后,RDD 不支持使用引入 configuration,依赖HBase 客户端,底层用 TableInputFarmat,底层 Spark 作业自动帮任务切分成微讯数量的并发,对每个微讯做微型计算,获取最终植入。

System.out.println("count result:"+rdd.count());

sparkContext.stop();

}

只是简单的做 count 的植入。

Spark Connector-SparkSQL

public static void nain(string[] args){

SparkSession sparkSession=SparkSession

.builder()

.appName("spark sql on HBase deno")

.getorCreate();

sparkSession.sal(sqIText:"show tables").show;

//Spark侧的表名。

string sparkTableName="spark_sql_hbase";

//hbase侧的表名,需要在hbase侧提前创建,

hbase表创建可参考:httos://helo.aliyun,com/document detail/174651,htnl String hbaseTableName="hbase_xunlianying_table";

//HBase集群配置文件名。使用时请把配置文件放到spark的conf目录,配置文件内容从hbase控制台获取  

string hbaseConfig=args[8]:

//1.创建Spark表

string createCmd ="CREATE TABLE"+ sparkTableName +" USING org.apache.hadoop.hbase.spark\n”+

OPTIONS('catalog'= \n"+

"{\"table\":{\"nanespace\":\"default\",\"name\":\"" + hbaseTableName + “\"},\"rowkey\":\"rowkey1\", \n' \"coluans!":{\n"+

\"col0\":{\"cf\":\"rowkey\",\"col":\"rowkey1\",\"typel":\"string\"},\n"+

\"col1\":{\"cf\":\"cf\",\"col\":\"col1\",\"type\":\"string\"}}}',\n" + "hbase.spark.use.hbasecontext’= 'false',n"+ "hbase.spark.config.location'=’"+hbaseConfig+\n"+

)"; System.out.println("create table cad:\n"+create Cnd): sparkSession.sal(createCnd);

//2.counthbase表数据

string querySql="select count(*)fron"+spark TableNane; sparkSession.sql(querySql).show();  

sparkSession.stop():

}

用 Spark SQL 访问 HBase,Spark SQL 是依赖 HBase Spark Connector。代码建 Spark 表,直接映射到 HBase,配置 HBase 字段 rowkey 对应到 Spark 表里的字段类型。

传到 HBase 配置。Spark 表建立后可执行 select count 访问 HBase 数据。

直接使用HBase客户端读写数据

valsc=newSparkContext("local","test")

valconfig=newHBaseConfiguration()

valhbaseContext=newHBaseContext(sc,config)

valssc=newStreamingContext(sc,Milliseconds(200))

valrdd1=...

valrdd2=...

valqueue=mutable.Queue[RDD(Array[Byte],Array{(Array[Byte],

Array[Byte],Array[Byte])])]]()

queue+=rdd1

queue+=rdd2

valdstream=ssc.queueStream(queue)

dstream.hbaseBulkPut(

hbaseContext,

TableName.valueOf(tableName),

(putRecord)=>{

valput=newPut(putRecord._1)

putRecord._2.foreach((putValue)=>put.addColumn(putValue._1,

putValue._2,putvalue._3)

put

})

也可以在 Spark 代码直接使用 HBase 客户端读写数据。

2、在线分析 HBase 数据 -Hive Hive+HBase 架构

image.png

Hive 是基于 HBase 的数据仓库工具,用于对大规模数据进行处理。底层是将对应的 sql 转化成 Map Reduce 任务来执行,依赖 HDFS 作为 MetaStore。整个架构打起来。通过 sql 来执行 HBase 数据分析。

准备工作

1.安装 hadoop

.下载安装 Hadoop

.设置环境变量

.配置 Hadoop

.启动 hdfs 和 yarn

.验证

2.安装 mysql

.安装 MySql

.准备好 Hive 使用 MySql

.用户和数据库

3.安装配置 Hive

.下载安装 Hive

.配置 Hive

.添加和替换 hbase 依赖

.添加 Hbase 配置

.验证

准备 Hive 环境,Hive 依赖 mysql,安装 mysql,添加和替换 hbase 的依赖。circle 的 driver 添加 hbase 集群配置。

1.建HBase外表:

CREATE EXTERNAL TABLE hbase_hive_table (key int,value string)  STOREDBY'org.apache.hadoop.hivehbase.HBaseStorageHandler’  WITHSERDEPROPERTIES(“hbase.columnsmapping”= “:keycf1:val”)TBLPROPERTIES(“hbase.table.name”=

“hbase_hive_table”,“hbase.mapred.output.output.table”= “hbase_hive_table”);

2.SQL 查询

select count(*)from hbase_hive_table

配置好后可以简单创建 HBase 外表,与 Spark 很像将 Hive 配置到 HBase 里。 “:keycf1:val”)TBLPROPERTIES(“hbase.table.name”=

为字段公式 表建好后可以用 select 直接访问,对 HBase 做数据分析 截图为运行测试样例,有Map Reduce 的进度,HBase 会转化成 Map Reduce 作业并且会到依赖的 依赖的 Hive上去。具体在https://cwikiapache.org/confluence/display/Hive/ HBaseIntegr ation上详细介绍。

image.png

在线分析弊端

.影响在线:占用HBase在线服务资源影响应用对HBase的正常访问

.存储格式:相对列存,HBase存储格式对分析不友好,分析性能差

.数据变化:分析过程中数据一直在变化 在线分析数据虽然很方便但有些副作用,当用 Hive Spark 分析 HBase 数据时,底层会调用 HBase API,此操作占用 HBase 在线服务资源,如果任务工作量很大直接影响应用对 HBase 的正常访问。集团内业务跑起来占用在线服务资源,影响正常访问。相对列存,HBase 存储格式对分析并不友好,分析性能差,相对列存是最好的方式。在分析过程中应用一直在读取 HBase 数据一直在变化,要想获取某一时间点的值,应用一直在读取数据,后面的数据又更新了,数据又不一样,支持一定时间段内的数据,不是某一时间点的数据。  


三、离线及流式分析

1、HBase 数据离线处理分析

完整步骤

1.全量导出 Hbase 数据  

使用 LTS 将 Hbase 数据全量导出到 HDFS,存储为 Parquet 格式

2.开发 Spark 作业  开发 Spark 作业分析导出的 Parquet 文件数据

3.提交Spark任务

离线处理分析首先把 HBase 数据全量导出到 HDFS,存储为 Parquet 格式后打开 Spark 工作处理导出的 Parquet 文件数据。

image.png

https://help.aliyun.com/document detail/156428.html# title-ofz-4ah-45z

运用 LTS 工具,LTS 是阿里云自研的移动 Service,是HBase 数据链路,可以直接创建任务。

image.png

从某个 HBase 集群的表导出到目标集群,可参考此文档。

使用 Spark 分析 Parque 文件

publicstaticvoidmain(String[]args){

Systen.out.printin(args);

SparkConfconf=newSparkConf().setAppName("SparkParquetDemo”)

;

SparkSessionsparkSession=SparkSession

.builder()

.config(conf)

.getorCreate();

//parquet文件所在HDFS目录,格式:

hdfs://naneservice/parquetfilediar

StringhdfsDir=args[8];

StringsparkTable="spark_parquet_hbase";

Stringsql="droptableifexists"+sparkTable;

sparkSession.sql(sql);

//创建spark表,表数据路径在HDFS上。

sql=

"createtable"+sparkTable+"(\n"+

rowkeybinary,\n"+

versionlong,\n"+

opint,\n"+

fanilybinary,\n"+

qualifierbinary,\n"+

valbinary\n"+

)usingparquet"+

location'"+hdfsDir+"";

Systen.out.println("createcad:In"+sol):

sparkSession.sql(sql);

//查询数据

sql="select*from"+sparkTable;

sparkSession.sql(sol).show(();

sparkSession.stop();

}

将数据打到 HDFS 上,传好后写一个 Spark 作业来分析 Parquet 文件。代码运用 sql 建一张表关联到映射的 Parquet 文件目录,传入hdfs目录后访问一张表一样,用 sql 分析数据,比较方便。

离线分析的弊端

.实时性差:全量导出代价大数据量越大数据产出越困难

.存储冗余: 数据在离线重复存储

.重复导出:识别增量较难,历史数据重复导出

实时性差,数据是在不断增长的,HBase 一直在实时的使用数据,数据一直在增长,每天定期全量导出代价很大。因为数据量越大数据产出越困难,每天产出的越来越多。

需要将数据存储在离线,存储需要冗余。

每天导出,理想状态是每天导出增量数据,访问方式限制识别增量很难。除非 rowkey 占时间而且不会更新数据,一般情况很难识别,今天最新更新的。

2、HBase 数据流式处理分析

完整步骤

1.增量订阅 HBase 到 Kakfa

使用 LTS 增量订阅 Hbase 数据,写入到 Kakfa

2.开发 Spark 作业对接 kafka  

使用 Sparkstreaming 对接 Kafka,进行流式计算

3.计算结果写入 HBase

可以实时订阅数据,用流计算实时统计需要的指标。通过 LTS 实时将数据同步到 kafka 上,用 Spark streaming 对接 kafka 做实时的计算。

第一步增加订阅 HBase 到 kafka,第二步开发 Spark 作业对接 kafka,第三步计算结果写入 HBase 中。

导出到 kafka

image.png

引用阿里云 LTS 产品,此模块可以订阅文章或某些表格数据,写到对应的消息存储中,配置 kafka。增强配置可以通过文档导出。当任务进载后,HBase 数据会实时的导出到 kafka 里。底层实现原理是订阅 HBase 日志,看那些日志是新增的,一直会 care 正在写的日志,有新增的导不出去,临时会在两级甚至更低。

使用 Spark 对接 Kafka 进行实时计算

publicstaticvoidmain(String[]args)throwsI0Exception{

Stringbrokers=args[8];//kafka的broke,格式为:

ip:port,ip:port,ip:port

Stringtopic=args[1];//kafa的topic

StringgroupId=args[2];//kafkaGroupID

intbatchSize=18://SparkStreaming批处理的时间问题

SparkConfsparkConf=newSparkConf().setAppName(“SparkStreamin

g0nKafkaToHBaseDemo");

JavaStreamingContextssc=newJavaStreamingContext(sparkConf,

newDuration(millis:batchSize*1000));

List<String>topicList=newArrayList<>():

topicList.add(topic);//HBase集群配置文件名。使用时请把配置文件放到 spark 的conf 目录,配置文件内容从 hbase 控制台获取  

String hbaseConfLocation=args[3];

//设置kafko多数

Map<String,Object>kafkaParans=newHashMap(); kafkaParans.put("bootstrap.servers"brokers): kafkaParans.put("value.deserializer","org.apache.kafka.comnon.serialization.StringDeserializer"); kafkaParans.put("key.deserializer",“org,apache.kafka.comnon.serialization.StringDeserializer"); kafkaParans.put(“group.id"groupId): LocationStrategylocationStrategy=LocationStrategies.PreferConsistent(); ConsunerStrategyconsumerStrategy=ConsunerStrategies.Subscribe(topicListkafkaParans);

//从 Kafka 接收数据并创建对应的 DStrean. JavaInputostream<ConsumerRecrd<String,String>>messages=KafkaUtils.createdirectStream(ssc,

locationStrategy, consumerStrategy);

String hbaseiableName ="hbase_xunlianying_table":

//解析LTS同步的HBase数据 JavadStream<String[]>wordsamessages.nap(newFunction<ConsumerRecord<String,String>,String[]>() {

Override publicString[]call(ConsumerRecord<String,String>stringStringConsumerRecord) throws Exception { JSONObjectjsonValue=JSONObject.parseObject(stringStringConsumerRecord.value()); JSONArraykeys=jsonValue.getJSoNArray(key:"keys"); JSONArrayvalues=jsonValue.getSONArray(key:“data); String[] result =new String[keys.size()+values.size()];  

int retIndex = 8;

for (inti=8;i<keys.size():i++){

JSONObiect key=keys.getSONObiect(1):

result[retIndex++] = key.getString(key:"value");  

数据增量订阅到 kafka 上后,Spark 作业增量订阅 HBase 数据,实时统计。底层用 Spark kafka,收到数据后需要 HBase 导出的数据用 keys 解析,解析完后进行计算。  


四、总结

1、HBase 大数据分析

HBase 数据生态

image.png

有很多开源产品,首先 HBase 通过 App、数据库、消息队列这些高并发写入 HBase 后,在线通过 Flink、hadoop、presto 可以直接读取 HBase 数据做分析,同时结果批量/Bulkload 写入 HBase。 通过同步工具 LTS DataX、Kettle 将 HBase 数据离线分析到 Aliyun DLA ClickHouse MaxCompute Parquet 里去。增量订阅 LTS Replication 对接消息队列 kafka MQ DataHub 下一步对接流式处理分析 Apache Flink Spark。

Replication 开源没有特别流行,增加队列的产品。Replication 是另类基于选择的,伪装成一个目的 HBase 集群,收到 HBase 数据在做对应的操作。LTS 是主动读取 HBase。

2、下一代产品:云原生多模数据库 Lindorm

indorm 基于 HBase 有多种访问方式,底层还支持时序、搜索、文件、宽表、键值。能力上云原生弹性,扩展性强,多模之间像宽表可以直接对接到搜索互通融合,内部可直接融合在一起,低成本可以海量存储,数据生态强大。  

image.png

相关文章
|
2月前
|
存储 机器学习/深度学习 SQL
大数据处理与分析技术
大数据处理与分析技术
167 2
|
3月前
|
SQL 消息中间件 分布式计算
大数据-124 - Flink State 01篇 状态原理和原理剖析:状态类型 执行分析
大数据-124 - Flink State 01篇 状态原理和原理剖析:状态类型 执行分析
96 5
|
23天前
|
机器学习/深度学习 数据可视化 大数据
机器学习与大数据分析的结合:智能决策的新引擎
机器学习与大数据分析的结合:智能决策的新引擎
124 15
|
29天前
|
SQL 分布式计算 DataWorks
DataWorks产品测评|基于DataWorks和MaxCompute产品组合实现用户画像分析
本文介绍了如何使用DataWorks和MaxCompute产品组合实现用户画像分析。首先,通过阿里云官网开通DataWorks服务并创建资源组,接着创建MaxCompute项目和数据源。随后,利用DataWorks的数据集成和数据开发模块,将业务数据同步至MaxCompute,并通过ODPS SQL完成用户画像的数据加工,最终将结果写入`ads_user_info_1d`表。文章详细记录了每一步的操作过程,包括任务开发、运行、运维操作和资源释放,帮助读者顺利完成用户画像分析。此外,还指出了文档中的一些不一致之处,并提供了相应的解决方法。
|
28天前
|
分布式计算 DataWorks 搜索推荐
用户画像分析(MaxCompute简化版)
通过本教程,您可以了解如何使用DataWorks和MaxCompute产品组合进行数仓开发与分析,并通过案例体验DataWorks数据集成、数据开发和运维中心模块的相关能力。
|
2月前
|
机器学习/深度学习 存储 大数据
在大数据时代,高维数据处理成为难题,主成分分析(PCA)作为一种有效的数据降维技术,通过线性变换将数据投影到新的坐标系
在大数据时代,高维数据处理成为难题,主成分分析(PCA)作为一种有效的数据降维技术,通过线性变换将数据投影到新的坐标系,保留最大方差信息,实现数据压缩、去噪及可视化。本文详解PCA原理、步骤及其Python实现,探讨其在图像压缩、特征提取等领域的应用,并指出使用时的注意事项,旨在帮助读者掌握这一强大工具。
115 4
|
2月前
|
关系型数据库 分布式数据库 数据库
PolarDB 以其出色的性能和可扩展性,成为大数据分析的重要工具
在数字化时代,企业面对海量数据的挑战,PolarDB 以其出色的性能和可扩展性,成为大数据分析的重要工具。它不仅支持高速数据读写,还通过数据分区、索引优化等策略提升分析效率,适用于电商、金融等多个行业,助力企业精准决策。
43 4
|
2月前
|
机器学习/深度学习 分布式计算 算法
【大数据分析&机器学习】分布式机器学习
本文主要介绍分布式机器学习基础知识,并介绍主流的分布式机器学习框架,结合实例介绍一些机器学习算法。
299 5
|
2月前
|
存储 监控 数据挖掘
【Clikhouse 探秘】ClickHouse 物化视图:加速大数据分析的新利器
ClickHouse 的物化视图是一种特殊表,通过预先计算并存储查询结果,显著提高查询性能,减少资源消耗,适用于实时报表、日志分析、用户行为分析、金融数据分析和物联网数据分析等场景。物化视图的创建、数据插入、更新和一致性保证通过事务机制实现。
269 14
|
2月前
|
消息中间件 分布式计算 大数据
数据为王:大数据处理与分析技术在企业决策中的力量
【10月更文挑战第29天】在信息爆炸的时代,大数据处理与分析技术为企业提供了前所未有的洞察力和决策支持。本文探讨了大数据技术在企业决策中的重要性和实际应用,包括数据的力量、实时分析、数据驱动的决策以及数据安全与隐私保护。通过这些技术,企业能够从海量数据中提取有价值的信息,预测市场趋势,优化业务流程,从而在竞争中占据优势。
185 2