汽车之家基于 Apache Flink 的跨数据库实时物化视图探索

本文涉及的产品
实时计算 Flink 版,5000CU*H 3个月
简介: 汽车之家在基于 Flink 的实时物化视图的一些实践经验与探索

本文转载自「之家技术」,作者刘首维。介绍了汽车之家在基于 Flink 的实时物化视图的一些实践经验与探索,并尝试让用户直接以批处理 SQL 的思路开发 Flink Streaming SQL 任务。主要内容为:

  1. 系统分析与问题拆解
  2. 问题解决与系统实现
  3. 实时物化视图实践
  4. 限制与不足
  5. 总结与展望

GitHub 地址
https://github.com/apache/flink
欢迎关注 Flink~

前言

物化视图这一功能想必大家都不陌生,我们可以通过使用物化视图,将预先设定好的复杂 SQL 逻辑,以增量迭代的形式实时 (按照事务地) 更新结果集,从而通过查询结果集来避免每次查询复杂的开销,从而节省时间与计算资源。事实上,很多数据库系统和 OLAP 引擎都不同程度地支持了物化视图。另一方面,Streaming SQL 本身就和物化视图有着很深的联系,那么基于 Apche Flink (下称 Flink) SQL 去做一套实时物化视图系统是一件十分自然而然的事情了。

本文介绍了汽车之家 (下称之家) 在基于 Flink 的实时物化视图的一些实践经验与探索,并尝试让用户直接以批处理 SQL 的思路开发 Flink Streaming SQL 任务。希望能给大家带来一些启发,共同探索这一领域。

一、系统分析与问题拆解

Flink 在 Table & SQL 模块做了大量的工作,Flink SQL 已经实现了一套成熟与相对完备的 SQL 系统,同时,我们也在 Flink SQL 上有着比较多的技术和产品积累,直接基于 Flink SQL 本身就已经解决了构建实时物化系统的大部分问题,而唯一一个需要我们解决的问题是如何不重不漏地生成数据源表对应的语义完备的 Changelog DataStream,包括增量和全量历史两部分。

虽然规约到只剩一个问题,但是这个问题解决起来还是比较困难的,那我们将这个问题继续拆解为以下几个子问题:

1.  加载全量数据;
2.  加载增量数据;
3.  增量数据与全量数据整合。

二、问题解决与系统实现

问题一:基于数据传输平台的增量数据读取

增量数据加载还是相对比较好解决的,我们直接复用实时数据传输平台的基础建设。数据传输平台[1] 已经将 Mysql / SqlServer / TiDB 等增量数据以统一的数据格式写入到特定的 Kafka Topic 中,我们只要获取到对应的 Kafka Topic 就可以进行读取即可。

问题二:支持 checkpoint 的全量数据加载

对于全量数据载入,我们先后写了两个版本。

第一版我们用 Legacy Source 写了一套 BulkLoadSourceFunction,这一版的思路比较朴素,就是全量从数据源表进行查询。这个版本确实能完成全量数据的加载,但是问题也是比较明显的。如果在 bulk load 阶段作业发生了重启,我们就不得不重新进行全量数据加载。对于数据量大的表,这个问题带来的后果还是比较严重的。

对于第一版的固有问题,我们一直都没有特别好的对策,直到 Flink-CDC[2] 2.0 的发布。我们参考了 Flink-CDC 的全量数据加载阶段支持 Checkpoint 的思路,基于 FLIP-27 开发了新的 BulkLoadSource。第二版不论在性能上还是可用性上,对比第一版都有了大幅提升。

问题三:基于全局版本的轻量 CDC 数据整合算法

这三个子问题中,问题三的难度是远大于前面两个子问题的。这个问题的朴素思路或许很简单,我们只要按照 Key 缓存全部数据,然后根据增量数据流来触发 Changelog DataStream 更新即可。

事实上我们也曾按照这个思路开发了一版整合逻辑的算子。这版算子对于小表还是比较 work 的,但是对于大表,这种思路固有的 overhead 开始变得不可接受。我们曾用一张数据量在 12 亿,大小约 120G 的 SqlServer 表进行测试,本身就巨大的数据再加上 JVM 上不可避免的膨胀,状态大小变得比较夸张。经过这次测试,我们一致认为这样粗放的策略似乎不适合作为生产版本发布,于是我们不得不开始重新思考数据整合的算法与策略。

