Fluss 实战:用 Partial Update 构建实时宽表的新范式

本文涉及的产品
实时计算 Flink 版,5000CU*H 3个月
简介: 传统流式数据管道通过多表 Join 构建宽表,如实时推荐引擎需整合用户偏好、购买记录等8个数据源,但此方法在大规模场景下状态管理复杂、资源消耗高且调试困难。Fluss 提出部分更新方案,基于主键将各数据源独立写入共享宽表,避免复杂 Join 操作。示例中,通过 Flink SQL 创建推荐、曝光、点击等表,并逐步插入数据实现宽表构建。最终,借助 Fluss 的高效合并机制,输出包含最新信息的统一视图,提升可扩展性和维护性。

传统流式数据管道通常需要在主键上 Join 多个表或流,以创建一个宽表。例如,假设你正在为一个电子商务平台构建实时推荐引擎。为了提供高度个性化的推荐,你的系统需要拥有每个用户的完整 360° 视图,包括:用户偏好、过往购买记录、点击流行为、购物车活动、产品评价、客服工单、广告曝光以及会员忠诚度状态等信息。

这至少涉及8个不同的数据源,每个数据源都会独立地产生更新。

在大规模场景下 Join 多个数据流虽然可以通过 Apache Flink 实现,但实际上非常具有挑战性且消耗大量资源。更具体地说,可能会导致以下问题:

  • Flink 中的状态规模可能变得非常庞大:因为系统需要缓存所有传入的事件,直到它们可以被 Join。在很多情况下,这些状态需要长期保留,甚至无限期保留。
  • 面临检查点(checkpoint)开销和反压(backpressure)问题:由于 Join 操作以及大规模状态的上传,可能会成为整个数据管道中的性能瓶颈。
  • 状态难以查看和调试:因为它们通常既庞大又复杂。这使得理解管道中发生了什么、以及为什么某些事件没有被正确处理变得困难。
  • 状态的 TTL(存活时间)可能导致结果不一致:因为事件可能在被 Join 之前就被丢弃了。这可能导致数据丢失,并最终输出错误的结果。

总体而言,这种方法不仅消耗大量的内存和 CPU 资源,还使作业的设计和维护变得更加复杂。

部分更新:一种基于 Fluss 的全新方案

Fluss 引入了一种更为优雅的解决方案:对主键表进行部分更新。

不同于在流处理作业中执行多路 Join,Fluss 允许每个数据流源根据主键独立地仅将其相关的列更新到一张共享的宽表中。在 Fluss 中,你可以定义一张宽表(例如,以 user_id为主键的 user_profile 表),其中包含来自所有数据源的所有可能字段。然后,每个源数据流只需将它所知道的部分字段写入到这张表中。

Fluss 的存储引擎会根据主键自动将这些部分更新合并在一起。本质上,Fluss 会为每个主键维护最新的完整数据,因此你无需再在 Flink 中管理庞大的 Join 状态。

在底层,当某个主键的部分更新到达时,Fluss 会查找该主键已有的记录,然后仅更新此次提供的特定列,其余列保持不变。合并后的结果会被作为该记录的新版本写回。这一切都是实时发生的,因此这张表始终都包含来自各个数据流的最新信息。

接下来,我们将通过一个具体的例子来更好地理解这一机制在实际中的工作方式。

示例:构建一个统一的宽表

你可以在 GitHub 上(链接)找到完整的源代码。
首先克隆仓库,然后运行 docker compose up 启动开发环境。最后,打开一个终端进入 jobmanager,并运行以下命令启动 Flink SQL CLI:

./bin/sql-client.sh
AI 代码解读

到目前为止很棒!👍

第一步:我们需要做的第一件事是创建一个 Flink Catalog,用于存储我们将要创建的表。让我们创建一个名为 fluss_catalog 的 Catalog,并使用这个 Catalog。

CREATE CATALOG fluss_catalog WITH (
    'type' = 'fluss',
    'bootstrap.servers' = 'coordinator-server:9123'
);

USE CATALOG fluss_catalog;
AI 代码解读

第二步:接下来我们创建 3 张表,代表用于构建推荐宽表的不同数据源。

-- Recommendations – model scores
CREATE TABLE recommendations (
    user_id  STRING,
    item_id  STRING,
    rec_score DOUBLE,
    rec_ts   TIMESTAMP(3),
    PRIMARY KEY (user_id, item_id) NOT ENFORCED
) WITH ('bucket.num' = '3', 'table.datalake.enabled' = 'true');


