消息达到后实时推送机制|学习笔记

简介: 快速学习消息达到后实时推送机制

开发者学堂课程【RocketMQ 知识精讲与项目实战(第三阶段)消息达到后实时推送机制】学习笔记,与课程紧密联系,让用户快速学习知识。

课程地址:https://developer.aliyun.com/learning/course/704/detail/12501


消息达到后实时推送机制


当开启长轮循之后会每隔五秒才去检查消息是否有新的消息到达。为了提高消息处理的一个实施性, RocketMQ 会在消息到达的时候也会去唤醒当前线程去做一次检查。那么消息到达之后,首先做的事情是进行消息的一个存储。this.reputMessageService.setReputFromoffset(maxPhysicalPosInLogicqueue);

this.reputNessageService.start();

在消息存储的这个类当中,有上面所示的两行代码,就是有这么一个 reputMessage Service。

然后进入到 start 的方法当中,代码如下:

public void start(){

Log.info(varl:"Try to start service thread;{} started:{} lastThread:{}"getServiceName (),started.get(), thread);

if(!started.compareAndSet( expect: false,update: true)) {

return;

}

stopped = false;

this.thread =new Thread(target:this, getServiceName());

this.thread.setDaemon(isDaemon);

this.thread.start();

}

进入这个 start 方法后,线程会被启动,然后会执行 run 方法,进入 run 方法,在run方法中,消息到达之后,在这里做doReput,然后我们看下面这段代码:

if(BrokerRole,SLAVE != DefaultMessageStore.this.getMessagestorecontig().getBrokerRole()  && DefaultMessageStore.this.brokerConfig.isLongPollingEnable()){

DefaultMessageStore.this.messageArrivingListener .arriving(dispatchRequest.getTopic(),

dispatchiequest.getQueueId(),logicOffset:dispatchRequest.getConsumeQueueOffset() dispatchRequest.getTagsCode( ), disggtchRequest.getStoreTimestamp().dispatchRequest,getBitMap(),dispatchRequest.getPropertiesMap());

这个 if 是判断,如果现在这个角色不是从节点,而是主节点,说明现在接收到了消息,它会去调用 messingarrivinglistener arriving 方法,那么这个方法的作用是什么?

看下图所示:

image.png它有一个结构的实现类是 notifymessagelistener,在notifymessagelistener类中回到了拉取请求后的Service,代码如下:

public void arriving(string topic,int queueId,long logicOffset,long tagsCode,

long msgStoreTime, byte[] filterBitMap,Map<String,String> properties) {

this.pullRequestHoldservice.notifyMessageArriving(topic,queueId,logicoffset,tagsCode,

mSgStoreTime,filterBitHap,properties);

在这个类中就是通知当前消息,如果有新的消息到达之后又回到如下代码的判断中,

if(match){

try{This.brokerController.getPullMessageProcessor().executeRequest.ggetClientChannel(),

Request.getRequestCommand());

}catch(Throwable e){

log.error(“execute request when wakeup failed.”,e);

}

continue;

}

这个消息如果是感兴趣(match),就通知客户端去开始做一个响应。如果不是,以下代码判断超时时间。

if(system.currentTimeMillis()>=(request.getSuspendTimestamp()+request.getTimeoutMillis())){

try{This.brokerController.getPullMessageProcessor().executeRequestwhenWakeup(request.getClientChannel(),

}catch(Throwable e){

log.error(“execute request when wakeup failed.”,e);

}

continue;

}

所以为了提高消息处理的一个实施性,代码:this.reputMessageService.start();

在消息到达之后。看它的 run 方法:

打开 doreput,然后在下面代码的位置做一个到达之后的一个通知。

if(BrokerRole,SLAVE != DefaultMessageStore.this.getMessagestorecontig().getBrokerRole()  && DefaultMessageStore.this.brokerConfig.isLongPollingEnable()){

DefaultMessageStore.this.messageArrivingListener .arriving(dispatchRequest.getTopic(),

dispatchiequest.getQueueId(),logicOffset:dispatchRequest.getConsumeQueueOffset() dispatchRequest.getTagsCode( ), disggtchRequest.getStoreTimestamp().dispatchRequest,getBitMap(),dispatchRequest.getPropertiesMap());

通知监听器,让他负责检查消息是不是感兴趣的,如果感兴趣,就直接返回。

image.png整个的这块就跟前面所讲的长轮询的机制合二为一。

相关文章
|
1月前
|
数据库
avue中实现消息的实时展示
avue中实现消息的实时展示
7 0
|
存储 编解码 安全
现代IM系统中聊天消息的同步和存储方案探讨
本文原作者:木洛,阿里云高级技术专家,内容有删减和修订,感谢原作者。 1、前言 IM全称是『Instant Messaging』,中文名是即时通讯。在这个高度信息化的移动互联网时代,生活中IM类产品已经成为必备品,比较有名的如钉钉、微信、QQ等以IM为核心功能的产品。
4952 0
|
消息中间件 SQL 存储
解析 RocketMQ 多样消费功能-消息过滤
在消息中间件的使用过程中,一个主题对应的消费者想要通过规则只消费这个主题下具备某些特征的消息,过滤掉自己不关心的消息,这个功能就叫消息过滤。
236 0
解析 RocketMQ  多样消费功能-消息过滤
|
消息中间件 存储 算法
RocketMQ 消息集成:多类型业务消息——定时消息
本篇将继续业务消息集成的场景,从使用场景、应用案例、功能原理以及最佳实践等角度介绍 RocketMQ 的定时消息功能。
427 0
RocketMQ  消息集成:多类型业务消息——定时消息
|
消息中间件 SQL 缓存
RocketMQ 5.0 多样消费功能详解消息过滤.|学习笔记(二)
快速学习 RocketMQ 5.0 多样消费功能详解消息过滤.
244 0
RocketMQ 5.0 多样消费功能详解消息过滤.|学习笔记(二)
|
消息中间件 存储 SQL
RocketMQ 5.0 多样消费功能详解消息过滤|学习笔记(一)
快速学习 RocketMQ 5.0 多样消费功能详解消息过滤.
332 0
RocketMQ 5.0 多样消费功能详解消息过滤|学习笔记(一)
|
消息中间件 存储 算法
多类型业务消息专题-定时消息| 学习笔记
快速学习多类型业务消息专题-定时消息
138 0
多类型业务消息专题-定时消息| 学习笔记
|
消息中间件 RocketMQ 开发者
拉取消息长轮询机制|学习笔记
快速学习拉取消息长轮询机制
238 0
拉取消息长轮询机制|学习笔记
|
消息中间件 SQL 存储
解析 RocketMQ 多样消费功能-消息过滤
在消息中间件的使用过程中,一个主题对应的消费者想要通过规则只消费这个主题下具备某些特征的消息,过滤掉自己不关心的消息,这个功能就叫消息过滤。
解析 RocketMQ 多样消费功能-消息过滤
|
消息中间件 存储 算法
RocketMQ 消息集成:多类型业务消息——定时消息
本篇将继续业务消息集成的场景,从使用场景、应用案例、功能原理以及最佳实践等角度介绍 RocketMQ 的定时消息功能。
RocketMQ 消息集成:多类型业务消息——定时消息

热门文章

最新文章