RocketMQ Compaction Topic的设计与实现

简介: Compaction Topic 是一种基于 key 的数据过期机制,即对于相同 key 的数据只保留最新值。该特性的应用场景主要为维护状态信息,或者在需要用到 KV 结构时,可以通过 Compaction Topic 将 key-value 信息直接保存到 MQ,从而解除对外部数据库的依赖。快来了解下它的设计与实现原理吧~

本文作者:刘涛,阿里云智能技术专家。

01 Compaction Topic介绍

一般来说,消息队列提供的数据过期机制有如下几种,比如有基于时间的过期机制——数据保存多长时间后即进行清理,也有基于数据总量的过期机制——数据分区数据量达到一定值后进行清理。

而 Compaction Topic 是一种基于 key 的数据过期机制,即对于相同 key 的数据只保留最新值。

该特性的应用场景主要为维护状态信息,或者在需要用到 KV 结构时,可以通过 Compaction Topic 将 key-value 信息直接保存到 MQ,从而解除对外部数据库的依赖。比如维护消费位点,可以将消费组加分区作为 key ,将消费位点做 offset ,以消息形式发送到 MQ ,压缩之后,消费时获取最新 offset 信息即可。另外,像 connect 里的 source 信息比如 Binlog 解析位点或其他 source 处理的位点信息均可存到 Compaction Topic。同时 Compaction Topic 也支持 存储 RSQLDB 与 RStreams 的 checkpoint 信息。

02 需要解决的问题

Compaction 过程中,需要解决如下几个问题:

第一,数据写入过程中,数据如何从生产者发送到 broker 并且最终落盘,数据主备之间的 HA 如何保证?

第二,整个 compaction 的流程包括哪几个步骤?如果数据量太大,如何优化?

第三,数据消费时如何索引消息?如果找不到消息指定的 offset 消息,如何处理?

第四,如果有机器故障,如何恢复老数据?

03 方案设计与实现

第一,数据如何写入。

首先写入到 CommitLog,主要为复用 CommitLog 本身的 HA 能力。然后通过 reput线程将 CommitLog 消息按照 Topic 加 partition 的维度拆分到不同文件里,按分区整理消息,同时生成索引。这样最终消息就按 Topic 加 partition的粒度做了规整。

在 compaction 过程中,为什么不在原先的 commitLog 上做规整,而是再额外按分区做规整?原因如下:

  1. 所有数据都会写到 CommitLog ,因此单个 Topic 的数据不连续。如果要遍历单个 topic 的所有数据,可能需要跳着读,这样就会导致大量冷读,对磁盘 IO 影响比较大。
  2. CommitLog 数据有自动过期机制,会将老数据删除,因此不能将数据直接写到 CommitLog,而 CompactionLog 里的老数据为按 key 过期,不一定会删除。
  3. compact 以分区为维度进行。如果多个分区同时做 compact ,效率较低。因为很多分区的 key 同时在一个结构里,会导致同一个分区能够 compact 的数据比较少, 并且 compact 之后也需要重新写一份么,因此,索性就在 compact 之前将消息通过 reput service 重新归整一遍。

Compact 流程如下:

第一步,确定需要做 compaction 的数据文件列表。一般大于两个文件,需要排除当前正在写的文件。

第二步,将上一步筛选出的文件做遍历,得到 key 到 offset 的映射关系。

第三步,根据映射关系将需要保留的数据重新写到新文件。

第四步,用新文件替换老文件,将老文件删除。

第二步的构建 OffsetMap 主要目的在于可以知道哪文件需要被保留、哪文件需要被删除,以及文件的前后关系,这样就可以确定写入的布局,确定布局之后,就可以按照append 的方式将需要保留的数据写到新文件。

此处记录的并非 key 到 value 的信息,而是 key 到 Offset 的信息。因为 value 的数据 body 可能较长,比较占空间,而 offset 是固定长度,且通过 offset 信息也可以明确消息的先后顺序。另外,key 的长度也不固定,直接在 map 存储原始 key 并不合适。因此我们将 MD5 作为新 key ,如果 MD5 相同 key 认为也相同。

做 compaction 时会遍历所有消息,将相同 key 且 offset 小于 OffsetMap 的值删除。最终通过原始数据与 map 结构得到压缩之后的数据文件。

上图为目录结构展示。写入时上面为数据文件,下面为索引,要 compact 的是标红两个文件。压缩后的文件存储于子目录,需要将老文件先标记为删除,将子目录文件与 CQ 同时移到老的根目录。注意,文件与 CQ 文件名一一对应,可以一起删除。


