RocketMQ 消息集成:多类型业务消息——定时消息

本文涉及的产品
云原生网关 MSE Higress,422元/月
可观测可视化 Grafana 版,10个用户账号 1个月
应用实时监控服务-可观测链路OpenTelemetry版,每月50GB免费额度
简介: 本篇将继续业务消息集成的场景,从使用场景、应用案例、功能原理以及最佳实践等角度介绍 RocketMQ 的定时消息功能。

作者:凯易、明锻


引言


Apache RocketMQ 诞生至今,历经十余年大规模业务稳定性打磨,服务了 100% 阿里集团内部业务以及阿里云数以万计的企业客户。作为金融级可靠的业务消息方案,RocketMQ 从创建之初就一直专注于业务集成领域的异步通信能力构建。


本篇将继续业务消息集成的场景,从使用场景、应用案例、功能原理以及最佳实践等角度介绍 RocketMQ 的定时消息功能。


点击下方链接,查看直播讲解:

https://yqh.aliyun.com/live/detail/29063


概念:什么是定时消息


在业务消息集成场景中,定时消息是,生产者将一条消息发送到消息队列后并不期望这条消息马上会被消费者消费到,而是期望到了指定的时间,消费者才可以消费到。


相似地,延迟消息其实是对于定时消息的另外一种解释,指的是生产者期望消息延迟一定时间,消费者才可以消费到。可以理解为定时到当前时间加上一定的延迟时间。


对比一下定时消息和普通消息的流程。普通消息,可以粗略的分为消息发送,消息存储和消息消费三个过程。当一条消息发送到 Topic 之后,那么这条消息就可以马上处于等待消费者消费的状态了。


1.png


而对于定时/延时消息来说,其可以理解为在普通消息的基础上叠加了定时投递到消费者的特性。生产者发送了一条定时消息之后,消息并不会马上进入用户真正的Topic里面,而是会被 RocketMQ 暂存到一个系统 Topic 里面,当到了设定的时间之后,RocketMQ 才会将这条消息投递到真正的 Topic 里面,让消费者可以消费到。image.gif


2.png


场景:为什么需要使用定时消息


在分布式定时调度触发、任务超时处理等场景,需要实现精准、可靠的定时事件触发。往往这类定时事件触发都会存在以下诉求:


  • 高性能吞吐:需要大量事件触发,不能有性能瓶颈。
  • 高可靠可重试:不能丢失事件触发。
  • 分布式可扩展:定时调度不能是单机系统,需要能够均衡的调度到多个服务负载。 


传统的定时调度方案,往往基于数据库的任务表扫描机制来实现。大概的思路就是将需要定时触发的任务放到数据库,然后微服务应用定时触发扫描数据库的操作,实现任务捞取处理。


这类方案虽然可以实现定时调度,但往往存在很多不足之处:


  • 重复扫描:在分布式微服务架构下,每个微服务节点都需要去扫描数据库,带来大量冗余的任务处理,需要做去重处理。
  • 定时间隔不准确:基于定时扫描的机制无法实现任意时间精度的延时调度。
  • 横向扩展性差:为规避重复扫描的问题,数据库扫表的方案里往往会按照服务节点拆分表,但每个数据表只能被单节点处理,这样会产生性能瓶颈。


3.png


在这类定时调度类场景中,使用 RocketMQ 的定时消息可以简化定时调度任务的开发逻辑,实现高性能、可扩展、高可靠的定时触发能力。


  • 精度高、开发门槛低:基于消息通知方式不存在定时阶梯间隔。可以轻松实现任意精度事件触发,无需业务去重。
  • 高性能可扩展:传统的数据库扫描方式较为复杂,需要频繁调用接口扫描,容易产生性能瓶颈。消息队列 RocketMQ 版的定时消息具有高并发和水平扩展的能力。


image.gif4.png


案例:使用定时消息实现金融支付超时需求


利用定时消息可以实现在一定的时间之后才进行某些操作而业务系统不用管理定时的状态。下面介绍一个典型的案例场景:金融支付超时。现在有一个订单系统,希望在用户下单 30 分钟后检查用户的订单状态,如果用户还没有支付,那么就自动取消这笔订单。


5.png

image.gif

基于 RocketMQ 定时消息,我们可以在用户下单之后发送一条定时到 30 分钟之后的定时消息。同时,我们可以使用将订单 ID 设置为 MessageKey。当 30 分钟之后,订单系统收到消息之后,就可以通过订单 ID 检查订单的状态。如果用户超时未支付,那么就自动的将这笔订单关闭。


原理:RocketMQ 定时消息如何实现


固定间隔定时消息


如前文介绍,定时消息的核心是如何在特定的时间把处于系统定时 Topic 里面的消息转移到用户的 Topic 里面去。


6.png


Apache RocketMQ 4.x 的版本的定时消息是先将定时消息放到按照 DelayLevel 放到 SCHEDULE_TOPIC_XXXX 这个系统的不同 Queue 里面,然后为每一个 Queue 启动一个定时任务,定时的拉取消息并将到了时间的消息转投到用户的 Topic 里面去。这样虽然实现简单,但也导致只能支持特定 DelayLevel 的定时消息。


