flume作为日志收集端,其本质上也是一个生产者消费者结构,source作为消息的producer,sink作为消息的consumer,中间的channel作消息的存储
为了保证消息消费的正确性,flume使用了事务的机制,主要涉及的类:
1)org.apache.flume.Transaction 接口类,为访问channel提供事务的功能(可以是put,也可以是take)
首先定义了一个enum类TransactionState,定义了时间的几种状态 {Started, Committed, RolledBack, Closed }
定义的抽象方法:

1
2
3
4
begin  //开始事务处理
commit  //事务完成
rollback  //回滚
close   //关闭事务

使用方法,比如:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
Channel ch = ...  //构造一个channel对象
Transaction tx = ch.getTransaction();  //获取此channel对象的Transaction 对象
try  {
   tx.begin();  //事务开始
   ...
   ch.put(event) or ch.take()  //数据操作
   ... 
   tx.commit();  //提交事务
catch  (ChannelException ex) {
   tx.rollback();  //异常时回滚事务
   ...
finally  {
   tx.close();
}

2)org.apache.flume.channel.BasicTransactionSemantics   实现了Transaction接口的抽象类 (和BasicChannelSemantics共同使用)
定义了一个enum的类State,标识了transaction的状态NEW(新建), OPEN(可以执行commit或者rollback), COMPLETED(commit或者rollback之后的状态), CLOSED
定义了几个抽象方法doBegin/doPut/doTake/doCommit/doRollback/doClose,同时定义了put/take/begin/commit/rollback/close方法

每一个方法中都会使用Preconditions.checkState做检测,检测是否是同一线程操作,状态是否是要求的状态,比如

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
   protected  BasicTransactionSemantics() { 
     state = State.NEW;  //在构造方法中先设置state为NEW
     initialThreadId = Thread.currentThread().getId();
  
   public  void  begin() {  //begin方法
     Preconditions.checkState(Thread.currentThread().getId() == initialThreadId,
         "begin() called from different thread than getTransaction()!" );  //检测是否为同一线程操作
     Preconditions.checkState(state.equals(State.NEW),
         "begin() called when transaction is "  + state +  "!" );  //检测当前状态是否为NEW
     try  {
       doBegin();  //调用doBegin方法
     catch  (InterruptedException e) {
       Thread.currentThread().interrupt();
       throw  new  ChannelException(e.toString(), e);
     }
     state = State.OPEN;  //设置状态为OPEN
   }

事务的顺序:
begin-->put/take-->commit/rollback-->close   

3)而BasicTransactionSemantics 的子类包括如下几个,分别对应具体的channel

1
2
3
FileChannel-->FileBackedTranscation
MemoryChannel-->MemoryTranscation
SpillableMemoryChannel-->SpillableMemoryTransaction

transaction类:
wKiom1T1wiqTAyovAAEruW-ZXYA796.jpg

channel类:

wKiom1T1wkey5obqAAErrxff4IM198.jpg

每个具体的BasicTransactionSemantics 子类会实现具体的doPut/doTake/doCommit/doRollback方法
下面看怎么调用的:
调用channel实例的put/take/commit/rollback方法时,以put为例
1)在org.apache.flume.channel.ChannelProcessor中调用processEvent(操作一条event)或者processEventBatch(插入一批event)方法
比如processEvent中:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
     // Process required channels
     List<Channel> requiredChannels = selector.getRequiredChannels(event);
     for  (Channel reqChannel : requiredChannels) {
       Transaction tx = reqChannel.getTransaction();
       Preconditions.checkNotNull(tx,  "Transaction object must not be null" );  //transaction对象不能为null
       try  {
         tx.begin();  //事务开始
         reqChannel.put(event);  //调用channel的put方法,这里为org.apache.flume.channel.BasicChannelSemantics
         tx.commit();  //事务提交,put有commit的操作,take也有commit的操作
       catch  (Throwable t) {
         tx.rollback();
         if  (t  instanceof  Error) {
           LOG.error( "Error while writing to required channel: "  +
               reqChannel, t);
           throw  (Error) t;
         else  {
           throw  new  ChannelException( "Unable to put event on required "  +
               "channel: "  + reqChannel, t);
         }
       finally  {
         if  (tx !=  null  ) {
           tx.close();
         }
       }
     }

2)调用父类org.apache.flume.channel.BasicChannelSemantics的put方法:

1
2
3
4
5
6
   public  void  put(Event event)  throws  ChannelException {
     BasicTransactionSemantics transaction = currentTransaction.get();
     Preconditions.checkState(transaction !=  null ,
         "No transaction exists for this thread"  );
     transaction.put(event);  //调用BasicTransactionSemantics 具体实现类的put方法
   }

3)org.apache.flume.channel.BasicTransactionSemantics的put方法(子类没有具体的put实现)

1
2
3
4
5
6
7
8
9
10
11
12
13
14
   protected  void  put(Event event) {
     Preconditions.checkState(Thread.currentThread().getId() == initialThreadId,
         "put() called from different thread than getTransaction()!"  );
     Preconditions.checkState(state .equals(State.OPEN),
         "put() called when transaction is %s!"  , state );
     Preconditions.checkArgument(event !=  null ,
         "put() called with null event!"  );  //开始会做一些状态判断,比如transcation的状态是否正确等
     try  {
       doPut(event);  //调用doPut方法
     catch  (InterruptedException e) {
       Thread.currentThread().interrupt();
       throw  new  ChannelException(e.toString(), e);
     }
   }

4)然后调用对应channel中MemoryTransaction具体实现类的doPut方法
比如org.apache.flume.channel.MemoryChannel的内部类MemoryTransaction的doPut:

1
2
3
4
5
6
7
8
9
10
11
     protected  void  doPut(Event event)  throws  InterruptedException {
       channelCounter.incrementEventPutAttemptCount();
       int  eventByteSize = ( int )Math.ceil(estimateEventSize(event)/ byteCapacitySlotSize);
       if  (! putList.offer(event)) {
         throw  new  ChannelException(
           "Put queue for MemoryTransaction of capacity "  +
             putList.size() +  " full, consider committing more frequently, "  +
             "increasing capacity or increasing thread count"  );
       }
       putByteCounter += eventByteSize;
     }