在谈论我们的算法设计思路之前,我不得不提到 DBLog[3] 的算法设计, 这个算法的核心思路利用 watermark 对历史数据进行标识,并和对应的增量数据进行合并,达到不使用锁即可完成整个增量数据和历史数据的整合,Flink-CDC 也是基于这个思路进行的实现与改进。在相关资料搜集和分析的过程中,我们发现我们的算法思路与 DBLog 的算法的核心思路非常相似, 但是是基于我们的场景和情况进行了设计与特化。

首先分析我们的情况:

  • 增量数据需要来自于数据传输平台的 Kafka Topic;
  • 增量数据的是 at least once 的;
  • 增量数据是存在全序版本号的。

结合上述情况进行分析,我们来规约一下这个算法必须要达成的目标:

  • 保证数据的 Changelog Stream,数据完整,Event (RowKind) 语义完备
  • 保证该算法的 overhead 是可控的;
  • 保证算法实现的处理性能是足够高效;
  • 保证算法实现不依赖任何来自于 Flink 外部的系统或者功能。

经过大家的分析与讨论后,我们设计出了一套数据整合的算法,命名为 Global Version Based Pause-free Change-Data-Capture Algorithm

3.1 算法原理

我们同时读入 BulkLoadSource 的全量数据与 RealtimeChangelogSource 增量数据,并根据主键进行 KeyBy 与 Connect,而算法的核心逻辑主要由之后的 KeyedCoProcess 阶段完成。下面交待几个关键的字段值:

  • SearchTs:全量数据从数据源查询出来的时间戳;
  • Watermark:基于增量数据在数据库里产生的时间戳生成;
  • Version:全序版本号,全量数据是 0,即一定最小版本。

KeyedCoProcess 收到全量数据后,不会直接发送,而是先缓存起来,等到 Watermark 的值大于该 SearchTs 后发送并清除对应 version0 版本数据的缓存。在等待的期间,如果有对应的 Changlog Data,就将被缓存的 Version0 全量数据丢弃,然后处理 Changelog Data 并发送。在整个数据处理的流程中,全量数据和增量数据都是同时进行消费与处理的,完全不需要引入暂停阶段来进行数据的整合。

image

             增量数据在全量数据发送 watermark 之前到来,只发送增量数据即可,全量数据直接丢弃        

image

             全量数据发送 watermark 到达后,仍未有对应的增量数据,直接发送全量数据

3.2 算法实现

我们决定以 Flink Connector 的形式开展算法的实现,我们以接入 SDK 的名字 Estuary 为该 Connector 命名。通过使用 DataStreamScanProvider,来完成 Source 内部算子间的串联,Source 的算子组织如下图 (chain 到一起的算子已拆开展示)。

image

  • BulkLoadSource / ChangelogSource 主要负责数据的读入和统一格式处理;
  • BulkNormalize / ChangelogNormalize 主要是负责处理数据运行时信息的添加与覆盖,主键语义处理等工作;
  • WatermarkGenerator 是针对算法工作需求定制的 Watermark 生成逻辑的算子;
  • VersionBasedKeyedCoProcess 就是核心的处理合并逻辑和 RowKind 语义完备性的算子。

算法实现的过程中还是有很多需要优化或者进行权衡的点。全量数据进入 CoProcess 数据后,会首先检查当前是否处理过更大版本的数据,如果没有的话才进行处理,数据首先会被存入 State 中并根据 SearchTs + T (T 是我们设置的固有时延) 注册 EventTimeTimer。如果没有高版本的数据到来,定时器触发发送 Version 0 的数据,否则直接抛弃改为发送 RowKind 语义处理好的高版本增量数据。

另一方面,避免状态的无限增长,当系统判定 BulkLoad 阶段结束后,会结束对相关 Flink State 的使用,存在的 State 只要等待 TTL 过期即可。

另外,我们针对在数据同步且下游 Sink 支持 Upsert 能力的场景下,开发了特别优化的超轻量模式,可以以超低的 overhead 完成全量+增量的数据同步

开发完成后,我们的反复测试修改与验证,完成 MVP 版本的开发。

三、实时物化视图实践