当下,支持定时到任意秒级时间的定时消息的实现的 pr 提出到了社区,下面简单的介绍一下其基本的实现原理。


时间轮算法


在介绍具体的实现原理之前,先介绍一下经典的时间轮算法,这是定时消息实现的核心算法。


7.png


如上所示,这是一个一圈定时为 7 秒的时间轮,定时的最小精度的为秒。同时,时间轮上面会有一个指向当前时间的指针,其会定时的移向下一个刻度。


现在我们想定时到 1 秒以后,那么就将数据放到 “1” 这个刻度里面,同时如果有多个数据需要定时到同一个时间,


那么会以链表的方式添加到后面。当时间轮转到 “1” 这个刻度之后,就会将其读取并从链表出队。那如果想定到超过时间轮一圈的时间怎么处理呢?例如我们想定时到 14 秒,由于一圈的时间是 7 秒,那么我们将其放在“6”这个刻度里面。当第一次时间轮转到“6” 时,发现当前时间小于期望的时间,那么忽略这条数据。当第二次时间轮转到“6”时,这个时候就会发现已经到了我们期望的 14 秒了。


任意秒级定时消息


在 RocketMQ 中,使用 TimerWheel 对于时间轮进行描述和存储,同时使用一个 AppendOnly 的 TimerLog 记录时间轮上面每一个刻度所对应的所有的消息。


TimerLog 记录了一条定时消息的一些重要的元数据,用于后面定时的时间到了之后,将消息转移到用户的 Topic 里面去。其中几个重要的属性如下:

image.gif

8.png


对于 TimerWheel 来说,可以抽象的认为是一个定长的数组,数组中的每一格代表时间轮上面的一个“刻度”。TimerWheel 的一个“刻度”拥有以下属性。

image.gif

9.png


TimerWheel 和 TimerLog 直接的关系如下图所示:image.gif


10.png


TimerWheel 中的每一格代表着一个时间刻度,同时会有一个 firstPos 指向这个刻度下所有定时消息的首条 TimerLog 记录的地址,一个 lastPos 指向这个刻度下所有定时消息最后一条 TimerLog 的记录的地址。并且,对于所处于同一个刻度的的消息,其 TimerLog 会通过 prevPos 串联成一个链表。image.gif


11.png


当需要新增一条记录的时候,例如现在我们要新增一个 “1-4”。那么就将新记录的 prevPos 指向当前的 lastPos,即 “1-3”,然后修改 lastPos 指向 “1-4”。这样就将同一个刻度上面的 TimerLog 记录全都串起来了。


有了 TimerWheel 和 TimerLog 之后,我们再来看一下一条定时消息从发送到 RocketMQ 之后是怎么最终投递给用户的。image.gif


12.png


首先,当发现用户发送的是一个定时消息过后,RocketMQ 实际上会将这条消息发送到一个专门用于处理定时消息的系统 Topic 里面去


然后在 TimerMessageStore 中会有五个 Service 进行分工合作,但整体可以分为两个阶段:入时间轮和出时间轮


对于入时间轮:


  • TimerEnqueueGetService 负责从系统定时 Topic 里面拉取消息放入 enqueuePutQueue 等待 TimerEnqueuePutService 的处理
  • TimerEnqueuePutService 负责构建 TimerLog 记录,并将其放入时间轮的对应的刻度中 


对于出时间轮:


  • TimerDequeueGetService 负责转动时间轮,并取出当前时间刻度的所有 TimerLog 记录放入 dequeueGetQueue
  • TimerDequeueGetMessageService 负责根据 TimerLog 记录,从 CommitLog 中读取消息
  • TimerDequeuePutMessageService 负责判断队列中的消息是否已经到期,如果已经到期了,那么将其投入用户的 Topic 中,等待消费消费;如果还没有到期,那么重新投入系统定时 Topic,等待重新进入时间轮。 


实战:使用定时消息


了解了 RocketMQ 秒级定时消息的原理后,我们看下如何使用定时消息。首先,我们需要创建一个 “定时/延时消息” 类型的 Topic,可以使用控制台或者 CLi 命令创建。


13.png


从前面可以看出,对于定时消息来说,是在发送消息的时候 “做文章”。所以,对于生产者,相对于发送普通消息,我们可以在发送的时候设置期望的投递时间。


14.png


当定时的时间到了之后,这条消息其实就是一条投递到用户 Topic 的普通消息而已。所以对于消费者来说,和普通消息的消费没有区别。image.gif


15.png


注意:定时消息的实现逻辑需要先经过定时存储等待触发,定时时间到达后才会被投递给消费者。因此,如果将大量定时消息的定时时间设置为同一时刻,则到达该时刻后会有大量消息同时需要被处理,会造成系统压力过大。所以一般建议尽量不要设置大量相同触发时刻的消息。


点击此处,进入官网了解更多详情~