-- Impressions – how often we showed something
CREATE TABLE impressions (
    user_id STRING,
    item_id STRING,
    imp_cnt INT,
    imp_ts  TIMESTAMP(3),
    PRIMARY KEY (user_id, item_id) NOT ENFORCED
) WITH ('bucket.num' = '3', 'table.datalake.enabled' = 'true');

-- Clicks – user engagement
CREATE TABLE clicks (
    user_id  STRING,
    item_id  STRING,
    click_cnt INT,
    clk_ts    TIMESTAMP(3),
    PRIMARY KEY (user_id, item_id) NOT ENFORCED
) WITH ('bucket.num' = '3', 'table.datalake.enabled' = 'true');

CREATE TABLE user_rec_wide (
    user_id   STRING,
    item_id   STRING,
    rec_score DOUBLE,   -- updated by recs stream
    imp_cnt   INT,      -- updated by impressions stream
    click_cnt INT,      -- updated by clicks stream
    PRIMARY KEY (user_id, item_id) NOT ENFORCED
) WITH ('bucket.num' = '3', 'table.datalake.enabled' = 'true');
AI 代码解读

第三步:当然,我们需要一些示例数据来进行操作,因此让我们继续向表中插入一些记录吧。💻

-- Recommendations – model scores
INSERT INTO recommendations VALUES
    ('user_101','prod_501',0.92 , TIMESTAMP '2025-05-16 09:15:02'),
    ('user_101','prod_502',0.78 , TIMESTAMP '2025-05-16 09:15:05'),
    ('user_102','prod_503',0.83 , TIMESTAMP '2025-05-16 09:16:00'),
    ('user_103','prod_501',0.67 , TIMESTAMP '2025-05-16 09:16:20'),
    ('user_104','prod_504',0.88 , TIMESTAMP '2025-05-16 09:16:45');
-- Impressions – how often each (user,item) was shown
INSERT INTO impressions VALUES
    ('user_101','prod_501', 3, TIMESTAMP '2025-05-16 09:17:10'),
    ('user_101','prod_502', 1, TIMESTAMP '2025-05-16 09:17:15'),
    ('user_102','prod_503', 7, TIMESTAMP '2025-05-16 09:18:22'),
    ('user_103','prod_501', 4, TIMESTAMP '2025-05-16 09:18:30'),
    ('user_104','prod_504', 2, TIMESTAMP '2025-05-16 09:18:55');
-- Clicks – user engagement
INSERT INTO clicks VALUES
    ('user_101','prod_501', 1, TIMESTAMP '2025-05-16 09:19:00'),
    ('user_101','prod_502', 2, TIMESTAMP '2025-05-16 09:19:07'),
    ('user_102','prod_503', 1, TIMESTAMP '2025-05-16 09:19:12'),
    ('user_103','prod_501', 1, TIMESTAMP '2025-05-16 09:19:20'),
    ('user_104','prod_504', 1, TIMESTAMP '2025-05-16 09:19:25');
AI 代码解读

注意:🚨 到目前为止我们运行的作业都是有界作业,因此它们在插入记录后就会完成。接下来我们将运行一些流式作业。请记住,每个作业都以并行度 3 运行,而我们的环境总共配置了 10 个 slot。因此,请务必关注 Flink Web UI,查看已使用和可用的 slot 数量,并在不再需要时停止一些作业,以释放资源。

第四步:此时,让我们打开一个新的终端并启动 Flink SQL CLI。在这个新终端中,请确保设置以下输出模式:

SET 'sql-client.execution.result-mode' = 'tableau';
AI 代码解读

随后运行:

SELECT * FROM user_rec_wide;
AI 代码解读

以便在我们从不同数据源向表中插入部分记录时,能够观察到表的输出结果。

第五步:让我们将推荐表中的记录插入到 user_rec_wide 宽表中。

-- Apply recommendation scores
INSERT INTO user_rec_wide (user_id, item_id, rec_score)
SELECT
    user_id,
    item_id,
    rec_score
FROM recommendations;
AI 代码解读

输出:请注意,在 user_rec_wide 表中,只有相关的列被更新,其余列则为 NULL。

