消息达到后实时推送机制|学习笔记

简介: 快速学习消息达到后实时推送机制

开发者学堂课程【RocketMQ 知识精讲与项目实战(第三阶段)消息达到后实时推送机制】学习笔记,与课程紧密联系,让用户快速学习知识。

课程地址:https://developer.aliyun.com/learning/course/704/detail/12501


消息达到后实时推送机制


当开启长轮循之后会每隔五秒才去检查消息是否有新的消息到达。为了提高消息处理的一个实施性, RocketMQ 会在消息到达的时候也会去唤醒当前线程去做一次检查。那么消息到达之后,首先做的事情是进行消息的一个存储。this.reputMessageService.setReputFromoffset(maxPhysicalPosInLogicqueue);

this.reputNessageService.start();

在消息存储的这个类当中,有上面所示的两行代码,就是有这么一个 reputMessage Service。

然后进入到 start 的方法当中,代码如下:

public void start(){

Log.info(varl:"Try to start service thread;{} started:{} lastThread:{}"getServiceName (),started.get(), thread);

if(!started.compareAndSet( expect: false,update: true)) {

return;

}

stopped = false;

this.thread =new Thread(target:this, getServiceName());

this.thread.setDaemon(isDaemon);

this.thread.start();

}

进入这个 start 方法后,线程会被启动,然后会执行 run 方法,进入 run 方法,在run方法中,消息到达之后,在这里做doReput,然后我们看下面这段代码:

if(BrokerRole,SLAVE != DefaultMessageStore.this.getMessagestorecontig().getBrokerRole()  && DefaultMessageStore.this.brokerConfig.isLongPollingEnable()){

DefaultMessageStore.this.messageArrivingListener .arriving(dispatchRequest.getTopic(),

dispatchiequest.getQueueId(),logicOffset:dispatchRequest.getConsumeQueueOffset() dispatchRequest.getTagsCode( ), disggtchRequest.getStoreTimestamp().dispatchRequest,getBitMap(),dispatchRequest.getPropertiesMap());

这个 if 是判断,如果现在这个角色不是从节点,而是主节点,说明现在接收到了消息,它会去调用 messingarrivinglistener arriving 方法,那么这个方法的作用是什么?

看下图所示:

image.png它有一个结构的实现类是 notifymessagelistener,在notifymessagelistener类中回到了拉取请求后的Service,代码如下:

public void arriving(string topic,int queueId,long logicOffset,long tagsCode,

long msgStoreTime, byte[] filterBitMap,Map<String,String> properties) {

this.pullRequestHoldservice.notifyMessageArriving(topic,queueId,logicoffset,tagsCode,

mSgStoreTime,filterBitHap,properties);

在这个类中就是通知当前消息,如果有新的消息到达之后又回到如下代码的判断中,

if(match){

try{This.brokerController.getPullMessageProcessor().executeRequest.ggetClientChannel(),

Request.getRequestCommand());

}catch(Throwable e){

log.error(“execute request when wakeup failed.”,e);

}

continue;

}

这个消息如果是感兴趣(match),就通知客户端去开始做一个响应。如果不是,以下代码判断超时时间。

if(system.currentTimeMillis()>=(request.getSuspendTimestamp()+request.getTimeoutMillis())){

try{This.brokerController.getPullMessageProcessor().executeRequestwhenWakeup(request.getClientChannel(),

}catch(Throwable e){

log.error(“execute request when wakeup failed.”,e);

}

continue;

}

所以为了提高消息处理的一个实施性,代码:this.reputMessageService.start();

在消息到达之后。看它的 run 方法:

打开 doreput,然后在下面代码的位置做一个到达之后的一个通知。

if(BrokerRole,SLAVE != DefaultMessageStore.this.getMessagestorecontig().getBrokerRole()  && DefaultMessageStore.this.brokerConfig.isLongPollingEnable()){

DefaultMessageStore.this.messageArrivingListener .arriving(dispatchRequest.getTopic(),

dispatchiequest.getQueueId(),logicOffset:dispatchRequest.getConsumeQueueOffset() dispatchRequest.getTagsCode( ), disggtchRequest.getStoreTimestamp().dispatchRequest,getBitMap(),dispatchRequest.getPropertiesMap());

通知监听器,让他负责检查消息是不是感兴趣的,如果感兴趣,就直接返回。

image.png整个的这块就跟前面所讲的长轮询的机制合二为一。

相关文章
|
存储 编解码 安全
现代IM系统中聊天消息的同步和存储方案探讨
本文原作者:木洛,阿里云高级技术专家,内容有删减和修订,感谢原作者。 1、前言 IM全称是『Instant Messaging』,中文名是即时通讯。在这个高度信息化的移动互联网时代,生活中IM类产品已经成为必备品,比较有名的如钉钉、微信、QQ等以IM为核心功能的产品。
5140 0
|
4月前
|
程序员 数据库 UED
微信也在用的消息时序性技术,你知道多少?
本文由程序员小米撰写,探讨了在个人项目中如何保证消息的时序性。文章详细介绍了消息时序性的概念及其重要性,并提出了三种方案:ID设计(借鉴微信号段与跳跃式生成)、单聊场景下的单点序列化同步,以及群聊场景中的单点序列化处理。此外,还提供了多种优化方法,如消息时序对齐、本地时序记录等,帮助读者更好地解决消息乱序问题。适合所有关心即时通讯和社交应用技术细节的开发者阅读。
66 4
|
4月前
|
网络协议 程序员 UED
如何确保单聊消息100%送达?揭秘消息可靠传输的核心机制!
哈喽,大家好!我是技术好朋友小米,今天聊聊单聊消息的可靠传输。通过TCP的超时、重传、确认机制,结合去重和离线消息优化,我们可以设计出高效、可靠的消息传输系统。希望今天的分享能给大家带来帮助!如果有问题,欢迎留言交流。
59 0
如何确保单聊消息100%送达?揭秘消息可靠传输的核心机制!
|
Web App开发 缓存 网络协议
如何实现服务端向客户端推送数据
常见的http协议只能从客户端主动向服务端请求数据,而服务端无法向客户端发送数据.本文通过介绍几种方式来实现上述功能.
|
7月前
|
消息中间件 RocketMQ
消息队列 MQ产品使用合集之在开源延时消息插件方案中和原生延时消息方案中,同时设置参数是否会出现错乱
消息队列(MQ)是一种用于异步通信和解耦的应用程序间消息传递的服务,广泛应用于分布式系统中。针对不同的MQ产品,如阿里云的RocketMQ、RabbitMQ等,它们在实现上述场景时可能会有不同的特性和优势,比如RocketMQ强调高吞吐量、低延迟和高可用性,适合大规模分布式系统;而RabbitMQ则以其灵活的路由规则和丰富的协议支持受到青睐。下面是一些常见的消息队列MQ产品的使用场景合集,这些场景涵盖了多种行业和业务需求。
|
8月前
|
消息中间件 负载均衡 调度
个推延迟收到消息问题原因分析
个推延迟收到消息问题原因分析
141 1
|
8月前
|
应用服务中间件 数据安全/隐私保护 UED
事件推送技术
【5月更文挑战第4天】事件推送
95 12
|
7月前
|
消息中间件 Apache RocketMQ
消息队列 MQ产品使用合集之是否提供机制检测消费的状态
阿里云消息队列MQ(Message Queue)是一种高可用、高性能的消息中间件服务,它允许您在分布式应用的不同组件之间异步传递消息,从而实现系统解耦、流量削峰填谷以及提高系统的可扩展性和灵活性。以下是使用阿里云消息队列MQ产品的关键点和最佳实践合集。
|
8月前
|
移动开发 小程序 Go
【社区每周】小程序消息订阅插件升级为消息订阅接口(2022年8月第五期)
【社区每周】小程序消息订阅插件升级为消息订阅接口(2022年8月第五期)
50 0
|
消息中间件 存储 运维
阿里云消息队列 RocketMQ 5.0 全新升级:消息、事件、流融合处理平台
RocketMQ5.0 的发布标志着阿里云消息从消息领域正式迈向了“消息、事件、流”场景大融合的新局面。未来阿里云消息产品的演进也将继续围绕消息、事件、流核心场景而开展。
1172 1
阿里云消息队列 RocketMQ 5.0 全新升级:消息、事件、流融合处理平台