使用RabbitMq清除本地多节点缓存

简介: 使用RabbitMq清除本地多节点缓存

一 场景

服务是多节点部署的,配置由于很少改动又经常被查询,所以将配置数据放入本地缓存中,修改配置后,更新缓存,但是只能更新当前访问节点的缓存,无法修改其他节点的缓存,导致查询数据出现问题。

二 分析

如何在其中一个节点更新缓存也同时更新其他节点的缓存呢?消息队列是否可以解决此问题呢?

三 解决

使用Rabbit的广播模式,可以实现,使用交换机绑定队列,而队列在每一个服务节点启动后都生成一个唯一队列和交换机绑定,这样就实现了当发现消息给交换机,会有多个不同的队列对消息进行处理

四 实现

  1. 创建配置,声明交换机,动态队列,绑定交换机队列,绑定监听

    
    /**
     * @author 子羽
     * @Description 广播模式清除本地缓存
     * @Date 2021/8/27
     */
    @Configuration
    public class CacheRabbitConfig {
    
        public static final String MY_FANOUTEXCHANGE_NAME = "local-cache-exchange";
    
        /**
         * 用UUID来生成一个queue名称,也可以用服务器IP端口作为queue名称
         */
        public static final String MY_QUEUE_NAME = UUID.randomUUID().toString();
    @Autowired
    RabbitTemplate rabbitTemplate;


    /**
     * 创建动态queue 自动删除队列,不然会造成队列堆积
     * @return
     */
    @Bean
    public Queue myQueue() {
        return new Queue(MY_QUEUE_NAME, true,false,true);
    }

    /**
     * 创建Exchange
     * @return
     */
    @Bean
    public FanoutExchange fanoutExchange() {
        return new FanoutExchange(MY_FANOUTEXCHANGE_NAME, true, false);
    }

    /**
     * 绑定当前queue到Exchange
     * @return
     */
    @Bean
    public Binding bindingExchangeMyQueue() {
        return BindingBuilder.bind(myQueue()).to(fanoutExchange());
    }

    /**
     * 设置消息处理
     * @return
     */
    @Bean
    public SimpleMessageListenerContainer mqMessageContainer(ClearCacheListener clearCacheListener) {
        SimpleMessageListenerContainer container = new SimpleMessageListenerContainer(rabbitTemplate.getConnectionFactory());
        container.setQueueNames(MY_QUEUE_NAME);
        container.setExposeListenerChannel(true);
        //设置每个消费者获取的最大的消息数量
        container.setPrefetchCount(1);
        //消费者个数
        container.setConcurrentConsumers(1);
        //设置确认模式为手工确认
        container.setAcknowledgeMode(AcknowledgeMode.MANUAL);
        //消息处理类
        container.setMessageListener(clearCacheListener);
        return container;
    }




}
```
  1. 设置监听实现

    /**
     * @author 子羽
     * @Description 监听删除缓存
     * @Date 2021/8/27
     */
    @Component
    public class ClearCacheListener implements ChannelAwareMessageListener {
    
        private static final Logger log = LoggerFactory.getLogger(ClearCacheListener.class);
    
        @Autowired
        private SysConfigService sysConfigService;
    
        @Override
        public void onMessage(Message message, Channel channel){
            try {
                log.info("接收删除系统配置缓存消息,correlationId:[{}]",new String(message.getMessageProperties().getCorrelationId()));
                // 删除本地缓存
                sysConfigService.cleanLocal();
                channel.basicAck(message.getMessageProperties().getDeliveryTag(), false);
            }catch (Exception e) {
                log.error("处理清除缓存消息失败",e);
            }
    
        }
    }
  2. 建立消息生产者

    /**
     * @author 子羽
     * @descript 清除内外部多节点缓存
     */
    @Component
    public class ClearCacheProducer extends AbstractRabbitProducer {
    
        protected static final Logger logger = LogManager.getCurrentClassLogger();
    public ClearCacheProducer(RabbitTemplate rabbitTemplate) {
        super(rabbitTemplate, MQConstant.APP_ID);
    }

    public void sendDelSystemCacheMessage() {
        CorrelationData correlationData = getCorrelationData();
        logger.info("发送消息至缓存队列,correlationId:{} 消息:[{}]", correlationData.getId(),"清除系统配置缓存");
        this.sendMsg("local-cache-exchange", "", "", correlationData);
        logger.info("发送消息成功correlationId:{}", correlationData.getId());
    }
}
```
  1. 调用清除节点缓存

    /**
     * <清空缓存>
     *
     * @see [ 类#方法]
     */
    public void clean() {
        // 先清除本节点缓存
        this.cache.clean();
        // 清除内外部节点系统配置缓存
        clearCacheProducer.sendDelSystemCacheMessage();
    }

