从 Flink 到 Doris 的实时数据写入实践 —— 基于 Flink CDC 构建更实时高效的数据集成链路

本文涉及的产品
实时计算 Flink 版,1000CU*H 3个月
简介: 本文将深入解析 Flink-Doris-Connector 三大典型场景中的设计与实现,并结合 Flink CDC 详细介绍了整库同步的解决方案,助力构建更加高效、稳定的实时数据处理体系。

Flink-Doris-Connector 作为 Apache Flink 与 Doris 之间的桥梁,打通了实时数据同步、维表关联与高效写入的关键链路。本文将深入解析 Flink-Doris-Connector 三大典型场景中的设计与实现,并结合 Flink CDC 详细介绍了整库同步的解决方案,助力构建更加高效、稳定的实时数据处理体系。

一、Apache Doris 简介

Apache Doris 是一款基于 MPP 架构的高性能、实时的分析型数据库,整体架构精简,只有 FE 、BE 两个系统模块。其中 FE 主要负责接入请求、查询解析、元数据管理和任务调度,BE 主要负责查询执行和数据存储。Apache Doris 支持标准 SQL 并且完全兼容 MySQL 协议,可以通过各类支持 MySQL 协议的客户端工具和 BI 软件访问存储在 Apache Doris 中的数据库。

在典型的数据集成和处理链路中,往往会对 TP 数据库、用户行为日志、时序性数据以及本地文件等数据源进行采集,经由数据集成工具或者 ETL 工具处理后写入至实时数仓 Apache Doris 中,并由 Doris 对下游数据应用提供查询和分析,例如典型的 BI 报表分析、OLAP 多维分析、Ad-hoc 即席查询以及日志检索分析等多种数据应用场景。

Apache Doris 简介 .PNG

Flink-Doris-Connector 是 Apache Doris 与 Apache Flink 在实时数据处理 ETL 的结合,依托 Flink 提供的实时计算能力,构建高效的数据处理和分析链路。Flink-Doris-Connector 的使用场景主要分为三种:

  1. Scan:通常用来做数据同步或是跟其他数据源的联合分析;
  2. Lookup Join:将实时流中的数据和 Doris 中的维度表进行 Join;
  3. Real-time ETL:使用 Flink 清洗数据再实时写入 Doris 中。

Apache Doris 简介 -2.PNG

二、Flink-Doris-Connector 典型场景的设计与实现

本章节结合 Scan、Lookup Join、Write 这三种场景,介绍 Flink-Doris-Connector 的设计与实现。

01 Scan 场景

Scan 场景指将 Doris 中的存量数据快速提取出来,当从 Doris 中读取大量数据时,使用传统的 JDBC 方法可能会面临性能瓶颈。因此 Flink-Doris-Connector 中可以借助 Doris Source ,充分利用 Doris 的分布式架构和 Flink 的并行处理能力,从而实现了更高效的数据同步。

Doris Source 读取流程

  • Job Manager 向 FE 端发起请求查询计划,FE 会返回要查询的数据对应的 BE 以及 Tablet;
  • 根据不同的 BE,将请求分发给不同的 TaskManager;
  • 通过 Task Manager 直接读取每个 BE 上对应 Tablet 的数据。

通过这种方式,我们可以利用 Flink 分布式处理的能力从而提高整个数据同步的效率。

Doris Source 读取流程.PNG

02 Lookup Join 场景

Lookup Join 场景.PNG

对于维度表存储在 Doris 中的场景,可通过 Lookup Join 实现对实时流数据与 Doris 维度表的关联查询。

JDBC Connector

Doris 支持 MySQL 协议,所以可以直接使用 JDBC Connector 进行 Lookup Join,但是这一方式存在一定的局限:

  • Jdbc Connector 中的 Lookup Join 是同步查询的操作,会导致实时流中每条数据都要等待 Doris 查询的结果,增加了延迟。
  • 仅支持单条数据查询,在上游数据量吞吐较高时,容易造成性能瓶颈和反压。

Flink-Doris-Connector 的优化

因此针对 Lookup Join 场景 ,Flink-Doris-Connector 实现了异步 Lookup Join 和攒批查询的优化:

  • 支持异步 Lookup Join: 异步 Lookup Join 意味着实时流中的数据不需要显式等待每条记录的查询结果,可以大大的降低延迟性。
  • 支持攒批查询: 将实时流的数据追加到队列 Queue 中,后台通过监听线程 Watcher,将队列里面的数据取出来再推送到查询执行的 Worker 线程池中,Worker 线程会将收到的这一批数据拼接成一个 Union All 的查询,同时向 Doris 发起 Query 查询。

