SmartNews:基于 Flink 加速 Hive 日表生产的实践

本文涉及的产品
实时计算 Flink 版,1000CU*H 3个月
简介: 将 Flink 无缝地集成到以 Airflow 和 Hive 为主的批处理系统的技术挑战和应对方案。

本文介绍了 SmartNews 利用 Flink 加速 Hive 日表的生产,将 Flink 无缝地集成到以 Airflow 和 Hive 为主的批处理系统的实践。详细介绍过程中遇到的技术挑战和应对方案,以供社区分享。主要内容为:

  1. 项目背景
  2. 问题的定义
  3. 项目的目标
  4. 技术选型
  5. 技术挑战
  6. 整体方案及挑战应对
  7. 项目成果和展望
  8. 后记

GitHub 地址
https://github.com/apache/flink
欢迎大家给 Flink 点赞送 star~

一、项目背景

SmartNews 是一家机器学习驱动的互联网公司。自 2012 年于日本东京成立,并在美国和中国设有办公室。经过 8 年多的发展,SmartNews 已经成长为日本排名第一,美国成长最快的新闻类应用,覆盖全球超过 150 多个国家市场。据 2019 年初统计,SmartNews 的 iOS 和 Android 版本全球累计下载量已经超过 5000 万次。

SmartNews 在过去 9 年的时间,基于 Airflow, Hive, EMR 等技术栈构建了大量的数据集。随着数据量的增长,这些离线表的处理时间在逐渐拉长。另外,随着业务方迭代节奏的加快,对表的实时性也提出了更高的要求。因此,SmartNews 内部发起了 Speedy Batch 的项目,以加快现有离线表生产效率。

本次分享便是 Speedy Batch 项目中的一个例子,加速用户行为 (actions) 表的实践。

APP 端上报的用户行为日志,每日通过 Hive 作业生成日表,这个表是许多其他表的源头,至关重要。这个作业需要运行 3 个小时,进而拉高了许多下游表的延迟 (Latency),明显影响数据科学家、产品经理等用户的使用体验。因此我们需要对这些作业进行提速,让各个表能更早可用。

公司业务基本上都在公有云上,服务器的原始日志以文件形式上传至云存储,按日分区;目前的作业用 Airflow 调度到 EMR 上运行,生成 Hive 日表,数据存储在云存储。

二、问题的定义

1. 输入

新闻服务器每隔 30 秒上传一个原始日志文件,文件上传至相应日期和小时的云存储目录。

2. 输出

原始日志经过 ETL 处理之后,按日 (dt) 和行为 (action) 两级分区输出。action 种类约 300 个,不固定,常有增减。

3. 用户

对这个表的使用是广泛的,多途径的。有从 Hive 里查询,也有从 Presto,Jupyter 和 Spark 里查询,我们甚至不能确定以上就是全部的访问途径。

三、项目的目标

  1. 将 actions 表的时延从 3 小时缩短至 30 分钟;
  2. 对下游用户保持透明。透明又分两个方面:

    • 功能方面:用户无需修改任何代码,做到完全无感
    • 性能方面:新项目产生的表,不应该导致下游读取时的性能下降

四、技术选型

在本项目之前,同事已经对该作业做了多轮次改进,效果不是很显著。

尝试过的方案包括增加资源,投入更多的机器,但遇到了云存储的 IOPS 限制:每个 prefix 最多支持 3000 个并发读写,这个问题在输出阶段尤为明显,即多个 reducer 同时向同一个 action 子目录输出的时候,容易碰到这个限制。另外还尝试了按小时预处理,然后到每日凌晨再合并成日表,但合并过程亦耗时较多,整体时延还是在 2.5 小时左右,效果不够显著。

鉴于服务器端的日志是近实时上传至云存储,团队提出了流式处理的思路,摒弃了批作业等待一天、处理 3 小时的模式,而是把计算分散在一整天,进而降低当天结束后的处理用时。团队对 Flink 有比较好的背景,加上 Flink 近期对 Hive 的改进较多,因此决定采用基于 Flink 的方案。

五、技术挑战

挑战是多方面的。

1. 输出 RC 文件格式

当前 Hive 表的文件格式为 RCFile,为了保证对用户的透明,我们只能在现有的 Hive 表上做 in-place 的 upgrade,也就是我们得重用当前表,那么 Flink 输出的文件格式也得符合 RCFile 格式,因为一张 Hive 表只能有一个格式。

RCFile 属于 bulk format (相对应的是 row format),在每次 checkpoint 时必须一次性输出。如果我们选择 5 分钟一次 checkpoint,那么每个 action 每 5 分钟必须输出一个文件,这会大量增加结果文件数,进而影响下游的读取性能。特别是对于低频 action,文件数会上百倍的增加。我们了解了 Flink 的文件合并功能,但那是在一个 checkpoint 内多个 sink 数据的合并,这并不能解决我们的问题,我们需要的是跨 checkpoint 的文件合并。

