ActiveMQ:设置多个并行的消费者

简介:

ActiveMQ:设置多个并行的消费者

消息队列本来就是一种经典的生产者与消费者模式。生产者向消息队列中发送消息,消费者从消息队列中获取消息来消费。

消息的传送一般由一个代理来实现的,那就是Message broker(即消息代理)。Message broker有两大职责,一是消息路由,二是数据转换。这就好比A给B寄信,如果不使用邮局的话,就要自己想办法送达,费时费力,而通过邮局的话,只要B的地址在邮局中注册过,那么天涯海角也能送达。这里的邮局扮演的角色就像消息系统中的Message broker。

众所周知,消息队列是典型的’send and forget’原则的体现,生产者只管发送,不管消息的后续处理。为了最大效率的完成对消息队列中的消息的消费,一般可以同时起多个一模一样的消费者,以并行的方式来拉取消息队列中的消息。这样的好处有多个:

  1. 加快处理消息队列中的消息。
  2. 增强稳定性,如果一个消费者出现问题,不会影响对消息队列中消息的处理。

使用Spring JMS来配置多个Listener实例其实也相当简单,只需要配置下MessageListenerContainer就行。

1
2
3
4
5
6
<bean class="org.springframework.jms.listener.SimpleMessageListenerContainer">
    <property name="connectionFactory" ref="connectionFactory"/>
    <property name="destinationName" value="${jms.queue.name}"/>
    <property name="messageListener" ref="messageReceiver"/>
    <property name="concurrentConsumers" value="4"/>
</bean>

 

 

多配置一个属性concurrentConsumers,设置值为4,就是同时启动4个Listener实例来消费消息。

使用MessageSender来发送100条消息,可以检查消息处理的顺序会发生变化。

1
2
3
    for (int i = 0; i < 100; i++) {
        messageSender.send(String.format("message %d",i));
    }
1
2
3
4
5
6
7
8
9
...
Received: message 4
Received: message 7
Received: message 6
Received: message 5
Received: message 8
Received: message 10
Received: message 9

 

除了设置一个固定的Listener数量,也可以设置一个Listener区间,这样MessageListenerContainer可以根据消息队列中的消息规模自动调整并行数量。

1
2
3
4
5
6
    <bean class="org.springframework.jms.listener.SimpleMessageListenerContainer">
        <property name="connectionFactory" ref="connectionFactory"/>
        <property name="destinationName" value="${jms.queue.name}"/>
        <property name="messageListener" ref="messageReceiver"/>
        <property name="concurrency" value="4-8"/>
    </bean>

 

这次使用的是concurrency属性,4-8表示最小并发数是4,最大并发数为8,当然也可以给一个固定值,比如5,这样就相当于concurrentConsumers属性了。

原文地址http://www.bieryun.com/1871.html

相关文章
|
消息中间件 Java
RabbitMQ消费者并发数设置
RabbitMQ消费者并发数设置
RabbitMQ消费者并发数设置
|
消息中间件 RocketMQ 存储
rocketMq - 并发消费过程
rocketMq消费过程包括两种,分别是并发消费和有序消费,每个消费方式都可以单独拿出来进行分享,这篇文章单独用来分析并发消费问题。 并发消费需要理解的几个核心点:并发消费的消息拉取,并发消费的消息重试,并发消息的ack机制,消费进度的持久化,这篇分享会就这几个问题分解展开。
3596 0
|
2月前
|
消息中间件 Java Kafka
如何在Kafka分布式环境中保证消息的顺序消费?深入剖析Kafka机制,带你一探究竟!
【8月更文挑战第24天】Apache Kafka是一款专为实时数据管道和流处理设计的分布式平台,以其高效的消息发布与订阅功能著称。在分布式环境中确保消息按序消费颇具挑战。本文首先介绍了Kafka通过Topic分区实现消息排序的基本机制,随后详细阐述了几种保证消息顺序性的策略,包括使用单分区Topic、消费者组搭配单分区消费、幂等性生产者以及事务支持等技术手段。最后,通过一个Java示例演示了如何利用Kafka消费者确保消息按序消费的具体实现过程。
64 3
|
3月前
|
消息中间件 RocketMQ
MetaQ/RocketMQ 原理问题之当消费集群规模较大时,处理分配不到队列的Consumer的问题如何解决
MetaQ/RocketMQ 原理问题之当消费集群规模较大时,处理分配不到队列的Consumer的问题如何解决
|
2月前
|
消息中间件 存储 缓存
RocketMQ - 消费者启动机制
RocketMQ - 消费者启动机制
36 0
|
3月前
|
消息中间件 存储 Kafka
面试题Kafka问题之Kafka的消费者(Consumer)跟踪消息如何解决
面试题Kafka问题之Kafka的消费者(Consumer)跟踪消息如何解决
49 0
|
4月前
|
消息中间件 负载均衡 监控
Kafka消费者:监听模式VS主动拉取,哪种更适合你?
Kafka消费者:监听模式VS主动拉取,哪种更适合你?
47 1
|
5月前
|
消息中间件 Java Kafka
关于kafka消费者超时配置
关于kafka消费者超时配置
|
5月前
|
消息中间件 RocketMQ
在RocketMQ中,消息的读写与生产者消费者的数量以及Broker数量都有关
在RocketMQ中,消息的读写与生产者消费者的数量以及Broker数量都有关
177 1
|
5月前
|
消息中间件 监控 中间件
【工作中问题解决实践 十一】Kafka消费者消费堆积且频繁rebalance
【工作中问题解决实践 十一】Kafka消费者消费堆积且频繁rebalance
335 0
下一篇
无影云桌面