Flink SQL 实战:HBase 的结合应用

本文涉及的产品
实时计算 Flink 版,1000CU*H 3个月
简介: 本文着重介绍 HBase 和 Flink 在实际场景中的结合使用。主要分为两种场景,第一种场景:HBase 作为维表与 Flink Kafka table 做 temporal table join 的场景;第二种场景:Flink SQL 做计算之后的结果写到 HBase 表,供其他用户查询的场景。

本文主要介绍 HBase 和 Flink SQL 的结合使用。HBase 作为 Google 发表 Big Table 论文的开源实现版本,是一种分布式列式存储的数据库,构建在 HDFS 之上的 NoSQL 数据库,非常适合大规模实时查询,因此 HBase 在实时计算领域使用非常广泛。可以实时写 HBase,也可以利用 buckload 一把把离线 Job 生成 HFile Load 到HBase 表中。而当下 Flink SQL 的火热程度不用多说,Flink SQL 也为 HBase 提供了 connector,因此 HBase 与 Flink SQL 的结合非常有必要实践实践。

当然,本文假设用户有一定的 HBase 知识基础,不会详细去介绍 HBase 的架构和原理,本文着重介绍 HBase 和 Flink 在实际场景中的结合使用。主要分为两种场景,第一种场景:HBase 作为维表与 Flink Kafka table 做 temporal table join 的场景;第二种场景:Flink SQL 做计算之后的结果写到 HBase 表,供其他用户查询的场景。因此,本文介绍的内容如下所示:

· HBase 环境准备
· 数据准备
· HBase 作为维度表进行 temporal table join的场景
· Flink SQL 做计算写 HBase 的场景
· 总结

一、HBase 环境准备

由于没有测试的 HBase 环境以及为了避免污染线上 Hbase 环境。因此,自己 build一个 Hbase docker image(大家可以 docker pull guxinglei/myhbase 拉到本地),是基于官方干净的 ubuntu imgae 之上安装了 Hbase 2.2.0 版本以及 JDK1.8 版本。

启动容器,暴露 Hbase web UI 端口以及内置 zk 端口,方便我们从 web 页面看信息以及创建 Flink Hbase table 需要 zk 的链接信息。

docker run -it --network=host -p 2181:2181 -p 60011:60011 docker.io/guxinglei/myhbase:latest bash

1.png

· 进入容器,启动 HBase 集群,以及启动 rest server,后续方便我们用 REST API 来读取 Flink SQL 写进 HBase 的数据。

# 启动hbase 集群bin/start-hbase.sh# 后台启动restServerbin/hbase-daemon.sh start rest -p 8000

2.png

二、数据准备

由于 HBase 环境是自己临时搞的单机服务,里面没有数据,需要往里面写点数据供后续示例用。在 Flink SQL 实战系列第二篇中介绍了如何注册 Flink Mysql table,我们可以将广告位表抽取到 HBase 表中,用来做维度表,进行 temporal table join。因此,我们需要在 HBase 中创建一张表,同时还需要创建 Flink HBase table, 这两张表通过 Flink SQL 的 HBase connector 关联起来。

· 在容器中启动 HBase shell,创建一张名为 dim_hbase 的 HBase 表,建表语句如下所示:

# 在hbase shell创建 hbase表
hbase(main):002:0> create 'dim_hbase','cf'
Created table dim_hbase
Took 1.3120 seconds
=> Hbase::Table - dim_hbase

3.png

· 在 Flink 中创建 Flink HBase table,建表语句如下所示:

# 注册 Flink Hbase table
DROP TABLE IF EXISTS flink_rtdw.demo.hbase_dim_table;
CREATE TABLE flink_rtdw.demo.hbase_dim_table (
  rowkey STRING,
  cf ROW < adspace_name STRING >,
  PRIMARY KEY (rowkey) NOT ENFORCED
) WITH (
'connector' = 'hbase-1.4',
'table-name' = 'dim_hbase',
'sink.buffer-flush.max-rows' = '1000',
'zookeeper.quorum' = 'localhost:2181'
);

· Flink MySQL table 和 Flink HBase table 已经创建好了,就可以写抽取数据到HBase 的 SQL job 了,SQL 语句以及 job 状态如下所示:

# 抽取Mysql数据到Hbase表中


insert into
  hbase_dim_table
select
CAST (ID as VARCHAR),
ROW(name)
from
  mysql_dim_table;

4.png

5.png

6.png

7.png

