一 场景
服务是多节点部署的,配置由于很少改动又经常被查询,所以将配置数据放入本地缓存中,修改配置后,更新缓存,但是只能更新当前访问节点的缓存,无法修改其他节点的缓存,导致查询数据出现问题。
二 分析
如何在其中一个节点更新缓存也同时更新其他节点的缓存呢?消息队列是否可以解决此问题呢?
三 解决
使用Rabbit的广播模式,可以实现,使用交换机绑定队列,而队列在每一个服务节点启动后都生成一个唯一队列和交换机绑定,这样就实现了当发现消息给交换机,会有多个不同的队列对消息进行处理
四 实现
创建配置,声明交换机,动态队列,绑定交换机队列,绑定监听
/** * @author 子羽 * @Description 广播模式清除本地缓存 * @Date 2021/8/27 */ @Configuration public class CacheRabbitConfig { public static final String MY_FANOUTEXCHANGE_NAME = "local-cache-exchange"; /** * 用UUID来生成一个queue名称,也可以用服务器IP端口作为queue名称 */ public static final String MY_QUEUE_NAME = UUID.randomUUID().toString();
@Autowired
RabbitTemplate rabbitTemplate;
/**
* 创建动态queue 自动删除队列,不然会造成队列堆积
* @return
*/
@Bean
public Queue myQueue() {
return new Queue(MY_QUEUE_NAME, true,false,true);
}
/**
* 创建Exchange
* @return
*/
@Bean
public FanoutExchange fanoutExchange() {
return new FanoutExchange(MY_FANOUTEXCHANGE_NAME, true, false);
}
/**
* 绑定当前queue到Exchange
* @return
*/
@Bean
public Binding bindingExchangeMyQueue() {
return BindingBuilder.bind(myQueue()).to(fanoutExchange());
}
/**
* 设置消息处理
* @return
*/
@Bean
public SimpleMessageListenerContainer mqMessageContainer(ClearCacheListener clearCacheListener) {
SimpleMessageListenerContainer container = new SimpleMessageListenerContainer(rabbitTemplate.getConnectionFactory());
container.setQueueNames(MY_QUEUE_NAME);
container.setExposeListenerChannel(true);
//设置每个消费者获取的最大的消息数量
container.setPrefetchCount(1);
//消费者个数
container.setConcurrentConsumers(1);
//设置确认模式为手工确认
container.setAcknowledgeMode(AcknowledgeMode.MANUAL);
//消息处理类
container.setMessageListener(clearCacheListener);
return container;
}
}
```
设置监听实现
/** * @author 子羽 * @Description 监听删除缓存 * @Date 2021/8/27 */ @Component public class ClearCacheListener implements ChannelAwareMessageListener { private static final Logger log = LoggerFactory.getLogger(ClearCacheListener.class); @Autowired private SysConfigService sysConfigService; @Override public void onMessage(Message message, Channel channel){ try { log.info("接收删除系统配置缓存消息,correlationId:[{}]",new String(message.getMessageProperties().getCorrelationId())); // 删除本地缓存 sysConfigService.cleanLocal(); channel.basicAck(message.getMessageProperties().getDeliveryTag(), false); }catch (Exception e) { log.error("处理清除缓存消息失败",e); } } }
建立消息生产者
/** * @author 子羽 * @descript 清除内外部多节点缓存 */ @Component public class ClearCacheProducer extends AbstractRabbitProducer { protected static final Logger logger = LogManager.getCurrentClassLogger();
public ClearCacheProducer(RabbitTemplate rabbitTemplate) {
super(rabbitTemplate, MQConstant.APP_ID);
}
public void sendDelSystemCacheMessage() {
CorrelationData correlationData = getCorrelationData();
logger.info("发送消息至缓存队列,correlationId:{} 消息:[{}]", correlationData.getId(),"清除系统配置缓存");
this.sendMsg("local-cache-exchange", "", "", correlationData);
logger.info("发送消息成功correlationId:{}", correlationData.getId());
}
}
```
调用清除节点缓存
/** * <清空缓存> * * @see [ 类#方法] */ public void clean() { // 先清除本节点缓存 this.cache.clean(); // 清除内外部节点系统配置缓存 clearCacheProducer.sendDelSystemCacheMessage(); }
五 结语
整体实现还是比较简单的,主要的一个比较难想到的是,每个节点会创建出不同的队列,也就是不同节点会产生不同的结果,另外注意的是队列一定要动态队列
return new Queue(MY_QUEUE_NAME, true,false,true);
/**
* Construct a new queue, given a name, durability, exclusive and auto-delete flags.
* @param name the name of the queue.
* @param durable true if we are declaring a durable queue (the queue will survive a server restart)
* @param exclusive true if we are declaring an exclusive queue (the queue will only be used by the declarer's
* connection)
* @param autoDelete true if the server should delete the queue when it is no longer in use
*/
public Queue(String name, boolean durable, boolean exclusive, boolean autoDelete) {
this(name, durable, exclusive, autoDelete, null);
}
durable 代表是否持久化队列,如果持久化,那么服务重启队列中的数据还在
exclusive 排他队列,如果设定为true,那么这个队列仅对这个连接可见
autoDelete 自动删除,这个自动删除并不是执行一段时间字段删除,而是失去监听后自动删除,这次我设定就是自动删除,因为如果不设定自动删除,重启节点后,会生成一个新队列而原有队列也不会删除,会导致rabbitmq出现队列堆积。