如何在Dataphin中构建Flink+Paimon流式湖仓方案

简介: 当前大数据处理工业界非常重要的一个大趋势是一体化,尤其是湖仓一体架构。与过去分散的数据仓库和数据湖不同,湖仓一体架构通过将数据存储和处理融为一体,不仅提升了数据访问速度和处理效率,还简化了数据管理流程,降低了资源成本。企业可以更轻松地实现数据治理和分析,从而快速决策。paimon是国内开源的,也是最年轻的成员。本文主要演示如何在 Dataphin 产品中构建 Flink+Paimon 的流式湖仓方案。

1. 背景

当前大数据处理工业界非常重要的一个大趋势是一体化,尤其是湖仓一体架构。与过去分散的数据仓库和数据湖不同,湖仓一体架构通过将数据存储和处理融为一体,不仅提升了数据访问速度和处理效率,还简化了数据管理流程,降低了资源成本。企业可以更轻松地实现数据治理和分析,从而快速决策。


湖仓一体架构的核心技术支撑,来自于table-format技术。目前开源支持的有iceberg、hudi、delta和paimon。其中paimon是国内开源的,也是最年轻的成员。paimon集合前辈数据湖格式技术的优点,创新地结合了 Lake 格式和 LSM 结构,将实时流式更新引入 Lake 架构。支持使用 Flink 和 Spark 构建实时 Lakehouse 架构,用于流式和批处理操作。


本文主要演示如何在 Dataphin 产品中构建 Flink+Paimon 的流式湖仓方案。


2. 方案架构和优势

本文基于Flink+Paimon搭建流式湖仓的方案架构如下:

  1. Flink将数据源写入Paimon,形成ODS层。
  2. Flink订阅ODS层的变更数据(Changelog)进行加工,形成DWD层再次写入Paimon。
  3. 最后将宽表数据进行汇总计算,并写入下游 OLAP 查询引擎,用于数据可视化和数据分析。

image.png

2.1 优势

该方案有如下优势:

  • Paimon的每一层数据都可以在分钟级的延时内将变更传递给下游,将传统离线数仓的延时从小时级甚至天级降低至分钟级。
  • Paimon的每一层数据都可以直接接受变更数据,无需覆写分区,极大地降低了传统离线数仓数据更新与订正的成本,解决了中间层数据不易查、不易更新、不易修正的问题。
  • 模型统一,架构简化。ETL链路的逻辑是基于Flink SQL实现的;ODS层、DWD层和DWS层的数据统一存储在Paimon中,可以降低架构复杂度,提高数据处理效率。


3. 业务场景

本文以某个电商平台为例,通过搭建一套流式湖仓,实现数据的加工清洗,并支持上层应用对数据的查询。

  1. 构建ODS 表:业务数据库实时入仓MySQL有order_info(订单表),user_info(用户表),这两张表通过Flink实时写入 HDFS,并以Paimon格式进行存储,作为ODS层。
  2. 构建 DWD 表:将 order_info 和 user_info 表融合成为明细层宽表 order_info_detail。
  3. 创建 Flink 任务,对明细层宽表order_info_detail 进行查询,统计每个城市中每个用户的下单金额,并将结果写入下游 OLAP 引擎(本文以 MySQL 为例)的汇总表 user_order_st 中,用于进一步的分析和可视化。

4. 前提条件

5. 实现步骤

5.1 MySQL 数据源环境准备

  1. 利用 Dataphin 即席查询,创建数据库 SQL 任务,对 MySQL 数据源进行建表和数据插入操作

  1. 创建order_info和user_info表,并插入数据:
-- 创建order_info表
create table pf_mysql_order_info(
  order_id bigint primary key,
  order_time timestamp,
  user_id varchar(128),
  price bigint
);
insert into pf_mysql_order_info values
  (1, '2024-07-01 00:00:00', '1001', 100)
    ,(2, '2024-07-01 01:00:00', '1001', 100)
  ,(3, '2024-07-01 02:00:00', '1001', 100)
  ,(4, '2024-07-01 03:00:00', '1001', 100)
;
-- 创建user_info表
create table pf_mysql_user_info(
  user_id varchar(128) primary key,
  user_name varchar(500),
  user_gender varchar(10),
  user_city varchar(128)
);
insert into pf_mysql_user_info values
  ('1001', 'alice', 'female', 'hangzhou')
  ,('1002', 'bob', 'male', 'hangzhou')
  ,('1003', 'jack', 'male', 'shanghai');

