小红书黄章衡:AutoMQ Serverless 基石-秒级分区迁移

本文涉及的产品
对象存储 OSS,20GB 3个月
对象存储 OSS,内容安全 1000次 1年
对象存储 OSS,恶意文件检测 1000次 1年
简介: Apache Kafka的分区迁移通常需要大量数据同步,耗时较长,但在AutoMQ中,由于存算分离架构,迁移时间缩短至秒级。本文深入解析了AutoMQ秒级迁移的原理和源码,包括构建迁移命令、Broker同步变更、元数据持久化、数据上传、选主以及数据恢复等六个步骤。这种高效迁移能力适用于高峰期快速扩容和Serverless按需扩容场景,提升了系统的弹性和运维效率。AutoMQ由Apache RocketMQ和Linux LVS团队创建,旨在提供成本优化和高弹性消息队列服务。

作者|黄章衡,小红书消息引擎研发专家

01 引言

Apache Kafka 因存算一体化架构,分区迁移依赖大量数据同步的完成,以一个 100MB/s 流量的 Kafka 分区为例,运行一天产生的数据量约为 8.2T,如果此时需要将该分区迁移到其他 Broker,则需要对全量数据进行复制,即使对拥有 1 Gbps 带宽的节点,也需要小时级的时间来完成迁移,这使得 Apache Kafka 集群几乎不具备实时弹性能力。

而得益于 AutoMQ Kafka 的存算分离架构,在实际进行分区迁移时无需搬迁任何数据,这使得将分区迁移时间缩短至秒级成为了可能。
本篇文章将详细解析 AutoMQ 秒级迁移能力对应的原理和源码部分,并在最后讨论秒级迁移能力的应用场景。

02 AutoMQ 分区迁移流程概述

如下图,以分区 P1 从 Broker-0 迁移至 Broker-1 为例,流程分为六步:
ꔷ Step1 构建分区迁移命令: Controller(ReplicationControlManager:AlterPartitionReassign)当 Kraft Controller 收到分区迁移命令时,会构建出相应的 PartitionChangeRecord 并 commit 至 Kraft Log 层,将 Broker-0 从 leader replica 列表中删除,并将 Broker-1 加入 follower replica 列表中。
ꔷ Step2 Broker 同步分区变更: Broker(ElasticReplicaManager:AsyncApplyDelta)Broker-0 同步 Kraft Log 监听到 P1 分区变更,进入分区关闭流程。
ꔷ Step3 元数据持久化与分区 Stream 关闭: Broker (ElasticLog: Close) ElasticLog 为 LocalLog 基于 S3Stream 的实现。ElasticLog 会先持久化分区元数据至 Meta Stream(包括 LeaderEpoch、ProducerSnapshot、SegmentList、StreamIds etc..),随后将 Meta 和 Data Stream 全部 Close。
ꔷ Step4 数据上传与 Stream 关闭: Stream (S3Stream: Close) 每个 Stream 关闭时,若还存在未上传至对象存储的数据,则会触发强制上传,而在一个稳定运行的集群中,这部分数据往往不超过数百 MB,结合目前云厂商提供的突发网络带宽能力,这个过程一般仅需秒级即可完成。当 Stream 的数据上传完成后,即可安全的上报 Controller 关闭该 Stream 并从 Broker-0 删除分区 P1。
ꔷ Step5 主动重新触发选主: Controller (ReplicationControlManager:ElectLeader) P1 从 Broker 完成关闭后会主动触发一次选主,此时 Broker-1 作为唯一的 replica 晋升为 P1 的 leader,进入分区恢复流程。
ꔷ Step6 分区恢复与数据恢复: Broker (ElasticLog: Apply) 分区恢复时,会先上报 Controller 打开 P1 对应的 Meta Stream,根据 Meta Stream 从对象存储中拉取 P1 对应的元数据,从而恢复出 P1 相应的 checkpoint(Leader Epoch/SegmentList etc..),后根据 P1 的关闭状态(是否为 cleaned shutdown)进行对应的数据恢复。

03 AutoMQ 分区迁移流程源码解析

