一、背景介绍
飞书深诺集团致力于在出海数字营销领域提供全链路服务产品,满足不同企业的全球化营销需求。在广告效果监控和游戏运营业务场景中,为了及时响应广告投放成效与消耗方面的问题和快速监测运营动作效果,实时或准实时数据处理提供了至关重要的技术支撑。
通过对各个业务线实时需求的调研了解到,当前实时数据处理场景是各个业务线基于Java服务独自处理的。各个业务线实时能力不能复用且存在计算资源的扩展性问题,而且实时处理的时效已不能满足业务需求。鉴于当前大数据团队数据架构主要解决离线场景,无法承接更多实时业务,因此我们需要重新设计整合,从架构合理性,复用性以及开发运维成本出发,建设一套通用的大数据实时数仓链路。本次实时数仓建设将以游戏运营业务为典型场景进行方案设计,综合业务时效性、资源成本和数仓开发运维成本等考虑,我们最终决定基于Flink + Hudi + Hologres 来构建阿里云云原生实时湖仓,并在此文中探讨实时数据架构的具体落地实践。
二、飞书深诺大数据架构现状分析
当前数据架构图如下:
当前数据处理流图:
当前飞书深诺数据架构主要聚焦于处理离线数据场景,这个在数据处理流图可以看到,我们使用flink将kafka的数据写入MaxCompute来生成ODS层的数据,然后再使用ODPS-SQL任务进行ETL后生成DW层数据,然后将ADS层数据写入Holo,来对外提供数据服务,任务调度频次是按天或者按小时。基于统一公司大数据架构的背景,为了满足部分业务方对于实时性的要求,我们决定对当前架构做一些升级,将实时数据处理纳入架构考量。我们将重点从以下两个方面以作出决策:
1. 选择具有良好扩展性的存储和计算解决方案,以便能够适应未来日志协议的演进;
2. 实现实时数据处理的链路成本与时效性之间的合理平衡。
三、实时存储和计算选型
实时链路的时效性按照链路端到端的输出时间,可以分成下图中4个层级:
我们的实时业务主要有两块,一块是广告效果监控业务,对于数据处理时效的要求是秒级和毫秒级,这一部分业务我们选择如Hologres、Doris等实时数仓来处理;另外一块业务游戏运营对于数据处理时效的要求是分钟级即可,这部分数据我们打算放在低成本的湖存储中,然后通过实时数仓引擎来加速,实现准实时的效果。
3.1 实时数仓产品选型
在实时数仓产品选型上,我们主要对比了阿里云自研产品Hologres和开源Doris两款产品。
在产品能力上,两款产品均支持高吞吐的OLAP复杂分析和高并发点查以及湖仓加速的能力,但是在在扩展性方面,Doris是存算一体架构,存储计算无法单独扩展,而Hologres作为一款存储计算分离的云原生产品,支持存储、计算分别按需扩展以及Serverless模式的湖仓加速能力,在扩展性方面有更多的优势。
数据处理的时效性上,在Hologres和Doris的集群硬件对齐的情况下,以TPCH 100GB测试集测试效果来看,Hologres相比于Doris有3倍以上的性能提升。我们又拿广告效果监控数据实测下来,Hologres的性能优势还是很明显的,大概有两倍以上的性能差距。因不同OLAP业务查询处理逻辑不同,此处是基于TPCH标准测试集的测试结果,供参考:
Query |
Hologres(ms) |
doris(ms) |
Q1 |
620 |
8455 |
Q2 |
279 |
662 |
Q3 |
654 |
2233 |
Q4 |
420 |
1672 |
Q5 |
1444 |
3872 |
Q6 |
84 |
553 |
Q7 |
3433 |
1518 |
Q8 |
999 |
2873 |
Q9 |
2340 |
15252 |
Q10 |
919 |
3282 |
Q11 |
495 |
580 |
Q12 |
379 |
713 |
Q13 |
1164 |
7798 |
Q14 |
191 |
764 |
Q15 |
341 |
1007 |
Q16 |
754 |
1429 |
Q17 |
399 |
2155 |
Q18 |
2343 |
5738 |
Q19 |
334 |
765 |
Q20 |
1207 |
4164 |
Q21 |
3875 |
6148 |
Q22 |
622 |
1457 |
Total |
23296 |
73090 |
在实际业务开发中,我们发现Doris有一些限制导致无法完全满足业务方诉求,比如:
- Doris不支持读写分离,在大量写入以及大量ddl/dml时,写入和查询速度缓慢甚至报错,无法满足使用场景;
- Doris底层使用 MySQL协议有大量限制,如:主键最多支持36个字节,超过限制会自动截断,无法满足多主键的去重操作;STRING类型最大只能存放1M的数据,无法满足素材域大字段的存放(ad_body字段最大有8M左右等;
- 建表时要设置大量参数(分区、分桶、历史分区个数、副本数等),学习成本比较高,如果使用默认参数配置1亿数据量单查询count数要20s以上,hologres默认配置只需要50ms,这个性能差距还是很明显的;
综上,考虑到扩展性、易用性,时效性等维度,我们决定采用Hologres来支撑我们的实时数仓业务,而hologres作为一款商业化产品,更加稳定和成熟,且有庞大的运维团队去专项支持。
3.2 数据湖存储方案选型
对于要求秒级响应的广告监测业务,我们的数据直接存储在Hologres标准SSD存储中。对于游戏运营类的准实时业务(每秒写入RPS不超过2w和业务看板数据数据新鲜度10分钟),我们考察了Hudi、Delta、Iceburg三款比较火的数据湖存储格式,基本都可以满足要求。具体的产品对比网上有很多,此处不再赘述,基于业务场景实测,我们发现对于batch更新场景,Hudi性能更优,支持Bloomfilter过滤,对于streaming读写场景(更新流),只有Hudi可以做到生产可用。因此最终决定采用Hudi作为ODS层存储方案,Hologres作为DWD、DWS和ADS层的解决方案。
Hudi是当前比较主流的数据湖存储解决方案,提供了两种不同的表类型来适应各种数据处理需求:Copy-On-Write (COW) 和 Merge-On-Read (MOR)。COW类型适合写入次数较少,但需要快速、频繁读取的场景,MOR类型适合频繁更新写入次数较多,但读取次数少的场景。
Hudi具备核心优势:
1. 实时架构依赖:对于简单加工场景,下游可以通过外部表关联方式提供时效性;对于时效性较高的场景,可以依赖Hudi CDC流实现实时同步。
2. 离线实时共用ODS表:提前下游加工开始时间,保障SLA;降低数据质量问题的源头差异风险;减少开发运维成本。
3. 利于OLAP场景快速分析查询:Hudi支持Presto、Trino、Spark、Starrocks、Hologres等引擎直接查询
4. Schema Evolution:适配日志字段扩展;
5.局部更新:可以用于日志去重或回补场景
基于Hudi的ODS存储方案,数据加工链路演变对比如下:
和当前的大数据架构ODS层设计比较,减少了数据链路,并且实现了在Flink实时写入阶段就可以构建即时可用的ODS层,从而将时效从离线批次写入提升到实时写入。
3.3 实时计算方案选型
基于以上产品选型和我们的架构现状,结合当前阿里云云原生大数据实时方案,我们的实时计算方案有下面三种可选:
三种实时计算方案不同粒度对比:
序号 |
实时方案 |
说明 |
时效性 |
云资源成本 |
开发运维成本 |
计算复杂度支持 |
1 |
Flink=>Hudi=>Hologres |
Flink流入Hudi,使用外表机制直接在Hologres中进行实时加工或建设视图 |
<=10min |
低(ODS层写入Hudi后Hologres直接可读) |
低 |
低 |
2 |
Flink=>Hudi-CDC=>Flink=>Hologres |
Flink流入Hudi,后续流程使用Flkink Streaming增量方式进行生产加工实时写入Hologres |
<=5min |
高(ODS写入Hudi后需要额外引入Flink实时计算) |
高 |
高 |
3 |
Flink=>Hudi=>MC=>Hologres |
Flink流入Hudi,使用外表机制在Maxcompute中进行定期调度加工写入Hologres |
>=10min |
高(ODS层写入Hudi后MC直接可读,但需要额外增加一层Hologres导数任务) |
低 |
高 |
从三个方案的时效性、资源、开发运维成本和计算复杂度的支持情况综合考虑,根据本次业务场景特点(每秒写入RPS不超过2w和业务看板数据新鲜度10分钟),序号1对应的实时方案(Flink=>Hudi=>Hologres)比较符合我们的要求。
这种方案直接通过外表机制在Hologres中进行实时加工或建设视图,实现数据的实时同步和处理,确保数据能够在短时间内被处理并提供给下游系统。方案采用数仓同学比较熟悉的云原生组件Hologres,使得数仓开发同学没有任何额外的开发和运维成本。
该架构不仅实现了当前项目所需的时效性,还保持了系统的灵活性和可维护性。既能满足当前离线架构的需求,因为Hudi可以被MaxCompute引擎直接读取,又能满足未来更高时效(<=5min)的诉求,因为Hudi可以开启CDC模式,此时可以将序号1的实时方案演变成序号2,达到快速满足业务更高时效的诉求的能力,这让我们的架构具备一定的扩展性。
因此,结合存储方案和计算方案的选型结论,我们的数据架构演化成如下(红线标注):
3.4 离线架构设计的原因与配套设施搭建的思考
实时架构的设计上,为了保障实时数仓稳定性,可靠性,健壮性,通常会加入离线链路加工流程,主要是为了做一些补偿设计,解决以下问题:
1. 技术栈的校准:在实时数据处理场景中,由于计算逻辑的差异,可能会产生与离线数据处理不一致的结果。通过离线处理流程可以校准这些差异,确保数据准确性。
2.数据处理能力:离线批处理流程具有强大的数据处理能力,可以应对数据加工规则的调整、业务维度的变化、业务数据的更正以及数据丢失等问题。在这些情况下,离线处理流程能够有效地进行数据回补和修正。
在部分情况下,如果离线和实时数据处理产生的指标偏差在可接受的范围内,可以主要依赖实时数据处理流程作为主干路,这样做可以节约计算资源。而离线数据处理流程则被用于处理特殊情况,如跨天数据去重和数据回补等场景。
四、全链路监控
一个好的架构落地,离不开配套设施的搭建,这样才能在真正生产运行的时候做到全局可控,因此,我们还增加了全链路监控,以便在线上数据出现问题时能够迅速进行故障排查和定位。以下红圈是对链路不同环节增加的节点监控指标,如有异常,会以邮件、短信等形式发送预警信息。监控上主要是以下三点考虑:
1. 数据完整性监控: 设立全链路监控机制,确保数据的完整性,并能够快速定位问题所在。
2. 数据时效性监控: 端到端时效性监控,包括消息处理的延迟和积压情况,监控数据流转的实时性。
3.容量规划监控: 根据消息日志RPS、计算资源和存储资源的使用情况,以便进行有效的容量规划和资源分配。
此外,我们还针对Hologres和Flink-Hudi关键节点上,分别做了服务隔离和过载保护的设置,保障实时数仓链路的稳定性。
过载保护 |
服务隔离 |
|
Hologres |
查询超时1分钟 |
主从实例,读写隔离 |
Flink-Hudi |
Checkpoint 1分钟 |
存储冷热拆分 |
最后我们针对未来的不同运维场景,也提前制定不同的预案和处理方案。
场景 |
说明 |
历史数据回刷 |
使用批处理方案进行回刷; |
数据乱序 |
必要场景下,使用Flink基于业务时间进行重排; |
时间错误 |
数据过滤,ODS和业务侧增加数据校正逻辑; |
存量数据同步 |
批处理方案进行回刷 |
数据暴增 |
Kafka削峰,Flink自动扩缩容 |
最终我们的运行架构图如下:
实时架构采用Hudi来统一离线和实时数据处理流程,目的是为了减少数据不一致性的风险,并提高操作ODS层的数据处理效率。同时简化了数据加工链路,并且还实现了计算资源的高效利用。结合Hudi本身的一些特性和丰富的稳定性监控策略以及运维手段的设计,能够灵活应对各种异常情况和扩展性需求,保障数据的完整性和可靠性。最终构建出飞书深诺基于Flink + Hologres + Hudi的实时数仓架构。
五、游戏运营业务场景实践
结合前几节的架构设计,本次业务场景架构图如下:
业务数据链路描述:
·ODS层数据准备:
数据源有两类,一类是MySQL业务数据,一类是SDK打点日志。MySQL业务数据是通过Flink CDC实时同步到Hudi做ODS层直接使用。SDK打点日志是先写入Kafka,后续接入Flink将数据实时去重同步到Hudi做ODS层直接使用。
·DW层数据聚合
将ODS层按照游戏业务看板需求进行不同粒度聚合,数据范围是当天零点到当前时间前对齐10分钟的时间点之间所有接收的原始数据。由于Flink Checkpoint时间有一定开销,任务实际启动配置时间需要基于Checkpoint时间有一定偏移。
离线加工每天加工一次T-1之前的增量数据,增量更新完毕并保留历史全量数据,用于数据加工规则的调整、业务维度的变化、业务数据的更正以及数据丢失等问题的快速修正。
·ADS层数据服务
DW层数据聚合后会将数据推到实时数仓引擎Hologres上并对接公司数据统一服务平台OneService,最终将数据通过接口的形式提供给业务部门用于看板展示。
六、效果评估
本次实时数据架构落地稳定支持了游戏运营平台的重构。基于上线后长期的观察,当前数据量情况下,整个链路数据端到端的时效稳定在3分半,满足业务诉求。上线后我们还基于新架构当前计算和存储资源进行压测以评估现有系统容量。
针对计算资源,Flink以基准资源为单位进行压测,评估系统容量:
Job Manager CPU |
Job Manager Memory |
Task Manager CPU |
Task Manager Memory |
Checkpoint提交 |
0.25 |
1G |
1 |
4GiB |
60秒以内 |
容量评估结果:
基准资源下,场景X1最低需要一倍基准资源,保证Flink 60sCheckpoint提交,最大日志RPS为6000;场景X2则需要保障2倍基准资源,保证60sCheckpoint提交前提下最大日志RPS为3000;
对基准资源进行小倍数扩容,基本能够保障容量线性增长。
针对存储资源,Hologres以如下基准资源为单位进行压测,评估系统处理数据量:
Hologres资源 |
||
计算资源 |
计算节点数量 |
执行时长 |
32 Core 128 GB |
2 |
9分钟 |
容量评估结果:
实时数仓加工时长 = 端到端数据加工时长(10分钟)-Flink消费时长(1分钟(预估)) =9分钟
根据测试结果,可以计算2000W处理的时间在30s左右。目前加工逻辑复杂度可以水平扩展,合理假设数据吞吐随资源线性递增,推断出 9分钟/30s * 2000w / 86400s =4166RPS,所以当日志RPS在4166以下,以基准资源能够保障整体端到端数据加工在10分钟内完成。结合基准资源的容量测试结果和可水平扩展的假设,比较容易预估资源配置需要如何调整来实现系统容量的设计目标。
七、未来展望
此次基于Flink + Hudi + Hologres构建实时数仓,提供了数据端到端实时数据处理能力。这套架构是在阿里云云原生数仓架构的基础上进行的优化改造,对数仓开发来说无需增加额外的学习成本,开发运维等成本比较低,能快速支持公司的实时业务。
在设计之初,我们也预想到一些未来改进的点:
1,业务数据时效上如果有更高的要求,那么我们将利用Hudi的CDC能力,将当前选择的方案进行轻量改造,演变成3.2节中实时计算方案选型中序号2的方案上。
2,目前完全基于阿里云云原生架构的设计,未来考虑容灾可结合多云架构进行设计。