基于阿里云 Flink+Hologres 搭建实时数仓

本文涉及的产品
实时计算 Flink 版,5000CU*H 3个月
简介: 本文为您介绍如何通过实时计算 Flink 版和实时数仓 Hologres 搭建实时数仓。

背景信息

随着社会数字化发展,企业对数据时效性的需求越来越强烈。除传统的面向海量数据加工场景设计的离线场景外,大量业务需要解决面向实时加工、实时存储、实时分析的实时场景问题。传统离线数仓搭建的方法论比较明确,通过定时调度实现数仓分层(ODS->DWD->DWS->ADS);但对于实时数仓的搭建,目前缺乏明确的方法体系。基于 Streaming Warehouse 理念,实现数仓分层之间实时数据的高效流动,可以解决实时数仓分层问题。

方案架构

实时计算 Flink 版是强大的流式计算引擎,支持对海量实时数据高效处理。Hologres 是一站式实时数仓,支持数据实时写入与更新,实时数据写入即可查。Hologres 与 Flink 深度集成,能够提供一体化的实时数仓联合解决方案。本文基于 Flink+Hologres 搭建实时数仓的方案架构如下:

  1. Flink 将数据源写入 Hologres,形成 ODS 层。
  2. Flink 订阅 ODS 层的 Binlog 进行加工,形成 DWD 层再次写入 Hologres。
  3. Flink 订阅 DWD 层的 Binlog,通过计算形成 DWS 层,再次写入 Hologres。
  4. 最后由 Hologres 对外提供应用查询。

该方案有如下优势:

  • Hologres 的每一层数据都支持高效更新与修正、写入即可查,解决了传统实时数仓解决方案的中间层数据不易查、不易更新、不易修正的问题。
  • Hologres 的每一层数据都可单独对外提供服务,数据的高效复用,真正实现数仓分层复用的目标。
  • 模型统一,架构简化。实时 ETL 链路的逻辑是基于 Flink SQL 实现的;ODS 层、DWD 层和 DWS 层的数据统一存储在 Hologres 中,可以降低架构复杂度,提高数据处理效率。

该方案依赖于 Hologres 的 3 个核心能力,详情如下表所示。

Hologres核心能力

详情

Binlog

Hologres提供Binlog能力,用于驱动Flink进行实时计算,以此作为流式计算的上游。Hologres的Binlog能力详情请参见订阅Hologres Binlog

行列共存

Hologres支持行列共存的存储格式。一张表同时存储行存数据和列存数据,并且两份数据强一致。该特性保证中间层表不仅可以作为Flink的源表,也可以作为Flink的维表进行主键点查与维表Join,还可以供其他应用(OLAP、线上服务等)查询。Hologres的行列共存能力详情请参见表存储格式:列存、行存、行列共存

资源强隔离

Hologres实例的负载较高时,可能影响中间层的点查性能。Hologres支持通过主从实例读写分离部署(共享存储)

计算组实例架构

实现资源强隔离,从而保证Flink对Hologres Binlog的数据拉取不影响线上服务。

实践场景

本文以某个电商平台为例,通过搭建一套实时数仓,实现数据的实时加工清洗和对接上层应用数据查询,形成实时数据的分层和复用,支撑各个业务方的报表查询(交易大屏、行为数据分析、用户画像标签)以及个性化推荐等多个业务场景。

1、构建 ODS 层:业务数据库实时入仓

MySQL 有 orders(订单表),orders_pay(订单支付表),product_catalog(商品类别字典表)3 张业务表,这 3 张表通过Flink实时同步到 Hologres 中作为 ODS 层。

image.png

2、构建 DWD 层:实时主题宽表

将订单表、商品类别字典表、订单支付表进行实时打宽,生成 DWD 层宽表。

image.png

3、构建 DWS 层:实时指标计算

实时消费宽表的 binlog,事件驱动的聚合出相应的 DWS 层指标表。

image.png

前提条件

  • 已购买独享通用型 Hologres 实例,详情请参见购买 Hologres
    购买实例后,需要创建 order_dw 数据库和用户(为用户赋予 admin 权限),推荐使用简单权限模型创建数据库,详情请参见简单权限模型的使用DB 管理

