AutoMQ 对象存储数据高效组织的秘密: Compaction

本文涉及的产品
对象存储 OSS,20GB 3个月
对象存储 OSS,恶意文件检测 1000次 1年
对象存储 OSS,内容安全 1000次 1年
简介: AutoMQ是一款使用对象存储的消息系统,通过内存攒批和EBS持久化降低API调用成本。它有两种对象类型:Stream Set Object和Stream Object。Compaction过程用于数据清理、减少元数据和提升读取性能。Compaction包括SSO和SO两阶段,本文聚焦于SSO Compaction,涉及索引解析、排序、数据段分裂和迭代计划。每个迭代按内存限制划分任务,读取数据段并上传新对象,最后提交元数据更新。AutoMQ还有其他特性如Force Split和分级限流。团队由Apache RocketMQ和Linux LVS背景成员组成,致力于提供低成本、高弹性的消息服务。

01

前言

AutoMQ 作为一款使用对象存储作为主要存储介质的消息系统,在写入链路,会将所有 Partition 的数据在内存中进行攒批(同时持久化至 EBS),当攒批大小达到一定阈值则将该批次的数据上传至对象存储,通过这种方式,使得对象存储的 API 调用成本和文件数量仅和吞吐相关,且不会随着分区数量的增加而线性增大,如下图:

在将攒批数据上传至对象存储的过程中可能产生两类对象(从分区到 Stream 的映射关系可参考「AutoMQ 如何做到 Apache Kafka 100% 协议兼容」[3]),首次了解的读者可以简单理解为一个分区的数据对应着一个 Stream ):

  • Stream Set Object(下简称 SSO):同一个 Object 中包含多个 Stream 的连续数据段

  • Stream Object(下简称 SO):同一个 Object 中只包含一个 Stream 的连续数据段

上传时,会将积攒的数据中同一 Stream 连续数据段长度超过一定阈值的数据直接上传为一个 SO,剩余的多个分区的数据按照 Stream Id 从小到大的顺序写入同一个 SSO 中,如下图:

02

Compaction 的目的

与 LSM-Tree Compaction [4] 机制类似,AutoMQ 的 Compaction 主要用于数据清理、减少元数据量以及增大数据内聚程度以提高读取性能。

  • 数据清理:通过 Compaction 来删除已经过期的分区数据

  • 减少元数据量:通过将多个小对象 Compact 成大对象,能够有效减少所需维护的元数据量

  • 提升读取性能:在 Apache Kafka 的文件结构下,消费一个分区的历史数据仅需要定位到该分区相应的 Segment 文件即可,但由于 AutoMQ 采用了攒批写入的方式,当分区数量较多时,一个 SSO 中可能只包含了一个分区的小部分数据,此时消费该分区的一段历史数据时,需要向多个 SSO 发起 API 调用,在调用成本增加的同时也容易影响冷读吞吐量。通过 Compaction,我们能将同一个分区的数据组织在尽可能少的对象上,从而提升消费性能。

03

Compaction 过程

AutoMQ 实现了两级 Compaction:

  1. SSO Compaction:将多个 SSO Compact 成不超过一个 SSO 和多个 SO

  2. SO Compaction:将属于同一 Stream 的多个 SO Compact 成更大的 SO

由于篇幅原因,本文将着重介绍 SSO Compaction。

3.1 准备工作

在 SSO Compaction 开始时,会先获取当前节点产生的所有 SSO,并读取各 SSO 的索引文件,解析出各个对象中的 Stream 和对应的数据范围,在这个过程中,各 Stream 过期的数据段将直接被忽略。本文将以下图所示的三个 SSO 的 Compaction 过程为例(需要注意的是,图上的色块长度仅用于表示对应数据段的长度,在这一步中,并未实际读取对应的数据段):

获取到各个 SSO 的索引后,按照 Stream Id 从小到大,同 Stream 数据偏移量从小到大的顺序对索引进行排序:

排序完成后,同一 Stream 的连续数据段大于分裂阈值的需要被分裂成单独的 SO,剩余的数据段将组成新的 SSO:

3.2 生成迭代计划

由于 Compaction 是周期性任务(默认周期为 20 分钟),对于一个大流量的线上集群而言,每次 Compaction 覆盖的 SSO 数据量可能达到上百 GB 甚至更多,想要将这些数据一次性拉取到本地进行 Compact 几乎是不可能实现的,故 AutoMQ 会根据预先配置的 Compaction 任务可使用的最大内存空间来将本次 Compaction 划分为多个迭代,每次迭代完成后,清理内存数据,再开始下一次迭代,从而实现在可控的内存空间下完成大规模数据的 Compaction。依然以上图为例,假设 Compaction 可用内存限制为 150,则本次 Compaction 将分为两个迭代完成:

在第一轮迭代中,S0 的两个数据段将作为 SSO-3 的第一个 Part 被上传,S1 的前两个数据段(30-60、60-120)将被合并为一个对象(SO-0)被上传,而 S2 的数据段由于只能部分满足第一轮迭代的内存限制,将被截断成两个 SO,在第一轮迭代中将能够满足内存限制的前一部分(S2 400-435)上传。

