RabbitMq交换机/队列的创建源码分析

简介: RabbitMq交换机/队列的创建源码分析

概述

SpringBoot整合RabbitMq思考一个问题,为什么只用在配置类里配置一下交换机/队列/绑定关系,在项目启动后就会自动创建对应的交换机/队列/绑定关系?

@ConfigurationpublicclassRabbitConfig {
@BeanpublicFanoutExchangeexchange(){
returnnewFanoutExchange("TEST");
    }
}


创建交换机/队列/绑定关系的时机到底是什么时候?如何创建的?

测试

  1. 配置如上bean后,启动项目,观察Rabbitmq控制台是否创建该交换机
  1. 并未创建
  1. 发送一条消息到该交换机,观察是否能成功
packagecom.zy.rabbitmq.controller;
importorg.springframework.amqp.rabbit.core.RabbitTemplate;
importorg.springframework.beans.factory.annotation.Autowired;
importorg.springframework.web.bind.annotation.GetMapping;
importorg.springframework.web.bind.annotation.RequestMapping;
importorg.springframework.web.bind.annotation.RestController;
/*** @Author: Zy* @Date: 2022/3/14 15:04*/@RestController@RequestMapping("test")
publicclassTestController {
@AutowiredRabbitTemplaterabbitTemplate;
@GetMappingpublicvoidtest(){
rabbitTemplate.convertAndSend("TEST","","HHHH");
    }
}



再次观察管理控制台,惊奇的发现: 有了!


猜想

那么其实并不是在项目启动后就进行的交换机的创建吗?而是在发送消息时才创建的交换机吗?

源码在哪里?

参考文章: https://cloud.tencent.com/developer/article/1668606

源码解析

上篇文章中提到了一个很重要的类: RabbitAdmin

这个类封装了对队列/交换机/绑定关系等一系列的增删改查操作

而在initialize()方法里,就进行了如下操作:

/*** Declares all the exchanges, queues and bindings in the enclosing application context, if any. It should be safe* (but unnecessary) to call this method more than once.*/@Override// NOSONAR complexitypublicvoidinitialize() {
if (this.applicationContext==null) {
this.logger.debug("no ApplicationContext has been set, cannot auto-declare Exchanges, Queues, and Bindings");
return;
    }
// 获取spring注册的beanthis.logger.debug("Initializing declarations");
Collection<Exchange>contextExchanges=newLinkedList<Exchange>(
this.applicationContext.getBeansOfType(Exchange.class).values());
Collection<Queue>contextQueues=newLinkedList<Queue>(
this.applicationContext.getBeansOfType(Queue.class).values());
Collection<Binding>contextBindings=newLinkedList<Binding>(
this.applicationContext.getBeansOfType(Binding.class).values());
Collection<DeclarableCustomizer>customizers=this.applicationContext.getBeansOfType(DeclarableCustomizer.class).values();
processDeclarables(contextExchanges, contextQueues, contextBindings);
finalCollection<Exchange>exchanges=filterDeclarables(contextExchanges, customizers);
finalCollection<Queue>queues=filterDeclarables(contextQueues, customizers);
finalCollection<Binding>bindings=filterDeclarables(contextBindings, customizers);
for (Exchangeexchange : exchanges) {
if ((!exchange.isDurable() ||exchange.isAutoDelete())  &&this.logger.isInfoEnabled()) {
this.logger.info("Auto-declaring a non-durable or auto-delete Exchange ("+exchange.getName()
+") durable:"+exchange.isDurable() +", auto-delete:"+exchange.isAutoDelete() +". "+"It will be deleted by the broker if it shuts down, and can be redeclared by closing and "+"reopening the connection.");
   }
    }
for (Queuequeue : queues) {
if ((!queue.isDurable() ||queue.isAutoDelete() ||queue.isExclusive()) &&this.logger.isInfoEnabled()) {
this.logger.info("Auto-declaring a non-durable, auto-delete, or exclusive Queue ("+queue.getName()
+") durable:"+queue.isDurable() +", auto-delete:"+queue.isAutoDelete() +", exclusive:"+queue.isExclusive() +". "+"It will be redeclared if the broker stops and is restarted while the connection factory is "+"alive, but all messages will be lost.");
   }
    }
if (exchanges.size() ==0&&queues.size() ==0&&bindings.size() ==0&&this.manualDeclarables.size() ==0) {
this.logger.debug("Nothing to declare");
return;
    }
this.rabbitTemplate.execute(channel-> {
declareExchanges(channel, exchanges.toArray(newExchange[exchanges.size()]));
declareQueues(channel, queues.toArray(newQueue[queues.size()]));
declareBindings(channel, bindings.toArray(newBinding[bindings.size()]));
returnnull;
    });