随着数据量越来越大,构建的 OffsetMap 也会越来越大,导致无法容纳。

因此不能使用全量构建方式,不能将所有要 compact 的文件的 OffsetMap 一次性构建,需要将全量构建改为增量构建,构建逻辑也会有小的变化。

第一轮构建:如上图,先构建上面部分的 OffsetMap ,然后遍历文件,如果 offset 小于 OffsetMap 中对应 key 的 offset 则删除,如果等于则保留。而下面部分的消息的offset 肯定大于 OffsetMap 内的 offset ,因此也需要保留。

第二轮构建:从上一次结束的点开始构建。如果上一轮中的某个 key 在新一轮中不存在,则保留上一轮的值;如果存在,则依然按照小于删除、大于保留的原则进行构建。

将一轮构建变为两轮构建后, OffsetMap 的大小显著降低,构建的数据量也显著降低。

原先的索引为 CommitLog Position、Message Size 和 Tag Hush,而现在我们复用了bcq 结构。由于 Compact 之后数据不连续,无法按照先前的方式直接查找数据所在物理位置。由于 queueOffset 依然为单调增排列,因此可以通过二分查找方式将索引找出。

二分查找需要 queueoffset 信息,索引结构也会发生变化,而 bcq 带有 queueoffse 信息,因此可以复用 bcq 的结构。

Queueoffset 在 compact 前后保持不变。如果 queueoffset 不存在,则获取第一个大于 queueoffset 的消息,然后从头开始将所有全量数据发送给客户端。

机器故障导致消息丢失时,需要做备机的重建。因为 CommitLog 只能恢复最新数据,而 CompactionLog 需要老数据。之前的 HA 方式下,数据文件可能在 compact 过程中被被删除,因此也不能基于复制文件的方式做主备间同步。

因此,我们实现了基于 message 的复制。即模拟消费请求从 master 上拉取消息。拉取位点一般从 0 开始,大于等于 commitLog 最小offset 时结束。拉取结束之后,再做一次 force compaction 将 CommitLog 数据与恢复时的数据做一次 compaction ,以保证保留的数据是被压缩之后的数据。后续流程不变。

04 使用说明

生产者侧使用现有生产者接口,因为要按分区做 compact ,因此需要将相同 key 路由到相同的 MessageQueue,需要自己实现相关算法。

消费者侧使用现有消费者接口,消费到消息后,存入本地类 Map 结构中再进行使用。我们的场景大多为从头开始拉数据,因此需要在开始时将消费位点重置到0。拉取完以后,将消息 key 与 value 传入本地 kv 结构,使用时直接从该结构拿取即可。

