开发者学堂课程【RocketMQ 知识精讲与项目实战(第三阶段):拉取消息长轮询机制 】学习笔记,与课程紧密联系,让用户快速学习知识。
课程地址:https://developer.aliyun.com/learning/course/704/detail/12500
拉取消息长轮询机制
长轮询的机制分析
RocketMQ 并未真正实现消息推送模式,而是消费者主动向消息服务器拉取消息, RocketMQ 推模式是循环向消息服务端发起消息拉取请求,如果消息消费者向 RocketMQ 拉取消息时,消息未到达消费队列时,如果不启用长轮询机制,则会在服务端等待时间后(挂起)再去判断消息是否已经到达指定消息队列,如果消息仍未到达则提示拉取消息客户端 puLL—NOT—FOUND (消息不存在)﹔如果开启长轮询模式, RocketMQ 一方面会每隔5s轮询检查一次消息是否到达,同时一有消息达到后立马通知挂起线程再次验证消息是否是自己感兴趣的消息,如果是则从 CommitLog 中提取消息返回给消息拉取客户端,否则直到挂起超时,如果超时,那么也会给客户端返回puLL—NOT—FOUND 这么一个结果,超时时间的设置对于推送模式来讲默认的就是15秒。拉取模式可以通过这个客户端自己去设定。然后是否要支持长轮询?因为支持长轮循,它是每隔五秒它会去轮询一次,没有开启长轮循,它会每隔一秒去轮循一次。长轮询的机制可以在客户端去配置,在客户端 broker 配置文件当中去完成配置。
客户端发起的请求到服务端,服务端如果没有找到这个消息的话,就进到这个 case 中。注意这个类是 PullMessageProcessor 。这个类在 broker 当中,这个类是用来接收客户端拉取请求的这个核心的类,然后,在这里先做一个判段。判断现在是不是支持这个长轮询。
如果是开启这个长轮询的这种方式?就取出长轮询的这个等待的超时的时间 (pollingTimeMills),然后又封装拉取的请求。去处理这个请求,
代码如下:pullRequest pullRequest = new PullRequest(request,channel,pollingTimeMills,
this.brokerController.getMessageStore( ) .now(),offset, subscriptionData,messageFilter);
处理请求,在mpr.addPullRequest(PullRequest);
把请求放到队列中,放入manyPullRequest 队列中,
紧接着PullRequest是基于等待唤醒机制的,因为这是一个服务,一个线程。所以有一个 run 方法,
run 方法代码如下:
它会去唤醒,等待的这个线程,然后去检查当前有没有新的消息。
怎么检查?如果开启了长轮询,就会等待五秒然后去查。
this.waitForRunning(interval:5*1000);
如果没有开启,根据下面的代码看出它会等待一秒。
Private long shortPollingTimeMills = 1000;
然后它会记录当前开始的时间。
Long beginLockTimestamp = this.systemClock.now();
This.checkHoldRequest();
private void checkHoldRequest(){
for (String key : this.pullRequestTable.keyset()){
String[] kArray = key . split( TOPIC_QUEUEID_SEPARATOR);
if (2 == kArray.length){
string topic = kArray[0];
int queueId = Integer.parseInt(kArray[1]);
final long offset = this.brokerController.getMessageStore().getMaxoffsetInQueue(topic,queueId);
try {
this.notifyMessageArriving(topic,queueId,offset);
}catch (Throwable e){
log .error( var1: "check hold request failed. topic={), queueId=(}" , topic,queueId,e);}
然后去检查一下当前的请求对象(checkHoldRequest),把这个请求对象取出来。拿到主题(topic),还有队列(queueId),然后发起一个查找的请求,在这里要去找 getmessagestore,看一下有没有新的消息,
如果有的话就通知 messingeariving。
public void notifyMessagaArriving(final string topic,final int queueId,final long maxOffset) {
notifyMessaggrriving(topic,queueId,maxoffset,tagsCode: null,msgStoreTime:0,filterBitMap: null,properties: null);}
上面所示代码表示已经有新的消息,然后要看这个消息是不是感兴趣的。
然后进入到 notifyMesageArriving 方法当中。
先判断当前是不是有新的消息,也就是这个 newestoffset 是不是大于当前 request 当中所持有的 offset?
对应代码:
if(newestoffset > request.getPullFromThisOffset())
如果大于就代表有新的消息,然后判断这个消息是不是感兴趣的,
如果是 (match),就去唤醒客户端来处理这个消息。
对应代码:if(match){
try{
this.brokerController.getPullMessageProcessor( ).executeRequestWhenwakeup(request.getClientChanne1(),
request.getRequestComnand());
}catch (Throwable e){
Log.error("execute request when wakeup failed.",e);
如果不是他当前感兴趣的。那么做时间的判断,就是如果等待的时间大于当前它的超时时间,就直接给客户端做一个响应回复当前消息没找到。
对应代码:if (System.currentTimeMillis() >= (request.getSuspendTimestamp(
) + request.getTimeOutMillis())){
try (
this.brokerController.getPullMessageProcessor( ).executeRequestwhenwakeup(request.getClientchanne1(),
request. getRequestCoemand());
}catch (Throwable e){
log.error("execute request when wakeup failed.",e);}
continue;}
以上就是整个长轮巡的机制。
那么,长轮巡的入口在哪里?
长轮询的入口就在 PULL_NOT_FOUND 这里。
它会重新发起一个拉取的这个请求,
PullRequest pullRequest = new PullRequest(request, channel, pollingTimeMills,
this.brokerController.getMessageStore().now(),offset.subscriptionData, messageFilter);
然后提交这个请求去处理:
public void suspendPullRequest(final string topic,final int queueId,final PullRequest pullRequest) {
String key = this.buildkey(topic,queueId);
ManyPullRequest mpr = this.pullRequestTable-get(key);
if(null =mpr){
mpr = new ManyPullRequest();
ManyPullRequest prev = this.pullRequestTable.putIfAbsent(key,mpr);
if (prev != null){
mpr = prev;
}
}
mpr .addPullRequest(pullRequest);
这个线程通过等待唤醒机制,一边去放这个队列 (mpr.addPullRequest(pullRequest)),
另外一边,看下面的代码:
if (this.brokerController.getBrokerconfig( ).isLongPollingEnable()){this.waitForRunning( interval: 5 * 1000);
}else {
this.waitForRunning(this.brokerController.getBrokerConfig().getShortPollingTimeMills());}
如果是长轮询,就是每隔五秒去检查消息,如果不是就每隔一秒去检查消息。
然后去处理这个请求:
long beginLockTimestamp = this.systemclock. now();
this.checkHoldRequest();
long costTime = this.systemClock.now( ) - beginLockTimestamp;
if (costTime > 5 * 1000) {
Log.info("[NOTIFYME] check hold request cost {} ms." , costTime);}
处理请求之后,要去进行一个通知,主要就是检查当前的消息是不是感兴趣。如果是感兴趣的,就进行一个客户端线程的一个唤醒,然后去做一个响应。
if(match) {
try {
this.brokerController.getPullMessageProcessor( ) .executeRequestWhenWakeup(request.getClientchannel(),request.getRequestcommand());
}catch ( Throwable e){
log.error ( "execute request when wakeup failed.” ,e);
continue;}
}
如果不是就继续做长轮询,若超时就直接返回当前没有找到的信息。
if (System.currentTimeMillis() >= (request.getSuspendTimestamp() + request.getTimeoutMillis()))
{ try {
this.brokerController.getPullMessageProcessor().executeRequestWhenWakeup(request.getClientchannel(),request.getRequestCommand());
}catch (Throwable e){
log.error( "execute request when wakeup failed.",e);
}
continue;}
如果开启了长轮询机制, PullRequestService 会每隔5秒被唤醒去检查当前是不是有新的消息到来,给客户端做响应,或者是直接超时给客户端做响应。消息的实施性会比较差,但是 RocketMQ 引入了另外一种机制,当消息到达时直接就唤醒挂起线程,然后触发一次检查。如何去唤醒触发一次检查。