Nacos 客户端本地缓存及故障转移源码分析(四)

简介: Nacos 客户端本地缓存及故障转移源码分析(四)

2022 年农年最后一篇文章,学习无止境,分享技术博客无比乐趣,记录自己的学习生涯!!!

Nacos 客户端本地缓存及故障转移源码分析

在 Nacos 本地缓存时有的时候必然会出现一些故障,这些故障就需要及时进行处理,涉及到的核心类:ServiceInfoHolder 和 FailoverReactor

本地缓存也分两方面,第一方面是从注册中心获得实例信息会缓存在内存当中,也就是通过 Map 形式承载,这样查询操作都方便;第二方面是通过磁盘文件的方式定时缓存起来,以备不时之需

故障转移也分两方面,第一方面是故障转移的开关是通过文件来标记的;第二方面是当开启故障转移之后,当发生故障时,可以从故障转移备份的文件中来获取服务实例信息

ServiceInfoHolder 功能概述

ServiceInfoHolder:顾名思义,服务信息的持有者;每次客户端从注册中心获取最新的服务信息都会调用该类,其中 processServiceInfo 方法来进行本地化处理,包括更新缓存服务、发布事件、更新本地文件等,核心作用如下:

  • 缓存 ServiceInfo
  • 判断 ServiceInfo 是否更新
  • 发起写入本地缓存
  • 发布变更事件

除了这些核心功能以外,该类在实例化时,还做了本地缓存目录初始化、故障转移初始化等操作,下面来进行详细分析

ServiceInfo 本地内存缓存

ServiceInfo:注册服务等信息,其中包含了服务名称、分组名称、集群信息、实例列表信息、上次更新时间等,所以我们由此得出客户端从服务端注册中心获取到的信息在本地都以 ServiceInfo 作为承载者,ServiceInfoHolder 类又持有了 ServiceInfo,通过一个 ConncurrentMap 来储存

// ServiceInfoHolder.java
private final ConcurrentMap<String, ServiceInfo> serviceInfoMap;

这就是 Nacos 客户端对服务端获取到的注册信息的第一层缓存,并且之前在分析 processServiceInfo 方法时,我们已经看到,当服务信息变更时会第一时间更新 ServiceInfoMap 信息

public ServiceInfo processServiceInfo(ServiceInfo serviceInfo) {
  String serviceKey = serviceInfo.getKey();
  if (serviceKey == null) {
    return null;
  }
  ServiceInfo oldService = serviceInfoMap.get(serviceInfo.getKey());
  if (isEmptyOrErrorPush(serviceInfo)) {
    //empty or error push, just ignore
    return oldService;
  }
  // 缓存服务信息
  serviceInfoMap.put(serviceInfo.getKey(), serviceInfo);
  // 判断注册的实例信息是否有更改
  boolean changed = isChangedServiceInfo(oldService, serviceInfo);
  if (StringUtils.isBlank(serviceInfo.getJsonFromServer())) {
    serviceInfo.setJsonFromServer(JacksonUtils.toJson(serviceInfo));
  }
  MetricsMonitor.getServiceInfoMapSizeMonitor().set(serviceInfoMap.size());
  if (changed) {
    NAMING_LOGGER.info("current ips:({}) service: {} -> {}", serviceInfo.ipCount(), serviceInfo.getKey(),
                       JacksonUtils.toJson(serviceInfo.getHosts()));
    // 发布实例变更事件:InstancesChangeEvent
    NotifyCenter.publishEvent(new InstancesChangeEvent(notifierEventScope, serviceInfo.getName(), serviceInfo.getGroupName(),serviceInfo.getClusters(), serviceInfo.getHosts()));
    // 当服务实例信息变更时就将服务信息写入磁盘中
    DiskCache.write(serviceInfo, cacheDir);
  }
  return serviceInfo;
}

ServiceInfoMap 使用就是这样,当变动实例向其中 put 最新数据即可;当使用实例时,通过 key 进行 get 操作即可

ServiceInfoMap 在 ServiceInfoHolder 构造方法中进行初始化,默认创建一个空的 ConcurrentMap;但当配置了启动时从缓存文件读取信息时,则会从本地缓存进行加载

