2016年,阿里集团首先使用 Flink +Hologres 构建了内部应用,比如大屏。 2019 年收购了 Flink 初创公司。 2020 年开始, Flink 逐渐发展成为实时计算的事实标准。2022 年,阿里云Flink成为中国唯一进入Forester象限的实时流计算产品,也是唯一全面通过中国信通院基础能力、性能、稳定性三款测评的分布式流处理平台产品。
源于社区、兼容社区、超越社区是阿里云实时计算Flink的定位。
企业版Flink基于社区版做了很多能力的增强,更适合企业使用。
首先,它是一款全托管产品,serverless架构,无需关心资源、版本、运维等。性能上,企业版比开源版本优化提升2-3倍。它提供了企业级的丰富能力,比如上下游数据连接 connector 、作业自动调优、风控相关的能力、智能诊断以及全面告警监控等,也开放了被集成能力,可以与企业内部已有平台做完美整合。
上图为客户案例。客户有基于YARN自建的集群,规模6400核,500多个作业,最大作业 200 核,集群资源水位70%以上,已无法进行扩容。最大痛点在于集群作业互相影响,高峰期间有抢占现象,造成流计算不稳定,运维相关人力投入较多,资源也难以管控与配置。
客户要求成本不能高于自建,且迁移的同时还需完成调优,解决重点作业资源过多的问题。此外,不能改变现有架构,需要与现有企业内的其他系统融合。
经过我们的优化后,该客户的规模由6400核降至3900 核,资源优化率达40%。Flink的企业级能力也得到了充分认可,从测试到迁移到调优,仅花费2个月。
上图为一站式开发流程。
开源Flink没有开发平台,只能使用命令行窗口。越来越多用户希望能够不再通过写 Java 代码进行开发,而是使用 SQL 进行开发,更加简单直观。希望能够在平台上很方便地实现数据源查看、数据结果展示、单步调试以及使用特殊 session 集群进行结果打印,在开发时能够享受更易用的界面。
Flink企业版支持模拟数据生成connector,可以用于做压测,模拟真实业务场景。sessiong集群可以更好地进行作业启停、结果查看。
Flink是一个纯计算引擎,没有中间结果,无法像数据库一样方便地查看,因此运维是一个大问题,对此,我们的平台提供了很多功能。首先,会给运行中的作业提供整体筛查条件,比如基于风险筛查,每分钟刷新状态。在每个作业级别均可设置重要行为告警,比如是否重启、是否 checkpoint 成功以及记录延迟情况并基于延迟告警。
Flink作业的metrics 指标非常丰富,我们将所有指标发送至阿里云 ARMS ,提供基于指标做告警的能力。针对所有数据,平台提供了指标大盘并进行分类,用户可以非常直观地查看。
资源调优也一直是很多用户的痛点。实时作业上线要用多少资源、是否存在浪费等,可以通过 metrics 指标查看 CPU 负载或空闲情况并进行调节。
自建集群在业务高峰低谷时,需要手动启停作业、手动进行调整,过程复杂。而实时计算Flink支持了自动调优与定时调优两种策略。自动调优指用户无需关心资源,作业上线运行一段时间之后可以根据流量、指标情况自动对作业资源进行调节。可以设置一定的规则,比如两次调节时间间隔、资源限制等。定时调优适用于相对高阶的用户,对自己的作业足够了解,比如早上九点将资源配置为10个并发,晚上9点自动调整为 5 个并发。
作业启动之后会遇到很多问题,比如启动时慢、停止时慢。
实时计算Flink提供了基于阿里集团内部最佳实践的智能诊断。诊断项不是随着产品版本发布,而是定期更新,更新频率非常快。它会将常见问题放到诊断里面,目前已经覆盖十几种风险点,诊断结果会告知针对该情况应如何处理。
Flink实时计算的运维覆盖了状态的全生命周期,对业务稳定性、连续性和灵活性起到了至关重要的作用。
实时计算Flink可与企业内部的大数据平台做集成。后续还会推出与开发工具集成的能力,开发工具可以直接将作业提交到到平台上运行。
上图为 9 月发布的重点新特性。
① 状作业态集管理。将状态的管理与作业的启停解绑,不会再因为作业启停对状态造成影响;作业快照完成时间平均提升 5-10 倍,恢复时间平均提升 5 倍。
② 作业资源定时调优。
③ 作业健康分。为每个作业的健康度打分,作业的异常、重启或配置问题都会影响健康分。
④ Open API 。产品均基于 OpenAPI 架构设计,因此在界面上看到的功能都可以集成到企业自己的平台上。
⑤ Flink 1.15 支持。这是业界首次企业版 Flink 1.15 发布。
⑥ 作业启动速度平均提升15%。
上图为作业状态集管理界面。其中包括系统检查点和作业快照,系统检查点即checkpoint,作业快照即savepoint 了。用户可自定义策略,定时策略可以不同的时间粒度设置自动快照,重启时可从快照进行作业恢复。
上图为作业资源定时调优页面。如图所示,可以根据实际需求将周一至周五的并发配置为10,周六周日并发配置为5。此外,时间粒度可以选择为月、周、天以及小时等。
上图为资源分析界面。Flink 产品默认所有作业启动运行时都会开启资源分析功能。作业只要在平台上运行,就会持续分析资源状态是否健康,并给出调整建议。与 CDC作业配合,比如从 Flink到Hologres有从全量到增量写的过程,全量数据比较大,可以将配置改为自动调优,以多并发跑;增量时,数据量比较小,自动将并发下调。
自动调优可以配置相关参数,可以设置规则,比如两次重启时间间隔、最大并发数、最小并发数等均可配置。如果购买了云上产品并选择后付费模式,作业资源自动调优能够极大节省成本。
上图为作业健康分页面。比如显示健康分为46 分,因为作业近期持续重启。如果健康分为100,也会提供相关诊断结果,显示进行检测的项目。
2022年3月和5月发布了很多其他重要功能。
其中Flink CDC是近一年以来推出的非常重要的企业级特性,主要包含以下几个方面:
① MySQL和 PG connector 同步了社区 CDC 2.2版本,修复了一些 bug ,新增了参数。
② 支持了Kafka catalog ,可以新建Kafka catalog ,解析Kafka里的特殊格式信息,支持解析JSON 格式消息获取Schema。JSON 格式发生变化时,写到结果表的结构也会根据 JSON 自动变化。
③ 独有的CDAS语法实现了整库同步,支持分库分表和同步的能力,比如将 MySQL 整库数据、分库分表数据同步到下游 Hologres等产品。
④ MySQL CDC connector 支持PolarDB-X和PolarDB Mysql数据源。
⑤ 入湖方面,内置了企业级Hudiconnector,上游可以打通 CDC 链路,全量增量读取 MySQL 数据,下游可以自动同步到Hudi。上游 MySQL 的表结构变更也可自动同步到Hudi。集成了阿里云OSS和DLF等组件,完善数据在计算引擎之间的连通性。
Flink CDC 是目前云上用户使用较多的场景,它可以替代已有的开源数据集成工具,它基于开源工具做了增强和修复,比如其全量、增量以及全增量一体的能力是其他开源数据集成工具所不具备的,Flink CDC也可以支持更多上游以及 MySQL 相关的数据库,支持分库分表、表结构同步,支持元数据的管理比如无锁读取、断点续传等。下游同样支持很多开源、闭源产品。
比如表结构变更场景,如果上游是 MySQL ,下游是Hologres,当上游表结构发生变更时,能够自动在 Hologres 里完成变更;比如分库分表合并,假设原先作业上游表是三个,分库分表后变为四个,过程中数据作业无需停止,数据可以一直写到下游。
上图为Flink CDC+Hologres实时数仓能力总览。
实时计算Flink能够实现海量数据实时入仓的价值在于:对于MySQl中的TPC-DS 1T数据集,使用阿里云Flink 64并发,只需5小时即可完全同步到Hologres,TPS约30万/秒。Binlog同步阶段,同步性能最快可达 10 万条/秒。
实时计算Flink能够实现双流Join构建DWD宽表。
双流Join使用 Flink 还有一个节省成本的方案,主键关联的场景可以将Join的工作下沉到Hologres,通过Hologres的局部更新功能实现宽表merge,从而省去了Flink Join的状态维护成本。
维表 Lookup 也是 Hologres 与Flink 的组合方案。Hologres 既可以做原表,也可以做结果表,还可以作为维表使用。
上图为Flink CEP系统架构,将开发人员与测试人员进行解耦。规则保存在Hologres或 RDS 里,可以实时查看规则并进行维护和修改。修改之后,Flink会自动完成规则更新,保证业务连续性。另外,该架构支持Flink SQL与Datastream API。
Q&A
Q:自动调优需要要设置资源的最大阈值吗?
A:默认上限为64核,可以调小。
Q:使用 Flink 需要提前购买CU,购买时是否要预留最大值?
A:如果使用自动调优,则应该购买按量付费模式,而不是预留模式。
Q:可以按项目设置不同付费模式吗?
A:可以购买多个实例,比如某个实例按量付费,某个实例包月模式。
刘一鸣:Flink X Hologres构建企业级一站式实时数仓
一、实时数仓的技术需求
大数据计算正从规模化走向实时化、在线化。大屏是传统大数据应用的常见场景,实时展示是它极其重要的功能。
数据分析分为线上和线下两部分。线下部分偏对内分析,要求分析灵活度足够高,因此一般采用数仓技术。在线业务是典型的 QPS 远高于队列分析的场景,QPS通常为10 万+级别,延迟也从秒级变为毫秒级。因此,在线业务必须采用更复杂的架构。一般使用 lambda 架构,一部分数据放在 OLAP 系统,一部分放在在线系系统比如HBase、Redis ,架构相对更复杂。且在线业务引进了更多预计算,提高了计算效率。
基于以上要求,大数据实时数仓体系变得极其纷繁芜杂。
离线加工主要解决海量数据问题,实时加工将计算前置,在事件发生时即产生结果,多采用预计算 Flink 技术,将结果集存放至HBase、Redis ,对外提供更快速的点查服务。中间还存在一些近实时场景,希望分析更灵活,而如果全部采用预计算会牺牲灵活性。因此我通常采用像zk、join等,以明细方式提供分析能力,规模相比离线数仓较差,但灵活性比预计算更好。
因此,同一份数据源会进入离线、在线以及近实时三条链路。用户希望访问数据时有一个统一的门户,无需做不同的授权或访问不同的接口。此外,联邦分析系统跨多数据源访问时,访问体验往往不可预期,取决于数据源的性能。
因此,我不得不将大数据加工变为小数据,导入 Redis 做缓存,但这依旧没有解决灵活性问题。因为将数据聚合之后,无法查看明细。而随着业务的复杂度增加,问题定位也变得愈加困难。每个系统都需要运维,数据冗余、不一致等情况也愈发频繁。
此前的架构下,处处都是数据孤岛,存在高成本、数据冗余、高维护代价、数据一致性无法保证等问题,多个系统带来了高昂的学习成本。分析数据的方式没有本质变化,但是我们不得不因为系统的 IO 效率问题,考虑将一份数据拆成不同方式、存在不同系统里,造成很多数据孤岛。
业界对于实时数仓的要求大概分为三个阶段。
第一阶段相对比较通用,接近于数据库的概念,能做查询、查询接口尽量标准(SQL),分析数据相对灵活。
第二阶段在实时上场景做了很多优化,比如实时写入、高吞吐实时更新、实时查询及写入即可用,希望数据产生到消费中间端的间隔周期足够短。
第三阶段为一站式实时数仓,能够实现端到端实时加工。过去的数仓更偏向于队列分析、经营分析报表,如今已经演变为在线系统。对接了在线业务系统之后,数据有加工业务,有队列经营分析业务,也有在线分析支撑在线系统业务,不同业务之间如何做隔离、如何减少互相干扰、如何做高可用变得格外重要,因此需要支持多负载隔离。同时,离线数仓的典型技术为基于批处理技术,不擅长处理海量数据加工任务。因此大量场景下,需要离线数仓和实时数仓互相补充,通过离线作业对数据进行修正、补充和丰富,然后通过在线业务将它导入实时系统,也称为实时离线一体化。
Transaction 系统往往追求 TPS 足够高,采用行存、支持事务,但数据量上亿之后分析效率急剧下降。因为其存储结构面向行更新,而数据分析场景面向列、可以做压缩和过滤。
OLAP系统专门用于做数据分析,采用列存、压缩、索引、并行化等一系列技术优化大范围数据扫描场景下的统计分析。
在线Serving系统主要为将大数据加工好的数据导出到 Redis、HBase等供在线业务系统访问。此类系统往往 QPS要求很高,对延迟敏感,不允许抖动,但是其查询相方式相对简单,不会支持很复杂的 SQL 表达式,希望用更简单的方式来查询以及更新。
每个系统都有自己的设计场景,导致我不得不在系统之间做很多数据传递。而HTAP系统既可以做 transaction 又可以做 OLAP ,但这类系统的并发能力受限于 transaction 。另外,OLTP 系统产生的数据往往不符合分析师的需求,分析师需要看语义层,是对表的封装和抽象。
那么,是否可以创新一个系统,既可以做 OLAP 又可以做 serving?此类系统对数据分析有更高要求,但对事务没有明显要求。如果放弃分布式事务,则可能将系统性能、可靠性都做得更好。
Hologres是经典的 OLAP 系统,基于 MPP 架构,包含了数据分片、分段、索引技术等,其查询效率与用户对系统的熟悉程度直接相关,从毫秒级到秒级不等。它与其他 OLAP 系统的差别在于其更新能力更好。因此 Hologres设计之初即希望能够提供标准的使用方式,使用户像使用单机数据库一样使用大数据。
Hologres之所以能实现很高的 QPS,主要得益于它的约束:查询场景要基于主键点查,存储要设计为行存。基于主键查询可以支持很高的 QPS ,比如 Flink 做数据加工时与维表做关联可以做到 10w+ QPS 。另外,Hologres既有行存又有列存,两种存储方式的结合能扩展出更多可能性。
传统 OLAP 系统为列存,基于主键查询使用行存。非主键点查的需求下,数据库系统的常见解决方案为做二级索引。Hologres也可以实现类似的方案,做行列共存,将列存作为二级索引,先进行过滤,再根据列存过滤之后的rowID 找到该行数据。
湖仓一体的趋势下,仓计算能力能否与湖上数据做关联?以Hologres为例,它和阿里的maxCompute、DLF、 OSS都可以做很好的关联,可以将外部数据定义成外表,再用 Hologres SQL语句访问外表。另外,它能够实现百万级每秒的数据同步,离线数仓的数据不导出也可加速分析,意味着很多数据更新场景将变得更灵活。
Hologres对外提供基于开源 postgres 的兼容协议,兼容各类主流BI工具。存算分离架构能够实现灵活的扩缩容,使成本更加可控。
上图左侧为此前的旧方案,需要七八个组件,因为不同系统的性能需要作出很多折中与牺牲。而右侧是新方案,实时加工部分基于 Flink ,离线加工部分基于 maxCompute,结果及对外提供服务部分收拢在 Hologres,整个数仓架构简化成加工层和服务层。
二、Flink+Hologres核心能力
实时数仓的第一个要求为高性能实时写入与更新。
上图左侧为标准的数据库执行 SQL 语句,需要做分发、汇聚、排序、扫描算子等,计算流程过多,因此性能不佳。右侧黑框为Hologres里的 fixplan ,它将一条 SQL 执行计划省略了物理执行计划、逻辑执行计划、优化等,从写入到端直接写存储层,将性能做到极致。
Fixplan的吞吐足够高, 128 core规格、10个并发、 20 列的列存表,如果表里没有主键,可以实现每秒钟 200 万的写入;如果表里带了主键,写入大概缩减 10%-20%;如果有主键又频繁更新,Hologres提供了insert or update 能力,如果记录里没有该条数据则插入,如果有则替换或更新。
上图为Hologres 128core实例下,TPCH 100G(亿级别)标准数据集下的测试结果,有多表查询、子查询、开窗操作等,大部分查询响应时间为秒和毫秒级别。这意味着有机会为用户以尽量明细的数据方式提供交互式分析,提升开发效率。
上图为点查性能。25000QPS/s的查询,延迟大多在5ms以内,可用于HBase和 Redis 场景下。
读写分离通常的实现方式为一个master 带多个 slave ,而Hologres与其他系统稍有不同。它的架构下,所有节点共享一份数据存储,根据不同业务场景分配不同的计算资源,不同实例可以根据业务对 QPS 和稳定性的要求随时调整。另外,物理资源互相隔离,任何实例下线都不会影响其他业务。Hologres 当前的设计思路为一主最多带四重。
Hologres的每一张表都可以像数据库一样吐出Binlog 给 Flink 加工程序,实现端到端的实时加工。可以随时通过 SQL 语句检查数据状态并更正,查错、排错、修正问题的成本大幅下降。
数仓数据要结构化,但是随着业务发展埋点数据越多,行为数据也越多。如果将每个数据打平拉宽,会到来极高的开发成本。Hologres将 JSON 数据作为一级数据类型支持。 JSON 是一种很灵活的数据结构,可以随时加字段、加子节点,但是它在分析效率上存在瓶颈,必须将整个 JSON 完整体解析出来后才可以读取。
Hologres能将JSON 数据列存化,将 JSON 中的每个子节点内部虚拟成某个列存储下来,用户依然将 JSON 看作body 使用,但实际存储上将每个子节点都作为某个列使用。列上可以做索引、压缩、过滤, 使IO 访问效率得到本质的提升。
具体示例如上。原始数据应用层展现的 JSON 树形结构,内部存储时它会变为不同表,但是用户无法看到。使用时,如果只过滤某个字段,则相当只解析一列数据。
此前,只能通过调度方式或加 Flink 程序的方式实现聚合操作。而Hologres原生提供了实时物化视图,相当于一张事件表。事件表实时写入时,基于事件表制定聚合规则和聚合视图,结果实时更新,直接查询物化表即可。即使查询明细表,也会根据查询特征自动重写到聚合表,将此前查明细数据的复杂度降低到查聚合数据的复杂度,是O(n) 到 O(1) 的过程。
三、Flink +Hologres企业级实时数仓实践
实时数仓的开发方法论可以分为三个阶段。
第一阶段:探索期。探索期是面向特定业务的烟囱式开发,从采集、加工到服务,端到端地加工报表,上线速度很快。缺点在于每次上业务时,都需要重新拉数据、写加工、存结果表。而业务之间有大量内容可以共享,并不需要一再重复建设。且数据逻辑或数据质量有变化时,所有脚本都需要调整,运维成本和资源消耗越来越高。
第二阶段:规范期。规范期的核心思路为减少数据重复建设,将很多可以被共享的指标沉淀到数仓。此阶段与探索期的区别在于,加工环节分层,不再采用端到端的加工方式;另一方面,在存储层引进数仓,通过数仓将可以共享的指标进行沉淀。
该架构的缺点在于,并发与延迟无法支撑在线业务系统,同时需要将一份数据存储至HBase系统,数据状态也没有收拢。同步不及会造成各种口径不一,会引发更大的麻烦。
第三阶段:优化期。该阶段的核心变化在于数据服务层的抽象,加工层通过 Flink 收拢,服务层通过Hologres收拢,让架构更加简单,减少口径不一致。
Hologres与 Flink 实现了很多联合创新。
Flink从读取数据 source 表到关联数据维度表,再到管理员数据 catalog 表到写数据sink 表,每个环节都要与周边各种存储交互。Hologres在每个环节都做了原生支持,支持全表读取、Binlog读取实现批流一体、维度表支持百万级以上 QPS 、对结果表支持宽表更新。
Flink +Hologres会将数据链路分为加工层和服务层。加工层分为两个部分,分别是明细加工和汇聚加工。明细加工偏数据质量整理,汇聚加工偏业务属性整理。数据服务层也为分明细数据服务和汇聚数据服务。有些业务系统希望数据更细致、更巨灵活性,可以将数据存为明细表,直接对外提供服务;有些业务场景下明细数据太多太大,比如埋点数据、点击流数据,因此必须聚合为某一个 session 的统计数据才能被分析使用。将数据变为聚合方式主要有三种方法:
第一,数据经过简单加工落盘对外提供服务方式,适用于数据量不大、对数据明细要求比较高的场景。
第二,简单分析之后落盘,在数仓里做加工。加工脚本灵活,数据可以随时修正。
第三,全部预加工,适用于端到端延迟很敏感的场景,比如风控、推荐等。该方式将尽量多的任务通过事件驱动方式高度汇聚变为小结果并提供服务。
实时数仓场景1:即席查询
假设上图左侧为消息中间件Kafka,右侧为数仓。数仓会被分层,比如会物化成表或视图。
假设写入的为明细数据,通过视图封装了很多业务逻辑,该场景下原始数据有任何变更,分析端看到的都为最新数据。但缺点为计算量会变得非常大,因为没有提前做聚合,因此每次分析是明细数据。更适用于高价值数据的分析。
实时数仓场景2:分钟级准实时
数仓里引入了调度,分层依旧存在,但是通过调度可以将之前通过视图封装的逻辑封装为表。数据访问时, IO 效率得到很大改善,并发提高很多倍。该方式与离线数仓的开发方式几乎一样,只是增加了调度方式提高了效率。
实时数仓场景3:增量数据实时统计
如果更多业务逻辑是通过实时方式——Flink+Kafka 驱动的方式做加工。该方式时效性非常好,但依然存在改进点。如果重消息状态都在Kafka,对数据状态的修正检查不友好。因此,该场景下,数据向下一层传递时要落盘。这意味着数据有问题时,可以通过数仓检查某一层数据状态是否正确,且可以在数仓里进行修正。而如果没有数仓原始状态,一旦数据出错,则必须从原始数据重新拉,修正成本非常高。
场景1之下,数据落盘直接使用,适合开发阶段。因为开发阶段无法明确使用什么口径、什么数据,因此通过视图方式对其进行定义数仓开发阶段的常见方式,也符合时效性要求。
场景2为微批方式,通常来说,人类做决策绝大部分情况下不需要以秒为间隔,以小时为间隔的决策频率已经足够。
场景3为完全的事件驱动方式,推荐风控等机器决策的场景使用。更多采用计算前置的方式,计算场景可累加。
实时数仓不是一种技术,也不是一种产品,而是不同技术和产品的组合方式,需要根据不同业务场景来选择不同技术。