[2021-03-04 09:47:35.986][,][INFO][SimpleAsyncTaskExecutor-1][SimpleMessageListenerContainer$AsyncMessageProcessingConsumer:run:1212] Restarting Consumer@504a9352: tags=[{amq.ctag-WWlW_DAeVfp570-uQvvXDg=queue1}], channel=Cached Rabbit Channel: AMQChannel(amqp://root@xxx.xx.xx.xx,4), conn: Proxy@3920f51 Shared Rabbit Connection: null, acknowledgeMode=MANUAL local queue size=0 [2021-03-04 09:47:35.987][,][INFO][SimpleAsyncTaskExecutor-2][AbstractConnectionFactory:createBareConnection:463] Attempting to connect to: xxx.xx.xx.xx [2021-03-04 09:47:35.989][,][ERROR][SimpleAsyncTaskExecutor-2][AbstractMessageListenerContainer:redeclareElementsIfNecessary:1618] Failed to check/redeclare auto-delete queue(s). org.springframework.amqp.AmqpConnectException: java.net.ConnectException: Connection refused (Connection refused) at org.springframework.amqp.rabbit.support.RabbitExceptionTranslator.convertRabbitAccessException(RabbitExceptionTranslator.java:62) ~[spring-rabbit-2.0.5.RELEASE.jar:2.0.5.RELEASE] at org.springframework.amqp.rabbit.connection.AbstractConnectionFactory.createBareConnection(AbstractConnectionFactory.java:484) ~[spring-rabbit-2.0.5.RELEASE.jar:2.0.5.RELEASE] at org.springframework.amqp.rabbit.connection.CachingConnectionFactory.createConnection(CachingConnectionFactory.java:626) ~[spring-rabbit-2.0.5.RELEASE.jar:2.0.5.RELEASE] at org.springframework.amqp.rabbit.connection.ConnectionFactoryUtils.createConnection(ConnectionFactoryUtils.java:240) ~[spring-rabbit-2.0.5.RELEASE.jar:2.0.5.RELEASE] at org.springframework.amqp.rabbit.core.RabbitTemplate.doExecute(RabbitTemplate.java:1797) ~[spring-rabbit-2.0.5.RELEASE.jar:2.0.5.RELEASE] at org.springframework.amqp.rabbit.core.RabbitTemplate.execute(RabbitTemplate.java:1771) ~[spring-rabbit-2.0.5.RELEASE.jar:2.0.5.RELEASE] at org.springframework.amqp.rabbit.core.RabbitTemplate.execute(RabbitTemplate.java:1752) ~[spring-rabbit-2.0.5.RELEASE.jar:2.0.5.RELEASE] at org.springframework.amqp.rabbit.core.RabbitAdmin.getQueueProperties(RabbitAdmin.java:345) 
~[spring-rabbit-2.0.5.RELEASE.jar:2.0.5.RELEASE] at org.springframework.amqp.rabbit.listener.AbstractMessageListenerContainer.redeclareElementsIfNecessary(AbstractMessageListenerContainer.java:1604) [spring-rabbit-2.0.5.RELEASE.jar:2.0.5.RELEASE] at org.springframework.amqp.rabbit.listener.SimpleMessageListenerContainer$AsyncMessageProcessingConsumer.run(SimpleMessageListenerContainer.java:995) [spring-rabbit-2.0.5.RELEASE.jar:2.0.5.RELEASE] at java.lang.Thread.run(Thread.java:748) [1.8.0_275] Caused by: java.net.ConnectException: Connection refused (Connection refused) at java.net.PlainSocketImpl.socketConnect(Native Method) ~[1.8.0_275] at java.net.AbstractPlainSocketImpl.doConnect(AbstractPlainSocketImpl.java:350) ~[1.8.0_275] at java.net.AbstractPlainSocketImpl.connectToAddress(AbstractPlainSocketImpl.java:206) ~[1.8.0_275] at java.net.AbstractPlainSocketImpl.connect(AbstractPlainSocketImpl.java:188) ~[1.8.0_275] at java.net.SocksSocketImpl.connect(SocksSocketImpl.java:392) ~[1.8.0_275] at java.net.Socket.connect(Socket.java:607) ~[1.8.0_275] at com.rabbitmq.client.impl.SocketFrameHandlerFactory.create(SocketFrameHandlerFactory.java:60) ~[amqp-client-5.1.2.jar:5.1.2] at com.rabbitmq.client.ConnectionFactory.newConnection(ConnectionFactory.java:955) ~[amqp-client-5.1.2.jar:5.1.2] at com.rabbitmq.client.ConnectionFactory.newConnection(ConnectionFactory.java:907) ~[amqp-client-5.1.2.jar:5.1.2] at com.rabbitmq.client.ConnectionFactory.newConnection(ConnectionFactory.java:1066) ~[amqp-client-5.1.2.jar:5.1.2] at org.springframework.amqp.rabbit.connection.AbstractConnectionFactory.createBareConnection(AbstractConnectionFactory.java:466) ~[spring-rabbit-2.0.5.RELEASE.jar:2.0.5.RELEASE] ... 
9 more [2021-03-04 09:47:36.262][,][INFO][iccpaasSimpleAsyncTaskExecutor-2][AbstractConnectionFactory:createBareConnection:463] Attempting to connect to: xxx.xx.xx.xx [2021-03-04 09:47:36.264][,][ERROR][iccpaasSimpleAsyncTaskExecutor-2][AbstractMessageListenerContainer:redeclareElementsIfNecessary:1618] Failed to check/redeclare auto-delete queue(s).
原因:该异常导致消费者线程停止后没有再被重新启动。
解决方案:
- 可以监听ListenerContainerConsumerFailedEvent事件,其定义如下所示。其中属性fatal为true时表示消费者出现了致命错误,此时容器不会自动重试重启消费者,需要我们在事件处理逻辑中手动重启;fatal为false时可以忽略该事件,容器会自动重试启动消费者。
/**
 * Abbreviated excerpt of the Spring AMQP event class: only its fields are shown. The constructor
 * and the accessors (such as isFatal()) are omitted, so this snippet does not compile on its own.
 */
public class ListenerContainerConsumerFailedEvent extends AmqpEvent {
    // Description of why the consumer failed.
    private final String reason;
    // true: fatal error; the container will not restart the consumer by itself, so the
    // application has to restart it. false: the container retries starting the consumer itself.
    private final boolean fatal;
    // The exception associated with the failure.
    private final Throwable throwable;
}
- 处理逻辑代码:当event的fatal为true时,先等待一段时间,再判断container是否正在运行;如果没有运行则调用start()重新启动,最后发送告警信息。
import java.util.Arrays; import org.springframework.amqp.rabbit.listener.ListenerContainerConsumerFailedEvent; import org.springframework.amqp.rabbit.listener.SimpleMessageListenerContainer; import org.springframework.context.ApplicationListener; import org.springframework.stereotype.Component; import org.springframework.util.Assert; import lombok.extern.slf4j.Slf4j; @Slf4j @Component public class ListenerContainerConsumerFailedEventListener implements ApplicationListener<ListenerContainerConsumerFailedEvent> { @Override public void onApplicationEvent(ListenerContainerConsumerFailedEvent event) { log.error("消费者失败事件发生:{}", event); if (event.isFatal()) { log.error("Stopping container from aborted consumer. Reason::{}", event.getReason(), event.getThrowable()); SimpleMessageListenerContainer container = (SimpleMessageListenerContainer) event.getSource(); String queueNames = Arrays.toString(container.getQueueNames()); try { try { Thread.sleep(30000); } catch (Exception e) { log.error(e.getMessage()); } //判断此时消费者容器是否正在运行 Assert.state(!container.isRunning(), String.format("监听容器%s正在运行!", container)); //消费者容器没有在运行时,进行启动 container.start(); log.info("重启队列{}的监听成功", queueNames); } catch (Exception e) { log.error("重启队列{}的监听失败", queueNames, e); } // TODO 短信/邮件/钉钉...告警,包含队列信息,监听断开原因,断开时异常信息,重启是否成功等... } } }
- 将missingQueuesFatal设置为false后,抛出QueuesNotAvailableException异常时不会修改aborted的值,这样killOrRestart方法就会自动调用重启逻辑。但这种方式仅对QueuesNotAvailableException异常有效,不如上面的处理方式通用。
# Treat missing queues as non-fatal: when QueuesNotAvailableException is thrown, the consumer is
# not marked aborted, so the container keeps restarting it. This only covers missing queues; it
# does NOT make every consumer failure restartable.
spring.rabbitmq.listener.simple.missing-queues-fatal=false