说明:

  • Hologres1.3 版本在创建完数据库后,需要执行 create extension hg_binlog 命令才能开启 binlog 扩展。
  • Hologres2.0 之后版本默认开启 binlog 扩展,无需手动执行。


说明:Flink 全托管需要与 Hologres 实例处于相同 VPC 和相同可用区。


  • 已准备 MySQL CDC 数据源,order_dw 数据库中的三张业务表的建表 DDL 以及插入的数据如下。
CREATE TABLE `orders` (
  order_id bigint not null primary key,
  user_id varchar(50) not null,
  shop_id bigint not null,
  product_id bigint not null,
  buy_fee numeric(20,2) not null,   
  create_time timestamp not null,
  update_time timestamp not null default now(),
  state int not null 
);
CREATE TABLE `orders_pay` (
  pay_id bigint not null primary key,
  order_id bigint not null,
  pay_platform int not null, 
  create_time timestamp not null
);
CREATE TABLE `product_catalog` (
  product_id bigint not null primary key,
  catalog_name varchar(50) not null
);
-- 准备数据
INSERT INTO product_catalog VALUES(1, 'phone_aaa'),(2, 'phone_bbb'),(3, 'phone_ccc'),(4, 'phone_ddd'),(5, 'phone_eee');
INSERT INTO orders VALUES
(100001, 'user_001', 12345, 1, 5000.05, '2023-02-15 16:40:56', '2023-02-15 18:42:56', 1),
(100002, 'user_002', 12346, 2, 4000.04, '2023-02-15 15:40:56', '2023-02-15 18:42:56', 1),
(100003, 'user_003', 12347, 3, 3000.03, '2023-02-15 14:40:56', '2023-02-15 18:42:56', 1),
(100004, 'user_001', 12347, 4, 2000.02, '2023-02-15 13:40:56', '2023-02-15 18:42:56', 1),
(100005, 'user_002', 12348, 5, 1000.01, '2023-02-15 12:40:56', '2023-02-15 18:42:56', 1),
(100006, 'user_001', 12348, 1, 1000.01, '2023-02-15 11:40:56', '2023-02-15 18:42:56', 1),
(100007, 'user_003', 12347, 4, 2000.02, '2023-02-15 10:40:56', '2023-02-15 18:42:56', 1);
INSERT INTO orders_pay VALUES
(2001, 100001, 1, '2023-02-15 17:40:56'),
(2002, 100002, 1, '2023-02-15 17:40:56'),
(2003, 100003, 0, '2023-02-15 17:40:56'),
(2004, 100004, 0, '2023-02-15 17:40:56'),
(2005, 100005, 0, '2023-02-15 18:40:56'),
(2006, 100006, 0, '2023-02-15 18:40:56'),
(2007, 100007, 0, '2023-02-15 18:40:56');


使用限制

  • 仅实时计算引擎 VVR 6.0.7 及以上版本支持该实时数仓方案。
  • 仅 1.3 及以上版本的 Hologres 支持该实时数仓方案。

构建实时数仓

管理元数据

1、创建 Hologres Catalog。

实时计算控制台上,新建一个名为 test 的 SQL 作业,将如下代码拷贝到 test 作业的 SQL 编辑器上,修改目标参数取值后,选中代码片段后单击左侧代码行上的运行

CREATE CATALOG dw WITH (
  'type' = 'hologres',
  'endpoint' = '<ENDPOINT>', 
  'username' = '<USERNAME>',
  'password' = '<PASSWORD>',
  'dbname' = 'order_dw',
  'binlog' = 'true', -- 创建catalog时可以设置源表、维表和结果表支持的with参数,之后在使用此catalog下的表时会默认添加这些默认参数。
  'sdkMode' = 'jdbc', -- 推荐使用jdbc模式。
  'cdcmode' = 'true',
  'connectionpoolname' = 'the_conn_pool',
  'ignoredelete' = 'true',  -- 宽表merge需要开启,防止回撤。
  'partial-insert.enabled' = 'true', -- 宽表merge需要开启此参数,实现部分列更新。
  'mutateType' = 'insertOrUpdate', -- 宽表merge需要开启此参数,实现部分列更新。
  'table_property.binlog.level' = 'replica', --也可以在创建catalog时传入持久化的hologres表属性,之后创建表时,默认都开启binlog。
  'table_property.binlog.ttl' = '259200'
);

