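In the Hive source code you frequently run into the Context class and the DriverContext class. At first glance the two seem to mean roughly the same thing, but their roles are actually quite different:
The org.apache.hadoop.hive.ql.Context class
It stores the context information of a job. One Context object is created per job, and once the job has finished, its clear method is called to clean up.
1) Initializing, creating and deleting intermediate directories
The intermediate directories cover both local jobs and non-local jobs.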
```java
protected int pathid = 10000;
private static final String MR_PREFIX = "-mr-";       // used by getMRTmpPath
private static final String EXT_PREFIX = "-ext-";     // used by getExternalTmpPath
private static final String LOCAL_PREFIX = "-local-"; // used by getLocalTmpPath

private String nextPathId() {
  return Integer.toString(pathid++); // so the smallest directory id is 10000
}
```
For example, getMRTmpPath:
```java
public Path getMRTmpPath() {
  return new Path(getMRScratchDir(), MR_PREFIX + nextPathId());
}
```
The resulting temporary file path looks like this:
```
hdfs://xxx:9000/tmp/hive-ericni/hive_2014-12-18_14-37-00_106_3507079460876567552-1/_tmp.-mr-10003
```
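To make the naming scheme concrete, here is a minimal, self-contained sketch of how one shared counter combined with the three prefixes yields distinct names. The class TmpPathNamer, its method names and the "/tmp/scratch" directory are hypothetical stand-ins rather than Hive code, and the sketch does not try to reproduce the full scratch directory layout seen in the sample path.

```java
// Hypothetical stand-in for the naming logic described above; not Hive code.
class TmpPathNamer {
    private static final String MR_PREFIX = "-mr-";
    private static final String EXT_PREFIX = "-ext-";
    private static final String LOCAL_PREFIX = "-local-";

    // One counter shared by all three kinds of paths, starting at 10000.
    private int pathId = 10000;

    private String nextPathId() {
        return Integer.toString(pathId++);
    }

    // scratchDir is an assumed, already resolved scratch directory.
    String mrTmpPath(String scratchDir) {
        return scratchDir + "/" + MR_PREFIX + nextPathId();
    }

    String localTmpPath(String scratchDir) {
        return scratchDir + "/" + LOCAL_PREFIX + nextPathId();
    }

    String externalTmpPath(String scratchDir) {
        return scratchDir + "/" + EXT_PREFIX + nextPathId();
    }

    public static void main(String[] args) {
        TmpPathNamer namer = new TmpPathNamer();
        System.out.println(namer.mrTmpPath("/tmp/scratch"));       // /tmp/scratch/-mr-10000
        System.out.println(namer.localTmpPath("/tmp/scratch"));    // /tmp/scratch/-local-10001
        System.out.println(namer.externalTmpPath("/tmp/scratch")); // /tmp/scratch/-ext-10002
    }
}
```

Because the counter is shared, an id such as 10003 in a path name only says that it was the fourth temporary path requested from that object, regardless of which prefix the earlier ones used.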
The call chain that creates the temporary directory is as follows:
getMRScratchDir ---> getScratchDir or getLocalScratchDir (getLocalScratchDir ultimately calls getScratchDir as well, only with localScratchDir passed in as the argument)
```java
// Maps "file system + task runner ID" to the scratch directory created for it
private final Map<String, Path> fsScratchDirs = new HashMap<String, Path>();
// ...
private Path getScratchDir(String scheme, String authority,
                           boolean mkdir, String scratchDir) {
  String fileSystem = scheme + ":" + authority; // e.g. hdfs:10.100.90.204:9000
  // On the first call to getScratchDir this lookup returns null
  Path dir = fsScratchDirs.get(fileSystem + "-" + TaskRunner.getTaskRunnerID());
  if (dir == null) {
    // First call: create the directory via Utilities.createDirsWithPermission;
    // the permission is 777 when hive.scratch.dir.permission is set to 777
    Path dirPath = new Path(scheme, authority,
        scratchDir + "-" + TaskRunner.getTaskRunnerID());
    if (mkdir) {
      try {
        FileSystem fs = dirPath.getFileSystem(conf);
        dirPath = new Path(fs.makeQualified(dirPath).toString());
        FsPermission fsPermission =
            new FsPermission(Short.parseShort(scratchDirPermission.trim(), 8));
        if (!Utilities.createDirsWithPermission(conf, dirPath, fsPermission)) {
          throw new RuntimeException("Cannot make directory: " + dirPath.toString());
        }
        if (isHDFSCleanup) {
          fs.deleteOnExit(dirPath);
        }
      } catch (IOException e) {
        throw new RuntimeException(e);
      }
    }
    // Cache the result so later calls with the same key skip the creation
    dir = dirPath;
    fsScratchDirs.put(fileSystem + "-" + TaskRunner.getTaskRunnerID(), dir);
  }
  return dir;
}
```
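The core idea of getScratchDir is "create on first use, then serve from the map". A minimal sketch of that pattern follows, using java.nio.file instead of the Hadoop FileSystem API so that it runs on its own. The class ScratchDirCache, the runnerId parameter and the "scratch-" directory name are assumptions made for illustration; permissions and delete-on-exit handling are left out.

```java
import java.io.IOException;
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.Paths;
import java.util.HashMap;
import java.util.Map;

// Hypothetical, simplified analogue of the caching in getScratchDir; not Hive code.
class ScratchDirCache {
    // Key: file system identifier + "-" + runner id; value: the scratch directory.
    private final Map<String, Path> scratchDirs = new HashMap<>();
    private int createCount = 0;

    Path getScratchDir(String fileSystem, long runnerId, Path base, boolean mkdir) {
        String key = fileSystem + "-" + runnerId;
        Path dir = scratchDirs.get(key);
        if (dir == null) {
            // First request for this key: build the path and optionally create it.
            Path dirPath = base.resolve("scratch-" + runnerId);
            if (mkdir) {
                try {
                    Files.createDirectories(dirPath);
                    createCount++;
                } catch (IOException e) {
                    throw new RuntimeException(e);
                }
            }
            dir = dirPath;
            scratchDirs.put(key, dir);
        }
        return dir;
    }

    int getCreateCount() {
        return createCount;
    }

    public static void main(String[] args) {
        Path base = Paths.get(System.getProperty("java.io.tmpdir"), "scratch-demo");
        ScratchDirCache cache = new ScratchDirCache();
        Path first = cache.getScratchDir("file:localhost", 1L, base, true);
        Path second = cache.getScratchDir("file:localhost", 1L, base, true);
        System.out.println(first.equals(second));   // true: second call hits the map
        System.out.println(cache.getCreateCount()); // 1: the directory was created once
    }
}
```

Keying the map on both the file system and the runner id means each runner gets its own directory per file system, which mirrors the key built in the excerpt above.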
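2) Providing wrapper methods for working with other pieces of information
For example:
isLocalOnlyExecutionMode: whether the job runs in local mode
setHiveLocks: sets the lock information
setHiveTxnManager: sets the transaction manager, the class that manages locks
getHiveTxnManager: returns the transaction manager
setNeedLockMgr: sets whether a lock manager is needed
isNeedLockMgr: returns whether a lock manager is needed, and so on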
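The org.apache.hadoop.hive.ql.DriverContext class
This class is concerned with launching jobs. It mainly initializes two queues: one holds the jobs that have already been launched (running), and the other holds the jobs that are ready to be launched (runnable).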
```java
private Queue<Task<? extends Serializable>> runnable;
private Queue<TaskRunner> running;
// (other fields, such as ctx, are not shown in this excerpt)

public DriverContext(Context ctx) {
  this.runnable = new ConcurrentLinkedQueue<Task<? extends Serializable>>();
  this.running = new LinkedBlockingQueue<TaskRunner>();
  this.ctx = ctx;
}
```
Commonly used methods:
addToRunnable adds a Task to the runnable queue:
```java
public synchronized boolean addToRunnable(Task<? extends Serializable> tsk)
    throws HiveException {
  if (runnable.contains(tsk)) {
    return false;
  }
  checkShutdown();
  runnable.add(tsk);
  tsk.setQueued();
  return true;
}
```
launching adds a TaskRunner object to the running queue. TaskRunner is a thread class used to launch a Task:
```java
public synchronized void launching(TaskRunner runner) throws HiveException {
  checkShutdown();
  running.add(runner);
}
```
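getRunnable is tied to launching jobs in parallel (hive.exec.parallel defaults to false). When parallel launching is enabled, if runnable still contains elements and the running queue is smaller than the configured number of threads (hive.exec.parallel.thread.number defaults to 8), the first element is taken out of runnable and eventually added to running: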
```java
public synchronized Task<? extends Serializable> getRunnable(int maxthreads)
    throws HiveException {
  checkShutdown();
  if (runnable.peek() != null && running.size() < maxthreads) {
    return runnable.remove();
  }
  return null;
}
```
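How addToRunnable, getRunnable and launching cooperate to cap the number of concurrently running tasks can be sketched in a few lines. The class MiniDriverContext below is a hypothetical simplification: tasks are plain strings, the finished method is invented for the demo, and shutdown checks are omitted.

```java
import java.util.Queue;
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.LinkedBlockingQueue;

// Hypothetical, simplified model of the two queues; not Hive code.
class MiniDriverContext {
    private final Queue<String> runnable = new ConcurrentLinkedQueue<>();
    private final Queue<String> running = new LinkedBlockingQueue<>();

    // Queue a task unless it is already waiting.
    synchronized boolean addToRunnable(String task) {
        if (runnable.contains(task)) {
            return false;
        }
        runnable.add(task);
        return true;
    }

    // Hand out the next task only while fewer than maxThreads tasks are running.
    synchronized String getRunnable(int maxThreads) {
        if (runnable.peek() != null && running.size() < maxThreads) {
            return runnable.remove();
        }
        return null;
    }

    synchronized void launching(String task) {
        running.add(task);
    }

    // Demo-only helper: marks a task as done by removing it from running.
    synchronized void finished(String task) {
        running.remove(task);
    }

    public static void main(String[] args) {
        MiniDriverContext ctx = new MiniDriverContext();
        ctx.addToRunnable("t1");
        ctx.addToRunnable("t2");
        ctx.addToRunnable("t3");

        int maxThreads = 2;
        String task;
        while ((task = ctx.getRunnable(maxThreads)) != null) {
            ctx.launching(task);
            System.out.println("launched " + task); // t1, then t2
        }
        // t3 stays in runnable until a slot frees up.
        ctx.finished("t1");
        System.out.println("next: " + ctx.getRunnable(maxThreads)); // next: t3
    }
}
```

With maxThreads set to 2, the third task is held back until one of the first two leaves the running queue, which is the throttling effect the size check in getRunnable provides.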
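pollFinished scans the running queue for a TaskRunner that is no longer running, removes it from the queue and returns it. If none has finished yet, it waits for SLEEP_TIME and scans again. Calling it repeatedly until the running queue is empty therefore amounts to waiting for all jobs to finish: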
```java
public synchronized TaskRunner pollFinished() throws InterruptedException {
  while (!shutdown) {
    Iterator<TaskRunner> it = running.iterator();
    while (it.hasNext()) {
      TaskRunner runner = it.next();
      if (runner != null && !runner.isRunning()) {
        it.remove();
        return runner;
      }
    }
    wait(SLEEP_TIME);
  }
  return null;
}
```
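The polling loop above can be tried out in isolation. The sketch below is an assumption-laden miniature: Worker stands in for TaskRunner and simply sleeps, SLEEP_TIME is set to an arbitrary 20 ms, and hasRunning and runDemo are helpers invented for the demo.

```java
import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;
import java.util.Queue;
import java.util.concurrent.LinkedBlockingQueue;

// Hypothetical miniature of the pollFinished pattern; not Hive code.
class PollFinishedSketch {
    // Stand-in for TaskRunner: sleeps for a while, then flags itself as done.
    static class Worker extends Thread {
        private final long workMillis;
        private volatile boolean running = true;

        Worker(String name, long workMillis) {
            super(name);
            this.workMillis = workMillis;
        }

        @Override
        public void run() {
            try {
                Thread.sleep(workMillis);
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            } finally {
                running = false;
            }
        }

        boolean isRunning() {
            return running;
        }
    }

    private static final long SLEEP_TIME = 20; // assumed poll interval in ms
    private final Queue<Worker> running = new LinkedBlockingQueue<>();
    private boolean shutdown = false;

    synchronized void launching(Worker w) {
        running.add(w);
        w.start();
    }

    synchronized boolean hasRunning() {
        return !running.isEmpty();
    }

    synchronized void shutdown() {
        shutdown = true;
        notifyAll(); // wake up a thread blocked in pollFinished
    }

    // Return one finished worker, waiting in SLEEP_TIME steps until one exists.
    synchronized Worker pollFinished() throws InterruptedException {
        while (!shutdown) {
            Iterator<Worker> it = running.iterator();
            while (it.hasNext()) {
                Worker w = it.next();
                if (!w.isRunning()) {
                    it.remove();
                    return w;
                }
            }
            wait(SLEEP_TIME);
        }
        return null;
    }

    // Launch a slow and a fast worker, then drain them in completion order.
    static List<String> runDemo() throws InterruptedException {
        PollFinishedSketch ctx = new PollFinishedSketch();
        ctx.launching(new Worker("slow", 300));
        ctx.launching(new Worker("fast", 50));
        List<String> order = new ArrayList<>();
        while (ctx.hasRunning()) {
            Worker done = ctx.pollFinished();
            if (done == null) {
                break; // shut down before everything finished
            }
            order.add(done.getName());
        }
        return order;
    }

    public static void main(String[] args) throws InterruptedException {
        System.out.println(runDemo());
    }
}
```

Although the slow worker is queued first, the fast one is expected to be reported first, because each call returns whichever worker has already stopped rather than the head of the queue. Using wait with a timeout instead of Thread.sleep releases the monitor while idle, so other synchronized methods such as launching can still proceed.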