相关实践学习
消息队列RocketMQ版:基础消息收发功能体验
本实验场景介绍消息队列RocketMQ版的基础消息收发功能,涵盖实例创建、Topic、Group资源创建以及消息收发体验等基础功能模块。
消息队列 MNS 入门课程
1、消息队列MNS简介 本节课介绍消息队列的MNS的基础概念 2、消息队列MNS特性 本节课介绍消息队列的MNS的主要特性 3、MNS的最佳实践及场景应用 本节课介绍消息队列的MNS的最佳实践及场景应用案例 4、手把手系列:消息队列MNS实操讲 本节课介绍消息队列的MNS的实际操作演示 5、动手实验:基于MNS,0基础轻松构建 Web Client 本节课带您一起基于MNS,0基础轻松构建 Web Client
相关文章
|
消息中间件 RocketMQ
RocketMQ报错:MQClientException:no route info of this topic的解决
RocketMQ报错:MQClientException:no route info of this topic的解决
400 0
|
2月前
|
消息中间件 存储 缓存
RabbitMQ:交换机详解(Fanout交换机、Direct交换机、Topic交换机)
RabbitMQ:交换机详解(Fanout交换机、Direct交换机、Topic交换机)
226 7
RabbitMQ:交换机详解(Fanout交换机、Direct交换机、Topic交换机)
|
5月前
|
消息中间件 Kubernetes RocketMQ
消息队列 MQ产品使用合集之topic是怎么选择分布在哪里brocker上面的
消息队列(MQ)是一种用于异步通信和解耦的应用程序间消息传递的服务,广泛应用于分布式系统中。针对不同的MQ产品,如阿里云的RocketMQ、RabbitMQ等,它们在实现上述场景时可能会有不同的特性和优势,比如RocketMQ强调高吞吐量、低延迟和高可用性,适合大规模分布式系统;而RabbitMQ则以其灵活的路由规则和丰富的协议支持受到青睐。下面是一些常见的消息队列MQ产品的使用场景合集,这些场景涵盖了多种行业和业务需求。
|
1月前
|
消息中间件 负载均衡 算法
聊聊 RocketMQ中 Topic,Queue,Consumer,Consumer Group的关系
本文详细解析了RocketMQ中Topic、Queue、Consumer及Consumer Group之间的关系。文中通过图表展示了Topic可包含多个Queue,Queue分布在不同Broker上;Consumer组内多个消费者共享消息;并深入探讨了集群消费与广播消费模式下Queue与Consumer的关系,以及Rebalancing机制在实例增减时如何确保负载均衡。理解这些关系有助于更好地掌握RocketMQ的工作原理,提升系统运维效率。
96 2
|
3月前
|
消息中间件 开发者
【RabbitMQ深度解析】Topic交换器与模式匹配:掌握消息路由的艺术!
【8月更文挑战第24天】在消息队列(MQ)体系中,交换器作为核心组件之一负责消息路由。特别是`topic`类型的交换器,它通过模式匹配实现消息的精准分发,适用于发布-订阅模式。不同于直接交换器和扇形交换器,`topic`交换器支持更复杂的路由策略,通过带有通配符(如 * 和 #)的模式字符串来定义队列与交换器间的绑定关系。
71 2
|
4月前
|
消息中间件 存储 Java
消息队列 MQ使用问题之如何将RocketMQ中某个集群的topic迁移到另一个集群
消息队列(MQ)是一种用于异步通信和解耦的应用程序间消息传递的服务,广泛应用于分布式系统中。针对不同的MQ产品,如阿里云的RocketMQ、RabbitMQ等,它们在实现上述场景时可能会有不同的特性和优势,比如RocketMQ强调高吞吐量、低延迟和高可用性,适合大规模分布式系统;而RabbitMQ则以其灵活的路由规则和丰富的协议支持受到青睐。下面是一些常见的消息队列MQ产品的使用场景合集,这些场景涵盖了多种行业和业务需求。
|
5月前
|
消息中间件 测试技术 Apache
消息队列 MQ产品使用合集之在测试环境中拥有大量的topic会有什么影响
阿里云消息队列MQ(Message Queue)是一种高可用、高性能的消息中间件服务,它允许您在分布式应用的不同组件之间异步传递消息,从而实现系统解耦、流量削峰填谷以及提高系统的可扩展性和灵活性。以下是使用阿里云消息队列MQ产品的关键点和最佳实践合集。
114 1
|
5月前
|
消息中间件 Java API
消息队列 MQ产品使用合集之遇到"No topic route info in name server for the topic"错误,该如何处理
消息队列(MQ)是一种用于异步通信和解耦的应用程序间消息传递的服务,广泛应用于分布式系统中。针对不同的MQ产品,如阿里云的RocketMQ、RabbitMQ等,它们在实现上述场景时可能会有不同的特性和优势,比如RocketMQ强调高吞吐量、低延迟和高可用性,适合大规模分布式系统;而RabbitMQ则以其灵活的路由规则和丰富的协议支持受到青睐。下面是一些常见的消息队列MQ产品的使用场景合集,这些场景涵盖了多种行业和业务需求。
|
5月前
|
消息中间件 Java 开发工具
消息队列 MQ产品使用合集之topic相同,但是tag不同,这个类不能放入map中,该如何处理
消息队列(MQ)是一种用于异步通信和解耦的应用程序间消息传递的服务,广泛应用于分布式系统中。针对不同的MQ产品,如阿里云的RocketMQ、RabbitMQ等,它们在实现上述场景时可能会有不同的特性和优势,比如RocketMQ强调高吞吐量、低延迟和高可用性,适合大规模分布式系统;而RabbitMQ则以其灵活的路由规则和丰富的协议支持受到青睐。下面是一些常见的消息队列MQ产品的使用场景合集,这些场景涵盖了多种行业和业务需求。
|
5月前
|
消息中间件 RocketMQ
消息队列 MQ操作报错合集之无法自动创建topic,该怎么办
在使用消息队列MQ时,可能会遇到各种报错情况。以下是一些常见的错误场景、可能的原因以及解决建议的汇总:1.连接错误、2.消息发送失败、3.消息消费报错、4.消息重试与死信处理、5.资源与权限问题、6.配置错误、7.系统资源限制、8.版本兼容性问题。
181 0