if (this.manualDeclarables.size() >0) {
synchronized (this.manualDeclarables) {
this.logger.debug("Redeclaring manually declared Declarables");
for (Declarabledec : this.manualDeclarables.values()) {
if (decinstanceofQueue) {
declareQueue((Queue) dec);
     }
elseif (decinstanceofExchange) {
declareExchange((Exchange) dec);
     }
else {
declareBinding((Binding) dec);
     }
    }
   }
    }
this.logger.debug("Declarations finished");
 }

 

可以看到从Spring中获取所有注册的交换机/队列/绑定关系的bean,然后进行创建.

而还有另外一个方法afterPropertiesSet(),这个方法就很常见了,在Spring中这个方法在Bean初始化完会进行调用:

而就在这个方法里调用上面的initialize()方法,进行了交换机的初始化绑定操作

/*** If {@link #setAutoStartup(boolean) autoStartup} is set to true, registers a callback on the* {@link ConnectionFactory} to declare all exchanges and queues in the enclosing application context. If the* callback fails then it may cause other clients of the connection factory to fail, but since only exchanges,* queues and bindings are declared failure is not expected.** @see InitializingBean#afterPropertiesSet()* @see #initialize()*/@OverridepublicvoidafterPropertiesSet() {
synchronized (this.lifecycleMonitor) {
if (this.running||!this.autoStartup) {
return;
   }
if (this.retryTemplate==null&&!this.retryDisabled) {
this.retryTemplate=newRetryTemplate();
this.retryTemplate.setRetryPolicy(newSimpleRetryPolicy(DECLARE_MAX_ATTEMPTS));
ExponentialBackOffPolicybackOffPolicy=newExponentialBackOffPolicy();
backOffPolicy.setInitialInterval(DECLARE_INITIAL_RETRY_INTERVAL);
backOffPolicy.setMultiplier(DECLARE_RETRY_MULTIPLIER);
backOffPolicy.setMaxInterval(DECLARE_MAX_RETRY_INTERVAL);
this.retryTemplate.setBackOffPolicy(backOffPolicy);
   }
if (this.connectionFactoryinstanceofCachingConnectionFactory&&     ((CachingConnectionFactory) this.connectionFactory).getCacheMode() ==CacheMode.CONNECTION) {
this.logger.warn("RabbitAdmin auto declaration is not supported with CacheMode.CONNECTION");
return;
   }
// Prevent stack overflow...finalAtomicBooleaninitializing=newAtomicBoolean(false);
this.connectionFactory.addConnectionListener(connection-> {
if (!initializing.compareAndSet(false, true)) {
// If we are already initializing, we don't need to do it again...return;
    }
// 调用初始化方法try {
/** ...but it is possible for this to happen twice in the same ConnectionFactory (if more than* one concurrent Connection is allowed). It's idempotent, so no big deal (a bit of network* chatter). In fact it might even be a good thing: exclusive queues only make sense if they are* declared for every connection. If anyone has a problem with it: use auto-startup="false".*/if (this.retryTemplate!=null) {
this.retryTemplate.execute(c-> {
initialize();
returnnull;
        });
     }
else {
initialize();
     }
    }
finally {
initializing.compareAndSet(true, false);
    }
   });
this.running=true;
    }
 }


那么这个调用链就很清楚了

RabbitAdmin创建后调用afterPropertiesSet方法,afterPropertiesSet方法调用initialize()方法,在initialize方法内部进行了交换机的创建操作

那么RabbitAdmin是什么时候创建的呢?

对于SpringBoot来说,当然是自动配置类里创建的 RabbitAutoConfiguration

@Bean@ConditionalOnSingleCandidate(ConnectionFactory.class)
@ConditionalOnProperty(prefix="spring.rabbitmq", name="dynamic", matchIfMissing=true)
@ConditionalOnMissingBeanpublicAmqpAdminamqpAdmin(ConnectionFactoryconnectionFactory) {
returnnewRabbitAdmin(connectionFactory);
}


创建之后,就会进入afterPropertiesSet方法

对于afterPropertiesSet方法,还有一点要注意,并不是进入afterPropertiesSet就会调用initialize,而是添加了一个连接监听器,当有一个新的连接创建时,才会进入initialize方法.

