拉取消息长轮询机制|学习笔记

简介: 快速学习拉取消息长轮询机制

开发者学堂课程【RocketMQ 知识精讲与项目实战(第三阶段)拉取消息长轮询机制 】学习笔记,与课程紧密联系,让用户快速学习知识。

课程地址:https://developer.aliyun.com/learning/course/704/detail/12500


拉取消息长轮询机制

 

长轮询的机制分析

RocketMQ 并未真正实现消息推送模式,而是消费者主动向消息服务器拉取消息, RocketMQ 推模式是循环向消息服务端发起消息拉取请求,如果消息消费者向 RocketMQ 拉取消息时,消息未到达消费队列时,如果不启用长轮询机制,则会在服务端等待时间后(挂起)再去判断消息是否已经到达指定消息队列,如果消息仍未到达则提示拉取消息客户端 puLL—NOT—FOUND (消息不存在)﹔如果开启长轮询模式, RocketMQ 一方面会每隔5s轮询检查一次消息是否到达,同时一有消息达到后立马通知挂起线程再次验证消息是否是自己感兴趣的消息,如果是则从 CommitLog 中提取消息返回给消息拉取客户端,否则直到挂起超时,如果超时,那么也会给客户端返回puLL—NOT—FOUND 这么一个结果,超时时间的设置对于推送模式来讲默认的就是15秒。拉取模式可以通过这个客户端自己去设定。然后是否要支持长轮询?因为支持长轮循,它是每隔五秒它会去轮询一次,没有开启长轮循,它会每隔一秒去轮循一次。长轮询的机制可以在客户端去配置,在客户端 broker 配置文件当中去完成配置。

image.png

客户端发起的请求到服务端,服务端如果没有找到这个消息的话,就进到这个 case 中。注意这个类是 PullMessageProcessor 。这个类在 broker 当中,这个类是用来接收客户端拉取请求的这个核心的类,然后,在这里先做一个判段。判断现在是不是支持这个长轮询。

如果是开启这个长轮询的这种方式?就取出长轮询的这个等待的超时的时间 (pollingTimeMills),然后又封装拉取的请求。去处理这个请求,

代码如下:pullRequest pullRequest = new PullRequest(request,channel,pollingTimeMills,

this.brokerController.getMessageStore( ) .now(),offset, subscriptionData,messageFilter);

处理请求,在mpr.addPullRequest(PullRequest); 把请求放到队列中,放入manyPullRequest 队列中,

紧接着PullRequest是基于等待唤醒机制的,因为这是一个服务,一个线程。所以有一个 run 方法,

run 方法代码如下:

image.png它会去唤醒,等待的这个线程,然后去检查当前有没有新的消息。

怎么检查?如果开启了长轮询,就会等待五秒然后去查。

this.waitForRunning(interval:5*1000);

如果没有开启,根据下面的代码看出它会等待一秒。

Private long shortPollingTimeMills = 1000;

然后它会记录当前开始的时间。

Long beginLockTimestamp = this.systemClock.now();

This.checkHoldRequest();