MVP 版本发布后,我们与用户同学一起,进行了基于 Flink 的物化视图试点。

1. 基于多数据源复杂逻辑的 Data Pipeline 实时化

下面是用户的一个真实生产需求:有三张表,分别来自于 TiDB /。SqlServer / Mysql,数据行数分别为千万级 / 亿级 / 千万级,计算逻辑相对复杂,涉及到去重,多表 Join。原有通过离线批处理产生 T+1 的结果表。而用户希望尽可能降低该 Pipeline 的延迟。

由于我们使用的 TiCDC Update 数据尚不包含 -U 部分,故 TiDB 表的整合算法还是采取 Legacy Mode 进行加载。

我们与用户沟通,建议他们以批处理的思路去编写 Flink SQL,把结果的明细数据的数据输出到 StarRocks 中。用户也在我们的协助下,较为快速地完成了 SQL 的开发,任务的计算拓补图如下:

image

结果是相当让人惊喜的!我们成功地在保证了数据准确性的情况下,将原来天级延迟的 Pipeline 降低至 10s 左右的延迟。数据也从原来查询 Hive 变为查询 StarRocks,不论从数据接入,数据预计算,还是数据计算与查询,实现了全面的实时化。另一方面,三张表每秒的增量最大不超过 300 条,且该任务不存在更新放大的问题,所以资源使用相当的少。根据监控反馈的信息,初始化阶段完成后,整个任务 TM 部分只需要使用 1 个 Cpu (on YARN),且 Cpu 使用常态不超过 20%。对比原来批处理的资源使用,无疑也是巨大提升。

2. 数据湖场景优化

正如上文提到的,对于数据同步,我们做了专门的优化。只需要使用专用的 Source 表,就可以一键开启历史数据 + 增量数据数据同步,大大简化了数据同步的流程。我们目前尝试使用该功能将数据同步至基于 Iceberg 的数据湖中,从数据同步层面大幅提升数据新鲜度。

image

四、限制与不足

虽然我们在这个方向的探索取得了一定成果,但是仍有一定的限制和不足。

1. 服务器时钟的隐式依赖

仔细阅读上面算法原理,我们会发现,不论是 SearchTs 的生成还是 Watermark 的生成,实际上最后都依赖了服务器系统的时钟,而非依赖类似 Time Oracle 机制。我们虽然算法实现上引入固有延迟去规避这个问题,但是如果服务器出现非常严重时钟不一致,超过固有延迟的话,此时 watermark 是不可靠的,有可能会造成处理逻辑的错误。

经确认,之家服务器时钟会进行校准操作。

2. 一致性与事务

事实上我们目前这套实现没有任何事务相关的保证机制,仅能承诺结果的最终一致性,最终一致性其实是一种相当弱的保证。就拿上文提到的例子来说,如果其中一张表存在 2 个小时的消费延迟,另一张表基本不存在延迟,这个时候两表 Join 产生的结果其实是一种中间状态,或者说对于外部系统应该是不可见的。

为了完成更高的一致性保证,避免上面问题的产生,我们自然会想到引入事务提交机制。然而目前我们暂时没有找到比较好的实现思路,但是可以探讨下我们目前的思考。

2.1 如何定义事务

事务这个概念想必大家或多或少都有认识,在此不多赘述。如何数据库系统内部定义事务是一件特别自然且必要的事情,但是如何在这种跨数据源场景下定义事务,其实是一件非常困难的事情。还是以上文的例子来展开,我们能看到数据源来自各种不同数据库,我们其实对于单表记录了对应的事务信息,但是确实没有办法定义来自不同数据源的统一事务。我们目前的朴素思路是根据数据产生的时间为基准,结合 checkpoint 统一划定 Epoch,实现类似 Epoch-based Commit 的提交机制。但是这样做又回到前面提到的问题,需要对服务器时间产生依赖,无法从根源保证正确性。

2.2 跨表事务

对于 Flink 物化视图一致性提交这个问题,TiFlink[4] 已经做了很多相关工作。但是我们的 Source 来自不同数据源,且读取自 Kafka,所以问题变得更为复杂,还是上面提到的例子,两张表 Join 过后,如果想保证一致性,不只是 Source 和 Sink 算子,整个关系代数算子体系都需要考虑引入事务提交的概念和机制,从而避免中间状态的对外部系统的发布。

