RabbitMQ异常重启,部分消费队列不消费问题

简介: RabbitMQ异常重启,部分消费队列不消费问题
[2021-03-04 094735.986][,][INFO][SimpleAsyncTaskExecutor-1][SimpleMessageListenerContainer$AsyncMessageProcessingConsumerrun1212] Restarting Consumer@504a9352 tags=[{amq.ctag-WWlW_DAeVfp570-uQvvXDg=queue1}], channel=Cached Rabbit Channel AMQChannel(amqproot@xxx.xx.xx.xx,4), conn Proxy@3920f51 Shared Rabbit Connection null, acknowledgeMode=MANUAL local queue size=0
[2021-03-04 094735.987][,][INFO][SimpleAsyncTaskExecutor-2][AbstractConnectionFactorycreateBareConnection463] Attempting to connect to xxx.xx.xx.xx
[2021-03-04 094735.989][,][ERROR][SimpleAsyncTaskExecutor-2][AbstractMessageListenerContainerredeclareElementsIfNecessary1618] Failed to checkredeclare auto-delete queue(s).
org.springframework.amqp.AmqpConnectException java.net.ConnectException Connection refused (Connection refused)
  at org.springframework.amqp.rabbit.support.RabbitExceptionTranslator.convertRabbitAccessException(RabbitExceptionTranslator.java62) ~[spring-rabbit-2.0.5.RELEASE.jar2.0.5.RELEASE]
  at org.springframework.amqp.rabbit.connection.AbstractConnectionFactory.createBareConnection(AbstractConnectionFactory.java484) ~[spring-rabbit-2.0.5.RELEASE.jar2.0.5.RELEASE]
  at org.springframework.amqp.rabbit.connection.CachingConnectionFactory.createConnection(CachingConnectionFactory.java626) ~[spring-rabbit-2.0.5.RELEASE.jar2.0.5.RELEASE]
  at org.springframework.amqp.rabbit.connection.ConnectionFactoryUtils.createConnection(ConnectionFactoryUtils.java240) ~[spring-rabbit-2.0.5.RELEASE.jar2.0.5.RELEASE]
  at org.springframework.amqp.rabbit.core.RabbitTemplate.doExecute(RabbitTemplate.java1797) ~[spring-rabbit-2.0.5.RELEASE.jar2.0.5.RELEASE]
  at org.springframework.amqp.rabbit.core.RabbitTemplate.execute(RabbitTemplate.java1771) ~[spring-rabbit-2.0.5.RELEASE.jar2.0.5.RELEASE]
  at org.springframework.amqp.rabbit.core.RabbitTemplate.execute(RabbitTemplate.java1752) ~[spring-rabbit-2.0.5.RELEASE.jar2.0.5.RELEASE]
  at org.springframework.amqp.rabbit.core.RabbitAdmin.getQueueProperties(RabbitAdmin.java345) ~[spring-rabbit-2.0.5.RELEASE.jar2.0.5.RELEASE]
  at org.springframework.amqp.rabbit.listener.AbstractMessageListenerContainer.redeclareElementsIfNecessary(AbstractMessageListenerContainer.java1604) [spring-rabbit-2.0.5.RELEASE.jar2.0.5.RELEASE]
  at org.springframework.amqp.rabbit.listener.SimpleMessageListenerContainer$AsyncMessageProcessingConsumer.run(SimpleMessageListenerContainer.java995) [spring-rabbit-2.0.5.RELEASE.jar2.0.5.RELEASE]
  at java.lang.Thread.run(Thread.java748) [1.8.0_275]
