HDFSEventSink用于把数据从channel中拿出来(主动pull的形式)然后放到hdfs中,HDFSEventSink在启动时会启动两个线程池callTimeoutPool 和timedRollerPool ,callTimeoutPool 用于运行append/flush等操作hdfs的task(通过callWithTimeout方法调用,并实现timeout功能),用于运行翻转文件的计划任务timedRollerPool:

1
2
3
4
     callTimeoutPool = Executors.newFixedThreadPool(threadsPoolSize,
             new  ThreadFactoryBuilder().setNameFormat(timeoutName).build());
     timedRollerPool = Executors.newScheduledThreadPool(rollTimerPoolSize,
             new  ThreadFactoryBuilder().setNameFormat(rollerName).build());

channel到sink的操作最终调用了sink的process方法(由SinkProcessor实现类调用),比如HDFSEventSink的process方法,每个process方法中都是一个事务,用来提供原子性操作,process方法调用Channel的take方法从Channel中取出Event,每个transaction中最多的Event数量由hdfs.batchSize设定,默认是100,对每一个Event有如下操作:
1.获取文件的完整路径和名称lookupPath
2.声明一个BucketWriter对象和HDFSWriter 对象,HDFSWriter由hdfs.fileType设定,负责实际数据的写入,BucketWriter可以理解成对hdfs文件和写入方法的封装,每个lookupPath对应一个BucketWriter对象,对应关系写入到sfWriters中(这里sfWriters是一个WriterLinkedHashMap对象,WriterLinkedHashMap是LinkedHashMap的子类(private static class WriterLinkedHashMap  extends LinkedHashMap<String, BucketWriter>),用来存放文件到BucketWriter的对应关系,在start方法中初始化:
this.sfWriters = new WriterLinkedHashMap( maxOpenFiles);
长度为hdfs.maxOpenFiles的设置,默认为5000,这个代表最多能打开的文件数量)
3.调用BucketWriter的append方法写入数据
4.当操作的Event数量达到hdfs.batchSize设定后,循环调用每个BucketWriter对象的flush方法,并提交transaction
5.如果出现异常则回滚事务
6.最后关闭transaction
process方法最后返回的是代表Sink状态的Status对象(BACKOFF或者READY),这个可以用于判断Sink的健康状态,比如failover的SinkProcessor就根据这个来判断Sink是否可以提供服务


主要方法分析:
1.构造函数声明一个HDFSWriterFactory对象
在后面会使用HDFSWriterFactory的getWriter方法会根据file类型返回对应的HDFSWriter实现类
2.configure 
1)通过configure方法会根据Context设置各种参数项
比如:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
inUseSuffix = context.getString(  "hdfs.inUseSuffix" , defaultInUseSuffix );  //正在写入的文件的后缀名,默认为".tmp"
rollInterval = context.getLong(  "hdfs.rollInterval" , defaultRollInterval );  //文件翻转时间,默认30
rollSize = context.getLong(  "hdfs.rollSize" , defaultRollSize );  //文件翻转大小,默认1024
rollCount = context.getLong(  "hdfs.rollCount" , defaultRollCount );  //默认为10
batchSize = context.getLong(  "hdfs.batchSize" , defaultBatchSize );  //默认为100
idleTimeout = context.getInteger(  "hdfs.idleTimeout" 0 );  //默认为
String codecName = context.getString(  "hdfs.codeC" );  //压缩格式
fileType = context.getString(  "hdfs.fileType" , defaultFileType );  //默认为HDFSWriterFactory.SequenceFileType,即sequencefile
maxOpenFiles = context.getInteger(  "hdfs.maxOpenFiles" , defaultMaxOpenFiles );  //默认为5000
callTimeout = context.getLong(  "hdfs.callTimeout" , defaultCallTimeout );  //BucketWriter超时时间,默认为10000
threadsPoolSize = context.getInteger(  "hdfs.threadsPoolSize" ,
         defaultThreadPoolSize);  //操作append/open/close/flush任务的线程池大小,默认为10