通过异步 Lookup join 以及攒批查询,可以在上游数据量比较大的时候大幅度提高维表关联吞吐量,保障了数据读取与处理的高效性。

03 实时 ETL 场景

实时 ETL 场景.png

对于实时写入来说,Doris Sink 的写入是基于 Stream Load 的导入方式去实现的。Stream Load 是 Apache Doris 中最为常见的数据导入方式之一,支持通过 HTTP 协议将本地文件或数据流导入到 Doris 中。主要流程如下:

  • Sink 端在接收到数据后会开启一个 Stream Load 的长链接请求。在 Checkpoint 期间,它会将接收到的数据以 Chunk 的形式持续发送到 Doris 中。
  • Checkpoint 时,会对刚才发起的 Stream Load 的请求进行提交,提交完成后,数据才会可见。

如何保证数据写入的 Exactly-Once 语义

如何保证数据写入的 Exactly-Once 语义 .png

那么,如何保证数据写入期间,端到端数据的精确一次性?

以 Kafka 同步到 Drois 的 Checkpoint 过程为例:

  1. Checkpoint 时,Source 端会接收到 Checkpoint Barrier;
  2. Source 端接收到 Barrier 后,首先会对自身做一个快照,同时会将 Checkpoint Barrier 下发到 Sink 端;
  3. Sink 端接收到 Barrier 后,执行 Pre-commit 提交,成功后数据就会完整写入到 Doris,由于此处执行的是预提交,所以在 Doris 上,此时对用户来说数据是不可见的;
  4. 将 Pre-Commit 成功的事务 ID 保存到状态中;
  5. 所有的算子 Checkpoint 都做完后,Job Manager 会下发本次 Checkpoint 完成的通知;
  6. Sink 端会对刚才 Pre-commit 成功的事务进行一次提交。

通过这种两阶段提交,就可以实现端到端的精确一次性。

实时性与 Exactly-Once

实时性与 Exactly-Once.png

上面提到,Doris Sink 端的写入与 Checkpoint 绑定,数据写入 Doris 的延迟性取决于 Checkpoint 的间隔。但在一些用户的场景下,希望数据可以实时写入,但是 Checkpoint 不能做的太频繁,同时对于一些作业来说,如果 Checkpoint 太频繁会消耗大量资源,针对该情况,Flink-Doris-Connector 引入了攒批机制,以平衡实时性与资源消耗之间的矛盾。

攒批的实现原理是 Sink 端接收上游数据之后,不会立即将每条数据单独写入 Doris,而是先在内存中进行缓存,然后通过对应参数设置,将缓存数据提交到 Doris 中。结合攒批写入和 Doris 中的主键模型,可以确保数据写入的幂等性。

通过引入攒批机制,既满足了用户对数据实时写入的需求,又避免了频繁 Checkpoint 带来的资源消耗问题,从而实现性能与效率的优化。

三、基于 Flink CDC 的整库同步方案

以上是对 Flink-Doris-Connector 的典型场景和实现原理介绍,接下来我们来看它在实际业务中的一个重要应用——整库同步。相比底层实现,整库同步更偏向具体使用场景。下面我们基于前面介绍的能力,进一步探讨如何通过 Flink CDC 实现 TP 数据库到 Doris 的高效、自动化同步。

01 整库同步痛点

在数据迁移过程中,用户通常希望可以尽快将数据迁移到 Doris 中,然而在同步 TP 数据库时,整库同步往往面临以下几点挑战:

  • 建表:
    • 存量表的快速批量创建:TP 数据库中往往存在成千上万的表,这些表的结构各异,对于存量表而言需要逐一在 Doris 中创建对应的表结构;
    • 同步任务开启后,新增表的自动创建与同步: 为了保证数据的完整性和实时性,同步工具需要实时监控 TP 数据库的变化,并自动在 Doris 中创建和同步新表。
  • 元数据映射: 上下游之间字段元数据的便捷映射,包括字段类型的转换、字段名称的对应修改等。
  • DDL 自动同步: 增加、删除列等操作会导致数据库结构发生变化,进而影响到数据同步。因此,同步工具需要能够实时捕获 DDL 并动态地更新 Doris 表结构,以确保数据的准确性和一致性。
  • 开箱即用: 零代码,低门槛,理想的同步工具只需进行简单配置,即可实现数据的迁移和同步。

