(七)、Springboot整合rabbitmq集群配置详解
pringboot整合rabbitmq
集群创建方式这里省略
整合开始
1.引入starter
<parent> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-starter-parent</artifactId> <version>2.2.6.RELEASE</version> <relativePath/> <!-- lookup parent from repository --> </parent> <dependency> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-starter-amqp</artifactId> </dependency>
2.详细配置如下
rabbitmq: addresses: 127.0.0.1:6605,127.0.0.1:6606,127.0.0.1:6705 #指定client连接到的server的地址,多个以逗号分隔(优先取addresses,然后再取host) # port: ##集群配置 addresses之间用逗号隔开 # addresses: ip:port,ip:port password: admin username: 123456 virtual-host: / # 连接到rabbitMQ的vhost requested-heartbeat: #指定心跳超时,单位秒,0为不指定;默认60s publisher-confirms: #是否启用 发布确认 publisher-reurns: # 是否启用发布返回 connection-timeout: #连接超时,单位毫秒,0表示无穷大,不超时 cache: channel.size: # 缓存中保持的channel数量 channel.checkout-timeout: # 当缓存数量被设置时,从缓存中获取一个channel的超时时间,单位毫秒;如果为0,则总是创建一个新channel connection.size: # 缓存的连接数,只有是CONNECTION模式时生效 connection.mode: # 连接工厂缓存模式:CHANNEL 和 CONNECTION listener: simple.auto-startup: # 是否启动时自动启动容器 simple.acknowledge-mode: # 表示消息确认方式,其有三种配置方式,分别是none、manual和auto;默认auto simple.concurrency: # 最小的消费者数量 simple.max-concurrency: # 最大的消费者数量 simple.prefetch: # 指定一个请求能处理多少个消息,如果有事务的话,必须大于等于transaction数量. simple.transaction-size: # 指定一个事务处理的消息数量,最好是小于等于prefetch的数量. simple.default-requeue-rejected: # 决定被拒绝的消息是否重新入队;默认是true(与参数acknowledge-mode有关系) simple.idle-event-interval: # 多少长时间发布空闲容器时间,单位毫秒 simple.retry.enabled: # 监听重试是否可用 simple.retry.max-attempts: # 最大重试次数 simple.retry.initial-interval: # 第一次和第二次尝试发布或传递消息之间的间隔 simple.retry.multiplier: # 应用于上一重试间隔的乘数 simple.retry.max-interval: # 最大重试时间间隔 simple.retry.stateless: # 重试是有状态or无状态 template: mandatory: # 启用强制信息;默认false receive-timeout: # receive() 操作的超时时间 reply-timeout: # sendAndReceive() 操作的超时时间 retry.enabled: # 发送重试是否可用 retry.max-attempts: # 最大重试次数 retry.initial-interval: # 第一次和第二次尝试发布或传递消息之间的间隔 retry.multiplier: # 应用于上一重试间隔的乘数 retry.max-interval: #最大重试时间间隔
注:相关配置很多,大家只需要关注一些常用的配置即可
对于发送方而言,需要做以下配置:
1 配置CachingConnectionFactory
2 配置Exchange/Queue/Binding
3 配置RabbitAdmin创建上一步的Exchange/Queue/Binding
4 配置RabbitTemplate用于发送消息,RabbitTemplate通过CachingConnectionFactory获取到Connection,然后想指定Exchange发送
对于消费方而言,需要做以下配置:
1 配置CachingConnectionFactory
2 配置Exchange/Queue/Binding
3 配置RabbitAdmin创建上一步的Exchange/Queue/Binding
4 配置RabbitListenerContainerFactory
5 配置@RabbitListener/@RabbitHandler用于接收消息
在默认情况下主要的配置如下:
3.Spring AMQP的主要对象
注:如果不了解AMQP请前往官网了解.
4.使用:
通过配置类加载的方式:
package com.yd.demo.config; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import org.springframework.amqp.core.AcknowledgeMode; import org.springframework.amqp.core.Binding; import org.springframework.amqp.core.BindingBuilder; import org.springframework.amqp.core.DirectExchange; import org.springframework.amqp.core.Queue; import org.springframework.amqp.core.TopicExchange; import org.springframework.amqp.rabbit.config.SimpleRabbitListenerContainerFactory; import org.springframework.amqp.rabbit.connection.CachingConnectionFactory; import org.springframework.amqp.rabbit.connection.ConnectionFactory; import org.springframework.amqp.rabbit.core.RabbitAdmin; import org.springframework.amqp.rabbit.core.RabbitTemplate; import org.springframework.amqp.rabbit.listener.RabbitListenerContainerFactory; import org.springframework.amqp.support.converter.Jackson2JsonMessageConverter; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.beans.factory.annotation.Value; import org.springframework.context.annotation.Bean; import org.springframework.context.annotation.Configuration; import java.util.HashMap; import java.util.Map; @Configuration public class RabbitConfig { private static final Logger logger = LoggerFactory.getLogger(RabbitConfig.class); public static final String RECEIVEDLXEXCHANGE="spring-ex"; public static final String RECEIVEDLXQUEUE="spring-qu1"; public static final String RECEIVEDLXROUTINGKEY="aa"; public static final String DIRECTEXCHANGE="spring-ex"; public static final String MDMQUEUE="mdmQueue"; public static final String TOPICEXCHANGE="spring-top"; @Value("${spring.rabbitmq.addresses}") private String hosts; @Value("${spring.rabbitmq.username}") private String userName; @Value("${spring.rabbitmq.password}") private String password; @Value("${spring.rabbitmq.virtual-host}") private String virtualHost; /* @Value("${rabbit.channelCacheSize}") private int channelCacheSize;*/ // @Value("${rabbit.port}") // private int port; /* @Autowired private ConfirmCallBackListener confirmCallBackListener; @Autowired private ReturnCallBackListener returnCallBackListener;*/ @Bean public ConnectionFactory connectionFactory(){ CachingConnectionFactory cachingConnectionFactory = new CachingConnectionFactory(); cachingConnectionFactory.setAddresses(hosts); cachingConnectionFactory.setUsername(userName); cachingConnectionFactory.setPassword(password); // cachingConnectionFactory.setChannelCacheSize(channelCacheSize); //cachingConnectionFactory.setPort(port); cachingConnectionFactory.setVirtualHost(virtualHost); //设置连接工厂缓存模式: cachingConnectionFactory.setCacheMode(CachingConnectionFactory.CacheMode.CONNECTION); //缓存连接数 cachingConnectionFactory.setConnectionCacheSize(3); //设置连接限制 cachingConnectionFactory.setConnectionLimit(6); logger.info("连接工厂设置完成,连接地址{}"+hosts); logger.info("连接工厂设置完成,连接用户{}"+userName); return cachingConnectionFactory; } @Bean public RabbitAdmin rabbitAdmin(){ RabbitAdmin rabbitAdmin = new RabbitAdmin(connectionFactory()); rabbitAdmin.setAutoStartup(true); rabbitAdmin.setIgnoreDeclarationExceptions(true); rabbitAdmin.declareBinding(bindingMdmQueue()); //声明topic交换器 rabbitAdmin.declareExchange(directExchange()); logger.info("管理员设置完成"); return rabbitAdmin; } @Bean public RabbitListenerContainerFactory listenerContainerFactory() { SimpleRabbitListenerContainerFactory factory = new SimpleRabbitListenerContainerFactory(); factory.setConnectionFactory(connectionFactory()); factory.setMessageConverter(new Jackson2JsonMessageConverter()); //最小消费者数量 factory.setConcurrentConsumers(10); //最大消费者数量 factory.setMaxConcurrentConsumers(10); //一个请求最大处理的消息数量 factory.setPrefetchCount(10); // factory.setChannelTransacted(true); //默认不排队 factory.setDefaultRequeueRejected(true); //手动确认接收到了消息 factory.setAcknowledgeMode(AcknowledgeMode.MANUAL); logger.info("监听者设置完成"); return factory; } @Bean public DirectExchange directExchange(){ return new DirectExchange(DIRECTEXCHANGE,true,false); } @Bean public Queue mdmQueue(){ Map arguments = new HashMap<>(); // 绑定该队列到私信交换机 arguments.put("x-dead-letter-exchange",RECEIVEDLXEXCHANGE); arguments.put("x-dead-letter-routing-key",RECEIVEDLXROUTINGKEY); logger.info("队列交换机绑定完成"); return new Queue(RECEIVEDLXQUEUE,true,false,false,arguments); } @Bean Binding bindingMdmQueue() { return BindingBuilder.bind(mdmQueue()).to(directExchange()).with(""); } @Bean public RabbitTemplate rabbitTemplate(){ RabbitTemplate rabbitTemplate = new RabbitTemplate(connectionFactory()); rabbitTemplate.setMandatory(true); //发布确认 // rabbitTemplate.setConfirmCallback(confirmCallBackListener); // 启用发布返回 // rabbitTemplate.setReturnCallback(returnCallBackListener); logger.info("连接模板设置完成"); return rabbitTemplate; } /* @Bean public TopicExchange topicExchange(){ return new TopicExchange(TOPICEXCHANGE,true,false); }*/ /* *//** * @return DirectExchange *//* @Bean public DirectExchange dlxExchange() { return new DirectExchange(RECEIVEDLXEXCHANGE,true,false); } *//* * * @return Queue *//* @Bean public Queue dlxQueue() { return new Queue(RECEIVEDLXQUEUE,true); } *//* * @return Binding *//* @Bean public Binding binding() { return BindingBuilder.bind(dlxQueue()).to(dlxExchange()).with(RECEIVEDLXROUTINGKEY); }*/ }
@Bean public ConnectionFactory connectionFactory(){ CachingConnectionFactory cachingConnectionFactory = new CachingConnectionFactory(); cachingConnectionFactory.setAddresses(hosts); cachingConnectionFactory.setUsername(userName); cachingConnectionFactory.setPassword(password); // cachingConnectionFactory.setChannelCacheSize(channelCacheSize); //cachingConnectionFactory.setPort(port); cachingConnectionFactory.setVirtualHost(virtualHost); //设置连接工厂缓存模式: cachingConnectionFactory.setCacheMode(CachingConnectionFactory.CacheMode.CONNECTION); //缓存连接数 cachingConnectionFactory.setConnectionCacheSize(3); //设置连接限制 cachingConnectionFactory.setConnectionLimit(6); logger.info(“连接工厂设置完成,连接地址{}”+hosts); logger.info(“连接工厂设置完成,连接用户{}”+userName); return cachingConnectionFactory; } @Bean public RabbitAdmin rabbitAdmin(){ RabbitAdmin rabbitAdmin = new RabbitAdmin(connectionFactory()); rabbitAdmin.setAutoStartup(true); rabbitAdmin.setIgnoreDeclarationExceptions(true); rabbitAdmin.declareBinding(bindingMdmQueue()); //声明topic交换器 rabbitAdmin.declareExchange(directExchange()); logger.info(“管理员设置完成”); return rabbitAdmin; } @Bean public RabbitListenerContainerFactory listenerContainerFactory() { SimpleRabbitListenerContainerFactory factory = new SimpleRabbitListenerContainerFactory(); factory.setConnectionFactory(connectionFactory()); factory.setMessageConverter(new Jackson2JsonMessageConverter()); //最小消费者数量 factory.setConcurrentConsumers(10); //最大消费者数量 factory.setMaxConcurrentConsumers(10); //一个请求最大处理的消息数量 factory.setPrefetchCount(10); // factory.setChannelTransacted(true); //默认不排队 factory.setDefaultRequeueRejected(true); //手动确认接收到了消息 factory.setAcknowledgeMode(AcknowledgeMode.MANUAL); logger.info(“监听者设置完成”); return factory; } @Bean public DirectExchange directExchange(){ return new DirectExchange(DIRECTEXCHANGE,true,false); } @Bean public Queue mdmQueue(){ Map arguments = new HashMap<>(); // 绑定该队列到私信交换机 arguments.put(“x-dead-letter-exchange”,RECEIVEDLXEXCHANGE); arguments.put(“x-dead-letter-routing-key”,RECEIVEDLXROUTINGKEY); logger.info(“队列交换机绑定完成”); return new Queue(RECEIVEDLXQUEUE,true,false,false,arguments); } @Bean Binding bindingMdmQueue() { return BindingBuilder.bind(mdmQueue()).to(directExchange()).with(""); } @Bean public RabbitTemplate rabbitTemplate(){ RabbitTemplate rabbitTemplate = new RabbitTemplate(connectionFactory()); rabbitTemplate.setMandatory(true); //发布确认 // rabbitTemplate.setConfirmCallback(confirmCallBackListener); // 启用发布返回 // rabbitTemplate.setReturnCallback(returnCallBackListener); logger.info(“连接模板设置完成”); return rabbitTemplate; } / @Bean public TopicExchange topicExchange(){ return new TopicExchange(TOPICEXCHANGE,true,false); }/ / //* * @return DirectExchange // @Bean public DirectExchange dlxExchange() { return new DirectExchange(RECEIVEDLXEXCHANGE,true,false); } // * * @return Queue // @Bean public Queue dlxQueue() { return new Queue(RECEIVEDLXQUEUE,true); } // * @return Binding // @Bean public Binding binding() { return BindingBuilder.bind(dlxQueue()).to(dlxExchange()).with(RECEIVEDLXROUTINGKEY); }*/ }