开发者学堂课程【消息队列 RocketMQ 消息集成:【视频】定时消息】学习笔记,与课程紧密联系,让用户快速学习知识。
课程地址:https://developer.aliyun.com/learning/course/1189/detail/18099
【视频】定时消息
内容介绍:
一、概念介绍
二、使用方法
三、使用场景
四、实现原理
一、概念介绍
首先会分为这四个部分来介绍定时消息,首先是关于概念的介绍,其次是使用的方法,然后是使用的场景,最后是定时消息实现的那个原理。
首先是介绍一下什么是定时消息。这个消息指的是生产者加一条消息发的消息队列,但并不期望这条消息马上的可以被消费者消费到,而是希望到了指定的时间过后,消费者才可以消费到。而对于延时消息来说,他其实对于这个消息的一个另外的一种解释,指的是生产者希望消息延迟一段时间过后,消费者才可以消费到。那么其实可以理解为它是定时到当前的时间,再加上一个延迟过后的那个时间。所以在实现层面上来说,定时消息和延迟消息其实完全一样的。
那么对于一条消息来说,可以粗略的分为消息的发送、消息的存储、消息的消费这三个过程,那么对于普通消息来说,他这条消息发送到客户,那么这条消息就处于一个马上等待消费者来消费的一个状态。
而对于定时消息来说的话,其实可以理解为在普通消息的基础上叠加了一个定时投递到消费者的这么一个属性,整体的流程大致如下,当生产者发送了这条消息,发到 topic 过后,但他其实并不会马上进入到用户中 而是到了我们内部的一个系统的一个就是 topic 当中,用户所设定的时间过后,才会转存到用户的那个真正的topic里去,这个时候消费者才可以消费到这条消息,从而达到一个定时投递的一个效果。
二、使用方法
该怎么来使用这个消息,首先可以在控制台创建一个消费行为,定时延迟消息的这么一个通知。然后这是发送定时消息的一个 producer 的一个交代,
首先设置我们的 AK 和 SK ,然后我们把握的 airport ,然后我们的刚刚所在控台上创建的那个托盘,然后设置我们的 CK,这个 key 建议使用有一定业务属性的一个品,
比如我们的订单系统,就可以把这个 APP 作为这个 T 设在这里,然后是消息的 tag ,然后消息的巴黎。然后唯一和普通消息不同的是定时消息,这里需要设置投递的时间,比如说这里定时到五秒钟以后。然后然后用 producer 发送消息,然后发送成功之后,建议最好可以记录一下 vcid ,这样如果有什么问题,也便于后续的进行一些问题的排查。
看见了当定时的那个消息的时间到了过后,这条消息其实就是一条投递到用户的一个普通消息而已。所以对于消费者来说的话,他的代码的分析其实就和普通消费者是一模一样的。首先设置的 Key ,然后设置我们的消费者客户以及我们所订阅的topic ,然后这里在 list 里面,就可以进行一些业务的处理。同样,这里建议把这个 VID 记录下来,便于我们后期的一些异常的异常出现问题的时候的一个排查。
使用方法:
可以看一下我们的历史消息的一些使用的约束以及最佳实践,最好对于定时消息的进行对应消息来说,这个消息类型和主题支持的消息类型保持一致。
比如我们使用的那个创客。topic 是空白上所创建的,这个类型是定制或者延时。其次,超大的那个消息可能会导致服务端的风险,从而会被请求会被拒绝掉。然后对于定时消息,指定只能定时到未来的一个时间。不能超过所规定的一个最大的一个定时的时间。其次的话,对于那个每条消息的唯一索引。建议使用一个可以使用一个有业务上面的一个 ID 作为 CKey ,然后便于快速的那个定位消息以及问题的追踪,其次还其次的话,如果发送消息的时候出现了失败,建议业务方的话,需要对于这些异常信息进行感知,并主动进行从事以及一些冗余的处理。最后的话,在消费的过程中,这些链路可能在某些异常情况下会出现少量的一些重复。建议消费者做好消费者密度,避免重复的消费所带来的一些风险。
三、使用场景
了解了怎么使用,定时消息可以用在哪些场景里面?定时消息可以实现在一定的时间过后才进行一些操作,而我们的业务系统就不用再关心管理这些定时的状态,
例如我们现在有一个订单系统,希望用户在下单30分钟以后,去检查用户的订单状态。如果这个时候还没有支付的话,那么就自动的取消掉这个订单。那么我们可以在用户下单之后,发送一条定时到30分钟以后的一个定时消息,同时可以将订单的ID设置为 Message Key,到30分钟以后,订单系统收到这条消息过后,我们就可以去通过订单ID来检查这笔订单的一个状态,如果你用这个时候还没有进行支付的话,那么我们就可以自动的将这个订单关闭掉。我们可以看见通过定时消息,我们就可以很容易的实现一个简单的分布式的定时任务调度系统,来定时触发一些任务。
四、实现原理
知道了使用方法与场景过后?那么最后一个话题是,定时消息是怎么来实现的。如前面所介绍,定时消息的核心是如何在特定的时间把处于系统。定时里面的消息转移到用户的那个topic里面去。
现在开源版的4X版本的这个消息,是用户当发送到用户的是一个订阅消息的时候,咱们首先会将其转到一个 KXX 这么一个系统的一个定时 topic 的工具。并且其中的每一个 Q,对应着不同的 level,比如说 DelayLevel-1 对应的是 Q0,2就对应的是 Q2Q1,然后以此类推,那这样子的话,把同样的消息,把他给规整到一个Q里面。然后 Q 后台会为每一个 Q 启动一个定时任务,去拉取这个 Q 里面的消息,然后去检查这个消息有没有到所设定的时间。然后如果到时间了,就把这个消息反馈到用户的那个里面去。这样可以看见说这个实践其实是非常简单的,但是这有一个很大的问题,就是这个实现只能支持特定别落的定时消息。前不久,我们也已经将这个知识定时到任意两集时间的这个消息的一个实现的那个PR提出的社区,
下面再简单介绍一下怎么实现定时到任意时间的。再介绍具体的时间之前,先介绍一下时间能有这么一个算法,这也是定时消息实现的一个核心的算法。如上所示,这是一个一圈定时为七秒的一个时间轮,它的定时的最小精度十秒,同时时间上面会有一个直线,当前时间的一个指针,还会定时的移动到下一个刻度。现在想定取道医疗以后,那么我们就把这个数据放到一这个刻度里面。然后同时,如果有多个数据需要订到同一个时间里面,那么就把这个数据一个链表的形式追加添加到最后面。当时看到的这个刻度过后,就会将其读取出来,就是把这个列表里面的数据读取出来,然后把它投递给用户,那么就达到一个定时的一个这么一个效果,那么假设定的时间超过了这这个时间的一圈的时间该怎么处理。例如我们想现在定时定到14秒。那么由于一圈的时间是七秒,那么就将其放在六这么一个刻度里面,当第一次时间轮转到六的时候,发现当前时间小于我们期望的时间14,那么就不把他投递给用户,当第二次再次转到的时候,发现这个时间已经到了我们期望的14秒,那么就将其从电表里面出来,那么就达到了一个定时到14秒的一个效果。
那么在Q的使用,使用 Vue 可以定时的时间进行描述和存储。同时使用一个 only的进入时间轮上每一个刻度所对应的所有的消息。TimerLog记录了定时消息的一些重要的数据。用于后面定时的那个时间到了过后,将这个消息转到用户的那个 topic 里面去,其中一些比较主要的一些属性就如下。size 就是这个 title 的一个这条记录的一个大小,prepos 指的是这个前一条记录。这个我们接下来会详细的讲解。然后这个 sizeReal 指的是这条消息在 CommitLog 里面的那种 offset ,以及size这条消息在里面的一个大小。对于 TimerWheel 会来说的话,可以抽象的认为它是一个定长的数组,然后数组中的每一个代表时间轮上的一个刻度,TimerWheel 中的每一个客户就拥有这么几个属性。the first position 指的是这个客户的首条这个位置,然后 lastPos 就是最后一条 TimerLog 的记录的位置。
prepos、firstpos、lastpos 那么他们之间到底是一个什么的一个关系?我们一起来看一下。他们没有做这个,这个地方代表一个时间刻度,然后这个 first position 就指向 turn out 的第一,第一个记录,比如说就指向一个客户的第一条,就是一杠一,lastpos 就指向了中的这个刻度的最后一个记录,比如说这里的最快纪录是123123。其实对于同一个客户的这个团购的所有记录,会有这个 previous position 进行串联起来,比如说一杠三的 previous 就是一杠二,,这样子的话就串成一个链表。
那么当我们要新增一条记录的时候,比如就将新的记录的那个 ascription 事项,当前的 last position,这里就是一杠三,然后修改的指向一杠四。这样就将同一个客户上面能够记录全部都串起来了。
再来看一下,一条定时消息发送到了 Q 之后,那么他是最终是一个用户。首先系统发现这是发送一条消息过后,实际上它会转发到一个系统 topic ,这里用这个timer_topic,然后在这个里面,会有五个设备来进行分工的一个合作,但是可以整体的分为两个阶段,一个是入时间的工作,另一个是初始阶段的这个阶段。那么对于中午时间来说的话,这个首先,会有一个 service 负责从系统定时脱离里面去拉取消息,然后把它放到一个队列里面,等待下一步的处理。然后会有一个 input service,负责从这些同时把这些消息,把这些消息记录把它找到,找到对应时间的那个刻度,然后构建,把它放在时间里面去。然后对于初时间来,这边的话会有一个 Q 的类型来负责去转动这个时间,然后并取出当前时间客户的所有,才能够将其放这个的 dequeueputqueue。可以具体之前有介绍说每一个课程,其实是个列表,所以说对于 timerdequeuegetservice 来说,其实是把整个链表的最后一个节点,就是比如说这里一杠四,然后反向找到刻度的所有消息,把它给取出来,然后取出了这些这些消息过后,那么还有一个 service 是负责负责解析这些 log,然后通过 timerlog 里面的那些里面的那个读取到对应的消息。然然后把这些消息就放到一个 Q 里面。最后还有一个普通的一个 service 。然后来处理从就是当前这个时间读出来的这些消息这个,如果发现说这条消息已经到了用户锁定的那个时间过后,他就会把这个消息投递到用户真正的那个 topic 里面去。然后等待消费者来进行消费,如果发现这条消息还没有到那个定的时间,那么就将其重新的头回到timer topic。然后他又会进行新的一次循环,从而然后等到当当到下一次时间滚出来发现到的时候,他就会投到用户的 topic 里面去,然后把然后最终达到一个定时的这么一个效果。