基于 HBase 的大数据在线分析|学习笔记

本文涉及的产品
云数据库 RDS MySQL Serverless,0.5-2RCU 50GB
云数据库 RDS MySQL Serverless,价值2615元额度,1个月
简介: 快速学习基于 HBase 的大数据在线分析

开发者学堂课程【HBase 入门与实战基于 HBase 的大数据在线分析】学习笔记,与课程紧密联系,让用户快速学习知识。

课程地址https://developer.aliyun.com/learning/course/808/detail/13897


基于HBase的大数据在线分析


内容介绍

一.背景介绍  

二.在线交互式分析

三.离线及流式分析

四.总结  

一、背景介绍

1、课程预览

完成课程后整个 App 数据分析的架构图如下

可以看到在线分析、离线分析、流式分析三部分

image.png

HBase 作为开源分布式数据库,具有高性能,高可用,海量无线扩展的特点。但 HBase 的查询接口较简单,LOK、GET 复制组件执行,没办法满足丰富的数据分析需求,HBase 可以加时间。如果要获取一段时间的交易量,查询用客户端实现,组表扫描。因为要扫出所有数据,在过滤时间段的交易订单加交易量这时比较复杂。虽然 HBase 的客户端比较简单但伤害比较强大,有大量的开源组件对 HBase 进行数据分析。

这门课程会结合很多开源的组件,手把手教用户怎样进入 HBase 大数据生态构建 HBase 数据分析系统。

What ?

基于 HBase 存储的应用数据,构建一套大数据处理框架,使用不同方式对 HBase 数据进行分析。

Why ?

HBase 作为开源分布式数据库,具备高性能,高可用,海量无限扩展的特点。然而HBase 查询接口较简单,无法满足数据分析的需求,但 HBase 具有丰富数据生态,可以结合大量开源的组件对 HBase 进行数据分析。

Who ?

大数据开发,希望了解 HBase 数据分析及大数据生态的用户。

How?

本课程将从原理到代码编写,手把手教用户构建基于 HBase 的大数据分析系统

2、HBase 数据访问概览

HBase 整体架构图如下:

image.png

HBase 的访问图最基本的是通过 APi 访问,APi 是简单的,主要是 Put、Get、Scan三个接口,对于数据过滤、集合稍微复杂的需求可以用 fifter+coprocessor 结合 APi 处理。

第二个因为 HBase 是存储计算分离的架构,数据文件是储存在 HDFS 上的,可以直接用 HBase 提供的 Reader/Writer 两个公式直接读取 APi 上数据文件,或者直接写文件通过 Bulkload 到 HBase 表里。

第三个,HBase 提供了 Map Reduce 框架的包装,提供了几种常见的工具类。首先是 TableInputFormat、TableOutputFormat这两个底层获取访问 HBase 数据是通过 API 的方式。因为需要分任务底层会按照 HBase 分区自动分成多个并发,每个分区并发通过 API 访问数据。

HFile 封装也是一样的,按读写分区,可以直接读写 HFile。练习时可以直接用这几个 Format 来实现 Map Reduce 作业。

3、准备工作

image.png

首先买一个 HBase 实例,课程主要用 Lindorm 单机版做,它是完全借用了 HBase 可以在云上买单机版,可以用来做验证。买完后在控制台添加白名单,将访问 HBase 所在的服务器 IP加到白名单里。 第二准备客户端在数据库连接,宽表引擎,Lindorm shell 下载,在 HBase 下载到 Lindorm shell,在控制台获取配置后对 HBase 进行访问或执行操作。

最后准备好开发环境,整个课程详细操作步骤和代码提供在 demo 工程。后续可下载学习  


二、在线交互式分析

在线交互式分析指在不动 HBase 数据的情况下直接访问数据进行分析。

1、在线分析 HBase 数据-Spark

Spark+HBase 架构

image.png

Spark 专门是为大规模数据处理而设计的快速通用的计算引擎,有 Spark RDD、Spark SQL引擎可以使用。

Spark怎样连 HBase,有两种,一种通过HBaseClient 直接访问 HBase,第二种有很多开源的 Spark HBase Connector,通过 Spark 引入 HBase Connector 后直接操作分析数据。

流程概览

1.准备 Spark  

.下载安装 Spark

2. 开发  

.配置开发环境

.下载 demo 工程

.开发 Spark 作业

3.打包运行

.打包项目

.提交 Spark 作业

首先准备 Spark,如果已经有 Spark 环境可以直接用,提供简单安装的操作。第二部分开发,需要开发 Spark 作业。第三个打包运行,提交 Spark 作业。 HadoopRDD-InputFormat

