Flink CDC 在新能源制造业的实践

本文涉及的产品
实时计算 Flink 版,5000CU*H 3个月
简介: 本文撰写自某新能源企业的研发工程师 单葛尧 老师。本文详细介绍该新能源企业的大数据平台中 CDC 技术架构选型和 Flink CDC 的最佳实践。

摘要:本文撰写自某新能源企业的研发工程师 单葛尧 老师。本文详细介绍该新能源企业的大数据平台中 CDC 技术架构选型和 Flink CDC 的最佳实践。主要有以下几个内容:

  1. CDC 方案选型

  2. 方案落地实施

  3. 平台的优越性

  4. 后续规划

我们是一家专注于新能源动力电池制造的企业,致力于推动能源技术的发展与应用。作为一家具有多年行业经验的企业,我们在新能源领域积累了深厚的技术实力和市场认知,业务涵盖了新能源产业链的关键环节。从上游的装备制造到下游的应用解决方案,为客户提供了全方位的服务。

随着企业业务的不断发展,对数据实时性的要求日益提高。如何管理复杂的 Flink 任务,提高任务的开发效率,降低任务的运维难度,成为我们亟待解决的问题。接下来,我将介绍在我们团队在新能源制造业中大数据平台 CDC 技术架构的选型和 Flink CDC 的最佳实践。

一、CDC 方案选型

目前,我们团队规划的数据平台架构如上图所示,数据源这一层主要以 Oracle 为主,且跨网络的分散在各个基地,考虑到数据源支持和时效性,我们需要支持 Oracle CDC 数据采集,结合我们的业务场景,我们调研了实时采集的如下方案:

如上图所示,从 CDC 机制、增量同步、断点续传、全量同步、全量+增量、架构、数据计算、生态这八个方面做了对比,可以看出其中的佼佼者主要是 Flink CDC 和 Oracle OGG 以及 Debezium。

首先,让我们了解这三种技术的基本概况。Flink CDC 是 Apache Flink 的一个官方子项目,它利用流处理能力来捕获和转换源数据库的变更事件。OGG 是 Oracle提供的一种成熟的商业化 CDC 技术,广泛用于数据复制和实时数据分析。Debezium则是一个开源的分布式 CDC 系统,支持多种数据库和消息中间件,易于扩展和集成。

从实现机制上来看,Flink CDC 依赖于数据库的日志或 redo log来捕捉变更,并通过Flink的流处理引擎进行后续处理。它的优势在于与 Flink 生态的无缝整合,为批处理和流处理提供了统一的解决方案。此外,Flink CDC 支持丰富的转换和处理功能,如窗口操作、状态管理和时间处理,这对于复杂的实时分析场景非常有益。然而,它的局限性也较为明显,即对源数据库的依赖性较强,需要特定的日志格式或访问权限。

OGG 作为一种成熟的商业产品,提供了强大的数据捕获和传输能力。它通过解析源数据库的日志文件来捕获变更,并能够在不同的系统之间高效地传输数据。OGG 的优势在于其稳定性和广泛的兼容性,尤其是在处理大规模数据和高并发场景时表现出色。但是,OGG 的成本较高,且对于开源生态系统的支持不如其他方案那么丰富。

Debezium 则采用了不同的方法,它直接连接到数据库,通过订阅 binlog 或 redo log 来捕获变更事件。Debezium 的特点是其灵活性和易用性,支持多种数据库类型,并且可以方便地与 Kafka 等消息中间件集成。这使得 Debezium 非常适合于多源异构环境下的数据同步和实时分析。但是,Debezium 的性能可能不如Flink CDC和 OGG 那样出色,尤其是在处理大量数据时可能需要更多的优化工作。

在应用场景方面,Flink CDC 适合于需要复杂事件处理和流式分析的场景,尤其是那些已经采用Flink作为流处理平台的组织。OGG 则更适合于对稳定性和性能有极高要求的大型或关键任务应用。而 Debezium 由于其灵活性和易用性,适用于需要快速开发和部署CDC解决方案的场景,尤其是面对多样化的数据源和技术栈时。

此外,考虑到 OGG 成本高,链路长,且文档资源少,而 Debezium 配置复杂,而且在后续大数据流量或存在复杂的计算逻辑时难以为继,最终我们选择了 Flink CDC 作为 Oracle 的实时采集的工具方案。

二、方案落地实施:

我们团队在 Flink CDC 中的实施落地主要分为以下几步:

  1. 确保 Oracle 日志归档模式开启,开通 Oracle 整库补充日志,开通表级别的全列补充日志。
  2. 赋予操作用户各类权限,如表的操作权限,logminer 与 archive log 相关视图查询权限等。
  3. 开启跨基地网络权限,打通 Hadoop 集群对 Oracle 的访问权限。
  4. 通过平台编写 Flink SQL 任务,将 Oracle 的数据直接存入到 Doris 中,下图为线上作业效果图。

开发的过程中虽然整体顺利,但也遇到了几个小插曲,在此与各位读者一起分享

  1. 该配置开启了 Oracle 日志在线挖掘模式,如果没有这个设置,会导致生产环境默认策略读取 log 较慢,且默认策略会写入数据字典信息到 redo log 中导致日志量增加较多。但该配置也有缺点,不写入数据字典到 redo log 中,会导致无法处理 DDL 语句。
'debezium.log.mining.strategy'='online_catalog',
'debezium.log.mining.continuous.mine'='true'
  1. 我们在同步 Oracle11g 的数据时,遇到了报错
Caused by: io.debezium.DebeziumException: Supplemental logging not configured for table xxx. Use command: ALTER TABLE xxx ADD SUPPLEMENTAL LOG DATA (ALL) COLUMNS

该错误是因为对于 Oracle11 版本,Debezium 会默认把 tableIdCaseInsensitive 设置为 true, 导致表名被更新为小写,因此在 Oracle 中查询不到 这个表补全日志设置,导致误报这个 Supplemental logging not configured for table 错误”,我们只需要开启如下设置即可

'debezium.database.tablename.case.insensitive'='false'
  1. 我们的任务在运行一段时间后报错
Caused by: com.ververica.cdc.connectors.shaded.org.apache.kafka.connect.errors.DataException: file is not a valid field name

这是因为 schema change event parsing 导致的问题,已经在3.1版本得到了解决,下载3.1版本的包即可,详见Flink CDC 项目的 PR #2315

  1. Flink Oracle CDC connector在采集分区表的时候存在问题,报错为
Caused by: io.debezium.DebeziumException: The db history topic or its content is fully or partially missing. Please check database history topic configuration and re-execute the snapshot

这是由于分区表未能被成功扫描到,我们进入到源码的类 OracleConnectionUtils 中可以看到

String queryTablesSql = "SELECT OWNER ,TABLE_NAME,TABLESPACE_NAME FROM ALL_TABLES WHERE TABLESPACE_NAME IS NOT NULL AND TABLESPACE_NAME NOT IN ('SYSTEM','SYSAUX') AND NESTED = 'NO' AND TABLE_NAME NOT IN (SELECT PARENT_TABLE_NAME FROM ALL_NESTED_TABLES)";

其中如果需要加入分区表的话,要在where条件中加入 PARTITIONED = 'YES',重新编译打包即可解决该问题。

当然写入Doris的速度也可能存在慢的现象,特别是 StreamLoad 中进程 stopped 与返回 loadResult 结果之间时间较长时的问题,得益于 Apache Doris 社区的帮助,我们总结以下几点优化方案:

  1. 看BE的资源是不是满了,检查 IO、CPU、内存

  2. 换成 2pc 导⼊,⾮ 2pc 会等待 publish,2pc 导⼊ publish 是异步的

  3. mow 尝试换成 mor 表

  4. show proc '/transactions' 看看事务的状态,如果都是 committed,说明 publish 卡住了,如果 io 不⾼,可以增加 publish 的线程和超时

publish_version_timeout_second=60
publish_version_worker_count=16
  1. 增加 be.conf 的配置 ,适当提⾼下⾯的参数的配置
doris_scanner_thread_pool_thread_num=1024
webserver_num_workers=1024
fragment_pool_queue_size=4096
fragment_pool_thread_num_max=2048
doris_max_remote_scanner_thread_pool_thread_num=2048

三、平台的优越性:

此外 Apache StreamPark 也给我们带来了很多惊喜,在开发 FlinkSQL 任务的过程中,借助 StreamPark,极大的提升了开发时的效率,我罗列了我们开发团队觉得非常优秀的点:

1. 多版本Flink管理

在开发过程中,可能会因为需求Flink新版本的某项能力,或是某些组件对 Flink版本的强需求而引入多版本的 Flink,在 StreamPark 中,只需要配置一个 Flink 的路径并为其命名,即可在开发任务中轻松地选择我所需要的 Flink 版本。

2. 对于Git的集成

我们在引入 Apache Dolphinscheduler 时,曾多次接到数仓团队表明调度系统未能接入版本控制的遗憾,但在引入实时平台时,这个遗憾被弥补了。StreamPark 的 Git 式项目管理,极大的简化了我们在编写 JAR 后复杂的部署链路,一键编译、部署、配置,一气呵成。