您需要修改以下参数取值为您实际 Hologres 服务信息。

参数

说明

备注

endpoint

Hologres的Endpoint地址。

详情请参见实例配置

username

阿里云账号的AccessKey。

当前配置的AccessKey对应的用户需要能够访问所有的Hologres数据库,Hologres数据库权限请参见Hologres权限模型概述

password

阿里云账号的AccessSecret。

说明:创建 Catalog 时可以设置默认的源表、维表和结果表的 WITH 参数,也可以设置创建 Hologres 物理表的默认属性,例如上方 table_property 开头的参数。详情请参见管理Hologres Catalog实时数仓 Hologres WITH 参数

2、创建 MySQL Catalog

实时计算控制台,将如下代码拷贝到 test 作业的 SQL 编辑器上,修改目标参数取值后,选中代码片段后单击左侧代码行上的运行

CREATE CATALOG mysqlcatalog WITH(
  'type' = 'mysql',
  'hostname' = '<hostname>',
  'port' = '<port>',
  'username' = '<username>',
  'password' = '<password>',
  'default-database' = 'order_dw'
);

您需要修改以下参数取值为您实际 MySQL 服务信息。

参数

说明

hostname

MySQL数据库的IP地址或者Hostname。

port

MySQL数据库服务的端口号,默认值为3306。

username

MySQL数据库服务的用户名。

password

MySQL数据库服务的密码。

构建 ODS 层:业务数据库实时入仓

基于 Catalog 的CREATE DATABASE AS(CDAS)语句功能,可以一次性把 ODS 层建出来。ODS 层一般不直接做 OLAP 或 SERVING(KV 点查),主要作为流式作业的事件驱动,开启 binlog 即可满足需求。

1、创建 CDAS 同步作业 ODS。

a. 在实时计算控制台上,新建名为 ODS 的 SQL 流作业,并将如下代码拷贝到 SQL 编辑器。

CREATE DATABASE IF NOT EXISTS dw.order_dw   -- 创建catalog时设置了table_property.binlog.level参数,因此通过CDAS创建的所有表都开启了binlog。
AS DATABASE mysqlcatalog.order_dw INCLUDING all tables -- 可以根据需要选择上游数据库需要入仓的表。
/*+ OPTIONS('server-id'='8001-8004') */ ;   -- 指定mysql-cdc源表。


b. 单击右上方的部署,进行作业部署。

c. 单击左侧导航栏的作业运维,单击刚刚部署的ODS作业操作列的启动,启动作业。

2、查看 MySQL 同步到 Hologres 的 3 张表数据。

HoloWeb 开发页面连接 Hologres 实例并登录目标数据库后,在 SQL 编辑器上执行如下命令。

---查orders中的数据。
SELECT * FROM orders;
---查orders_pay中的数据。
SELECT * FROM orders_pay;
---查product_catalog中的数据。
SELECT * FROM product_catalog;

构建 DWD 层:实时主题宽表

1、通过 Flink Catalog 功能在 Hologres 中建 DWD 层的宽表 dwd_orders。

实时计算控制台上,将如下代码拷贝到 test 作业的 SQL 编辑器后,选中目标片段后单击左侧代码行上的运行

-- 宽表字段要nullable,因为不同的流写入到同一张结果表,每一列都可能出现null的情况。
CREATE TABLE dw.order_dw.dwd_orders (
  order_id bigint not null primary key,
  order_user_id string,
  order_shop_id bigint,
  order_product_id bigint,
  order_product_catalog_name string,
  order_fee numeric(20,2),
  order_create_time timestamp,
  order_update_time timestamp,
  order_state int,
  pay_id bigint,
  pay_platform int comment 'platform 0: phone, 1: pc', -- catalog建表可以设置注释。
  pay_create_time timestamp
);
-- 支持通过catalog修改Hologres物理表属性。
ALTER TABLE dw.order_dw.dwd_orders SET (
  'table_property.binlog.ttl' = '604800' --修改binlog的超时时间为一周。
);

2、实现实时消费 ODS 层 orders、orders_pay 表的 binlog。

实时计算控制台上,新建名为 DWD 的 SQL 作业,并将如下代码拷贝到 SQL 编辑器后,部署并启动作业。通过如下 SQL 作业,orders 表会与 product_catalog 表进行维表关联,将最终结果写入 dwd_orders 表中,实现数据的实时打宽。