public static void main(String[] args) throws I0Exception{

//设置spark访问入口

SparkConf conf=new SparkConf().setAppName("spark rdd on hbase demo") .set("spark.serializer","org.apache.spark.serializer.KryoSerializer"); SparkContext sparkContext=new SparkContext(conf);

//hbase 侧的表名,需要在 hbase 侧提前创建。hbase 表创建可参考:https://helpaliyun.com/docu String hbaseTable ="hbase_xunlianying_table"; // 获取 HbaseRDD

//HBase 集群配置文件名。使用时请把配置文件放到 spark 的 conf 目录,配置文件内容从 hbase 控制台获取  

string hbaseConfiq=arqs[B];

Configuration config=HBaseConfiguration,create(): config.addResource(hbaseConfig): config.set(TableInputFormat.INPUT_TABLE,hbaseTable)

Job job=Job.getInstance(config);

RDD

rdd=sparkContext.newAPIHadoopRDD(job.getConfiguration(), TableInputFormat.class

ImmutableBytesWritable.class,

Result.class):

System.out.println("count result:"+rdd.count());

sparkContext.stop();

}

引入 HBase configuration 之后,RDD 不支持使用引入 configuration,依赖HBase 客户端,底层用 TableInputFarmat,底层 Spark 作业自动帮任务切分成微讯数量的并发,对每个微讯做微型计算,获取最终植入。

System.out.println("count result:"+rdd.count());

sparkContext.stop();

}

只是简单的做 count 的植入。

Spark Connector-SparkSQL

public static void nain(string[] args){

SparkSession sparkSession=SparkSession

.builder()

.appName("spark sql on HBase deno")

.getorCreate();

sparkSession.sal(sqIText:"show tables").show;

//Spark侧的表名。

string sparkTableName="spark_sql_hbase";

//hbase侧的表名,需要在hbase侧提前创建,

hbase表创建可参考:httos://helo.aliyun,com/document detail/174651,htnl String hbaseTableName="hbase_xunlianying_table";

//HBase集群配置文件名。使用时请把配置文件放到spark的conf目录,配置文件内容从hbase控制台获取  

string hbaseConfig=args[8]:

//1.创建Spark表

string createCmd ="CREATE TABLE"+ sparkTableName +" USING org.apache.hadoop.hbase.spark\n”+

OPTIONS('catalog'= \n"+

"{\"table\":{\"nanespace\":\"default\",\"name\":\"" + hbaseTableName + “\"},\"rowkey\":\"rowkey1\", \n' \"coluans!":{\n"+

\"col0\":{\"cf\":\"rowkey\",\"col":\"rowkey1\",\"typel":\"string\"},\n"+

\"col1\":{\"cf\":\"cf\",\"col\":\"col1\",\"type\":\"string\"}}}',\n" + "hbase.spark.use.hbasecontext’= 'false',n"+ "hbase.spark.config.location'=’"+hbaseConfig+\n"+

)"; System.out.println("create table cad:\n"+create Cnd): sparkSession.sal(createCnd);

//2.counthbase表数据

string querySql="select count(*)fron"+spark TableNane; sparkSession.sql(querySql).show();  

sparkSession.stop():

}

用 Spark SQL 访问 HBase,Spark SQL 是依赖 HBase Spark Connector。代码建 Spark 表,直接映射到 HBase,配置 HBase 字段 rowkey 对应到 Spark 表里的字段类型。

传到 HBase 配置。Spark 表建立后可执行 select count 访问 HBase 数据。

直接使用HBase客户端读写数据

valsc=newSparkContext("local","test")

valconfig=newHBaseConfiguration()

valhbaseContext=newHBaseContext(sc,config)

valssc=newStreamingContext(sc,Milliseconds(200))

valrdd1=...

valrdd2=...

valqueue=mutable.Queue[RDD(Array[Byte],Array{(Array[Byte],

Array[Byte],Array[Byte])])]]()

queue+=rdd1

queue+=rdd2

valdstream=ssc.queueStream(queue)