相关实践学习
消息队列RocketMQ版:基础消息收发功能体验
本实验场景介绍消息队列RocketMQ版的基础消息收发功能,涵盖实例创建、Topic、Group资源创建以及消息收发体验等基础功能模块。
消息队列 MNS 入门课程
1、消息队列MNS简介 本节课介绍消息队列的MNS的基础概念 2、消息队列MNS特性 本节课介绍消息队列的MNS的主要特性 3、MNS的最佳实践及场景应用 本节课介绍消息队列的MNS的最佳实践及场景应用案例 4、手把手系列:消息队列MNS实操讲 本节课介绍消息队列的MNS的实际操作演示 5、动手实验:基于MNS,0基础轻松构建 Web Client 本节课带您一起基于MNS,0基础轻松构建 Web Client
相关文章
|
1月前
|
开发框架 JavaScript 前端开发
TypeScript 是一种静态类型的编程语言,它扩展了 JavaScript,为 Web 开发带来了强大的类型系统、组件化开发支持、与主流框架的无缝集成、大型项目管理能力和提升开发体验等多方面优势
TypeScript 是一种静态类型的编程语言,它扩展了 JavaScript,为 Web 开发带来了强大的类型系统、组件化开发支持、与主流框架的无缝集成、大型项目管理能力和提升开发体验等多方面优势。通过明确的类型定义,TypeScript 能够在编码阶段发现潜在错误,提高代码质量;支持组件的清晰定义与复用,增强代码的可维护性;与 React、Vue 等框架结合,提供更佳的开发体验;适用于大型项目,优化代码结构和性能。随着 Web 技术的发展,TypeScript 的应用前景广阔,将继续引领 Web 开发的新趋势。
38 2
|
2月前
|
消息中间件 Java 数据库
新版 Seata 集成 RocketMQ事务消息,越来越 牛X 了!阿里的 Seata , yyds !
这里 借助 Seata 集成 RocketMQ 事务消息的 新功能,介绍一下一个新遇到的面试题:如果如何实现 **强弱一致性 结合**的分布式事务?
新版 Seata 集成 RocketMQ事务消息,越来越 牛X 了!阿里的 Seata , yyds !
|
6月前
|
消息中间件 运维 监控
ApsaraMQ Copilot for RocketMQ:消息数据集成链路的健康管家
阿里云消息队列 ApsaraMQ 始终围绕“高弹性低成本、更稳定更安全、智能化免运维”三大核心方向进行演进和拓展。在智能化免运维方面,通过 ApsaraMQ Copilot,为企业提供消息数据集成链路的健康管家,让消息服务走进智能化免运维的新时代。
71866 77
|
5月前
|
消息中间件 Java 测试技术
【RocketMQ系列八】SpringBoot集成RocketMQ-实现普通消息和事务消息
【RocketMQ系列八】SpringBoot集成RocketMQ-实现普通消息和事务消息
389 1
|
5月前
|
消息中间件 Java RocketMQ
Spring Boot与RocketMQ的集成
Spring Boot与RocketMQ的集成
|
7月前
|
消息中间件 数据采集 Serverless
云消息队列 RocketMQ 版-消息集成-概述
消息集成是助力企业数字化转型的全栈式消息与数据集成平台,简化流程,支持云上云下、跨区域集成。它提供低代码的事件流服务,具备数据源集成、数据清洗、Serverless自定义处理等功能,支持丰富的数据源和跨端连接。然而,使用时存在如单个任务数据限制、任务名称长度等约束。消息流入(Source)负责从各种数据源获取数据,消息流出(Sink)将数据分发到目标,数据处理(Transform)允许数据转换和分析,而任务(Task)则结合这些组件执行实际的集成操作。
263 3
|
消息中间件 Java RocketMQ
低版本的springboot(1.5.3)如何集成rocketmq
低版本的springboot(1.5.3)如何集成rocketmq
304 1
|
7月前
|
分布式计算 DataWorks 关系型数据库
DataWorks产品使用合集之在使用 DataWorks 数据集成同步 PostgreSQL 数据库中的 Geometry 类型数据如何解决
DataWorks作为一站式的数据开发与治理平台,提供了从数据采集、清洗、开发、调度、服务化、质量监控到安全管理的全套解决方案,帮助企业构建高效、规范、安全的大数据处理体系。以下是对DataWorks产品使用合集的概述,涵盖数据处理的各个环节。
103 0
|
7月前
|
SQL JSON DataWorks
DataWorks产品使用合集之DataWorks 数据集成任务中,将数据同步到 Elasticsearch(ES)中,并指定 NESTED 字段中的 properties 类型如何解决
DataWorks作为一站式的数据开发与治理平台,提供了从数据采集、清洗、开发、调度、服务化、质量监控到安全管理的全套解决方案,帮助企业构建高效、规范、安全的大数据处理体系。以下是对DataWorks产品使用合集的概述,涵盖数据处理的各个环节。
93 0
|
XML JSON API
API类型和集成规范指南
API类型和集成规范指南
213 0

相关产品

  • 云消息队列 MQ