定时消息处理机制|学习笔记

简介: 快速学习定时消息处理机制

开发者学堂课程【RocketMQ 知识精讲与项目实战(第三阶段)定时消息处理机制】学习笔记,与课程紧密联系,让用户快速学习知识。

课程地址:https://developer.aliyun.com/learning/course/704/detail/12504


定时消息处理机制

 

定时消息是消息发送到 broker之后,它并不会立即被消费者索要去处理,必须要等到时间到了之后才能发送到这个消费方。目前 RocketMQ并不支持任意时间的定时的设立,也就是它会提前设定一些时间,用户可以通过 delay  Level 级别去选择。可供的时间有1s、5s、10s、30s、1min、2min......1h、2h等等。在进行设定的时候,如果指定 delayLevel=1,用的延迟是1秒,如果设立的是2,设立的是延迟5秒,依次类推即可。

它不支持任意精度的设定的主要原因是如果做了任意精度的设定,如果要去做消息排序会很麻烦,根据时间去排序就会很麻烦,再加上如果要去持久化,对性能会带来巨大的损耗。所以目前不支持任意精度的设定。

定时消息在发送时,它的流程是:首先,在 DefaultMessageStore 消息存储的核心类当中,执行它的加载方法。那么这里的定时任务,也就会进行执行。

点击 load ,会显示下面的代码:

public boolean load() {

boolean result = super.load();

result = result && this.parseDelayLevel();

I

return result;

在这可以看出,它在执行的时候会去加载当前设定的级别。

它是一个线程,那么线程再去启动时,有一个start方法,我们搜索打开这个 start 方法。

image.pngstart 方法会拿到当前每一个的定时缓存表(delayLevelTable)当中的每一个定时队列(entry),它的信息当中的偏移量(offset),把这个偏移量取出来做判断,如果偏移量是为空,把偏移量设为0,将来在时间到了之后就可以根据偏移量,从队列当中去取消息。

if (started.compareAndSet( expect:false,update: true)) {

this.timer = new Timer(name:"ScheduleMessageTimerThread",isDaemon: true);

for (Map.Entry<Integer, Long> entry : this.delayLevelTable.entrySet()){

Integer level =entry .getKey();

Long timeDelay = entry.getValue();

Long offset = this.offsetTable.get(leve1);

if (nuli == offset) {

offset = 0L;

)

if(timeDelay != null) {

this.timer.schedule(new DeliverDelayedMessageTimerTask (level,offsee),FIRST_DELAY_TIME);

}}

以上代码是做定时的操作,如果时间到了,它就会去执行这里面它对应的任务。

定时任务是为了保证数据的一个安全性,

this.timer .scheduleAtFixedRate( new TimerTask( ){

@override

public void run(){

try {

if(started.get()) ,ScheduleMessageService.this.persist();

}catch (Throwable e){

log.error( "scheduleAtFixedRate flush exception" , e);

}

}

},delay: 10000,this.defaultMessageStore.getMessageStoreConfig( ).getFlushDelayoffsetInterval());

它会每隔一段时间,代码: private long flushDelayOffsetInterval = 1000*10;

所以每隔十秒它会将当前的数据持久化到磁盘。保证消息不丢失。

If(timeDelay != null) {

this.timer.schedule(new DeliverDelayedMessageTimerTask (level,offsee),FIRST_DELAY_TIME);

}}

如上代码,当时间到了之后,他在这里边会去执行这个任务。这个任务其实就是 run 方法。

这个 run 方法,里边有一个 executeOnTimeup。

打开 executeOnTimeup ,代码如下:

public void executeOnTimeup(){

ConsumeQueue cq=ScheduleMessageservice.this.defaultMessagestore.findConsumeQueue(SCHEDULE_TOPIC,

delayLevel2Queueid(delayLevel));

long failscheduleoffset = offset;

首先它会根据延迟级别(delaylevel)拿到当前的这个队列。

拿到这个队列之后,根据偏移量从这个队列当中,取出当前的消息(SeletMappedBufferResult),然后去遍历这里的消息。

if( countdown <= 0) {

MessageExt msgExt =ScheduleMessageService.this.defaultMessageStore.lookMessageByoffset(offsetPy , sizePy);

if (msgExt != null) {

try {

MessageExtBrokerInner msgInner = this.messageTimeup(msgExt);

PutMessageResult  putMessageResult =

ScheduleMessageService.this.writeMessagestore

.putMessage(msgInner);

根据以上代码,针对每一个消息都封装了 message 的对象。然后把这个消息对象写到 MessageStore当中,写到正常的存储队列中,这样,延迟消息就把它转换成了正常消息,就能够被消费者正常去处理了。所以以上代码部分如果能够写过去就说明时间就可以正常的被消费了。

以上为定时消息处理机制的内容。

定时消息处理的入口:

image.png

public boolean load(){

boolean result = true;

try {

boolean lastExitOK = ! this.isTempFileExist();

Log.info("last shutdown {}", lastExitOK ? “normally" : "abnormally" );

if(null != scheduleMessageService) {

result = result && this.scheduleMessageService.load();}

这个定时任务会随着DefaultMessageStore的加载而加载。

那么当这个定时任务正在工作的时候,会调用里面的 start方法:

public void start(){

if( started.compareAndset( expect: false,update: true)) {

this.timer = new Timer(name:”ScheduleMessageTimerThread",isDaemon:true);

for (Map.Entry<integer,Long> entry : this.delayLevelTabie.entryset() ){

Integer level  = entry .getKey();

Long timeDelay = entry.getvalue();

Long offset = this.offsetTable.get( level);

if(null == offset) {

offset = 0L;}

if(timeDelay != null) {

this.timer.schedule(new DeliverDelayedMessageTimerTask(level,offset),FIRST_DELAY_TINE);}

首先,先取出定时队列,它的偏移量(offset)如果为空会进行为零的一个校准,

this.timer .scheduleAtFixedRate(new TimerTask(){

@override

publie void run() {

try {

if(started.get()) ScheduleMessageService.this.persist();

}catch ( Throwable e){

Log.error( "scheduleAtFixedrate flush exception" , e);}

}}delay:10000,this.defaultMessageStore.getMessageStoreConfig().getFlushDelayoffsetInterval()); }

然后每隔十秒把这些定时队列进行一个存储。

时间到了之后就会执行定时任务当中的 run方法:

@override

public void run() {

try {

if(isStarted()){

this.executeOnTimeup();

}catch (Exception e) {

log.error("ScheduleMessageService,executeOnTimeup exception",e);

ScheduleMessageService.this.timer.schedule(new DeliverDelayedMessageTimerTask(

this.delayLevel, this.offset),DELAY_FOR_A_PERIOD);}

}

这个 run 方法就是取出队列当中的消息,然后把它写到正常的一个存储的一个队列当中,保证能够被正常的消费。

相关文章
|
iOS开发 MacOS
苹果电脑 Mac OS X 系统上防止误按 command+Q 退出软件
在Mac系统操作中,Command + W 关闭一个窗口,Command + Q 退出整个程序,这用起来很方便快捷。然而,由于Q和W、A紧紧挨着已经有N次想关窗口的时候,或者想全选的时候,按成了Command Q,简直疯了有没有?…… 为了避免这种悲剧的发生,查找收集了几个方法: 1、针对Chrome 可以勾选退出前提示: 2、小工具 QBlocker QBlocker 是一款能够帮你暂时禁止使用 command + Q 的小工具。
10725 1
|
设计模式 机器学习/深度学习 SQL
软考高级系统架构设计师通关经验分享
为什么考系统架构设计师是国家设立的计算机技术与软件专业技术资格考试(简称软考)中的一个高级科目,属于工程师高级职称系列,具有一定含金量。浙江省每年通过软考高级的人数约为1000+人,其中系统架构设计师科目的通过人数约为200+人。从学习角度来说,通过准备系统架构设计师的考试的过程,可以查漏补缺,并且了解一些系统架构设计相关的基础知识,实现一定程度上的自我提升;从目的性的角度来说,通过考试,可以在一
14562 4
软考高级系统架构设计师通关经验分享
|
域名解析 Kubernetes Java
图文详述Nacos配置中心使用:应用间配置共享、扩展配置文件加载优先级、新老版本差异
图文详述Nacos配置中心使用:应用间配置共享、扩展配置文件加载优先级、新老版本差异
6131 1
图文详述Nacos配置中心使用:应用间配置共享、扩展配置文件加载优先级、新老版本差异
|
机器学习/深度学习 存储 算法
【类脑计算】突触可塑性模型之Hebbian学习规则和STDP
本文介绍了突触可塑性中的Hebbian学习规则和STDP(Spike-Timing Dependent Plasticity),两种基于神经元活动调节突触强度的机制,其中Hebbian规则强调同时活动的神经元间的连接增强,而STDP则考虑了脉冲时间差异对突触强度的调节作用。
727 2
|
10月前
|
数据采集 数据挖掘 数据格式
Pandas 数据清洗
10月更文挑战第27天
198 0
Pandas 数据清洗
WXM
|
存储 缓存 Java
|
11月前
|
前端开发 Java API
Swagger接口文档 —— 手把手教学,全方位超详细小白能看懂,百分百能用Java版
本文提供了一份详细的Swagger接口文档生成工具的使用教程,包括了导入依赖、配置类设置、资源映射、拦截器配置、Swagger注解使用、生成接口文档、在线调试页面访问以及如何设置全局参数(如token),旨在帮助Java开发者快速上手Swagger。
6395 0
Swagger接口文档 —— 手把手教学,全方位超详细小白能看懂,百分百能用Java版
|
Java 关系型数据库 数据库
|
SQL 存储 分布式计算
Hive详解以及CentOS下部署Hive和Mysql
Hive详解以及CentOS下部署Hive和Mysql
652 0
Hive详解以及CentOS下部署Hive和Mysql