dstream.hbaseBulkPut(

hbaseContext,

TableName.valueOf(tableName),

(putRecord)=>{

valput=newPut(putRecord._1)

putRecord._2.foreach((putValue)=>put.addColumn(putValue._1,

putValue._2,putvalue._3)

put

})

也可以在 Spark 代码直接使用 HBase 客户端读写数据。

2、在线分析 HBase 数据 -Hive Hive+HBase 架构

image.png

Hive 是基于 HBase 的数据仓库工具,用于对大规模数据进行处理。底层是将对应的 sql 转化成 Map Reduce 任务来执行,依赖 HDFS 作为 MetaStore。整个架构打起来。通过 sql 来执行 HBase 数据分析。

准备工作

1.安装 hadoop

.下载安装 Hadoop

.设置环境变量

.配置 Hadoop

.启动 hdfs 和 yarn

.验证

2.安装 mysql

.安装 MySql

.准备好 Hive 使用 MySql

.用户和数据库

3.安装配置 Hive

.下载安装 Hive

.配置 Hive

.添加和替换 hbase 依赖

.添加 Hbase 配置

.验证

准备 Hive 环境,Hive 依赖 mysql,安装 mysql,添加和替换 hbase 的依赖。circle 的 driver 添加 hbase 集群配置。

1.建HBase外表:

CREATE EXTERNAL TABLE hbase_hive_table (key int,value string)  STOREDBY'org.apache.hadoop.hivehbase.HBaseStorageHandler’  WITHSERDEPROPERTIES(“hbase.columnsmapping”= “:keycf1:val”)TBLPROPERTIES(“hbase.table.name”=

“hbase_hive_table”,“hbase.mapred.output.output.table”= “hbase_hive_table”);

2.SQL 查询

select count(*)from hbase_hive_table

