Several HDFS classes are involved when Hive creates a directory:
    org.apache.hadoop.hdfs.DistributedFileSystem — the concrete implementation of FileSystem
    org.apache.hadoop.hdfs.DFSClient — the client-side class for operating on the HDFS file system
    org.apache.hadoop.fs.permission.FsPermission — the file-permission class; its main methods are getUMask and applyUMask
A few methods in org.apache.hadoop.hdfs.DistributedFileSystem deserve attention.
initialize, which mainly instantiates the DFSClient:
    @Override
    public void initialize(URI uri, Configuration conf) throws IOException {
        super.initialize(uri, conf);
        setConf(conf);

        String host = uri.getHost();
        if (host == null) {
            throw new IOException("Incomplete HDFS URI, no host: " + uri);
        }

        this.dfs = new DFSClient(uri, conf, statistics);
        this.uri = URI.create(uri.getScheme() + "://" + uri.getAuthority());
        this.workingDir = getHomeDirectory();
    }
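For context, here is a minimal, hypothetical sketch of how a client reaches this code path: FileSystem.get resolves the hdfs:// scheme to DistributedFileSystem and invokes initialize, which constructs the DFSClient (the namenode address and path below are placeholders):

    import java.net.URI;
    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.fs.FileSystem;
    import org.apache.hadoop.fs.Path;
    import org.apache.hadoop.fs.permission.FsPermission;

    public class MkdirDemo {
        public static void main(String[] args) throws Exception {
            Configuration conf = new Configuration();
            // resolves to DistributedFileSystem and triggers initialize()
            FileSystem fs = FileSystem.get(URI.create("hdfs://namenode:8020"), conf);
            // goes through DistributedFileSystem.mkdirs and then DFSClient.mkdirs
            fs.mkdirs(new Path("/tmp/mkdir_demo"), new FsPermission((short) 0755));
            fs.close();
        }
    }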
mkdir creates a single directory; mkdirs also creates any missing parent directories, similar to mkdir -p:
    public boolean mkdir(Path f, FsPermission permission) throws IOException {
        statistics.incrementWriteOps(1);
        return dfs.mkdirs(getPathName(f), permission, false);
    }

    public boolean mkdirs(Path f, FsPermission permission) throws IOException {
        statistics.incrementWriteOps(1);
        return dfs.mkdirs(getPathName(f), permission, true);
    }
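Continuing the sketch above (fs and the paths are made up), the difference boils down to the createParent flag:

    // mkdirs passes createParent=true, so missing parents are created (mkdir -p)
    DistributedFileSystem dfs = (DistributedFileSystem) fs;
    dfs.mkdirs(new Path("/a/b/c"), FsPermission.getDefault()); // creates /a and /a/b too
    // mkdir passes createParent=false and fails if the parent does not exist yet
    dfs.mkdir(new Path("/x/y/z"), FsPermission.getDefault());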
Both ultimately call DFSClient.mkdirs. The mkdirs method of org.apache.hadoop.hdfs.DFSClient:
    final Conf dfsClientConf;
    ...
    public boolean mkdirs(String src, FsPermission permission,
            boolean createParent) throws IOException {
        if (permission == null) { // if the supplied permission is null
            permission = FsPermission.getDefault();
        }
        FsPermission masked = permission.applyUMask(dfsClientConf.uMask);
        return primitiveMkdir(src, masked, createParent); // delegate to primitiveMkdir
    }
Note the FsPermission.getDefault method and the Conf.uMask field here (Conf is an inner class of DFSClient that mainly holds the default client-side configuration).
The Conf.uMask field:
    uMask = FsPermission.getUMask(conf); // obtained via getUMask
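For reference, the corresponding client-side configuration entry looks like this (the value shown is just the default made explicit; octal or symbolic values are accepted):

    <property>
      <name>fs.permissions.umask-mode</name>
      <value>022</value>
    </property>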
The getUMask method:
    public static final String DEPRECATED_UMASK_LABEL = "dfs.umask";
    public static final String UMASK_LABEL =
            CommonConfigurationKeys.FS_PERMISSIONS_UMASK_KEY;     // fs.permissions.umask-mode
    public static final int DEFAULT_UMASK =
            CommonConfigurationKeys.FS_PERMISSIONS_UMASK_DEFAULT; // 0022

    public static FsPermission getUMask(Configuration conf) {
        int umask = DEFAULT_UMASK;
        if (conf != null) {
            String confUmask = conf.get(UMASK_LABEL);
            // the deprecated key dfs.umask defaults to Integer.MIN_VALUE (-2147483648)
            int oldUmask = conf.getInt(DEPRECATED_UMASK_LABEL, Integer.MIN_VALUE);
            try {
                if (confUmask != null) {
                    // if fs.permissions.umask-mode is set, use it;
                    // otherwise the default umask (0022) stands
                    umask = new UmaskParser(confUmask).getUMask();
                }
            } catch (IllegalArgumentException iae) {
                // Provide more explanation for user-facing message
                String type = iae instanceof NumberFormatException ?
                        "decimal" : "octal or symbolic";
                String error = "Unable to parse configuration " + UMASK_LABEL
                        + " with value " + confUmask + " as " + type + " umask.";
                LOG.warn(error);
                // If oldUmask is not set, then throw the exception
                if (oldUmask == Integer.MIN_VALUE) {
                    throw new IllegalArgumentException(error);
                }
            }
            if (oldUmask != Integer.MIN_VALUE) { // the deprecated key dfs.umask was set explicitly
                if (umask != oldUmask) {         // and it differs from the value derived above
                    LOG.warn(DEPRECATED_UMASK_LABEL
                            + " configuration key is deprecated. "
                            + "Convert to " + UMASK_LABEL
                            + ", using octal or symbolic umask "
                            + "specifications.");
                    // Old and new umask values do not match - Use old umask
                    umask = oldUmask;
                }
            }
        }
        return new FsPermission((short) umask);
    }
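To make the umask arithmetic concrete, a small standalone sketch (the class name is mine) of what DFSClient.mkdirs does with the default permission and the default umask:

    import org.apache.hadoop.fs.permission.FsPermission;

    public class UmaskDemo {
        public static void main(String[] args) {
            FsPermission perm = FsPermission.getDefault();   // 0777, i.e. rwxrwxrwx
            FsPermission umask = new FsPermission((short) 0022);
            // applyUMask clears the bits set in the umask: 0777 & ~0022 = 0755
            System.out.println(perm.applyUMask(umask));      // prints rwxr-xr-x
        }
    }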
There are two ways Hive creates HDFS directories:
1) via the Utilities.createDirsWithPermission method, which temporarily overrides fs.permissions.umask-mode;
2) directly via the DistributedFileSystem.mkdirs method.
Both end up in DFSClient.mkdirs. The difference is that a directory created through Utilities.createDirsWithPermission may come out with permission 777 in the proxy case, because the permission is explicitly set to 777 there.
For example:
the Context constructor creates the temporary file directories through Context.getMRScratchDir, which calls getLocalScratchDir (for local jobs) or getScratchDir (for non-local jobs); getScratchDir in turn creates the directory by calling Utilities.createDirsWithPermission.
    public static boolean createDirsWithPermission(Configuration conf, Path mkdirPath,
            FsPermission fsPermission, boolean recursive) throws IOException {
        String origUmask = null;
        LOG.warn("Create dirs " + mkdirPath + " with permission " + fsPermission
                + " recursive " + recursive);
        if (recursive) {
            // when recursive is true, fs.permissions.umask-mode is forced to 000;
            // by default recursive = SessionState.get().isHiveServerQuery() &&
            //   conf.getBoolean(HiveConf.ConfVars.HIVE_SERVER2_ENABLE_DOAS.varname,
            //                   HiveConf.ConfVars.HIVE_SERVER2_ENABLE_DOAS.defaultBoolVal),
            // i.e. the request came from HiveServer2 with doAs enabled, in which case
            // the caller also forces the permission to 777 (I added one more condition
            // here: if the custom proxy is enabled, recursive is true as well)
            /**
            boolean recursive = false;
            if (SessionState.get() != null) {
                recursive = (SessionState.get().isHiveServerQuery() &&
                    conf.getBoolean(HiveConf.ConfVars.HIVE_SERVER2_ENABLE_DOAS.varname,
                        HiveConf.ConfVars.HIVE_SERVER2_ENABLE_DOAS.defaultBoolVal))
                    || (HiveConf.getBoolVar(conf, HiveConf.ConfVars.HIVE_USE_CUSTOM_PROXY));
                fsPermission = new FsPermission((short) 00777);
            }
            */
            origUmask = conf.get("fs.permissions.umask-mode");
            conf.set("fs.permissions.umask-mode", "000");
        }
        // a non-cached FileSystem instance (a DistributedFileSystem backed by a DFSClient)
        FileSystem fs = ShimLoader.getHadoopShims().getNonCachedFileSystem(mkdirPath.toUri(), conf);
        boolean retval = false;
        try {
            retval = fs.mkdirs(mkdirPath, fsPermission);
            resetConfAndCloseFS(conf, recursive, origUmask, fs);
        } catch (IOException ioe) {
            try {
                // restore the fs.permissions.umask-mode setting via resetConfAndCloseFS
                resetConfAndCloseFS(conf, recursive, origUmask, fs);
            } catch (IOException e) {
                // do nothing - double failure
            }
        }
        return retval;
    }
The resetConfAndCloseFS method restores the fs.permissions.umask-mode setting, so that any later directory creation that does not go through Utilities.createDirsWithPermission sees the restored value:
    private static void resetConfAndCloseFS(Configuration conf, boolean unsetUmask,
            String origUmask, FileSystem fs) throws IOException {
        if (unsetUmask) {
            // unsetUmask (i.e. recursive) is true, so fs.permissions.umask-mode
            // has to be restored
            if (origUmask != null) {
                // a value had been configured before; put it back
                conf.set("fs.permissions.umask-mode", origUmask);
            } else {
                // unsetting is fine here: the default value applies afterwards
                conf.unset("fs.permissions.umask-mode");
            }
        }
        fs.close();
    }
Looking at the DFSClient source, its constructor initializes the ugi, which defaults to the current user:
    final UserGroupInformation ugi;
    ...
    this.ugi = UserGroupInformation.getCurrentUser();

If we switch to the proxy user and test with hadoop fs -mkdir, the directory that gets created is still owned by the logged-in user. So change the DFSClient constructor:

    // this.ugi = UserGroupInformation.getCurrentUser();
    if (conf.getBoolean("use.custom.proxy", false)) {
        this.ugi = UserGroupInformation.createRemoteUser(conf.get("custom.proxy.user"));
    } else {
        this.ugi = UserGroupInformation.getCurrentUser();
    }
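Note that UserGroupInformation.createRemoteUser fabricates a UGI for the given name without performing any authentication, so this only makes sense where the connecting principal is already trusted by the namenode. A tiny sketch of its behavior (class name is mine):

    import org.apache.hadoop.security.UserGroupInformation;

    public class UgiDemo {
        public static void main(String[] args) throws Exception {
            // builds a UGI for an arbitrary name; no credentials are checked
            UserGroupInformation proxy = UserGroupInformation.createRemoteUser("ericni");
            System.out.println(proxy.getShortUserName()); // ericni
            System.out.println(UserGroupInformation.getCurrentUser().getShortUserName());
        }
    }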
Add the following to the hdfs-site.xml configuration:
    <property>
      <name>use.custom.proxy</name>
      <value>true</value>
    </property>
    <property>
      <name>custom.proxy.user</name>
      <value>ericni</value>
    </property>
When directories are created as hdfs, their owner is still hdfs, while the data is written as the user who submitted the job.
For this reason, to make the created HDFS directories owned by the proxy user, we can set the owner right after creating them.
Looking at the DistributedFileSystem API, there is a setOwner method for exactly this.
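A minimal sketch of that API (the path is made up; chown on HDFS requires superuser privileges, and passing null for the group leaves it unchanged):

    // assuming fs is a DistributedFileSystem handle obtained as in the earlier sketch
    Path dir = new Path("/tmp/proxy_demo");
    fs.mkdirs(dir);
    fs.setOwner(dir, "ericni", null); // null group: keep the existing group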
Take an insert overwrite statement as an example. Before the mapred job is submitted, the map and reduce temporary directories are created from the job context; this is where the output data first lands. After the job completes, a MoveTask in the finally phase moves the data to its final destination. The temporary output directories are created in the execute method of the ExecDriver class:
    public int execute(DriverContext driverContext) {
        IOPrepareCache ioPrepareCache = IOPrepareCache.get();
        ioPrepareCache.clear();

        boolean success = true;
        Context ctx = driverContext.getCtx();
        boolean ctxCreated = false;
        Path emptyScratchDir;

        MapWork mWork = work.getMapWork();
        ReduceWork rWork = work.getReduceWork();

        try {
            if (ctx == null) {
                ctx = new Context(job);
                ctxCreated = true;
            }
            emptyScratchDir = ctx.getMRTmpPath();
            FileSystem fs = emptyScratchDir.getFileSystem(job);
            fs.mkdirs(emptyScratchDir);
        } catch (IOException e) {
            e.printStackTrace();
            console.printError("Error launching map-reduce job", "\n"
                    + org.apache.hadoop.util.StringUtils.stringifyException(e));
            return 5;
        }
        ....
        List<Path> inputPaths = Utilities.getInputPaths(job, mWork, emptyScratchDir, ctx); // collect input paths
        Utilities.setInputPaths(job, inputPaths);
        Utilities.setMapRedWork(job, work, ctx.getMRTmpPath());
        ....
        Utilities.createTmpDirs(job, mWork); // create the map-side temp directories
        Utilities.createTmpDirs(job, rWork); // create the reduce-side temp directories
One approach is to call setOwner in the caller right after creating the directory; a method that calls setOwner can be added to Utilities:
    public static void setDirWithOwner(Configuration conf, Path mkdirPath,
            String username, String groupname) throws IOException {
        LOG.warn("in Utilities setDirWithOwner path: " + mkdirPath
                + ",username: " + username + ",groupname: " + groupname);
        FileSystem fs = ShimLoader.getHadoopShims().getNonCachedFileSystem(mkdirPath.toUri(), conf);
        try {
            fs.setOwner(mkdirPath, username, groupname); // delegates to DistributedFileSystem.setOwner
        } catch (IOException ios) {
            // no-op
        }
    }
Also change the createTmpDirs method:
    private static void createTmpDirs(Configuration conf,
            List<Operator<? extends OperatorDesc>> ops) throws IOException {
        FsPermission fsPermission = new FsPermission((short) 00777);
        while (!ops.isEmpty()) {
            Operator<? extends OperatorDesc> op = ops.remove(0);
            if (op instanceof FileSinkOperator) {
                // org.apache.hadoop.hive.ql.plan.FileSinkDesc
                FileSinkDesc fdesc = ((FileSinkOperator) op).getConf();
                Path tempDir = fdesc.getDirName(); // the target directory
                if (tempDir != null) {
                    Path tempPath = Utilities.toTempPath(tempDir); // prefixes the name with _tmp.
                    createDirsWithPermission(conf, tempPath, fsPermission);
                    if (conf.getBoolean("use.custom.proxy", false)) {
                        // if use.custom.proxy is set, call setDirWithOwner to
                        // change the owner of the directory
                        LOG.warn("set owner after create dirs");
                        String username = conf.get("custom.proxy.user");
                        setDirWithOwner(conf, tempPath, username, null);
                    }
                }
            }
            if (op.getChildOperators() != null) {
                ops.addAll(op.getChildOperators());
            }
        }
    }
The approach above is limited: it only covers directories created through Utilities.createTmpDirs (such as the map and reduce temporary data directories).
Alternatively, the change can be made one layer down.
Add a setOwner method to DFSClient:
    public boolean setOwner(String src, String username) throws IOException {
        boolean setResult = false;
        checkOpen();
        try {
            namenode.setOwner(src, username, null);
            setResult = true;
        } catch (RemoteException re) {
            throw re.unwrapRemoteException(AccessControlException.class,
                    FileNotFoundException.class,
                    SafeModeException.class,
                    UnresolvedPathException.class);
        }
        return setResult;
    }
and change primitiveMkdir to the following:
    public boolean primitiveMkdir(String src, FsPermission absPermission,
            boolean createParent) throws IOException {
        checkOpen();
        boolean mkResult;
        boolean setResult;
        if (absPermission == null) {
            absPermission = FsPermission.getDefault().applyUMask(dfsClientConf.uMask);
        }
        if (LOG.isDebugEnabled()) {
            LOG.debug(src + ": masked=" + absPermission);
        }
        try {
            // namenode: org.apache.hadoop.hdfs.protocolPB.ClientNamenodeProtocolTranslatorPB
            mkResult = namenode.mkdirs(src, absPermission, createParent);
            if (this.conf.getBoolean("use.custom.proxy", false)) {
                LOG.warn("change primitiveMkdir add conf: "
                        + this.conf.getBoolean("use.custom.proxy", false));
                LOG.warn("change primitiveMkdir add conf: "
                        + this.conf.get("custom.proxy.user"));
                String username = this.conf.get("custom.proxy.user");
                if (username == null || ("").equals(username) || ("hdfs").equals(username)) {
                    // no-op
                    setResult = true;
                } else {
                    setResult = setOwner(src, username);
                }
            } else {
                setResult = true;
            }
            return mkResult && setResult;
        } catch (RemoteException re) {
            throw re.unwrapRemoteException(AccessControlException.class,
                    InvalidPathException.class,
                    FileAlreadyExistsException.class,
                    FileNotFoundException.class,
                    ParentNotDirectoryException.class,
                    SafeModeException.class,
                    NSQuotaExceededException.class,
                    DSQuotaExceededException.class,
                    UnresolvedPathException.class);
        }
    }
This way, every directory created through DFSClient.primitiveMkdir (the normal path for directory creation) gets its owner changed in the proxy case.
With that, the Hive proxy is essentially complete. Implementing it required the following changes to the Hive and Hadoop code:
1. Add two configuration options to HiveConf.
2. Override the setConf method of HadoopDefaultAuthenticator.
3. Change the scratch-directory handling in the Context constructor.
4. Change the createDirsWithPermission and createTmpDirs methods in Utilities, and add a new setDirWithOwner method.
5. Change the log-path handling in the HiveHistoryImpl constructor.
6. Change the init method of JobClient.
7. Change the DFSClient constructor, add a setOwner method, and change the primitiveMkdir method.
Reposted from 菜菜光's 51CTO blog. Original link: http://blog.51cto.com/caiguangguang/1591572. Please contact the original author before reprinting.