ActiveMQ:设置多个并行的消费者

简介:

ActiveMQ:设置多个并行的消费者

消息队列本来就是一种经典的生产者与消费者模式。生产者向消息队列中发送消息,消费者从消息队列中获取消息来消费。

消息的传送一般由一个代理来实现的,那就是Message broker(即消息代理)。Message broker有两大职责,一是消息路由,二是数据转换。这就好比A给B寄信,如果不使用邮局的话,就要自己想办法送达,费时费力,而通过邮局的话,只要B的地址在邮局中注册过,那么天涯海角也能送达。这里的邮局扮演的角色就像消息系统中的Message broker。

众所周知,消息队列是典型的’send and forget’原则的体现,生产者只管发送,不管消息的后续处理。为了最大效率的完成对消息队列中的消息的消费,一般可以同时起多个一模一样的消费者,以并行的方式来拉取消息队列中的消息。这样的好处有多个:

  1. 加快处理消息队列中的消息。
  2. 增强稳定性,如果一个消费者出现问题,不会影响对消息队列中消息的处理。

使用Spring JMS来配置多个Listener实例其实也相当简单,只需要配置下MessageListenerContainer就行。

1
2
3
4
5
6
<bean class="org.springframework.jms.listener.SimpleMessageListenerContainer">
    <property name="connectionFactory" ref="connectionFactory"/>
    <property name="destinationName" value="${jms.queue.name}"/>
    <property name="messageListener" ref="messageReceiver"/>
    <property name="concurrentConsumers" value="4"/>
</bean>

 

 

多配置一个属性concurrentConsumers,设置值为4,就是同时启动4个Listener实例来消费消息。

使用MessageSender来发送100条消息,可以检查消息处理的顺序会发生变化。

1
2
3
    for (int i = 0; i < 100; i++) {
        messageSender.send(String.format("message %d",i));
    }
1
2
3
4
5
6
7
8
9
...
Received: message 4
Received: message 7
Received: message 6
Received: message 5
Received: message 8
Received: message 10
Received: message 9

 

除了设置一个固定的Listener数量,也可以设置一个Listener区间,这样MessageListenerContainer可以根据消息队列中的消息规模自动调整并行数量。

1
2
3
4
5
6
    <bean class="org.springframework.jms.listener.SimpleMessageListenerContainer">
        <property name="connectionFactory" ref="connectionFactory"/>
        <property name="destinationName" value="${jms.queue.name}"/>
        <property name="messageListener" ref="messageReceiver"/>
        <property name="concurrency" value="4-8"/>
    </bean>

 

这次使用的是concurrency属性,4-8表示最小并发数是4,最大并发数为8,当然也可以给一个固定值,比如5,这样就相当于concurrentConsumers属性了。

原文地址http://www.bieryun.com/1871.html

相关文章
|
消息中间件 网络协议 Java
微服务——SpringBoot使用归纳——Spring Boot中集成ActiveMQ——ActiveMQ集成
本文介绍了在 Spring Boot 中集成 ActiveMQ 的详细步骤。首先通过引入 `spring-boot-starter-activemq` 依赖并配置 `application.yml` 文件实现基本设置。接着,创建 Queue 和 Topic 消息类型,分别使用 `ActiveMQQueue` 和 `ActiveMQTopic` 类完成配置。随后,利用 `JmsMessagingTemplate` 实现消息发送功能,并通过 Controller 和监听器实现点对点消息的生产和消费。最后,通过浏览器访问测试接口验证消息传递的成功性。
874 0
|
安全 API 数据安全/隐私保护
深入理解 PUT 和 POST 的区别
本文深入解析了HTTP请求中PUT与POST方法的区别及其应用场景。POST为非幂等方法,常用于创建资源或提交数据,每次请求可能改变服务器状态;PUT是幂等的,主要用于更新或完全替换特定资源,重复请求不会产生额外影响。文章通过对比两者特性、操作语义及实际使用场景,帮助开发者在RESTful API设计中做出更合理的选择,提升系统效率与可维护性。
2648 1
|
关系型数据库 测试技术 Linux
PostgreSQL配置文件修改及启用方法
总的来说,修改和启用PostgreSQL的配置文件是一个直接而简单的过程。只需要找到配置文件,修改你想要改变的选项,然后重启服务器即可。但是,你需要注意的是,不正确的配置可能会导致服务器性能下降,甚至导致服务器无法启动。因此,在修改配置文件之前,你应该充分理解每个选项的含义和影响,如果可能的话,你应该在测试环境中先进行试验。
1063 72
|
缓存 测试技术 API
解锁开源模型高性能服务:SGLang Runtime 应用场景与实践
SGLang 是一个用于大型语言模型和视觉语言模型的推理框架。
|
Ubuntu 持续交付 API
如何使用 dotnet pack 打包 .NET 跨平台程序集?
`dotnet pack` 是 .NET Core 的 NuGet 包打包工具,用于将代码打包成 NuGet 包。通过命令 `dotnet pack` 可生成 `.nupkg` 文件。使用 `--include-symbols` 和 `--include-source` 选项可分别创建包含调试符号和源文件的包。默认情况下,`dotnet pack` 会先构建项目,可通过 `--no-build` 跳过构建。此外,还可以使用 `--output` 指定输出目录、`-c` 设置配置等。示例展示了创建类库项目并打包的过程。更多详情及命令选项,请参考官方文档。
1056 12
|
SQL 分布式计算 关系型数据库
Clickhouse时间日期函数一文详解+代码展示
Clickhouse时间日期函数一文详解+代码展示
4757 0
Clickhouse时间日期函数一文详解+代码展示
|
缓存 关系型数据库 MySQL
MySQL参数优化之thread_cache_size
MySQL参数优化之thread_cache_size
3948 0
|
消息中间件 数据安全/隐私保护 RocketMQ
消息队列 MQ使用问题之遇到消费速度是固定的并且导致了堆积,该怎么办
消息队列(MQ)是一种用于异步通信和解耦的应用程序间消息传递的服务,广泛应用于分布式系统中。针对不同的MQ产品,如阿里云的RocketMQ、RabbitMQ等,它们在实现上述场景时可能会有不同的特性和优势,比如RocketMQ强调高吞吐量、低延迟和高可用性,适合大规模分布式系统;而RabbitMQ则以其灵活的路由规则和丰富的协议支持受到青睐。下面是一些常见的消息队列MQ产品的使用场景合集,这些场景涵盖了多种行业和业务需求。
|
SQL 关系型数据库 MySQL
(十六)MySQL调优篇:单机数据库如何在高并发场景下健步如飞?
在当前的IT开发行业中,系统访问量日涨、并发暴增、线上瓶颈等各种性能问题纷涌而至,性能优化成为了现时代中一个炙手可热的名词,无论是在开发、面试过程中,性能优化都是一个常谈常新的话题。而MySQL作为整个系统的后方大本营,由于是基于磁盘的原因,性能瓶颈往往也会随着流量增大而凸显出来。
1963 0