配置好后可以简单创建 HBase 外表,与 Spark 很像将 Hive 配置到 HBase 里。 “:keycf1:val”)TBLPROPERTIES(“hbase.table.name”=

为字段公式 表建好后可以用 select 直接访问,对 HBase 做数据分析 截图为运行测试样例,有Map Reduce 的进度,HBase 会转化成 Map Reduce 作业并且会到依赖的 依赖的 Hive上去。具体在https://cwikiapache.org/confluence/display/Hive/ HBaseIntegr ation上详细介绍。

image.png

在线分析弊端

.影响在线:占用HBase在线服务资源影响应用对HBase的正常访问

.存储格式:相对列存,HBase存储格式对分析不友好,分析性能差

.数据变化:分析过程中数据一直在变化 在线分析数据虽然很方便但有些副作用,当用 Hive Spark 分析 HBase 数据时,底层会调用 HBase API,此操作占用 HBase 在线服务资源,如果任务工作量很大直接影响应用对 HBase 的正常访问。集团内业务跑起来占用在线服务资源,影响正常访问。相对列存,HBase 存储格式对分析并不友好,分析性能差,相对列存是最好的方式。在分析过程中应用一直在读取 HBase 数据一直在变化,要想获取某一时间点的值,应用一直在读取数据,后面的数据又更新了,数据又不一样,支持一定时间段内的数据,不是某一时间点的数据。  


三、离线及流式分析

1、HBase 数据离线处理分析

完整步骤

1.全量导出 Hbase 数据  

使用 LTS 将 Hbase 数据全量导出到 HDFS,存储为 Parquet 格式

2.开发 Spark 作业  开发 Spark 作业分析导出的 Parquet 文件数据

3.提交Spark任务

离线处理分析首先把 HBase 数据全量导出到 HDFS,存储为 Parquet 格式后打开 Spark 工作处理导出的 Parquet 文件数据。

image.png

https://help.aliyun.com/document detail/156428.html# title-ofz-4ah-45z

运用 LTS 工具,LTS 是阿里云自研的移动 Service,是HBase 数据链路,可以直接创建任务。

image.png

从某个 HBase 集群的表导出到目标集群,可参考此文档。

使用 Spark 分析 Parque 文件

publicstaticvoidmain(String[]args){

Systen.out.printin(args);

SparkConfconf=newSparkConf().setAppName("SparkParquetDemo”)

;

SparkSessionsparkSession=SparkSession

.builder()

.config(conf)

.getorCreate();

//parquet文件所在HDFS目录,格式:

hdfs://naneservice/parquetfilediar

StringhdfsDir=args[8];

StringsparkTable="spark_parquet_hbase";

Stringsql="droptableifexists"+sparkTable;

sparkSession.sql(sql);

//创建spark表,表数据路径在HDFS上。

sql=

"createtable"+sparkTable+"(\n"+

rowkeybinary,\n"+

versionlong,\n"+

opint,\n"+

fanilybinary,\n"+

qualifierbinary,\n"+

valbinary\n"+

)usingparquet"+

location'"+hdfsDir+"";

Systen.out.println("createcad:In"+sol):

sparkSession.sql(sql);

//查询数据

sql="select*from"+sparkTable;

sparkSession.sql(sol).show(();

sparkSession.stop();

}

将数据打到 HDFS 上,传好后写一个 Spark 作业来分析 Parquet 文件。代码运用 sql 建一张表关联到映射的 Parquet 文件目录,传入hdfs目录后访问一张表一样,用 sql 分析数据,比较方便。

离线分析的弊端

.实时性差:全量导出代价大数据量越大数据产出越困难

.存储冗余: 数据在离线重复存储

.重复导出:识别增量较难,历史数据重复导出

实时性差,数据是在不断增长的,HBase 一直在实时的使用数据,数据一直在增长,每天定期全量导出代价很大。因为数据量越大数据产出越困难,每天产出的越来越多。

需要将数据存储在离线,存储需要冗余。

每天导出,理想状态是每天导出增量数据,访问方式限制识别增量很难。除非 rowkey 占时间而且不会更新数据,一般情况很难识别,今天最新更新的。

2、HBase 数据流式处理分析

完整步骤

1.增量订阅 HBase 到 Kakfa

使用 LTS 增量订阅 Hbase 数据,写入到 Kakfa

2.开发 Spark 作业对接 kafka  

使用 Sparkstreaming 对接 Kafka,进行流式计算

3.计算结果写入 HBase

可以实时订阅数据,用流计算实时统计需要的指标。通过 LTS 实时将数据同步到 kafka 上,用 Spark streaming 对接 kafka 做实时的计算。

第一步增加订阅 HBase 到 kafka,第二步开发 Spark 作业对接 kafka,第三步计算结果写入 HBase 中。

导出到 kafka

image.png

引用阿里云 LTS 产品,此模块可以订阅文章或某些表格数据,写到对应的消息存储中,配置 kafka。增强配置可以通过文档导出。当任务进载后,HBase 数据会实时的导出到 kafka 里。底层实现原理是订阅 HBase 日志,看那些日志是新增的,一直会 care 正在写的日志,有新增的导不出去,临时会在两级甚至更低。

使用 Spark 对接 Kafka 进行实时计算

publicstaticvoidmain(String[]args)throwsI0Exception{

Stringbrokers=args[8];//kafka的broke,格式为:

ip:port,ip:port,ip:port

Stringtopic=args[1];//kafa的topic

StringgroupId=args[2];//kafkaGroupID

intbatchSize=18://SparkStreaming批处理的时间问题

SparkConfsparkConf=newSparkConf().setAppName(“SparkStreamin

g0nKafkaToHBaseDemo");

JavaStreamingContextssc=newJavaStreamingContext(sparkConf,

newDuration(millis:batchSize*1000));

List<String>topicList=newArrayList<>():

topicList.add(topic);//HBase集群配置文件名。使用时请把配置文件放到 spark 的conf 目录,配置文件内容从 hbase 控制台获取  

String hbaseConfLocation=args[3];

//设置kafko多数

Map<String,Object>kafkaParans=newHashMap(); kafkaParans.put("bootstrap.servers"brokers): kafkaParans.put("value.deserializer","org.apache.kafka.comnon.serialization.StringDeserializer"); kafkaParans.put("key.deserializer",“org,apache.kafka.comnon.serialization.StringDeserializer"); kafkaParans.put(“group.id"groupId): LocationStrategylocationStrategy=LocationStrategies.PreferConsistent(); ConsunerStrategyconsumerStrategy=ConsunerStrategies.Subscribe(topicListkafkaParans);

//从 Kafka 接收数据并创建对应的 DStrean. JavaInputostream<ConsumerRecrd<String,String>>messages=KafkaUtils.createdirectStream(ssc,

locationStrategy, consumerStrategy);

String hbaseiableName ="hbase_xunlianying_table":

//解析LTS同步的HBase数据 JavadStream<String[]>wordsamessages.nap(newFunction<ConsumerRecord<String,String>,String[]>() {

Override publicString[]call(ConsumerRecord<String,String>stringStringConsumerRecord) throws Exception { JSONObjectjsonValue=JSONObject.parseObject(stringStringConsumerRecord.value()); JSONArraykeys=jsonValue.getJSoNArray(key:"keys"); JSONArrayvalues=jsonValue.getSONArray(key:“data); String[] result =new String[keys.size()+values.size()];  

int retIndex = 8;

for (inti=8;i<keys.size():i++){

JSONObiect key=keys.getSONObiect(1):

result[retIndex++] = key.getString(key:"value");  

数据增量订阅到 kafka 上后,Spark 作业增量订阅 HBase 数据,实时统计。底层用 Spark kafka,收到数据后需要 HBase 导出的数据用 keys 解析,解析完后进行计算。  


四、总结

1、HBase 大数据分析

HBase 数据生态

image.png

有很多开源产品,首先 HBase 通过 App、数据库、消息队列这些高并发写入 HBase 后,在线通过 Flink、hadoop、presto 可以直接读取 HBase 数据做分析,同时结果批量/Bulkload 写入 HBase。 通过同步工具 LTS DataX、Kettle 将 HBase 数据离线分析到 Aliyun DLA ClickHouse MaxCompute Parquet 里去。增量订阅 LTS Replication 对接消息队列 kafka MQ DataHub 下一步对接流式处理分析 Apache Flink Spark。

Replication 开源没有特别流行,增加队列的产品。Replication 是另类基于选择的,伪装成一个目的 HBase 集群,收到 HBase 数据在做对应的操作。LTS 是主动读取 HBase。

2、下一代产品:云原生多模数据库 Lindorm

indorm 基于 HBase 有多种访问方式,底层还支持时序、搜索、文件、宽表、键值。能力上云原生弹性,扩展性强,多模之间像宽表可以直接对接到搜索互通融合,内部可直接融合在一起,低成本可以海量存储,数据生态强大。  

image.png

相关文章
|
24天前
|
SQL 分布式计算 数据可视化
Tableau与大数据:可视化工具在大数据分析中的应用
【4月更文挑战第8天】Tableau是一款领先的数据可视化工具,擅长于大数据分析,提供广泛的数据连接器,支持多源整合。它与Hadoop、Spark等深度集成,实现高效大数据处理。Tableau的拖拽式界面和交互式分析功能使得非技术人员也能轻松探索数据。在实战中,Tableau用于业务监控、数据storytelling和自助式分析,推动数据民主化,提升决策效率。未来,Tableau将持续创新,扩展生态系统,并保障数据安全与合规性,助力企业最大化数据价值。
33 0
|
1月前
|
存储 消息中间件 监控
【Flume】Flume在大数据分析领域的应用
【4月更文挑战第4天】【Flume】Flume在大数据分析领域的应用
|
2月前
|
供应链
代采系统如何利用大数据分析优化采购决策?
代采系统可以利用大数据分析来优化采购决策
|
2月前
|
Cloud Native 数据处理 云计算
探索云原生技术在大数据分析中的应用
随着云计算技术的不断发展,云原生架构作为一种全新的软件开发和部署模式,正逐渐引起企业的广泛关注。本文将探讨云原生技术在大数据分析领域的应用,介绍其优势与挑战,并探讨如何利用云原生技术提升大数据分析的效率和可靠性。
|
2月前
|
存储 消息中间件 大数据
Go语言在大数据处理中的实际应用与案例分析
【2月更文挑战第22天】本文深入探讨了Go语言在大数据处理中的实际应用,通过案例分析展示了Go语言在处理大数据时的优势和实践效果。文章首先介绍了大数据处理的挑战与需求,然后详细分析了Go语言在大数据处理中的适用性和核心技术,最后通过具体案例展示了Go语言在大数据处理中的实际应用。
|
2月前
|
数据采集 运维 数据挖掘
API电商接口大数据分析与数据挖掘 (商品详情店铺)
API接口、数据分析以及数据挖掘在商品详情和店铺相关的应用中,各自扮演着重要的角色。以下是关于它们各自的功能以及如何在商品详情和店铺分析中协同工作的简要说明。
|
3月前
|
API
GEE案例分析——利用sentinel-3数据计算空气污染指数(Air Pollution Index,简称API)
GEE案例分析——利用sentinel-3数据计算空气污染指数(Air Pollution Index,简称API)
115 0
|
2月前
|
存储 分布式计算 算法
【底层服务/编程功底系列】「大数据算法体系」带你深入分析MapReduce算法 — Shuffle的执行过程
【底层服务/编程功底系列】「大数据算法体系」带你深入分析MapReduce算法 — Shuffle的执行过程
28 0
|
3天前
|
存储 机器学习/深度学习 数据采集
大数据处理与分析实战:技术深度剖析与案例分享
【5月更文挑战第2天】本文探讨了大数据处理与分析的关键环节,包括数据采集、预处理、存储、分析和可视化,并介绍了Hadoop、Spark和机器学习等核心技术。通过电商推荐系统和智慧城市交通管理的实战案例,展示了大数据在提高用户体验和解决实际问题上的效能。随着技术进步,大数据处理与分析将在更多领域发挥作用,推动社会进步。
|
6天前
|
存储 运维 监控

热门文章

最新文章