1. 背景
当前大数据处理工业界非常重要的一个大趋势是一体化,尤其是湖仓一体架构。与过去分散的数据仓库和数据湖不同,湖仓一体架构通过将数据存储和处理融为一体,不仅提升了数据访问速度和处理效率,还简化了数据管理流程,降低了资源成本。企业可以更轻松地实现数据治理和分析,从而快速决策。
湖仓一体架构的核心技术支撑,来自于table-format技术。目前开源支持的有iceberg、hudi、delta和paimon。其中paimon是国内开源的,也是最年轻的成员。paimon集合前辈数据湖格式技术的优点,创新地结合了 Lake 格式和 LSM 结构,将实时流式更新引入 Lake 架构。支持使用 Flink 和 Spark 构建实时 Lakehouse 架构,用于流式和批处理操作。
本文主要演示如何在 Dataphin 产品中构建 Flink+Paimon 的流式湖仓方案。
2. 方案架构和优势
本文基于Flink+Paimon搭建流式湖仓的方案架构如下:
- Flink将数据源写入Paimon,形成ODS层。
- Flink订阅ODS层的变更数据(Changelog)进行加工,形成DWD层再次写入Paimon。
- 最后将宽表数据进行汇总计算,并写入下游 OLAP 查询引擎,用于数据可视化和数据分析。
2.1 优势
该方案有如下优势:
- Paimon的每一层数据都可以在分钟级的延时内将变更传递给下游,将传统离线数仓的延时从小时级甚至天级降低至分钟级。
- Paimon的每一层数据都可以直接接受变更数据,无需覆写分区,极大地降低了传统离线数仓数据更新与订正的成本,解决了中间层数据不易查、不易更新、不易修正的问题。
- 模型统一,架构简化。ETL链路的逻辑是基于Flink SQL实现的;ODS层、DWD层和DWS层的数据统一存储在Paimon中,可以降低架构复杂度,提高数据处理效率。
3. 业务场景
本文以某个电商平台为例,通过搭建一套流式湖仓,实现数据的加工清洗,并支持上层应用对数据的查询。
- 构建ODS 表:业务数据库实时入仓MySQL有order_info(订单表),user_info(用户表),这两张表通过Flink实时写入 HDFS,并以Paimon格式进行存储,作为ODS层。
- 构建 DWD 表:将 order_info 和 user_info 表融合成为明细层宽表 order_info_detail。
- 创建 Flink 任务,对明细层宽表order_info_detail 进行查询,统计每个城市中每个用户的下单金额,并将结果写入下游 OLAP 引擎(本文以 MySQL 为例)的汇总表 user_order_st 中,用于进一步的分析和可视化。
4. 前提条件
- Dataphin 已绑定 Hadoop 计算引擎,Flink 计算引擎,并创建相应项目
- Dataphin 已绑定 MySQL 数据源和 绑定Paimon 数据源
5. 实现步骤
5.1 MySQL 数据源环境准备
- 利用 Dataphin 即席查询,创建数据库 SQL 任务,对 MySQL 数据源进行建表和数据插入操作
- 创建order_info和user_info表,并插入数据:
-- 创建order_info表 create table pf_mysql_order_info( order_id bigint primary key, order_time timestamp, user_id varchar(128), price bigint ); insert into pf_mysql_order_info values (1, '2024-07-01 00:00:00', '1001', 100) ,(2, '2024-07-01 01:00:00', '1001', 100) ,(3, '2024-07-01 02:00:00', '1001', 100) ,(4, '2024-07-01 03:00:00', '1001', 100) ; -- 创建user_info表 create table pf_mysql_user_info( user_id varchar(128) primary key, user_name varchar(500), user_gender varchar(10), user_city varchar(128) ); insert into pf_mysql_user_info values ('1001', 'alice', 'female', 'hangzhou') ,('1002', 'bob', 'male', 'hangzhou') ,('1003', 'jack', 'male', 'shanghai');
5.2 MySQL 表入湖
- 在实时计算项目中创建 MySQL 元表
新建mysql的order_info表,提交发布
新建mysql的user_info表,提交发布
- 在 dataphin 中创建 paimon 表
目前 dataphin不支持直接在产品上新建paimon表,用户需要提前通过离线 SQL 任务(或即席查询 SQL)创建 paimon 表(或在 hive cli 上通过 hive 创建 paimon 表)
dataphin中通过sql建表的方法如下:
- paimon建表依赖于额外的jar包,需要提前放到hive的lib目录下(参考文档:https://paimon.apache.org/docs/0.8/engines/hive/)
- 在 Dataphin 中通过离线 SQL 任务创建 paimon 表
CREATE TABLE if not exists ods_order_info( order_id bigint, order_time timestamp, user_id string, price bigint, PRIMARY KEY(order_id) disable novalidate ) STORED BY 'org.apache.paimon.hive.PaimonStorageHandler'; CREATE TABLE if not exists ods_user_info( user_id string, user_name string, user_gender string, user_city string, PRIMARY KEY(user_id) disable novalidate ) STORED BY 'org.apache.paimon.hive.PaimonStorageHandler';
- 新建paimon的order_info ods表,提交发布
- 新建 Flink SQL 任务,编写 ods_order_info的入湖任务,提交发布,然后运行。
insert into paimon_order_info select * from ${poc}.mysql_order_info;
运行大约几分钟之后,可以通过dataphin的离线查询sql任务,查询数据是否写入:
5) 新建paimon的user_info ods 元表,提交发布
6) 新建 Flink SQL 任务,编写 ods_user_info的入湖任务,提交发布,然后运行。
insert into paimon_user_info select * from ${poc}.mysql_user_info;
运行大约几分钟之后,可以通过dataphin的离线查询sql任务,查询数据是否写入:
5.3 cdm层构建宽表
- 如上的技术方案,利用离线 SQL 任务新建paimon表:
CREATE TABLE poc.cdm_order_detail( order_id bigint, order_time timestamp, user_id string, price bigint, user_name string, user_gender string, user_city string, PRIMARY KEY(order_id) disable novalidate ) STORED BY 'org.apache.paimon.hive.PaimonStorageHandler';
新建paimon元表,并提交发布
- 创建 Flink SQL 任务,通过user_id把user信息关联到order表上形成明细宽表
insert into paimon_order_detail select order_id, order_time, o.user_id, price, c.user_name, c.user_gender, c.user_city from ( select *, proctime() as proc_time from ${poc}.paimon_order_info ) as o left join ${poc}.paimon_user_info FOR SYSTEM_TIME AS OF o.proc_time AS c ON o.user_id = c.user_id;
提交发布,然后运行。运行大约几分钟之后,可以通过dataphin的离线查询sql任务,查询数据是否写入:
5.4 ads层计算出最终结果
最终将明细层宽表进行聚合计算,并将计算结果写入到mysql中。
- 利用数据库 SQL 任务创建 ads的mysql表:
create table pf_mysql_order_st( user_id varchar(128) primary key, user_name varchar(500), user_city varchar(128), amount bigint )
- 创建mysql元表:mysql_order_st
- 新建flink sql任务,做ads层的统计计算,并将结果写入 MySQL 数据库中:
insert into ${poc}.mysql_order_st select user_id, user_name, user_city, sum(price) as amount from paimon_order_detail group by user_id, user_name, user_city;
- 提交发布、运行任务。等待几分钟之后,就可以在 MySQL 数据库中查询到计算好的数据。
5.5 实时更新计算结果
我们向mysql表的order_info表新增加订单数据
insert into pf_mysql_order_info values (5, '2024-07-01 04:00:00', '1001', 100) ,(6, '2024-07-01 05:00:00', '1001', 100) ;
等待几分钟(paimon的数据可见性依赖于flink任务的checkpoint的时间周期),我们就会在结果表里看到新增的数据结果: