The previous post covered the implementation of HDFSEventSink. Here, based on the hdfs sink configuration and the call path, we walk through the whole process of writing data into HDFS inside the sink.
Several important settings of our production hdfs sink:
    hdfs.path = hdfs://xxxxx/%{logtypename}/%Y%m%d/%H:
    hdfs.rollInterval = 60
    hdfs.rollSize = 0        // we want files to roll based on time only
    hdfs.rollCount = 0
    hdfs.batchSize = 2000
    hdfs.txnEventMax = 2000
    hdfs.fileType = DataStream
    hdfs.writeFormat = Text
Let's look at the two class-related settings, hdfs.fileType and hdfs.writeFormat: one decides which class streams the file, the other decides which class serializes the data.
1) hdfs.fileType has 3 options: SequenceFile/DataStream/CompressedStream. DataStream can be thought of as an hdfs textfile, the default is SequenceFile, and CompressedStream is used when compression is enabled.
2) hdfs.writeFormat selects among 3 serialization methods: TEXT writes only the Event body, HEADER_AND_TEXT writes both the Event body and headers, and AVRO_EVENT is the avro serialization format.
With the settings above, the data write flow is roughly as follows:
    SinkRunner.process -> SinkProcessor.process -> HDFSEventSink.process -> HDFSEventSink.append -> BucketWriter.append -> HDFSWriter.append -> HDFSDataStream.append -> BodyTextEventSerializer.write -> java.io.OutputStream.write
In short:
HDFSEventSink instantiates both the BucketWriter and the HDFSWriter:
    if (bucketWriter == null) {
      HDFSWriter hdfsWriter = writerFactory.getWriter(fileType);   // obtain the HDFSWriter object
      ....
      bucketWriter = new BucketWriter(rollInterval, rollSize, rollCount,
          batchSize, context, realPath, realName, inUsePrefix, inUseSuffix,
          suffix, codeC, compType, hdfsWriter, timedRollerPool,
          proxyTicket, sinkCounter, idleTimeout, idleCallback, lookupPath);
      // build the BucketWriter around the HDFSWriter object
The HDFSWriter object is obtained via the getWriter method of org.apache.flume.sink.hdfs.HDFSWriterFactory, which returns an instance of the concrete org.apache.flume.sink.hdfs.HDFSWriter implementation according to the hdfs.fileType setting.
Currently only 3 types are supported:
    static final String SequenceFileType = "SequenceFile";
    static final String DataStreamType = "DataStream";
    static final String CompStreamType = "CompressedStream";
    ....
    public HDFSWriter getWriter(String fileType) throws IOException {
      if (fileType.equalsIgnoreCase(SequenceFileType)) {        // SequenceFile
        return new HDFSSequenceFile();
      } else if (fileType.equalsIgnoreCase(DataStreamType)) {   // DataStream
        return new HDFSDataStream();
      } else if (fileType.equalsIgnoreCase(CompStreamType)) {   // CompressedStream
        return new HDFSCompressedDataStream();
      } else {
        throw new IOException("File type " + fileType + " not supported");
      }
BucketWriter can be understood as a wrapper over the lower-level data operations; for example, writing data actually goes through its append method, which has the following main steps (a condensed skeleton of the whole method is given after step 5):
1) First, check whether the file is open:
    if (!isOpen) {
      if (idleClosed) {
        throw new IOException("This bucket writer was closed due to idling and this handle "
            + "is thus no longer valid");
      }
      open();   // if the file is not open yet, call open -> doOpen -> HDFSWriter.open to open bucketPath
                // (bucketPath is the temporary in-use path, e.g. the one ending in tmp; targetPath is the final path)
    }
The main steps of doOpen:
a. Set the two file names (an illustrative sketch follows the snippet):
    bucketPath = filePath + DIRECTORY_DELIMITER + inUsePrefix
        + fullFileName + inUseSuffix;
    targetPath = filePath + DIRECTORY_DELIMITER + fullFileName;
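For illustration, here is a minimal sketch of what those two names end up looking like, assuming the hdfs.path from the example config and Flume's usual defaults of an empty hdfs.inUsePrefix and a ".tmp" hdfs.inUseSuffix; the resolved directory, file name and counter below are made up for the example:

    public class PathNameSketch {
      public static void main(String[] args) {
        // All values below are hypothetical, only to show the shape of bucketPath vs targetPath.
        String filePath     = "hdfs://xxxxx/access_log/20240101/10";  // resolved hdfs.path (made up)
        String fullFileName = "FlumeData.1700000000000";              // file prefix + counter (made up)
        String inUsePrefix  = "";                                     // assumed default hdfs.inUsePrefix
        String inUseSuffix  = ".tmp";                                 // assumed default hdfs.inUseSuffix

        String bucketPath = filePath + "/" + inUsePrefix + fullFileName + inUseSuffix;
        String targetPath = filePath + "/" + fullFileName;

        System.out.println(bucketPath);  // .../FlumeData.1700000000000.tmp  (data is written here first)
        System.out.println(targetPath);  // .../FlumeData.1700000000000      (renamed to this on close)
      }
    }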
b. Call HDFSWriter.open to open bucketPath:
    if (codeC == null) {
      // Need to get reference to FS using above config before underlying
      // writer does in order to avoid shutdown hook & IllegalStateExceptions
      fileSystem = new Path(bucketPath).getFileSystem(config);
      LOG.info("Creating " + bucketPath);
      writer.open(bucketPath);
    } else {
      // need to get reference to FS before writer does to avoid shutdown hook
      fileSystem = new Path(bucketPath).getFileSystem(config);
      LOG.info("Creating " + bucketPath);
      writer.open(bucketPath, codeC, compType);
    }
c. If rollInterval is set, schedule a task that calls close (a standalone sketch of this pattern follows the snippet):
    // if time-based rolling is enabled, schedule the roll
    if (rollInterval > 0) {
      Callable<Void> action = new Callable<Void>() {
        public Void call() throws Exception {
          LOG.debug("Rolling file ({}): Roll scheduled after {} sec elapsed.",
              bucketPath, rollInterval);
          try {
            close();
          } catch (Throwable t) {
            LOG.error("Unexpected error", t);
          }
          return null;
        }
      };
      timedRollFuture = timedRollerPool.schedule(action, rollInterval,
          TimeUnit.SECONDS);
    }
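To make the timedRollerPool usage easier to picture, here is a minimal self-contained sketch of the same pattern with a plain ScheduledExecutorService; the printed message stands in for BucketWriter.close() and the 60-second delay mirrors hdfs.rollInterval:

    import java.util.concurrent.Callable;
    import java.util.concurrent.Executors;
    import java.util.concurrent.ScheduledExecutorService;
    import java.util.concurrent.ScheduledFuture;
    import java.util.concurrent.TimeUnit;

    public class TimedRollSketch {
      public static void main(String[] args) {
        ScheduledExecutorService timedRollerPool = Executors.newScheduledThreadPool(1);
        long rollInterval = 60;  // seconds, mirrors hdfs.rollInterval = 60

        // Schedule a one-shot task; in BucketWriter this task calls close(),
        // which flushes the data and renames the in-use .tmp file.
        ScheduledFuture<Void> timedRollFuture = timedRollerPool.schedule(
            new Callable<Void>() {
              public Void call() {
                System.out.println("roll: close and rename the in-use file");
                return null;
              }
            }, rollInterval, TimeUnit.SECONDS);
        // The pool is left running here for brevity; in the sink one shared pool
        // (timedRollerPool) is passed into every BucketWriter, as in the constructor shown above.
      }
    }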
2) Check whether the file needs to roll (the hdfs.rollSize or hdfs.rollCount threshold has been reached):
    // check if it's time to rotate the file
    if (shouldRotate()) {
      close();   // close calls flush + doClose; flush calls doFlush, which calls HDFSWriter.sync to sync the data to hdfs
      open();
    }
where shouldRotate implements the count- and size-based roll decision (see the note after the snippet):
    private boolean shouldRotate() {
      boolean doRotate = false;
      if ((rollCount > 0) && (rollCount <= eventCounter)) {
        // hdfs.rollCount > 0 and the number of processed events has reached hdfs.rollCount, so set doRotate to true
        LOG.debug("rolling: rollCount: {}, events: {}", rollCount, eventCounter);
        doRotate = true;
      }
      if ((rollSize > 0) && (rollSize <= processSize)) {
        // hdfs.rollSize > 0 and the number of processed bytes has reached hdfs.rollSize, so set doRotate to true
        LOG.debug("rolling: rollSize: {}, bytes: {}", rollSize, processSize);
        doRotate = true;
      }
      return doRotate;
    }
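Note that with the example configuration at the top (hdfs.rollSize = 0 and hdfs.rollCount = 0) both conditions are always false, so shouldRotate never triggers a roll and files are rolled only by the rollInterval timer. A tiny standalone sketch of that decision, with made-up counter values:

    public class ShouldRotateSketch {
      public static void main(String[] args) {
        long rollCount = 0;            // hdfs.rollCount = 0 -> count-based rolling disabled
        long rollSize = 0;             // hdfs.rollSize  = 0 -> size-based rolling disabled
        long eventCounter = 5000;      // made-up counter values
        long processSize = 1L << 30;

        boolean doRotate = (rollCount > 0 && rollCount <= eventCounter)
            || (rollSize > 0 && rollSize <= processSize);
        System.out.println(doRotate);  // false: only the rollInterval timer rolls the file
      }
    }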
The main steps of doClose are:
a. call HDFSWriter.close
b. call renameBucket to rename the tmp file to the final file:
    if (bucketPath != null && fileSystem != null) {
      renameBucket();  // could block or throw IOException
      fileSystem = null;
    }
where renameBucket essentially does (see the sketch after the snippet):
    fileSystem.rename(srcPath, dstPath)
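As a rough standalone illustration of that rename using the Hadoop FileSystem API (the paths here are made up, and the real doClose/renameBucket has more error handling than this):

    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.fs.FileSystem;
    import org.apache.hadoop.fs.Path;

    public class RenameSketch {
      public static void main(String[] args) throws Exception {
        Path srcPath = new Path("hdfs://xxxxx/access_log/20240101/10/FlumeData.1700000000000.tmp");
        Path dstPath = new Path("hdfs://xxxxx/access_log/20240101/10/FlumeData.1700000000000");

        // Get the FileSystem for the source path and rename the in-use (.tmp) file
        // to its final name, which is what makes the file visible to downstream consumers.
        FileSystem fileSystem = srcPath.getFileSystem(new Configuration());
        if (fileSystem.exists(srcPath)) {
          fileSystem.rename(srcPath, dstPath);  // could block or throw IOException
        }
      }
    }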
3) Call HDFSWriter.append to write the Event:
    writer.append(event);
4) Update the counters:
    // update statistics
    processSize += event.getBody().length;
    eventCounter++;
    batchCounter++;
5) Check whether a flush is needed (hdfs.batchSize reached) and, if so, flush the batch to hdfs:
    if (batchCounter == batchSize) {
      flush();
    }
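Putting the five steps together, BucketWriter.append boils down to roughly the skeleton below. This is a condensed paraphrase of the snippets above, not the verbatim Flume source: open/close/flush/write are reduced to trivial stubs, and synchronization and error handling are left out.

    // Condensed sketch of the append() flow; all methods other than append() are stubs here.
    public class BucketWriterSketch {
      private boolean isOpen;
      private long rollCount, rollSize, batchSize;
      private long eventCounter, processSize, batchCounter;

      public synchronized void append(byte[] body) throws Exception {
        if (!isOpen) {
          open();                            // step 1: open bucketPath (the .tmp file)
        }
        if ((rollCount > 0 && rollCount <= eventCounter)
            || (rollSize > 0 && rollSize <= processSize)) {
          close();                           // step 2: roll the file if a threshold is hit
          open();
        }
        write(body);                         // step 3: HDFSWriter.append -> serializer.write
        processSize += body.length;          // step 4: update the counters
        eventCounter++;
        batchCounter++;
        if (batchCounter == batchSize) {
          flush();                           // step 5: flush a full batch to hdfs
          batchCounter = 0;
        }
      }

      private void open()  { isOpen = true; }
      private void close() { isOpen = false; eventCounter = 0; processSize = 0; }
      private void flush() { }
      private void write(byte[] body) { }
    }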
When an Event is written, BucketWriter.append calls the append method of the org.apache.flume.sink.hdfs.HDFSWriter implementation, in this case HDFSDataStream. Its main methods are:
configure, which sets up the serializer:
    public void configure(Context context) {
      serializerType = context.getString("serializer", "TEXT");   // the default serialization format is TEXT
      useRawLocalFileSystem = context.getBoolean("hdfs.useRawLocalFileSystem", false);
      serializerContext = new Context(context.getSubProperties(EventSerializer.CTX_PREFIX));
      logger.info("Serializer = " + serializerType + ", UseRawLocalFileSystem = "
          + useRawLocalFileSystem);
    }

and append, which writes an Event by calling EventSerializer.write:

    public void append(Event e) throws IOException {
      // shun flumeformatter...
      serializer.write(e);   // delegate the Event write to EventSerializer.write
    }
The main steps of its open method are:
1) Open or create the file depending on the hdfs.append.support setting (false by default):
    boolean appending = false;
    if (conf.getBoolean("hdfs.append.support", false) == true && hdfs.isFile(dstPath)) {
      // hdfs.append.support defaults to false
      outStream = hdfs.append(dstPath);
      appending = true;
    } else {
      outStream = hdfs.create(dstPath);   // if append is not supported, create the file
    }
2) Create the EventSerializer object via EventSerializerFactory.getInstance:
    serializer = EventSerializerFactory.getInstance(
        serializerType, serializerContext, outStream);   // instantiate the EventSerializer
3) If hdfs.append.support is true (i.e. we are appending) but the EventSerializer does not support reopen, an exception is thrown:
    if (appending && !serializer.supportsReopen()) {
      outStream.close();
      serializer = null;
      throw new IOException("serializer (" + serializerType + ") does not support append");
    }
4) Invoke the post-open hook, depending on whether the file was created or reopened:
    if (appending) {
      serializer.afterReopen();
    } else {
      serializer.afterCreate();
    }
    }   // end of open()
The 3 hdfs.writeFormat options and their corresponding classes:
    TEXT(BodyTextEventSerializer.Builder.class),                      // supports reopen
    HEADER_AND_TEXT(HeaderAndBodyTextEventSerializer.Builder.class),  // supports reopen
    AVRO_EVENT(FlumeEventAvroEventSerializer.Builder.class),          // does not support reopen
The default is TEXT, i.e. the BodyTextEventSerializer class (a small usage sketch follows the snippet):
    private BodyTextEventSerializer(OutputStream out, Context ctx) {   // constructor
      this.appendNewline = ctx.getBoolean(APPEND_NEWLINE, APPEND_NEWLINE_DFLT);   // defaults to true
      this.out = out;
    }
    ....
    public void write(Event e) throws IOException {   // write method
      out.write(e.getBody());   // java.io.OutputStream.write, only the Event body is written
      if (appendNewline) {      // append a newline after each record
        out.write('\n');
      }
    }
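To see the tail end of the chain in isolation, the sketch below obtains the TEXT serializer through EventSerializerFactory and writes one Event into an in-memory stream; the event body is made up, and the snippet only illustrates the serializer API rather than anything the sink itself does:

    import java.io.ByteArrayOutputStream;
    import java.nio.charset.StandardCharsets;
    import org.apache.flume.Context;
    import org.apache.flume.Event;
    import org.apache.flume.event.EventBuilder;
    import org.apache.flume.serialization.EventSerializer;
    import org.apache.flume.serialization.EventSerializerFactory;

    public class TextSerializerSketch {
      public static void main(String[] args) throws Exception {
        ByteArrayOutputStream out = new ByteArrayOutputStream();

        // "TEXT" resolves to BodyTextEventSerializer; the empty Context means
        // serializer.appendNewline keeps its default of true.
        EventSerializer serializer =
            EventSerializerFactory.getInstance("TEXT", new Context(), out);
        serializer.afterCreate();

        Event e = EventBuilder.withBody("hello hdfs sink", StandardCharsets.UTF_8);
        serializer.write(e);      // writes only the body, then a '\n'
        serializer.flush();
        serializer.beforeClose();

        System.out.print(out.toString("UTF-8"));   // -> hello hdfs sink\n
      }
    }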