接下来我们详细解析分区迁移六步骤的源码,仍然以分区 P1 从 Broker-0 迁移至 Broker-1 为例:

注:AutoMQ 关闭分区前,需要先上报 Controller 关闭分区对应的所有 Stream 使其变为 Closed State,以便分区恢复时能够重新打开 Stream 使其变为 Opened State。这么做的目的是防止脑裂(也即两台 Broker 同时打开同一个 Stream),统一由 Controller 调控 Stream 的 State 和 Owner。

Step1: Controller 构建分区迁移命令
当 Controller 收到 alterPartitionReassignments 指令时,会构建 PartitionChangeBuilder 将该 Partition 的 TargetISR、Replicas 设置为目标 [1],但不会直接选举 Leader 而是选择延后选举,以保障选举前分区对应的 Stream 已经正常关闭。

此外,流程中还设置了分区选主超时器,若一段时间内源Broker 没有成功触发选主,则会在 Controller 中主动触发选主。

ReplicationControlManager:changePartitionReassignmentV2 {

    PartitionChangeBuilder builder = new PartitionChangeBuilder(part,
        tp.topicId(),
        tp.partitionId(),
        // no leader election, isAcceptableLeader 直接返回 False,代表不选主
        brokerId -> false,
        featureControl.metadataVersion(),
        getTopicEffectiveMinIsr(topics.get(tp.topicId()).name.toString())
    );
    builder.setZkMigrationEnabled(clusterControl.zkRegistrationAllowed());
    builder.setEligibleLeaderReplicasEnabled(isElrEnabled());
    // 设置 ISR、Replicas 为 [target.replicas().get(0)]
    builder.setTargetNode(target.replicas().get(0));
    TopicControlInfo topicControlInfo = topics.get(tp.topicId());
    if (topicControlInfo == null) {
        log.warn("unknown topicId[{}]", tp.topicId());
    } else {
        // 选主超时器
        TopicPartition topicPartition = new TopicPartition(topicControlInfo.name, tp.partitionId());
        addPartitionToReElectTimeouts(topicPartition);
    }
    return builder.setDefaultDirProvider(clusterDescriber).build();
}

Step2: Broker 同步分区变更
Controller 更新 Partition 的 Replicas 后,Broker-0 同步 Kraft Log 监听到 P1 分区变更,该分区不再属于 Broker-0,因此进入分区关闭流程。

ElasticReplicaManager: asyncApplyDelta(delta: TopicsDelta, newImage: MetadataImage) {
    if (!localChanges.deletes.isEmpty) {
      val deletes = localChanges.deletes.asScala
        .map { tp =>
          val isCurrentLeader = Option(delta.image().getTopic(tp.topic()))
            .map(image => image.partitions().get(tp.partition()))
            .exists(partition => partition.leader == config.nodeId)
          val deleteRemoteLog = delta.topicWasDeleted(tp.topic()) && isCurrentLeader
          StopPartition(tp, deleteLocalLog = true, deleteRemoteLog = deleteRemoteLog)
        }
        .toSet

      def doPartitionDeletion(): Unit = {
        stateChangeLogger.info(s"Deleting ${deletes.size} partition(s).")
        deletes.foreach(stopPartition => {
          val opCf = doPartitionDeletionAsyncLocked(stopPartition)
          opCfList.add(opCf)
        })
      }

      doPartitionDeletion()
    }    
}

Step3: Broker 元数据持久化与分区 Stream 关闭
当 ReplicasManager 调用 StopPartition 后,会一层层调用至 ElasticLog.Close.ElasticLog 为 LocalLog 基于 S3Stream 的实现,分区的数据和元数据与 S3Stream 的对应关系如下:

  • 每个 Segment 被映射到 DataStream

  • Segment 的 TxnIndex 和 TimeIndex 为别被映射为 Txn Stream 和 Time Stream

  • 分区的元数据 (producerSnapshot、LeaderEpoch、Streamids、SegmentList ...)则以KV的形式映射为 Meta Stream

ElasticLog 会先持久化分区元数据至 Meta Stream,随后将 Meta 和 Data Stream 全部 Close:

ElasticLog close(): CompletableFuture[Void] = {
    // already flush in UnifiedLog#close, so it's safe to set cleaned shutdown.
    // 标志为 Clean Shutdown
    partitionMeta.setCleanedShutdown(true)
    partitionMeta.setStartOffset(logStartOffset)
    partitionMeta.setRecoverOffset(recoveryPoint)

    maybeHandleIOException(s"Error while closing $topicPartition in dir ${dir.getParent}") {
        // 持久化元数据
        CoreUtils.swallow(persistLogMeta(), this)
        CoreUtils.swallow(checkIfMemoryMappedBufferClosed(), this)
        CoreUtils.swallow(segments.close(), this)
        CoreUtils.swallow(persistPartitionMeta(), this)
    }
    info("log(except for streams) closed")
    // 关闭分区对应的所有 Streams
    closeStreams()
}

Step4: S3Strema 数据上传与关闭
每个 Stream 关闭时:

  1. 等待所有未完成的 request
  2. 若还存在未上传至对象存储的数据,则会触发强制上传,而在一个稳定运行的集群中,这部分数据往往不超过数百 MB,结合目前云厂商提供的突发网络带宽能力,这个过程一般仅需秒级即可完成
  3. 当 Stream 的数据上传完成后,即可安全的上报 Controller 关闭该 Stream
S3Stream:Close(){

    // await all pending append/fetch/trim request
    List<CompletableFuture<?>> pendingRequests = new ArrayList<>(pendingAppends);
    if (GlobalSwitch.STRICT) {
        pendingRequests.addAll(pendingFetches);
    }
    pendingRequests.add(lastPendingTrim);
    CompletableFuture<Void> awaitPendingRequestsCf = CompletableFuture.allOf(pendingRequests.toArray(new CompletableFuture[0]));
    CompletableFuture<Void> closeCf = new CompletableFuture<>();

    // Close0 函数触发强制上传和 Stream 关闭
    awaitPendingRequestsCf.whenComplete((nil, ex) -> propagate(exec(this::close0, LOGGER, "close"), closeCf));

}

private CompletableFuture<Void> close0() {
    return storage.forceUpload(streamId)
        .thenCompose(nil -> streamManager.closeStream(streamId, epoch));
}

Step5: Broker 主动重新触发选主
P1 从 Broker 完成关闭后会主动触发一次选主:

ElasticReplicaManager:StopPartitions(partitionsToStop: collection.Set[StopPartition]) {

    partitionsToStop.foreach { stopPartition =>
      val topicPartition = stopPartition.topicPartition
      if (stopPartition.deleteLocalLog) {
        getPartition(topicPartition) match {
          case hostedPartition: HostedPartition.Online =>
            if (allPartitions.remove(topicPartition, hostedPartition)) {
              maybeRemoveTopicMetrics(topicPartition.topic)
              // AutoMQ for Kafka inject start
              if (ElasticLogManager.enabled()) {
                // For elastic stream, partition leader alter is triggered by setting isr/replicas.
                // When broker is not response for the partition, we need to close the partition
                // instead of delete the partition.
                val start = System.currentTimeMillis()
                hostedPartition.partition.close().get()
                info(s"partition $topicPartition is closed, cost ${System.currentTimeMillis() - start} ms, trigger leader election")

                // 主动触发选主
                alterPartitionManager.tryElectLeader(topicPartition)
              } else {
                // Logs are not deleted here. They are deleted in a single batch later on.
                // This is done to avoid having to checkpoint for every deletions.
                hostedPartition.partition.delete()
              }
              // AutoMQ for Kafka inject end
            }

          case _ =>
        }
        partitionsToDelete += topicPartition
      }

}

Controller 中, Broker-1 作为唯一的 replica 晋升为 P1 的 leader,进入分区恢复流程

Step6: Broker 分区恢复与数据恢复
Broker 分区恢复时,会先上报 Controller 打开 P1 对应的 Meta Stream,根据 Meta Stream 从对象存储中拉取 P1 对应的元数据,从而恢复出 P1 相应的 checkpoint(Leader Epoch/SegmentList etc..),后根据 P1 的关闭状态(是否为 cleaned shutdown)进行对应的数据恢复。

代码部分对应 ElasticLog:Apply
ꔷ 步骤一:Open Meta Stream

metaStream = if (metaNotExists) {
    val stream = createMetaStream(client, key, replicationFactor, leaderEpoch, logIdent = logIdent)
    info(s"${logIdent}created a new meta stream: stream_id=${stream.streamId()}")
    stream
} else {
    val metaStreamId = Unpooled.wrappedBuffer(value.get()).readLong()
    // open partition meta stream
    val stream = client.streamClient().openStream(metaStreamId, OpenStreamOptions.builder().epoch(leaderEpoch).build())
        .thenApply(stream => new MetaStream(stream, META_SCHEDULE_EXECUTOR, logIdent))
        .get()
    info(s"${logIdent}opened existing meta stream: stream_id=$metaStreamId")
    stream
}

ꔷ 步骤二:从 MetaStream 拉取 Partition MetaInfo、Producer Snapshot 等分区元信息

// load meta info for this partition
val partitionMetaOpt = metaMap.get(MetaStream.PARTITION_META_KEY).map(m => m.asInstanceOf[ElasticPartitionMeta])
if (partitionMetaOpt.isEmpty) {
    partitionMeta = new ElasticPartitionMeta(0, 0, 0)
    persistMeta(metaStream, MetaKeyValue.of(MetaStream.PARTITION_META_KEY, ElasticPartitionMeta.encode(partitionMeta)))
} else {
    partitionMeta = partitionMetaOpt.get
}
info(s"${logIdent}loaded partition meta: $partitionMeta")

//load producer snapshots for this partition
val producerSnapshotsMeta = metaMap.get(MetaStream.PRODUCER_SNAPSHOTS_META_KEY).map(m => m.asInstanceOf[ElasticPartitionProducerSnapshotsMeta]).getOrElse(new ElasticPartitionProducerSnapshotsMeta())
val snapshotsMap = new ConcurrentSkipListMap[java.lang.Long, ByteBuffer](producerSnapshotsMeta.getSnapshots)
if (!snapshotsMap.isEmpty) {
    info(s"${logIdent}loaded ${snapshotsMap.size} producer snapshots, offsets(filenames) are ${snapshotsMap.keySet()} ")
} else {
    info(s"${logIdent}loaded no producer snapshots")
}

// load leader epoch checkpoint
val leaderEpochCheckpointMetaOpt = metaMap.get(MetaStream.LEADER_EPOCH_CHECKPOINT_KEY).map(m => m.asInstanceOf[ElasticLeaderEpochCheckpointMeta])
val leaderEpochCheckpointMeta = if (leaderEpochCheckpointMetaOpt.isEmpty) {
    val newMeta = new ElasticLeaderEpochCheckpointMeta(LeaderEpochCheckpointFile.CURRENT_VERSION, List.empty[EpochEntry].asJava)
    // save right now.
    persistMeta(metaStream, MetaKeyValue.of(MetaStream.LEADER_EPOCH_CHECKPOINT_KEY, ByteBuffer.wrap(newMeta.encode())))
    newMeta
} else {
    leaderEpochCheckpointMetaOpt.get
}
info(s"${logIdent}loaded leader epoch checkpoint with ${leaderEpochCheckpointMeta.entries.size} entries")
if (!leaderEpochCheckpointMeta.entries.isEmpty) {
    val lastEntry = leaderEpochCheckpointMeta.entries.get(leaderEpochCheckpointMeta.entries.size - 1)
    info(s"${logIdent}last leaderEpoch entry is: $lastEntry")
}

ꔷ 步骤三:从 MetaStream 拉取 SegmentList 并恢复所有 Segment 状态:

val logMeta: ElasticLogMeta = metaMap.get(MetaStream.LOG_META_KEY).map(m => m.asInstanceOf[ElasticLogMeta]).getOrElse(new ElasticLogMeta())
logStreamManager = new ElasticLogStreamManager(logMeta.getStreamMap, client.streamClient(), replicationFactor, leaderEpoch)
val streamSliceManager = new ElasticStreamSliceManager(logStreamManager)

val logSegmentManager = new ElasticLogSegmentManager(metaStream, logStreamManager, logIdent)

// load LogSegments and recover log
val segments = new CachedLogSegments(topicPartition)
// 这里通过 ElasticLogLoader 恢复所有 elastic log segment 的状态
val offsets = new ElasticLogLoader(
    logMeta,
    segments,
    logSegmentManager,
    streamSliceManager,
    dir,
    topicPartition,
    config,
    time,
    hadCleanShutdown = partitionMeta.getCleanedShutdown,
    logStartOffsetCheckpoint = partitionMeta.getStartOffset,
    partitionMeta.getRecoverOffset,
    Optional.empty(),
    producerStateManager = producerStateManager,
    numRemainingSegments = numRemainingSegments,
    createAndSaveSegmentFunc = createAndSaveSegment(logSegmentManager, logIdent = logIdent)).load()
info(s"${logIdent}loaded log meta: $logMeta")

04 秒级分区迁移

1)高峰期快速扩容