Flink SQL> SELECT * FROM user_rec_wide;
+----+--------------------------------+--------------------------------+--------------------------------+-------------+-------------+
| op |                        user_id |                        item_id |                      rec_score |     imp_cnt |   click_cnt |
+----+--------------------------------+--------------------------------+--------------------------------+-------------+-------------+
| +I |                       user_101 |                       prod_501 |                           0.92 |      <NULL> |      <NULL> |
| +I |                       user_101 |                       prod_502 |                           0.78 |      <NULL> |      <NULL> |
| +I |                       user_104 |                       prod_504 |                           0.88 |      <NULL> |      <NULL> |
| +I |                       user_102 |                       prod_503 |                           0.83 |      <NULL> |      <NULL> |
| +I |                       user_103 |                       prod_501 |                           0.67 |      <NULL> |      <NULL> |
AI 代码解读

第六步:接下来,让我们将 impressions 表中的记录插入到 user_rec_wide 宽表中。

-- Apply impression counts
INSERT INTO user_rec_wide (user_id, item_id, imp_cnt)
SELECT
    user_id,
    item_id,
    imp_cnt
FROM impressions;
AI 代码解读

输出:请注意观察impressions表的记录是如何插入到user_rec_wide表中,并且 imp_cnt 列是如何更新的。

Flink SQL> SELECT * FROM user_rec_wide;
+----+--------------------------------+--------------------------------+--------------------------------+-------------+-------------+
| op |                        user_id |                        item_id |                      rec_score |     imp_cnt |   click_cnt |
+----+--------------------------------+--------------------------------+--------------------------------+-------------+-------------+
| +I |                       user_101 |                       prod_501 |                           0.92 |      <NULL> |      <NULL> |
| +I |                       user_101 |                       prod_502 |                           0.78 |      <NULL> |      <NULL> |
| +I |                       user_104 |                       prod_504 |                           0.88 |      <NULL> |      <NULL> |
| +I |                       user_102 |                       prod_503 |                           0.83 |      <NULL> |      <NULL> |
| +I |                       user_103 |                       prod_501 |                           0.67 |      <NULL> |      <NULL> |



| -U |                       user_101 |                       prod_501 |                           0.92 |      <NULL> |      <NULL> |
| +U |                       user_101 |                       prod_501 |                           0.92 |           3 |      <NULL> |
| -U |                       user_101 |                       prod_502 |                           0.78 |      <NULL> |      <NULL> |
| +U |                       user_101 |                       prod_502 |                           0.78 |           1 |      <NULL> |
| -U |                       user_104 |                       prod_504 |                           0.88 |      <NULL> |      <NULL> |
| +U |                       user_104 |                       prod_504 |                           0.88 |           2 |      <NULL> |
| -U |                       user_102 |                       prod_503 |                           0.83 |      <NULL> |      <NULL> |
| +U |                       user_102 |                       prod_503 |                           0.83 |           7 |      <NULL> |
| -U |                       user_103 |                       prod_501 |                           0.67 |      <NULL> |      <NULL> |
| +U |                       user_103 |                       prod_501 |                           0.67 |           4 |      <NULL> |
AI 代码解读

第七步:最后,让我们将 clicks 表中的记录插入到 user_rec_wide 宽表中。

-- Apply click counts
INSERT INTO user_rec_wide (user_id, item_id, click_cnt)
SELECT
    user_id,
    item_id,
    click_cnt
FROM clicks;
AI 代码解读

输出:请注意观察clicks 表的记录是如何插入到 user_rec_wide 表中的,以及 click_cnt 列是如何更新的。

Flink SQL> SELECT * FROM user_rec_wide;
+----+--------------------------------+--------------------------------+--------------------------------+-------------+-------------+
| op |                        user_id |                        item_id |                      rec_score |     imp_cnt |   click_cnt |
+----+--------------------------------+--------------------------------+--------------------------------+-------------+-------------+
| +I |                       user_101 |                       prod_501 |                           0.92 |      <NULL> |      <NULL> |
| +I |                       user_101 |                       prod_502 |                           0.78 |      <NULL> |      <NULL> |
| +I |                       user_104 |                       prod_504 |                           0.88 |      <NULL> |      <NULL> |
| +I |                       user_102 |                       prod_503 |                           0.83 |      <NULL> |      <NULL> |
| +I |                       user_103 |                       prod_501 |                           0.67 |      <NULL> |      <NULL> |



