作者:
王天宜 - StarRocks 解决方案架构师
周康 - 阿里云开源大数据OLAP团队
实时数仓建设背景
实时数仓需求
随着互联网行业的飞速发展,企业业务种类变得越来越多,数据量也变得越来越大。以 Apache Hadoop 生态为核心的数据看板业务一般只能实现离线的业务。在部分领域,数据实时处理的能力已经成为限制企业数据变现的重要瓶颈之一。搭建数据看板快节奏地进行数据分析,已经成为了一种必然的选择。
实时数仓发展
实时数仓有三个著名的分水岭:第一个分水岭是从无到有,Apache Storm 的出现打破了 MapReduce 的单一计算方式,让业务能够处理 T+0 的数据;第二个分水岭是从有到全,Lambda 与 Kappa 架构的出现,使离线数仓向实时数仓迈进了一步,而 Lambda 架构到 Kappa 架构的演进,实现了离线数仓模型和实时数仓模型的紧密结合;第三个分水岭是从繁到简,Flink 技术栈的落地使实时数仓架构变得精简,并且是现在公认的流批一体最佳解决方案。
以 Flink 作为实时计算引擎实现的实时数仓,将一部分复杂的计算转嫁给 OLAP 分析引擎上,使得应用层的分析需求更加灵活。但仍然无法改变数据仓库变更数据的排斥。下一代的实时数仓平台,不仅要提供更为优秀的性能,同时也需要更为完善的功能以匹配不同的业务。
作为一款全平台极速 MPP 架构,StarRocks 提供了多种性能优化手段与灵活的建模方式,在预聚合、宽表和星型/雪花等多种模型上,都可以获得极致的性能体验。通过 StarRocks 结合 Flink 构建开源实时数仓的方案,可以同时提供秒级数据同步和极速分析查询的能力。同时,通过 StarRocks 主键模型,也可以更好地支持实时和频繁更新等场景。
基于 Flink 的开源实时数仓痛点
原有基于 Flink 构建实施数仓的方案中,由于数据源的多样性,需要使用不同的采集工具,如 Flume、Canal、Logstash。对于不同的业务,我们通常会采用不同的分析引擎。比如,对于固定报表业务,根据已知的查询语句可以预先将事实表与维度表打平成宽表,充分利用 ClickHouse 强大的单表查询能力;对于高并发的查询请求,可以使用 Apache Druid 承受大量用户高峰时期集中使用带来的并发压力。通过技术栈堆叠的方式确实可以满足业务要求,但也会让分析层变得臃肿,增加开发与运维的成本。
一般来说,StarRocks X Flink 构建开源实时数仓生态架构分为五层:
- 第一层是数据源。数据源可以是多种多样的,比如说 MySQL Binlog、爬虫数据或者是平面文件;
- 第二层是数据采集层。用户使用多种不同的 CDC 工具,比如 Canal、Debezium 拉取上游的增量数据,通常会将数据写入到 Kafka 中,而后在通过 Flink 消费 Kafka 中的数据;
- 第三层是实时计算层。可以通过 Flink 的实时计算能力完成轻量级的 ETL 工作,如拼宽表或数据清洗等;
- 第四层是数据存储层。Flink 相比其他的实时技术栈更加依赖 OLAP 引擎;
- 最后一层是后端应用层。可以是实时监控系统,实时报表系统,实时推荐系统以及实时数据接口服务。
我们常说,天下武功,唯快不破。以 Flink 为计算引擎构建的实时数仓系统,最关心的就是数据摄入速度足够快,延迟足够低。 在这样一套架构中,数据从数据源到 OLAP 分析系统途径采集工具层,消息队列层,实时计算层。冗长的链路给开发和运维带来了极大的风险,任何一个模块的阻塞都会对实时性产生影响。同时,在数据存储层上,我们也会选择不同的存储引擎适配不同的业务。对于上面的数据链路,我们也面临着诸多的挑战,需要从时效性、功能性及可维护性上做更多的探索,由此可以总结归纳出多个方面尚待优化:
- CDC 组件不统一,链路过长,任何组件出现瓶颈都会对时效性产生影响,组件过多,需要多部门协作维护,学习成本与维护成本成倍增长;
- 部分同步组件,如 Debezium 在保证数据一致性时,需要对读取的表加锁,可能会影响业务更新;
- 分析层使用多种数据存储产品适应不同的业务类型,难以有一种产品能够适应大部分的业务;
- 去重操作对应逻辑复杂,需要在 flink 里面增加 MapStat 逻辑。
Flink CDC,打通端到端链路
Flink CDC 是由 Flink 社区开发的集数据采集、数据转换、数据装载一体的组件,可以直接从 MySQL、PostgreSQL、Oracle 等数据源直接读取全量或增量数据并写入下游的 OLAP 数据存储系统。使用 Flink CDC 后,可以简单高效的抓取上游的数据变更,同步到下游的 OLAP 数据仓库中。
构建一体化数据传输链路
在传统的实时数仓建设中,数据采集工具是不可或缺的。由于上游的数据源不一致,通常来说我们可能会在数据采集层接入不同的同步与采集工具,比如采集 Oracle 中的数据时,我们通常选择 GoldenGate,而对于 MySQL,我们可能会选择 Canal 或 Debezium。有些采集工具支持全量数据同步,有些支持增量数据同步。数据经过采集层后,会传输到消息队列中如 Kafka,然后通过 Flink 消费 Kafka 中的增量数据再写入下游的 OLAP 数据仓库或者数据湖中。
在业务开发中,上游的数据源、消息中间件、Flink 以及下游的分析性数据仓库通常在不同的部门进行维护。遇到业务变更或者故障调试时,可能需要多个部门协作处理,增加了业务开发与测试的难度。通过使用 Flink CDC 替换上图中的数据采集组件与消息队列,将虚线框中的采集组件与消息队列合并到计算层 Flink 中,从而简化分析链路,降低维护成本。同时更少的组件也意味着更少的故障与传输瓶颈,数据实效性会进一步的提高。
在使用 Flink CDC 之后,数据链路中的组件变得更少,架构变得清晰简单,维护变得更方便。如在上面的例子中,我们使用 Flink CDC 拉取 MySQL 中的增量数据,通过 Flink SQL 创建事实与维度的 MySQL CDC 表,并在 Flink 中进行打宽操作,将结果写入到下游的 StarRocks 中。通过一个 Flink CDC 作业就可以完成抓取,转换,装载的全过程。
全量 + 增量数据同步
在传统的数据同步框架中,我们通常会分为两个阶段:
- 全量数据同步阶段:通过全量同步工具,如 DataX 或 sqoop 等,进行快照级别的表同步。
- 增量数据同步阶段:通过增量同步工具,如 Canal 或 GoldenGate 等,实时拉取快照之后的增量数据进行同步。
在全量数据同步时,为了加快导入的速度,我们可以选择多线程的导入模式。在多线程模型下进行全量数据同步时,在对数据进行切分后,通过启动多个并发任务完成数据的同步。由于多个并发业务之间可能不属于同一个读事务,并且存在一定的时间间隔,所以不能严格的保证数据的一致性。为了保证数据的一致性,从工程学与技术实现的角度做平衡,我们有两种方案:
- 停止数据的写入操作,通过锁表等方式保证快照数据的静态性。但这将影响在线的业务。
- 采用单线程同步的方式,不再对数据进行切片。但导入性能无法保证。
通过 Flink CDC,可以统一全量 + 增量的数据同步工作。Flink CDC 1.x 版本中,采用 Debezium 作为底层的采集工具,在全量的数据读取过程中,为了保证数据的一致性,也需要对库或表进行加锁操作。为了解决这个问题,Flink 2.0 中引入了 Chunk 切分算法保证数据的无锁读取。Chunk 的切分算法类似分库分表原理,通过表的主键对数据进行分片操作。
在经过 Chunk 数据分片后,每个 Chunk 只负责自己主键范围内的数据,只要保证每个 Chunk 的读取一致性,这也是无锁算法的基本原理。
StarRocks,实时数据更新新方案
StarRocks 是一款极速全场景 MPP 企业级数据仓库产品,具备水平在线扩缩容能力,金融级高可用,兼容 MySQL 协议和 MySQL 生态,提供全面向量化引擎与多种数据源联邦查询等重要特性。作为一款 MPP 架构的分析性数据仓库,StarRocks 能够支撑 PB 级别的数据量,拥有灵活的建模方式,可以通过物化视图、位图索引、稀疏索引等优化手段构建极速统一的分析层数据存储系统。
StarRocks 在 1.19 版本推出了主键模型(Primary Key model)。相较更新模型,主键模型可以更好地支持实时和频繁更新等场景。主键模型要求表有唯一的主键(传统数据仓库中的 primary key),支持对表中的行按主键进行更新和删除操作。
主键模型对实时数据变更的优化
在 OLAP 数据仓库中,可变数据通常是不受欢迎的。在传统数仓中,一般我们会使用批量更新的方式处理大量数据变更的场景。对于数据的变更我们有两种方法处理:
- 在新的分区中插入修改后的数据,通过分区交换完成数据变更。
- 部分 OLAP 数据仓库产品提供了基于 Merge on Read 模型的更新功能,完成数据变更。
分区交换数据更新模式
对于大部分的 OLAP 数据仓库产品,我们可以通过操作分区的方式,将原有的分区删除掉,然后用新的分区代替,从而实现对大量数据的变更操作。一般来说需要经历三个步骤:
- 创建一张新的分区表,根据业务变更,将新的数据存储到新表中;
- 卸载并删除原有的分区;
- 将新表中的分区装载到目标表中。
通过交换分区来实现大规模数据变更是一个相对较重的操作,适用于低频的批量数据更新。由于涉及到了表定义的变更,一般来说开发人员无法通过该方案独立完成数据变更。
Merge on Read 数据更新模式
部分的 OLAP 数据仓库提供了基于 Merge on Read 的数据变更模型,如 ClickHouse 提供了 MergeTree 引擎, 可以完成异步更新,但无法做到数据实时同步。在指定 FINAL 关键字后,ClickHouse 会在返回结果之前完成合并,从而实现准实时的数据更新同步操作。但由于 FINAL 操作高昂的代价,不足以支撑实时数仓带来的频繁维度更新需求。同时,即便是在低频的更新场景中,也无法将 ClickHouse Merge Tree 的方案复制到其他存储系统中。
StarRocks 提供了与 ClickHouse Merge Tree 类似的更新模型(Unique Key model),通过 Merge on Read 的模式完成数据的更新操作。在更新模型中,StarRocks 内部会给每一个批次导入的数据分配一个版本号,同一主键可能存在多个版本,在查询时进行版本合并,返回最新版本的记录。
Merge on Read 模式在写入时简单高效,但读取时会消耗大量的资源在版本合并上,同时由于 merge 算子的存在,使得谓词无法下推、索引无法使用,严重的影响了查询的性能。StarRocks 提供了基于 Delete and Insert 模式的主键模型,避免了因为版本合并导致的算子无法下推的问题。主键模型适合需要对数据进行实时更新的场景,可以更好的解决行级别的更新操作,支撑百万级别的 TPS,特别适合 MySQL 或其他业务库同步到 StarRocks 的场景。
在 TPCH 标准测试集中,我们选取了部分的查询进行对比,基于 Delete and Insert 模式的主键模型相较于基于 Merge on Read 的 Unique Key 模型,性能有明显的提高:
Query |
数据量 |
Primary Key (Delete and Insert) |
Unique Key (Merge on Read) |
性能提升 |
|
导入过程中 |
SELECT COUNT(*) FROM orders; |
8000 万 |
0.24 sec |
1.15 sec |
6.29x |
SELECT COUNT(*) FROM orders; |
1.6 亿 |
0.31 sec |
3.4 sec |
10.97x |
|
SELECT COUNT(*), SUM(quantify) FROM orders WHERE revenue < 2000; |
1000 万 |
0.23 sec |
3.49 sec |
15.17x |
|
导入后 |
SELECT COUNT(*) FROM orders; |
2 亿 |
0.32 sec |
1.17 sec |
3.66x |
SELECT COUNT(*), SUM(quantify) FROM orders WHERE revenue < 2000; |
1200 万 |
0.34 sec |
1.52 sec |
4.47x |
主键模型对去重操作的支持
消除重复数据是实际业务中经常遇到的难题。在数据仓库中,重复数据的删除有助于减少存储所消耗的容量。在一些特定的场景中,重复数据也是不可接受的,比如在客群圈选与精准营销业务场景中,为了避免重复推送营销信息,一般会根据用户 ID 进行去重操作。在传统的离线计算中,可以通过 distinct 函数完成去重操作。在实时计算业务中,去重是一个增量和长期的过程,我们可以在 Flink 中通过添加 MapState 逻辑进行去重操作。但通过 MapStat,多数情况下只能保证一定的时间窗口内数据去重,很难实现增量数据与 OLAP 库中的存量数据进行去重。随着时间窗口的增加,Flink 中的去重操作会占用大量的内存资源,同时也会使计算层变得臃肿复杂。
主键模型要求表拥有唯一的主键,支持表中的行按照主键进行更新和删除操作。主键的唯一性与去重操作的需求高度匹配,在数据导入时,主键模型就已经完成了去重操作,避免了手动去重带来的资源消耗。通过对业务逻辑的拆解,我们可以选取合适去重列作为主键,在数据导入时通过 Delete and Insert 的方式完成“数据根据唯一主键进行去重”的需求。相比于在 Flink 中实现去重,StarRocks 主键模型可以节省大量的硬件资源,操作更为简单,并且可以实现增量数据加存量数据的去重操作。
主键模型对宽表数据变更优化
在固定报表业务场景中,通常会根据固定的查询,在 Flink 中对数据进行简单的业务清洗后打平成宽表,借用宽表极佳的多维分析性能,助力查询提速,同时也简化了分析师使用的数据模型。但由于宽表需要预聚合的属性,在遇到维度数据变更的情况,需要通过重跑宽表以实现数据更新。StarRocks 的主键模型不仅可以应用于数据变更场景,同时部分列更新的功能,也高度契合多种业务对宽表中不同字段进行部分更新的需求。
在宽表模型中,一般会有几十上百甚至上千列。这给通过 UPSERT 方式完成数据更新的主键模型带了一定的挑战。我们需要获得变更行的所有信息后,才能后完成宽表的数据更新。这使得变更操作会附带上回表读取的操作,需要从 StarRocks 中拉取变更的数据行,然后拼出插入的语句完成数据更新。这给 StarRocks 带来了极大的查询压力。部分列更新的功能(partical update)极大程度的简化 upsert 操作。在开启参数 partial_update 后,我们可以根据主键,只修改部分指定的列,原有的 value 列保持不变。
如下面的例子中,我们可以通过 Routine Load 导入方式来消费 Kafka 中的数据。在 properties 中需要设置 "partial_update" = "true",指定开启部分列更新模式,并指定需要更新的列名 COLUMN(id, name)。
CREATE ROUTINE LOAD routine_load_patical_update_demo on example_table COLUMNS (id, name), COLUMNS TERMINATED BY ',' PROPERTIES ( "partial_update" = "true") FROM KAFKA ( "kafka_broker_list" = "broker1:9092,broker2:9092,broker3:9092", "kafka_topic" = "my_topic", "kafka_partitions" = "0,1,2,3", "kafka_offsets" = "101.0.0.200" );
StarRocks X Flink CDC,打造极速统一的开源实时数仓平台
Flink CDC 解决了数据链路冗长的问题,而 StarRocks 在 OLAP 分析层提供了极致的性能与一体化的数据存储方案以匹配不同的业务场景。通过 StarRocks 结合 Flink CDC 构建的实时数仓平台的方案,能够极大程度的减少开发与运维的成本。
StarRocks X Flink CDC,宽表实时数仓架构
使用 StarRocks 与 Flink CDC 的联合解决方案,我们可以较为清晰的将实时数仓规划成为四层结构:
- 数据源层,实时应用层,与原有架构相同,未做调整
- 数据传输与计算层,通过引入 Flink CDC,将数据采集层,消息队列与事实计算层都放置在 Flink CDC 中,简化了数据链路,减少了开发与运维成本。
- 数据分析与存储层,StarRocks 中作为分析层数据存储引擎,能够提供不同的数据模型支撑不同类型的业务,简化了分析层数据存储复杂的技术栈。
在 ETL 不复杂的场景,我们可以将大部分 ETL 的操作放在 Flink 中实现。在某些场景下,业务模型相对简单,事实数据与维度数据利用 Flink 多流 join 的能力打平成宽表,在 Flink 中完成了 DWD,DWS 与 ADS 层模型划分。同时对于非结构化的数据,也可以增量写入到 Iceberg、Hudi 或 Hive 中,利用 StarRocks 的外表功能完成湖仓一体的架构。
当 ETL 的过程中引入较为复杂的业务逻辑是,可能会在 Flink 计算层占用大量的内存资源。同时,宽表的模式无法应对查询不固定的多维度分析场景。我们可以选择使用星型模型来替换宽表模型,将数据清洗与建模的操作放到 StarRocks 中完成。
StarRocks X Flink CDC,实时数据变更架构
在某些复杂的业务,如自助 BI 报表,运营分析等场景中,分析师往往会从不同的维度进行数据探查。查询的随机性与灵活性要求 OLAP 分析引擎对性能和多种建模方式都有良好的支持,以满足使用者近乎“随意”的在页面上拉去指标和维度,下钻、上卷和关联查询。
对于 StarRocks 而言,可以使用更为灵活的星型模型代替大宽表。为了增强多表实时关联能力,StarRocks 提供了不同的 join 方式,如 Boardcast Join、Shuffle Join、Bucket Join、Replica Shuffle Join、Colocation Join。CBO 会根据表的统计信息选择 join reorder 与 join 的类型。同时也提供了多种优化手段,如谓词下推、limit 下推、延迟物化等功能,进行多表关联的查询加速。
基于 StarRocks 的实时 join 能力,我们可以将 ETL 操作后置到 StarRocks 中,在 StarRocks 通过实时 join 的方式完成数据建模。同时通过 Primary Key 模型对于数据变更的支持,可以在 StarRocks 中创建缓慢变化维实现维度数据变更。
通过星型/雪花模型构建的实时数仓,可以将计算层 Flink 的建模操作后置到 StarRocks 引擎中。在 Flink 中,只需要做 ODS 层数据的清洗工作,维度表与事实表会通过 Flink CDC 同步写入到 StarRocks 中。StarRocks 中会在 ODS 层进行事实数据与维度数据的落地,通过聚合模型或物化视图完成与聚合操作。利用 StarRocks 的实时多表关联能力,配合智能 CBO 优化器,稀疏索引及向量化引擎等多种优化手段,能够快速计算查询结果,保证业务的在不同模型层的数据高度同源一致。
在现实生活中,维度的属性并非是静止的,会随着时间的流逝发生缓慢的变化。星型模型可以将事实表与维度表独立存储,将维度数据从宽表中解藕,从而利用 StarRocks 的主键模型处理缓慢变化维的问题。一般来说,我们有三种方案处理缓慢变化维的问题:
- 使用主键模型,直接根据主键覆盖原有的维度值。这种方式较为容易实现,但是没有保留历史数据,无法分析历史维度变化信息;
- 使用明细模型,直接添加维度行,通过 version 列来管理不同的维度属性版本,改种方案在查询是需要根据业务条件筛选出合适的维度 version
- 使用主键模型,在主键中引入 version 信息,混合使用直接修改与新添加维度行的方法,该方法较为复杂,但也能更全面的处理复杂的维度变化需求
StarRocks X Flink CDC 用户案例
在某知名电商平台业务中,通过使用 StarRocks 与 Flink CDC 极大程度的简化聊数据链路的复杂度。用户通过 StarRocks 构建实时数据看板平台,实现了多维度数据筛选、灵活漏斗分析、不同维度上卷下钻的灵活分析。
困难与挑战
在电商数据看板平台中,最初选择了 ClickHouse 作为数据分析层的存储引擎。但随着业务的发展,ClickHouse 在部分场景中无法有效的支撑,主要体现在以下几个方面:
- 根据用户下单的操作,部分订单的状态会发生变化。但一般来说,超过两周的订单状态基本不会发生变化;
- 部分变化的数据不适合通过宽表的形式存储,部分的业务需求迭代较为频繁,宽表 + 星型模型的建模方式可以更好的服务于业务变更;
- ClickHouse 扩缩容操作复杂,无法自动对表进行 rebalance 操作,需要较长的业务维护窗口。
为了解决以上的问题,该电商平台重新做了技术选型。经过不断的对比与压测,最终选择使用 StarRocks 作为分析层的数据存储引擎。
系统架构
在实时看板业务中,主要可以划分成五个部分:
数据源层:数据源注意有两种,来自 Web 端与客户端的埋点日志,以及业务库中的订单数据;
Flink CDC:Flink CDC 抓取上游的埋点日志与业务数据,在 Flink CDC 中进行数据的清洗与转换,写入到 StarRocks 中;
数据存储层:根据业务的需求,将 DWD 层中的事实数据联合维度数据拼成宽表,通过视图的方式写入到 DWS 层,在 ADS 层划分出不同的主题域;
数据服务层:包含了数据指标平台和漏斗分析平台两部分,根据内部的指标、漏斗定义进行逻辑计算,最终生成报表供分析师查看;
数据中台:围绕大数据分析平台,提供稳定性保障、数据资产管理、数据服务体系等基础服务;
选型收益
数据传输层:通过 Flink CDC 可以直接拉取上游的埋点数据与 MySQL 订单库中的增量数据。相比于 MySQL -> Canal -> Kakfa -> Flink 的链路,架构更加清晰简单。特别是对于上游的 MySQL 分库分表订单交易库,可以在 Flink CDC 中通过 Mapping 的方式,将不同的库中的表和合并,经过清洗后统一写入到下游的 StarRocks 中。省略了 Canal 与 Kafka 组件,减少了硬件资源成本与人力维护成本。
数据存储层:通过 StarRocks 替换 ClickHouse,可以在业务建模时,不必限制于宽表的业务模型,通过更为灵活的星型模型拓展复杂的业务。主键模型可以适配 MySQL 业务库中的订单数据变更,根据订单 ID 实时修改 StarRocks 中的存量数据。同时,在节点扩容时,StarRocks 更为简单,对业务没有侵入性,可以完成自动的数据重分布。
性能方面:单表 400 亿与四张百万维度表关联,平均查询时间 400ms,TP99 在 800ms 左右,相较于原有架构有大幅的性能提升。替换 StarRocks 后,业务高峰期 CPU 使用从 70% 下降到 40%。节省了硬件成本。
在极速统一上更进一步
一款优秀的产品,只提供极致的性能是不够的。还需要丰富的功能适配用户多样的需求。未来我们也会对产品的功能进行进一步的拓展,同时也会在稳定性与易用性上做进一步的提升。
日前,阿里云 E-MapReduce 与 StarRocks 社区合作,推出了首款 StarRocks 云上产品。我们也可以在 EMR 上选择相应规格的 Flink 与 StarRocks。为了提供更好的使用体验,阿里云 E-MapReduce 团队与 StarRocks 也在不断的对产品进行优化,在未来的几个月会提供以下的功能:
- 多表物化视图:StarRocks 将推出多表关联物化视图功能,进一步加强 StarRocks 的实时建模能力;
- 湖仓一体架构:StarRocks 进一步 Apache Iceberg 与 Apache Hudi 外表功能,打造 StarRocks 湖仓一体架构;
- 表结构变更同步:在实时同步数据的同时,还支持将源表的表结构变更(增加列信息等)实时同步到目标表中;
- 分库分表合并同步:支持使用正则表达式定义库名和表名,匹配数据源的多张分库分表,合并后同步到下游的一张表中;
- 自定义计算列同步:支持在源表上新增计算列,以支持您对源表的某些列进行转换计算;
一款优秀的产品也离不开社区的生态,欢迎大家参与 StarRocks 与 Flink 社区的共建,也欢迎大家测试 StarRocks Primary Key X Flink CDC 的端到端实时数仓链路方案。
我们会在钉钉群定期推送精彩文章,邀请技术大牛直播分享。
欢迎钉钉扫码加入技术交流群参与讨论~