03 HBase 作为维表与 Kafka 做 temporal join 的场景

在 Flink SQL join 中,维度表的 join 一定绕不开的,比如订单金额 join 汇率表,点击流 join 广告位的明细表等等,使用场景非常广泛。那么作为分布式数据库的 HBase 比 MySQL 作为维度表用作维度表 join 更有优势。在 Flink SQL 实战系列第二篇中,我们注册了广告的点击流,将 Kafka topic 注册 Flink Kafka Table,同时也介绍了 temporal table join 在 Flink SQL 中的使用;那么本节中将会介绍 HBase 作为维度表来使用,上面小节中已经将数据抽取到 Hbase 中了,我们直接写 temporal table join 计算逻辑即可。

· 作为广告点击流的 Flink Kafa table 与 作为广告位的 Flink HBase table 通过广告位 Id 进行 temporal table join,输出广告位 ID 和广告位中文名字,SQL join 逻辑如下所示:

select adsdw_dwd_max_click_mobileapp.publisher_adspace_adspaceId as publisher_adspace_adspaceId,
       hbase_dim_table.cf.adspace_name as publisher_adspace_name
from adsdw_dwd_max_click_mobileapp
left join hbase_dim_table FOR SYSTEM_TIME AS OF adsdw_dwd_max_click_mobileapp.procTime
on cast(adsdw_dwd_max_click_mobileapp.publisher_adspace_adspaceId as string) = hbase_dim_table.rowkey;

· temporal table join job 提交 Flink 集群上的状态以及 join 结果如下所示:

8.png

9.png

四、计算结果 sink 到 HBase 作为结果的场景

上面小节中,HBase 作为维度表用作 temporal table join 是非常常见的场景,实际上 HBase 作为存储计算结果也是非常常见的场景,毕竟 Hbase 作为分布式数据库,底层存储是拥有多副本机制的 HDFS,维护简单,扩容方便, 实时查询快,而且提供各种客户端方便下游使用存储在 HBase 中的数据。那么本小节就介绍 Flink SQL 将计算结果写到 HBase,并且通过 REST API 查询计算结果的场景。

· 进入容器中,在 HBase 中新建一张 HBase 表,一个 column family 就满足需求,建表语句如下所示:

# 注册hbase sink table
create 'dwa_hbase_click_report','cf'

10.png

· 建立好 HBase 表之后,我们需要在 Flink SQL 创建一张 Flink HBase table,这个时候我们需要明确 cf 这个 column famaly 下面 column 字段,在 Flink SQL实战第二篇中,已经注册好了作为点击流的 Flink Kafka table,因此本节中,将会计算点击流的 uv 和点击数,因此两个 column 分别为 uv 和 click_count,建表语句如下所示:

# 注册 Flink Hbase table
DROP TABLE IF EXISTS flink_rtdw.demo.dwa_hbase_click_report;
CREATE TABLE flink_rtdw.demo.dwa_hbase_click_report (
  rowkey STRING,
  cf ROW < uv BIGINT, click_count BIGINT >,
  PRIMARY KEY (rowkey) NOT ENFORCED
) WITH (
'connector' = 'hbase-1.4',
'table-name' = 'dwa_hbase_click_report',
'sink.buffer-flush.max-rows' = '1000',
'zookeeper.quorum' = 'hostname:2181'
);

11.png

· 前面点击流的 Flink Kafka table 和存储计算结果的 HBase table 和 Flink HBase table 已经准备了,我们将做一个1分钟的翻转窗口计算 uv 和点击数,并且将计算结果写到 HBase 中。对 HBase 了解的人应该知道,rowkey 的设计对 hbase regoin 的分布有着非常重要的影响,基于此我们的 rowkey 是使用 Flink SQL 内置的 reverse 函数进行广告位 Id 进行反转和窗口启始时间做 concat,因此,SQL 逻辑语句如下所示:

INSERT INTO dwa_hbase_click_report
SELECT
CONCAT(REVERSE(CAST(publisher_adspace_adspaceId AS STRING)) ,
'_',
CAST((UNIX_TIMESTAMP(DATE_FORMAT(TUMBLE_START(ets, INTERVAL '1' MINUTE),'yyyy-MM-dd HH:mm:ss')) * 1000) AS STRING)
  ) as rowkey, 
ROW(COUNT(DISTINCT audience_mvid) , COUNT(audience_behavior_click_creative_impressionId)) as cf
FROM
  adsdw_dwd_max_click_mobileapp
