hive Context类和DriverContext类

简介:

在hive的源码中经常可以看到Context类和DriverContext类,咋一看感觉这两个意思差不多,其实其作用区别还是蛮大的:

org.apache.hadoop.hive.ql.Context类
存储job的上下文信息,一个job创建一个Context对象,job运行完后,调用clear方法进行清除
1)初始化/创建/删除中间目录
中间的目录包括local job的和非local job的

1
2
3
4
5
6
7
protected  int  pathid =  10000 ;
private  static  final  String MR_PREFIX =  "-mr-" ;    //对应getMRTmpPath方法
private  static  final  String EXT_PREFIX =  "-ext-" //对应getLocalTmpPath方法
private  static  final  String LOCAL_PREFIX =  "-local-" //对应getExternalTmpPath方法
   private  String nextPathId() {
     return  Integer.toString(pathid++);  //这导致最小的那个目录应该是1000
   }

 比如getMRTmpPath: 

1
2
3
4
   public  Path getMRTmpPath() {
     return  new  Path(getMRScratchDir(), MR_PREFIX +
       nextPathId());
   }

产生的临时文件路径如:

1
hdfs: //xxx:9000/tmp/hive-ericni/hive_2014-12-18_14-37-00_106_3507079460876567552-1/_tmp.-mr-10003

产生临时目录的调用方法如下:

创建临时目录的调用
getMRScratchDir--->getScratchDir方法或者getLocalScratchDir方法(getLocalScratchDir最终也是调用getScratchDir方法,不过传入的参数是localScratchDir)

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
private  final  Map<String, Path> fsScratchDirs =  new  HashMap<String, Path>();   //用来存放对应关系的hashmap
.....
   private  Path getScratchDir(String scheme, String authority,
                                boolean  mkdir, String scratchDir) {
     String fileSystem =  scheme +  ":"  + authority;  //hdfs:10.100.90.204:9000
     Path dir = fsScratchDirs.get(fileSystem +  "-"  + TaskRunner.getTaskRunnerID());
      //第一次调用getScratchDir方法时,fsScratchDirs.get(fileSystem + "-" + TaskRunner.getTaskRunnerID())的值为null(其中
   
     if  (dir ==  null ) {  //第一次运行时,为null,使用Utilities.createDirsWithPermission创建目录,权限为777(hive.scratch.dir.permission设置为777时)
       Path dirPath =  new  Path(scheme, authority,
           scratchDir +  "-"  + TaskRunner.getTaskRunnerID());
       if  (mkdir) {
         try  {
           FileSystem fs = dirPath.getFileSystem(conf);
           dirPath =  new  Path(fs.makeQualified(dirPath).toString());
           FsPermission fsPermission =  new  FsPermission(Short.parseShort(scratchDirPermission.trim(),  8 ));
           if  (!Utilities.createDirsWithPermission(conf, dirPath, fsPermission)) {
             throw  new  RuntimeException( "Cannot make directory: "
                                        + dirPath.toString());
           }
           if  (isHDFSCleanup) {
             fs.deleteOnExit(dirPath);
           }
         catch  (IOException e) {
           throw  new  RuntimeException (e);
         }
       }
       dir = dirPath;
       fsScratchDirs.put(fileSystem +  "-"  + TaskRunner.getTaskRunnerID(), dir);
     }
     return  dir;
   }

2)提供封装方法,操作一些其他的信息

1
2
3
4
比如isLocalOnlyExecutionMode  job是否为localmode setHiveLocks 设置锁的信息,
setHiveTxnManager设置锁的管理类,
getHiveTxnManager获取锁的管理类setNeedLockMgr 设置是否需要锁,
isNeedLockMgr返回是否需要锁等

org.apache.hadoop.hive.ql.DriverContext类

是和job 启动有关系的类,主要初始化两个queue,一个用来存在以及启动的job,一个用来存放可以启动的job

1
2
3
4
5
6
7
   private  Queue<Task<?  extends  Serializable>> runnable;
   private  Queue<TaskRunner> running;
   public  DriverContext(Context ctx) {
     this .runnable =  new  ConcurrentLinkedQueue<Task<?  extends  Serializable>>();
     this .running =  new  LinkedBlockingQueue<TaskRunner>();
     this .ctx = ctx;
   }

常用方法:

addToRunnable,把Task加到到runnable队列中:

1
2
3
4
5
6
7
8
9
public  synchronized  boolean  addToRunnable(Task<?  extends  Serializable> tsk)  throws  HiveException {
     if  (runnable.contains(tsk)) {
       return  false ;
     }
     checkShutdown();
     runnable.add(tsk);
     tsk.setQueued();
     return  true ;
   }

launching 把TaskRunner对象加入到running队列中,其中TaskRunner是一个线程类,用来启动Task:

1
2
3
4
public  synchronized  void  launching(TaskRunner runner)  throws  HiveException { 
     checkShutdown();
     running.add(runner);
   }

getRunnable和job的并发launch有关(默认hive.exec.parallel设置为false),在开启并发launch job时,如果runnable中还有元素,并且running的队列大小小于设置的线程数(默认hive.exec.parallel.thread.number设置为8),则取出runnable中第一个元素,并最终加入到running中

1
2
3
4
5
6
7
public  synchronized  Task<?  extends  Serializable> getRunnable(  int  maxthreads)  throws  HiveException {
     checkShutdown();
     if  ( runnable.peek() !=  null  && running.size() < maxthreads) {
       return  runnable .remove();
     }
     return  null  ;
   }

pollFinished,从running的队列中获取TaskRunner,直到running队列为空,也就是等待所有job运行完毕:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
    public  synchronized  TaskRunner pollFinished()  throws  InterruptedException {
     while  (! shutdown) {
       Iterator< TaskRunner> it = running.iterator();
       while  (it.hasNext()) {
         TaskRunner runner = it.next();
         if  (runner !=  null  && !runner.isRunning()) {
           it.remove();
           return  runner;
         }
       }
       wait( SLEEP_TIME);
     }
     return  null  ;
   }


本文转自菜菜光 51CTO博客,原文链接:http://blog.51cto.com/caiguangguang/1591569,如需转载请自行联系原作者
相关文章
|
SQL 分布式计算 Hadoop
|
SQL 分布式计算 Kubernetes
spark hive类总是优先记载应用里面的jar包,跟spark.{driver/executor}.userClassPathFirst无关
spark hive类总是优先记载应用里面的jar包,跟spark.{driver/executor}.userClassPathFirst无关
294 0
|
SQL JSON Oracle
hive函数大全:11大类、109个函数(下)
hive函数大全:11大类、109个函数
hive函数大全:11大类、109个函数(上)
hive函数大全:11大类、109个函数(上)
|
SQL 大数据 Java
hive(在大数据集合上的类SQL查询和表)学习
hive(在大数据集合上的类SQL查询和表)学习
159 0
|
SQL Java 数据库