3. 强大的作业运维能力及告警机制

StreamPark 有着完善的作业操作记录,可以清晰的记录所有任务 Release、Start、Cancel 的记录,对于 yarn application 形式的任务,记录了每次运行的 appId,便于后续的 yarn 日志翻阅、运行状态的监测等等。一旦报错,StreamPark 会迅速发送告警,如果配置了重启机制,可以实现分钟级的重启效率。

4. Flink SQL 在线开发

StreamPark 的类 IDE 样式的开发窗口相当优越,可以帮助我们检查低级的语法错误,简单的轻松地便可以配置各类想要配置的参数,如分配 JM、TM 的资源,配置预备提交的 yarn 队列,它的作业依赖配置更是相当简易,提供了填写 pom 和上传 jar 包两种形式,再也无需将包傻傻地通过hdfs上传,极大地提升了我们的开发效率。

四、后续规划:

感谢 Flink CDC,作为 Apache Flink 社区的重点项目,它提供给了我们开箱即用实时数据采集方案,极大的简化了 CDC 的实施过程,也规避了 OGG 昂贵的使用费用。

不过目前 Flink CDC Oracle Connector 也存在一些性能问题,众所周知,Oracle CDC依赖Debezium组件解析 Redo Log 与 Archive Log,Debezium 通过Oracle 的 Logminer 解析 Log。最大的问题就在于 LogMiner 的性能限制, 熟悉 Oracle 的开发者会知道,LogMiner是运行在 Oracle 内部, 并且运行在日志落地之后, 不可避免地需要消耗数据库的算力去完成工作, 为了降低这个不在主流程的进程的资源消耗, Oracle 对 LogMiner 做了非常严格的资源限制, 每个 LogMiner 进程, 他的资源消耗都不能超过 1 个 cpu 核心, 在大多数场景下, 这个将 LogMiner 的日志解析速度限制在1w条每秒以下, 在一些制造业的场景下, 这个速度是远远不够的, Oracle 是一个事务数据库, 一个大的 Update, 可能会带来数十万上百万的更新, 在这种情况下, 每秒1w的解析速度会使得下游延迟增大到数分钟级别, 更糟糕的是, 在数据库本身负载较高的情况下, 由于 LogMiner 的解析与数据库共享负载, 会让解析速度进一步下降。

后续团队考虑通过 Oracle 的机制, 将 redo log 异步传输到另外一台没有业务压力的 Oracle 实例上, 然后在另外一台机器上开启并发切分 scn range 事件,开启 LogMiner 线程并发的解析对应 scn range 事件,顺序的处理获取到的事件。


欢迎大家多多关注 Flink CDC,从钉钉用户交流群[1]、微信公众号[2]、Slack 频道[3]、邮件列表[4]加入 CDC 用户社区,以及在 Flink CDC GitHub 仓库[5]上参与代码贡献!


[1] “ Flink CDC 社区 ② 群”群的钉钉群号:80655011780

[2] ” Flink CDC 公众号“的微信号:ApacheFlinkCDC

[3] https://flink.apache.org/what-is-flink/community/#slack

[4] https://flink.apache.org/what-is-flink/community/#mailing-lists

[5] https://github.com/apache/flink-cdc


更多内容

img


活动推荐

阿里云基于 Apache Flink 构建的企业级产品-实时计算 Flink 版现开启活动:
新用户复制点击下方链接或者扫描二维码即可0元免费试用 Flink + Paimon
实时计算 Flink 版(3000CU*小时,3 个月内)
了解活动详情:https://free.aliyun.com/?utm_content=g_1000395379&productCode=sc

retouch_2024070417440476.jpg

