使用RabbitMq清除本地多节点缓存

简介: 使用RabbitMq清除本地多节点缓存

一 场景

服务是多节点部署的,配置由于很少改动又经常被查询,所以将配置数据放入本地缓存中,修改配置后,更新缓存,但是只能更新当前访问节点的缓存,无法修改其他节点的缓存,导致查询数据出现问题。

二 分析

如何在其中一个节点更新缓存也同时更新其他节点的缓存呢?消息队列是否可以解决此问题呢?

三 解决

使用Rabbit的广播模式,可以实现,使用交换机绑定队列,而队列在每一个服务节点启动后都生成一个唯一队列和交换机绑定,这样就实现了当发现消息给交换机,会有多个不同的队列对消息进行处理

四 实现

  1. 创建配置,声明交换机,动态队列,绑定交换机队列,绑定监听

    
    /**
     * @author 子羽
     * @Description 广播模式清除本地缓存
     * @Date 2021/8/27
     */
    @Configuration
    public class CacheRabbitConfig {
    
        public static final String MY_FANOUTEXCHANGE_NAME = "local-cache-exchange";
    
        /**
         * 用UUID来生成一个queue名称,也可以用服务器IP端口作为queue名称
         */
        public static final String MY_QUEUE_NAME = UUID.randomUUID().toString();
    @Autowired
    RabbitTemplate rabbitTemplate;


    /**
     * 创建动态queue 自动删除队列,不然会造成队列堆积
     * @return
     */
    @Bean
    public Queue myQueue() {
        return new Queue(MY_QUEUE_NAME, true,false,true);
    }

    /**
     * 创建Exchange
     * @return
     */
    @Bean
    public FanoutExchange fanoutExchange() {
        return new FanoutExchange(MY_FANOUTEXCHANGE_NAME, true, false);
    }

    /**
     * 绑定当前queue到Exchange
     * @return
     */
    @Bean
    public Binding bindingExchangeMyQueue() {
        return BindingBuilder.bind(myQueue()).to(fanoutExchange());
    }

    /**
     * 设置消息处理
     * @return
     */
    @Bean
    public SimpleMessageListenerContainer mqMessageContainer(ClearCacheListener clearCacheListener) {
        SimpleMessageListenerContainer container = new SimpleMessageListenerContainer(rabbitTemplate.getConnectionFactory());
        container.setQueueNames(MY_QUEUE_NAME);
        container.setExposeListenerChannel(true);
        //设置每个消费者获取的最大的消息数量
        container.setPrefetchCount(1);
        //消费者个数
        container.setConcurrentConsumers(1);
        //设置确认模式为手工确认
        container.setAcknowledgeMode(AcknowledgeMode.MANUAL);
        //消息处理类
        container.setMessageListener(clearCacheListener);
        return container;
    }




}
```
  1. 设置监听实现

    /**
     * @author 子羽
     * @Description 监听删除缓存
     * @Date 2021/8/27
     */
    @Component
    public class ClearCacheListener implements ChannelAwareMessageListener {
    
        private static final Logger log = LoggerFactory.getLogger(ClearCacheListener.class);
    
        @Autowired
        private SysConfigService sysConfigService;
    
        @Override
        public void onMessage(Message message, Channel channel){
            try {
                log.info("接收删除系统配置缓存消息,correlationId:[{}]",new String(message.getMessageProperties().getCorrelationId()));
                // 删除本地缓存
                sysConfigService.cleanLocal();
                channel.basicAck(message.getMessageProperties().getDeliveryTag(), false);
            }catch (Exception e) {
                log.error("处理清除缓存消息失败",e);
            }
    
        }
    }
  2. 建立消息生产者

    /**
     * @author 子羽
     * @descript 清除内外部多节点缓存
     */
    @Component
    public class ClearCacheProducer extends AbstractRabbitProducer {
    
        protected static final Logger logger = LogManager.getCurrentClassLogger();
    public ClearCacheProducer(RabbitTemplate rabbitTemplate) {
        super(rabbitTemplate, MQConstant.APP_ID);
    }

    public void sendDelSystemCacheMessage() {
        CorrelationData correlationData = getCorrelationData();
        logger.info("发送消息至缓存队列,correlationId:{} 消息:[{}]", correlationData.getId(),"清除系统配置缓存");
        this.sendMsg("local-cache-exchange", "", "", correlationData);
        logger.info("发送消息成功correlationId:{}", correlationData.getId());
    }
}
```
  1. 调用清除节点缓存

    /**
     * <清空缓存>
     *
     * @see [ 类#方法]
     */
    public void clean() {
        // 先清除本节点缓存
        this.cache.clean();
        // 清除内外部节点系统配置缓存
        clearCacheProducer.sendDelSystemCacheMessage();
    }

五 结语

整体实现还是比较简单的,主要的一个比较难想到的是,每个节点会创建出不同的队列,也就是不同节点会产生不同的结果,另外注意的是队列一定要动态队列

return new Queue(MY_QUEUE_NAME, true,false,true);

/**
 * Construct a new queue, given a name, durability, exclusive and auto-delete flags.
 * @param name the name of the queue.
 * @param durable true if we are declaring a durable queue (the queue will survive a server restart)
 * @param exclusive true if we are declaring an exclusive queue (the queue will only be used by the declarer's
 * connection)
 * @param autoDelete true if the server should delete the queue when it is no longer in use
 */
public Queue(String name, boolean durable, boolean exclusive, boolean autoDelete) {
   this(name, durable, exclusive, autoDelete, null);
}

