JMS学习(八)-ActiveMQ Consumer 使用 push 还是 pull 获取消息

简介:

ActiveMQ是一个消息中间件,对于消费者而言有两种方式从消息中间件获取消息:

①Push方式:由消息中间件主动地将消息推送给消费者;②Pull方式:由消费者主动向消息中间件拉取消息。看一段官网对Push方式的解释:

To be able to achieve high performance it is important to stream messages to consumers as fast as possible 
so that the consumer always has a buffer of messages, in RAM, ready to process 
- rather than have them explicitly pull messages from the server which adds significant latency per message.

采用Push方式,可以尽可能快地将消息发送给消费者(stream messages to consumers as fast as possible)

而采用Pull方式,会增加消息的延迟,即消息到达消费者的时间有点长(adds significant latency per message)。

 

但是,Push方式会有一个坏处:如果消费者的处理消息的能力很弱(一条消息需要很长的时间处理),而消息中间件不断地向消费者Push消息,消费者的缓冲区可能会溢出。

那ActiveMQ是怎么解决这个问题的呢?那就是  prefetch limit

prefetch limit 规定了一次可以向消费者Push(推送)多少条消息。

 Once the prefetch limit is reached, no more messages are dispatched to the consumer 
until the consumer starts sending back acknowledgements of messages (to indicate that the message has been processed)

当推送消息的数量到达了perfetch limit规定的数值时,消费者还没有向消息中间件返回ACK,消息中间件将不再继续向消费者推送消息。

 

那prefetch limit的值设置为多少合适?视具体的应用场景而定。

 If you have very few messages and each message takes a very long time to process 
you might want to set the prefetch value to 1 so that a consumer is given one message at a time. 

如果消息的数量很少(生产者生产消息的速率不快),但是每条消息 消费者需要很长的时间处理,那么prefetch limit设置为1比较合适。这样,消费者每次只会收到一条消息,当它处理完这条消息之后,向消息中间件发送ACK,此时消息中间件再向消费者推送下一条消息。

prefetch limit 设置成0意味着什么?

Specifying a prefetch limit of zero means the consumer will poll for more messages, one at a time, 
instead of the message being pushed to the consumer.

意味着此时,消费者去轮询消息中间件获取消息。不再是Push方式了,而是Pull方式了。即消费者主动去消息中间件拉取消息。

 

perfetch limit是“消息预取”的值,这是针对消息中间件如何向消费者发消息 而设置的。与之相关的还有针对 消费者以何种方式向消息中间件返回确认ACK(响应):比如消费者是每次消费一条消息之后就向消息中间件确认呢?还是采用“延迟确认”---即采用批量确认的方式(消费了若干条消息之后,统一再发ACK)。这就是 Optimized Acknowledge

ActiveMQ can acknowledge receipt of messages back to the broker in batches (to improve performance). 

 

引用 一段话:“如果prefetchACK为true,那么prefetch必须大于0;当prefetchACK为false时,你可以指定prefetch为0以及任意大小的正数。
不过,当prefetch=0是,表示consumer将使用PULL(拉取)的方式从broker端获取消息,broker端将不会主动push消息给client端,直到client端发送PullCommand时;
当prefetch>0时,就开启了broker push模式,此后只要当client端消费且ACK了一定的消息之后,会立即push给client端多条消息。”

 

那么,在程序中如何采用Push方式或者Pull方式呢?

从是否阻塞来看,消费者有两种方式获取消息。同步方式和异步方式。

同步方式使用的是ActiveMQMessageConsumer的receive()方法。而异步方式则是采用消费者实现MessageListener接口,监听消息。

 

使用同步方式receive()方法获取消息时,prefetch limit即可以设置为0,也可以设置为大于0

prefetch limit为零 意味着:“receive()方法将会首先发送一个PULL指令并阻塞,直到broker端返回消息为止,这也意味着消息只能逐个获取(类似于Request<->Response)”

prefetch limit 大于零 意味着:“broker端将会批量push给client 一定数量的消息(<= prefetch),client端会把这些消息(unconsumedMessage)放入到本地的队列中,只要此队列有消息,那么receive方法将会立即返回,当一定量的消息ACK之后,broker端会继续批量push消息给client端。”

 

当使用MessageListener异步获取消息时,prefetch limit必须大于零了。因为,prefetch limit 等于零 意味着消息中间件不会主动给消费者Push消息,而此时消费者又用MessageListener被动获取消息(不会主动去轮询消息)。这二者是矛盾的。

 

此外,还有一个要注意的地方,即消费者采用同步获取消息(receive方法) 与 异步获取消息的方法(MessageListener) ,对消息的确认时机是不同的。

具体可参考:这篇文章

参考资料:  ActiveMQ消息传送机制以及ACK机制详解