Kafka 运维人员通常会根据历史经验准备 Kafka 集群容量,然而总会有一些非预期中的热门事件和活动导致集群流量陡增。这时候就需要快速的将集群扩容并重平衡分区,来应对突发流量。

在 Apache Kafka 中,由于存储和计算紧密耦合,集群扩容往往需要搬迁 Partition 数据,这个过程需要耗费大量的时间和资源,在高峰期则无法高效的完成扩容。

而在 AutoMQ 中,由于存储和计算分离,扩容过程则无需涉及数据的搬迁。这意味着在高峰期需要快速扩容时,AutoMQ 能够更加灵活地响应,减少了扩容过程的时间和对业务的影响。

AutoMQ 具备极强的弹性能力,能够在5分钟内完成支撑1GB流量的扩容流程:

2) Serverless 按需扩容

AutoMQ 架构的另一个优势在于其能够实现 Serverless 按需扩容。

在传统的架构中,扩容往往需要手动调整服务器的规模,或者预先分配一定的资源。然而,AutoMQ 的存算分离架构使得扩容过程变得更加灵活和自动化。由于存储和计算分离,可以结合容器 HPA、云厂商的弹性部署组,根据实际流量需求自动地调整计算资源,而无需考虑存储数据的搬迁问题。这使得系统能够更好地应对流量的波动,同时也降低了运维的复杂性和机器成本。

布道师计划

加入 AutoMQ 布道师,共创开源生态!**详情点击:2024 AutoMQ 布道师计划**

*AutoMQ 在法律允许范围内,保留对本次活动的最终解释权

关于我们

我们是来自 Apache RocketMQ 和 Linux LVS 项目的核心团队,曾经见证并应对过消息队列基础设施在大型互联网公司和云计算公司的挑战。现在我们基于对象存储优先、存算分离、多云原生等技术理念,重新设计并实现了 Apache Kafka 和 Apache RocketMQ,带来高达 10 倍的成本优势和百倍的弹性效率提升。
🌟 GitHub 地址:https://github.com/AutoMQ/automq
💻 官网:https://www.automq.com
👀 B站:https://space.bilibili.com/3546572478482870?spm_id_from=333.337.0.0
🔍 微信公众号:AutoMQ

相关实践学习
【文生图】一键部署Stable Diffusion基于函数计算
本实验教你如何在函数计算FC上从零开始部署Stable Diffusion来进行AI绘画创作,开启AIGC盲盒。函数计算提供一定的免费额度供用户使用。本实验答疑钉钉群:29290019867
建立 Serverless 思维
本课程包括: Serverless 应用引擎的概念, 为开发者带来的实际价值, 以及让您了解常见的 Serverless 架构模式
目录
相关文章
|
7月前
|
关系型数据库 Serverless 分布式数据库
|
4月前
|
Cloud Native Java Serverless
一键上天!如何将Spring PetClinic瞬间迁移到云端函数计算平台
【8月更文挑战第8天】在现代云原生开发中,将Spring应用迁移到Serverless环境正成为趋势。本文通过对比传统部署与函数计算,指导如何快速部署Spring PetClinic应用。传统部署需手动配置服务器和中间件,而函数计算则免除了这些步骤,仅需上传代码。首先,准备好Spring PetClinic源码或jar包;接着选择函数计算平台,本文以阿里云为例;随后对应用进行适配,并使用Maven构建部署包;登录阿里云控制台上传jar包并配置HTTP触发器;最后测试应用确保正常运行。
49 3
|
6月前
|
关系型数据库 Serverless 分布式数据库
PolarDB产品使用问题之普通版本的集群如何迁移到Serverless集群
PolarDB产品使用合集涵盖了从创建与管理、数据管理、性能优化与诊断、安全与合规到生态与集成、运维与支持等全方位的功能和服务,旨在帮助企业轻松构建高可用、高性能且易于管理的数据库环境,满足不同业务场景的需求。用户可以通过阿里云控制台、API、SDK等方式便捷地使用这些功能,实现数据库的高效运维与持续优化。
|
6月前
|
运维 Serverless API
Serverless 应用引擎产品使用合集之如何实现一键迁移Web框架
阿里云Serverless 应用引擎(SAE)提供了完整的微服务应用生命周期管理能力,包括应用部署、服务治理、开发运维、资源管理等功能,并通过扩展功能支持多环境管理、API Gateway、事件驱动等高级应用场景,帮助企业快速构建、部署、运维和扩展微服务架构,实现Serverless化的应用部署与运维模式。以下是对SAE产品使用合集的概述,包括应用管理、服务治理、开发运维、资源管理等方面。
|
7月前
|
弹性计算 运维 Java
Serverless 应用引擎产品使用之在Serverless 应用引擎中,将 Java 应用从 ECS 迁移到 SAE如何解决
阿里云Serverless 应用引擎(SAE)提供了完整的微服务应用生命周期管理能力,包括应用部署、服务治理、开发运维、资源管理等功能,并通过扩展功能支持多环境管理、API Gateway、事件驱动等高级应用场景,帮助企业快速构建、部署、运维和扩展微服务架构,实现Serverless化的应用部署与运维模式。以下是对SAE产品使用合集的概述,包括应用管理、服务治理、开发运维、资源管理等方面。
109 2
|
关系型数据库 Serverless 分布式数据库
PolarDB Serverless能力测评:秒级弹升、无感伸缩与强一致性,助您实现高效云数据库管理!
云原生数据库 PolarDB MySQL 版是阿里云自研产品,100%兼容 MySQL。PolarDB产品具有多主多写、多活容灾、HTAP 等特性,交易性能最高可达开源数据库的6倍,分析性能最高可达开源数据库的400倍,TCO 低于自建数据库50%。【评测用!】
70525 15
|
关系型数据库 Serverless 分布式数据库
PolarDB Serverless能力测评:秒级弹升、无感伸缩与强一致性,助您实现高效云数据库管理!
云原生数据库 PolarDB MySQL 版是阿里云自研产品,100%兼容 MySQL。PolarDB产品具有多主多写、多活容灾、HTAP 等特性,交易性能最高可达开源数据库的6倍,分析性能最高可达开源数据库的400倍,TCO 低于自建数据库50%。
|
人工智能 运维 Kubernetes
阿里云 Serverless 容器服务全面升级:新增组件全托管、AI 镜像秒级拉取能力
阿里云 Serverless 容器服务全面升级:新增组件全托管、AI 镜像秒级拉取能力
|
容灾 Serverless
《云迁移与云容灾-Serverless架构企业数据备份和迁移》电子版地址
云迁移与云容灾-Serverless架构企业数据备份和迁移
535 0
《云迁移与云容灾-Serverless架构企业数据备份和迁移》电子版地址
|
存储 关系型数据库 MySQL
今日14点开播!打造智能无感、秒级弹性的Serverless数据库
2022年阿里云数据库在云栖大会宣布:All in Serverless。核心产品PolarDB、AnalyticDB、RDS均实现了Serverless能力。
今日14点开播!打造智能无感、秒级弹性的Serverless数据库
下一篇
DataWorks