WHERE publisher_adspace_adspaceId IS NOT NULL AND audience_mvid IS NOT NULL AND audience_behavior_click_creative_impressionId IS NOT NULL
GROUP BY
  TUMBLE(ets, INTERVAL '1' MINUTE),
  publisher_adspace_adspaceId;

12.png

· SQL job 提交之后的状态以及结果 check 如下所示:

13.png

14.png

上述 SQL job 已经成功的将结算结果写到 HBase 中了。对于线上的 HBase 服务来讲,很多同事不一定有 HBase 客户端的权限,从而也不能通过 HBase shell 读取数据;另外作为线上报表服务显然不可能通过 HBase shell 来通过查询数据。因此,在实时报表场景中,数据开发工程师将数据写入 HBase, 前端工程师通过 REST API 来读取数据。前面我们已经启动了 HBase rest server 进程,我们可以通 rest 服务提供读取 HBase 里面的数据。

· 我们先 get 一条刚刚写到 HBase 中的数据看看,如下所示:

15.png

· 下面我们开始通过 REST API 来查询 HBase 中的数据,第一步,执行如下语句拿到 scannerId;首先需要将要查询的 rowkey 进行 base64 编码才能使用,后面需要将结果进行 base64 解码

rowkey base64 编码前:0122612_1606295280000
base64 编码之后:MDEyMjYxMl8xNjA2Mjk1MjgwMDAw

curl -vi -X PUT \
         -H "Accept: text/xml" \
         -H "Content-Type: text/xml" \
         -d '<Scanner startRow="MDEyMjYxMl8xNjA2Mjk1MjgwMDAw" endRow="MDEyMjYxMl8xNjA2Mjk1MjgwMDAw"></Scanner>' \
"http://hostname:8000/dwa_hbase_click_report/scanner"

16.png

· 第二步,执行如下语句根据上条语句返回的 scannerID 查询数据,可以看到返回的结果:

curl -vi -X GET \
         -H "Accept: application/json" \
"http://hostname:8000/dwa_hbase_click_report/scanner/16063768141736ac0a8b5"

17.png

· 第三步,查询完毕之后,执行如下语句删除该scannerId:

curl -vi -X DELETE \
         -H "Accept: text/xml" \
"http://hostname:8000/dwa_hbase_click_report/scanner/16063768141736ac0a8b5"

18.png

五、总结

在本篇文章中,我们介绍了 HBase 和 Flink SQL 的结合使用比较广泛两种的场景:作为维度表用以及存储计算结果;同时使用 REST API 对 HBase 中的数据进行查询,对于查询用户来说,避免直接暴露 HBase 的 zk,同时将 rest server 和 HBase 集群解耦。

作者简介

余敖,360 数据开发高级工程师,目前专注于基于 Flink 的实时数仓建设与平台化工作。对 Flink、Kafka、Hive、Spark 等进行数据 ETL 和数仓开发有丰富的经验。

社区二维码.png