五 结语

整体实现还是比较简单的,主要的一个比较难想到的是,每个节点会创建出不同的队列,也就是不同节点会产生不同的结果,另外注意的是队列一定要动态队列

return new Queue(MY_QUEUE_NAME, true,false,true);

/**
 * Construct a new queue, given a name, durability, exclusive and auto-delete flags.
 * @param name the name of the queue.
 * @param durable true if we are declaring a durable queue (the queue will survive a server restart)
 * @param exclusive true if we are declaring an exclusive queue (the queue will only be used by the declarer's
 * connection)
 * @param autoDelete true if the server should delete the queue when it is no longer in use
 */
public Queue(String name, boolean durable, boolean exclusive, boolean autoDelete) {
   this(name, durable, exclusive, autoDelete, null);
}

durable 代表是否持久化队列,如果持久化,那么服务重启队列中的数据还在
exclusive 排他队列,如果设定为true,那么这个队列仅对这个连接可见
autoDelete 自动删除,这个自动删除并不是执行一段时间字段删除,而是失去监听后自动删除,这次我设定就是自动删除,因为如果不设定自动删除,重启节点后,会生成一个新队列而原有队列也不会删除,会导致rabbitmq出现队列堆积。

相关实践学习
RocketMQ一站式入门使用
从源码编译、部署broker、部署namesrv,使用java客户端首发消息等一站式入门RocketMQ。
消息队列 MNS 入门课程
1、消息队列MNS简介 本节课介绍消息队列的MNS的基础概念 2、消息队列MNS特性 本节课介绍消息队列的MNS的主要特性 3、MNS的最佳实践及场景应用 本节课介绍消息队列的MNS的最佳实践及场景应用案例 4、手把手系列:消息队列MNS实操讲 本节课介绍消息队列的MNS的实际操作演示 5、动手实验:基于MNS,0基础轻松构建 Web Client 本节课带您一起基于MNS,0基础轻松构建 Web Client
相关文章
|
9月前
|
消息中间件 RocketMQ
这个RocketMQ节点似乎是在正常工作,但是它不能接收或者处理消息
这个RocketMQ节点似乎是在正常工作,但是它不能接收或者处理消息
304 0
|
2月前
|
缓存 网络协议 算法
Golang简单实现 分布式缓存+一致性哈希+节点再平衡(gossip + consistent + rebalance)
Golang简单实现 分布式缓存+一致性哈希+节点再平衡(gossip + consistent + rebalance)
94 0
|
消息中间件 存储 Apache
Apache RocketMQ 的 Broker 节点
Apache RocketMQ 的 Broker 节点
170 0
|
消息中间件 Java
Java 最常见的面试题:rabbitmq 对集群节点停止顺序有要求吗?
Java 最常见的面试题:rabbitmq 对集群节点停止顺序有要求吗?
|
消息中间件 Java
Java 最常见的面试题:rabbitmq 集群中唯一一个磁盘节点崩溃了会发生什么情况?
Java 最常见的面试题:rabbitmq 集群中唯一一个磁盘节点崩溃了会发生什么情况?
|
消息中间件 Java
Java 最常见的面试题: rabbitmq 每个节点是其他节点的完整拷贝吗?为什么?
Java 最常见的面试题: rabbitmq 每个节点是其他节点的完整拷贝吗?为什么?
|
存储 消息中间件 Java
Java 最常见的面试题:rabbitmq 节点的类型有哪些?
Java 最常见的面试题:rabbitmq 节点的类型有哪些?
|
缓存 前端开发
ehcache jgroups同步,节点重启初始化缓存bug
ehcache jgroups同步,节点重启初始化缓存bug
119 0
|
消息中间件 网络安全
RabbitMQ多机多节点集群部署(下)
RabbitMQ多机多节点集群部署
243 0
|
消息中间件 网络安全 数据安全/隐私保护
RabbitMQ多机多节点集群部署(上)
RabbitMQ多机多节点集群部署
257 0