this.connectionFactory.addConnectionListener(connection-> {
if (!initializing.compareAndSet(false, true)) {
// If we are already initializing, we don't need to do it again...return;
    }
try {
/** ...but it is possible for this to happen twice in the same ConnectionFactory (if more than* one concurrent Connection is allowed). It's idempotent, so no big deal (a bit of network* chatter). In fact it might even be a good thing: exclusive queues only make sense if they are* declared for every connection. If anyone has a problem with it: use auto-startup="false".*/if (this.retryTemplate!=null) {
this.retryTemplate.execute(c-> {
initialize();
returnnull;
            });
        }
else {
initialize();
        }
    }
finally {
initializing.compareAndSet(true, false);
    }
});


总结

  1. 对于SpringBoot的源码分析,一般都可以从自动配置类中找到源码的入口,结合整个框架的结构debug,才能分析源码的走向.
  2. RabbitMq项目启动后创建对应的交换机/队列/绑定关系看似简单,其实也经过了复杂的过程,特别是看源码的过程中还有很多Spring加载bean的方法,比较吃力,需要加强Spring源码的学习和阅读能力.
相关实践学习
消息队列RocketMQ版:基础消息收发功能体验
本实验场景介绍消息队列RocketMQ版的基础消息收发功能,涵盖实例创建、Topic、Group资源创建以及消息收发体验等基础功能模块。
消息队列 MNS 入门课程
1、消息队列MNS简介 本节课介绍消息队列的MNS的基础概念 2、消息队列MNS特性 本节课介绍消息队列的MNS的主要特性 3、MNS的最佳实践及场景应用 本节课介绍消息队列的MNS的最佳实践及场景应用案例 4、手把手系列:消息队列MNS实操讲 本节课介绍消息队列的MNS的实际操作演示 5、动手实验:基于MNS,0基础轻松构建 Web Client 本节课带您一起基于MNS,0基础轻松构建 Web Client
目录
相关文章
|
1月前
|
消息中间件 存储 监控
RabbitMQ 队列之战:Classic 和 Quorum 的性能洞察
RabbitMQ 是一个功能强大的消息代理,用于分布式应用程序间的通信。它通过队列临时存储消息,支持异步通信和解耦。经典队列适合高吞吐量和低延迟场景,而仲裁队列则提供高可用性和容错能力,适用于关键任务系统。选择哪种队列取决于性能、持久性和容错性的需求。
127 6
|
2月前
|
消息中间件 存储 缓存
RabbitMQ:交换机详解(Fanout交换机、Direct交换机、Topic交换机)
RabbitMQ:交换机详解(Fanout交换机、Direct交换机、Topic交换机)
224 7
RabbitMQ:交换机详解(Fanout交换机、Direct交换机、Topic交换机)
|
2月前
|
消息中间件 JSON Java
|
2月前
|
消息中间件
rabbitmq,&队列
rabbitmq,&队列
|
2月前
|
消息中间件 JSON Java
玩转RabbitMQ声明队列交换机、消息转换器
玩转RabbitMQ声明队列交换机、消息转换器
87 0
|
3月前
|
消息中间件 存储 NoSQL
MQ的顺序性保证:顺序队列、消息编号、分布式锁,一文全掌握!
【8月更文挑战第24天】消息队列(MQ)是分布式系统的关键组件,用于实现系统解耦、提升可扩展性和可用性。保证消息顺序性是其重要挑战之一。本文介绍三种常用策略:顺序队列、消息编号与分布式锁,通过示例展示如何确保消息按需排序。这些方法各有优势,可根据实际场景灵活选用。提供的Java示例有助于加深理解与实践应用。
91 2
|
2月前
|
消息中间件 存储
RabbitMQ-死信交换机和死信队列
死信队列和死信交换机是RabbitMQ提供的一个非常实用的功能,通过合理使用这一机制,可以大大增强系统的健壮性和可靠性。它们不仅能有效解决消息处理失败的情况,还能为系统的错误追踪、消息延迟处理等提供支持。在设计系统的消息体系时,合理规划和使用死信队列和死信交换机,将会为系统的稳定运行提供一个有力的
58 0
|
4月前
|
消息中间件 RocketMQ
MetaQ/RocketMQ 原理问题之当消费集群规模较大时,处理分配不到队列的Consumer的问题如何解决
MetaQ/RocketMQ 原理问题之当消费集群规模较大时,处理分配不到队列的Consumer的问题如何解决
|
4月前
|
消息中间件 Java Kafka
说说RabbitMQ延迟队列实现原理?
说说RabbitMQ延迟队列实现原理?
69 0
说说RabbitMQ延迟队列实现原理?
|
消息中间件 Linux
centos7 yum快速安装rabbitmq服务
centos7 yum快速安装rabbitmq服务
225 0