Alluxio源码分析定位策略:循环遍历策略RoundRobinPolicy

简介:         循环遍历策略RoundRobinPolicy是一种通过循环遍历方式并且跳过没有足够空间workers的为下一个数据块选择worker的策略。如果没有worker被找到,该策略会返回null。

        循环遍历策略RoundRobinPolicy是一种通过循环遍历方式并且跳过没有足够空间workers的为下一个数据块选择worker的策略。如果没有worker被找到,该策略会返回null。在RoundRobinPolicy内部,有三个十分重要的成员变量,如下:

  // 初始化的BlockWorkerInfo列表,每次都从这个列表中选择BlockWorkerInfo
  private List<BlockWorkerInfo> mWorkerInfoList;
  
  // mWorkerInfoList列表当前遍历到的索引
  private int mIndex;
  
  // 是否初始化的标志位mInitialized,默认为未初始化false
  private boolean mInitialized = false;
        mWorkerInfoList为初始化的BlockWorkerInfo列表,每次都从这个列表中选择BlockWorkerInfo,当核心方法getWorkerForNextBlock()第一次被调用进行初始化时,由传入的BlockWorkerInfo列表进行赋值;

        mIndex为mWorkerInfoList列表当前遍历到的索引;

        mInitialized为是否初始化的标志位,默认为未初始化false。

        再看核心方法getWorkerForNextBlock(),代码如下:

  /**
   * The policy uses the first fetch of worker info list as the base, and visits each of them in a
   * round-robin manner in the subsequent calls. The policy doesn't assume the list of worker info
   * in the subsequent calls has the same order from the first, and it will skip the workers that
   * are no longer active.
   *
   * @param workerInfoList the info of the active workers
   * @param blockSizeBytes the size of the block in bytes
   * @return the address of the worker to write to
   */
  @Override
  public WorkerNetAddress getWorkerForNextBlock(List<BlockWorkerInfo> workerInfoList,
      long blockSizeBytes) {
    
	// 第一次执行该方法时,未初始化的话,先进行初始化
	if (!mInitialized) {
		
      // 将第一次传入的workerInfoList赋值给成员变量mWorkerInfoList
      mWorkerInfoList = workerInfoList;
      
      // 对mWorkerInfoList进行shuffle,避免热点问题
      Collections.shuffle(mWorkerInfoList);
      
      // mWorkerInfoList列表当前遍历到的索引初始化为0,默认从第一个开始
      mIndex = 0;
      
      // 是否初始化的标志位mInitialized设置为true
      mInitialized = true;
    }

    // at most try all the workers
	
	// 遍历成员变量BlockWorkerInfo列表mWorkerInfoList:
    for (int i = 0; i < mWorkerInfoList.size(); i++) {
    	
      // 取出索引为mIndex的worker的网络地址WorkerNetAddress,即candidate
      WorkerNetAddress candidate = mWorkerInfoList.get(mIndex).getNetAddress();
      
      // 调用findBlockWorkerInfo()方法,从入参BlockWorkerInfo列表workerInfoList中,
      // 根据上述地址candidate找到对应的BlockWorkerInfo
      BlockWorkerInfo workerInfo = findBlockWorkerInfo(workerInfoList, candidate);
      
      // 索引mIndex重置:当前mIndex加1对mWorkerInfoList列表大小取余,
      // 也就是mWorkerInfoList列表取下一个,达到最后一个的话,再折回列表头部,实现循环遍历
      mIndex = (mIndex + 1) % mWorkerInfoList.size();
      
      // 如果获取的workerInfo不为null,可总容量大于要求的块大小,直接返回选中的worker地址candidate
      if (workerInfo != null && workerInfo.getCapacityBytes() >= blockSizeBytes) {
        return candidate;
      }
    }
    
    // 选择不到的话,返回null
    return null;
  }
        当第一次进入getWorkerForNextBlock()方法时,由于标志位mInitialized默认为false,那么我们需要先做初始化工作,如下:

        1、将第一次传入的workerInfoList赋值给成员变量mWorkerInfoList;

        2、对mWorkerInfoList进行shuffle,避免热点问题;

        3、mWorkerInfoList列表当前遍历到的索引mIndex初始化为0,默认从第一个开始;

        4、是否初始化的标志位mInitialized设置为true。

        初始化后,连同后续每次进入getWorkerForNextBlock()方法时,我们就可以开始选择worker的操作了,大体流程如下:

        1、遍历成员变量BlockWorkerInfo列表mWorkerInfoList:

              1.1、取出索引为mIndex的worker的网络地址WorkerNetAddress,即candidate;

              1.2、调用findBlockWorkerInfo()方法,从入参BlockWorkerInfo列表workerInfoList中,根据上述地址candidate找到对应的BlockWorkerInfo;

              1.3、索引mIndex重置:当前mIndex加1对mWorkerInfoList列表大小取余,也就是mWorkerInfoList列表取下一个,达到最后一个的话,再折回列表头部,实现循环遍历;

              1.4、如果获取的workerInfo不为null,可总容量大于要求的块大小,直接返回选中的worker地址candidate,否则继续遍历下一个位置;

        2、选择不到的话,返回null。

        很简单,上述流程描述的应该很详细。下面我们再看下findBlockWorkerInfo()方法,代码如下:

  /**
   * 根据worker网络地址address从workerInfoList列表中获取对应的BlockWorkerInfo
   * 
   * @param workerInfoList the list of worker info
   * @param address the address to look for
   * @return the worker info in the list that matches the host name, null if not found
   */
  private BlockWorkerInfo findBlockWorkerInfo(List<BlockWorkerInfo> workerInfoList,
      WorkerNetAddress address) {
	  
	// 遍历workerInfoList中的每个BlockWorkerInfo:
    for (BlockWorkerInfo info : workerInfoList) {
    	
      // 判断地址是否相同,相同的话直接返回,否则继续遍历下一个
      if (info.getNetAddress().equals(address)) {
        return info;
      }
    }
    
    // 列表中不存在的话,最终返回null
    return null;
  }
        它负责根据worker网络地址address从workerInfoList列表中获取对应的BlockWorkerInfo,遍历workerInfoList中的每个BlockWorkerInfo,判断地址是否相同,相同的话直接返回,否则继续遍历下一个,列表中不存在的话,最终返回null。

        循环遍历策略RoundRobinPolicy有一个显著的问题,既然初始化时mWorkerInfoList已经确定了,那么当worker增减时它是如何处理的呢?还是说根本无法应对这种情况?留待以后慢慢分析吧!



