This post covers the class that handles distributed cache purging during task initialization: TrackerDistributedCacheManager.
In 4.6.0, distributed cache purging is controlled by four parameters:
local.cache.size  # default 10737418240 (10 GB)
mapreduce.tasktracker.cache.local.numberdirectories  # default 10000
mapreduce.tasktracker.cache.local.keep.pct  # default 0.95
mapreduce.tasktracker.distributedcache.checkperiod  # default 1 minute
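To make the relationship between these settings concrete, here is a minimal Java sketch. It is not the Hadoop code: a plain Map<String, String> stands in for Hadoop's Configuration, the class PurgeLimits is hypothetical, and it assumes that the two cleanup goals used by the purge logic are the size and subdirectory limits multiplied by keep.pct. The check period is left out because its unit is not stated here.

```java
import java.util.Map;

// Hypothetical model of the purge limits; a plain Map stands in for Hadoop's Configuration.
class PurgeLimits {
    static final long DEFAULT_CACHE_SIZE = 10737418240L; // local.cache.size default (10 GB)
    static final long DEFAULT_SUBDIRS = 10000L;          // ...cache.local.numberdirectories default
    static final double DEFAULT_KEEP_PCT = 0.95;         // ...cache.local.keep.pct default

    final long allowedCacheSize;
    final long allowedCacheSubdirs;
    final long allowedCacheSizeCleanupGoal;    // assumption: allowedCacheSize * keep.pct
    final long allowedCacheSubdirsCleanupGoal; // assumption: allowedCacheSubdirs * keep.pct

    PurgeLimits(Map<String, String> conf) {
        allowedCacheSize = Long.parseLong(conf.getOrDefault(
            "local.cache.size", String.valueOf(DEFAULT_CACHE_SIZE)));
        allowedCacheSubdirs = Long.parseLong(conf.getOrDefault(
            "mapreduce.tasktracker.cache.local.numberdirectories",
            String.valueOf(DEFAULT_SUBDIRS)));
        double keepPct = Double.parseDouble(conf.getOrDefault(
            "mapreduce.tasktracker.cache.local.keep.pct",
            String.valueOf(DEFAULT_KEEP_PCT)));
        // A purge aims to bring usage back down to keep.pct of each limit.
        allowedCacheSizeCleanupGoal = (long) (allowedCacheSize * keepPct);
        allowedCacheSubdirsCleanupGoal = (long) (allowedCacheSubdirs * keepPct);
    }
}
```

With local.cache.size=1000 and keep.pct=0.5, for example, this model would aim to bring a base directory down to 500 bytes once it exceeds 1000.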
When a TrackerDistributedCacheManager instance is constructed, it also creates a CleanupThread instance:
this.cleanupThread = new CleanupThread(conf);
The thread is then started with:
public void startCleanupThread() {
  this.cleanupThread.start();
}
startCleanupThread() is called from the TaskTracker's initialize() method:
// Initialize DistributedCache
this.distributedCacheManager = new TrackerDistributedCacheManager(
    this.fConf, taskController);
this.distributedCacheManager.startCleanupThread();
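CleanupThread is an inner class of TrackerDistributedCacheManager.
It runs in its own thread and periodically calls the checkAndCleanup() method of BaseDirManager, so the purge does not block the TaskTracker. The interval between checks is controlled by mapreduce.tasktracker.distributedcache.checkperiod.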
public void run() {
  while (running) {
    try {
      Thread.sleep(cleanUpCheckPeriod);
      baseDirManager.checkAndCleanup();  // run one purge check
    } catch (IOException e) {
      LOG.error("Exception in DistributedCache CleanupThread.", e);
    } catch (InterruptedException e) {
      LOG.info("Cleanup...", e);
      // To force us to exit cleanly
      running = false;
    } catch (Throwable t) {
      exitTaskTracker(t);
    }
  }
}
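The loop above follows a sleep, check, repeat pattern in which an interrupt doubles as the shutdown signal. The self-contained model below shows that pattern in isolation; the class PeriodicCleanup and its stopCleanup() method are hypothetical, and a Runnable stands in for baseDirManager.checkAndCleanup(). Unlike the original, it logs unexpected runtime exceptions and keeps looping instead of shutting down the TaskTracker.

```java
import java.util.concurrent.atomic.AtomicInteger;

// Hypothetical model of the CleanupThread loop: sleep for the check period,
// run one check, and exit cleanly when interrupted.
class PeriodicCleanup extends Thread {
    private final long periodMs;
    private final Runnable check;          // stands in for baseDirManager.checkAndCleanup()
    private volatile boolean running = true;
    final AtomicInteger runs = new AtomicInteger(); // completed checks

    PeriodicCleanup(long periodMs, Runnable check) {
        super("cleanup-model");
        this.periodMs = periodMs;
        this.check = check;
        setDaemon(true);                   // never keeps the JVM alive on its own
    }

    @Override
    public void run() {
        while (running) {
            try {
                Thread.sleep(periodMs);
                check.run();
                runs.incrementAndGet();
            } catch (InterruptedException e) {
                running = false;           // interrupt = shutdown request, as in the original
            } catch (RuntimeException e) {
                // The original exits the TaskTracker on unexpected Throwables;
                // this model only logs and keeps going.
                System.err.println("cleanup check failed: " + e);
            }
        }
    }

    // Requests shutdown; the thread exits at its next sleep().
    void stopCleanup() {
        interrupt();
    }
}
```

Calling stopCleanup() interrupts the thread. If it is sleeping, sleep() throws InterruptedException at once; if it is in the middle of a check, the interrupt flag stays set and the next sleep() throws immediately, so the loop ends after at most one more check (assuming the check itself does not swallow the interrupt).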
BaseDirManager is also an inner class of TrackerDistributedCacheManager. It deletes distributed cache entries and updates the cache bookkeeping after each deletion.
The relevant part of checkAndCleanup():
    Collection<CacheStatus> toBeDeletedCache = new LinkedList<CacheStatus>();
    HashMap<Path, CacheDir> toBeCleanedBaseDir = new HashMap<Path, CacheDir>();
    // .........
    for (Map.Entry<Path, CacheDir> baseDir : properties.entrySet()) {
      // one entry per mapred.local.dir base directory: total size and subdirectory count
      CacheDir baseDirCounts = baseDir.getValue();
      LOG.debug(baseDir.getKey() + ": allowedCacheSize=" + allowedCacheSize +
          ",baseDirCounts.size=" + baseDirCounts.size +
          ",allowedCacheSubdirs=" + allowedCacheSubdirs +
          ",baseDirCounts.subdirs=" + baseDirCounts.subdirs);
      if (allowedCacheSize < baseDirCounts.size ||
          allowedCacheSubdirs < baseDirCounts.subdirs) {
        // purge is triggered when this base dir exceeds local.cache.size
        // or mapreduce.tasktracker.cache.local.numberdirectories
        CacheDir tcc = new CacheDir();
        tcc.size = baseDirCounts.size - allowedCacheSizeCleanupGoal;
        tcc.subdirs = baseDirCounts.subdirs - allowedCacheSubdirsCleanupGoal;
        toBeCleanedBaseDir.put(baseDir.getKey(), tcc);  // record how much to free in this base dir
      }
    }
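The threshold test can be exercised outside Hadoop with the model below. CleanupPlanner, its String keys (used in place of Path) and the plan() signature are hypothetical; the condition and the two subtractions mirror the excerpt above. Both differences are recorded even when only one limit was exceeded, so the other value can be zero or negative.

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical model of the threshold check in checkAndCleanup().
class CleanupPlanner {
    static final class CacheDir {
        final long size;     // bytes used (or, in the result, bytes to free)
        final long subdirs;  // subdirectory count (or, in the result, subdirs to free)
        CacheDir(long size, long subdirs) {
            this.size = size;
            this.subdirs = subdirs;
        }
    }

    // Returns, for every base dir over either limit, how far it is above each goal.
    static Map<String, CacheDir> plan(Map<String, CacheDir> properties,
                                      long allowedCacheSize, long allowedCacheSubdirs,
                                      long sizeGoal, long subdirsGoal) {
        Map<String, CacheDir> toBeCleaned = new HashMap<>();
        for (Map.Entry<String, CacheDir> e : properties.entrySet()) {
            CacheDir c = e.getValue();
            if (allowedCacheSize < c.size || allowedCacheSubdirs < c.subdirs) {
                // usage minus goal; may be <= 0 for the dimension that did not trigger
                toBeCleaned.put(e.getKey(),
                    new CacheDir(c.size - sizeGoal, c.subdirs - subdirsGoal));
            }
        }
        return toBeCleaned;
    }
}
```

For a base directory holding 1200 bytes in 10 subdirectories, with limits 1000/100 and goals 950/95, plan() selects it and records 250 bytes to free, together with -85 subdirectories, meaning that dimension is already below its goal.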
The actual deletion step:
// do the deletion, after releasing the global lock
for (CacheStatus cacheStatus : toBeDeletedCache) {  // each cache entry selected for deletion
  cacheStatus.lock.lock();  // take this entry's own lock
  try {
    Path localizedDir = cacheStatus.getLocalizedUniqueDir();
    if (cacheStatus.user == null) {
      TrackerDistributedCacheManager.LOG.info("Deleted path " + localizedDir);
      try {
        localFs.delete(localizedDir, true);  // public cache: recursive FileSystem.delete
      } catch (IOException e) {
        TrackerDistributedCacheManager.LOG.warn("Could not delete distributed cache empty directory "
                 + localizedDir, e);
      }
    } else {
      TrackerDistributedCacheManager.LOG.info("Deleted path " + localizedDir + " as " + cacheStatus.user);
      String base = cacheStatus.getBaseDir().toString();
      String userDir = TaskTracker.getUserDir(cacheStatus.user);
      int skip = base.length() + 1 + userDir.length() + 1;
      String relative = localizedDir.toString().substring(skip);
      // private cache: delete as the owning user via TaskController.deleteAsUser
      taskController.deleteAsUser(cacheStatus.user, relative);
    }
    deleteCacheInfoUpdate(cacheStatus);
  } finally {
    cacheStatus.lock.unlock();
  }
}
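In the private-cache branch, deleteAsUser() receives a path relative to the user's directory rather than an absolute one. The sketch below isolates that string arithmetic. RelativeCachePath is hypothetical, the example layout assumes that TaskTracker.getUserDir(user) returns something like "taskTracker/<user>", and the prefix check is an addition that the original code does not perform.

```java
// Hypothetical sketch of the relative-path computation in the private-cache branch.
// Assumption: userDir looks like "taskTracker/<user>".
final class RelativeCachePath {
    private RelativeCachePath() {}

    static String relativeToUserDir(String base, String userDir, String localizedDir) {
        String prefix = base + "/" + userDir + "/";
        // Added safety check (not in the original): fail loudly on an unexpected layout.
        if (!localizedDir.startsWith(prefix)) {
            throw new IllegalArgumentException(localizedDir + " is not under " + prefix);
        }
        // Same arithmetic as the original: skip "<base>/" + "<userDir>/"
        int skip = base.length() + 1 + userDir.length() + 1;
        return localizedDir.substring(skip);
    }
}
```

For base "/data/1/mapred/local", userDir "taskTracker/alice" and localized directory "/data/1/mapred/local/taskTracker/alice/distcache/123", the result is "distcache/123".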