durable 代表是否持久化队列,如果持久化,那么服务重启队列中的数据还在
exclusive 排他队列,如果设定为true,那么这个队列仅对这个连接可见
autoDelete 自动删除,这个自动删除并不是执行一段时间字段删除,而是失去监听后自动删除,这次我设定就是自动删除,因为如果不设定自动删除,重启节点后,会生成一个新队列而原有队列也不会删除,会导致rabbitmq出现队列堆积。

相关实践学习
快速体验阿里云云消息队列RocketMQ版
本实验将带您快速体验使用云消息队列RocketMQ版Serverless系列实例进行获取接入点、创建Topic、创建订阅组、收发消息、查看消息轨迹和仪表盘。
消息队列 MNS 入门课程
1、消息队列MNS简介 本节课介绍消息队列的MNS的基础概念 2、消息队列MNS特性 本节课介绍消息队列的MNS的主要特性 3、MNS的最佳实践及场景应用 本节课介绍消息队列的MNS的最佳实践及场景应用案例 4、手把手系列:消息队列MNS实操讲 本节课介绍消息队列的MNS的实际操作演示 5、动手实验:基于MNS,0基础轻松构建 Web Client 本节课带您一起基于MNS,0基础轻松构建 Web Client
相关文章
|
消息中间件 RocketMQ
这个RocketMQ节点似乎是在正常工作,但是它不能接收或者处理消息
这个RocketMQ节点似乎是在正常工作,但是它不能接收或者处理消息
926 0
|
7月前
|
消息中间件 缓存 NoSQL
基于Spring Data Redis与RabbitMQ实现字符串缓存和计数功能(数据同步)
总的来说,借助Spring Data Redis和RabbitMQ,我们可以轻松实现字符串缓存和计数的功能。而关键的部分不过是一些"厨房的套路",一旦你掌握了这些套路,那么你就像厨师一样可以准备出一道道饕餮美食了。通过这种方式促进数据处理效率无疑将大大提高我们的生产力。
248 32
|
消息中间件 监控 Oracle
消息队列 MQ产品使用合集之启动Namesrv节点时,遇到报错,该如何解决
消息队列(MQ)是一种用于异步通信和解耦的应用程序间消息传递的服务,广泛应用于分布式系统中。针对不同的MQ产品,如阿里云的RocketMQ、RabbitMQ等,它们在实现上述场景时可能会有不同的特性和优势,比如RocketMQ强调高吞吐量、低延迟和高可用性,适合大规模分布式系统;而RabbitMQ则以其灵活的路由规则和丰富的协议支持受到青睐。下面是一些常见的消息队列MQ产品的使用场景合集,这些场景涵盖了多种行业和业务需求。
|
缓存 网络协议 算法
Golang简单实现 分布式缓存+一致性哈希+节点再平衡(gossip + consistent + rebalance)
Golang简单实现 分布式缓存+一致性哈希+节点再平衡(gossip + consistent + rebalance)
426 0
|
消息中间件 Java Apache
消息队列 MQ使用问题之如何在内外网环境下使用单组节点单副本模式
消息队列(MQ)是一种用于异步通信和解耦的应用程序间消息传递的服务,广泛应用于分布式系统中。针对不同的MQ产品,如阿里云的RocketMQ、RabbitMQ等,它们在实现上述场景时可能会有不同的特性和优势,比如RocketMQ强调高吞吐量、低延迟和高可用性,适合大规模分布式系统;而RabbitMQ则以其灵活的路由规则和丰富的协议支持受到青睐。下面是一些常见的消息队列MQ产品的使用场景合集,这些场景涵盖了多种行业和业务需求。
|
canal 消息中间件 缓存
Canal 实战 | 第一篇:SpringBoot 整合 Canal + RabbitMQ 实现监听 MySQL 数据库同步更新 Redis 缓存
Canal 实战 | 第一篇:SpringBoot 整合 Canal + RabbitMQ 实现监听 MySQL 数据库同步更新 Redis 缓存
java初中级面试题(SSM+Mysql+微服务(SpringCloud+Dubbo)+消息队列(RocketMQ)+缓存(Redis+MongoDB)+设计模式+搜索引擎(ES)+JVM
java初中级面试题(SSM+Mysql+微服务(SpringCloud+Dubbo)+消息队列(RocketMQ)+缓存(Redis+MongoDB)+设计模式+搜索引擎(ES)+JVM
701 0
java初中级面试题(SSM+Mysql+微服务(SpringCloud+Dubbo)+消息队列(RocketMQ)+缓存(Redis+MongoDB)+设计模式+搜索引擎(ES)+JVM
java初中级面试题(SSM+Mysql+微服务(SpringCloud+Dubbo)+消息队列(RocketMQ)+缓存(Redis+MongoDB)+设计模式+搜索引擎(ES)+JVM
814 0
java初中级面试题(SSM+Mysql+微服务(SpringCloud+Dubbo)+消息队列(RocketMQ)+缓存(Redis+MongoDB)+设计模式+搜索引擎(ES)+JVM
java初中级面试题(SSM+Mysql+微服务(SpringCloud+Dubbo)+消息队列(RocketMQ)+缓存(Redis+MongoDB)+设计模式+搜索引擎(ES)+JVM
966 0
|
消息中间件 存储 Apache
Apache RocketMQ 的 Broker 节点
Apache RocketMQ 的 Broker 节点
303 0