相关文章
|
7月前
|
消息中间件 存储 负载均衡
深度解析Kafka分区策略的精妙之处
深度解析Kafka分区策略的精妙之处
668 1
|
7月前
|
监控 固态存储 安全
源码剖析:Elasticsearch 段合并调度及优化手段
源码剖析:Elasticsearch 段合并调度及优化手段
73 0
|
7月前
|
存储 算法 数据管理
C++中利用随机策略优化二叉树操作效率的实现方法
C++中利用随机策略优化二叉树操作效率的实现方法
117 1
|
7月前
|
存储 缓存
认真研究ConcurrentHashMap中的元素统计策略
认真研究ConcurrentHashMap中的元素统计策略
56 0
|
存储 缓存 前端开发
伙伴匹配推荐接口的优化策略【优先队列+多线程分批处理,java实现】
伙伴匹配推荐接口的优化策略【优先队列+多线程分批处理,java实现】
166 0
|
存储 Java 测试技术
4.3 Java数组性能优化策略:数组与集合性能对比分析
4.3 Java数组性能优化策略:数组与集合性能对比分析
170 0
|
存储 NoSQL 算法
【Redis核心原理专题】(1)「技术提升系列」分析探究如何实现LFU的热点key发现机制以及内部的Scan扫描技术的原理
【Redis核心原理专题】(1)「技术提升系列」分析探究如何实现LFU的热点key发现机制以及内部的Scan扫描技术的原理
200 0
【Redis核心原理专题】(1)「技术提升系列」分析探究如何实现LFU的热点key发现机制以及内部的Scan扫描技术的原理
|
分布式计算 Java
Mapreduce执行机制之提交任务和切片原理
Mapreduce执行机制之提交任务和切片原理
103 0
集合或映射迭代过程进行删除或修改操作的时候会导致并发异常
集合或映射迭代过程进行删除或修改操作的时候会导致并发异常
150 0
集合或映射迭代过程进行删除或修改操作的时候会导致并发异常
|
负载均衡 安全 Java
不懂Ribbon原理的可以进来看看哦,分析RibbonClientConfiguration完成了哪些核心初始操作
本文在前一篇文章的基础上来继续分析Ribbon的核心内容。 不懂Ribbon原理的可以进来看看哦,分析SpringBoot自动装配完成了Ribbon哪些核心操作
不懂Ribbon原理的可以进来看看哦,分析RibbonClientConfiguration完成了哪些核心初始操作