public ServiceInfoHolder(String namespace, String notifierEventScope, Properties properties) {
  initCacheDir(namespace, properties);
  // 启动时是否从缓存目录读取信息,默认false。
  if (isLoadCacheAtStart(properties)) {
    this.serviceInfoMap = new ConcurrentHashMap<>(DiskCache.read(this.cacheDir));
  } else {
    this.serviceInfoMap = new ConcurrentHashMap<>(16);
  }
  // this 代表 ServiceInfoHolder 类,这里可以立即为两者相互持有对方的引用
  this.failoverReactor = new FailoverReactor(this, cacheDir);
  this.pushEmptyProtection = isPushEmptyProtect(properties);
  this.notifierEventScope = notifierEventScope;
}
private boolean isLoadCacheAtStart(Properties properties) {
  boolean loadCacheAtStart = false;
  // PropertyKeyConst.NAMING_LOAD_CACHE_AT_START=namingLoadCacheAtStart
  if (properties != null && StringUtils
      .isNotEmpty(properties.getProperty(PropertyKeyConst.NAMING_LOAD_CACHE_AT_START))) {
    loadCacheAtStart = ConvertUtils
      .toBoolean(properties.getProperty(PropertyKeyConst.NAMING_LOAD_CACHE_AT_START));
  }
  return loadCacheAtStart;
}

在这边涉及到了本地缓存目录,processServiceInfo 方法中,当服务实例变更时,会看到通过 DiskCache#write 方法向该目录写入 ServiceInfo 信息

本地缓存目录

本地缓存目录 cacheDir 是 ServiceInfoHolder 类中的一个属性,用于指定本地缓存的根目录和故障转移的根目录

在 ServiceInfoHolder 构造方法中,initCacheDir:初始化并且生成缓存目录

// ServiceInfoHolder.java
private void initCacheDir(String namespace, Properties properties) {
  String jmSnapshotPath = System.getProperty(JM_SNAPSHOT_PATH_PROPERTY);
  String namingCacheRegistryDir = "";
  if (properties.getProperty(PropertyKeyConst.NAMING_CACHE_REGISTRY_DIR) != null) {
    namingCacheRegistryDir = File.separator + properties.getProperty(PropertyKeyConst.NAMING_CACHE_REGISTRY_DIR);
  }
  if (!StringUtils.isBlank(jmSnapshotPath)) {
    cacheDir = jmSnapshotPath + File.separator + FILE_PATH_NACOS + namingCacheRegistryDir
      + File.separator + FILE_PATH_NAMING + File.separator + namespace;
  } else {
    cacheDir = System.getProperty(USER_HOME_PROPERTY) + File.separator + FILE_PATH_NACOS + namingCacheRegistryDir
      + File.separator + FILE_PATH_NAMING + File.separator + namespace;
  }
}

initCacheDir 方法就是生成缓存目录的操作,默认路径:${user.home}/nacos/naming/public,也可以自定义,通过自定义: System.getProperty(JM_SNAPSHOT_PATH_PROPERTY)

这里初始化目录之后,故障转移信息也存储在该目录下

故障转移

在 ServiceInfoHolder 构造方法中,还会初始化一个 FailoverReactor 类,同样是 ServiceInfoHolder 成员变量;FailoverReactor:用来处理故障转移功能的执行者;在初始化该执行者时,它与 ServiceInfoHolder 类相互持有对方的引用,接下来看一下 FailoverReactor 类的构造方法,它的构造方法将它基本上的功能都展现的淋漓尽致了

public FailoverReactor(ServiceInfoHolder serviceInfoHolder, String cacheDir) {
  // 持有 ServiceInfoHolder 引用
  this.serviceInfoHolder = serviceInfoHolder;
  // 拼接故障目录:${user.home}/nacos/naming/public/failover
  this.failoverDir = cacheDir + FAILOVER_DIR;
  // 初始化 executorService
  this.executorService = new ScheduledThreadPoolExecutor(1, r -> {
    Thread thread = new Thread(r);
    // 守护线程模式运行
    thread.setDaemon(true);
    thread.setName("com.alibaba.nacos.naming.failover");
    return thread;
  });
  // 其他初始化操作,通过 executorService 开启多个定时任务执行
  this.init();
}
  • 持有 ServiceInfoHolder 引用
  • 拼接故障目录:${user.home}/nacos/naming/public/failover,其中 public 也有可能是其他的自定义命名空间
  • 初始化 executorService(执行者服务)
  • Init 方法:通过 executorService 开启多个定时任务执行

FailoverReactor#init 方法执行

在这个方法中开启了三个定时任务,这三个任务其实都是 FailoverReactor 内部类