本文转自hapjin博客园博客,原文链接:http://www.cnblogs.com/hapjin/p/5683648.html,如需转载请自行联系原作者

相关实践学习
深入解析Docker容器化技术
Docker是一个开源的应用容器引擎,让开发者可以打包他们的应用以及依赖包到一个可移植的容器中,然后发布到任何流行的Linux机器上,也可以实现虚拟化,容器是完全使用沙箱机制,相互之间不会有任何接口。Docker是世界领先的软件容器平台。开发人员利用Docker可以消除协作编码时“在我的机器上可正常工作”的问题。运维人员利用Docker可以在隔离容器中并行运行和管理应用,获得更好的计算密度。企业利用Docker可以构建敏捷的软件交付管道,以更快的速度、更高的安全性和可靠的信誉为Linux和Windows Server应用发布新功能。 在本套课程中,我们将全面的讲解Docker技术栈,从环境安装到容器、镜像操作以及生产环境如何部署开发的微服务应用。本课程由黑马程序员提供。 &nbsp; &nbsp; 相关的阿里云产品:容器服务 ACK 容器服务 Kubernetes 版(简称 ACK)提供高性能可伸缩的容器应用管理能力,支持企业级容器化应用的全生命周期管理。整合阿里云虚拟化、存储、网络和安全能力,打造云端最佳容器化应用运行环境。 了解产品详情: https://www.aliyun.com/product/kubernetes
相关文章
|
SQL
Mybatis.xml文件中大于小于等于
Mybatis.xml文件中大于小于等于
214 0
Vue3 复制 copy 功能实现(vue-clipboard3)
Vue3 复制 copy 功能实现(vue-clipboard3)
1938 0
|
10月前
|
消息中间件 负载均衡 Java
如何设计一个分布式配置中心?
这篇文章介绍了分布式配置中心的概念、实现原理及其在实际应用中的重要性。首先通过一个面试场景引出配置中心的设计问题,接着详细解释了为什么需要分布式配置中心,尤其是在分布式系统中统一管理配置文件的必要性。文章重点分析了Apollo这一开源配置管理中心的工作原理,包括其基础模型、架构模块以及配置发布后实时生效的设计。此外,还介绍了客户端与服务端之间的交互机制,如长轮询(Http Long Polling)和定时拉取配置的fallback机制。最后,结合实际工作经验,分享了配置中心在解决多台服务器配置同步问题上的优势,帮助读者更好地理解其应用场景和价值。
550 18
|
存储 数据库 数据安全/隐私保护
MVCC实现原理
【10月更文挑战第15天】MVCC 通过维护版本链和相关信息,实现了在多事务并发环境下的数据隔离和并发控制,提高了数据库的性能和可用性。
386 57
|
11月前
|
设计模式 存储 算法
分布式系统架构5:限流设计模式
本文是小卷关于分布式系统架构学习的第5篇,重点介绍限流器及4种常见的限流设计模式:流量计数器、滑动窗口、漏桶和令牌桶。限流旨在保护系统免受超额流量冲击,确保资源合理分配。流量计数器简单但存在边界问题;滑动窗口更精细地控制流量;漏桶平滑流量但配置复杂;令牌桶允许突发流量。此外,还简要介绍了分布式限流的概念及实现方式,强调了限流的代价与收益权衡。
472 12
|
网络协议 应用服务中间件 nginx
性能提升-如何设置Windows操作系统TIME_WAIT状态的TCP连接快速回收时间?
性能提升-如何设置Windows操作系统TIME_WAIT状态的TCP连接快速回收时间?
513 0
|
安全 Java API
深入解析 Java 8 新特性:LocalDate 的强大功能与实用技巧
深入解析 Java 8 新特性:LocalDate 的强大功能与实用技巧
302 1
无缝融合:使用 Python 和 PyFFmpeg 合并视频的完整指南
使用Python和PyFFmpeg合并视频教程:安装pyffmpeg和subprocess模块,编写merge_videos函数,通过ffmpeg命令行工具进行视频拼接。运行脚本将多个.mp4文件合并为一个,并保存为merged_video.mp4。简单易用,提升内容创作效率。
|
消息中间件 负载均衡 监控
Kafka消费者:监听模式VS主动拉取,哪种更适合你?
Kafka消费者:监听模式VS主动拉取,哪种更适合你?
432 1
|
Java C++ Spring
谈谈springboot里面的守护线程与本地线程
【4月更文挑战第18天】在Spring Boot中,线程的概念同Java标准线程模型一致,即区分为守护线程和用户线程。Spring Boot本身并不直接提供创建守护线程或用户线程的特殊机制,但它允许你通过标准Java方式或者利用Spring的框架特性来管理这些线程
684 2