rollTimerPoolSize = context.getInteger(  "hdfs.rollTimerPoolSize" ,
         defaultRollTimerPoolSize);  //文件翻转计时器线程池大小,默认为1
tryCount = context.getInteger(  "hdfs.closeTries" , defaultTryCount );  //尝试close文件的此数(大于0)
retryInterval = context.getLong(  "hdfs.retryInterval" , defaultRetryInterval);  //间隔时间(大于0)

2)获取压缩格式

1
2
3
4
5
6
7
8
     if  (codecName ==  null ) {  //如果hdfs.codeC没有设置
       codeC =  null //则没有压缩功能
       compType = CompressionType. NONE; 
     else  {
       codeC = getCodec(codecName);   //调用getCodec方法获取压缩格式
       // TODO : set proper compression type
       compType = CompressionType. BLOCK;  //压缩类型为BLOCK类型
     }

3)hdfs文件翻转相关设置,在实例化BucketWriter对象时会用到

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
    needRounding = context.getBoolean(  "hdfs.round" false  );
     if (needRounding) {
       String unit = context.getString(  "hdfs.roundUnit" "second"  );
       if  (unit.equalsIgnoreCase(  "hour" )) {
         this .roundUnit = Calendar.HOUR_OF_DAY;
       else  if  (unit.equalsIgnoreCase( "minute"  )) {
         this .roundUnit = Calendar.MINUTE;
       else  if  (unit.equalsIgnoreCase( "second"  )){
         this .roundUnit = Calendar.SECOND;
       else  {
         LOG.warn( "Rounding unit is not valid, please set one of"  +
             "minute, hour, or second. Rounding will be disabled"  );
         needRounding =  false  ;
       }
       this . roundValue = context.getInteger( "hdfs.roundValue"  1 );
       if (roundUnit == Calendar. SECOND || roundUnit == Calendar.MINUTE){
         Preconditions. checkArgument(roundValue >  0  && roundValue <=  60 ,
             "Round value"  +
             "must be > 0 and <= 60" );
       else  if  (roundUnit == Calendar.HOUR_OF_DAY){
         Preconditions. checkArgument(roundValue >  0  && roundValue <=  24 ,
             "Round value"  +
             "must be > 0 and <= 24" );
       }
     }

4)最后初始化一个SinkCounter对象用来记录sink的性能数据

1
2
3
     if  (sinkCounter ==  null ) {
       sinkCounter =  new  SinkCounter(getName());
     }

3.start方法用来启动线程池等

1
2
3
4
5
6
7
8
9
10
11
   public  void  start() {
     String timeoutName =  "hdfs-"  + getName() +  "-call-runner-%d"  ;
     callTimeoutPool = Executors. newFixedThreadPool(threadsPoolSize,
             new  ThreadFactoryBuilder().setNameFormat(timeoutName).build());
     String rollerName =  "hdfs-"  + getName() +  "-roll-timer-%d"  ;
     timedRollerPool = Executors. newScheduledThreadPool(rollTimerPoolSize,
             new  ThreadFactoryBuilder().setNameFormat(rollerName).build());
     this . sfWriters =  new  WriterLinkedHashMap(maxOpenFiles);  //初始化WriterLinkedHashMap对象
     sinkCounter.start();
     super .start();
   }

4.process方法,从channel中pull出数据并发送到hdfs中(每一个transaction中最多可以有batchSize条Event),获取对应的bucket,序列化数据并写入hdfs文件

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
   public  Status process()  throws  EventDeliveryException {
     Channel channel = getChannel();  //获取对应的channel
     Transaction transaction = channel.getTransaction();  //获取Transaction 对象,提供事务功能
     List<BucketWriter> writers = Lists. newArrayList();
     transaction.begin();  //事务开始
     try  {
       int  txnEventCount =  0 ;
       for  (txnEventCount =  0 ; txnEventCount < batchSize; txnEventCount++) { //这里一个transaction存放的数据最多由hdfs.batchSize指定
         Event event = channel.take();  //循环调用Channel的take方法获取Event
         if  (event ==  null ) {
           break ;
         }
         // reconstruct the path name by substituting place holders
         String realPath = BucketPath. escapeString(filePath, event.getHeaders(),
             timeZone, needRounding, roundUnit , roundValue );  //设置文件路径
         String realName = BucketPath. escapeString(fileName, event.getHeaders(),
           timeZone, needRounding, roundUnit , roundValue );  //设置文件名称
         String lookupPath = realPath + DIRECTORY_DELIMITER + realName;  //完整的文件名称
         BucketWriter bucketWriter = sfWriters.get(lookupPath);   //根据文件获取对应的BucketWriter对象
         // we haven't seen this file yet, so open it and cache the handle
         if  (bucketWriter ==  null ) {
           HDFSWriter hdfsWriter = writerFactory.getWriter(fileType );  //根据文件类型获取HDFSWriter 对象
           WriterCallback idleCallback =  null ;
           if (idleTimeout !=  0 ) {
             idleCallback =  new  WriterCallback() {
               @Override
               public  void  run(String bucketPath) {
                 sfWriters.remove(bucketPath);  //回调方法
               }
             };
           }
           bucketWriter =  new  BucketWriter(rollInterval , rollSize , rollCount ,
               batchSize, context , realPath, realName, inUsePrefix, inUseSuffix,
               suffix, codeC, compType, hdfsWriter, timedRollerPool,
               proxyTicket, sinkCounter , idleTimeout , idleCallback, lookupPath);  //实例化BucketWriter
           sfWriters.put(lookupPath, bucketWriter);  //这里sfWriters是一个WriterLinkedHashMap对象,WriterLinkedHashMap是LinkedHashMap的子类,用来存放文件到BucketWriter的对应关系,在start方法中初始化:this .sfWriters = new WriterLinkedHashMap(maxOpenFiles);大小为hdfs.maxOpenFiles的设置,默认为5000
         }
         // track the buckets getting written in this transaction
         if  (!writers.contains(bucketWriter)) {  //List<BucketWriter> writers = Lists.newArrayList();
           writers.add(bucketWriter);
         }
         // Write the data to HDFS
         append(bucketWriter, event);  //调用append方法写入Event数据
       }
       if  (txnEventCount ==  0 ) {
         sinkCounter.incrementBatchEmptyCount();
       else  if  (txnEventCount == batchSize ) {
         sinkCounter.incrementBatchCompleteCount();
       else  {
         sinkCounter.incrementBatchUnderflowCount();
       }
       // flush all pending buckets before committing the transaction
       for  (BucketWriter bucketWriter : writers) {
         flush(bucketWriter);  //调用flush方法
       }
       transaction.commit();  //事务提交
       if  (txnEventCount <  1 ) {
         return  Status.BACKOFF ;
       else  {
         sinkCounter.addToEventDrainSuccessCount(txnEventCount);
         return  Status.READY ;
       }
     catch  (IOException eIO) {  //如果异常则回滚事务
       transaction.rollback();
       LOG.warn(  "HDFS IO error" , eIO);
       return  Status. BACKOFF;
     catch  (Throwable th) {
       transaction.rollback();
       LOG.error(  "process failed" , th);
       if  (th  instanceof  Error) {
         throw  (Error) th;
       else  {
         throw  new  EventDeliveryException(th);
       }
     finally  {
       transaction.close();
     }
   }

5.同时定义了几个操作BucketWriter的方法append,flush,close

1
2
3
4
5
6
7
8
9
10
11
12
  1 )  private  void  append( final  BucketWriter bucketWriter,  final  Event event)
           throws  IOException, InterruptedException {
     // Write the data to HDFS
     callWithTimeout( new  Callable<Void>() {  //注意这里使用callWithTimeout提供了调用的超时功能
       public  Void call()  throws  Exception {
         bucketWriter.append(event);  //调用BucketWriter.append方法写入Event数据
         return  null ;
       }
     });
   }
2 )flush-->BucketWriter.flush()
3 ) close-->BucketWriter.close()