开发者学堂课程【RocketMQ 知识精讲与项目实战(第三阶段):消息达到后实时推送机制】学习笔记,与课程紧密联系,让用户快速学习知识。
课程地址:https://developer.aliyun.com/learning/course/704/detail/12501
消息达到后实时推送机制
当开启长轮循之后会每隔五秒才去检查消息是否有新的消息到达。为了提高消息处理的一个实施性, RocketMQ 会在消息到达的时候也会去唤醒当前线程去做一次检查。那么消息到达之后,首先做的事情是进行消息的一个存储。this.reputMessageService.setReputFromoffset(maxPhysicalPosInLogicqueue);
this.reputNessageService.start();
在消息存储的这个类当中,有上面所示的两行代码,就是有这么一个 reputMessage Service。
然后进入到 start 的方法当中,代码如下:
public void start(){
Log.info(varl:"Try to start service thread;{} started:{} lastThread:{}"getServiceName (),started.get(), thread);
if(!started.compareAndSet( expect: false,update: true)) {
return;
}
stopped = false;
this.thread =new Thread(target:this, getServiceName());
this.thread.setDaemon(isDaemon);
this.thread.start();
}
进入这个 start 方法后,线程会被启动,然后会执行 run 方法,进入 run 方法,在run方法中,消息到达之后,在这里做doReput,然后我们看下面这段代码:
if(BrokerRole,SLAVE != DefaultMessageStore.this.getMessagestorecontig().getBrokerRole() && DefaultMessageStore.this.brokerConfig.isLongPollingEnable()){
DefaultMessageStore.this.messageArrivingListener .arriving(dispatchRequest.getTopic(),
dispatchiequest.getQueueId(),logicOffset:dispatchRequest.getConsumeQueueOffset() dispatchRequest.getTagsCode( ), disggtchRequest.getStoreTimestamp().
dispatchRequest,getBitMap(),dispatchRequest.getPropertiesMap());
这个 if 是判断,如果现在这个角色不是从节点,而是主节点,说明现在接收到了消息,它会去调用 messingarrivinglistener arriving 方法,那么这个方法的作用是什么?
看下图所示:
它有一个结构的实现类是 notifymessagelistener,在notifymessagelistener类中回到了拉取请求后的Service,代码如下:
public void arriving(string topic,int queueId,long logicOffset,long tagsCode,
long msgStoreTime, byte[] filterBitMap,Map<String,String> properties) {
this.pullRequestHoldservice.notifyMessageArriving(topic,queueId,logicoffset,tagsCode,
mSgStoreTime,filterBitHap,properties);
在这个类中就是通知当前消息,如果有新的消息到达之后又回到如下代码的判断中,
if(match){
try{
This.brokerController.getPullMessageProcessor().executeRequest.ggetClientChannel(),
Request.getRequestCommand());
}catch(Throwable e){
log.error(“execute request when wakeup failed.”,e);
}
continue;
}
这个消息如果是感兴趣(match),就通知客户端去开始做一个响应。如果不是,以下代码判断超时时间。
if(system.currentTimeMillis()>=(request.getSuspendTimestamp()+request.getTimeoutMillis())){
try{
This.brokerController.getPullMessageProcessor().executeRequestwhenWakeup(request.getClientChannel(),
}catch(Throwable e){
log.error(“execute request when wakeup failed.”,e);
}
continue;
}
所以为了提高消息处理的一个实施性,代码:this.reputMessageService.start();
在消息到达之后。看它的 run 方法:
打开 doreput,然后在下面代码的位置做一个到达之后的一个通知。
if(BrokerRole,SLAVE != DefaultMessageStore.this.getMessagestorecontig().getBrokerRole() && DefaultMessageStore.this.brokerConfig.isLongPollingEnable()){
DefaultMessageStore.this.messageArrivingListener .arriving(dispatchRequest.getTopic(),
dispatchiequest.getQueueId(),logicOffset:dispatchRequest.getConsumeQueueOffset() dispatchRequest.getTagsCode( ), disggtchRequest.getStoreTimestamp().
dispatchRequest,getBitMap(),dispatchRequest.getPropertiesMap());
通知监听器,让他负责检查消息是不是感兴趣的,如果感兴趣,就直接返回。
整个的这块就跟前面所讲的长轮询的机制合二为一。