并发
默认情况一下,一个listener对应一个consumer,如果想对应多个,有两种方式。
- 在消费端,配置prefetch和concurrency参数便可以实现消费端MQ并发处理消息
spring: rabbitmq: host: localhost port: 5672 username: guest password: guest listener: simple: # acknowledge-mode: manual # 手动确定(默认自动确认) concurrency: 1 # 消费端的监听个数(即@RabbitListener开启几个线程去处理数据。) max-concurrency: 10 # 消费端的监听最大个数 prefetch: 10 connection-timeout: 15000 # 超时时间
这个是个全局配置,应用里的任何队列对应的listener都至少有5个consumer,但是最好别这么做,因为一般情况下,一个listener对应一个consumer是够用的。只是针对部分场景,才需要一对多。
- 在注解中配置该参数
在RabbitListeners注解上加上concurrency属性
会覆盖配置文件中的参数
- 直接在@RabbitListener上配置
@Component public class SpringBootMsqConsumer { @RabbitListener(queues = "spring-boot-direct-queue",concurrency = "5-10") public void receive(Message message) { System.out.println("receive message:" + new String(message.getBody())); } }
//@RabbitListener(queues = {"kinson"}, concurrency = "2") @RabbitListener(queues = {"kinson"}, concurency="min-max") // 最小并发数是5,最大并发是10 public void receiver(Message msg, Channel channel) throws InterruptedException { // Thread.sleep(10000); byte[] messageBytes = msg.getBody(); if (messageBytes != null && messageBytes.length > 0) { //打印数据 String message = new String(msg.getBody(), StandardCharsets.UTF_8); log.info("【消3】:{}", message); } }
@RabbitListener( bindings = @QueueBinding( value = @Queue(value = "myQueue1"), exchange = @Exchange(value = "myExchange1"), key = "routingKey1" ), concurrency = "10" ) public void process1(Message message) throws Exception { // 执行程序 }
- 利用@RabbitListener中的concurrency属性进行指定就行,例如上面的
concurrency = “5-10”
就表示最小5个,最大10个consumer。启动消费端应用后,找到spring-boot-direct-queue这个队列的consumer,会发现有5个。
这5个消费者都可以从spring-boot-direct-queue这个队列中获取消息,加快队列中消息的消费速度,提高吞吐量。
限流
我们经过压测,来判断consumer的消费能力,如果单位时间内,consumer到达的消息太多,也可能把消费者压垮。
得到压测数据后,可以在@RabbitListener中配置prefetch count。
import org.springframework.amqp.core.Message; import org.springframework.amqp.rabbit.annotation.RabbitListener; import org.springframework.stereotype.Component; @Component public class SpringBootMsqConsumer { @RabbitListener(queues = "spring-boot-direct-queue",concurrency = "5-10",containerFactory = "mqConsumerlistenerContainer") public void receive(Message message) { System.out.println("receive message:" + new String(message.getBody())); } }
import org.springframework.amqp.rabbit.config.SimpleRabbitListenerContainerFactory; import org.springframework.amqp.rabbit.connection.CachingConnectionFactory; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.context.annotation.Bean; import org.springframework.context.annotation.Configuration; @Configuration public class RabbitMqConfig { @Autowired private CachingConnectionFactory connectionFactory; @Bean(name = "mqConsumerlistenerContainer") public SimpleRabbitListenerContainerFactory mqConsumerlistenerContainer(){ SimpleRabbitListenerContainerFactory factory = new SimpleRabbitListenerContainerFactory(); factory.setConnectionFactory(connectionFactory); factory.setPrefetchCount(50); return factory; } }
factory.setPrefetchCount(50);,就是用于设置prefetch count的,启动后,会在spring-boot-direct-queue队列的consumer中体现出来。
配置成功后,consumer单位时间内接收到消息就是50条。
@Autowired CachingConnectionFactory cachingConnectionFactory; @Bean(name="limitContainerFactory") public SimpleRabbitListenerContainerFactory simpleRabbitListenerContainerFactory(){ SimpleRabbitListenerContainerFactory factory=new SimpleRabbitListenerContainerFactory(); factory.setConnectionFactory(cachingConnectionFactory); factory.setConcurrentConsumers(60); // 并发消费者数量 factory.setPrefetchCount(3); // 每次最多拿3个,等这三个处理完之后,再去队列中拿第二批,起到了限流的作用 return factory; }
@Bean("limitContainerFactory") public SimpleRabbitListenerContainerFactory pointTaskContainerFactory(SimpleRabbitListenerContainerFactoryConfigurer configurer, ConnectionFactory connectionFactory) { SimpleRabbitListenerContainerFactory factory = new SimpleRabbitListenerContainerFactory(); factory.setPrefetchCount(2); //每个消费者预取数量 factory.setConcurrentConsumers(60); //并发消费者数量 configurer.configure(factory, connectionFactory); return factory; }
@RabbitHandler @RabbitListener(queues = QueueContent.MESSAGE_QUEUE_NAME,containerFactory = "limitContainerFactory") public void handler(String msg,Channel channel, Message message) throws IOException { ...... }