拉取消息长轮询机制|学习笔记

简介: 快速学习拉取消息长轮询机制

开发者学堂课程【RocketMQ 知识精讲与项目实战(第三阶段)拉取消息长轮询机制 】学习笔记,与课程紧密联系,让用户快速学习知识。

课程地址:https://developer.aliyun.com/learning/course/704/detail/12500


拉取消息长轮询机制

 

长轮询的机制分析

RocketMQ 并未真正实现消息推送模式,而是消费者主动向消息服务器拉取消息, RocketMQ 推模式是循环向消息服务端发起消息拉取请求,如果消息消费者向 RocketMQ 拉取消息时,消息未到达消费队列时,如果不启用长轮询机制,则会在服务端等待时间后(挂起)再去判断消息是否已经到达指定消息队列,如果消息仍未到达则提示拉取消息客户端 puLL—NOT—FOUND (消息不存在)﹔如果开启长轮询模式, RocketMQ 一方面会每隔5s轮询检查一次消息是否到达,同时一有消息达到后立马通知挂起线程再次验证消息是否是自己感兴趣的消息,如果是则从 CommitLog 中提取消息返回给消息拉取客户端,否则直到挂起超时,如果超时,那么也会给客户端返回puLL—NOT—FOUND 这么一个结果,超时时间的设置对于推送模式来讲默认的就是15秒。拉取模式可以通过这个客户端自己去设定。然后是否要支持长轮询?因为支持长轮循,它是每隔五秒它会去轮询一次,没有开启长轮循,它会每隔一秒去轮循一次。长轮询的机制可以在客户端去配置,在客户端 broker 配置文件当中去完成配置。

image.png

客户端发起的请求到服务端,服务端如果没有找到这个消息的话,就进到这个 case 中。注意这个类是 PullMessageProcessor 。这个类在 broker 当中,这个类是用来接收客户端拉取请求的这个核心的类,然后,在这里先做一个判段。判断现在是不是支持这个长轮询。

如果是开启这个长轮询的这种方式?就取出长轮询的这个等待的超时的时间 (pollingTimeMills),然后又封装拉取的请求。去处理这个请求,

代码如下:pullRequest pullRequest = new PullRequest(request,channel,pollingTimeMills,

this.brokerController.getMessageStore( ) .now(),offset, subscriptionData,messageFilter);

处理请求,在mpr.addPullRequest(PullRequest); 把请求放到队列中,放入manyPullRequest 队列中,

紧接着PullRequest是基于等待唤醒机制的,因为这是一个服务,一个线程。所以有一个 run 方法,

run 方法代码如下:

image.png它会去唤醒,等待的这个线程,然后去检查当前有没有新的消息。

怎么检查?如果开启了长轮询,就会等待五秒然后去查。

this.waitForRunning(interval:5*1000);

如果没有开启,根据下面的代码看出它会等待一秒。

Private long shortPollingTimeMills = 1000;

然后它会记录当前开始的时间。

Long beginLockTimestamp = this.systemClock.now();

This.checkHoldRequest();