团队考虑过以 row format (e.g. CSV) 输出,然后实现自定义的 Hive SerDe,使之兼容 RCFile 和 CSV。但很快我们放弃了这个设想,因为那样的话,需要为每个查询场景实现这个 Hybrid 的 SerDe,例如需要为 Presto 实现,为 Spark 实现,等等。

  • 一方面我们没法投入这么多资源;
  • 另一方面那种方案也是用户有感的,毕竟用户还是需要安装这个自定义的 SerDe。

    我们之前提出了生成一个新格式的表,但也因为对用户不够透明而被否决。

2. Partition 的可感知性和完整性

如何让下游作业能感知到当天这个 partition 已经 ready?actions 表分两级 partition, dt 和 action。action 属于 Hive 的 dynamic partition,数量多且不固定。当前 Airflow 下游作业是等待 insert_actions 这个 Hive 任务完成后,再开始执行的。这个没问题,因为 insert_actions 结束时,所有 action 的 partition 都已经 ready 了。但对于 Flink 作业来说,没有结束的信号,它只能往 Hive 里面提交一个个的 partition,如 dt=2021-05-29/action=refresh。因为 action 数量多,提交 partition 的过程可能持续数分钟,因此我们也不能让 Airflow 作业去感知 dt 级别的 partition,那样很可能在只有部分 action 的情况下触发下游。

3. 流式读取云存储文件

项目的输入是不断上传的云存储文件,并非来自 MQ (message queue)。Flink 支持 FileStreamingSource,可以流式的读入文件,但那是基于定时 list 目录以发现新的文件。但这个方案不适合我们的场景,因为我们的目录太大,云存储 list 操作根本无法完成。

4. Exactly Once 保证

鉴于 actions 表的重要性,用户无法接受任何的数据丢失或者重复,因此整个方案需要保证恰好一次的处理。

六、整体方案及挑战应对

1. 输出 RCFile 并且避免小文件

我们最终选择的方案是分两步走,第一个 Flink 作业以 json (row format) 格式输出,然后用另外一个 Flink 作业去做 Json 到 RC 格式的转化。以此解决 Flink 不能愉快的输出合适大小 RC 文件的问题。

输出 json 的中间结果,这样我们可以通过 Rolling Policy 控制输出文件的大小,可以跨多个 checkpoint 攒成足够大,或者时间足够长,然后再输出到云存储。这里 Flink 其实利用的是云存储的 Multi Part Upload (MPU) 的功能,即每次 checkpoint Flink 也是把当前 checkpoint 攒下来的数据上传至 云存储,但输出的不是文件,而是一个 part。最后当多个 part 达到大小或者时间要求,就可以调用云存储的接口将多个 part 合并成一个文件,这个合并操作在云存储端完成,应用端无需再次读取这个 part 到本地合并然后再上传。而 Bulk format 均需要一次性全局处理,因此无法分段上传然后合并,必须一次性全部上传。

当第二个作业感知到一个新的 json 文件上传后,加载它,转化成 RCFile,然后上传到最终的路径。这个过程带来的延迟较小,一个文件可以控制在 10s 以内,这是可以接受的。

2. 优雅的感知输入文件

输入端,没有采用 Flink 的 FileStreamingSource,而是采用云存储的 event notification 来感知新文件的产生,接受到这个通知后再主动去加载文件。

3. Partition 的可感知性和完整性

输出端,我们输出 dt 级别的 success file,来让下游可靠地感知日表的 ready。我们实现自定义的 StreamingFileWriter,使之输出 partitionCreated 和 partitionInactive 的信号,并且通过实现自定义的 PartitionCommitter,来基于上述信号判断日表的结束。

其机制如下,每个云存储 writer 开始写某个 action,会发出一个 partitionCreated 信号,当它结束时又发出 partitionInactive 信号。PartitionCommitter 判断某一天之内是否所有的 partittion 都 inactive 了,如果是,则一天的数据都处理了,输出 dt 级别的 success file,在 Airflow 通过感知这个文件来判断 Flink 是否完成了日表的处理。

4. Exactly Once

云存储的 event notification 提供 At Least once 保证。Flink 作业内对文件级别进行去重,作业采用 Exactly Once 的 checkpoint 设定,云存储文件输出基于 MPU 机制等价于支持 truncate,因此云存储输出等价于幂等,因此等价于端到端的 Exactly Once。

七、项目成果和展望