public void init() {
  // 初始化立即执行,执行间隔 5s,执行任务 SwitchRefresher 
  executorService.scheduleWithFixedDelay(new SwitchRefresher(), 0L, 5000L, TimeUnit.MILLISECONDS);
  // 初始化延迟 30min 执行,执行间隔是 24h,执行任务 DiskFileWriter
  executorService.scheduleWithFixedDelay(new DiskFileWriter(), 30, DAY_PERIOD_MINUTES, TimeUnit.MINUTES);
  // backup file on startup if failover directory is empty.
  // 如果故障目录为空,启动时立即执行,立即备份文件
  // 初始化立即执行,执行间隔 10s,执行核心操作:DiskFileWriter
  executorService.schedule(() -> {
    try {
      File cacheDir = new File(failoverDir);
      if (!cacheDir.exists() && !cacheDir.mkdirs()) {
        throw new IllegalStateException("failed to create cache dir: " + failoverDir);
      }
      File[] files = cacheDir.listFiles();
      if (files == null || files.length <= 0) {
        new DiskFileWriter().run();
      }
    } catch (Throwable e) {
      NAMING_LOGGER.error("[NA] failed to backup file on startup.", e);
    }
  }, 10000L, TimeUnit.MILLISECONDS);
}

SwitchRefresher 该定时任务处理类,具体源码如下:

class SwitchRefresher implements Runnable {
   long lastModifiedMillis = 0L;
   @Override
   public void run() {
     try {
       File switchFile = new File(failoverDir + UtilAndComs.FAILOVER_SWITCH);
       // 文件不存在则退出
       if (!switchFile.exists()) {
         switchParams.put(FAILOVER_MODE_PARAM, Boolean.FALSE.toString());
         NAMING_LOGGER.debug("failover switch is not found, {}", switchFile.getName());
         return;
       }
       long modified = switchFile.lastModified();
       if (lastModifiedMillis < modified) {
         lastModifiedMillis = modified;
         // 获取故障转移文件内容
         String failover = ConcurrentDiskUtil.getFileContent(failoverDir + UtilAndComs.FAILOVER_SWITCH,
                                                             Charset.defaultCharset().toString());
         if (!StringUtils.isEmpty(failover)) {
           String[] lines = failover.split(DiskCache.getLineSeparator());
           for (String line : lines) {
             String line1 = line.trim();
             // 1 代表开启故障转移模式
             if (IS_FAILOVER_MODE.equals(line1)) {
               switchParams.put(FAILOVER_MODE_PARAM, Boolean.TRUE.toString());
               NAMING_LOGGER.info("failover-mode is on");
               new FailoverFileReader().run();
             // 0 代表关闭故障转移模式
             } else if (NO_FAILOVER_MODE.equals(line1)) {
               switchParams.put(FAILOVER_MODE_PARAM, Boolean.FALSE.toString());
               NAMING_LOGGER.info("failover-mode is off");
             }
           }
         } else {
           switchParams.put(FAILOVER_MODE_PARAM, Boolean.FALSE.toString());
         }
       }
     } catch (Throwable e) {
       NAMING_LOGGER.error("[NA] failed to read failover switch.", e);
     }
   }
 }
  • 如果故障转移文件不存在,则直接返回(文件开关)
  • 比较文件修改事件,如果已经修改,则获取故障转移文件中的内容
  • 故障转移文件中存储了 0 和 1 标识,0 表示关闭、1 表示开启
  • 当为开启状态时,执行线程 FailoverFileReader#run

DiskFileWriter 该定时任务处理类,简单来说就是获取 ServiceInfoHolder 中缓存的 ServiceInfo

  • 先判断是否满足写入磁盘,如果条件满足,就将其写入到拼接好的故障目录中,

因为后两个定时任务执行的都是 DiskFileWriter,但是第三个定时任务是有前置的判断的,只要文件不存在就会立即执行把文件写入到本地磁盘中

class DiskFileWriter extends TimerTask {
  @Override
  public void run() {
    Map<String, ServiceInfo> map = serviceInfoHolder.getServiceInfoMap();
    for (Map.Entry<String, ServiceInfo> entry : map.entrySet()) {
      ServiceInfo serviceInfo = entry.getValue();
      if (StringUtils.equals(serviceInfo.getKey(), UtilAndComs.ALL_IPS) || StringUtils
          .equals(serviceInfo.getName(), UtilAndComs.ENV_LIST_KEY) || StringUtils
          .equals(serviceInfo.getName(), UtilAndComs.ENV_CONFIGS) || StringUtils
          .equals(serviceInfo.getName(), UtilAndComs.VIP_CLIENT_FILE) || StringUtils
          .equals(serviceInfo.getName(), UtilAndComs.ALL_HOSTS)) {
        continue;
      }
      // 将缓存的服务信息写入到磁盘中
      DiskCache.write(serviceInfo, failoverDir);
    }
  }
}

