概述
SpringBoot整合RabbitMq思考一个问题,为什么只用在配置类里配置一下交换机/队列/绑定关系,在项目启动后就会自动创建对应的交换机/队列/绑定关系?
publicclassRabbitConfig { publicFanoutExchangeexchange(){ returnnewFanoutExchange("TEST"); } }
创建交换机/队列/绑定关系的时机到底是什么时候?如何创建的?
测试
- 配置如上bean后,启动项目,观察Rabbitmq控制台是否创建该交换机
- 并未创建
- 发送一条消息到该交换机,观察是否能成功
packagecom.zy.rabbitmq.controller; importorg.springframework.amqp.rabbit.core.RabbitTemplate; importorg.springframework.beans.factory.annotation.Autowired; importorg.springframework.web.bind.annotation.GetMapping; importorg.springframework.web.bind.annotation.RequestMapping; importorg.springframework.web.bind.annotation.RestController; /*** @Author: Zy* @Date: 2022/3/14 15:04*/"test") (publicclassTestController { RabbitTemplaterabbitTemplate; publicvoidtest(){ rabbitTemplate.convertAndSend("TEST","","HHHH"); } }
再次观察管理控制台,惊奇的发现: 有了!
猜想
那么其实并不是在项目启动后就进行的交换机的创建吗?而是在发送消息时才创建的交换机吗?
源码在哪里?
参考文章: https://cloud.tencent.com/developer/article/1668606
源码解析
上篇文章中提到了一个很重要的类: RabbitAdmin
这个类封装了对队列/交换机/绑定关系等一系列的增删改查操作
而在initialize()方法里,就进行了如下操作:
/*** Declares all the exchanges, queues and bindings in the enclosing application context, if any. It should be safe* (but unnecessary) to call this method more than once.*/// NOSONAR complexitypublicvoidinitialize() { if (this.applicationContext==null) { this.logger.debug("no ApplicationContext has been set, cannot auto-declare Exchanges, Queues, and Bindings"); return; } // 获取spring注册的beanthis.logger.debug("Initializing declarations"); Collection<Exchange>contextExchanges=newLinkedList<Exchange>( this.applicationContext.getBeansOfType(Exchange.class).values()); Collection<Queue>contextQueues=newLinkedList<Queue>( this.applicationContext.getBeansOfType(Queue.class).values()); Collection<Binding>contextBindings=newLinkedList<Binding>( this.applicationContext.getBeansOfType(Binding.class).values()); Collection<DeclarableCustomizer>customizers=this.applicationContext.getBeansOfType(DeclarableCustomizer.class).values(); processDeclarables(contextExchanges, contextQueues, contextBindings); finalCollection<Exchange>exchanges=filterDeclarables(contextExchanges, customizers); finalCollection<Queue>queues=filterDeclarables(contextQueues, customizers); finalCollection<Binding>bindings=filterDeclarables(contextBindings, customizers); for (Exchangeexchange : exchanges) { if ((!exchange.isDurable() ||exchange.isAutoDelete()) &&this.logger.isInfoEnabled()) { this.logger.info("Auto-declaring a non-durable or auto-delete Exchange ("+exchange.getName() +") durable:"+exchange.isDurable() +", auto-delete:"+exchange.isAutoDelete() +". "+"It will be deleted by the broker if it shuts down, and can be redeclared by closing and "+"reopening the connection."); } } for (Queuequeue : queues) { if ((!queue.isDurable() ||queue.isAutoDelete() ||queue.isExclusive()) &&this.logger.isInfoEnabled()) { this.logger.info("Auto-declaring a non-durable, auto-delete, or exclusive Queue ("+queue.getName() +") durable:"+queue.isDurable() +", auto-delete:"+queue.isAutoDelete() +", exclusive:"+queue.isExclusive() +". "+"It will be redeclared if the broker stops and is restarted while the connection factory is "+"alive, but all messages will be lost."); } } if (exchanges.size() ==0&&queues.size() ==0&&bindings.size() ==0&&this.manualDeclarables.size() ==0) { this.logger.debug("Nothing to declare"); return; } this.rabbitTemplate.execute(channel-> { declareExchanges(channel, exchanges.toArray(newExchange[exchanges.size()])); declareQueues(channel, queues.toArray(newQueue[queues.size()])); declareBindings(channel, bindings.toArray(newBinding[bindings.size()])); returnnull; }); if (this.manualDeclarables.size() >0) { synchronized (this.manualDeclarables) { this.logger.debug("Redeclaring manually declared Declarables"); for (Declarabledec : this.manualDeclarables.values()) { if (decinstanceofQueue) { declareQueue((Queue) dec); } elseif (decinstanceofExchange) { declareExchange((Exchange) dec); } else { declareBinding((Binding) dec); } } } } this.logger.debug("Declarations finished"); }
可以看到从Spring中获取所有注册的交换机/队列/绑定关系的bean,然后进行创建.
而还有另外一个方法afterPropertiesSet(),这个方法就很常见了,在Spring中这个方法在Bean初始化完会进行调用:
而就在这个方法里调用上面的initialize()方法,进行了交换机的初始化绑定操作
/*** If {@link #setAutoStartup(boolean) autoStartup} is set to true, registers a callback on the* {@link ConnectionFactory} to declare all exchanges and queues in the enclosing application context. If the* callback fails then it may cause other clients of the connection factory to fail, but since only exchanges,* queues and bindings are declared failure is not expected.** @see InitializingBean#afterPropertiesSet()* @see #initialize()*/publicvoidafterPropertiesSet() { synchronized (this.lifecycleMonitor) { if (this.running||!this.autoStartup) { return; } if (this.retryTemplate==null&&!this.retryDisabled) { this.retryTemplate=newRetryTemplate(); this.retryTemplate.setRetryPolicy(newSimpleRetryPolicy(DECLARE_MAX_ATTEMPTS)); ExponentialBackOffPolicybackOffPolicy=newExponentialBackOffPolicy(); backOffPolicy.setInitialInterval(DECLARE_INITIAL_RETRY_INTERVAL); backOffPolicy.setMultiplier(DECLARE_RETRY_MULTIPLIER); backOffPolicy.setMaxInterval(DECLARE_MAX_RETRY_INTERVAL); this.retryTemplate.setBackOffPolicy(backOffPolicy); } if (this.connectionFactoryinstanceofCachingConnectionFactory&& ((CachingConnectionFactory) this.connectionFactory).getCacheMode() ==CacheMode.CONNECTION) { this.logger.warn("RabbitAdmin auto declaration is not supported with CacheMode.CONNECTION"); return; } // Prevent stack overflow...finalAtomicBooleaninitializing=newAtomicBoolean(false); this.connectionFactory.addConnectionListener(connection-> { if (!initializing.compareAndSet(false, true)) { // If we are already initializing, we don't need to do it again...return; } // 调用初始化方法try { /** ...but it is possible for this to happen twice in the same ConnectionFactory (if more than* one concurrent Connection is allowed). It's idempotent, so no big deal (a bit of network* chatter). In fact it might even be a good thing: exclusive queues only make sense if they are* declared for every connection. If anyone has a problem with it: use auto-startup="false".*/if (this.retryTemplate!=null) { this.retryTemplate.execute(c-> { initialize(); returnnull; }); } else { initialize(); } } finally { initializing.compareAndSet(true, false); } }); this.running=true; } }
那么这个调用链就很清楚了
RabbitAdmin创建后调用afterPropertiesSet方法,afterPropertiesSet方法调用initialize()方法,在initialize方法内部进行了交换机的创建操作
那么RabbitAdmin是什么时候创建的呢?
对于SpringBoot来说,当然是自动配置类里创建的 RabbitAutoConfiguration
ConnectionFactory.class) (prefix="spring.rabbitmq", name="dynamic", matchIfMissing=true) (publicAmqpAdminamqpAdmin(ConnectionFactoryconnectionFactory) { returnnewRabbitAdmin(connectionFactory); }
创建之后,就会进入afterPropertiesSet方法
对于afterPropertiesSet方法,还有一点要注意,并不是进入afterPropertiesSet就会调用initialize,而是添加了一个连接监听器,当有一个新的连接创建时,才会进入initialize方法.
this.connectionFactory.addConnectionListener(connection-> { if (!initializing.compareAndSet(false, true)) { // If we are already initializing, we don't need to do it again...return; } try { /** ...but it is possible for this to happen twice in the same ConnectionFactory (if more than* one concurrent Connection is allowed). It's idempotent, so no big deal (a bit of network* chatter). In fact it might even be a good thing: exclusive queues only make sense if they are* declared for every connection. If anyone has a problem with it: use auto-startup="false".*/if (this.retryTemplate!=null) { this.retryTemplate.execute(c-> { initialize(); returnnull; }); } else { initialize(); } } finally { initializing.compareAndSet(true, false); } });
总结
- 对于SpringBoot的源码分析,一般都可以从自动配置类中找到源码的入口,结合整个框架的结构debug,才能分析源码的走向.
- RabbitMq项目启动后创建对应的交换机/队列/绑定关系看似简单,其实也经过了复杂的过程,特别是看源码的过程中还有很多Spring加载bean的方法,比较吃力,需要加强Spring源码的学习和阅读能力.