项目已经上线,时延维持在 34 分钟上下,其中包括 15 分钟的等待迟到文件。

  • 第一个 Flink 作业需要 8 分钟左右完成 checkpoint 和输出,json 转 rc 作业需要 12 分钟完成全部处理。我们可以把这个时间继续压缩,但是综合时效性和成本,我们选择当前的状态。
  • json 转 rc 作业耗时比当初的预想的要大,因为上游作业最后一个 checkpoint 输出太多的文件,导致整体耗时长,这个可以通过增加作业的并发度线性的下降。
  • 输出的文件数比批作业输出的文件数有所增加,增加 50% 左右。这是流式处理于批处理的劣势,流式处理需要在时间到达时就输出一个文件,而此时文件大小未必达到预期。好在这个程度的文件数增加不明显影响下游的性能。
  • 做到了下游的完全透明,整个上线前后,没有收到任何用户异常反馈。

该项目让我们在生产环境验证了利用流式处理框架 Flink 来无缝介入批处理系统,实现用户无感的局部改进。将来我们将利用同样的技术,去加速更多其他的 Hive 表的生产,并且广泛提供更细粒度 Hive 表示的生产,例如小时级。另一方面,我们将探索利用 data lake 来管理批流一体的数据,实现技术栈的逐步收敛。

八、后记

由于采用完全不同的计算框架,且需要与批处理系统完全保持一致,团队踩过不少的坑,限于篇幅,无法一一列举。因此我们挑选几个有代表的问题留给读者思考:

  • 为了验证新作业产出的结果与原来 Hive 产出一致,我们需要对比两者的输出。那么,如何才能高效的比较两个 Hive 表的一致性呢?特别是每天有百亿级数据,每条有数百个字段,当然也包含复杂类型 (array, map, array等)。
  • 两个 Flink 作业的 checkpoint 模式都必须是 Exactly Once 吗?哪个可以不是,哪个必须是?
  • StreamFileWriter 只有在 checkpoint 时才接受到 partitionCreated 和 partitionInactive 信号,那么我们可以在它的 snapshotState() 函数里面输出给下游 (下游会保存到 state) 吗?
  • 最后一问:你们有更好的方案可供我们参考吗?

更多 Flink 相关技术问题,可扫码加入社区钉钉交流群
第一时间获取最新技术文章和社区动态,请关注公众号~

image.png

活动推荐

阿里云基于 Apache Flink 构建的企业级产品-实时计算Flink版现开启活动:
99 元试用 实时计算Flink版(包年包月、10CU)即有机会获得 Flink 独家定制T恤;另包 3 个月及以上还有 85 折优惠!
了解活动详情:https://www.aliyun.com/product/bigdata/sc

image.png