5.2 MySQL 表入湖

  1. 在实时计算项目中创建 MySQL 元表

新建mysql的order_info表,提交发布


新建mysql的user_info表,提交发布


  1. 在 dataphin 中创建 paimon 表


目前 dataphin不支持直接在产品上新建paimon表,用户需要提前通过离线 SQL 任务(或即席查询 SQL)创建 paimon 表(或在 hive cli 上通过 hive 创建 paimon 表

dataphin中通过sql建表的方法如下:

  1. paimon建表依赖于额外的jar包,需要提前放到hive的lib目录下(参考文档:https://paimon.apache.org/docs/0.8/engines/hive/
  2. 在 Dataphin 中通过离线 SQL 任务创建 paimon 表
CREATE TABLE if not exists  ods_order_info(
    order_id bigint,
    order_time timestamp,
    user_id string,
    price bigint,
    PRIMARY KEY(order_id) disable novalidate
)
STORED BY 'org.apache.paimon.hive.PaimonStorageHandler';
CREATE TABLE if not exists  ods_user_info(
    user_id string,
    user_name string,
    user_gender string,
    user_city string,
    PRIMARY KEY(user_id) disable novalidate
)
STORED BY 'org.apache.paimon.hive.PaimonStorageHandler';


  1. 新建paimon的order_info ods表,提交发布



  1.  新建 Flink SQL 任务,编写 ods_order_info的入湖任务,提交发布,然后运行。


insert into paimon_order_info 
select * from ${poc}.mysql_order_info;


运行大约几分钟之后,可以通过dataphin的离线查询sql任务,查询数据是否写入:


5) 新建paimon的user_info ods 元表,提交发布



6) 新建 Flink SQL 任务,编写 ods_user_info的入湖任务,提交发布,然后运行。

insert into paimon_user_info 
select * from ${poc}.mysql_user_info;


运行大约几分钟之后,可以通过dataphin的离线查询sql任务,查询数据是否写入:


5.3 cdm层构建宽表

  1. 如上的技术方案,利用离线 SQL 任务新建paimon表:


CREATE TABLE poc.cdm_order_detail(
    order_id bigint,
    order_time timestamp,
    user_id string,
    price bigint,
    user_name string,
    user_gender string,
    user_city string,
    PRIMARY KEY(order_id) disable novalidate
)
STORED BY 'org.apache.paimon.hive.PaimonStorageHandler';

新建paimon元表,并提交发布


  1. 创建 Flink SQL 任务,通过user_id把user信息关联到order表上形成明细宽表
insert into paimon_order_detail select 
    order_id,
    order_time,
    o.user_id,
    price,
    c.user_name,
    c.user_gender,
    c.user_city
 from (
    select *, proctime() as proc_time from ${poc}.paimon_order_info
    ) as o
left join ${poc}.paimon_user_info FOR SYSTEM_TIME AS OF o.proc_time AS c
ON o.user_id = c.user_id;


提交发布,然后运行。运行大约几分钟之后,可以通过dataphin的离线查询sql任务,查询数据是否写入:

5.4 ads层计算出最终结果

最终将明细层宽表进行聚合计算,并将计算结果写入到mysql中。

  1. 利用数据库 SQL 任务创建 ads的mysql表:
create table pf_mysql_order_st(
  user_id varchar(128) primary key,
  user_name varchar(500),
  user_city varchar(128),
  amount bigint
)


  1. 创建mysql元表:mysql_order_st


  1. 新建flink sql任务,做ads层的统计计算,并将结果写入 MySQL 数据库中:
insert into ${poc}.mysql_order_st
select 
    user_id,
    user_name,
    user_city,
    sum(price) as amount
from paimon_order_detail
group by user_id, user_name, user_city;


  1. 提交发布、运行任务。等待几分钟之后,就可以在 MySQL 数据库中查询到计算好的数据。

5.5 实时更新计算结果

我们向mysql表的order_info表新增加订单数据

insert into pf_mysql_order_info values
  (5, '2024-07-01 04:00:00', '1001', 100)
    ,(6, '2024-07-01 05:00:00', '1001', 100)
;


等待几分钟(paimon的数据可见性依赖于flink任务的checkpoint的时间周期),我们就会在结果表里看到新增的数据结果:

image.png


作者介绍
目录