相关实践学习
基于Hologres轻松玩转一站式实时仓库
本场景介绍如何利用阿里云MaxCompute、实时计算Flink和交互式分析服务Hologres开发离线、实时数据融合分析的数据大屏应用。
Linux入门到精通
本套课程是从入门开始的Linux学习课程,适合初学者阅读。由浅入深案例丰富,通俗易懂。主要涉及基础的系统操作以及工作中常用的各种服务软件的应用、部署和优化。即使是零基础的学员,只要能够坚持把所有章节都学完,也一定会受益匪浅。
相关文章
|
2月前
|
SQL 运维 网络安全
【实践】基于Hologres+Flink搭建GitHub实时数据查询
本文介绍了如何利用Flink和Hologres构建GitHub公开事件数据的实时数仓,并对接BI工具实现数据实时分析。流程包括创建VPC、Hologres、OSS、Flink实例,配置Hologres内部表,通过Flink实时写入数据至Hologres,查询实时数据,以及清理资源等步骤。
|
3天前
|
SQL 存储 Apache
基于 Flink 进行增量批计算的探索与实践
本文整理自阿里云高级技术专家、Apache Flink PMC朱翥老师在Flink Forward Asia 2024的分享,内容分为三部分:背景介绍、工作介绍和总结展望。首先介绍了增量计算的定义及其与批计算、流计算的区别,阐述了增量计算的优势及典型需求场景,并解释了为何选择Flink进行增量计算。其次,详细描述了当前的工作进展,包括增量计算流程、执行计划生成、控制消费数据量级及执行进度记录恢复等关键技术点。最后,展示了增量计算的简单示例、性能测评结果,并对未来工作进行了规划。
262 5
基于 Flink 进行增量批计算的探索与实践
|
4月前
|
SQL 存储 API
Flink实践:通过Flink SQL进行SFTP文件的读写操作
虽然 Apache Flink 与 SFTP 之间的直接交互存在一定的限制,但通过一些创造性的方法和技术,我们仍然可以有效地实现对 SFTP 文件的读写操作。这既展现了 Flink 在处理复杂数据场景中的强大能力,也体现了软件工程中常见的问题解决思路——即通过现有工具和一定的间接方法来克服技术障碍。通过这种方式,Flink SQL 成为了处理各种数据源,包括 SFTP 文件,在内的强大工具。
215 15
|
17天前
|
消息中间件 JSON 数据库
探索Flink动态CEP:杭州银行的实战案例
本文由杭州银行大数据工程师唐占峰、欧阳武林撰写,介绍Flink动态CEP的定义、应用场景、技术实现及使用方式。Flink动态CEP是基于Flink的复杂事件处理库,支持在不重启服务的情况下动态更新规则,适应快速变化的业务需求。文章详细阐述了其在反洗钱、反欺诈和实时营销等金融领域的应用,并展示了某金融机构的实际应用案例。通过动态CEP,用户可以实时调整规则,提高系统的灵活性和响应速度,降低维护成本。文中还提供了具体的代码示例和技术细节,帮助读者理解和使用Flink动态CEP。
352 2
探索Flink动态CEP:杭州银行的实战案例
|
20天前
|
监控 关系型数据库 MySQL
Flink CDC MySQL同步MySQL错误记录
在使用Flink CDC同步MySQL数据时,常见的错误包括连接错误、权限错误、表结构变化、数据类型不匹配、主键冲突和
67 16
|
1月前
|
流计算 开发者
【开发者评测】实时计算Flink场景实践和核心功能体验测评获奖名单公布!
【开发者评测】实时计算Flink场景实践和核心功能体验测评获奖名单公布!
|
2月前
|
运维 数据挖掘 网络安全
场景实践 | 基于Flink+Hologres搭建GitHub实时数据分析
基于Flink和Hologres构建的实时数仓方案在数据开发运维体验、成本与收益等方面均表现出色。同时,该产品还具有与其他产品联动组合的可能性,能够为企业提供更全面、更智能的数据处理和分析解决方案。
|
2月前
|
消息中间件 资源调度 关系型数据库
如何在Flink on YARN环境中配置Debezium CDC 3.0,以实现实时捕获数据库变更事件并将其传输到Flink进行处理
本文介绍了如何在Flink on YARN环境中配置Debezium CDC 3.0,以实现实时捕获数据库变更事件并将其传输到Flink进行处理。主要内容包括安装Debezium、配置Kafka Connect、创建Flink任务以及启动任务的具体步骤,为构建实时数据管道提供了详细指导。
142 9
|
3月前
|
消息中间件 监控 数据可视化
实时计算Flink场景实践和核心功能体验
本文详细评测了阿里云实时计算Flink版,从产品引导、文档帮助、功能满足度等方面进行了全面分析。产品界面设计友好,文档丰富实用,数据开发和运维体验优秀,具备出色的实时性和动态扩展性。同时,提出了针对业务场景的改进建议,包括功能定制化增强、高级分析功能拓展及可视化功能提升。文章还探讨了产品与阿里云内部产品及第三方工具的联动潜力,展示了其在多云架构和跨平台应用中的广阔前景。
123 9
|
3月前
|
运维 数据可视化 数据处理
实时计算Flink场景实践和核心功能体验 评测
实时计算Flink场景实践和核心功能体验 评测
84 5

相关产品

  • 实时计算 Flink版