Alluxio源码分析定位策略:循环遍历策略RoundRobinPolicy

简介:         循环遍历策略RoundRobinPolicy是一种通过循环遍历方式并且跳过没有足够空间workers的为下一个数据块选择worker的策略。如果没有worker被找到,该策略会返回null。

        循环遍历策略RoundRobinPolicy是一种通过循环遍历方式并且跳过没有足够空间workers的为下一个数据块选择worker的策略。如果没有worker被找到,该策略会返回null。在RoundRobinPolicy内部,有三个十分重要的成员变量,如下:

  // 初始化的BlockWorkerInfo列表,每次都从这个列表中选择BlockWorkerInfo
  private List<BlockWorkerInfo> mWorkerInfoList;
  
  // mWorkerInfoList列表当前遍历到的索引
  private int mIndex;
  
  // 是否初始化的标志位mInitialized,默认为未初始化false
  private boolean mInitialized = false;
        mWorkerInfoList为初始化的BlockWorkerInfo列表,每次都从这个列表中选择BlockWorkerInfo,当核心方法getWorkerForNextBlock()第一次被调用进行初始化时,由传入的BlockWorkerInfo列表进行赋值;

        mIndex为mWorkerInfoList列表当前遍历到的索引;

        mInitialized为是否初始化的标志位,默认为未初始化false。

        再看核心方法getWorkerForNextBlock(),代码如下:

  /**
   * The policy uses the first fetch of worker info list as the base, and visits each of them in a
   * round-robin manner in the subsequent calls. The policy doesn't assume the list of worker info
   * in the subsequent calls has the same order from the first, and it will skip the workers that
   * are no longer active.
   *
   * @param workerInfoList the info of the active workers
   * @param blockSizeBytes the size of the block in bytes
   * @return the address of the worker to write to
   */
  @Override
  public WorkerNetAddress getWorkerForNextBlock(List<BlockWorkerInfo> workerInfoList,
      long blockSizeBytes) {
    
	// 第一次执行该方法时,未初始化的话,先进行初始化
	if (!mInitialized) {
		
      // 将第一次传入的workerInfoList赋值给成员变量mWorkerInfoList
      mWorkerInfoList = workerInfoList;
      
      // 对mWorkerInfoList进行shuffle,避免热点问题
      Collections.shuffle(mWorkerInfoList);
      
      // mWorkerInfoList列表当前遍历到的索引初始化为0,默认从第一个开始
      mIndex = 0;
      
      // 是否初始化的标志位mInitialized设置为true
      mInitialized = true;
    }

    // at most try all the workers
	
	// 遍历成员变量BlockWorkerInfo列表mWorkerInfoList:
    for (int i = 0; i < mWorkerInfoList.size(); i++) {
    	
      // 取出索引为mIndex的worker的网络地址WorkerNetAddress,即candidate
      WorkerNetAddress candidate = mWorkerInfoList.get(mIndex).getNetAddress();
      
      // 调用findBlockWorkerInfo()方法,从入参BlockWorkerInfo列表workerInfoList中,
      // 根据上述地址candidate找到对应的BlockWorkerInfo
      BlockWorkerInfo workerInfo = findBlockWorkerInfo(workerInfoList, candidate);
      
      // 索引mIndex重置:当前mIndex加1对mWorkerInfoList列表大小取余,
      // 也就是mWorkerInfoList列表取下一个,达到最后一个的话,再折回列表头部,实现循环遍历
      mIndex = (mIndex + 1) % mWorkerInfoList.size();
      
      // 如果获取的workerInfo不为null,可总容量大于要求的块大小,直接返回选中的worker地址candidate
      if (workerInfo != null && workerInfo.getCapacityBytes() >= blockSizeBytes) {
        return candidate;
      }
    }
    
    // 选择不到的话,返回null
    return null;
  }
        当第一次进入getWorkerForNextBlock()方法时,由于标志位mInitialized默认为false,那么我们需要先做初始化工作,如下:

        1、将第一次传入的workerInfoList赋值给成员变量mWorkerInfoList;

        2、对mWorkerInfoList进行shuffle,避免热点问题;

        3、mWorkerInfoList列表当前遍历到的索引mIndex初始化为0,默认从第一个开始;

        4、是否初始化的标志位mInitialized设置为true。

        初始化后,连同后续每次进入getWorkerForNextBlock()方法时,我们就可以开始选择worker的操作了,大体流程如下:

        1、遍历成员变量BlockWorkerInfo列表mWorkerInfoList:

              1.1、取出索引为mIndex的worker的网络地址WorkerNetAddress,即candidate;

              1.2、调用findBlockWorkerInfo()方法,从入参BlockWorkerInfo列表workerInfoList中,根据上述地址candidate找到对应的BlockWorkerInfo;

              1.3、索引mIndex重置:当前mIndex加1对mWorkerInfoList列表大小取余,也就是mWorkerInfoList列表取下一个,达到最后一个的话,再折回列表头部,实现循环遍历;

              1.4、如果获取的workerInfo不为null,可总容量大于要求的块大小,直接返回选中的worker地址candidate,否则继续遍历下一个位置;

        2、选择不到的话,返回null。

        很简单,上述流程描述的应该很详细。下面我们再看下findBlockWorkerInfo()方法,代码如下:

  /**
   * 根据worker网络地址address从workerInfoList列表中获取对应的BlockWorkerInfo
   * 
   * @param workerInfoList the list of worker info
   * @param address the address to look for
   * @return the worker info in the list that matches the host name, null if not found
   */
  private BlockWorkerInfo findBlockWorkerInfo(List<BlockWorkerInfo> workerInfoList,
      WorkerNetAddress address) {
	  
	// 遍历workerInfoList中的每个BlockWorkerInfo:
    for (BlockWorkerInfo info : workerInfoList) {
    	
      // 判断地址是否相同,相同的话直接返回,否则继续遍历下一个
      if (info.getNetAddress().equals(address)) {
        return info;
      }
    }
    
    // 列表中不存在的话,最终返回null
    return null;
  }
        它负责根据worker网络地址address从workerInfoList列表中获取对应的BlockWorkerInfo,遍历workerInfoList中的每个BlockWorkerInfo,判断地址是否相同,相同的话直接返回,否则继续遍历下一个,列表中不存在的话,最终返回null。

        循环遍历策略RoundRobinPolicy有一个显著的问题,既然初始化时mWorkerInfoList已经确定了,那么当worker增减时它是如何处理的呢?还是说根本无法应对这种情况?留待以后慢慢分析吧!