02 基于 Flink CDC 实现整库同步

在数据抽取方面,Flink-Doris-Connector 借用了 Flink CDC 的特性能力:

  • 增量快照读取
    • 无锁读取与并发读取:不论存量数据量多大,都可以通过横向提高 Flink 的并发提升数据读取速度。
    • 断点续传:当存量数据比较大时,可能面临同步中断的情况,CDC 支持中断任务的衔接同步。
  • 丰富数据源支持,Flink CDC 支持多种数据库,如 MySQL、Oracle、SQLServer 等。
  • 无缝对接 Flink 现有生态,方便与 Flink 已有Source 和 Sink 结合使用。

基于 Flink CDC 实现整库同步.png

一键建表与元数据自动映射

Flink-Doris-Connector 中集成了 Flink CDC 等能力,可以让用户只提交一个操作,就能进行整库同步的操作。其主要原理是 Flink CDC Source 在接收到上游的数据源之后,会进行分流处理,不同的表用不同的 Sink。同时在最新的 Connector 版本中,也支持单个 Sink 同步多张表,支持新增表的创建和同步。

集成 Flink CDC 的功能后,用户仅需通过 Flink-Doris-Connector 提交任务,就可以在 Doris 自动创建所需的表,而无需配置上下游表之间的显式关联,实现数据快速同步

当 Flink 任务启动后,Doris-Flink-Connector 将自动识别对应的 Doris 表是否存在。如果表不存在,Doris Flink Connector 会自动创建表,并根据 Table 名称进行分流,从而实现下游多个表的 Sink 接入;如果表存在,则直接启动同步任务。

这一改进,不仅简化了配置流程,还使得新增表的创建和同步更加便捷,从而提升数据处理的整体效率。

Light Schema Change 与 DDL 自动同步

Light Schema Change 与 DDL 自动同步.png

在 Apache Doris 1.2 版本之前,Schema Change 操作比较繁琐,需要手动增改数据列。在上游 TP 数据库发生表结构变更时,需要暂停数据同步任务、待 Doris 中的 Schema Change 完成后再重启任务。

自 Apache Doris 1.2 版本起,我们引入了轻量级的 Light Schema Change 机制,极大地简化了操作流程,常见的增减列场景其处理速度可达毫秒级。Light Schema Change 机制原理如下:

  • Schema Change:
    • 客户端向 FE 发起增减列的请求;
    • FE 在接收到请求后,修改当前元数据,并将最新的 Schema 持久化;
    • FE 向客户端同步 Schema Change 的结果;
  • Data Load:
    • 当后续导入任务发起时,FE 将导入任务与最新的 Schema 信息发送给 BE;
    • 在数据写入过程中,BE 的每个 Rowset 都会存储当前导入的 Schema 信息;
  • Query:
    • FE 将查询计划与最新的 Schema 一起发送给 BE;
    • BE 使用最新 Schema 执行查询计划;
  • Compaction:
    • 在 BE 中,对参与合并的 Rowset 版本进行比较;
    • 根据最新的 Schema Change 信息进行数据合并。

经测试,与早期的 Schema Change 相比,Light Schema Change 的数据同步性能有了数百倍的提升,

Light Schema Change 与 DDL 自动同步-2.png

Light Schema Change 与 Flink-Doris-Connector 的结合,通过 Flink CDC 可以实现 DDL 的自动同步,具体步骤如下:

  1. Source 端捕获上游 Schema Change 信息,开启 DDL 变更同步;
  2. Doris Sink 端识别并解析 DDL 操作(加减列);
  3. Table 校验,判断是否可以进行 Light Schema Change;
  4. 发起 Schema Change 操作;

基于这一实现,Doris 能自动获取到 DDL 语句并在毫秒级即可完成 Schema Change 操作,在上游 TP 数据库发生表结构变更时,数据同步任务无需暂停。

开箱即用:MySQL 整库同步示例

开箱即用:MySQL 整库同步示例.png

对于用户来讲,只要有 Flink 客户端,通过上图的操作就可以提交整库同步作业。支持传入 Flink 的配置,比如并发设置、Checkpoint 间隔等,也支持正则表达式去配置需要同步的表, 同时可以将 Flink CDC Source 和 Doris Sink 的配置直接透传给具体的 Connector。通过这种方式,用户可以很便捷地提交整库同步作业。