BEGIN STATEMENT SET;
INSERT INTO dw.order_dw.dwd_orders 
 (
   order_id,
   order_user_id,
   order_shop_id,
   order_product_id,
   order_fee,
   order_create_time,
   order_update_time,
   order_state,
   order_product_catalog_name
 ) SELECT o.*, dim.catalog_name 
   FROM dw.order_dw.orders as o
   LEFT JOIN dw.order_dw.product_catalog FOR SYSTEM_TIME AS OF proctime() AS dim
   ON o.product_id = dim.product_id;
INSERT INTO dw.order_dw.dwd_orders 
  (pay_id, order_id, pay_platform, pay_create_time)
   SELECT * FROM dw.order_dw.orders_pay;
END;

3、查看宽表 dwd_orders 数据。

HoloWeb 开发页面连接 Hologres 实例并登录目标数据库后,在 SQL 编辑器上执行如下命令。

SELECT * FROM dwd_orders;

构建 DWS 层:实时指标计算

1、通过 Flink Catalog 功能,在 Hologres 中创建 dws 层的聚合 dws_users 以及 dws_shops。

实时计算控制台上,将如下代码拷贝到 test 作业的 SQL 编辑器,选中目标片段后单击左侧代码行上的运行

-- 用户维度聚合指标表。
CREATE TABLE dw.order_dw.dws_users (
  user_id string not null,
  ds string not null,
  paied_buy_fee_sum numeric(20,2) not null, -- 当日完成支付的总金额。
  primary key(user_id,ds)  NOT ENFORCED
);
-- 商户维度聚合指标表。
CREATE TABLE dw.order_dw.dws_shops (
  shop_id bigint not null,
  ds string not null,
  paied_buy_fee_sum numeric(20,2) not null, -- 当日完成支付总金额。
  primary key(shop_id,ds)  NOT ENFORCED
);

2、实时消费 DWD 层的宽表 dw.order_dw.dwd_orders,在 Flink 中做聚合计算,最终写入 Hologres 中的 DWS 表。

实时计算控制台上,新建名为 DWS 的 SQL 流作业,并将如下代码拷贝到 SQL 编辑器,部署并启动作业。

BEGIN STATEMENT SET;
INSERT INTO dw.order_dw.dws_users
  SELECT 
    order_user_id,
    DATE_FORMAT (pay_create_time, 'yyyyMMdd') as ds,
    SUM (order_fee)
    FROM dw.order_dw.dwd_orders c
    WHERE pay_id IS NOT NULL AND order_fee IS NOT NULL -- 订单流和支付流数据都已写入宽表。
    GROUP BY order_user_id, DATE_FORMAT (pay_create_time, 'yyyyMMdd');
INSERT INTO dw.order_dw.dws_shops
  SELECT 
    order_shop_id,
    DATE_FORMAT (pay_create_time, 'yyyyMMdd') as ds,
    SUM (order_fee)
   FROM dw.order_dw.dwd_orders c
   WHERE pay_id IS NOT NULL AND order_fee IS NOT NULL -- 订单流和支付流数据都已写入宽表。
   GROUP BY order_shop_id, DATE_FORMAT (pay_create_time, 'yyyyMMdd');
END;

3、查看 DWS 层的聚合结果,其结果会根据上游数据的变更实时更新。

HoloWeb 开发页面连接 Hologres 实例并登录目标数据库后,在 SQL 编辑器上执行如下命令。

  • 查询 dws_users 表结果。
SELECT * FROM dws_users;

  • 查询 dws_shops 表结果。
SELECT * FROM dws_shops;

数据探查

如果对中间结果需要即系(Ad-hoc)性质的业务数据探查,或者对最终计算结果进行数据正确性排查,此方案的每一层数据都实现了持久化,可以便捷的探查中间过程。

  • 流模式探查
    a. 新建并启动数据探查流作业。
    实时计算控制台上,新建名为 Data-exploration 的 SQL 流作业,并将如下代码拷贝到 SQL 编辑器后,部署并启动作业。
