开发者学堂课程【RocketMQ 知识精讲与项目实战(第三阶段):定时消息处理机制】学习笔记,与课程紧密联系,让用户快速学习知识。
课程地址:https://developer.aliyun.com/learning/course/704/detail/12504
定时消息处理机制
定时消息是消息发送到 broker之后,它并不会立即被消费者索要去处理,必须要等到时间到了之后才能发送到这个消费方。目前 RocketMQ并不支持任意时间的定时的设立,也就是它会提前设定一些时间,用户可以通过 delay Level 级别去选择。可供的时间有1s、5s、10s、30s、1min、2min......1h、2h等等。在进行设定的时候,如果指定 delayLevel=1,用的延迟是1秒,如果设立的是2,设立的是延迟5秒,依次类推即可。
它不支持任意精度的设定的主要原因是如果做了任意精度的设定,如果要去做消息排序会很麻烦,根据时间去排序就会很麻烦,再加上如果要去持久化,对性能会带来巨大的损耗。所以目前不支持任意精度的设定。
定时消息在发送时,它的流程是:首先,在 DefaultMessageStore 消息存储的核心类当中,执行它的加载方法。那么这里的定时任务,也就会进行执行。
点击 load ,会显示下面的代码:
public boolean load() {
boolean result = super.load();
result = result && this.parseDelayLevel();
I
return result;
在这可以看出,它在执行的时候会去加载当前设定的级别。
它是一个线程,那么线程再去启动时,有一个start方法,我们搜索打开这个 start 方法。
start 方法会拿到当前每一个的定时缓存表(delayLevelTable)当中的每一个定时队列(entry),它的信息当中的偏移量(offset),把这个偏移量取出来做判断,如果偏移量是为空,把偏移量设为0,将来在时间到了之后就可以根据偏移量,从队列当中去取消息。
if (started.compareAndSet( expect:false,update: true)) {
this.timer = new Timer(name:"ScheduleMessageTimerThread",isDaemon: true);
for (Map.Entry<Integer, Long> entry : this.delayLevelTable.entrySet()){
Integer level =entry .getKey();
Long timeDelay = entry.getValue();
Long offset = this.offsetTable.get(leve1);
if (nuli == offset) {
offset = 0L;
)
if(timeDelay != null) {
this.timer.schedule(new DeliverDelayedMessageTimerTask (level,offsee),FIRST_DELAY_TIME);
}}
以上代码是做定时的操作,如果时间到了,它就会去执行这里面它对应的任务。
定时任务是为了保证数据的一个安全性,
this.timer .scheduleAtFixedRate( new TimerTask( ){
@override
public void run(){
try {
if(started.get()) ,ScheduleMessageService.this.persist();
}catch (Throwable e){
log.error( "scheduleAtFixedRate flush exception" , e);
}
}
},delay: 10000,this.defaultMessageStore.getMessageStoreConfig( ).getFlushDelayoffsetInterval());
它会每隔一段时间,代码: private long flushDelayOffsetInterval = 1000*10;
所以每隔十秒它会将当前的数据持久化到磁盘。保证消息不丢失。
If(timeDelay != null) {
this.timer.schedule(new DeliverDelayedMessageTimerTask (level,offsee),FIRST_DELAY_TIME);
}}
如上代码,当时间到了之后,他在这里边会去执行这个任务。这个任务其实就是 run 方法。
这个 run 方法,里边有一个 executeOnTimeup。
打开 executeOnTimeup ,代码如下:
public void executeOnTimeup(){
ConsumeQueue cq=
ScheduleMessageservice.this.defaultMessagestore.findConsumeQueue(SCHEDULE_TOPIC,
delayLevel2Queueid(delayLevel));
long failscheduleoffset = offset;
首先它会根据延迟级别(delaylevel)拿到当前的这个队列。
拿到这个队列之后,根据偏移量从这个队列当中,取出当前的消息(SeletMappedBufferResult),然后去遍历这里的消息。
if( countdown <= 0) {
MessageExt msgExt =
ScheduleMessageService.this.defaultMessageStore.lookMessageByoffset(offsetPy , sizePy);
if (msgExt != null) {
try {
MessageExtBrokerInner msgInner = this.messageTimeup(msgExt);
PutMessageResult putMessageResult =
ScheduleMessageService.this.writeMessagestore
.putMessage(msgInner);
根据以上代码,针对每一个消息都封装了 message 的对象。然后把这个消息对象写到 MessageStore当中,写到正常的存储队列中,这样,延迟消息就把它转换成了正常消息,就能够被消费者正常去处理了。所以以上代码部分如果能够写过去就说明时间就可以正常的被消费了。
以上为定时消息处理机制的内容。
定时消息处理的入口:
public boolean load(){
boolean result = true;
try {
boolean lastExitOK = ! this.isTempFileExist();
Log.info("last shutdown {}", lastExitOK ? “normally" : "abnormally" );
if(null != scheduleMessageService) {
result = result && this.scheduleMessageService.load();}
这个定时任务会随着DefaultMessageStore的加载而加载。
那么当这个定时任务正在工作的时候,会调用里面的 start方法:
public void start(){
if( started.compareAndset( expect: false,update: true)) {
this.timer = new Timer(name:”ScheduleMessageTimerThread",isDaemon:true);
for (Map.Entry<integer,Long> entry : this.delayLevelTabie.entryset() ){
Integer level = entry .getKey();
Long timeDelay = entry.getvalue();
Long offset = this.offsetTable.get( level);
if(null == offset) {
offset = 0L;}
if(timeDelay != null) {
this.timer.schedule(new DeliverDelayedMessageTimerTask(level,offset),FIRST_DELAY_TINE);}
首先,先取出定时队列,它的偏移量(offset)如果为空会进行为零的一个校准,
this.timer .scheduleAtFixedRate(new TimerTask(){
@override
publie void run() {
try {
if(started.get()) ScheduleMessageService.this.persist();
}catch ( Throwable e){
Log.error( "scheduleAtFixedrate flush exception" , e);}
}
}delay:10000,this.defaultMessageStore.getMessageStoreConfig().getFlushDelayoffsetInterval()); }
然后每隔十秒把这些定时队列进行一个存储。
时间到了之后就会执行定时任务当中的 run方法:
@override
public void run() {
try {
if(isStarted()){
this.executeOnTimeup();
}catch (Exception e) {
log.error("ScheduleMessageService,executeOnTimeup exception",e);
ScheduleMessageService.this.timer.schedule(new DeliverDelayedMessageTimerTask(
this.delayLevel, this.offset),DELAY_FOR_A_PERIOD);}
}
这个 run 方法就是取出队列当中的消息,然后把它写到正常的一个存储的一个队列当中,保证能够被正常的消费。