相关实践学习
基于Hologres+Flink搭建GitHub实时数据大屏
通过使用Flink、Hologres构建实时数仓,并通过Hologres对接BI分析工具(以DataV为例),实现海量数据实时分析.
实时计算 Flink 实战课程
如何使用实时计算 Flink 搞定数据处理难题?实时计算 Flink 极客训练营产品、技术专家齐上阵,从开源 Flink功能介绍到实时计算 Flink 优势详解,现场实操,5天即可上手! 欢迎开通实时计算 Flink 版: https://cn.aliyun.com/product/bigdata/sc Flink Forward Asia 介绍: Flink Forward 是由 Apache 官方授权,Apache Flink Community China 支持的会议,通过参会不仅可以了解到 Flink 社区的最新动态和发展计划,还可以了解到国内外一线大厂围绕 Flink 生态的生产实践经验,是 Flink 开发者和使用者不可错过的盛会。 去年经过品牌升级后的 Flink Forward Asia 吸引了超过2000人线下参与,一举成为国内最大的 Apache 顶级项目会议。结合2020年的特殊情况,Flink Forward Asia 2020 将在12月26日以线上峰会的形式与大家见面。
相关文章
|
12月前
|
分布式计算 数据处理 Apache
Spark和Flink的区别是什么?如何选择?都应用在哪些行业?
【10月更文挑战第10天】Spark和Flink的区别是什么?如何选择?都应用在哪些行业?
1166 1
|
7月前
|
SQL 运维 监控
SQL查询太慢?实战讲解YashanDB SQL调优思路
本文是Meetup第十期“调优实战专场”的第二篇技术文章,上一篇《高效查询秘诀,解码YashanDB优化器分组查询优化手段》中,我们揭秘了YashanDB分组查询优化秘诀,本文将通过一个案例,助你快速上手YashanDB慢日志功能,精准定位“慢SQL”后进行优化。
|
7月前
|
存储 运维 监控
阿里妈妈基于 Flink+Paimon 的 Lakehouse 应用实践
本文总结了阿里妈妈数据技术专家陈亮在Flink Forward Asia 2024大会上的分享,围绕广告业务背景、架构设计及湖仓方案演进展开。内容涵盖广告生态运作、实时数仓挑战与优化,以及基于Paimon的湖仓方案优势。通过分层设计与技术优化,实现业务交付周期缩短30%以上,资源开销降低40%,并大幅提升系统稳定性和运营效率。文章还介绍了阿里云实时计算Flink版的免费试用活动,助力企业探索实时计算与湖仓一体化解决方案。
805 3
阿里妈妈基于 Flink+Paimon 的 Lakehouse 应用实践
|
7月前
|
SQL 弹性计算 DataWorks
Flink CDC 在阿里云 DataWorks 数据集成入湖场景的应用实践
Flink CDC 在阿里云 DataWorks 数据集成入湖场景的应用实践
284 6
|
9月前
|
SQL 大数据 数据处理
Flink SQL 详解:流批一体处理的强大工具
Flink SQL 是为应对传统数据处理框架中流批分离的问题而诞生的,它融合了SQL的简洁性和Flink的强大流批处理能力,降低了大数据处理门槛。其核心工作原理包括生成逻辑执行计划、查询优化和构建算子树,确保高效执行。Flink SQL 支持过滤、投影、聚合、连接和窗口等常用算子,实现了流批一体处理,极大提高了开发效率和代码复用性。通过统一的API和语法,Flink SQL 能够灵活应对实时和离线数据分析场景,为企业提供强大的数据处理能力。
1651 27
|
10月前
|
消息中间件 JSON 数据库
探索Flink动态CEP:杭州银行的实战案例
本文由杭州银行大数据工程师唐占峰、欧阳武林撰写,介绍Flink动态CEP的定义、应用场景、技术实现及使用方式。Flink动态CEP是基于Flink的复杂事件处理库,支持在不重启服务的情况下动态更新规则,适应快速变化的业务需求。文章详细阐述了其在反洗钱、反欺诈和实时营销等金融领域的应用,并展示了某金融机构的实际应用案例。通过动态CEP,用户可以实时调整规则,提高系统的灵活性和响应速度,降低维护成本。文中还提供了具体的代码示例和技术细节,帮助读者理解和使用Flink动态CEP。
1117 2
探索Flink动态CEP:杭州银行的实战案例
|
10月前
|
SQL 存储 缓存
Flink SQL Deduplication 去重以及如何获取最新状态操作
Flink SQL Deduplication 是一种高效的数据去重功能,支持多种数据类型和灵活的配置选项。它通过哈希表、时间窗口和状态管理等技术实现去重,适用于流处理和批处理场景。本文介绍了其特性、原理、实际案例及源码分析,帮助读者更好地理解和应用这一功能。
704 14
|
11月前
|
SQL 数据库 UED
SQL性能提升秘籍:5步优化法与10个实战案例
在数据库管理和应用开发中,SQL查询的性能优化至关重要。高效的SQL查询不仅可以提高应用的响应速度,还能降低服务器负载,提升用户体验。本文将分享SQL优化的五大步骤和十个实战案例,帮助构建高效、稳定的数据库应用。
853 3
|
11月前
|
SQL 缓存 监控
SQL性能提升指南:五大优化策略与十个实战案例
在数据库性能优化的世界里,SQL优化是提升查询效率的关键。一个高效的SQL查询可以显著减少数据库的负载,提高应用响应速度,甚至影响整个系统的稳定性和扩展性。本文将介绍SQL优化的五大步骤,并结合十个实战案例,为你提供一份详尽的性能提升指南。
899 0
|
12月前
|
SQL 大数据 API
大数据-132 - Flink SQL 基本介绍 与 HelloWorld案例
大数据-132 - Flink SQL 基本介绍 与 HelloWorld案例
224 0

相关产品

  • 实时计算 Flink版