private void checkHoldRequest(){

for (String key : this.pullRequestTable.keyset()){

String[] kArray = key . split( TOPIC_QUEUEID_SEPARATOR);

if (2 == kArray.length){

string topic = kArray[0];

int queueId = Integer.parseInt(kArray[1]);

final long offset = this.brokerController.getMessageStore().getMaxoffsetInQueue(topic,queueId);

try {

this.notifyMessageArriving(topic,queueId,offset);

}catch (Throwable e){

log .error( var1: "check hold request failed. topic={), queueId=(}" , topic,queueId,e);}

然后去检查一下当前的请求对象(checkHoldRequest),把这个请求对象取出来。拿到主题(topic),还有队列(queueId),然后发起一个查找的请求,在这里要去找 getmessagestore,看一下有没有新的消息,

如果有的话就通知 messingeariving。

public void notifyMessagaArriving(final string topic,final int queueId,final long maxOffset) {

notifyMessaggrriving(topic,queueId,maxoffset,tagsCode: null,msgStoreTime:0,filterBitMap: null,properties: null);}

上面所示代码表示已经有新的消息,然后要看这个消息是不是感兴趣的。

然后进入到 notifyMesageArriving 方法当中。

先判断当前是不是有新的消息,也就是这个 newestoffset 是不是大于当前 request 当中所持有的 offset?

对应代码:

if(newestoffset > request.getPullFromThisOffset())

如果大于就代表有新的消息,然后判断这个消息是不是感兴趣的,

如果是 (match),就去唤醒客户端来处理这个消息。

对应代码:if(match){

try{

this.brokerController.getPullMessageProcessor( ).executeRequestWhenwakeup(request.getClientChanne1(),

request.getRequestComnand());

}catch (Throwable e){

Log.error("execute request when wakeup failed.",e);

如果不是他当前感兴趣的。那么做时间的判断,就是如果等待的时间大于当前它的超时时间,就直接给客户端做一个响应回复当前消息没找到。

对应代码:if (System.currentTimeMillis() >= (request.getSuspendTimestamp( ) + request.getTimeOutMillis())){

try (

this.brokerController.getPullMessageProcessor( ).executeRequestwhenwakeup(request.getClientchanne1(),

request. getRequestCoemand());

}catch (Throwable e){

log.error("execute request when wakeup failed.",e);}

continue;}

以上就是整个长轮巡的机制。

那么,长轮巡的入口在哪里?

image.png长轮询的入口就在 PULL_NOT_FOUND 这里。

它会重新发起一个拉取的这个请求,

PullRequest pullRequest = new PullRequest(request, channel, pollingTimeMills,this.brokerController.getMessageStore().now(),offset.subscriptionData, messageFilter);

然后提交这个请求去处理:

public void suspendPullRequest(final string topic,final int queueId,final PullRequest pullRequest) {

String key = this.buildkey(topic,queueId);

ManyPullRequest mpr = this.pullRequestTable-get(key);

if(null =mpr){

mpr = new ManyPullRequest();

ManyPullRequest prev = this.pullRequestTable.putIfAbsent(key,mpr);

if (prev != null){

mpr = prev;

}

}

mpr .addPullRequest(pullRequest);

这个线程通过等待唤醒机制,一边去放这个队列 (mpr.addPullRequest(pullRequest)),

另外一边,看下面的代码:

if (this.brokerController.getBrokerconfig( ).isLongPollingEnable()){this.waitForRunning( interval: 5 * 1000);

}else {this.waitForRunning(this.brokerController.getBrokerConfig().getShortPollingTimeMills());}

如果是长轮询,就是每隔五秒去检查消息,如果不是就每隔一秒去检查消息。

然后去处理这个请求:

long beginLockTimestamp = this.systemclock. now();

this.checkHoldRequest();

long costTime = this.systemClock.now( ) - beginLockTimestamp;

if (costTime > 5 * 1000) {

Log.info("[NOTIFYME] check hold request cost {} ms." , costTime);}

处理请求之后,要去进行一个通知,主要就是检查当前的消息是不是感兴趣。如果是感兴趣的,就进行一个客户端线程的一个唤醒,然后去做一个响应。

if(match) {

try {

this.brokerController.getPullMessageProcessor( ) .executeRequestWhenWakeup(request.getClientchannel(),request.getRequestcommand());

}catch ( Throwable e){

log.error ( "execute request when wakeup failed.” ,e);

continue;}

}

如果不是就继续做长轮询,若超时就直接返回当前没有找到的信息。

if (System.currentTimeMillis() >= (request.getSuspendTimestamp() + request.getTimeoutMillis()))

{ try {this.brokerController.getPullMessageProcessor().executeRequestWhenWakeup(request.getClientchannel(),request.getRequestCommand());

}catch (Throwable e){

log.error( "execute request when wakeup failed.",e);

}

continue;}

如果开启了长轮询机制, PullRequestService 会每隔5秒被唤醒去检查当前是不是有新的消息到来,给客户端做响应,或者是直接超时给客户端做响应。消息的实施性会比较差,但是 RocketMQ 引入了另外一种机制,当消息到达时直接就唤醒挂起线程,然后触发一次检查。如何去唤醒触发一次检查。

相关实践学习
消息队列RocketMQ版:基础消息收发功能体验
本实验场景介绍消息队列RocketMQ版的基础消息收发功能,涵盖实例创建、Topic、Group资源创建以及消息收发体验等基础功能模块。
消息队列 MNS 入门课程
1、消息队列MNS简介 本节课介绍消息队列的MNS的基础概念 2、消息队列MNS特性 本节课介绍消息队列的MNS的主要特性 3、MNS的最佳实践及场景应用 本节课介绍消息队列的MNS的最佳实践及场景应用案例 4、手把手系列:消息队列MNS实操讲 本节课介绍消息队列的MNS的实际操作演示 5、动手实验:基于MNS,0基础轻松构建 Web Client 本节课带您一起基于MNS,0基础轻松构建 Web Client
相关文章
|
3天前
|
消息中间件 Serverless 网络性能优化
消息队列 MQ产品使用合集之客户端和服务器之间的保活心跳检测间隔是怎么设置的
消息队列(MQ)是一种用于异步通信和解耦的应用程序间消息传递的服务,广泛应用于分布式系统中。针对不同的MQ产品,如阿里云的RocketMQ、RabbitMQ等,它们在实现上述场景时可能会有不同的特性和优势,比如RocketMQ强调高吞吐量、低延迟和高可用性,适合大规模分布式系统;而RabbitMQ则以其灵活的路由规则和丰富的协议支持受到青睐。下面是一些常见的消息队列MQ产品的使用场景合集,这些场景涵盖了多种行业和业务需求。
|
29天前
|
消息中间件 负载均衡 监控
Kafka消费者:监听模式VS主动拉取,哪种更适合你?
Kafka消费者:监听模式VS主动拉取,哪种更适合你?
18 1
|
11月前
|
消息中间件
RabbitMQ如何支持事务性消息的发送和接收
RabbitMQ消息的发送和接收
171 0
|
12月前
|
消息中间件 存储 缓存
MQ 学习日志(五) 如何保证消息的幂等性
如何保证消息的幂等性 简述
85 0
MQ 学习日志(五) 如何保证消息的幂等性
|
11月前
|
消息中间件
RabbitMQ (HelloWord 消息应答 持久化 不公平分发 预取值)1
RabbitMQ (HelloWord 消息应答 持久化 不公平分发 预取值)1
47 0
|
11月前
|
消息中间件 存储 缓存
RabbitMQ (HelloWord 消息应答 持久化 不公平分发 预取值)2
RabbitMQ (HelloWord 消息应答 持久化 不公平分发 预取值)2
49 0
同步消息和异步消息
同步消息和异步消息
105 0
|
消息中间件 存储 缓存
简述RocketMQ消息拉取过程【二】
简述RocketMQ消息拉取过程【二】
979 1
|
消息中间件 存储 缓存
Kafka快速入门(生产者)同步异步发送、分区、消息精确一次发送、幂等性、事务
Kafka快速入门(生产者)同步异步发送、分区、消息精确一次发送、幂等性、事务
Kafka快速入门(生产者)同步异步发送、分区、消息精确一次发送、幂等性、事务
|
消息中间件 RocketMQ
简述RocketMQ消息拉取过程【一】
简述RocketMQ消息拉取过程【一】
684 0