相关实践学习
基于Hologres+Flink搭建GitHub实时数据大屏
通过使用Flink、Hologres构建实时数仓,并通过Hologres对接BI分析工具(以DataV为例),实现海量数据实时分析.
实时计算 Flink 实战课程
如何使用实时计算 Flink 搞定数据处理难题?实时计算 Flink 极客训练营产品、技术专家齐上阵,从开源 Flink功能介绍到实时计算 Flink 优势详解,现场实操,5天即可上手! 欢迎开通实时计算 Flink 版: https://cn.aliyun.com/product/bigdata/sc Flink Forward Asia 介绍: Flink Forward 是由 Apache 官方授权,Apache Flink Community China 支持的会议,通过参会不仅可以了解到 Flink 社区的最新动态和发展计划,还可以了解到国内外一线大厂围绕 Flink 生态的生产实践经验,是 Flink 开发者和使用者不可错过的盛会。 去年经过品牌升级后的 Flink Forward Asia 吸引了超过2000人线下参与,一举成为国内最大的 Apache 顶级项目会议。结合2020年的特殊情况,Flink Forward Asia 2020 将在12月26日以线上峰会的形式与大家见面。
相关文章
|
7月前
|
存储 监控 数据挖掘
京东物流基于Flink & StarRocks的湖仓建设实践
本文整理自京东物流高级数据开发工程师梁宝彬在Flink Forward Asia 2024的分享,聚焦实时湖仓的探索与建设、应用实践、问题思考及未来展望。内容涵盖京东物流通过Flink和Paimon等技术构建实时湖仓体系的过程,解决复杂业务场景下的数据分析挑战,如多维OLAP分析、大屏监控等。同时,文章详细介绍了基于StarRocks的湖仓一体方案,优化存储成本并提升查询效率,以及存算分离的应用实践。最后,对未来数据服务的发展方向进行了展望,计划推广长周期数据存储服务和原生数据湖建设,进一步提升数据分析能力。
694 1
京东物流基于Flink & StarRocks的湖仓建设实践
|
5月前
|
资源调度 Kubernetes 流计算
Flink在B站的大规模云原生实践
本文基于哔哩哔哩资深开发工程师丁国涛在Flink Forward Asia 2024云原生专场的分享,围绕Flink On K8S的实践展开。内容涵盖五个部分:背景介绍、功能及稳定性优化、性能优化、运维优化和未来展望。文章详细分析了从YARN迁移到K8S的优势与挑战,包括资源池统一、环境一致性改进及隔离性提升,并针对镜像优化、Pod异常处理、启动速度优化等问题提出解决方案。此外,还探讨了多机房容灾、负载均衡及潮汐混部等未来发展方向,为Flink云原生化提供了全面的技术参考。
321 9
Flink在B站的大规模云原生实践
|
4月前
|
存储 SQL Apache
网易云信 x Doris:降本70%、提速11倍, 统一 ES/InfluxDB/Hive 多技术栈的落地实践
网易云信引入 Apache Doris 统一了原有 Elasticsearch、InfluxDB 和 Hive 多技术栈系统。凭借其高性能和易扩展的特点,提供一站式的数据存储和分析服务。实现机器成本降低 70%、实时场景查询提速 11 倍、离线任务耗时缩短 80% 的显著收益。
368 0
|
6月前
|
SQL 存储 NoSQL
Flink x Paimon 在抖音集团生活服务的落地实践
本文整理自抖音集团数据工程师陆魏与流式计算工程冯向宇在Flink Forward Asia 2024的分享,聚焦抖音生活服务业务中的实时数仓技术演变及Paimon湖仓实践。文章分为三部分:背景及现状、Paimon湖仓实践与技术优化。通过引入Paimon,解决了传统实时数仓开发效率低、资源浪费、稳定性差等问题,显著提升了开发运维效率、节省资源并增强了任务稳定性。同时,文中详细探讨了Paimon在维表实践、宽表建设、标签变更检测等场景的应用,并介绍了其核心技术优化与未来规划。
602 10
Flink x Paimon 在抖音集团生活服务的落地实践
|
6月前
|
资源调度 Kubernetes 调度
网易游戏 Flink 云原生实践
本文分享了网易游戏在Flink实时计算领域的资源管理与架构演进经验,从Yarn到K8s云原生,再到混合云的实践历程。文章详细解析了各阶段的技术挑战与解决方案,包括资源隔离、弹性伸缩、自动扩缩容及服务混部等关键能力的实现。通过混合云架构,网易游戏显著提升了资源利用率,降低了30%机器成本,小作业计算成本下降40%,并为未来性能优化、流批一体及智能运维奠定了基础。
362 9
网易游戏 Flink 云原生实践
|
8月前
|
存储 运维 监控
阿里妈妈基于 Flink+Paimon 的 Lakehouse 应用实践
本文总结了阿里妈妈数据技术专家陈亮在Flink Forward Asia 2024大会上的分享,围绕广告业务背景、架构设计及湖仓方案演进展开。内容涵盖广告生态运作、实时数仓挑战与优化,以及基于Paimon的湖仓方案优势。通过分层设计与技术优化,实现业务交付周期缩短30%以上,资源开销降低40%,并大幅提升系统稳定性和运营效率。文章还介绍了阿里云实时计算Flink版的免费试用活动,助力企业探索实时计算与湖仓一体化解决方案。
928 3
阿里妈妈基于 Flink+Paimon 的 Lakehouse 应用实践
|
8月前
|
存储 SQL Java
Flink CDC + Hologres高性能数据同步优化实践
本文整理自阿里云高级技术专家胡一博老师在Flink Forward Asia 2024数据集成(二)专场的分享,主要内容包括:1. Hologres介绍:实时数据仓库,支持毫秒级写入和高QPS查询;2. 写入优化:通过改进缓冲队列、连接池和COPY模式提高吞吐量和降低延迟;3. 消费优化:优化离线场景和分区表的消费逻辑,提升性能和资源利用率;4. 未来展望:进一步简化用户操作,支持更多DDL操作及全增量消费。Hologres 3.0全新升级为一体化实时湖仓平台,提供多项新功能并降低使用成本。
638 1
Flink CDC + Hologres高性能数据同步优化实践
|
8月前
|
SQL 存储 调度
基于 Flink 进行增量批计算的探索与实践
基于 Flink 进行增量批计算的探索与实践
204 1
基于 Flink 进行增量批计算的探索与实践
|
8月前
|
存储 运维 BI
万字长文带你深入广告场景Paimon+Flink全链路探索与实践
本文将结合实时、离线数据研发痛点和当下Paimon的特性,以实例呈现低门槛、低成本、分钟级延迟的流批一体化方案,点击文章阅读详细内容~
|
8月前
|
SQL 弹性计算 DataWorks
Flink CDC 在阿里云 DataWorks 数据集成入湖场景的应用实践
Flink CDC 在阿里云 DataWorks 数据集成入湖场景的应用实践
383 6

相关产品

  • 实时计算 Flink版