Caused by java.net.ConnectException Connection refused (Connection refused)
  at java.net.PlainSocketImpl.socketConnect(Native Method) ~[1.8.0_275]
  at java.net.AbstractPlainSocketImpl.doConnect(AbstractPlainSocketImpl.java350) ~[1.8.0_275]
  at java.net.AbstractPlainSocketImpl.connectToAddress(AbstractPlainSocketImpl.java206) ~[1.8.0_275]
  at java.net.AbstractPlainSocketImpl.connect(AbstractPlainSocketImpl.java188) ~[1.8.0_275]
  at java.net.SocksSocketImpl.connect(SocksSocketImpl.java392) ~[1.8.0_275]
  at java.net.Socket.connect(Socket.java607) ~[1.8.0_275]
  at com.rabbitmq.client.impl.SocketFrameHandlerFactory.create(SocketFrameHandlerFactory.java60) ~[amqp-client-5.1.2.jar5.1.2]
  at com.rabbitmq.client.ConnectionFactory.newConnection(ConnectionFactory.java955) ~[amqp-client-5.1.2.jar5.1.2]
  at com.rabbitmq.client.ConnectionFactory.newConnection(ConnectionFactory.java907) ~[amqp-client-5.1.2.jar5.1.2]
  at com.rabbitmq.client.ConnectionFactory.newConnection(ConnectionFactory.java1066) ~[amqp-client-5.1.2.jar5.1.2]
  at org.springframework.amqp.rabbit.connection.AbstractConnectionFactory.createBareConnection(AbstractConnectionFactory.java466) ~[spring-rabbit-2.0.5.RELEASE.jar2.0.5.RELEASE]
  ... 9 more
[2021-03-04 094736.262][,][INFO][iccpaasSimpleAsyncTaskExecutor-2][AbstractConnectionFactorycreateBareConnection463] Attempting to connect to xxx.xx.xx.xx
[2021-03-04 094736.264][,][ERROR][iccpaasSimpleAsyncTaskExecutor-2][AbstractMessageListenerContainerredeclareElementsIfNecessary1618] Failed to checkredeclare auto-delete queue(s).

原因:消费者因为这个异常导致消费者线程没有继续重新启动。

解决方案:

  1. 可以监听ListenerContainerConsumerFailedEvent事件,其定义如下所示:其中有一个属性fatal,fatal为true时表示消费者出现了致命的错误,此时消费者不会自动重试进行重新启动,需要我们在事件处理逻辑中进行重启。当fatal为false时,我们可以忽略该事件,消费者容器中会自动重试启动。
public class ListenerContainerConsumerFailedEvent extends AmqpEvent {
  private final String reason;
  private final boolean fatal;
  private final Throwable throwable;
}
  1. 处理逻辑代码:判断event的fatal是true时,先判断container是否在运行,如果没有在运行则调用start进行启动,然后发送告警信息。
import java.util.Arrays;
import org.springframework.amqp.rabbit.listener.ListenerContainerConsumerFailedEvent;
import org.springframework.amqp.rabbit.listener.SimpleMessageListenerContainer;
import org.springframework.context.ApplicationListener;
import org.springframework.stereotype.Component;
import org.springframework.util.Assert;
import lombok.extern.slf4j.Slf4j;
@Slf4j
@Component
public class ListenerContainerConsumerFailedEventListener implements ApplicationListener<ListenerContainerConsumerFailedEvent> {
    @Override
    public void onApplicationEvent(ListenerContainerConsumerFailedEvent event) {
        log.error("消费者失败事件发生:{}", event);
        if (event.isFatal()) {
            log.error("Stopping container from aborted consumer. Reason::{}", event.getReason(), event.getThrowable());
            SimpleMessageListenerContainer container = (SimpleMessageListenerContainer) event.getSource();
            String queueNames = Arrays.toString(container.getQueueNames());
            try {
                try {
                    Thread.sleep(30000);
                } catch (Exception e) {
                    log.error(e.getMessage());
                }
                //判断此时消费者容器是否正在运行
                Assert.state(!container.isRunning(), String.format("监听容器%s正在运行!", container));
                //消费者容器没有在运行时,进行启动
                container.start();
                log.info("重启队列{}的监听成功", queueNames);
            } catch (Exception e) {
                log.error("重启队列{}的监听失败", queueNames, e);
            }
            // TODO 短信/邮件/钉钉...告警,包含队列信息,监听断开原因,断开时异常信息,重启是否成功等...
        }
    }
}
  1. 将missingQueuesFatal改成false,可以在抛出QueuesNotAvailableException异常时不改变aborted的值,这样在killOrRestart方法中就会自动自动调用重启的方法,但是这种处理方式仅限于QueuesNotAvailableException异常,不像上面的处理方式具有通用性。
# 即不管什么异常继续无限重启消费者线程
spring.rabbitmq.listener.simple.missing-queues-fatal=false


