StarRocks和clickhouse优缺点对比?适用场景?
StarRocks适用场景
StarRocks 可以满足企业级用户的多种分析需求,包括 OLAP (Online Analytical Processing) 多维分析、定制报表、实时数据分析和 Ad-hoc 数据分析等。 OLAP 多维分析 利用 StarRocks 的 MPP 框架和向量化执行引擎,用户可以灵活的选择雪花模型,星型模型,宽表模型或者预聚合模型。适用于灵活配置的多维分析报表,业务场景包括: 用户行为分析 用户画像、标签分析、圈人 高维业务指标报表 自助式报表平台 业务问题探查分析 跨主题业务分析 财务报表 系统监控分析 实时数据仓库 StarRocks 设计和实现了 Primary-Key 模型,能够实时更新数据并极速查询,可以秒级同步 TP (Transaction Processing) 数据库的变化,构建实时数仓,业务场景包括: 电商大促数据分析 物流行业的运单分析 金融行业绩效分析、指标计算 直播质量分析 广告投放分析 管理驾驶舱 探针分析APM(Application Performance Management) 高并发查询 StarRocks 通过良好的数据分布特性,灵活的索引以及物化视图等特性,可以解决面向用户侧的分析场景,业务场景包括: 广告主报表分析 零售行业渠道人员分析 SaaS 行业面向用户分析报表 Dashboard 多页面分析 统一分析 通过使用一套系统解决多维分析、高并发查询、预计算、实时分析查询等场景,降低系统复杂度和多技术栈开发与维护成本。 使用 StarRocks 统一管理数据湖和数据仓库,将高并发和实时性要求很高的业务放在 StarRocks 中分析,也可以使用 External Catalog 和外部表进行数据湖上的分析。
产品特性:
MPP 分布式执行框架 pipeline并行执行框架 全面向量化执行引擎 Global Runtime Filter CBO 优化器 可实时更新的列式存储引擎 智能的物化视图 数据湖分析 存算分离
优点:
单表查询和多表查询性能都很强,可以同时较好支持宽表查询场景和复杂多表查询。 支持高并发查询。 支持实时数据微批ETL处理。 流式和批量数据写入都能都比较强。 兼容MySQL协议和标准SQL。 能够保证数据的exactly-once 支持多种分布式Join方式,支持多种数据模型 架构简单,易于维护,无侵入式弹性伸缩与扩容 新版本支持存算分离,尽可能的减少依赖外部组件
缺点:
数据导入性能:虽然StarRocks支持多种数据导入方式,但在大数据量的情况下,数据导入性能可能会成为瓶颈。 数据更新:StarRocks的数据更新操作相对复杂,需要通过DELETE+INSERT或者MOR的方式进行,这可能会影响到数据更新的效率或者查询效率。 数据一致性:在分布式环境中,保证数据的一致性是一大挑战。尽管Doris/StarRocks有一套完整的数据一致性保证机制,但在某些情况下,可能还是会出现数据不一致的问题。 内存消耗:Doris/StarRocks在处理大数据量的查询时,可能会消耗大量的内存,这可能会对系统的稳定性造成影响。 ETL能力:大规模ETL能力不足 flink-doris-connector写入是单并行度写入的,flink-connector-starrocks写入是可以支持多并行度写入的,多并行度情况下需要考虑如何保证数据有序性。
clickhouse适用场景:
用户行为分析。ClickHouse将用户行为分析表制作成一张大的宽表,减少join的形式,实现路径分析、漏斗分析、路径转化等功能。除此之外,它还能支撑广告,营销和AB实验。 实时BI报表。ClickHouse可以根据业务需求,实时制作及时产出,查询灵活的BI报表,包括订单分析,营销效果分析,大促活动分析等等。 监控。ClickHouse可以将系统和应用监控指标通过流式计算引擎Flink,Spark streaming清洗处理以后,实时写入ClickHouse。结合Grafna进行可视化展示。 用户画像。ClickHouse可以对各种用户特征进行数据加工,制作成包含全部用户的一张或多张用户特征表,提供灵活的用户画像分析,支撑广告,圈人等业务需求等等。
产品特性:
1、完备的DBMS功能 2、列式存储和数据压缩 3、向量化执行引擎 4. 关系模型与SQL查询 5. 多样化的表引擎 6. 多线程与分布式 7. 多主架构 8. 在线查询 9. 数据分片与分布式查询
优点
为了高效的使用CPU,数据不仅仅按列存储,同时还按向量进行处理; 数据压缩空间大,减少IO;处理单查询高吞吐量每台服务器每秒最多数十亿行; 索引非B树结构,不需要满足最左原则;只要过滤条件在索引列中包含即可;即使在使用的数据不在索引中,由于各种并行处理机制ClickHouse全表扫描的速度也很快; 写入速度非常快,50-200M/s,对于大量的数据更新非常适用。 很强的单表查询性能,适合基于大宽表的OLAP多维分析查询。 包含丰富的MergeTree Family,支持预聚合。 非常适合大规模日志明细数据写入分析。
缺点:
不支持事务,事务可以是一条SQL语句或一组SQL语言或者整个程序,只要中间有任何错误这个事务的所有操作都要撤销。 缺少完整的UPDATE DELETE操作, 对于工具自动生成的语句不支持,必须通过变通的方式来完成这两类操作,仅能用于批量删除或者修改数据。 部分技术支持待完善,支持有限的操作系统,驱动程序不够完善,市面主流工具对其支持不全。 不支持BIOB DOCUMENT 类型数据,聚合结果必须小于一台机器的内存大小。 不支持高并发,官方建议qps为100,可以通过修改config.xml的max_concurrent_queries配置。 不支持二级索引 有限的SQL支持,join实现与众不同 不支持窗口功能 元数据管理需要人工干预维护,运维起来比较麻烦。 多表join效率性能比较低
starrocks四种模型选择场景?
明细模型
需要保留原始数据的业务,例如原始日志、原始操作记录等。 查询方式灵活,不需要局限于预聚合的分析方式。 导入日志数据或者时序数据,主要特点是旧数据不会更新,只会追加新的数据。
聚合模型
聚合模型会在数据导入时将维度列相同的数据根据指标列设定的聚合函数进行聚合,最终表格中只会保留聚合后的数据。聚合模型可以减少查询时需要处理的数据量,进而提升查询的效率,其适合的场景如下: 1、不需要原始的明细数据,只关注汇总的结果; 2、业务涉及的查询为汇总类查询,比如sum、min、max、count等类型的查询; 3、查询维度固定。
更新模型
建表时,支持定义主键和指标列,查询时返回主键相同的一组数据中的最新数据。相对于明细模型,更新模型简化了数据导入流程,能够更好地支撑实时和频繁更新的场景。 更新模型可以视为聚合模型的特殊情况,指标列指定的聚合函数为 REPLACE,返回具有相同主键的一组数据中的最新数据。 数据分批次多次导入至更新模型,每一批次数据分配一个版本号,因此同一主键的数据可能有多个版本,查询时返回版本最新(即版本号最大)的数据。 适用场景: 实时和频繁更新的业务场景,例如分析电商订单。在电商场景中,订单的状态经常会发生变化,每天的订单更新量可突破上亿
主键模型
主键模型与更新模型的特点比较接近,主键模型的表要求有唯一的主键,支持对表中的行按主键进行更新和删除操作。但对比更新模型,主键模型通过牺牲微小的写入性能和内存占用,极大提升了查询性能。同时,主键模型可以更好地支持实时/频繁更新的功能 适用场景: 主键模型适用于实时和频繁更新的场景,例如: 实时对接事务型数据至 StarRocks。事务型数据库中,除了插入数据外,一般还会涉及较多更新和删除数据的操作,因此事务型数据库的数据同步至 StarRocks 时,建议使用主键模型。通过 Flink-CDC 等工具直接对接 TP 的 Binlog,实时同步增删改的数据至主键模型,可以简化数据同步流程,并且相对于 Merge-On-Read 策略的更新模型,查询性能能够提升 3~10 倍。 利用部分列更新轻松实现多流 JOIN。在用户画像等分析场景中,一般会采用大宽表方式来提升多维分析的性能,同时简化数据分析师的使用模型。而这种场景中的上游数据,往往可能来自于多个不同业务(比如来自购物消费业务、快递业务、银行业务等)或系统(比如计算用户不同标签属性的机器学习系统),主键模型的部分列更新功能就很好地满足这种需求,不同业务直接各自按需更新与业务相关的列即可,并且继续享受主键模型的实时同步增删改数据及高效的查询性能。 主键模型适用于主键占用空间相对可控的场景。这是由于 StarRocks 存储引擎会为主键模型的主键创建索引,并导入时将主键索引加载至内存中,所以相对于其他模型,主键模型对内存的要求比较高。目前主键模型中,主键编码后,占用内存空间上限为 127 字节
更新模型本质上是一种查询时合并(merge on read)的方案:在数据导入时,更新模型将数据排序后直接写入新的文件,不做重复键检查。读取时通过 Key 值比较进行多版本合并(Merge),仅保留最新版本的数据返回。Merge on read 的更新模式写入数据非常快,但是因为读时需要做多版本合并,读数据性能不佳且不稳定,并且无法使用例如 Bitmap 之类的索引进行进一步加速。更新模型会有读放大问题。
只有主键模型支持update语法。StarRocks 主键模型引入主键索引。在数据导入时,对于每一条数据,主键模型通过主键索引定位原来这条记录所在位置,找到记录后只需要给这条记录打个删除标记(Delete Bitmap),表示这条记录是被删除的,然后所有其它 update 记录可以当成新增数据插入到新的 Block 里面。这样的好处是读取时直接把所有的 Block 都可以并行的加载,然后只需要根据 Delete Bitmap 标记过滤已经删除的记录。 在数据导入时候完成数据更新(delete + insert),只保留每一条数据最新的状态。 对于主键模型,查询时不需要合并多版本,在有导入的场景下,查询性能可以基本不受影响。
主键模型还有很多其他的特性: 完整支持 upsert,delete,partial update 操作 实时更新时,查询性能不受影响 查询性能不低于明细模型 内存耗费少,支持主键索引落盘
Copy-on-Write:适合 T+1 的不会频繁更新的场景,不适合实时更新场景。StarRocks 的明细模型表。 Merge-On-Read:适用于实时和频繁更新的场景,这种方式写入的性能最好,实现也很简单,但是读取的性能很差,不适合对查询性能要求很高的场景。采用这种方案的包括 Hudi 的 Merge-on-Read 表,StarRocks 的更新模型表,ClickHouse 的实时更新模型等。 Delta Store:这种方式由于写入时要查询索引,写入性能稍差,但是读取的性能要好很多。适合查询性能要求比较高的场景。 采用这种方案的典型系统就是 Kudu,当然还有一些内存数据库 TP 系统也常用这种方案。 Delete-and-Insert(Merge-On-Write) :牺牲了一些写入性能换取读性能。StarRocks 的主键模型主要采用了 Delete-and-Insert 模式,并且进行了很多新的设计,以支持在大规模实时数据更新时提供极速的查询性能。
image
StarRocks数据组织形式?
StarRocks 中的表由行和列构成。在 StarRocks 中,一张表的列可以分为维度列(也称为 Key 列)和指标列(也称为 Value 列)。
通过某行数据的维度列所构成的前缀查找该行数据的过程包含以下五个步骤: 先查找前缀索引表,获得逻辑数据块的起始行号。 查找维度列的行号索引,定位到维度列的数据块。 读取数据块。 解压、解码数据块。 从数据块中找到维度列前缀对应的数据项。
starrocks分区分桶设计?
总结一下:分区是针对表的,是对表的数据取段。分桶是针对每个分区的,会将分区后的每段数据打散为逻辑分片Tablet。 副本数是针对Tablet的,是指Tablet保存的份数。那么我们不难发现,对某一个数据表,若每个分区的分桶数一致,其总Tablet数:总Tablet数=分区数分桶数副本数 分桶数=BE数量BE节点CPU核数 或者BE数量BE节点CPU核数/2
分区: 分区键选择:当前分区键仅支持日期类型和整数类型,为了让分区能够更有效的裁剪数据,我们一般也是选取时间或者区域作为分区键。使用动态分区可以定期自动创建分区,比如每天创建出新的分区等。选择分区单位时需要综合考虑数据量、查询特点、数据管理粒度等因素。 分区粒度选择:StarRocks的分区粒度视数据量而定,单个分区原始数据量建议维持在100G以内。
分桶: StarRocks采用Hash算法作为分桶算法,同一分区内, 分桶键的哈希值相同的数据形成(Tablet)子表, 子表多副本冗余存储, 子表副本在物理上由一个单独的本地存储引擎管理, 数据导入和查询最终都下沉到所涉及的子表副本上, 同时子表也是数据均衡和恢复的基本单位。在聚合模型、更新模型、主键模型下,分桶键必需是排序键中的列。 分桶列选择 选择高基数的列(例如唯一ID)来作为分桶键,可以保证数据在各个bucket中尽可能均衡。如果数据倾斜情况严重,用户可以使用多个列作为数据的分桶键,但是不建议使用过多列。 分桶的数量影响查询的并行度,最佳实践是计算一下数据存储量,将每个tablet设置成 1GB 左右。 在机器资源不足的情况下,用户如果想充分利用机器资源,可以通过 BE数量 * cpu core / 2的计算方式来设置bucket数量。例如,在将 100GB 未压缩的 CSV 文件导入 StarRocks 时,用户使用 4 台 BE,每台包含 64 核心,仅建立一个分区,通过以上计算方式可得出 bucket 数量为 4 * 64 /2 = 128。此时每个 tablet 的数据为 781MB,能够充分利用CPU资源,保证数据在各个bucket中尽可能均衡。 分桶数量的选择 分桶数的设置需要适中,如果分桶过少,查询时查询并行度上不来(CPU多核优势体现不出来)。而如果分桶过多,会导致元数据压力比较大,数据导入导出时也会受到一些影响。 分桶数的设置通常也建议以数据量为参考,从经验来看,每个分桶的原始数据建议不要超过5个G,考虑到压缩比,也即每个分桶的大小建议在100M-1G之间。建议用户根据集群规模的变化,建表时调整分桶的数量。集群规模变化,主要指节点数目的变化。 对照CSV文件,StarRocks的压缩比在 0.3 ~ 0.5 左右(以下计算取0.5,按照千进制计算)。假设10GB的CSV文件导入StarRocks,我们分为10个均匀的分区。一个分区承担的CSV文本数据量:10GB/10 = 1GB。单一副本按照0.5压缩比存入StarRocks文件大小:1GB * 0.5 = 500MB,通常存储三副本,一个分区的文件总大小为500MB*3 = 1500MB,按照建议,一个tablet规划300MB,则需设置5个分桶:1500MB/300MB = 5,如果是MySQL中的文件,一主两从的模式,我们只需要计算单副本的MySQL集群大小,按 照0.7的压缩比(经验值)换算成CSV文件大小,再按照上面的步骤计算出StarRcoks的分桶数量。 分桶的数据的压缩方式使用的是Lz4。建议压缩后磁盘上每个分桶数据文件大小在 1GB 左右。这种模式在多数情况下足以满足业务需求。
image
starrocks 内部实时更新方案
(1)场景: full row upsert / delete
对数据库里的一行(整行)进行upsert或者delete:在mysql下,插入一条数据,如果key(unique key)重复就覆盖数据行。在StarRocks下,有Unique key 表支持 upsert语义,另外 StartRocks 新加了一个 Primary key 的表模型,它支持upsert和delete语义。有了这两个语义之后就能完美支持 从OLTP系统到OLAP系统CDC数据的同步。
(2)Merge-on-Read
当获取CDC数据,排序后直接写入新的文件,不做重复键检查。读取时通过Key值比较进行Merge,合并多个版本的数据,仅保留最新版本的数据返回。 Merge-on-Read方式写入数据非常快,但是读数据比较慢,对查询的影响比较大。它参考的是存储引擎比较典型的、应用广泛的LSM树的数据结构。
Hudi的MOR表以及StarRocks 的Unique key 表都是使用这种方式是实现的。
(3)Copy-on-Write
当获取CDC数据后,新的数据和原来的记录进行full join,检查新的数据中每条记录跟原数据中的记录是否有冲突(检查Key 值相同的记录)。对有冲突的文件,重新写一份新的、包含了更新后数据的。
读取数据时直接读取最新数据文件即可,无需任何合并或者其他操作,查询性能最优,但是写入的代价很大,因此适合 T+1 的不会频繁更新的场景,不适合实时更新场景。
现在的数据湖方案里像 Delta Lake,Hudi 的 COW表,Iceberg 以及商用的 Snowflake都是使用这种方式达到数据更新目标。
(4)Delta Store
基本思想是牺牲写入性能,换取更高的读性能。当获取CDC数据后,通过主键索引,可以定位到这条记录原来所在的位置、文件(或者Block),然后在这个Block旁边放一个Delta Store,用来保存对于这个Block的增量修改。这个Delta Store 里保存的不是主键而是数据记录在Block里的行号(RowId)。
查询时读到原始Block,然后根据RowId与Delta 数据进行合并更新并返回最新数据给上层引擎。由于Delta数据是按行号组织的,与Merge-on-Read按照 Key进行合并比,查询性能好很多。不过这种方式会破坏二级索引,因为对Block做修改后,他的索引相当于失效了,想要在更新后再维护索引复杂度会很高。
这种方式写入性能差(要查询索引,定位原数据),但读取的性能好很多。另外因为引入了主键索引和Delta Store,复杂性也较高。
采用这种方案较典型的是Apache Kudu,许多TP系统甚至HTAP 数据库也会到 Delta Store 存储方式。
(5) Delete-and-Insert(Merge-On-Write)
思路也是牺牲部分写性能,极大地优化读的性能。原理也是引入主键索引,通过主键索引定位原来这条记录所在位置,找到记录后只需要给这条记录打个删除标记(Delete Bitmap),表示这条记录是被删除的,然后所有其它update记录可以当成新增数据插入到新的Block里面。 这样的好处是读取时直接把所有的Block都可以并行的加载,然后只需要根据Delete Bitmap标记过滤已经删除的记录。
优缺点数据导入时,会有和之前的数据比对的操作,会有一定的导入性能下降。 读时只需要根据标记位的索引跳过不需要读取的行,读取效率会比较高效。 StarRocks 新的支持实时更新的 Primary Key 表就是用到这个 Delete+Insert 的方式实现的;另外还有阿里云的 ADB 和 Hologres 也用到在这种方式。
image
starrocks导入性能对比分析
image
spark load>broker load>Routine load>stream load
Stream load内部调用链路Routine Load内部调用链路
Spark Load内部调用链路
Broker Load内部调用链路
从 Kafka 导入数据时,推荐使用 Routine Load 实现导入。如果导入过程中有复杂的多表关联和 ETL 预处理,建议先使用 Apache Flink® 从 Kafka 读取数据并对数据进行处理,然后再通过 StarRocks 提供的标准插件 flink-connector-starrocks 把处理后的数据导入到 StarRocks 中。
从 Hive 导入数据时,推荐创建 Hive catalog、然后使用 INSERT 实现导入,或者通过 Broker Load 实现导入。
从 MySQL 导入数据时,推荐创建 MySQL 外部表、然后使用 INSERT 实现导入,或者通过 DataX 实现导入。如果要导入实时数据,建议您参考 从 MySQL 实时同步 实现导入。
从 Oracle、PostgreSQL 等数据源导入数据时,推荐创建 JDBC 外部表、然后使用 INSERT 实现导入,或者通过 DataX 实现导入。
导入效率对比: spark load>broker load>stream load>Routine load Routinue Load 底层是通过 Stream Load 方式来导入,每一次导入看做是一个 Task, 这个 Task 由 FE 下发 BE 执行, Task 完成一批数据导入后通知 FE,FE 更新 Offset 后继续下发新 Task, 不断重复这个过程,从而完成数据写入。
各种相关导入优化
FE 配置 以下配置属于FE的系统配置,可以通过修改FE的配置文件fe.conf来修改: max_load_timeout_second和min_load_timeout_second 设置导入超时时间的最大、最小取值范围,均以秒为单位。默认的最大超时时间为3天,最小超时时间为1秒。用户自定义的导入超时时间不可超过这个范围。该参数通用于所有类型的导入任务。 desired_max_waiting_jobs 等待队列可以容纳的最多导入任务数目,默认值为100。如FE中处于PENDING状态(即等待执行)的导入任务数目达到该值,则新的导入请求会被拒绝。此配置仅对异步执行的导入有效,如处于等待状态的异步导入任务数达到限额,则后续创建导入的请求会被拒绝。 max_running_txn_num_per_db 每个数据库中正在运行的导入任务的最大个数(不区分导入类型、统一计数),默认值为100。当数据库中正在运行的导入任务超过最大值时,后续的导入不会被执行。如果是同步作业,则作业会被拒绝;如果是异步作业,则作业会在队列中等待。 label_keep_max_second 导入任务记录的保留时间。已经完成的( FINISHED or CANCELLED )导入任务记录会在StarRocks系统中保留一段时间,时间长短则由此参数决定。参数默认值时间为3天。该参数通用于所有类型的导入任务。
BE 配置 以下配置属于BE的系统配置,可以通过修改BE的配置文件be.conf来修改: push_write_mbytes_per_sec BE上单个Tablet的写入速度限制。默认是10,即10MB/s。根据Schema以及系统的不同,通常BE对单个Tablet的最大写入速度大约在10-30MB/s之间。可以适当调整这个参数来控制导入速度。 write_buffer_size 导入数据在 BE 上会先写入到一个内存块,当这个内存块达到阈值后才会写回磁盘。默认大小是 100MB。过小的阈值可能导致 BE 上存在大量的小文件。可以适当提高这个阈值减少文件数量。但过大的阈值可能导致RPC超时,见下面的配置说明。 tablet_writer_rpc_timeout_sec 导入过程中,发送一个 Batch(1024行)的RPC超时时间。默认为600秒。因为该RPC可能涉及多个分片内存块的写盘操作,所以可能会因为写盘导致RPC超时,可以适当调整这个超时时间来减少超时错误(如 send batch fail 错误)。同时,如果调大write_buffer_size配置,也需要适当调大这个参数。 streaming_load_rpc_max_alive_time_sec 在导入过程中,StarRocks会为每个Tablet开启一个Writer,用于接收数据并写入。这个参数指定了Writer的等待超时时间。默认为600秒。如果在参数指定时间内Writer没有收到任何数据,则Writer会被自动销毁。当系统处理速度较慢时,Writer可能长时间接收不到下一批数据,导致导入报错:TabletWriter add batch with unknown id。此时可适当增大这个配置。 load_process_max_memory_limit_bytes和load_process_max_memory_limit_percent 这两个参数分别是最大内存和最大内存百分比,限制了单个BE上可用于导入任务的内存上限。系统会在两个参数中取较小者,作为最终的BE导入任务内存使用上限。 load_process_max_memory_limit_percent:表示对BE总内存限制的百分比。默认为30。(总内存限制 mem_limit 默认为 80%,表示对物理内存的百分比)。即假设物理内存为 M,则默认导入内存限制为 M * 80% * 30%。 load_process_max_memory_limit_bytes:默认为100GB。
几个重要的参数
parallel_fragment_exec_instance_num 指定并行片段执行的实例数。默认为1.表示每个BE上fragment的实例数量,如果希望提升单个查询的性能,可以设置为BE的CPU核数的一半。 query_mem_limit 用于设置每个 BE 节点上查询的内存限制。以字节为单位。当查询使用的内存超过该限制时,查询将被终止。 load_mem_limit 各 BE 节点上单个导入任务的内存限制,单位是 Byte。如果设置为 0,StarRocks 采用 exec_mem_limit 作为内存限制。 exec_mem_limit 设置执行内存限制,用于限制查询执行过程中的内存使用。 mem_limit BE 进程内存上限。可设为比例上限(如 "80%")或物理上限(如 "100GB")。 cpu_core_limit 该资源组在当前 BE 节点可使用的 CPU 核数软上限,实际使用的 CPU 核数会根据节点资源空闲程度按比例弹性伸缩。取值为正整数。 big_query_mem_limit 大查询任务可以使用的内存上限。单位为 Byte。只有大于 0 时才生效,默认值为 0。 load_parallel_instance_num 单个 BE 上每个作业允许的最大并发实例数。
starrocks-spark-connector参数优化:
starrocks.write.enable.transaction-stream-load 是否使用Stream Load的事务接口导入数据 starrocks.write.buffer.size 数据攒批的内存大小,达到该阈值后数据批量发送给 StarRocks,支持带单位k, m, g。增大该值能提高导入性能,但会带来写入延迟。 starrocks.write.flush.interval.ms 数据攒批发送的间隔,用于控制数据写入StarRocks的延迟。 starrocks.write.num.partitions Spark用于并行写入的分区数,数据量小时可以通过减少分区数降低导入并发和频率,默认分区数由Spark决定。使用该功能可能会引入 Spark Shuffle cost。 starrocks.request.tablet.size 一个 Spark RDD 分区对应的 StarRocks Tablet 的个数。参数设置越小,生成的分区越多,Spark 侧的并行度也就越大,但与此同时会给 StarRocks 侧造成更大的压力。 starrocks.batch.size 单次从 BE 读取的最大行数。调大参数取值可减少 Spark 与 StarRocks 之间建立连接的次数,从而减轻网络延迟所带来的的额外时间开销。对于StarRocks 2.2及以后版本最小支持的batch size为4096,如果配置小于该值,则按4096处理 starrocks.exec.mem.limit 单个查询的内存限制。单位:字节。默认内存限制为 2 GB。 starrocks.deserialize.arrow.async 是否支持把 Arrow 格式异步转换为 Spark Connector 迭代所需的 RowBatch。 starrocks.deserialize.queue.size 异步转换 Arrow 格式时内部处理队列的大小,当 starrocks.deserialize.arrow.async 为 true 时生效。 starrocks.filter.query 指定过滤条件。多个过滤条件用 and 连接。StarRocks 根据指定的过滤条件完成对待读取数据的过滤。可以通过 starrocks.filter.query 参数指定过滤条件来做合理的分区、分桶、前缀索引裁剪,减少拉取数据的开销 starrocks.filter.query.in.max.count 谓词下推中,IN 表达式支持的取值数量上限。如果 IN 表达式中指定的取值数量超过该上限,则 IN 表达式中指定的条件过滤在 Spark 侧处理。
flink-connector-starrocks参数优化:
sink.semantic 数据 sink 至 StarRocks 的语义。 sink.buffer-flush.max-bytes 数据攒批的大小,达到该阈值后将数据通过 Stream Load 批量写入 StarRocks。取值范围:[64MB, 10GB]。V1 版本 exactly-once 下只有 Flink checkpoint 触发时 flush,该参数不生效。 sink.buffer-flush.max-rows 数据攒批的条数,达到该阈值后将数据通过 Stream Load 批量写入 StarRocks。取值范围:[64000, 5000000]。V1 版本 exactly-once 下只有 Flink checkpoint 触发时 flush,该参数不生效。 sink.buffer-flush.interval-ms 数据攒批发送的间隔,用于控制数据写入 StarRocks 的延迟,取值范围:[1000, 3600000]。V1 版本 exactly-once 下只有 Flink checkpoint 触发时 flush,该参数不生效。 sink.max-retries Stream Load 失败后的重试次数。超过该数量上限,则数据导入任务报错。取值范围:[0, 10]。 sink.properties.timeout Stream Load 超时时间,单位为秒。 exactly-once 下需要确保该值大于 Flink checkpoint 间隔。 sink.properties.max_filter_ratio 用于指定导入作业的最大容错率,即导入作业能够容忍的因数据质量不合格而过滤掉的数据行所占的最大比例。取值范围:0~1。默认值:0。更多说明,请参见 STREAM LOAD。 scan.params.mem-limit-byte BE 节点中单个查询的内存上限。单位:字节。默认值:1073741824(即 1 GB)。 scan.max-retries 数据读取失败时的最大重试次数。默认值:1。超过该数量上限,则数据读取任务报错。
starrocks 几种join的场景和限制?
StarRocks 实际执行中会产生 5 种最基本的分布式 Plan:
Shuffle Join:分别将 A、B 两表的数据按照连接关系都 Shuffle 到同一批机器上,再进行 Join 操作。
Broadcast Join:通过将 B 表的数据全量的广播到 A 表的机器上,在 A 表的机器上进行 Join 操作,相比较于 Shuffle Join,节省了 A 表的数据 Shuffle,但是 B 表的数据是全量广播,适合 B 表是个小表的场景。
Bucket Shuffle Join:在 Broadcast 的基础上进一步优化,将 B 表按照 A 表的分布方式 Shuffle 到 A 表的机器上进行 Join 操作,B 表 Shuffle 的数据量全局只有一份,比 Broadcast 少传输了很多倍数据量。当然,有约束条件限制,Join 的连接关系必须和 A 表的分布一致。
Colocate Join:通过建表时指定 A 表和 B 表是同一个 Colocate Group,意味着 A、B 表的分布完全一致,那么当 Join 的连接关系和 A、B 表分布一致时,StarRocks 可以直接在 A、B 表的机器上直接 Join,不需要进行数据 Shuffle。
Replicate Join:StarRocks 的实验性功能,当每一台 A 表的机器上都存在一份完整的 B 表数据时,直接在本地进行 Join 操作,该 Join 的约束条件比较严格,基本上意味着 B 表的副本数需要和整个集群的机器数保持一致,所以实践意义并不理想。
Bucket Shuffle Join
Bucket Shuffle Join优点: 首先,Bucket-Shuffle-Join降低了网络与内存开销,使一些Join查询具有了更好的性能。尤其是当FE能够执行左表的分区裁剪与桶裁剪时。其次,同时与Colocate Join不同,它对于表的数据分布方式并没有侵入性,这对于用户来说是透明的。对于表的数据分布没有强制性的要求,不容易导致数据倾斜的问题。最后,它可以为Join Reorder提供更多可能的优化空间。 Bucket Shuffle Join的规划 在绝大多数场景之中,用户只需要默认打开session变量的开关就可以透明的使用这种Join方式带来的性能提升,但是如果了解Bucket Shuffle Join的规划规则,可以帮助我们利用它写出更加高效的SQL。 Bucket Shuffle Join只生效于Join条件为等值的场景,原因与Colocate Join类似,它们都依赖hash来计算确定的数据分布。 在等值Join条件之中包含两张表的分桶列,当左表的分桶列为等值的Join条件时,它有很大概率会被规划为Bucket Shuffle Join。 由于不同的数据类型的hash值计算结果不同,所以Bucket Shuffle Join要求左表的分桶列的类型与右表等值join列的类型需要保持一致,否则无法进行对应的规划。 Bucket Shuffle Join只作用于Doris原生的OLAP表,对于ODBC,MySQL,ES等外表,当其作为左表时是无法规划生效的。 对于分区表,由于每一个分区的数据分布规则可能不同,所以Bucket Shuffle Join只能保证左表为单分区时生效。所以在SQL执行之中,需要尽量使用where条件使分区裁剪的策略能够生效。 假如左表为Colocate的表,那么它每个分区的数据分布规则是确定的,Bucket Shuffle Join能在Colocate表上表现更好。
Colocation Join
Colocation Join 功能,是将一组拥有相同 CGS ( Colocation Group Schema)的 Table 组成一个 CG。并保证这些 Table 对应的数据分片会落在同一个 BE 节点上。使得当 CG 内的表进行分桶列上的 Join 操作时,可以通过直接进行本地数据 Join,减少数据在节点间的传输耗时。 Colocate Join 十分适合几张表按照相同字段分桶,并高频根据相同字段 Join 的场景,比如电商的不少应用都按照商家 Id 分桶,并高频按照商家 Id 进行 Join。 Colocation Join限制条件 为了使得表能够有相同的数据分布,同一 CG 内的表必须满足下列约束: Colocate Table 必须是 OLAP 类型的表 同一 CG 内的表的分桶键的类型、数量和顺序完全一致,并且桶数一致,从而保证多张表的数据分片能够一一对应地进行分布控制。分桶键,即在建表语句中 DISTRIBUTED BY HASH(col1, col2, ...) 中指定一组列。分桶键决定了一张表的数据通过哪些列的值进行 Hash 划分到不同的 Bucket Seq 下。同 CG 的表的分桶键的名字可以不相同,分桶列的定义在建表语句中的出现次序可以不一致,但是在 DISTRIBUTED BY HASH(col1, col2, ...) 的对应数据类型的顺序要完全一致。 同一个 CG 内所有表的所有分区的副本数必须一致。如果不一致,可能出现某一个子表的某一个副本,在同一个 BE 上没有其他的表分片的副本对应。 同一个 CG 内所有表的分区键,分区数量,分区列的类型可以不同。 同一个 CG 中的所有表的副本放置必须满足下列约束: CG 中所有表的 Bucket Seq 和 BE 节点的映射关系和 Parent Table 一致。 Parent Table 中所有分区的 Bucket Seq 和 BE 节点的映射关系和第一个分区一致。 Parent Table 第一个分区的 Bucket Seq 和 BE 节点的映射关系利用原生的 Round Robin 算法决定。 CG 内表的一致的数据分布定义和子表副本映射,能够保证分桶键取值相同的数据行一定在相同 BE 节点上,因此当分桶键做 Join 列时,只需本地 Join 即可。
Join 调优建议
最后我们总结 Join 优化调优的四点建议: 第一点:在做 Join 的时候,要尽量选择同类型或者简单类型的列,同类型的话就减少它的数据 Cast,简单类型本身 Join 计算就很快。 第二点:尽量选择 Key 列进行 Join, 原因前面在 Runtime Filter 的时候也介绍了,Key 列在延迟物化上能起到一个比较好的效果。 第三点:大表之间的 Join ,尽量让它 Colocation ,因为大表之间的网络开销是很大的,如果需要去做 Shuffle 的话,代价是很高的。 第四点:合理的使用 Runtime Filter,它在 Join 过滤率高的场景下效果是非常显著的。但是它并不是万灵药,而是有一定副作用的,所以需要根据具体的 SQL 的粒度做开关。 最后:要涉及到多表 Join 的时候,需要去判断 Join 的合理性。尽量保证左表为大表,右表为小表,然后 Hash Join 会优于 Nest Loop Join。必要的时可以通过 SQL Rewrite,利用 Hint 去调整 Join 的顺序。
starrocks 性能优化
通过建表优化性能 选择数据模型 使用 Colocate Table 使用星型模型 使用分区和分桶 选择Zstd作为压缩格式 使用稀疏索引和 Bloomfilter 使用倒排索引 使用物化视图 优化导入性能 优化 Schema Change 性能 CBO优化器开启 join优化,选择合理的join方式 利用query cache缓存中间计算结果 Sorted streaming aggregate完成有序的排序聚合 存算分离,实现数据的持久化 根据业务特点和需求调整Compaction相关参数 参数调优(BE、FE内存配置、高并发配置、BE、FE内存,并行度、buffer缓冲大小、线程数...等)
starrocks存算分离
存算分离架构下,FE 的功能保持不变。BE 原有的存储功能被抽离,数据存储从本地存储 (local storage) 升级为共享存储 (shared storage)。BE 节点升级为无状态的 CN 节点,只缓存热数据。CN 会执行数据导入、查询计算、缓存数据管理等任务。
StarRocks 的存储计算分离架构提供以下优势:
廉价且可无缝扩展的存储。 弹性可扩展的计算能力。由于数据不再存储在 BE 节点中,因此集群无需进行跨节点数据迁移或 Shuffle 即可完成扩缩容。 热数据的本地磁盘缓存,用以提高查询性能。 可选异步导入数据至对象存储,提高导入效率。
StarRocks 存算分离集群架构如下:
存算分离核心技术:
1 存储 StarRocks 存算分离的核心在于将存储与计算解耦,计算节点无状态,实现快速扩缩容。而且,对象存储通常具备更好的数据可靠性和更低的成本。目前,StarRocks 存算分离技术支持两种后端存储方式:兼容 AWS S3 协议的对象存储系统(主流对象存储均支持)以及传统数据中心部署的 HDFS。该技术将表数据统一存储至对象存储指定的 bucket 或 HDFS 目录中,按根据类型的不同被存储在data/、meta/ 等子目录下。 StarRocks 存算分离数据文件格式与存算一体相同,数据按照 Segment 文件组织,StarRocks 各种索引技术在存算分离表中也同样复用。 2 缓存 StarRocks 的存算分离架构解耦了数据存储和计算,实现了它们各自的弹性,从而节约成本并提升弹性能力。但是这种架构也会影响系统性能,尤其是对于追求极速的 StarRocks 而言更为明显。 为了解决存算分离对性能的影响,我们使用磁盘(Local Disk) 来缓存热点数据。用户可以选择在建表时打开热数据缓存并同时指定热数据的生命周期,从数据写入开始到生命周期结束,热数据会一直缓存在本地磁盘中,以提升查询效率。 用户可在建表时指定如下参数来控制数据缓存行为:
image
● enable_storage_cache:控制是否开启热数据磁盘缓存 ● storage_cache_ttl:热数据缓存生命周期,超过该时间后,热数据会被自动从缓存中淘汰 ● enable_async_write_back:是否开启异步写入
如果开启计算节点磁盘缓存,数据被写入时,会同时写入本地磁盘以及后端存储系统,当两者都成功后才返回。同时,storage_cache_ttl 控制了数据在磁盘上的生存周期。 如果开启 enable_async_write_back,数据写入本地磁盘后便返回,由后台任务负责将其写入对象存储。这种模式适用于追求高吞吐、低延迟的业务场景,但存在一定的数据可靠性风险。 在查询时,热数据通常会直接从缓存中命中,而冷数据则需要从对象存储中读取并填充至本地缓存,以加速后续访问。Profile 也能够清晰地观察到数据读取的冷热分布:
然而,在实际的业务场景中,会存在某些查询涉及历史数据访问量比较大,如果这些极低频访问的冷数据被缓存,那么可能会将热数据挤出,造成性能抖动。针对该问题,我们会在接下来的版本中允许用户在建表时指定策略,避免在访问这些冷数据时将其放入缓存从而淘汰那些真正的热点数据。 同时,针对未在上述任何缓存中命中的冷数据,StarRocks 也进行了优化,可根据应用访问模式,利用数据预读技术从后端读取接下来可能会访问的数据。此举可有效减少对于后端对象存储的访问频次,提升查询性能。我们在实际测试效果中也验证了该优化的有效性,细节可参考文章后面的“性能评估”。 通过内存、本地磁盘、远端存储,StarRocks 存算分离构建了一个多层次的数据访问体系,用户可以指定数据冷热规则以更好地满足业务需求,让热数据靠近计算,真正实现高性能计算和低成本存储。同时,对冷数据访问也进行了预读等针对性优化,有效提升了查询性能下限。 3 数据多版本 StarRocks 存算分离中引入了数据多版本技术。用户的每次导入都会在 StarRocks 中生成一个唯一版本号,数据版本一旦产生便不会再更改, Compaction 也只会产生新的数据版本。 StarRocks 存算分离中,每个数据版本包含 Tablet Meta 和 Tablet Data 文件,并且都写入后端对象存储。 TabletMeta 文件内记录了该版本所有的数据文件索引。而 Tablet Data 文件仍然按照 Segment 文件格式组织。一旦 BE 节点需要访问某个Tablet 时,会先根据版本号从后端存储加载对应的 Tablet Meta 文件,然后再根据索引访问相应的数据文件。 StarRocks 存算分离中引入了数据多版本技术,为未来实现 Time Travel 等高级功能打下了基础。 4 Compaction Compaction 是 StarRocks 中最重要的后台任务,它负责将用户的历史版本合并为一个更大的版本以提高查询性能。在存算一体中,BE 节点根据自身负载情况来调度 Compaction 任务。 在存算分离版本中,Compaction 任务调度被上移至 FE 节点。得益于存算分离的数据共享能力,FE 可以选择将 Compaction 任务发往任意 BE 节点执行,甚至未来 FE 可以将 Compaction 任务调度至专用集群,实现对业务的零干扰。同时,根据实际需要,执行 Compaction 任务的资源池也可以动态地进行扩缩容。这样,Compaction 不再成为困扰用户的头疼问题,真正实现了性能和成本的和谐统一。
starrocks一条sql的执行流程
在 StarRocks 中,需要经过以下 几个步骤: 1. SQL Parse:将 SQL 文本转换成一个 AST(抽象语法树)。(目前使用的 Parser 是 ANTLR4) 2. SQL Analyze:基于 AST 进行语法和语义分析 3. SQL Logical Plan:将 AST 转换成逻辑计划 4. SQL Optimize:基于关系代数、统计信息、Cost 模型,对逻辑计划进行重写、转换,选择出 Cost “最低” 的物理执行计划,CBO Transform。 5. 生成 Plan Fragment:将 Optimizer 选择的物理执行计划转换为 BE 可以直接执行的 Plan Fragment 6. 通过查询调度器选择合适的数据副本,并将分布式物理执行计划调度到合适的计算节点进行计算; 7. 通过 MPP 分布式执行框架充分利用多机的资源,做到查询性能可以随着机器数量近似线性扩展; 8. 通过 Pipeline 并行执行框架充分利用多核资源,做到查询性能可以随着机器核数近似线性扩展; 9. 通过向量化执行引擎充分利用 CPU 单核资源,将单核执行性能做到极致。通过向量化执行引擎充分利用 CPU 单核资源,将单核执行性能做到极致。
optimizer 的主要流程包括:
memo.init:优化器搜索空间初始化; optimizer 的主要流程包括: memo.init:优化器搜索空间初始化; logicalRuleRewrite:基于逻辑计划的启发式规则的优化,对应图中的Tree Rewriter阶段; memoOptimize:基于Cost优化,核心规则是JoinReorder/AggregateSplit,对应图中的Optimizer阶段; extractBestPlan:提取最优的物理执行计划; physicalRuleRewrite:基于物理计划的启发式规则优化。
几个概念:
OptExpression : 是逻辑执行树的基本节点单位,每个 OptExpression 中包含 Operator,表示一个操作节点。 Memo: 用于记录优化器搜索过程中产生的各种备选的 Plan,基本单位为 Group。 Group: 个 Group 中的 GroupExpression 是逻辑等价的,这里的逻辑等价是指:该 Group 中所有的GroupExpression 节点的输出数据都是相同的。因此,我们也可以这样认为,Group 表示了一组特定的数据,而 Group 中的 GroupExpression,表示了这一组特定数据是通过什么方式生成,而生成数据的方式可以有很多种。 GroupExpression: GroupExpression 和 OptExpression 类似,包含了一个具体的 Operator,用来表示 GroupExpression 的具体行为,和 OptExpression 的区别在于: GroupExpression 需要存储和物理计划相关的 Property,而 OptExpression 不需要,OptExpression只用于表述一个计划树; GroupExpression 的输入(孩子节点)是一个 Group,这意味着 GroupExpression 的输入可以是 Group 中的任意一个Grou pExpression(因为执行结果都是等价的),而 OptExpression 的输入还是一个 OptExpression,是一个已经确定的Operator。 Memo Init: Memo 空间初始化,将原始的 OptExpression 逻辑计划树 Copy 进 Memo 空间,代码按图索骥即可。Memo 空间初始化,将原始的 OptExpression 逻辑计划树 Copy 进 Memo 空间,代码按图索骥即可。
CBO优化器相关
主要包括下面的优化: 多阶段聚合优化:普通聚合(count, sum, max, min 等)会拆分成两阶段,单个 Count Distinct 查询会拆分成三阶段或是四阶段。 Join 左右表调整:StarRocks 始终用右表构建 Hash 表,所以右表应该是小表,StarRocks 可以基于 cost 自动调整左右表顺序,也会自动把 Left Join 转 Right Join。 Join 多表 Reorder:多表 Join 如何选择出正确的 Join 顺序,是 CBO 优化器的核心。当 Join 表的数量小于等于 5 时,StarRocks 会基于 Join 交换律和结合律进行 Join Reorder,大于 5 时,StarRocks 会基于贪心算法和动态规划进行 Join Reorder。 Join 分布式执行选择:StarRocks 支持的分布式 Join 方式有 Broadcast、Shuffle、单边 Shuffle、Colocate、Replicated。StarRocks 会基于 Cost 估算和 Property Enforce 机制选择出 “最佳” 的 Join 分布式执行方式。 Push Down Aggregate to Join 物化视图选择与重写
cost计算公式
CostModel 中完成不同节点的 Cost 的计算,Cost 的计算公式比较简单: CalculateCost = CpuCost * cpuCostWeight + MemoryCost * memoryCostWeight + NetworkCost * networkCostWeight 其中 CpuCost、MemoryCost 和 NetworkCost 目前使用 Statistics computeSize 计算。需要说明的是, 我们可以把 Statistics computeSize 简单理解为该节点需要处理的数据量(行数 * 列大小),需要处理的数据量越大, 节点的CPU开销、内存占用和数据 shuffle 的代价越大。 Statistics ComputeSize = Output RowCount * sum(column_size) 由于 CPU 计算和内存占用,网络数据传输的实际开销差距很大,这就需要我们通过 cpuCostWeight、memoryCostWeight 和 networkCostWeight 来调整 Cost 结果, 尽可能地和实际代价开销相似。
starrocks routing load 怎么调优?怎么降低磁盘io?
1) 任务调度周期 max_batch_interval 通过缩短任务调度周期加速数据消费。但是,更小的任务调度周期可能会带来更多的CPU资源消耗。 需要注意的是,任务调度周期最小值为5s。 2)任务并行度 max_routine_load_task_concurrent_num desired_concurrent_number 在partition数量和BE数量较多时,可以通过设置较大的该参数来加速任务执行。但是,更大的并行度可能会带来更多的CPU资源消耗。 单个 routine load 任务会根据kafka topic partition数、BE数等被拆分为若干个子任务,分发至多个BE执行。这里的任务并行度,实际上是指单个routine load拆分成的子任务个数。 任务并行度取决于 Job 配置 desired_concurrent_num、可用 BE 数、 FE 配置、topic partition数和 max_routine_load_task_concurrent_num 的最小值,具体的计算方式如下: concurrent_num = Min(partition_num, desired_concurrent_num, alive_be_num, Config.max_routine_load_task_concurrent_num) 因此,在调整该参数时需要综合考虑BE数量和partition数量。 3)任务批量大小 routine_load_task_consume_second 通过增大单次读取持续时间加速数据消费。 max_routine_load_batch_size 通过增大单次读取的数据量加速数据消费。 I0325 20:27:50.410579 15259 data_consumer_group.cpp:131] consumer group done: 41448fb1a0ca59ad-30e34dabfa7e47a0. consume time(ms)=3261, received rows=179190, received bytes=9855450, eos: 1, left_time: -261, left_bytes: 514432550, blocking get time(us): 3065086, blocking put time(us): 24855 可以根据如上的日志来判定当前的批量参数设置是否过小。正常情况下,该日志的 left_bytes 字段应该 >= 0,表示一次读取的数据量还未超过 max_routine_load_batch_size上限 。否则,说明 max_routine_load_batch_size 过小。
CBO、HBO、RBO
RBO 基于规则的优化器 指的是不需要额外的信息,通过用户下发的SQL语句进行的优化,主要通过改下SQL,比如SQL子句的前后执行顺序等。比较常见的优化包括谓语下推、字段过滤下推、常量折叠、索引选择、Join优化等等。RBO对数据不敏感,在表大小固定的情况下,无论中间结果数据怎么变化,只要SQL保持不变,生成的执行计划就都是固定的。 CBO 基于代价的优化器,根据收集的统 计信息来计算每种执行方式的代价,进而选择最优的执行方式。引人了重新排序 Join(JoinReorder)和自动MapJoin(AutoMapJoin )优化规则等,同时基于 Volcano 模型的优 化器会尽最大的搜索宽度来获取最优计划。 CBO优化器通过给CPU、IO、NETWORK赋予代价,来指导生成更优的执行计划 HBO(History-Based Optimizer,基于历史的优化器) 在任务稳定的情况下,可以考虑基于任务的历史执行情况进行资源评估,即采用HBO。
常见RBO优化
各种表达式的重写和化简 Cast 消除 谓词化简 公共谓词提取 列裁剪 Shuffle 列裁剪 (确保任何多余的列不要参与网络传输) 谓词下推 等价谓词推导(常量传播) Outer Join 转 Inner Join Limit Merge Limit 下推 聚合 Merge Intersect Reorder 常量折叠 公共表达式复用 子查询改写 Lateral Join 化简 Empty Node 优化 Empty Union, Intersect, Except 裁剪 In 转 Semi Join 或者 Inner Join 聚合算子复用 # 比如对于下面的 SQL: # SELECT AVG(x), SUM(x) FROM table #我们可以让 AVG(x) 复用 SUM(x) 的计算结果,减少计算量 Primary Key 相关优化 冗余 Group By 消除 Sum 常量转 Count Data Skipping
常见CBO优化点
多阶段聚合优化 Join 左右表 Reorder Join 多表 Reorder Join 分布式执行选择 Shuffle Join Broadcast Join Bucket Shuffle Join Colocate Join Replication Join Join 和 Aggregate Runtime Colocate, 避免 Shuffle CTE 复用 CTE 列裁剪 Agg 上拉 Agg 下推 Join Agg 下推 GroupingSets 窗口下推 Group By 算子融合 物化视图选择与改写 利用基数信息进行优化
HBO常见优化点
表元数据本地缓存(表来源、表定义、分区个数、分区信息、文件列表、统计信息等) hdfs文件缓存 sql执行记录结果集落表(sql提交失败、运行失败、运行成功) 保存历史sql执行时,预估内存值和实际消耗内存值 DataCache 多表物化视图
starrocks索引优化
前缀索引
StarRocks的底层数据是按照排序键排序后存储的。而前缀索引(shortkey index),就是在排序键的基础上实现的一种根据给定前缀列,有效加速查询的索引方式。 首先明确一点,前缀索引不需要我们单独手动创建或指定,在建表时其实已经默认完成了指定,它是StarRocks自带的一种加速方式。前缀索引的默认要求主要有: 1、前缀索引包含的列只能是排序键中的列; 2、前缀索引包含的列数不超过3; 3、前缀索引的字节数不超过36字节; 4、前缀索引不能包含FLOAT/DOUBLE类型的列; 5、前缀索引中VARCHAR类型列只能出现一次,并且是末尾位置(即使没有达到36个字节,如果遇到VARCHAR,也会直接截断,不再往后继续); 6、若在建表语句中指定PROPERTIES {"short_key" = "integer"}时, 可突破上面的第2、3条限制。
压缩索引
Starrocks 支持对索引数据的压缩,可以减少磁盘空间的使用,提高查询速度。请注意,压缩和解压缩数据需要额外的 CPU 资源。 StarRocks 支持四种数据压缩算法:LZ4、Zstandard(或 zstd)、zlib 和 Snappy
Bitmap 索引
Bitmap 索引所占的存储空间通常只有索引数据的一小部分,与其他索引技术相比,更节省存储空间。 支持为多个列创建 Bitmap 索引,提高多列查询的效率
Bloom filter 索引
Bloom filter 索引可以快速判断表的数据文件中是否可能包含要查询的数据,如果不包含就跳过,从而减少扫描的数据量。Bloom filter 索引空间效率高,适用于基数较高的列,如 ID 列。
Bitmap 索引与Bloom filter 索引适用场景与限制
bitmap使用场景
适应场景: 1 非前缀过滤 StarRocks对于建表中的前置列可以通过shortkey索引快速过滤,但是对于非前置列, 无法利用shortkey索引快速过滤,如果需要对非前置列进行快速过滤,就可以对这些列建立Bitmap索引。 2 多列过滤Filter 由于Bitmap可以快速的进行bitwise运算。所以在多列过滤的场景中,也可以考虑对每列分别建立Bitmap索引。 3:人群圈选,精确去重...等 优点 对于基数较低,值大量重复,例如 ENUM 类型的列,使用 Bitmap 索引能够减少查询的响应时间。如列基数较高,推荐使用 Bloom filter 索引。 Bitmap 索引所占的存储空间通常只有索引数据的一小部分,与其他索引技术相比,更节省存储空间。 支持为多个列创建 Bitmap 索引,提高多列查询的效率,具体参见多列查询。 限制: Bitmap 索引适用于可使用等值条件 (=) 查询或 [NOT] IN 范围查询的列。 主键模型和明细模型中所有列都可以创建 Bitmap 索引;聚合模型和更新模型中,只有维度列(即 Key 列)支持创建 bitmap 索引。 Bitmap索引适用于一些特定类型的查询, 应该在取值为枚举型, 取值大量重复, 较低基数, 并且用作等值条件查询或者可转化为等值条件查询的列上创建. bitmap索引不适用更新频繁的列。不适合列基数较高的场景。 不支持对Float、Double、Decimal 类型的列建Bitmap 索引。 如果要查看某个查询是否命中了Bitmap索引,可以通过查询的Profile信息中的 BitmapIndexFilterRows 字段查看
Bloom filter 适用场景
适应场景: 1)非前缀列过滤; 2)高基数列; 3)查询需对某列高频过滤,且查询条件是in和=(Bloom Filter索引只对in和=过滤查询有加速效果); 4)非Tinyint、Float、Double、DECIMAL类型的列(这些类型暂不支持)。 5)对于明细模型,所有列都可以创建Bloom Filter索引。聚合模型和更新模型,只有Key列可以建Bloom Filter索引。主键模型允许为主键列创建Bloom Filter索引,可在建表时创建,也可以建表后添加。不支持非主键列创建Bloom Filter索引。 优点: Bloom filter 索引可以快速判断表的数据文件中是否可能包含要查询的数据,如果不包含就跳过,从而减少扫描的数据量。 Bloom filter 底层实现是通过bitmap+多个hash函数来实现的,空间效率和时间效率都比较高,但是有一定的误判率。 限制: 主键模型和明细模型中所有列都可以创建 Bloom filter 索引;聚合模型和更新模型中,只有维度列(即 Key 列)支持创建 Bloom filter 索引。 不支持为 TINYINT、FLOAT、DOUBLE 和 DECIMAL 类型的列创建 Bloom filter 索引。 Bloom filter 索引只能提高包含 in 和 = 过滤条件的查询效率,例如 Select xxx from table where xxx in () 和 Select xxx from table where column = xxx。 如要了解一个查询是否命中了 Bloom filter 索引,可查看该查询的 Profile 中的 BloomFilterFilterRows 字段。
MPP架构和hadooop架构对比
MPP架构
是将许多单机数据库通过网络连接起来,相当于将一个个垂直系统横向连接,形成一个统一对外服务的分布式数据库系统,每个节点由一个单机数据库系统独立管理和操作该节点所在物理机上的所有资源(CPU、内存、磁盘、网络),节点内系统的各组件间的相互调用不需要通过控制节点,即对控制节点来说,每个节点的内部运行过程相对透明。
Hadoop架构
是将不同的资源管理与功能进行分层抽象设计,每层形成一类组件,实现一定程度的解耦,包括存储资源管理、计算资源管理、通用并行计算框架、各类分析功能等,在每层内进行跨节点的资源统一管理或功能并行执行,层与层之间通过接口调用,相互透明,节点内不同层的组件间的相互调用需要由控制节点掌握或通过控制节点协调,即控制节点了解每个节点内不同层组件间的互动过程。MPP不足:在中小规模的数据量下,处理结构化数据功能完整,易用,性能出色。但数据量一旦超过它能承受的上限,木桶效应,扩展性问题就会变为难以忽略的维护成本。
Hadoop 和 MPP 两种技术应根据具体业务以及场景进行选择。 (1)对于半结构化和非结构化数据,Hadoop 在处理上比 MPP 有一定优势,适合于海量数据批处理类应用,如海 量数据 ETL、非结构化数据分析与挖掘(关键词提取、情感 分析等)。若系统对非结构化数据存储需求较大且数据量巨大,需要动态扩展数据节点等,则使用 Hadoop 架构更为合适。 (2)MPP 架构更适合对现有关系型数据库和数据仓库 系统进行升级或替换,其在数据查询类业务上比 Hadoop 更具优势,适合处理 SQL 类事务请求、多维度数据分析、展 示数据报表等。若大部分存储数据是结构化数据,数据量不是很大,未来不会爆炸式增长,或业务人员习惯使用 SQL 场景,则可优先考虑使用 MPP 数据库。 (3)MPP架构更适合追求高性能低延迟,OLAP分析等场景。而Hadoop架构更适合海量数据,任务要求更稳定性,任务延时不太苛刻的场景。 (4)MPP+Hadoop 混合架构是未来海量数据处理发展趋势。用 MPP 处理 PB 级结构化数据存储与查询,提供 完整的 SQL 与事务支持功能。用 Hadoop 处理半结构化、 非结构化数据,提供灵活的自定义模型与算法开发能力, 同时满足多种数据类型处理需求,并在实时查询与离线分析上都能提供较高性能。但MPP+Hadoop混合架构开发成本及维护成本可能较高。 在数据爆炸时代,传统的数据库架构处理系统已经不 能满足行业需要。本文从理论及应用角度将两种主流的 海量数据处理架构 MPP 和 Hadoop 进行对比,分析各自的 技术特点,论述它们与传统数据处理的优势。通过分析两 大框架底层核心技术,对其优缺点进行了归纳。Hadoop 对 海量半结构化、非结构化数据存储与处理有一定优势,但 在处理速度和易用性上不及 MPP。Hadoop 灵活性较强,企业可根据自身业务特点进行定制开发。MPP优势在海量结构化数据处理、响应性能和衍生工具等方面,适用于查询业务场景较多的项目。随着 Hadoop 生态圈的不断发展,如Hadoop的SQL性能提升、BI工具的不断丰富,MPP 技术发展会向 Hadoop 靠拢。基于MPP与Hadoop框架并结合Spark内存计算、流计算等技术的混合架构平台,会成为大型数据处理项目的理想选择。
flink-starrocks-connector exactly-once怎么保证?
自 2.4 版本 StarRocks 开始支持Stream Load 事务接口。自 Flink connector 1.2.4 版本起, Sink 基于事务接口重新设计实现了 exactly-once,相较于原来基于非事务接口的实现,降低了内存使用和 checkpoint 耗时,提高了作业的实时性和稳定性。 自 Flink connector 1.2.4 版本起,sink 默认使用事务接口实现。如果需要使用非事务接口实现,则需要配置 sink.version 为V1。 注意 如果只升级 StarRocks 或 Flink connector,sink 会自动选择非事务接口实现。 基于Stream Load非事务接口实现的exactly-once,依赖flink的checkpoint-interval在每次checkpoint时保存批数据以及其label,在checkpoint完成后的第一次invoke中阻塞flush所有缓存在state当中的数据,以此达到精准一次。但如果StarRocks挂掉了,会导致用户的flink sink stream 算子长时间阻塞,并引起flink的监控报警或强制kill。 默认使用csv格式进行导入,用户可以通过指定'sink.properties.row_delimiter' = '\\x02'(此参数自 StarRocks-1.15.0 开始支持)与'sink.properties.column_separator' = '\\x01'来自定义行分隔符与列分隔符。 如果遇到导入停止的 情况,请尝试增加flink任务的内存。 如果代码运行正常且能接收到数据,但是写入不成功时请确认当前机器能访问BE的http_port端口,这里指能ping通集群show backends显示的ip:port。举个例子:如果一台机器有外网和内网ip,且FE/BE的http_port均可通过外网ip:port访问,集群里绑定的ip为内网ip,任务里loadurl写的FE外网ip:http_port,FE会将写入任务转发给BE内网ip:port,这时如果Client机器ping不通BE的内网ip就会写入失败。
DMP平台优缺点?
DMP的优点:
1. 数据整合和清洗功能: DMP可以对不同来源且格式不同的数据进行整合和清洗,使企业得以使用这些数据生成更高价值的洞察力和效益。 2. 精准营销: DMP利用数据分析,可以为企业提供个性化的营销策略,大大提高了广告投放的精准性。 3. 提高决策效率: DMP可以为企业提供更全面、更准确的数据分析,在企业决策过程中起到至关重要的作用。
DMP的缺点:
1. 高成本: DMP需要存储大量数据,并且需要进行数据整合和分析,因此相对来说比较昂贵。 2. 数据安全问题: 由于DMP中存储铭感信息的数量庞大,泄露数据的风险也因此增加,这成为DMP的一大隐患。 3. 技术门槛高: DMP的复杂性和技术门槛较高,需要专业的技术人员进行操作。
总体而言,DMP是一个可以帮助企业更好地利用数据的工具,但需要考虑成本、安全和技术等因素。企业应该根据自身情况权衡利弊,选择适合自己的DMP。
物化视图的应用场景和优缺点?
使用异步物化视图场景:
加速重复聚合查询 假设您的数仓环境中存在大量包含相同聚合函数子查询的查询,占用了大量计算资源,您可以根据该子查询建立异步物化视图,计算并保存该子查询的所有结果。建立成功后,系统将自动改写查询语句,直接查询异步物化视图中的中间结果,从而降低负载,加速查询。 周期性多表关联查询 假设您需要定期将数据仓库中多张表关联,生成一张新的宽表,您可以为这些表建立异步物化视图,并设定定期刷新规则,从而避免手动调度关联任务。异步物化视图建立成功后,查询将直接基于异步物化视图返回结果,从而避免关联操作带来的延迟。 数仓分层 假设您的基表中包含大量原始数据,查询需要进行复杂的 ETL 操作,您可以通过对数据建立多层异步物化视图实现数仓分层。如此可以将复杂查询分解为多层简单查询,既可以减少重复计算,又能够帮助维护人员快速定位问题。除此之外,数仓分层还可以将原始数据与统计数据解耦,从而保护敏感性原始数据。 湖仓加速 查询数据湖可能由于网络延迟和对象存储的吞吐限制而变慢。您可以通过在数据湖之上构建异步物化视图来提升查询性能。此外,StarRocks 可以智能改写查询以使用现有的物化视图,省去了手动修改查询的麻烦。
StarRocks 的异步物化视图具备以下原子能力,可助力数据建模:
自动刷新:在数据导入至基表后,物化视图可以自动刷新。您无需在外部维护调度任务。 分区刷新:通过有时序属性的报表,可以通过分区刷新实现近实时计算。 与视图协同使用:通过协同使用物化视图和逻辑视图,您可以实现多层建模,从而实现中间层的重复使用和数据模型的简化。 Schema Change:您可以通过简单的 SQL 语句更改计算结果,无需修改复杂的数据流水线。 借助以上功能,您可以设计全面且灵活的数据模型,以满足各种业务需求和场景。
基于异步物化视图的查询改写功能,在以下场景下特别有用:
指标预聚合 如果您需要处理高维度数据,可以使用物化视图来创建预聚合指标层。 宽表 Join 物化视图允许您在复杂场景下下透明加速包含大宽表 Join 的查询。 湖仓加速 构建基于 External Catalog 的物化视图可以轻松加速针对数据湖中数据的查询。
物化视图查询改写能力,StarRocks 目前存在以下限制:
StarRocks 不支持非确定性函数的改写,包括 rand、random、uuid 以及 sleep。 StarRocks 不支持窗口函数的改写。 如果物化视图定义语句中包含 LIMIT、ORDER BY、UNION、EXCEPT、INTERSECT、MINUS、GROUPING SETS、WITH CUBE 或 WITH ROLLUP,则无法用于改写。 基于 External Catalog 的物化视图不保证查询结果强一致。 基于 JDBC Catalog 表构建的异步物化视图暂不支持查询改写。
异步物化视图刷新机制
目前,StarRocks 支持两种 ON DEMAND 刷新策略,即异步刷新(ASYNC)和手动刷新(MANUAL)。 在此基础上,异步物化视图支持多种刷新机制控制刷新开销并保证刷新成功率: 支持设置刷新最大分区数。当一张异步物化视图拥有较多分区时,单次刷新将耗费较多资源。您可以通过设置该刷新机制来指定单次刷新的最大分区数量,从而将刷新任务进行拆分,保证数据量多的物化视图能够分批、稳定的完成刷新。 支持为异步物化视图的分区指定 Time to Live(TTL),从而减少异步物化视图占用的存储空间。 支持指定刷新范围,只刷新最新的几个分区,减少刷新开销。 支持设置数据变更不会触发对应物化视图自动刷新的基表。 支持为刷新任务设置资源组。
物化视图缺点:
存储预计算结果,有额外存储成本。 基表数据更新时,有更新开销。 物化视图查询改写能力有很多限制条件。
Starrocks实时精准去重
方案一:使用了HypoLogLog技术模糊去重后再count distinct,数据新鲜度比较好,但结果是不精确的。 方案二:(推荐方式)
第一层在明细数据上按照城市、时间做增量聚合,可以用bitmap技术和物化视图增量更新技术,先聚合成城市粒度、分钟级的数据。 第二层用物化视图做面向ADS的分钟级刷新视图,因为有几十个看板,所以视图非常多,分钟级刷新是能够比较好地权衡数据新鲜度和资源使用。
image
starrocks 创建外表和Catalog的区别?
StarRocks3.0之前需要手动创建外表DDR来查询外部数据源,在表很多的时候操作非常繁琐。3.0的Catalog功能可以直接查询Hive、Iceberg、Hudi、Deltalake、ES、Mysql、Oracle、Postgres和文件等各种数据源,覆盖了大部分的数据使用场景。 目前 StarRocks 外表使用上有以下限制: 仅可以在外表上执行 insert into 和 show create table 操作,不支持其他数据写入方式,也不支持查询和 DDL。 创建外表语法和创建普通表一致,但其中的列名等信息请保持同其对应的目标表一致。 外表会周期性从目标表同步元信息(同步周期为 10 秒),在目标表执行的 DDL 操作可能会延迟一定时间反应在外表上。 JDBC外部表使用限制 创建 JDBC 外部表时,不支持索引,也不支持通过 PARTITION BY、DISTRIBUTED BY 来指定数据分布规则。 查询 JDBC 外部表时,不支持下推函数。 对于 StarRocks 数据源,现阶段只支持 Insert 写入,不支持读取,对于其他数据源,现阶段只支持读取,还不支持写入。
而使用Catalog,只需要执行create external Catalog命令,就可以连到Hive Metastore自动获取元数据,然后就可以直接查询其中的数据。除此之外另一种场景是在S3上放了一堆文件,但没有将其组织成Iceberg的format,也可以创建Catalog直接去查询。 在 External Catalog 的基础上,结合 StarRocks 的内表存储,两种数据源可以 Join 起来同时查询。由于内表有自己的存储引擎,具有较好的实时性,可以服务实时workload;同时External Table可以用于存储历史数据,这样就可以联合使用多种不同的存储引擎,来覆盖更多的使用场景。
Starrocks点查原理?
倒排索引
StarRocks 的 Bitmap Index 主要包括两部分内容:字典和 Bitmap 索引本身。 字典保存了原始值到编码 Id的映射,Bitmap 索引记录了每个编码 ID 到 Bitmap 的映射。 StarRocks 的字典部分和 Bitmap index 部分都是以 Page 的格式存储,为了减少内存占用和加速索引,StarRocks 对字典和 Bitmap 的 Page 都建立了索引。 当然,如果建 Bitmap 索引列的基数很低,Dict Data Page 和 Bitmap Data Page 只有一个的话,我们就不需要 Dict Index Page 和 Bitmap Index Page。 当 StarRocks 利用 Bitmap Index 进行过滤的时候,只需要先加载 Dict Index Page 和部分 Dict Data Page 即可,按照字典值进行快速过滤,无需解码数据,也无需把所有 Bitmap Data Page 一次性加载进来。 综上,由于 StarRocks 支持Bitmap 索引的索引和支持按照字典值进行快速过滤,即使 Bitmap Index 列的基数很高,Bitmap Index 整体磁盘存储很大,内存占用也很小。
StarRocks 前缀索引加速点查
如果某个表的过滤条件基数很高,在StarRocks 中,就不需要使用 Bitmap 索引,将该列设置为 Sort Key,使用前缀索引过滤即可。对于高基数列,前缀索引的点查性能更好,同时也能节省大量的存储空间。
StarRocks RollUp || Materialized View 加速点查
当 Bitmap 索引的过滤度不大,需要 Seek 很多 Data Page 时,可以考虑进一步用空间换时间,利用 StarRocks 的 RollUp 和 Materialized View 将不同的过滤列都设置成 Sort Key,利用前缀索引加速点查。
StarRocks Generated Column 加速点查
当过滤的列是 Json 或者 Map 类型的 Key 列时,在 StarRocks 中,我们可以为这个 Key 列新增一个Generated Column,然后对 Key 列建立 Bitmap 索引,通过 Bitmap 索引加速点查。 当然,之后 StarRocks 支持了对 Json 或者 Map 类型的 Key 列直接建 Bitmap 索引时,就不需要依靠 Generated Column 了。
StarRocks支持两级分区分桶和分区裁剪加速查询
StarRocks 支持两级分区分桶,可以首先按照具有时间属性的列分区,再按照高基数列分桶,这样在保证很好的数据本地性的同时也避免了数据倾斜。 越好的数据本地性,在列式存储中可以带来越好的数据压缩比,查询的IO 代价会越低。 没有数据倾斜,StarRocks 就可以充分发挥多机和多核的能力。 如果查询的过滤条件包含分区列/分桶列, StarRocks 就可以触发分区裁剪,Scan 的数据量就会极大减少。
StarRocks数据副本?
名词解释
- Tablet:Doris 表的逻辑分片,一个表有多个分片。
- Replica:分片的副本,默认一个分片有3个副本。
- Healthy Replica:健康副本,副本所在 Backend 存活,且副本的版本完整。
- TabletChecker(TC):是一个常驻的后台线程,用于定期扫描所有的 Tablet,检查这些 Tablet 的状态,并根据检查结果,决定是否将 tablet 发送给 TabletScheduler。
- TabletScheduler(TS):是一个常驻的后台线程,用于处理由 TabletChecker 发来的需要修复的 Tablet。同时也会进行集群副本均衡的工作。
- TabletSchedCtx(TSC):是一个 tablet 的封装。当 TC 选择一个 tablet 后,会将其封装为一个 TSC,发送给 TS。
- Storage Medium:存储介质。Doris 支持对分区粒度指定不同的存储介质,包括 SSD 和 HDD。副本调度策略也是针对不同的存储介质分别调度的。副本状态
一个 Tablet 的多个副本,可能因为某些情况导致状态不一致。Doris 会尝试自动修复这些状态不一致的副本,让集群尽快从错误状态中恢复。 一个 Replica 的健康状态有以下几种: 1. BAD即副本损坏。包括但不限于磁盘故障、BUG等引起的副本不可恢复的损毁状态。 2. VERSION_MISSING版本缺失。Doris 中每一批次导入都对应一个数据版本。而一个副本的数据由多个连续的版本组成。而由于导入错误、延迟等原因,可能导致某些副本的数据版本不完整。 3. HEALTHY健康副本。即数据正常的副本,并且副本所在的 BE 节点状态正常(心跳正常且不处于下线过程中) 一个 Tablet 的健康状态由其所有副本的状态决定,有以下几种: 1. REPLICA_MISSING副本缺失。即存活副本数小于期望副本数。 2. VERSION_INCOMPLETE存活副本数大于等于期望副本数,但其中健康副本数小于期望副本数。 3. REPLICA_RELOCATING拥有等于 replication num 的版本完整的存活副本数,但是部分副本所在的 BE 节点处于 unavailable 状态(比如 decommission 中) 4. REPLICA_MISSING_IN_CLUSTER当使用多 cluster 方式时,健康副本数大于等于期望副本数,但在对应 cluster 内的副本数小于期望副本数。 5. REDUNDANT副本冗余。健康副本都在对应 cluster 内,但数量大于期望副本数。或者有多余的 unavailable 副本。 6. FORCE_REDUNDANT这是一个特殊状态。只会出现在当已存在副本数大于等于可用节点数,可用节点数大于等于期望副本数,并且存活的副本数小于期望副本数。这种情况下,需要先删除一个副本,以保证有可用节点用于创建新副本。 7. COLOCATE_MISMATCH针对 Colocation 属性的表的分片状态。表示分片副本与 Colocation Group 的指定的分布不一致。 8. COLOCATE_REDUNDANT针对 Colocation 属性的表的分片状态。表示 Colocation 表的分片副本冗余。 9. HEALTHY健康分片,即条件[1-8]都不满足。
副本修复
TabletChecker 作为常驻的后台进程,会定期检查所有分片的状态。对于非健康状态的分片,将会交给 TabletScheduler 进行调度和修复。修复的实际操作,都由 BE 上的 clone 任务完成。FE 只负责生成这些 clone 任务。 注1:副本修复的主要思想是先通过创建或补齐使得分片的副本数达到期望值,然后再删除多余的副本。 注2:一个 clone 任务就是完成从一个指定远端 BE 拷贝指定数据到指定目的端 BE 的过程。 针对不同的状态,我们采用不同的修复方式: 1. REPLICA_MISSING/REPLICA_RELOCATING选择一个低负载的,可用的 BE 节点作为目的端。选择一个健康副本作为源端。clone 任务会从源端拷贝一个完整的副本到目的端。对于副本补齐,我们会直接选择一个可用的 BE 节点,而不考虑存储介质。 2. VERSION_INCOMPLETE选择一个相对完整的副本作为目的端。选择一个健康副本作为源端。clone 任务会从源端尝试拷贝缺失的版本到目的端的副本。 3. REPLICA_MISSING_IN_CLUSTER这种状态处理方式和 REPLICA_MISSING 相同。 4. REDUNDANT通常经过副本修复后,分片会有冗余的副本。我们选择一个冗余副本将其删除。冗余副本的选择遵从以下优先级: a. 副本所在 BE 已经下线 b. 副本已损坏 c. 副本所在 BE 失联或在下线中 d. 副本处于 CLONE 状态(该状态是 clone 任务执行过程中的一个中间状态) e. 副本有版本缺失 f. 副本所在 cluster 不正确 g. 副本所在 BE 节点负载高 5. FORCE_REDUNDANT不同于 REDUNDANT,因为此时虽然存活的副本数小于期望副本数,但是因为已经没有额外的可用节点用于创建新的副本了。所以此时必须先删除一个副本,以腾出一个可用节点用于创建新的副本。 删除副本的顺序同 REDUNDANT。 6. COLOCATE_MISMATCH从 Colocation Group 中指定的副本分布 BE 节点中选择一个作为目的节点进行副本补齐。 7. COLOCATE_REDUNDANT删除一个非 Colocation Group 中指定的副本分布 BE 节点上的副本。 Doris 在选择副本节点时,不会将同一个 Tablet 的副本部署在同一个 host 的不同 BE 上。保证了即使同一个 host 上的所有 BE 都挂掉,也不会造成全部副本丢失。 调度优先级 TabletScheduler 里等待被调度的分片会根据状态不同,赋予不同的优先级。优先级高的分片将会被优先调度。目前有以下几种优先级。 1. VERY_HIGH ○ REDUNDANT。对于有副本冗余的分片,我们优先处理。虽然逻辑上来讲,副本冗余的紧急程度最低,但是因为这种情况处理起来最快且可以快速释放资源(比如磁盘空间等),所以我们优先处理。 ○ FORCE_REDUNDANT。同上。 2. HIGH ○ REPLICA_MISSING 且多数副本缺失(比如3副本丢失了2个) ○ VERSION_INCOMPLETE 且多数副本的版本缺失 ○ COLOCATE_MISMATCH 我们希望 Colocation 表相关的分片能够尽快修复完成。 ○ COLOCATE_REDUNDANT 3. NORMAL ○ REPLICA_MISSING 但多数存活(比如3副本丢失了1个) ○ VERSION_INCOMPLETE 但多数副本的版本完整 ○ REPLICA_RELOCATING 且多数副本需要 relocate(比如3副本有2个) 4. LOW ○ REPLICA_MISSING_IN_CLUSTER ○ REPLICA_RELOCATING 但多数副本 stable 手动优先级 系统会自动判断调度优先级。但是有些时候,用户希望某些表或分区的分片能够更快的被修复。因此我们提供一个命令,用户可以指定某个表或分区的分片被优先修复: ADMIN REPAIR TABLE tbl [PARTITION (p1, p2, ...)]; 这个命令,告诉 TC,在扫描 Tablet 时,对需要优先修复的表或分区中的有问题的 Tablet,给予 VERY_HIGH 的优先级。 注:这个命令只是一个 hint,并不能保证一定能修复成功,并且优先级也会随 TS 的调度而发生变化。并且当 Master FE 切换或重启后,这些信息都会丢失。 可以通过以下命令取消优先级: ADMIN CANCEL REPAIR TABLE tbl [PARTITION (p1, p2, ...)]; 优先级调度 优先级保证了损坏严重的分片能够优先被修复,提高系统可用性。但是如果高优先级的修复任务一直失败,则会导致低优先级的任务一直得不到调度。因此,我们会根据任务的运行状态,动态的调整任务的优先级,保证所有任务都有机会被调度到。 ● 连续5次调度失败(如无法获取资源,无法找到合适的源端或目的端等),则优先级会被下调。 ● 持续 30 分钟未被调度,则上调优先级。 ● 同一 tablet 任务的优先级至少间隔 5 分钟才会被调整一次。 同时为了保证初始优先级的权重,我们规定,初始优先级为 VERY_HIGH 的,最低被下调到 NORMAL。而初始优先级为 LOW 的,最多被上调为 HIGH。这里的优先级调整,也会调整用户手动设置的优先级。
副本均衡
Doris 会自动进行集群内的副本均衡。目前支持两种均衡策略,负载/分区。负载均衡适合需要兼顾节点磁盘使用率和节点副本数量的场景;而分区均衡会使每个分区的副本都均匀分布在各个节点,避免热点,适合对分区读写要求比较高的场景。但是,分区均衡不考虑磁盘使用率,使用分区均衡时需要注意磁盘的使用情况。 策略只能在fe启动前配置tablet_rebalancer_type ,不支持运行时切换。 负载均衡 负载均衡的主要思想是,对某些分片,先在低负载的节点上创建一个副本,然后再删除这些分片在高负载节点上的副本。同时,因为不同存储介质的存在,在同一个集群内的不同 BE 节点上,可能存在一种或两种存储介质。我们要求存储介质为 A 的分片在均衡后,尽量依然存储在存储介质 A 中。所以我们根据存储介质,对集群的 BE 节点进行划分。然后针对不同的存储介质的 BE 节点集合,进行负载均衡调度。 同样,副本均衡会保证不会将同一个 Tablet 的副本部署在同一个 host 的 BE 上。 BE 节点负载 我们用 ClusterLoadStatistics(CLS)表示一个 cluster 中各个 Backend 的负载均衡情况。TabletScheduler 根据这个统计值,来触发集群均衡。我们当前通过 磁盘使用率 和 副本数量 两个指标,为每个BE计算一个 loadScore,作为 BE 的负载分数。分数越高,表示该 BE 的负载越重。 磁盘使用率和副本数量各有一个权重系数,分别为 capacityCoefficient 和 replicaNumCoefficient,其 和恒为1。其中 capacityCoefficient 会根据实际磁盘使用率动态调整。当一个 BE 的总体磁盘使用率在 50% 以下,则 capacityCoefficient 值为 0.5,如果磁盘使用率在 75%(可通过 FE 配置项 capacity_used_percent_high_water 配置)以上,则值为 1。如果使用率介于 50% ~ 75% 之间,则该权重系数平滑增加,公式为: capacityCoefficient= 2 * 磁盘使用率 - 0.5 该权重系数保证当磁盘使用率过高时,该 Backend 的负载分数会更高,以保证尽快降低这个 BE 的负载。 TabletScheduler 会每隔 20s 更新一次 CLS。 分区均衡 分区均衡的主要思想是,将每个分区的在各个 Backend 上的 replica 数量差(即 partition skew),减少到最小。因此只考虑副本个数,不考虑磁盘使用率。 为了尽量少的迁移次数,分区均衡使用二维贪心的策略,优先均衡partition skew最大的分区,均衡分区时会尽量选择,可以使整个 cluster 的在各个 Backend 上的 replica 数量差(即 cluster skew/total skew)减少的方向。 skew 统计 skew 统计信息由ClusterBalanceInfo表示,其中,partitionInfoBySkew以 partition skew 为key排序,便于找到max partition skew;beByTotalReplicaCount则是以 Backend 上的所有 replica 个数为key排序。ClusterBalanceInfo同样保持在CLS中, 同样 20s 更新一次。 max partition skew 的分区可能有多个,采用随机的方式选择一个分区计算。 均衡策略 TabletScheduler 在每轮调度时,都会通过 LoadBalancer 来选择一定数目的健康分片作为 balance 的候选分片。在下一次调度时,会尝试根据这些候选分片,进行均衡调度。
存算分离的好处?
资源效率优化
资源利用率提高:计算与存储资源解耦,资源使用成本优化。作为底层的资源平台,基础IT环境的资源总是有限的,站在业务的角度是往往是存储先于计算达到瓶颈,到达时间点是不一样的。由于计算和存储的耦合设计,无论扩计算还是扩存储,都在会造成资源的浪费; 异构计算的资源负载混部:在统一存储平台提供面向异构计算的工作资源负载下的多维度查询分析服务。在线与离线计算共用计算和存储资源。解决资源波峰波谷问题,实现资源动态削峰填谷 存储降本: 存储利用率+冷热分层,支持存储弹性扩展,能支持多种廉价的存储方式。支持基于分布式\对象存储系统上的多层存储(热存储/标准存储/冷存储等)。举例来说,存储降本优化主要依赖于归档与冷存储占比大小。如冷存储占比40+%,存储成本大致下降20+%。 性能优化:由于存储和计算资源可以独立地进行优化和升级,存算分离架构可以更好地适应不同的计算负载和存储需求,不用分担额外的成本进行存储,能更充分利用cpu、内存从而提升查询效率。此外,数据的本地化存储可以降低数据访问的延迟和网络带宽的需求,从而提高系统的性能。 查询优化:算子落盘,缓存结果,能复用结果,加速查询。
系统稳定性提升
SRE可靠性提升: 业务稳定性-计算集群与存储集群分别高可用。运维易用提升:独立热升级-支持原地升级、滚动升级、补丁升级,升级时间短。原先耦合造成扩容不便:计算和存储耦合在一起的典型问题,例如每次扩容都需要考虑数据的迁移,给本来简单的扩容工作带来很多风险和不可控因素。 易于管理:存算分离架构可以简化系统管理和维护的难度,通过独立的存储和计算资源管理,可以更好地监控系统的性能和资源使用情况,提高系统的可管理性。 安全性:存算分离架构可以更好地保护数据的安全性和隐私性,通过数据的本地化存储和数据访问的控制,可以减少数据泄露和攻击的风险。
技术创新/业务增效
创新数据应用:基于统一存储平台之上对多源异构数据的深度分析数据价值的挖掘,大数据+AI的整合应用,以提升数据协同应用效率。 硬件利旧降本: 从运维的角度来讲,降低服务器的款型是降低运维难度和工作量的有效手段。随着业务复杂度的增加和新业务线上的加快,对服务器新机型与资源配比的要求也会随之增加。降低服务器的款型很难做到。私有化存算分离改造目的是有两个, 通过存算分离来更顺滑的兼容更多硬件款型, 支持不同计算引擎间混部以降低硬件成本。 行业趋势:行业大数据改造升级。随着反全球化浪潮的加剧,政企/金融行业信创自建大数据升级改造的需求越来越多,如何实现基于自研和信创大数据体系下的存算分离架构成为客户普遍诉求
总结:存算分离的好处:资源优化:解决数据快速移动,实现计算、存储弹性扩展,按需分配【按需/弹性】,能更方便快捷的支撑多种业务场景。 存算分离需要考虑: 存算分离:将存储和计算分离。 对象存储:使用对象存储来解决结构化、非结构化和半结构化数据的存储问题。 弹性计算:采用弹性计算技术来解决拆分小集群后集群资源不充分利用的问题。 容器化:采用容器化技术来解决深度学习计算任务和资源管理的问题,从而实现更高效地降本增效。