-- 流模式探查,打印到print可以看到数据的变化情况。
CREATE TEMPORARY TABLE print_sink(
  order_id bigint not null primary key,
  order_user_id string,
  order_shop_id bigint,
  order_product_id bigint,
  order_product_catalog_name string,
  order_fee numeric(20,2),
  order_create_time timestamp,
  order_update_time timestamp,
  order_state int,
  pay_id bigint,
  pay_platform int,
  pay_create_time timestamp
) WITH (
  'connector' = 'print'
);
INSERT INTO print_sink SELECT *
FROM dw.order_dw.dwd_orders /*+ OPTIONS('startTime'='2023-02-15 12:00:00') */ --这里的startTime是binlog生成的时间
WHERE order_user_id = 'user_001';

b. 查看数据探查结果。
作业运维详情页面,单击目标作业名称,在作业探查页签下左侧运行日志页签,单击运行 Task Managers 页签下的 Path, ID。在 Stdout 页面搜索 user_001 相关的日志信息。

  • 批模式探查
    实时计算控制台上,创建 SQL 流作业,并将如下代码拷贝到 SQL 编辑器后,单击调试。详情请参见作业调试
    批模式探查是获取当前时刻的终态数据,在 Flink 作业开发界面调试结果如下图所示。
SELECT *
FROM dw.order_dw.dwd_orders /*+ OPTIONS('binlog'='false') */ 
WHERE order_user_id = 'user_001' and order_create_time > '2023-02-15 12:00:00'; --批量模式支持filter下推,提升批作业执行效率。

使用实时数仓

上一小节展示了通过 Flink Catalog,可以仅在 Flink 侧搭建一个基于 Flink 和 Hologres的Streaming Warehouse 实时分层数仓。本节则展示数仓搭建完成之后的一些简单应用场景。

Key-Value 服务

根据主键查询 DWS 层的聚合指标表,支持百万级 RPS。

HoloWeb 开发页面查询指定用户指定日期的消费额的代码示例如下。

-- holo sql
SELECT * FROM dws_users WHERE user_id ='user_001' AND ds = '20230215';

明细查询

对 DWD 层宽表进行 OLAP 分析。

HoloWeb 开发页面查询某个客户 23 年 2 月特定支付平台支付的订单明细的代码示例如下。

-- holo sql
SELECT * FROM dwd_orders
WHERE order_create_time >= '2023-02-01 00:00:00'  and order_create_time < '2023-03-01 00:00:00'
AND order_user_id = 'user_001'
AND pay_platform = 0
ORDER BY order_create_time LIMIT 100;

实时报表

基于 DWD 层宽表数据展示实时报表,支持秒级响应。

HoloWeb 开发页面查询 23 年 2 月内每个品类的订单总量和订单总金额的代码示例如下。

-- holo sql
SELECT
  TO_CHAR(order_create_time, 'YYYYMMDD'),
  order_product_catalog_name,
  COUNT(*),
  SUM(order_fee)
FROM
  dwd_orders
WHERE
  order_create_time >= '2023-02-01 00:00:00'  and order_create_time < '2023-03-01 00:00:00'
GROUP BY
  1, 2
ORDER BY
  1, 2;



更多内容


活动推荐

阿里云基于 Apache Flink 构建的企业级产品-实时计算 Flink 版现开启活动:

0 元试用 实时计算 Flink 版(5000CU*小时,3 个月内)

了解活动详情:https://free.aliyun.com/?pipCode=sc