相关实践学习
消息队列RocketMQ版:基础消息收发功能体验
本实验场景介绍消息队列RocketMQ版的基础消息收发功能,涵盖实例创建、Topic、Group资源创建以及消息收发体验等基础功能模块。
消息队列 MNS 入门课程
1、消息队列MNS简介 本节课介绍消息队列的MNS的基础概念 2、消息队列MNS特性 本节课介绍消息队列的MNS的主要特性 3、MNS的最佳实践及场景应用 本节课介绍消息队列的MNS的最佳实践及场景应用案例 4、手把手系列:消息队列MNS实操讲 本节课介绍消息队列的MNS的实际操作演示 5、动手实验:基于MNS,0基础轻松构建 Web Client 本节课带您一起基于MNS,0基础轻松构建 Web Client
相关文章
|
7月前
|
消息中间件 Java Spring
SpringBoot实现RabbitMQ的简单队列(SpringAMQP 实现简单队列)
SpringBoot实现RabbitMQ的简单队列(SpringAMQP 实现简单队列)
61 1
|
5月前
|
消息中间件 存储 监控
消息队列 MQ使用问题之客户端重启后仍然出现broker接收消息不均匀,该怎么办
消息队列(MQ)是一种用于异步通信和解耦的应用程序间消息传递的服务,广泛应用于分布式系统中。针对不同的MQ产品,如阿里云的RocketMQ、RabbitMQ等,它们在实现上述场景时可能会有不同的特性和优势,比如RocketMQ强调高吞吐量、低延迟和高可用性,适合大规模分布式系统;而RabbitMQ则以其灵活的路由规则和丰富的协议支持受到青睐。下面是一些常见的消息队列MQ产品的使用场景合集,这些场景涵盖了多种行业和业务需求。
|
2月前
|
消息中间件 存储 监控
RabbitMQ 队列之战:Classic 和 Quorum 的性能洞察
RabbitMQ 是一个功能强大的消息代理,用于分布式应用程序间的通信。它通过队列临时存储消息,支持异步通信和解耦。经典队列适合高吞吐量和低延迟场景,而仲裁队列则提供高可用性和容错能力,适用于关键任务系统。选择哪种队列取决于性能、持久性和容错性的需求。
210 6
|
3月前
|
消息中间件 JSON Java
|
3月前
|
消息中间件
rabbitmq,&队列
rabbitmq,&队列
|
3月前
|
消息中间件 JSON Java
玩转RabbitMQ声明队列交换机、消息转换器
玩转RabbitMQ声明队列交换机、消息转换器
101 0
|
4月前
|
消息中间件 存储 NoSQL
MQ的顺序性保证:顺序队列、消息编号、分布式锁,一文全掌握!
【8月更文挑战第24天】消息队列(MQ)是分布式系统的关键组件,用于实现系统解耦、提升可扩展性和可用性。保证消息顺序性是其重要挑战之一。本文介绍三种常用策略:顺序队列、消息编号与分布式锁,通过示例展示如何确保消息按需排序。这些方法各有优势,可根据实际场景灵活选用。提供的Java示例有助于加深理解与实践应用。
130 2
|
5月前
|
消息中间件 RocketMQ
MetaQ/RocketMQ 原理问题之当消费集群规模较大时,处理分配不到队列的Consumer的问题如何解决
MetaQ/RocketMQ 原理问题之当消费集群规模较大时,处理分配不到队列的Consumer的问题如何解决
|
5月前
|
消息中间件 Java Kafka
说说RabbitMQ延迟队列实现原理?
说说RabbitMQ延迟队列实现原理?
86 0
说说RabbitMQ延迟队列实现原理?
|
6月前
|
消息中间件 监控 应用服务中间件
消息队列 MQ操作报错合集之重启Broker后,积压数出现为负数是什么导致的
在使用消息队列MQ时,可能会遇到各种报错情况。以下是一些常见的错误场景、可能的原因以及解决建议的汇总:1.连接错误、2.消息发送失败、3.消息消费报错、4.消息重试与死信处理、5.资源与权限问题、6.配置错误、7.系统资源限制、8.版本兼容性问题。
192 1
消息队列 MQ操作报错合集之重启Broker后,积压数出现为负数是什么导致的