相关文章
|
7月前
|
NoSQL API 调度
.NET开源的轻量化定时任务调度,支持临时的延时任务和重复循环任务(可持久化) - FreeScheduler
.NET开源的轻量化定时任务调度,支持临时的延时任务和重复循环任务(可持久化) - FreeScheduler
107 0
|
15天前
|
消息中间件 存储 负载均衡
深度解析Kafka分区策略的精妙之处
深度解析Kafka分区策略的精妙之处
31 1
|
30天前
|
存储 算法 数据管理
C++中利用随机策略优化二叉树操作效率的实现方法
C++中利用随机策略优化二叉树操作效率的实现方法
77 1
|
8月前
|
存储 Java 测试技术
4.3 Java数组性能优化策略:数组与集合性能对比分析
4.3 Java数组性能优化策略:数组与集合性能对比分析
99 0
|
10月前
|
存储 缓存 前端开发
伙伴匹配推荐接口的优化策略【优先队列+多线程分批处理,java实现】
伙伴匹配推荐接口的优化策略【优先队列+多线程分批处理,java实现】
116 0
|
NoSQL Redis 数据库
删除策略-过期数据的概念|学习笔记
快速学习删除策略-过期数据的概念
63 0
删除策略-过期数据的概念|学习笔记
集合或映射迭代过程进行删除或修改操作的时候会导致并发异常
集合或映射迭代过程进行删除或修改操作的时候会导致并发异常
125 0
集合或映射迭代过程进行删除或修改操作的时候会导致并发异常
|
Java 数据库连接 API
HashMap 的 7 种遍历方式与性能分析!「修正篇」(上)
HashMap 的 7 种遍历方式与性能分析!「修正篇」
103 0
HashMap 的 7 种遍历方式与性能分析!「修正篇」(上)
|
安全 测试技术 API
HashMap 的 7 种遍历方式与性能分析!「修正篇」(下)
HashMap 的 7 种遍历方式与性能分析!「修正篇」
173 1
HashMap 的 7 种遍历方式与性能分析!「修正篇」(下)
|
Oracle Java 关系型数据库
HashMap 的 7 种遍历方式与性能分析!「修正篇」(中)
HashMap 的 7 种遍历方式与性能分析!「修正篇」
144 0
HashMap 的 7 种遍历方式与性能分析!「修正篇」(中)