相关实践学习
AnalyticDB MySQL海量数据秒级分析体验
快速上手AnalyticDB MySQL,玩转SQL开发等功能!本教程介绍如何在AnalyticDB MySQL中,一键加载内置数据集,并基于自动生成的查询脚本,运行复杂查询语句,秒级生成查询结果。
阿里云云原生数据仓库AnalyticDB MySQL版 使用教程
云原生数据仓库AnalyticDB MySQL版是一种支持高并发低延时查询的新一代云原生数据仓库,高度兼容MySQL协议以及SQL:92、SQL:99、SQL:2003标准,可以对海量数据进行即时的多维分析透视和业务探索,快速构建企业云上数据仓库。 了解产品 https://www.aliyun.com/product/ApsaraDB/ads
相关文章
|
20天前
|
DataWorks 数据挖掘 大数据
方案实践测评 | DataWorks集成Hologres构建一站式高性能的OLAP数据分析
DataWorks在任务开发便捷性、任务运行速度、产品使用门槛等方面都表现出色。在数据处理场景方面仍有改进和扩展的空间,通过引入更多的智能技术、扩展数据源支持、优化任务调度和可视化功能以及提升团队协作效率,DataWorks将能够为企业提供更全面、更高效的数据处理解决方案。
|
3月前
|
人工智能 自然语言处理 关系型数据库
阿里云云原生数据仓库 AnalyticDB PostgreSQL 版已完成和开源LLMOps平台Dify官方集成
近日,阿里云云原生数据仓库 AnalyticDB PostgreSQL 版已完成和开源LLMOps平台Dify官方集成。
|
3月前
|
OLAP
解决方案|基于hologres搭建轻量OLAP分析平台获奖名单公布!
解决方案|基于hologres搭建轻量OLAP分析平台获奖名单公布!
|
3月前
|
DataWorks 数据挖掘 关系型数据库
基于hologres搭建轻量OLAP分析平台解决方案评测
一文带你详细了解基于hologres搭建轻量OLAP分析平台解决方案的优与劣
493 9
|
4月前
|
数据可视化 数据挖掘 OLAP
基于 Hologres 搭建轻量 OLAP 分析平台评测报告
【9月更文第6天】开作为互联网手游公司的产品经理和项目经理,数据分析对于我们的业务至关重要。我们一直在寻找高效、可靠的数据分析解决方案,以更好地了解玩家行为、优化游戏体验和提升运营效率。近期,我们体验并部署了《基于 Hologres 搭建轻量 OLAP 分析平台》解决方案,以下是我们对该方案的评测报告。
95 12
基于 Hologres 搭建轻量 OLAP 分析平台评测报告
|
4月前
|
SQL DataWorks 数据挖掘
手把手体验Hologres的OLAP数据分析
本方案基于阿里云实时数仓Hologres与DataWorks数据集成,实现数据库RDS到Hologres的实时同步,充分发挥Hologres强大的查询分析能力,提供一站式高性能OLAP数据分析。Hologres支持标准SQL,无缝对接主流BI工具,适用于多种场景。方案包括创建VPC、开通Hologres、开通DataWorks、创建公网NAT、建立Hologres表、实时同步数据、OLAP分析及资源清理等步骤,为轻量级OLAP分析平台搭建奠定基础。
|
3月前
|
人工智能 分布式计算 数据管理
阿里云位居 IDC MarketScape 中国实时湖仓评估领导者类别
国际数据公司( IDC )首次发布了《IDC MarketScape: 中国实时湖仓市场 2024 年厂商评估》,阿里云在首次报告发布即位居领导者类别。
|
3月前
|
SQL 分布式计算 数据挖掘
加速数据分析:阿里云Hologres在实时数仓中的应用实践
【10月更文挑战第9天】随着大数据技术的发展,企业对于数据处理和分析的需求日益增长。特别是在面对海量数据时,如何快速、准确地进行数据查询和分析成为了关键问题。阿里云Hologres作为一个高性能的实时交互式分析服务,为解决这些问题提供了强大的支持。本文将深入探讨Hologres的特点及其在实时数仓中的应用,并通过具体的代码示例来展示其实际应用。
267 0
|
4月前
|
运维 数据挖掘 OLAP
阿里云Hologres:一站式轻量级OLAP分析平台的全面评测
在数据驱动决策的今天,企业对高效、灵活的数据分析平台的需求日益增长。阿里云的Hologres,作为一站式实时数仓引擎,提供了强大的OLAP(在线分析处理)分析能力。本文将对Hologres进行深入评测,探讨其在多源集成、性能、易用性以及成本效益方面的表现。
187 7
|
4月前
|
运维 数据处理 数据安全/隐私保护
阿里云实时计算Flink版测评报告
该测评报告详细介绍了阿里云实时计算Flink版在用户行为分析与标签画像中的应用实践,展示了其毫秒级的数据处理能力和高效的开发流程。报告还全面评测了该服务在稳定性、性能、开发运维及安全性方面的卓越表现,并对比自建Flink集群的优势。最后,报告评估了其成本效益,强调了其灵活扩展性和高投资回报率,适合各类实时数据处理需求。

相关产品

  • 实时计算 Flink版