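This post looks at the class behind distributed cache purging that is instantiated and started during task initialization: TrackerDistributedCacheManager.
In 4.6.0, distributed cache purge behavior is controlled by four parameters.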

    local.cache.size                                      # default 10737418240
    mapreduce.tasktracker.cache.local.numberdirectories   # default 10000
    mapreduce.tasktracker.cache.local.keep.pct            # default 0.95
    mapreduce.tasktracker.distributedcache.checkperiod    # default 1 minute

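To make the interplay of these four parameters concrete, here is a minimal sketch that reads them from a plain `java.util.Properties` object instead of the Hadoop configuration. The class name `CacheLimitsSketch` is hypothetical. Two things are assumptions rather than statements from the text: that each cleanup goal equals the corresponding limit multiplied by keep.pct, and that the check period is expressed in milliseconds.

```java
import java.util.Properties;

// Hypothetical stand-in for the limits held by the cache manager.
class CacheLimitsSketch {
    public final long allowedCacheSize;
    public final long allowedCacheSubdirs;
    public final long allowedCacheSizeCleanupGoal;
    public final long allowedCacheSubdirsCleanupGoal;
    public final long cleanUpCheckPeriod;

    public CacheLimitsSketch(Properties conf) {
        // Defaults are the values listed in the article.
        allowedCacheSize = Long.parseLong(
            conf.getProperty("local.cache.size", "10737418240"));
        allowedCacheSubdirs = Long.parseLong(
            conf.getProperty("mapreduce.tasktracker.cache.local.numberdirectories", "10000"));
        double keepPct = Double.parseDouble(
            conf.getProperty("mapreduce.tasktracker.cache.local.keep.pct", "0.95"));
        // Assumption: "1 minute" is stored as 60000 milliseconds.
        cleanUpCheckPeriod = Long.parseLong(
            conf.getProperty("mapreduce.tasktracker.distributedcache.checkperiod", "60000"));
        // Assumption: a purge shrinks the cache down to limit * keep.pct.
        allowedCacheSizeCleanupGoal = (long) (allowedCacheSize * keepPct);
        allowedCacheSubdirsCleanupGoal = (long) (allowedCacheSubdirs * keepPct);
    }

    public static void main(String[] args) {
        CacheLimitsSketch limits = new CacheLimitsSketch(new Properties());
        System.out.println("size goal:    " + limits.allowedCacheSizeCleanupGoal);
        System.out.println("subdirs goal: " + limits.allowedCacheSubdirsCleanupGoal);
    }
}
```

Under that assumption, a purge does not stop right at the limit. It frees a little extra (5% with the default), so the next check does not trigger another purge immediately.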
When an instance of the TrackerDistributedCacheManager class is created, it initializes an instance of the CleanupThread class.

    this.cleanupThread = new CleanupThread(conf);


    public void startCleanupThread() {
      this.cleanupThread.start();
    }

    // Called from the initialize method of the TaskTracker class:
    // Initialize DistributedCache
    this.distributedCacheManager = new TrackerDistributedCacheManager(
        this.fConf, taskController);
    this.distributedCacheManager.startCleanupThread();

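CleanupThread is an inner class of TrackerDistributedCacheManager.
It works by starting a thread that periodically calls the checkAndCleanup method of the BaseDirManager class, so the caller is never blocked. The interval is set by mapreduce.tasktracker.distributedcache.checkperiod.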

    public void run() {
      while (running) {
        try {
          Thread.sleep(cleanUpCheckPeriod);
          baseDirManager.checkAndCleanup(); // invoke checkAndCleanup
        } catch (IOException e) {
          LOG.error("Exception in DistributedCache CleanupThread.", e);
        } catch (InterruptedException e) {
          LOG.info("Cleanup...", e);
          // To force us to exit cleanly
          running = false;
        } catch (Throwable t) {
          exitTaskTracker(t);
        }
      }
    }

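The polling pattern above can be tried in isolation. The following is a minimal, self-contained sketch, not the Hadoop class: `PeriodicCleanupSketch`, `stopCleanup` and `countRuns` are hypothetical names, the periodic check is an arbitrary `Runnable`, and marking the thread as a daemon is a choice made for this sketch only.

```java
import java.util.concurrent.atomic.AtomicInteger;

// Hypothetical reduction of the sleep-then-check loop to plain Java.
class PeriodicCleanupSketch extends Thread {
    private final long periodMs;
    private final Runnable check;
    private volatile boolean running = true;

    PeriodicCleanupSketch(long periodMs, Runnable check) {
        this.periodMs = periodMs;
        this.check = check;
        setDaemon(true); // sketch-only choice: do not keep the JVM alive
    }

    @Override
    public void run() {
        while (running) {
            try {
                Thread.sleep(periodMs); // wait one check period
                check.run();            // then run the periodic check
            } catch (InterruptedException e) {
                running = false;        // interruption ends the loop cleanly
            } catch (RuntimeException e) {
                // A failed check is reported; the loop keeps polling.
                System.err.println("check failed: " + e);
            }
        }
    }

    void stopCleanup() {
        running = false;
        interrupt(); // wake the thread if it is sleeping
    }

    // Helper: run the loop for totalMs and report how many checks ran.
    static int countRuns(long periodMs, long totalMs) {
        AtomicInteger runs = new AtomicInteger();
        PeriodicCleanupSketch t =
            new PeriodicCleanupSketch(periodMs, runs::incrementAndGet);
        t.start();
        try {
            Thread.sleep(totalMs);
            t.stopCleanup();
            t.join();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
        return runs.get();
    }

    public static void main(String[] args) {
        System.out.println("checks run: " + countRuns(10, 200));
    }
}
```

Because the sleep comes before the check, the first check happens one full period after the thread starts, not immediately.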
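BaseDirManager is also an inner class of TrackerDistributedCacheManager. It controls the deletion of distributed cache entries and updates the state data after deletion.
Its checkAndCleanup method is implemented as follows: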

    Collection<CacheStatus> toBeDeletedCache = new LinkedList<CacheStatus>();
    HashMap<Path, CacheDir> toBeCleanedBaseDir = new HashMap<Path, CacheDir>();
    .........
    // Loop over each mapred.local.dir directory, checking its size and entry count
    for (Map.Entry<Path, CacheDir> baseDir : properties.entrySet()) {
      CacheDir baseDirCounts = baseDir.getValue();
      LOG.debug(baseDir.getKey() + ": allowedCacheSize=" + allowedCacheSize +
          ",baseDirCounts.size=" + baseDirCounts.size +
          ",allowedCacheSubdirs=" + allowedCacheSubdirs +
          ",baseDirCounts.subdirs=" + baseDirCounts.subdirs);
      // Purge trigger: local.cache.size is smaller than the size of a directory, or
      // mapreduce.tasktracker.cache.local.numberdirectories is smaller than the
      // number of subdirectories under a directory
      if (allowedCacheSize < baseDirCounts.size ||
          allowedCacheSubdirs < baseDirCounts.subdirs) {
        CacheDir tcc = new CacheDir();
        tcc.size = baseDirCounts.size - allowedCacheSizeCleanupGoal;
        tcc.subdirs = baseDirCounts.subdirs - allowedCacheSubdirsCleanupGoal;
        // Build the HashMap of directories that need cleaning
        toBeCleanedBaseDir.put(baseDir.getKey(), tcc);
      }
    }

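The threshold check above can be exercised with small numbers. This is a simplified sketch under stated assumptions: directories are keyed by `String` rather than `Path`, `PurgePlanSketch` and `plan` are hypothetical names, and the limits and goals are passed in as plain arguments. It covers only the trigger condition and the excess computation, not the elided selection of cache entries.

```java
import java.util.LinkedHashMap;
import java.util.Map;

// Hypothetical reduction of the per-directory purge trigger.
class PurgePlanSketch {
    static final class CacheDir {
        long size;
        long subdirs;
        CacheDir(long size, long subdirs) {
            this.size = size;
            this.subdirs = subdirs;
        }
    }

    // Returns, for every directory over a limit, how much must be freed
    // to get back down to the cleanup goals.
    static Map<String, CacheDir> plan(Map<String, CacheDir> usage,
                                      long allowedCacheSize,
                                      long allowedCacheSubdirs,
                                      long sizeGoal,
                                      long subdirsGoal) {
        Map<String, CacheDir> toBeCleaned = new LinkedHashMap<>();
        for (Map.Entry<String, CacheDir> e : usage.entrySet()) {
            CacheDir counts = e.getValue();
            // Either limit being exceeded triggers a purge for this directory.
            if (allowedCacheSize < counts.size
                    || allowedCacheSubdirs < counts.subdirs) {
                toBeCleaned.put(e.getKey(), new CacheDir(
                    counts.size - sizeGoal,
                    counts.subdirs - subdirsGoal));
            }
        }
        return toBeCleaned;
    }

    public static void main(String[] args) {
        Map<String, CacheDir> usage = new LinkedHashMap<>();
        // Illustrative directories and counts, not real defaults.
        usage.put("/disk1/mapred/local", new CacheDir(1200, 40));
        usage.put("/disk2/mapred/local", new CacheDir(500, 30));
        for (Map.Entry<String, CacheDir> e
                : plan(usage, 1000, 100, 950, 95).entrySet()) {
            System.out.println(e.getKey() + " size=" + e.getValue().size
                + " subdirs=" + e.getValue().subdirs);
        }
    }
}
```

Note that when only one limit is exceeded, the other excess value comes out negative (the directory is already below that goal), which simply means nothing needs to be freed on that axis.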
The actual deletion:

    // do the deletion, after releasing the global lock
    for (CacheStatus cacheStatus : toBeDeletedCache) { // loop over the caches to delete
      cacheStatus.lock.lock(); // take the lock of the entry being deleted
      try {
        Path localizedDir = cacheStatus.getLocalizedUniqueDir();
        if (cacheStatus.user == null) {
          TrackerDistributedCacheManager.LOG.info("Deleted path " + localizedDir);
          try {
            // public case: call the delete method of FileSystem
            localFs.delete(localizedDir, true);
          } catch (IOException e) {
            TrackerDistributedCacheManager.LOG.warn(
                "Could not delete distributed cache empty directory "
                + localizedDir, e);
          }
        } else {
          TrackerDistributedCacheManager.LOG.info("Deleted path " + localizedDir +
              " as " + cacheStatus.user);
          String base = cacheStatus.getBaseDir().toString();
          String userDir = TaskTracker.getUserDir(cacheStatus.user);
          int skip = base.length() + 1 + userDir.length() + 1;
          String relative = localizedDir.toString().substring(skip);
          // private case: call the deleteAsUser method of TaskController
          taskController.deleteAsUser(cacheStatus.user, relative);
        }
        deleteCacheInfoUpdate(cacheStatus);
      } finally {
        cacheStatus.lock.unlock();
      }
    }

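The `skip` arithmetic in the private branch strips the base directory, the user directory and the two separators between them, leaving a path relative to the user's directory. Below is a small sketch of that string handling. The class name, the helper name and all example paths are hypothetical, and the prefix check is an addition for this sketch that is not in the original code.

```java
// Hypothetical illustration of the relative-path computation.
class RelativeCachePathSketch {
    static String relativeToUserDir(String base, String userDir,
                                    String localizedDir) {
        // Sketch-only guard: the original code does not verify the prefix.
        String prefix = base + "/" + userDir + "/";
        if (!localizedDir.startsWith(prefix)) {
            throw new IllegalArgumentException(
                localizedDir + " is not under " + prefix);
        }
        // Same arithmetic as the article: base + "/" + userDir + "/".
        int skip = base.length() + 1 + userDir.length() + 1;
        return localizedDir.substring(skip);
    }

    public static void main(String[] args) {
        // Example paths are made up for illustration.
        String base = "/data/mapred/local";
        String userDir = "taskTracker/alice";
        String localized =
            "/data/mapred/local/taskTracker/alice/distcache/123_abc";
        System.out.println(relativeToUserDir(base, userDir, localized));
    }
}
```

Passing a relative path suggests that the deleteAsUser side resolves it against the user's own directory, though the article does not show that part.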
This article is reposted from 菜菜光's 51CTO blog. Original link: http://blog.51cto.com/caiguangguang/1401419. To repost it, please contact the original author.