private void checkHoldRequest(){

for (String key : this.pullRequestTable.keyset()){

String[] kArray = key . split( TOPIC_QUEUEID_SEPARATOR);

if (2 == kArray.length){

string topic = kArray[0];

int queueId = Integer.parseInt(kArray[1]);

final long offset = this.brokerController.getMessageStore().getMaxoffsetInQueue(topic,queueId);

try {

this.notifyMessageArriving(topic,queueId,offset);

}catch (Throwable e){

log .error( var1: "check hold request failed. topic={), queueId=(}" , topic,queueId,e);}

然后去检查一下当前的请求对象(checkHoldRequest),把这个请求对象取出来。拿到主题(topic),还有队列(queueId),然后发起一个查找的请求,在这里要去找 getmessagestore,看一下有没有新的消息,

如果有的话就通知 messingeariving。

public void notifyMessagaArriving(final string topic,final int queueId,final long maxOffset) {

notifyMessaggrriving(topic,queueId,maxoffset,tagsCode: null,msgStoreTime:0,filterBitMap: null,properties: null);}

上面所示代码表示已经有新的消息,然后要看这个消息是不是感兴趣的。

然后进入到 notifyMesageArriving 方法当中。

先判断当前是不是有新的消息,也就是这个 newestoffset 是不是大于当前 request 当中所持有的 offset?

对应代码:

if(newestoffset > request.getPullFromThisOffset())

如果大于就代表有新的消息,然后判断这个消息是不是感兴趣的,

如果是 (match),就去唤醒客户端来处理这个消息。

对应代码:if(match){

try{

this.brokerController.getPullMessageProcessor( ).executeRequestWhenwakeup(request.getClientChanne1(),

request.getRequestComnand());

}catch (Throwable e){

Log.error("execute request when wakeup failed.",e);

如果不是他当前感兴趣的。那么做时间的判断,就是如果等待的时间大于当前它的超时时间,就直接给客户端做一个响应回复当前消息没找到。

对应代码:if (System.currentTimeMillis() >= (request.getSuspendTimestamp( ) + request.getTimeOutMillis())){

try (

this.brokerController.getPullMessageProcessor( ).executeRequestwhenwakeup(request.getClientchanne1(),

request. getRequestCoemand());

}catch (Throwable e){

log.error("execute request when wakeup failed.",e);}

continue;}

以上就是整个长轮巡的机制。

那么,长轮巡的入口在哪里?

image.png长轮询的入口就在 PULL_NOT_FOUND 这里。

它会重新发起一个拉取的这个请求,

PullRequest pullRequest = new PullRequest(request, channel, pollingTimeMills,this.brokerController.getMessageStore().now(),offset.subscriptionData, messageFilter);

然后提交这个请求去处理:

public void suspendPullRequest(final string topic,final int queueId,final PullRequest pullRequest) {

String key = this.buildkey(topic,queueId);

ManyPullRequest mpr = this.pullRequestTable-get(key);

if(null =mpr){

mpr = new ManyPullRequest();

ManyPullRequest prev = this.pullRequestTable.putIfAbsent(key,mpr);

if (prev != null){

mpr = prev;

}

}

mpr .addPullRequest(pullRequest);

这个线程通过等待唤醒机制,一边去放这个队列 (mpr.addPullRequest(pullRequest)),

另外一边,看下面的代码:

if (this.brokerController.getBrokerconfig( ).isLongPollingEnable()){this.waitForRunning( interval: 5 * 1000);

}else {this.waitForRunning(this.brokerController.getBrokerConfig().getShortPollingTimeMills());}

如果是长轮询,就是每隔五秒去检查消息,如果不是就每隔一秒去检查消息。

然后去处理这个请求:

long beginLockTimestamp = this.systemclock. now();

this.checkHoldRequest();

long costTime = this.systemClock.now( ) - beginLockTimestamp;

if (costTime > 5 * 1000) {

Log.info("[NOTIFYME] check hold request cost {} ms." , costTime);}

处理请求之后,要去进行一个通知,主要就是检查当前的消息是不是感兴趣。如果是感兴趣的,就进行一个客户端线程的一个唤醒,然后去做一个响应。

if(match) {

try {

this.brokerController.getPullMessageProcessor( ) .executeRequestWhenWakeup(request.getClientchannel(),request.getRequestcommand());

}catch ( Throwable e){

log.error ( "execute request when wakeup failed.” ,e);

continue;}

}

如果不是就继续做长轮询,若超时就直接返回当前没有找到的信息。

if (System.currentTimeMillis() >= (request.getSuspendTimestamp() + request.getTimeoutMillis()))

{ try {this.brokerController.getPullMessageProcessor().executeRequestWhenWakeup(request.getClientchannel(),request.getRequestCommand());

}catch (Throwable e){

log.error( "execute request when wakeup failed.",e);

}

continue;}

如果开启了长轮询机制, PullRequestService 会每隔5秒被唤醒去检查当前是不是有新的消息到来,给客户端做响应,或者是直接超时给客户端做响应。消息的实施性会比较差,但是 RocketMQ 引入了另外一种机制,当消息到达时直接就唤醒挂起线程,然后触发一次检查。如何去唤醒触发一次检查。

相关实践学习
快速体验阿里云云消息队列RocketMQ版
本实验将带您快速体验使用云消息队列RocketMQ版Serverless系列实例进行获取接入点、创建Topic、创建订阅组、收发消息、查看消息轨迹和仪表盘。
消息队列 MNS 入门课程
1、消息队列MNS简介 本节课介绍消息队列的MNS的基础概念 2、消息队列MNS特性 本节课介绍消息队列的MNS的主要特性 3、MNS的最佳实践及场景应用 本节课介绍消息队列的MNS的最佳实践及场景应用案例 4、手把手系列:消息队列MNS实操讲 本节课介绍消息队列的MNS的实际操作演示 5、动手实验:基于MNS,0基础轻松构建 Web Client 本节课带您一起基于MNS,0基础轻松构建 Web Client
相关文章
|
JSON Prometheus 监控
使用redis exporter轻松实现redis监控
上一篇我们讲到使用prometheus和grafana可以实现监控平台,本篇我们以监控redis为例展示如何对中间件进行监控配置。
5467 0
|
1月前
|
人工智能 JavaScript 前端开发
用 Go 语言轻松构建 MCP 服务器
本文介绍了使用 Go 语言构建 MCP 服务器的完整过程,涵盖创建服务器实例、注册工具、资源和提示词,以及通过 stdio 和 sse 模式启动服务的方法,帮助开发者快速集成 LLM 应用与外部系统。
|
监控 安全 生物认证
网络安全中的身份认证与访问控制技术详解
【6月更文挑战第30天】网络安全聚焦身份认证与访问控制,确保合法用户身份并限制资源访问。身份认证涉及生物和非生物特征,如密码、指纹。访问控制通过DAC、MAC、RBAC策略管理权限。最佳实践包括多因素认证、定期更新凭证、最小权限、职责分离和审计监控。这些措施旨在增强系统安全,防范未授权访问。
1719 2
使用队列解决高并发下使用Client对象调用webService接口
使用队列解决高并发下使用Client对象调用webService接口
|
安全 Java 数据库
第4章 Spring Security 的授权与角色管理(2024 最新版)(上)
第4章 Spring Security 的授权与角色管理(2024 最新版)
632 0
|
存储 消息中间件 API
RocketMQ实战:一个新的消费组初次启动时从何处开始消费呢?
RocketMQ实战:一个新的消费组初次启动时从何处开始消费呢?
RocketMQ实战:一个新的消费组初次启动时从何处开始消费呢?
|
消息中间件 Kafka
为什么kafka 需要 subscribe 的 group.id?我们是否需要使用 commitSync 手动提交偏移量?
Kafka 使用消费者组的概念来实现主题的并行消费 - 每条消息都将在每个消费者组中传递一次,无论该组中实际有多少个消费者。所以 group 参数是强制性的,如果没有组,Kafka 将不知道如何对待订阅同一主题的其他消费者。
459 2
|
关系型数据库 MySQL 索引
MySQL系列-优化之精准解读in和exists
MySQL系列-优化之精准解读in和exists 1.解读in和exists 这两个关键字的区别主要是在于子查询上面,in是独立子查询,exists是相关子查询,例如: 用in查询有员工的部门       :select dept_name from dept where id in (select dept_id from emp); 用exists查询有员工的部门:select dept_name from dept where exists (select 1 from emp where dept.id=emp.dept_id); 当然,执行结果完全一致。
3478 0
|
Java
java身份证、手机号、邮箱正则校验工具类
java身份证、手机号、邮箱正则校验工具类
883 0