3. 更新放大

这个问题其实比较好理解。现在有两张表 join,对于左表的每一行数据,对应右表都有 n (n > 100) 条数据与之对应。那么现在更新左表的任意一行,都会有 2n 的更新放大。

4. 状态大小

目前整套算法在全量同步阶段的 Overhead 虽然可控,但是仍有优化空间。我们目前实测,对于一张数据量在 1 亿左右的表,在全量数据阶段,需要峰值最大为 1.5G 左右的 State。我们打算在下个版本继续优化状态大小,最直接的思路就是 BulkSource 通知 KeyedCoProcess 哪些主键集合是已经处理完毕的,这样可以使对应的 Key 提早进入全量阶段完成模式,从而进一步优化状态大小。

五、总结与展望

本文分析了基于 Flink 物化视图实现的问题与挑战,着重介绍了处理生成完整的 Changelog DataStream 的算法与实现和在业务上的收益,也充分阐述了目前的限制与不足。

虽然这次实践的结果称不上完备,存在一些问题亟待解决,但是我们仍看到了巨大的突破与进步,不论是从技术还是业务使用上。我们充分相信未来这项技术会越来越成熟,越来越被更多人认可和使用,也通过此次探索充分验证了流处理和批处理的统一性。

我们目前的实现还处在早期版本,仍有着工程优化和 bug fix 的空间与工作 (比如前文提到的两表的推进的 skew 太大问题,可以尝试引入 Coordinator 进行调节与对齐),但是相信随着不断的迭代与发展,这项工作会变得越来越稳固,从而支撑更多业务场景,充分提升数据处理的质量与效率!

特别鸣谢张茄子和云邪老师的帮助与勘误。

引用

[1] http://mp.weixin.qq.com/s/KQH-relbrZ2GUqdmaTWx6Q

[2] http://github.com/ververica/flink-cdc-connectors

[3] http://arxiv.org/pdf/2010.12597.pdf

[4] http://zhuanlan.zhihu.com/p/422931694


近期热点

img


更多 Flink 相关技术问题,可扫码加入社区钉钉交流群
第一时间获取最新技术文章和社区动态,请关注公众号~

image.png

活动推荐

阿里云基于 Apache Flink 构建的企业级产品-实时计算Flink版现开启活动:
99 元试用 实时计算Flink版(包年包月、10CU)即有机会获得 Flink 独家定制卫衣;另包 3 个月及以上还有 85 折优惠!
了解活动详情:https://www.aliyun.com/product/bigdata/sc

image.png