03 Flink-Doris-Connector 核心优势

基于以上优化,可以完美解决用户的痛点:

  1. 自动建表,即存量表与增量表的自动创建,无需用户提前在 Doris 中预先创建对应的表结构;
  2. 自动映射上下游字段,无需手动写入上下游字段间的匹配规则,节省大量人力成本;
  3. 增减列无感同步,及时获取上游 DDL 语句并自动在 Doris 中实现毫秒级 Schema Change,无需停服、数据同步任务平稳运行;
  4. 开箱即用,降低学习成本,更专注业务本身。

04 最佳实践

在生产环境中,若作业数量较多,直接采用上述提交方式的作业管理复杂度较高。通常建议借助任务托管平台(如 StreamPark),实现对作业的统一创建、监控与运维,从而提升任务管理效率与系统稳定性。

最佳实践.png

最佳实践-2.png

四、未来规划

未来,基于 Flink-Doris-Connector 的能力规划如下:

  1. 支持实时读取。目前 Doris Source 只是把数据 Scan 出来,是一个有界流的读取,后续会支持 CDC 的场景,可以使用 Flink 来对 Doris 中的数据进行流式的读取。
  2. Sink 一流多表。目前Flink-Doris-Connector支持单个 Sink 同步多张表,但是 Stream Load 的导入方式还是只支持单个表的导入。所以在表特别多的时候,需要在 Sink 端维护大量 StreamLoad 的连接,在后续会做到单个 Stream Load 的连接支持多张表的写入。
  3. 整库同步方面,支持更多的上游数据源,满足更多数据同步场景。