FailoverFileReader 类详解

顾名思义:故障转移文件读取,基本操作就是读取 failover 目录存储的备份服务信息文件内容,然后转换成 ServiceInfo,并且将所有的 ServiceInfo 存储在 FailoverReactor#ServiceMap 属性中

class FailoverFileReader implements Runnable {
  @Override
  public void run() {
    Map<String, ServiceInfo> domMap = new HashMap<>(16);
    BufferedReader reader = null;
    try {
      File cacheDir = new File(failoverDir);
      if (!cacheDir.exists() && !cacheDir.mkdirs()) {
        throw new IllegalStateException("failed to create cache dir: " + failoverDir);
      }
      // 读取 failover 目录下所有文件,进行遍历处理
      File[] files = cacheDir.listFiles();
      if (files == null) {
        return;
      }
      for (File file : files) {
        // 如果文件不存在则跳过
        if (!file.isFile()) {
          continue;
        }
        // 如果是故障转移标志文件,则跳过
        if (file.getName().equals(UtilAndComs.FAILOVER_SWITCH)) {
          continue;
        }
        // 读取文件中的备份内容,转换为 ServiceInfo 对象
        ServiceInfo dom = new ServiceInfo(file.getName());
        try {
          String dataString = ConcurrentDiskUtil
            .getFileContent(file, Charset.defaultCharset().toString());
          reader = new BufferedReader(new StringReader(dataString));
          String json;
          if ((json = reader.readLine()) != null) {
            try {
              dom = JacksonUtils.toObj(json, ServiceInfo.class);
            } catch (Exception e) {
              NAMING_LOGGER.error("[NA] error while parsing cached dom : {}", json, e);
            }
          }
        } catch (Exception e) {
          NAMING_LOGGER.error("[NA] failed to read cache for dom: {}", file.getName(), e);
        } finally {
          try {
            if (reader != null) {
              reader.close();
            }
          } catch (Exception e) {
            //ignore
          }
        }
        // 将 ServiceInfo 对象放入到 domMap 中
        if (!CollectionUtils.isEmpty(dom.getHosts())) {
          domMap.put(dom.getKey(), dom);
        }
      }
    } catch (Exception e) {
      NAMING_LOGGER.error("[NA] failed to read cache file", e);
    }
    // 读入缓存
    // 最后判断 domMap 不为空,赋值给 serviceMap
    if (domMap.size() > 0) {
      serviceMap = domMap;
    }
  }
}

方法大致流程如下:

  • 读取 failover 目录下所有文件,进行遍历挨个处理
  • 如果文件不存在跳过、如果该文件名是故障转移开关标志文件就跳过
  • 读取文件中的备份内容,转换为 ServiceInfo 对象
  • 若当前 hosts 不为空时,将 ServiceInfo 对象放入到 domMap 中
  • 最后再判断 domMap 不为空,赋值给 serviceMap

但这边有一个问题就是 serviceMap 是从哪里开始用到的,这个其实是我们之前读取实例时用到的 「NacosNamingService#getAllInstances->ServiceInfoHolder#getServiceInfo」 方法

// ServiceInfoHolder.java
public ServiceInfo getServiceInfo(final String serviceName, final String groupName, final String clusters) {
  NAMING_LOGGER.debug("failover-mode: {}", failoverReactor.isFailoverSwitch());
  String groupedServiceName = NamingUtils.getGroupedName(serviceName, groupName);
  String key = ServiceInfo.getKey(groupedServiceName, clusters);
  // 是否开启了故障转移
  if (failoverReactor.isFailoverSwitch()) {
    return failoverReactor.getService(key);
  }
  return serviceInfoMap.get(key);
}
// FailoverReactor.java
public ServiceInfo getService(String key) {
  ServiceInfo serviceInfo = serviceMap.get(key);
  if (serviceInfo == null) {
    serviceInfo = new ServiceInfo();
    serviceInfo.setName(key);
  }
  return serviceInfo;
}

