定时消息处理机制|学习笔记

简介: 快速学习定时消息处理机制

开发者学堂课程【RocketMQ 知识精讲与项目实战(第三阶段)定时消息处理机制】学习笔记,与课程紧密联系,让用户快速学习知识。

课程地址:https://developer.aliyun.com/learning/course/704/detail/12504


定时消息处理机制

 

定时消息是消息发送到 broker之后,它并不会立即被消费者索要去处理,必须要等到时间到了之后才能发送到这个消费方。目前 RocketMQ并不支持任意时间的定时的设立,也就是它会提前设定一些时间,用户可以通过 delay  Level 级别去选择。可供的时间有1s、5s、10s、30s、1min、2min......1h、2h等等。在进行设定的时候,如果指定 delayLevel=1,用的延迟是1秒,如果设立的是2,设立的是延迟5秒,依次类推即可。

它不支持任意精度的设定的主要原因是如果做了任意精度的设定,如果要去做消息排序会很麻烦,根据时间去排序就会很麻烦,再加上如果要去持久化,对性能会带来巨大的损耗。所以目前不支持任意精度的设定。

定时消息在发送时,它的流程是:首先,在 DefaultMessageStore 消息存储的核心类当中,执行它的加载方法。那么这里的定时任务,也就会进行执行。

点击 load ,会显示下面的代码:

public boolean load() {

boolean result = super.load();

result = result && this.parseDelayLevel();

I

return result;

在这可以看出,它在执行的时候会去加载当前设定的级别。

它是一个线程,那么线程再去启动时,有一个start方法,我们搜索打开这个 start 方法。

image.pngstart 方法会拿到当前每一个的定时缓存表(delayLevelTable)当中的每一个定时队列(entry),它的信息当中的偏移量(offset),把这个偏移量取出来做判断,如果偏移量是为空,把偏移量设为0,将来在时间到了之后就可以根据偏移量,从队列当中去取消息。

if (started.compareAndSet( expect:false,update: true)) {

this.timer = new Timer(name:"ScheduleMessageTimerThread",isDaemon: true);

for (Map.Entry<Integer, Long> entry : this.delayLevelTable.entrySet()){

Integer level =entry .getKey();

Long timeDelay = entry.getValue();

Long offset = this.offsetTable.get(leve1);

if (nuli == offset) {

offset = 0L;

)

if(timeDelay != null) {

this.timer.schedule(new DeliverDelayedMessageTimerTask (level,offsee),FIRST_DELAY_TIME);

}}

以上代码是做定时的操作,如果时间到了,它就会去执行这里面它对应的任务。

定时任务是为了保证数据的一个安全性,

this.timer .scheduleAtFixedRate( new TimerTask( ){

@override

public void run(){

try {

if(started.get()) ,ScheduleMessageService.this.persist();

}catch (Throwable e){

log.error( "scheduleAtFixedRate flush exception" , e);

}

}

},delay: 10000,this.defaultMessageStore.getMessageStoreConfig( ).getFlushDelayoffsetInterval());

它会每隔一段时间,代码: private long flushDelayOffsetInterval = 1000*10;

所以每隔十秒它会将当前的数据持久化到磁盘。保证消息不丢失。

If(timeDelay != null) {

this.timer.schedule(new DeliverDelayedMessageTimerTask (level,offsee),FIRST_DELAY_TIME);

}}

如上代码,当时间到了之后,他在这里边会去执行这个任务。这个任务其实就是 run 方法。

这个 run 方法,里边有一个 executeOnTimeup。

打开 executeOnTimeup ,代码如下:

public void executeOnTimeup(){

ConsumeQueue cq=ScheduleMessageservice.this.defaultMessagestore.findConsumeQueue(SCHEDULE_TOPIC,

delayLevel2Queueid(delayLevel));

long failscheduleoffset = offset;

首先它会根据延迟级别(delaylevel)拿到当前的这个队列。

拿到这个队列之后,根据偏移量从这个队列当中,取出当前的消息(SeletMappedBufferResult),然后去遍历这里的消息。

if( countdown <= 0) {

MessageExt msgExt =ScheduleMessageService.this.defaultMessageStore.lookMessageByoffset(offsetPy , sizePy);

if (msgExt != null) {

try {

MessageExtBrokerInner msgInner = this.messageTimeup(msgExt);

PutMessageResult  putMessageResult =

ScheduleMessageService.this.writeMessagestore

.putMessage(msgInner);

根据以上代码,针对每一个消息都封装了 message 的对象。然后把这个消息对象写到 MessageStore当中,写到正常的存储队列中,这样,延迟消息就把它转换成了正常消息,就能够被消费者正常去处理了。所以以上代码部分如果能够写过去就说明时间就可以正常的被消费了。

以上为定时消息处理机制的内容。

定时消息处理的入口:

image.png

public boolean load(){

boolean result = true;

try {

boolean lastExitOK = ! this.isTempFileExist();

Log.info("last shutdown {}", lastExitOK ? “normally" : "abnormally" );

if(null != scheduleMessageService) {

result = result && this.scheduleMessageService.load();}

这个定时任务会随着DefaultMessageStore的加载而加载。

那么当这个定时任务正在工作的时候,会调用里面的 start方法:

public void start(){

if( started.compareAndset( expect: false,update: true)) {

this.timer = new Timer(name:”ScheduleMessageTimerThread",isDaemon:true);

for (Map.Entry<integer,Long> entry : this.delayLevelTabie.entryset() ){

Integer level  = entry .getKey();

Long timeDelay = entry.getvalue();

Long offset = this.offsetTable.get( level);

if(null == offset) {

offset = 0L;}

if(timeDelay != null) {

this.timer.schedule(new DeliverDelayedMessageTimerTask(level,offset),FIRST_DELAY_TINE);}

首先,先取出定时队列,它的偏移量(offset)如果为空会进行为零的一个校准,

this.timer .scheduleAtFixedRate(new TimerTask(){

@override

publie void run() {

try {

if(started.get()) ScheduleMessageService.this.persist();

}catch ( Throwable e){

Log.error( "scheduleAtFixedrate flush exception" , e);}

}}delay:10000,this.defaultMessageStore.getMessageStoreConfig().getFlushDelayoffsetInterval()); }

然后每隔十秒把这些定时队列进行一个存储。

时间到了之后就会执行定时任务当中的 run方法:

@override

public void run() {

try {

if(isStarted()){

this.executeOnTimeup();

}catch (Exception e) {

log.error("ScheduleMessageService,executeOnTimeup exception",e);

ScheduleMessageService.this.timer.schedule(new DeliverDelayedMessageTimerTask(

this.delayLevel, this.offset),DELAY_FOR_A_PERIOD);}

}

这个 run 方法就是取出队列当中的消息,然后把它写到正常的一个存储的一个队列当中,保证能够被正常的消费。

相关文章
|
15天前
|
Java Android开发
Android系统 获取用户最后操作时间回调实现和原理分析
Android系统 获取用户最后操作时间回调实现和原理分析
17 0
|
8月前
|
Android开发
为什么会触发ANR,从源码中扒一扒
为什么会触发ANR,从源码中扒一扒
49 0
|
5月前
|
监控
异步处理机制如何处理消息处理失败的情况?
异步处理机制如何处理消息处理失败的情况?
50 0
|
10月前
|
消息中间件 存储 JavaScript
Electron开发自定义通知 & 多并发下接收消息处理
Electron 专门提供了 Notification 可以用来实现`系统通知`,但是如果想实现自定义(如自定义UI样式等)通知,Notification 则不能实现。 下面摘录了我在`系统通知`和如何实现`自定义通知`的思路,另外也阐述了`多并发下接收消息`方案实现。
378 0
|
前端开发
前端学习案例1-异步和事件轮询
前端学习案例1-异步和事件轮询
67 0
前端学习案例1-异步和事件轮询
|
消息中间件 存储 算法
【视频】定时消息 | 学习笔记
快速学习【视频】定时消息
116 0
【视频】定时消息 | 学习笔记
|
消息中间件 负载均衡 Java
延迟消息|学习笔记
快速学习延迟消息
110 0
延迟消息|学习笔记
【EventBus】EventBus 源码解析 ( 事件发送 | 发布线程为 子线程 切换到 主线程 执行订阅方法的过程分析 )
【EventBus】EventBus 源码解析 ( 事件发送 | 发布线程为 子线程 切换到 主线程 执行订阅方法的过程分析 )
146 0
|
Android开发
【EventBus】事件通信框架 ( 发送事件 | 判断发布线程是否是主线程 | 子线程切换主线程 | 主线程切换子线程 )(一)
【EventBus】事件通信框架 ( 发送事件 | 判断发布线程是否是主线程 | 子线程切换主线程 | 主线程切换子线程 )(一)
160 0
【EventBus】事件通信框架 ( 发送事件 | 判断发布线程是否是主线程 | 子线程切换主线程 | 主线程切换子线程 )(二)
【EventBus】事件通信框架 ( 发送事件 | 判断发布线程是否是主线程 | 子线程切换主线程 | 主线程切换子线程 )(二)
108 0

热门文章

最新文章