在第二轮迭代中,此前被截断的 S2 剩余数据段(435-500)会被单独上传为一个 SO,S2 的剩余数据段会作为 SSO-3 的第二个 Part 被上传。

3.3 发起读写

迭代计划制定完成后,就可以发起实际的读写请求了,为了最小化对象存储的 API 调用成本,在每轮迭代开始前会将本轮迭代需要读取的数据段按照所属的对象进行分组,由于 Compaction 的迭代顺序本身就是按照 Stream Id -> Offset 排序的,所以 SSO 中相邻的数据段可以被合并成一个 API 被读取,当一次迭代中的数据段被读取到本地完成拼装后即可触发上传。每次迭代中所需产生的对象都完成上传后,即可将此次迭代读取到内存中的数据段全部清除,从而为下一次迭代留出空间。以上文提到的两次迭代为例:

3.3.1 第一次迭代

1. 分别向三个 SSO 发起异步读取:

  • SSO-0 一次 Batch Read 读取 S0 (0-20) 以及 S1 (30-60) 两个数据段

  • SSO-1 一次 Batch Read 读取 S0 (20-25) 以及 S1 (60-120) 两个数据段

  • SSO-2 一次 Batch Read 读取 S2 (400-435) 数据段

  1. S0 (0-20) 和 S0 (20-25) 读取完成后作为 SSO-3 的第一个 Part 上传3. S1 (30-60) 和 S1 (60-120) 读取完成后通过 Multi-Part Upload 完成 SO-0 的上传4. S2 (400-435) 读取完成后通过 PutObject 完成 SO-1 的上传

3.3.2 第二次迭代:

  1. 分别向两个 SSO 发起异步读取:
  • SSO-0 一次 Batch Read 读取 S3 (210-230) 数据段

  • SSO-2 一次 Batch Read 读取 S2 (435-500) 和 S3 (230-270) 两个数据段

  1. S2 (435-500) 读取完成后通过 Multi-Part Upload 完成 SO-2 的上传3. S3 (210-230) 和 S3 (230-270) 读取完成后作为 SSO-3 的最后一个 Part 上传

3.4 Commit 元数据

当所有的迭代都执行完成后,对象存储中已经生成了本次 Compaction 中产生的所有对象,此时 Broker 节点将向 Controller 发起一次 Commit 请求,将被 Compact 的对象标记为删除,并使用新生成的对象索引对元数据进行替换。若在 Compaction 过程中由于节点下线或其他异常导致了 Compaction 终止,则此次 Compaction 过程中生成的对象将在 Commit 超时时间过后被清理。

04

结语

本文介绍了 AutoMQ 如何在有限的内存下实现大规模 SSO 对象的 Compaction。除本文覆盖的内容外,AutoMQ 还实现了诸如 Force Split、Compaction 分级限流、基于 UploadPartCopy 的 SO Compaction 等一系列特性,受限于篇幅本文不一一展开介绍,感兴趣的同学欢迎深入 AutoMQ 代码仓库进行了解。

参考资料

[1] KIP-405: Kafka Tiered Storage: https://cwiki.apache.org/confluence/display/KAFKA/KIP-405\%3A+Kafka+Tiered+Storage
[2] S3Stream: https://github.com/AutoMQ/automq/tree/main/s3stream
[3] AutoMQ 如何做到 Apache Kafka 100% 协议兼容: https://mp.weixin.qq.com/s/ZOTu5fA0FcAJlCrCJFSoaw
[4] Log-structured merge-tree: https://en.wikipedia.org/wiki/Log-structured_merge-tree
[5] AWS S3 UploadPartCopy https://docs.aws.amazon.com/AmazonS3/latest/API/API_UploadPartCopy.html

往期推荐

关于我们

我们是来自 Apache RocketMQ 和 Linux LVS 项目的核心团队,曾经见证并应对过消息队列基础设施在大型互联网公司和云计算公司的挑战。现在我们基于对象存储优先、存算分离、多云原生等技术理念,重新设计并实现了 Apache Kafka 和 Apache RocketMQ,带来高达 10 倍的成本优势和百倍的弹性效率提升。

🌟 GitHub 地址:https://github.com/AutoMQ/automq
💻 官网:https://www.automq.com
👀 B站:AutoMQ官方账号
🔍 视频号:AutoMQ