其实在这里就是一旦开启了故障转移就会先调用 failoverReactor.getService 方法,此方法便是从 ServiceMap 中获取 ServiceInfo 信息

此处待补充流程图 TODO!!!

结尾

至此「Nacos 客户端本地缓存及故障转移源码分析」分析到这里,基本只需要掌握大致的脉路即可

欢迎大家在评论框分享您的看法,喜欢该文章帮忙给个赞👍和收藏,喜欢博客分享的文章内容帮忙给个粉丝位,感谢,感谢!!!

分享个人学习源码的几部曲

  • 设计模式掌握为前提,程序员的内功修炼法,🙅不分语言
  • 不要太追究于细节,捋清大致脉路即可;太过于追究于细节,你会越捋越乱
  • 关注重要的类和方法、核心逻辑
  • 掌握 Debug 技巧,在关键的类和方法多停留,多作分析和记录

更多技术文章可以查看:vnjohn 个人博客

目录
相关文章
|
7月前
|
缓存 前端开发 Java
nacos常见问题之开启鉴权后客户端报403升级版本如何解决
Nacos是阿里云开源的服务发现和配置管理平台,用于构建动态微服务应用架构;本汇总针对Nacos在实际应用中用户常遇到的问题进行了归纳和解答,旨在帮助开发者和运维人员高效解决使用Nacos时的各类疑难杂症。
1631 0
|
4月前
|
缓存 NoSQL Redis
【Azure Redis 缓存】Redission客户端连接Azure:客户端出现 Unable to send PING command over channel
【Azure Redis 缓存】Redission客户端连接Azure:客户端出现 Unable to send PING command over channel
226 3
|
4月前
|
缓存 监控 NoSQL
【Azure Redis 缓存】Azure Redis出现了超时问题后,记录一步一步的排查出异常的客户端连接和所执行命令的步骤
【Azure Redis 缓存】Azure Redis出现了超时问题后,记录一步一步的排查出异常的客户端连接和所执行命令的步骤
|
4月前
|
缓存 NoSQL Java
【Azure Redis 缓存 Azure Cache For Redis】当使用Jedis客户端连接Redis时候,遇见JedisConnectionException: Could not get a resource from the pool / Redis connection los
【Azure Redis 缓存 Azure Cache For Redis】当使用Jedis客户端连接Redis时候,遇见JedisConnectionException: Could not get a resource from the pool / Redis connection los
131 0
|
4月前
|
Java Nacos 开发工具
【Nacos】心跳断了怎么办?!8步排查法+实战代码,手把手教你解决Nacos客户端不发送心跳检测问题,让服务瞬间恢复活力!
【8月更文挑战第15天】Nacos是一款广受好评的微服务注册与配置中心。然而,“客户端不发送心跳检测”的问题时有发生,可能导致服务实例被视为离线。本文介绍如何排查此类问题:确认Nacos服务器地址配置正确;检查网络连通性;查看客户端日志;确保Nacos SDK版本兼容;调整心跳检测策略;验证服务实例注册状态;必要时重启应用;检查影响行为的环境变量。通过这些步骤,通常可定位并解决问题,保障服务稳定运行。
287 0
|
5月前
|
网络安全 Nacos
Nacos客户端配置错误检查
Nacos客户端配置错误检查
204 3
|
5月前
|
缓存 网络安全 Nacos
登录nacos客户端提示no message available
登录nacos客户端提示no message available
|
6月前
|
缓存 NoSQL Java
Redis系列学习文章分享---第四篇(Redis快速入门之Java客户端--商户查询缓存+更新+双写一致+穿透+雪崩+击穿+工具封装)
Redis系列学习文章分享---第四篇(Redis快速入门之Java客户端--商户查询缓存+更新+双写一致+穿透+雪崩+击穿+工具封装)
74 0
|
7月前
|
存储 缓存 Java
探秘MyBatis缓存原理:Cache接口与实现类源码分析
探秘MyBatis缓存原理:Cache接口与实现类源码分析
116 2
探秘MyBatis缓存原理:Cache接口与实现类源码分析
|
7月前
|
Kubernetes 关系型数据库 MySQL
nacos常见问题之客户端不发送心跳检测如何解决
Nacos是阿里云开源的服务发现和配置管理平台,用于构建动态微服务应用架构;本汇总针对Nacos在实际应用中用户常遇到的问题进行了归纳和解答,旨在帮助开发者和运维人员高效解决使用Nacos时的各类疑难杂症。
408 2

热门文章

最新文章