| -U |                       user_101 |                       prod_501 |                           0.92 |      <NULL> |      <NULL> |
| +U |                       user_101 |                       prod_501 |                           0.92 |           3 |      <NULL> |
| -U |                       user_101 |                       prod_502 |                           0.78 |      <NULL> |      <NULL> |
| +U |                       user_101 |                       prod_502 |                           0.78 |           1 |      <NULL> |
| -U |                       user_104 |                       prod_504 |                           0.88 |      <NULL> |      <NULL> |
| +U |                       user_104 |                       prod_504 |                           0.88 |           2 |      <NULL> |
| -U |                       user_102 |                       prod_503 |                           0.83 |      <NULL> |      <NULL> |
| +U |                       user_102 |                       prod_503 |                           0.83 |           7 |      <NULL> |
| -U |                       user_103 |                       prod_501 |                           0.67 |      <NULL> |      <NULL> |
| +U |                       user_103 |                       prod_501 |                           0.67 |           4 |      <NULL> |


| -U |                       user_103 |                       prod_501 |                           0.67 |           4 |      <NULL> |
| +U |                       user_103 |                       prod_501 |                           0.67 |           4 |           1 |
| -U |                       user_101 |                       prod_501 |                           0.92 |           3 |      <NULL> |
| +U |                       user_101 |                       prod_501 |                           0.92 |           3 |           1 |
| -U |                       user_101 |                       prod_502 |                           0.78 |           1 |      <NULL> |
| +U |                       user_101 |                       prod_502 |                           0.78 |           1 |           2 |
| -U |                       user_104 |                       prod_504 |                           0.88 |           2 |      <NULL> |
| +U |                       user_104 |                       prod_504 |                           0.88 |           2 |           1 |
| -U |                       user_102 |                       prod_503 |                           0.83 |           7 |      <NULL> |
| +U |                       user_102 |                       prod_503 |                           0.83 |           7 |           1 |
AI 代码解读

