多类型业务消息专题-定时消息| 学习笔记

简介: 快速学习多类型业务消息专题-定时消息

开发者学堂课程【RocketMQ 消息集成:多类型业务消息专题多类型业务消息专题-定时消息】学习笔记,与课程紧密联系,让用户快速学习知识。

课程地址https://developer.aliyun.com/learning/course/1161/detail/17325


多类型业务消息专题-定时消息

 

内容介绍:

一、   概念介绍:什么是定时消息

二、   使用方法:人如何使用定时消息

三、   使用场景:可以把定时消息用在什么场景里面

四、    实现原理:任意秒级精度的定时消息是如何实现的

 

一、   概念介绍:什么是定时消息


image.png


定时消息指的是生产者将一条消息发的消息队列,但并不期望这条消息马上可以被消费者消费到的期望,到了指定的时间过后,消费者才可以消费到。而对于原始消息来说,其实是对于消息的一个另外的一种解释,指的是生产者期望消费,希望消息延迟一段时间过后,消费者才可以消费到,其实可以理解为,它是定时到当前的时间,再加上一个延迟过后的时间。在实现层面上来说,定时消息和延迟消息其实完全一样的。对于一条消息来说,可以粗略的分为消息的发送,消息的存储以及消息的消费三个过程。


image.png

对于普通消息来说,一条消息发送过后,这条消息就处于一个马上等待消费者来消费的一个状态。而对于定时消息来说,其实可以理解为在普通消息的基础上,叠加了一个定时投递到消费者的一个属性,整体的流程大致如下,当生产者发送了一条定时消息发到topic过后,但它其实并不会马上进入到用户,真正的topic里面,但是到了内部的一个系统的一个定时topic当到了用户所设定的时间过后,才会转存到用户的真正的topic里。定时消费者才可以消费到这条消息,从而达到了一个定时投递的一个效果。


image.png


二、   使用方法:人如何使用定时消息


该怎么来使用消息呢?首先可以在控制台创建一个消费行为,定时延时消息的一个  topic


image.png


这是发送定时消息的一个 Producer 的一个 java 代码,首先设置 AK SK ,然后访问的 Body ,然后刚刚所在口袋上创建的 Topic 设置 VCK ,建议使用有一定业务属性的一个题,比如订单系统,就可以把订单 ID 作为T设在这里,然后是消息的 Tag ,然后消息到 Body 。唯一和普通消息不同的是定时消息,这里需要设置投递的时间,比如这里定时到五秒钟以后。然后调用 Producer 发送消息。然后发送成功过后,建议最好可以记录一下每个 ID ,这样如果有什么问题,便于后期的进行一些问题的排查。

 

定时消息的发送:

public static void main(String[]args){ClientServiceProviderproviderClientServiceProvider.loadService();      StaticSessionCredentialsProvider  staticSessionCredentialsProvider =

new StaticSessionCredentialsProvider("AccessKey","SecretKey");

ClientConfiguration      clientConfiguration      = ClientConfiguration.newBuilder()

l  setEndpoints("Endpoint")

l  setCredentialProvider(staticSessionCredentialsProvider)

l  build();

try{

Producer producer provider.newProducerBuilder()

l  setclientConfiguration(clientConfiguration)

l  build();

Message delayMessage provider.newMessageBuilder()

l  setTopic("Topic")

l  setKeys("Key")

l  setTag("Tag")

l  setBody("MessageBody".getBytes())

/设置定时到5秒之后

l  setDeliveryTimestamp(System.currentTimeMillis()+

Duration.ofSeconds(5).toMillis())

l  build();

SendReceipt sendReceipt producer.send(delayMessage);

/发送消息成功后,记录MessageI(0可用于异常问题的排道

System.out.println(sendReceipt.getMessageId());

}catch (ClientException e){

//捕获异常,进行异常处理

}

}

 

从前面可以看见的,当定时的消息的时间到了过后,这条消息其实就是一条投递到用户 Topic 的一个普通消息而已。所以对于消费者来说它的代码的编写,其实就和普通消息的消费者是一模一样的。首先设置 KSK 以前的控制,然后设置消费者Confidence,以及所订阅的 Topic 然后在 Listener 里面,就可以进行一些业务的处理,同样这里建议把 VC ID 记录下来,便于后期出现问题的时候的一个排查。

 

定时消息的消费:

public static void main(String[]args){

ClientServiceProviderproviderClientServiceProvider.loadService();      StaticSessionCredentialsProvider

staticSessionCredentialsProvider =

new StaticSessionCredentialsProvider("AccessKey","SecretKey");

ClientConfiguration      clientConfiguration ClientConfiguration.newBuilder()

l  setEndpoints("Endpoint")

l  setCredentialProvider(staticSessionCredentialsProvider)

l  build();

try{

PushConsumer  pushConsumer pravider.newPushConsumerBuilder()

l  setclientConfiguration(clkentConfiguration)

l  setConsumerGroup("ConsumerGroup")

l  setSubscriptionExpressions(Collections.singletonMap("Topic",new FilterExpression()))

l  setMessageListener(messageview ->{

System.out.println("Receive Message "

messageview.getMessageId());

return ConsumeResult.SUCCESS;

})

l  build();

}catch (ClientException e){

//桶获异常,进行异常处理

}

}

 

看一下历史消息的一些使用的约束,以及最佳时间定最好对于定时消息的进行,对应消息来说,消息类型和主题所支持的消息类型是需要保持一致的,就是使用的Topic 必须是空白上所创建的,特别的类型是定时或者延时,其次超大的消息可能会导致服务端的风险,从而会被请求会被拒绝掉,然后对于定时消息来说,指定只能定时到未来的一个时间,而且定时的时间不能超过所规定的一个最大的一个定时的时间。其次对于每条消息的唯一索引建议使用一个可以使用一个有业务上面有一个 ID 作为 CK ,便于快速的定位消息以及问题的最终。其次,如果发送消息的时候出现了失败,建议业务方需要对于这些异常进行感知,并主动进行重试以及一些容易的处理。最后在消费的过程中,这些链路可能在某些异常情况下会出现少量的一些重复,消费者建议做好消费能力,避免重复的消费所带来的一些风险。

 

三、   使用场景:可以把定时消息用在什么场景里面


image.png


了解如何使用以及一些最佳时间过后,历史消息可以用在哪些场景,利用地址消息,可以实现在一定的时间过后才进行一些操作,而业务系统就不用再关心管理这些定时的状态。例如现在有一个订单系统,希望用户在下单30分钟以后,去检查用户的订单状态。如果这时用户还没有进行支付的话,就自动的取消掉这笔订单,可以在用户下单之后,发送一条定时到30分钟以后的一个定时消息,同时可以将订单的ID设置为 Message ,当30分钟以后,订单系统收到这条消息过后,就可以去通过订单ID来检查这笔订单的一个状态。如果用户还没有进行支付,就可以自动地将这笔订单关闭掉。可以看见,通过定时消息,就可以很容易的实现一个简单的分布式的定时任务调度系统,来定时的触发一些任务。

 

四、   实现原理:任意秒级精度的定时消息是如何实现的


image.png


了解了使用的方法以及场景过后,最后一个话题是 Rocker MQ 的地址消息是怎么来实现的?如前面所介绍,这消息的核心是如何在特定的时间把处于系统 topic 定时投票里面的消息转移到用户的 topic 里面去。现在开版的4X版本的消息,是用户单发送到 Topic 用户的发送,到发送的是一个地址消息的时候, Que 首先会将其转到一个 schedule or topic XX一个系统的定时 topic 领取。并且其中的每一个Que,对应着不同的 Delay level 比如 Delay Leval-1 对应是 Q02对应的是 Q2,以此类推,把同样的内部的一些消息就给规整到一个 Que 里面。然后 Que 后台会不会每一个 Que 启动一个定时任务,去拉取 Que 里面的消息,然后去检查消息有没有到,所设定的时间,有没有测定时间。如果到时间了就把这消息转投到用户的Topic ,这样可以看见这时间其实是非常简单的,但是有一个很大的问题,就是时间只能支持特定的 Delay level 的定时消息,前不久也已将知识定时到任一秒级时间的定时消息,再简单介绍一下是怎么实现定时的任何时间再介绍具体的时间,先介绍一下时间论的一个算法,这也是定时消息实现的一个核心的算法。


image.png


如上所示这是一个一圈定时为七秒的一个时间轮,它的定时的最小精度十秒,同时时间轮上面会有一个指向当前时间的一个指针,它会定时的移动到下一个刻度。现在想定时取到一秒以后,就把这数据发送到这刻度里面。同时如果有多个数据需要定时到同一个时间里,就把这数据一个列表的形式追加添加到最后面。当时间轮转到一个刻度过后,就会将其读取出来把这列表里面的数据读取出来,然后把它给用户,就达到一个定时的一个效果。假设定的时间超过了时间一圈的时间该怎么处理呢?现在临时到十四由于一圈的时间是七秒,就将其放在六的一个刻度,当第一次时间段转到六的时候,发现当前时间小于期望的时间,就不把它投递给用户。当第二次再次转到六的时候,发现时间已经到了使用了14秒,就将其从列表里面出来,就达到了一个定时到14秒的一个效果。real 对于定时的时间文件描述。同时使用一个判断 only 的一个 log 来记录时间轮上每一个刻度所对应的所有的消息。它们 log 记录了一条定时消息的一些重要的元数据,用来后面定时的时间到了过后,将这些消息转移到用户的 Topic 里。其中一些比较主要的一些属性就如下,size 就是条记录的一个大小。prevPos 指的是层面 log 的前一条记录。接下来会详细的讲解。offsetReal ,指的是这条消息在 CommitLog 里面的 sight ,以及 size 这条消息在 CommitLog 里面的大小。对于 Commitreal 来说,可以抽象的认为它是一个哪一个定长的数组。然后数字中的每一格,代表时间来上的一个刻度。它们中的这每一个刻度,就拥有几个属性。FirstPos 指的是刻度的首条,记录了位置,lastPos指的是 TimeLog 最后一条,前面 log 记录的位置。


image.png


这些 first position last position previous position 它们之间到底是一个什么?


image.png  


它们会有动作,比如这地方,这一刻度,代表一个时间刻度,然后 first position 就指向 TimeLog 的第一个记录,比如 first position 就指向一个刻度第一条记录,就是1-1last Position 就指向 TimeLog 中的最后一条记录,比如这里的最后一个记录是1-3,它的 last position 就只向1-3。其实对于同一个客户, previous position 进行串联起来比如1-3pre verse 就是1-21-2preferred 就是1-1这样子就传承一个链表,当要新增一条记录,比如现在要新增一个1-4。就将新的记录的 position 指向当前的 last position,这里就是1-3,然后修改 last position 指向1-4。这样就将同一个客户上面的 log 记录全部都能看见,有了参考 real log 之后,再来看一下,把一条消息从发送到 Rocker MQ 之后,它是最终是怎么投一个用户?


image.png


首先系统发现这是发送了一条消息过后,实际上它会转发到一个系统的定时topic里面,这里用 target 来取代,然后在这 TimerMessageStore 里面,会有五个设备来进行分工的一个合作,但是可以整体把分为两个阶段,一个是入时间轮的工作,另一个是出时间轮的一个阶段,对于入时间轮,首先会有一个 service 负责从系统定时脱离里面去拉取消息,然后把它放到一个队列里面,等待下一步的处理。然后会有一个 Enqueue service ,负责把这些消费记录找到,找到对应时间段的那刻度,然后构建成 log ,然后把它放在时间里面去。对于出时间轮会有一个 Dequeue service 来负责去转动,这时间后并取出当前时间刻度的所有 log

 

将其放 dequeueGetQueue 的名称,之前有介绍每一个刻度,其实是个链表,所以对于 dequeueGetQueu ,其实就是把链表的最后一个节点,就是比如这里1-4,然后反向找到这条刻度里面的所有消息,把它取出来。然后取出这些消息过后,还有一个设计负责解析这些 Timelog ,然后通过参与 Timelog 里面的那些 log 读取到对应的消息,然后把这些消息又放到一个 Que 里面。最后还有一个 put message 的一个 service 来处理这些事,就是当前时间轮课读出来的这些消息,这条消息已经到了用户锁定的时间过后,就会把消息投递到用户。等待消费者来进行消费,如果发现这条消息还没有到定时的时间,就将其重新的头回到 topic 。然后又会进行新的一次循环,从而然后下一次从时间轮滚出来,发现到的时候它就会能投到用户的 topic 里面去,然后最终达到一个定时的一个效果。

相关文章
|
5月前
|
消息中间件 SQL RocketMQ
【RocketMQ系列五】消息示例-顺序消息&延迟消息&广播消息的实现
【RocketMQ系列五】消息示例-顺序消息&延迟消息&广播消息的实现
99 1
|
7月前
|
消息中间件 运维 Serverless
Serverless 应用引擎产品使用之在阿里云函数计算中,使用了RocketMQ的触发器,并且发送和接收消息都没有问题,但是消息轨迹中没有体现出来消费的情况如何解决
阿里云Serverless 应用引擎(SAE)提供了完整的微服务应用生命周期管理能力,包括应用部署、服务治理、开发运维、资源管理等功能,并通过扩展功能支持多环境管理、API Gateway、事件驱动等高级应用场景,帮助企业快速构建、部署、运维和扩展微服务架构,实现Serverless化的应用部署与运维模式。以下是对SAE产品使用合集的概述,包括应用管理、服务治理、开发运维、资源管理等方面。
|
消息中间件 存储 算法
RocketMQ 消息集成:多类型业务消息——定时消息
本篇将继续业务消息集成的场景,从使用场景、应用案例、功能原理以及最佳实践等角度介绍 RocketMQ 的定时消息功能。
471 0
RocketMQ  消息集成:多类型业务消息——定时消息
|
消息中间件 存储 运维
RocketMQ 消息集成:多类型业务消息-普通消息
本篇将从业务集成场景的诉求开始,介绍 RocketMQ 作为业务消息集成方案的核心能力和优势,通过功能场景、应用案例以及最佳实践等角度介绍 RocketMQ 普通消息类型的使用。
304 0
RocketMQ  消息集成:多类型业务消息-普通消息
|
消息中间件 运维 监控
多类型业务消息专题-普通消息 | 学习笔记(一)
快速学习多类型业务消息专题-普通消息
166 0
 多类型业务消息专题-普通消息 | 学习笔记(一)
|
消息中间件 存储 运维
多类型业务消息专题-普通消息 | 学习笔记(二)
快速学习多类型业务消息专题-普通消息
132 0
多类型业务消息专题-普通消息 | 学习笔记(二)
|
消息中间件 存储 算法
RocketMQ 消息集成:多类型业务消息——定时消息
本篇将继续业务消息集成的场景,从使用场景、应用案例、功能原理以及最佳实践等角度介绍 RocketMQ 的定时消息功能。
RocketMQ 消息集成:多类型业务消息——定时消息
|
消息中间件 存储 网络协议
多类型业务消息专题-事务消息 | 学习笔记
快速学习多类型业务消息专题-事务消息
153 0
多类型业务消息专题-事务消息 | 学习笔记
|
存储 消息中间件 Linux
多类型业务消息专题-顺序消息 | 学习笔记
快速学习多类型业务消息专题-顺序消息
多类型业务消息专题-顺序消息 | 学习笔记
|
消息中间件 存储 算法
【视频】定时消息 | 学习笔记
快速学习【视频】定时消息
144 0
【视频】定时消息 | 学习笔记