HDFSEventSink pulls data out of the channel (an active pull) and writes it to HDFS. On startup it creates two thread pools: callTimeoutPool, which runs the tasks that operate on HDFS (append/flush and so on, invoked through the callWithTimeout method so each call is bounded by a timeout), and timedRollerPool, which runs the scheduled tasks that roll files:
callTimeoutPool = Executors.newFixedThreadPool(threadsPoolSize,
    new ThreadFactoryBuilder().setNameFormat(timeoutName).build());
timedRollerPool = Executors.newScheduledThreadPool(rollTimerPoolSize,
    new ThreadFactoryBuilder().setNameFormat(rollerName).build());
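The timeout comes from submitting each HDFS operation to callTimeoutPool and waiting on the returned Future for at most hdfs.callTimeout milliseconds. Below is a minimal sketch of that idea, not the exact Flume implementation (the real callWithTimeout adds more bookkeeping around the sink counter):

// Sketch only: bound an HDFS call with hdfs.callTimeout by running it on callTimeoutPool.
private <T> T callWithTimeout(Callable<T> callable)
    throws IOException, InterruptedException {
  Future<T> future = callTimeoutPool.submit(callable);   // run the HDFS operation asynchronously
  try {
    if (callTimeout > 0) {
      return future.get(callTimeout, TimeUnit.MILLISECONDS);   // wait at most callTimeout ms
    }
    return future.get();
  } catch (TimeoutException eT) {
    future.cancel(true);                                  // interrupt the hung HDFS call
    throw new IOException("Callable timed out after " + callTimeout + " ms", eT);
  } catch (ExecutionException e1) {
    Throwable cause = e1.getCause();
    if (cause instanceof IOException) {
      throw (IOException) cause;                          // rethrow the underlying HDFS error
    }
    throw new IOException(cause);
  }
}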
Moving data from the channel to the sink ultimately goes through the sink's process method (invoked by a SinkProcessor implementation), for example HDFSEventSink.process. Each call to process runs inside a transaction, which provides atomicity. process calls Channel.take to pull Events out of the Channel; the maximum number of Events per transaction is set by hdfs.batchSize (default 100). For each Event it does the following:
1. Build the full path and file name, lookupPath.
2. Obtain a BucketWriter and an HDFSWriter. The HDFSWriter is chosen by hdfs.fileType and does the actual writing; the BucketWriter can be understood as a wrapper around an HDFS file and the way it is written. Each lookupPath maps to one BucketWriter, and the mapping is stored in sfWriters. sfWriters is a WriterLinkedHashMap, a subclass of LinkedHashMap (private static class WriterLinkedHashMap extends LinkedHashMap<String, BucketWriter>) that maps file paths to BucketWriters. It is initialized in the start method as this.sfWriters = new WriterLinkedHashMap(maxOpenFiles), sized by hdfs.maxOpenFiles (default 5000), which caps the number of files that can be open at once (see the sketch after this list).
3. Call the BucketWriter's append method to write the data.
4. Once hdfs.batchSize Events have been processed, call flush on every BucketWriter touched in this batch and commit the transaction.
5. If an exception occurs, roll the transaction back.
6. Finally, close the transaction.
process returns a Status object (BACKOFF or READY) that describes the sink's state. This can be used to judge the sink's health; for example, the failover SinkProcessor uses it to decide whether a sink can still serve.
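As referenced in step 2, here is a minimal sketch of what WriterLinkedHashMap might look like, assuming it relies on LinkedHashMap.removeEldestEntry to enforce the hdfs.maxOpenFiles cap (the real class also logs and handles close errors; details may differ):

// Sketch only: an LRU map of path -> BucketWriter capped at maxOpenFiles.
private static class WriterLinkedHashMap extends LinkedHashMap<String, BucketWriter> {
  private final int maxOpenFiles;

  WriterLinkedHashMap(int maxOpenFiles) {
    super(16, 0.75f, true);            // access order, so the least recently used writer is evicted first
    this.maxOpenFiles = maxOpenFiles;
  }

  @Override
  protected boolean removeEldestEntry(Map.Entry<String, BucketWriter> eldest) {
    if (size() > maxOpenFiles) {
      try {
        eldest.getValue().close();     // close the least recently used file before dropping it
      } catch (Exception e) {
        // the real implementation logs and keeps going
      }
      return true;                     // evict the eldest mapping
    }
    return false;
  }
}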
Analysis of the main methods:
1. The constructor creates an HDFSWriterFactory object.
Later, HDFSWriterFactory's getWriter method is used to return the HDFSWriter implementation that matches the configured file type.
2. configure
1) The configure method reads the various settings from the Context, for example:
inUseSuffix = context.getString("hdfs.inUseSuffix", defaultInUseSuffix);     // suffix of files being written, default ".tmp"
rollInterval = context.getLong("hdfs.rollInterval", defaultRollInterval);    // roll interval in seconds, default 30
rollSize = context.getLong("hdfs.rollSize", defaultRollSize);                // roll size in bytes, default 1024
rollCount = context.getLong("hdfs.rollCount", defaultRollCount);             // default 10
batchSize = context.getLong("hdfs.batchSize", defaultBatchSize);             // default 100
idleTimeout = context.getInteger("hdfs.idleTimeout", 0);                     // default 0
String codecName = context.getString("hdfs.codeC");                          // compression codec
fileType = context.getString("hdfs.fileType", defaultFileType);              // default HDFSWriterFactory.SequenceFileType, i.e. SequenceFile
maxOpenFiles = context.getInteger("hdfs.maxOpenFiles", defaultMaxOpenFiles); // default 5000
callTimeout = context.getLong("hdfs.callTimeout", defaultCallTimeout);       // BucketWriter call timeout in ms, default 10000
threadsPoolSize = context.getInteger("hdfs.threadsPoolSize", defaultThreadPoolSize);        // size of the pool running append/open/close/flush tasks, default 10
rollTimerPoolSize = context.getInteger("hdfs.rollTimerPoolSize", defaultRollTimerPoolSize); // size of the roll timer pool, default 1
tryCount = context.getInteger("hdfs.closeTries", defaultTryCount);           // number of attempts to close a file (must be > 0)
retryInterval = context.getLong("hdfs.retryInterval", defaultRetryInterval); // interval between close retries (must be > 0)
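These are the same hdfs.* keys set in an agent configuration. As a hypothetical illustration (the property values below are made up for the example), configure can be exercised directly through a Flume Context:

import org.apache.flume.Context;
import org.apache.flume.conf.Configurables;
import org.apache.flume.sink.hdfs.HDFSEventSink;

// Illustrative only: feed hdfs.* settings to the sink through a Context.
Context ctx = new Context();
ctx.put("hdfs.path", "/flume/events/%Y-%m-%d");
ctx.put("hdfs.fileType", "DataStream");
ctx.put("hdfs.rollInterval", "300");
ctx.put("hdfs.rollSize", "0");
ctx.put("hdfs.rollCount", "0");
ctx.put("hdfs.batchSize", "1000");

HDFSEventSink sink = new HDFSEventSink();
Configurables.configure(sink, ctx);   // ends up calling sink.configure(ctx)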
2) Resolve the compression codec:
if (codecName == null) {               // hdfs.codeC is not set
    codeC = null;                      // so compression is disabled
    compType = CompressionType.NONE;
} else {
    codeC = getCodec(codecName);       // resolve the codec via getCodec
    // TODO : set proper compression type
    compType = CompressionType.BLOCK;  // compression type is BLOCK
}
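getCodec maps the configured name (for example "gzip") to a Hadoop CompressionCodec. The sketch below shows how such a lookup can be done with Hadoop's CompressionCodecFactory; the class name CodecLookup and the exact matching rules are illustrative assumptions, not necessarily what Flume does:

import java.util.Locale;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.io.compress.CompressionCodec;
import org.apache.hadoop.io.compress.CompressionCodecFactory;
import org.apache.hadoop.util.ReflectionUtils;

class CodecLookup {
  // Illustrative sketch: match a short codec name against the codec classes Hadoop knows about.
  static CompressionCodec resolveCodec(String codecName) {
    Configuration conf = new Configuration();
    String wanted = codecName.toLowerCase(Locale.ROOT);
    for (Class<? extends CompressionCodec> cls : CompressionCodecFactory.getCodecClasses(conf)) {
      String simpleName = cls.getSimpleName().toLowerCase(Locale.ROOT);   // e.g. "gzipcodec"
      if (simpleName.equals(wanted) || simpleName.equals(wanted + "codec")) {
        return ReflectionUtils.newInstance(cls, conf);                    // instantiate with the Hadoop config
      }
    }
    throw new IllegalArgumentException("Unsupported compression codec: " + codecName);
  }
}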
3) Timestamp rounding settings (hdfs.round, hdfs.roundUnit, hdfs.roundValue), which process later passes to BucketPath.escapeString when resolving the bucket path:
needRounding = context.getBoolean("hdfs.round", false);
if (needRounding) {
    String unit = context.getString("hdfs.roundUnit", "second");
    if (unit.equalsIgnoreCase("hour")) {
        this.roundUnit = Calendar.HOUR_OF_DAY;
    } else if (unit.equalsIgnoreCase("minute")) {
        this.roundUnit = Calendar.MINUTE;
    } else if (unit.equalsIgnoreCase("second")) {
        this.roundUnit = Calendar.SECOND;
    } else {
        LOG.warn("Rounding unit is not valid, please set one of"
            + "minute, hour, or second. Rounding will be disabled");
        needRounding = false;
    }
    this.roundValue = context.getInteger("hdfs.roundValue", 1);
    if (roundUnit == Calendar.SECOND || roundUnit == Calendar.MINUTE) {
        Preconditions.checkArgument(roundValue > 0 && roundValue <= 60,
            "Round value" + "must be > 0 and <= 60");
    } else if (roundUnit == Calendar.HOUR_OF_DAY) {
        Preconditions.checkArgument(roundValue > 0 && roundValue <= 24,
            "Round value" + "must be > 0 and <= 24");
    }
}
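For intuition, roundUnit = minute with roundValue = 10 means the timestamp used for path escaping is rounded down to the nearest 10-minute boundary. A small illustrative snippet (not Flume code) of that arithmetic:

// Illustrative only: round the current time down to a 10-minute bucket,
// which is the effect of hdfs.round=true, hdfs.roundUnit=minute, hdfs.roundValue=10.
Calendar cal = Calendar.getInstance();
int minute = cal.get(Calendar.MINUTE);              // e.g. 17
cal.set(Calendar.MINUTE, minute - (minute % 10));   // -> 10
cal.set(Calendar.SECOND, 0);
cal.set(Calendar.MILLISECOND, 0);
// events arriving between hh:10:00 and hh:19:59 now share the same bucket path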
4) Finally, a SinkCounter is created to record the sink's metrics:
if (sinkCounter == null) {
    sinkCounter = new SinkCounter(getName());
}
3. The start method starts the thread pools and other resources:
public void start() {
    String timeoutName = "hdfs-" + getName() + "-call-runner-%d";
    callTimeoutPool = Executors.newFixedThreadPool(threadsPoolSize,
        new ThreadFactoryBuilder().setNameFormat(timeoutName).build());
    String rollerName = "hdfs-" + getName() + "-roll-timer-%d";
    timedRollerPool = Executors.newScheduledThreadPool(rollTimerPoolSize,
        new ThreadFactoryBuilder().setNameFormat(rollerName).build());
    this.sfWriters = new WriterLinkedHashMap(maxOpenFiles);   // initialize the WriterLinkedHashMap
    sinkCounter.start();
    super.start();
}
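timedRollerPool is an ordinary ScheduledExecutorService, so rolling by time boils down to scheduling a close task rollInterval seconds after a file is opened. A minimal sketch of that pattern (the task body is illustrative, not the actual BucketWriter code):

// Sketch only: time-based rolling on a ScheduledExecutorService.
ScheduledExecutorService timedRollerPool = Executors.newScheduledThreadPool(1);
final long rollInterval = 30;   // seconds, cf. hdfs.rollInterval
ScheduledFuture<?> rollFuture = timedRollerPool.schedule(new Runnable() {
  @Override
  public void run() {
    // the real sink would close the bucket and rename the .tmp file here
    System.out.println("rolling file after " + rollInterval + " seconds");
  }
}, rollInterval, TimeUnit.SECONDS);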
4. The process method pulls data from the channel and sends it to HDFS (each transaction holds at most batchSize Events): it looks up the matching bucket, serializes the data and writes it to the HDFS file:
public Status process() throws EventDeliveryException {
    Channel channel = getChannel();                        // get the attached channel
    Transaction transaction = channel.getTransaction();    // get the Transaction object that provides transactional semantics
    List<BucketWriter> writers = Lists.newArrayList();
    transaction.begin();                                   // begin the transaction
    try {
        int txnEventCount = 0;
        for (txnEventCount = 0; txnEventCount < batchSize; txnEventCount++) {
            // a transaction holds at most hdfs.batchSize events
            Event event = channel.take();                  // take an Event from the Channel
            if (event == null) {
                break;
            }
            // reconstruct the path name by substituting place holders
            String realPath = BucketPath.escapeString(filePath, event.getHeaders(),
                timeZone, needRounding, roundUnit, roundValue);       // resolve the file path
            String realName = BucketPath.escapeString(fileName, event.getHeaders(),
                timeZone, needRounding, roundUnit, roundValue);       // resolve the file name
            String lookupPath = realPath + DIRECTORY_DELIMITER + realName;   // full file name
            BucketWriter bucketWriter = sfWriters.get(lookupPath);    // look up the BucketWriter for this path
            // we haven't seen this file yet, so open it and cache the handle
            if (bucketWriter == null) {
                HDFSWriter hdfsWriter = writerFactory.getWriter(fileType);   // get the HDFSWriter for this file type
                WriterCallback idleCallback = null;
                if (idleTimeout != 0) {
                    idleCallback = new WriterCallback() {
                        @Override
                        public void run(String bucketPath) {
                            sfWriters.remove(bucketPath);             // callback: drop the idle writer
                        }
                    };
                }
                bucketWriter = new BucketWriter(rollInterval, rollSize, rollCount,
                    batchSize, context, realPath, realName, inUsePrefix, inUseSuffix,
                    suffix, codeC, compType, hdfsWriter, timedRollerPool,
                    proxyTicket, sinkCounter, idleTimeout, idleCallback, lookupPath);   // instantiate the BucketWriter
                // sfWriters is a WriterLinkedHashMap (a LinkedHashMap subclass) mapping paths to
                // BucketWriters; it is initialized in start() as new WriterLinkedHashMap(maxOpenFiles),
                // sized by hdfs.maxOpenFiles (default 5000)
                sfWriters.put(lookupPath, bucketWriter);
            }
            // track the buckets getting written in this transaction
            if (!writers.contains(bucketWriter)) {
                writers.add(bucketWriter);
            }
            // Write the data to HDFS
            append(bucketWriter, event);                   // write the Event via the append helper
        }
        if (txnEventCount == 0) {
            sinkCounter.incrementBatchEmptyCount();
        } else if (txnEventCount == batchSize) {
            sinkCounter.incrementBatchCompleteCount();
        } else {
            sinkCounter.incrementBatchUnderflowCount();
        }
        // flush all pending buckets before committing the transaction
        for (BucketWriter bucketWriter : writers) {
            flush(bucketWriter);
        }
        transaction.commit();                              // commit the transaction
        if (txnEventCount < 1) {
            return Status.BACKOFF;
        } else {
            sinkCounter.addToEventDrainSuccessCount(txnEventCount);
            return Status.READY;
        }
    } catch (IOException eIO) {
        transaction.rollback();                            // roll back the transaction on error
        LOG.warn("HDFS IO error", eIO);
        return Status.BACKOFF;
    } catch (Throwable th) {
        transaction.rollback();
        LOG.error("process failed", th);
        if (th instanceof Error) {
            throw (Error) th;
        } else {
            throw new EventDeliveryException(th);
        }
    } finally {
        transaction.close();
    }
}
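Path resolution relies entirely on BucketPath.escapeString: placeholders such as %Y-%m-%d are filled in from the event's timestamp header, with the rounding settings applied. A hedged usage sketch using the same overload as in the code above (the header value, path pattern and exact result are illustrative; the overload may vary across Flume versions):

// Illustrative only: escape a path pattern against an event's headers
// (argument order: pattern, headers, timeZone, needRounding, roundUnit, roundValue).
Map<String, String> headers = new HashMap<String, String>();
headers.put("timestamp", "1388535423000");    // 2014-01-01 00:17:03 UTC, made-up value
String realPath = BucketPath.escapeString("/flume/events/%Y-%m-%d/%H%M",
    headers, TimeZone.getTimeZone("UTC"), true, Calendar.MINUTE, 10);
// with rounding enabled the minute part is rounded down to a 10-minute bucket,
// so realPath would come out as "/flume/events/2014-01-01/0010"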
5. The sink also defines a few helper methods that operate on a BucketWriter: append, flush and close.
1) append
private void append(final BucketWriter bucketWriter, final Event event)
    throws IOException, InterruptedException {
  // Write the data to HDFS
  // note: callWithTimeout wraps the call so it is subject to the configured timeout
  callWithTimeout(new Callable<Void>() {
    public Void call() throws Exception {
      bucketWriter.append(event);   // write the Event via BucketWriter.append
      return null;
    }
  });
}
2) flush --> BucketWriter.flush()
3) close --> BucketWriter.close()
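flush and close presumably follow the same callWithTimeout pattern as append; a sketch of what the flush wrapper would look like under that assumption:

// Sketch only: same pattern as append, delegating to BucketWriter.flush inside callWithTimeout.
private void flush(final BucketWriter bucketWriter)
    throws IOException, InterruptedException {
  callWithTimeout(new Callable<Void>() {
    public Void call() throws Exception {
      bucketWriter.flush();   // flush buffered data for this bucket
      return null;
    }
  });
}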