提醒:‼️ 如前所述,请务必停止不再需要的作业,以释放资源。
现在,让我们切换到批处理模式,并查询 user_rec_wide 表的当前快照。
但在那之前,我们需要启动分层服务(Tiering Service,该服务支持将表迁移为Lakehouse表。
第八步:在 Coordinator Server 中打开一个新终端 💻,并运行以下命令来启动分层服务:

./bin/lakehouse.sh -D flink.rest.address=jobmanager -D flink.rest.port=8081 -D flink.execution.checkpointing.interval=30s -D flink.parallelism.default=2
AI 代码解读

配置的检查点间隔为 flink.execution.checkpointing.interval=30s,因此请稍等片刻,直到第一个检查点创建完成,并且数据将迁移至 Lakehouse 表中。
第九步:最后,让我们切换到批处理模式,并查询 user_rec_wide 表的当前快照。

SET 'execution.runtime-mode' = 'batch';

Flink SQL> SELECT * FROM user_rec_wide;
+----------+----------+-----------+---------+-----------+
|  user_id |  item_id | rec_score | imp_cnt | click_cnt |
+----------+----------+-----------+---------+-----------+
| user_102 | prod_503 |      0.83 |       7 |         1 |
| user_103 | prod_501 |      0.67 |       4 |         1 |
| user_101 | prod_501 |      0.92 |       3 |         1 |
| user_101 | prod_502 |      0.78 |       1 |         2 |
| user_104 | prod_504 |      0.88 |       2 |         1 |
+----------+----------+-----------+---------+-----------+
5 rows in set (2.63 seconds)
AI 代码解读

🎉 就是这样!你已成功使用 Fluss 中的部分更新功能创建了一个统一的宽表。

总结

Fluss中的部分更新(Partial Updates)为流式数据打宽提供了替代性技术路径。

当所有数据源共享主键时(否则可灵活组合流式 Lookup Join),你可以转变思路:以增量方式更新一张统一的宽表,而非实时 Join 流。

这种方式最终带来了更具可扩展性、更易维护且更高效的流水线。工程师们可以减少在 Flink 状态管理、Checkpoint 和 Join 机制上的投入时间,而将更多精力放在提供新鲜、整合的数据上,从而支持实时分析和应用。借助 Fluss 来处理合并逻辑,从多个差异化的数据流中获得一个统一且最新的数据视图变得更加优雅。😁

在你离开之前 😊 别忘了在 GitHub 上给 Fluss 🌊 点个 ⭐,送上一份 ❤️ 哦!


更多内容


活动推荐

阿里云基于 Apache Flink 构建的企业级产品-实时计算 Flink 版现开启活动:
新用户复制点击下方链接或者扫描二维码即可0元免费试用 Flink + Paimon
实时计算 Flink 版(3000CU*小时,3 个月内)
了解活动详情:https://free.aliyun.com/?utm_content=g_1000395379&productCode=sc

相关实践学习
基于Hologres轻松玩转一站式实时仓库
本场景介绍如何利用阿里云MaxCompute、实时计算Flink和交互式分析服务Hologres开发离线、实时数据融合分析的数据大屏应用。
Linux入门到精通
本套课程是从入门开始的Linux学习课程,适合初学者阅读。由浅入深案例丰富,通俗易懂。主要涉及基础的系统操作以及工作中常用的各种服务软件的应用、部署和优化。即使是零基础的学员,只要能够坚持把所有章节都学完,也一定会受益匪浅。
目录
打赏
0
8
8
0
1615
分享
相关文章
基于云原生多模数据库 Lindorm 构建物联网应用赛题解析 | 学习笔记
快速学习基于云原生多模数据库 Lindorm 构建物联网应用赛题解析
基于云原生多模数据库 Lindorm 构建物联网应用赛题解析 | 学习笔记
阿里云 Lindorm联合EMQ ,构建新一代 IoT 全链数据解决方案
近日,阿里云 Lindorm 云原生数据库团队与EMQ 核心研发团队共同宣布:双方联合推出的新一代 IoT 全链数据解决方案已成功完成验证!
302522 2
阿里云 Lindorm联合EMQ ,构建新一代 IoT 全链数据解决方案
|
7月前
|
从建模到运维:联犀如何完美融入时序数据库 TDengine 实现物联网数据流畅管理
本篇文章是“2024,我想和 TDengine 谈谈”征文活动的三等奖作品。文章从一个具体的业务场景出发,分析了企业在面对海量时序数据时的挑战,并提出了利用 TDengine 高效处理和存储数据的方法,帮助企业解决在数据采集、存储、分析等方面的痛点。通过这篇文章,作者不仅展示了自己对数据处理技术的理解,还进一步阐释了时序数据库在行业中的潜力与应用价值,为读者提供了很多实际的操作思路和技术选型的参考。
190 1
多模数据库Lindorm再升级:对接Dataphin,打通数据治理“最后一公里”
Lindorm通过与Dataphin的深度整合,进一步解决了数据集成和数据治理的问题,为企业提供更加高效和更具性价比的方案。
多模数据库Lindorm再升级:对接Dataphin,打通数据治理“最后一公里”
DataphinV4.1大升级: 支持Lindorm开启高性价比数据治理,迎来“公共云半托管”云上自助新模式
Dataphin 是阿里巴巴旗下的一个智能数据建设与治理平台,旨在帮助企业构建高效、可靠、安全的数据资产。在V4.1版本升级中,Dataphin 引入了Lindorm等多项新功能,并开启公共云半托管模式,优化代码搜索,为用户提供更加高效、灵活、安全的数据管理和运营环境,提升用户体验,促进企业数据资产的建设和价值挖掘。
1830 3
DataphinV4.1大升级: 支持Lindorm开启高性价比数据治理,迎来“公共云半托管”云上自助新模式
DataphinV4.1大升级:支持Lindorm开启高性价比数据治理,迎来“公共云半托管”云上自助新模式
DataphinV4.1大升级:支持Lindorm开启高性价比数据治理,迎来“公共云半托管”云上自助新模式
182 3
实时计算 Flink版产品使用问题之基于宽表数据展示实时报表,该如何实现
实时计算Flink版作为一种强大的流处理和批处理统一的计算框架,广泛应用于各种需要实时数据处理和分析的场景。实时计算Flink版通常结合SQL接口、DataStream API、以及与上下游数据源和存储系统的丰富连接器,提供了一套全面的解决方案,以应对各种实时计算需求。其低延迟、高吞吐、容错性强的特点,使其成为众多企业和组织实时数据处理首选的技术平台。以下是实时计算Flink版的一些典型使用合集。
DataWorks产品使用合集之没有使用独享资源组,如何将Lindorm中的数据导出或迁移到其他数据存储服务
DataWorks作为一站式的数据开发与治理平台,提供了从数据采集、清洗、开发、调度、服务化、质量监控到安全管理的全套解决方案,帮助企业构建高效、规范、安全的大数据处理体系。以下是对DataWorks产品使用合集的概述,涵盖数据处理的各个环节。
111 0
时序数据库工具grafana里的$timeFilter查询1个小时内的数据如何写查询条件
【6月更文挑战第24天】时序数据库工具grafana里的$timeFilter查询1个小时内的数据如何写查询条件
1339 0
AI助理

你好,我是AI助理

可以解答问题、推荐解决方案等

登录插画

登录以查看您的控制台资源

管理云资源
状态一览
快捷访问