相关实践学习
借助OSS搭建在线教育视频课程分享网站
本教程介绍如何基于云服务器ECS和对象存储OSS,搭建一个在线教育视频课程分享网站。
目录
相关文章
|
4月前
|
机器学习/深度学习 人工智能 专有云
人工智能平台PAI使用问题之怎么将DLC的数据写入到另一个阿里云主账号的OSS中
阿里云人工智能平台PAI是一个功能强大、易于使用的AI开发平台,旨在降低AI开发门槛,加速创新,助力企业和开发者高效构建、部署和管理人工智能应用。其中包含了一系列相互协同的产品与服务,共同构成一个完整的人工智能开发与应用生态系统。以下是对PAI产品使用合集的概述,涵盖数据处理、模型开发、训练加速、模型部署及管理等多个环节。
|
5月前
|
DataWorks 安全 关系型数据库
DataWorks产品使用合集之同步数据到OSS时,文件的切分单位如何设置
DataWorks作为一站式的数据开发与治理平台,提供了从数据采集、清洗、开发、调度、服务化、质量监控到安全管理的全套解决方案,帮助企业构建高效、规范、安全的大数据处理体系。以下是对DataWorks产品使用合集的概述,涵盖数据处理的各个环节。
|
3月前
|
存储 安全 大数据
对象存储的意义:探索数据新纪元的关键基石
在信息爆炸时代,数据成为核心资产,而高效安全的数据存储至关重要。对象存储作为一种新兴技术,起源于20世纪90年代,旨在解决传统文件系统的局限性。随着云计算和大数据技术的发展,它已成为关键技术之一。对象存储具备高可扩展性、高可靠性、低成本、易于管理和多协议支持等优点。它支撑大数据发展、推动云计算繁荣、助力企业数字化转型并保障数据安全。未来,对象存储将进一步提升性能,实现智能化管理,并与边缘计算融合,获得政策支持,成为数据新时代的关键基石。
160 3
|
4月前
|
DataWorks 安全 定位技术
DataWorks产品使用合集之如何同步OSS中的Parquet数据,并解析里面的数组成多个字段
DataWorks作为一站式的数据开发与治理平台,提供了从数据采集、清洗、开发、调度、服务化、质量监控到安全管理的全套解决方案,帮助企业构建高效、规范、安全的大数据处理体系。以下是对DataWorks产品使用合集的概述,涵盖数据处理的各个环节。
|
5月前
|
数据采集 DataWorks 安全
DataWorks产品使用合集之将按日分区的表同步数据到OSS数据源,该如何配置
DataWorks作为一站式的数据开发与治理平台,提供了从数据采集、清洗、开发、调度、服务化、质量监控到安全管理的全套解决方案,帮助企业构建高效、规范、安全的大数据处理体系。以下是对DataWorks产品使用合集的概述,涵盖数据处理的各个环节。
155 1
|
5月前
|
机器学习/深度学习 分布式计算 大数据
MaxCompute产品使用问题之如何直接加载oss中的parque数据,无需指定列和分区
MaxCompute作为一款全面的大数据处理平台,广泛应用于各类大数据分析、数据挖掘、BI及机器学习场景。掌握其核心功能、熟练操作流程、遵循最佳实践,可以帮助用户高效、安全地管理和利用海量数据。以下是一个关于MaxCompute产品使用的合集,涵盖了其核心功能、应用场景、操作流程以及最佳实践等内容。
|
5月前
|
消息中间件 SQL Kafka
实时计算 Flink版产品使用问题之如何实现OSS数据到Kafka的实时同步
实时计算Flink版作为一种强大的流处理和批处理统一的计算框架,广泛应用于各种需要实时数据处理和分析的场景。实时计算Flink版通常结合SQL接口、DataStream API、以及与上下游数据源和存储系统的丰富连接器,提供了一套全面的解决方案,以应对各种实时计算需求。其低延迟、高吞吐、容错性强的特点,使其成为众多企业和组织实时数据处理首选的技术平台。以下是实时计算Flink版的一些典型使用合集。
|
5月前
|
存储 分布式计算 大数据
MaxCompute产品使用合集之是否支持创建OSS外部表为分区表,并访问OSS上以分区方式存储的数据
MaxCompute作为一款全面的大数据处理平台,广泛应用于各类大数据分析、数据挖掘、BI及机器学习场景。掌握其核心功能、熟练操作流程、遵循最佳实践,可以帮助用户高效、安全地管理和利用海量数据。以下是一个关于MaxCompute产品使用的合集,涵盖了其核心功能、应用场景、操作流程以及最佳实践等内容。
|
5月前
|
存储 分布式计算 大数据
MaxCompute产品使用合集之读取OSS数据出现重复的情况是什么导致的
MaxCompute作为一款全面的大数据处理平台,广泛应用于各类大数据分析、数据挖掘、BI及机器学习场景。掌握其核心功能、熟练操作流程、遵循最佳实践,可以帮助用户高效、安全地管理和利用海量数据。以下是一个关于MaxCompute产品使用的合集,涵盖了其核心功能、应用场景、操作流程以及最佳实践等内容。
|
5月前
|
JSON 运维 Serverless
函数计算产品使用问题之如何实现数据的读取和修改,而不需要每次都从OSS下载完整的数据
函数计算产品作为一种事件驱动的全托管计算服务,让用户能够专注于业务逻辑的编写,而无需关心底层服务器的管理与运维。你可以有效地利用函数计算产品来支撑各类应用场景,从简单的数据处理到复杂的业务逻辑,实现快速、高效、低成本的云上部署与运维。以下是一些关于使用函数计算产品的合集和要点,帮助你更好地理解和应用这一服务。