相关实践学习
基于Hologres轻松玩转一站式实时仓库
本场景介绍如何利用阿里云MaxCompute、实时计算Flink和交互式分析服务Hologres开发离线、实时数据融合分析的数据大屏应用。
Linux入门到精通
本套课程是从入门开始的Linux学习课程,适合初学者阅读。由浅入深案例丰富,通俗易懂。主要涉及基础的系统操作以及工作中常用的各种服务软件的应用、部署和优化。即使是零基础的学员,只要能够坚持把所有章节都学完,也一定会受益匪浅。
相关文章
|
1月前
|
SQL Java API
Apache Flink 2.0-preview released
Apache Flink 社区正积极筹备 Flink 2.0 的发布,这是自 Flink 1.0 发布以来的首个重大更新。Flink 2.0 将引入多项激动人心的功能和改进,包括存算分离状态管理、物化表、批作业自适应执行等,同时也包含了一些不兼容的变更。目前提供的预览版旨在让用户提前尝试新功能并收集反馈,但不建议在生产环境中使用。
651 13
Apache Flink 2.0-preview released
|
1月前
|
存储 缓存 算法
分布式锁服务深度解析:以Apache Flink的Checkpointing机制为例
【10月更文挑战第7天】在分布式系统中,多个进程或节点可能需要同时访问和操作共享资源。为了确保数据的一致性和系统的稳定性,我们需要一种机制来协调这些进程或节点的访问,避免并发冲突和竞态条件。分布式锁服务正是为此而生的一种解决方案。它通过在网络环境中实现锁机制,确保同一时间只有一个进程或节点能够访问和操作共享资源。
73 3
|
2月前
|
SQL 消息中间件 关系型数据库
Apache Doris Flink Connector 24.0.0 版本正式发布
该版本新增了对 Flink 1.20 的支持,并支持通过 Arrow Flight SQL 高速读取 Doris 中数据。
|
3月前
|
消息中间件 监控 数据挖掘
基于RabbitMQ与Apache Flink构建实时分析系统
【8月更文第28天】本文将介绍如何利用RabbitMQ作为数据源,结合Apache Flink进行实时数据分析。我们将构建一个简单的实时分析系统,该系统能够接收来自不同来源的数据,对数据进行实时处理,并将结果输出到另一个队列或存储系统中。
241 2
|
3月前
|
存储 数据处理 Apache
超越传统数据库:揭秘Flink状态机制,让你的数据处理效率飞升!
【8月更文挑战第26天】Apache Flink 在流处理领域以其高效实时的数据处理能力脱颖而出,其核心特色之一便是状态管理机制。不同于传统数据库依靠持久化存储及 ACID 事务确保数据一致性和可靠性,Flink 利用内存中的状态管理和分布式数据流模型实现了低延迟处理。Flink 的状态分为键控状态与非键控状态,前者依据数据键值进行状态维护,适用于键值对数据处理;后者与算子实例关联,用于所有输入数据共享的状态场景。通过 checkpointing 机制,Flink 在保障状态一致性的同时,提供了更适合流处理场景的轻量级解决方案。
61 0
|
21天前
|
存储 SQL Apache
Apache Doris 开源最顶级基于MPP架构的高性能实时分析数据库
Apache Doris 是一个基于 MPP 架构的高性能实时分析数据库,以其极高的速度和易用性著称。它支持高并发点查询和复杂分析场景,适用于报表分析、即席查询、数据仓库和数据湖查询加速等。最新发布的 2.0.2 版本在性能、稳定性和多租户支持方面有显著提升。社区活跃,已广泛应用于电商、广告、用户行为分析等领域。
Apache Doris 开源最顶级基于MPP架构的高性能实时分析数据库
|
1月前
|
存储 数据采集 大数据
Flink实时湖仓,为汽车行业数字化加速!
本文由阿里云计算平台产品专家李鲁兵(云觉)分享,聚焦汽车行业大数据应用。内容涵盖市场趋势、典型大数据架构、产品市场地位及能力解读,以及典型客户案例。文章详细介绍了新能源汽车市场的快速增长、大数据架构分析、实时湖仓方案的优势,以及Flink和Paimon在车联网中的应用案例。
181 8
Flink实时湖仓,为汽车行业数字化加速!
|
19天前
|
消息中间件 资源调度 关系型数据库
如何在Flink on YARN环境中配置Debezium CDC 3.0,以实现实时捕获数据库变更事件并将其传输到Flink进行处理
本文介绍了如何在Flink on YARN环境中配置Debezium CDC 3.0,以实现实时捕获数据库变更事件并将其传输到Flink进行处理。主要内容包括安装Debezium、配置Kafka Connect、创建Flink任务以及启动任务的具体步骤,为构建实时数据管道提供了详细指导。
46 9
|
2月前
|
消息中间件 资源调度 API
Apache Flink 流批融合技术介绍
本文源自阿里云高级研发工程师周云峰在Apache Asia Community OverCode 2024的分享,内容涵盖从“流批一体”到“流批融合”的演进、技术解决方案及社区进展。流批一体已在API、算子和引擎层面实现统一,但用户仍需手动配置作业模式。流批融合旨在通过动态调整优化策略,自动适应不同场景需求。文章详细介绍了如何通过量化指标(如isProcessingBacklog和isInsertOnly)实现这一目标,并展示了针对不同场景的具体优化措施。此外,还概述了社区当前进展及未来规划,包括将优化方案推向Flink社区、动态调整算子流程结构等。
405 31
Apache Flink 流批融合技术介绍
|
1月前
|
分布式计算 监控 大数据
大数据-148 Apache Kudu 从 Flink 下沉数据到 Kudu
大数据-148 Apache Kudu 从 Flink 下沉数据到 Kudu
58 1

相关产品

  • 实时计算 Flink版
  • 推荐镜像

    更多
    下一篇
    无影云桌面