相关实践学习
基于Hologres+Flink搭建GitHub实时数据大屏
通过使用Flink、Hologres构建实时数仓,并通过Hologres对接BI分析工具(以DataV为例),实现海量数据实时分析.
实时计算 Flink 实战课程
如何使用实时计算 Flink 搞定数据处理难题?实时计算 Flink 极客训练营产品、技术专家齐上阵,从开源 Flink功能介绍到实时计算 Flink 优势详解,现场实操,5天即可上手! 欢迎开通实时计算 Flink 版: https://cn.aliyun.com/product/bigdata/sc Flink Forward Asia 介绍: Flink Forward 是由 Apache 官方授权,Apache Flink Community China 支持的会议,通过参会不仅可以了解到 Flink 社区的最新动态和发展计划,还可以了解到国内外一线大厂围绕 Flink 生态的生产实践经验,是 Flink 开发者和使用者不可错过的盛会。 去年经过品牌升级后的 Flink Forward Asia 吸引了超过2000人线下参与,一举成为国内最大的 Apache 顶级项目会议。结合2020年的特殊情况,Flink Forward Asia 2020 将在12月26日以线上峰会的形式与大家见面。
目录
相关文章
|
2月前
|
SQL 人工智能 JSON
Flink 2.1 SQL:解锁实时数据与AI集成,实现可扩展流处理
简介:本文整理自阿里云高级技术专家李麟在Flink Forward Asia 2025新加坡站的分享,介绍了Flink 2.1 SQL在实时数据处理与AI融合方面的关键进展,包括AI函数集成、Join优化及未来发展方向,助力构建高效实时AI管道。
543 43
|
2月前
|
SQL 人工智能 JSON
Flink 2.1 SQL:解锁实时数据与AI集成,实现可扩展流处理
本文整理自阿里云的高级技术专家、Apache Flink PMC 成员李麟老师在 Flink Forward Asia 2025 新加坡[1]站 —— 实时 AI 专场中的分享。将带来关于 Flink 2.1 版本中 SQL 在实时数据处理和 AI 方面进展的话题。
181 0
Flink 2.1 SQL:解锁实时数据与AI集成,实现可扩展流处理
|
14天前
|
人工智能 Java API
Java与大模型集成实战:构建智能Java应用的新范式
随着大型语言模型(LLM)的API化,将其强大的自然语言处理能力集成到现有Java应用中已成为提升应用智能水平的关键路径。本文旨在为Java开发者提供一份实用的集成指南。我们将深入探讨如何使用Spring Boot 3框架,通过HTTP客户端与OpenAI GPT(或兼容API)进行高效、安全的交互。内容涵盖项目依赖配置、异步非阻塞的API调用、请求与响应的结构化处理、异常管理以及一些面向生产环境的最佳实践,并附带完整的代码示例,助您快速将AI能力融入Java生态。
176 12
|
6月前
|
存储 消息中间件 Kafka
基于 Flink 的中国电信星海时空数据多引擎实时改造
本文整理自中国电信集团大数据架构师李新虎老师在Flink Forward Asia 2024的分享,围绕星海时空智能系统展开,涵盖四个核心部分:时空数据现状、实时场景多引擎化、典型应用及未来展望。系统日处理8000亿条数据,具备亚米级定位能力,通过Flink多引擎架构解决数据膨胀与响应时效等问题,优化资源利用并提升计算效率。应用场景包括运动状态识别、个体行为分析和群智感知,未来将推进湖仓一体改造与三维时空服务体系建设,助力数字化转型与智慧城市建设。
644 3
基于 Flink 的中国电信星海时空数据多引擎实时改造
|
2月前
|
人工智能 自然语言处理 安全
Python构建MCP服务器:从工具封装到AI集成的全流程实践
MCP协议为AI提供标准化工具调用接口,助力模型高效操作现实世界。
451 1
|
2月前
|
供应链 监控 搜索推荐
35页PPT|零售行业自助数据分析方法论:指标体系构建平台集成、会员与商品精细化运营实践
在零售行业环境剧变的背景下,传统“人找货”模式正被“货找人”取代。消费者需求日益个性化,购买路径多元化,企业亟需构建统一的指标体系,借助BI平台实现数据驱动的精细化运营。本文从指标体系构建、平台集成到会员与商品运营实践,系统梳理零售经营分析的方法论,助力企业实现敏捷决策与业务闭环。
35页PPT|零售行业自助数据分析方法论:指标体系构建平台集成、会员与商品精细化运营实践
|
2月前
|
机器学习/深度学习 SQL 大数据
什么是数据集成?和数据融合有什么区别?
在大数据领域,“数据集成”与“数据融合”常被混淆。数据集成关注数据的物理集中,解决“数据从哪来”的问题;数据融合则侧重逻辑协同,解决“数据怎么用”的问题。两者相辅相成,集成是基础,融合是价值提升的关键。理解其差异,有助于企业释放数据潜力,避免“数据堆积”或“盲目融合”的误区,实现数据从成本到生产力的转变。
什么是数据集成?和数据融合有什么区别?
|
3月前
|
存储 消息中间件 搜索推荐
京东零售基于Flink的推荐系统智能数据体系
摘要:本文整理自京东零售技术专家张颖老师,在 Flink Forward Asia 2024 生产实践(二)专场中的分享,介绍了基于Flink构建的推荐系统数据,以及Flink智能体系带来的智能服务功能。内容分为以下六个部分: 推荐系统架构 索引 样本 特征 可解释 指标 Tips:关注「公众号」回复 FFA 2024 查看会后资料~
242 1
京东零售基于Flink的推荐系统智能数据体系
|
2月前
|
消息中间件 存储 数据采集
Apache InLong:构建10万亿级数据管道的全场景集成框架
Apache InLong(应龙)是一站式、全场景海量数据集成框架,支持数据接入、同步与订阅,具备自动、安全、可靠和高性能的数据传输能力。源自腾讯大数据团队,现为 Apache 顶级项目,广泛应用于广告、支付、社交等多个领域,助力企业构建高效数据分析与应用体系。
|
5月前
|
数据采集 SQL canal
Amoro + Flink CDC 数据融合入湖新体验
本文总结了货拉拉高级大数据开发工程师陈政羽在Flink Forward Asia 2024上的分享,聚焦Flink CDC在货拉拉的应用与优化。内容涵盖CDC应用现状、数据入湖新体验、入湖优化及未来规划。文中详细分析了CDC在多业务场景中的实践,包括数据采集平台化、稳定性建设,以及面临的文件碎片化、Schema演进等挑战。同时介绍了基于Apache Amoro的湖仓融合架构,通过自优化服务解决小文件问题,提升数据新鲜度与读写平衡。未来将深化Paimon与Amoro的结合,打造更高效的入湖生态与自动化优化方案。
259 1
Amoro + Flink CDC 数据融合入湖新体验

相关产品

  • 实时计算 Flink版