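As a log collector, Flume is essentially a producer-consumer system: the source is the message producer, the sink is the message consumer, and the channel in between stores the messages.
To guarantee that messages are consumed correctly, Flume uses a transaction mechanism. The main classes involved are:
1) org.apache.flume.Transaction: an interface that provides transactional access to a channel (for either put or take).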
It first defines an enum, TransactionState, listing the states of a transaction: {Started, Committed, RolledBack, Closed}.
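It declares the following abstract methods:

```java
void begin();    // start the transaction
void commit();   // commit (complete) the transaction
void rollback(); // roll back the transaction
void close();    // close the transaction
```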
Typical usage:

```java
Channel ch = ...;                     // a channel instance
Transaction tx = ch.getTransaction(); // get this channel's Transaction object
try {
  tx.begin();                         // start the transaction
  ...
  ch.put(event);                      // data operation (or ch.take())
  ...
  tx.commit();                        // commit the transaction
} catch (ChannelException ex) {
  tx.rollback();                      // roll back on failure
  ...
} finally {
  tx.close();
}
```
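Here is a minimal, self-contained sketch of the idiom above that runs without Flume on the classpath. `SimpleTransaction`, `RecordingTransaction`, and `ToyChannelException` are hypothetical stand-ins, not Flume classes. The recording transaction only logs which lifecycle methods run, which makes it easy to see that close() runs on both the success path and the failure path. Unlike the fragment above, the sketch rethrows after rolling back, so the caller still sees the failure.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical stand-in for org.apache.flume.Transaction (same four methods).
interface SimpleTransaction {
  void begin();
  void commit();
  void rollback();
  void close();
}

// Hypothetical unchecked exception standing in for ChannelException.
class ToyChannelException extends RuntimeException {
  ToyChannelException(String message) {
    super(message);
  }
}

// Records which lifecycle methods were called, in order.
class RecordingTransaction implements SimpleTransaction {
  final List<String> calls = new ArrayList<>();

  public void begin()    { calls.add("begin"); }
  public void commit()   { calls.add("commit"); }
  public void rollback() { calls.add("rollback"); }
  public void close()    { calls.add("close"); }
}

class TransactionIdiom {
  // Runs `work` inside the begin/commit/rollback/close pattern.
  static void runInTransaction(SimpleTransaction tx, Runnable work) {
    try {
      tx.begin();
      work.run();    // stands in for ch.put(event) or ch.take()
      tx.commit();
    } catch (ToyChannelException ex) {
      tx.rollback(); // undo the partial work
      throw ex;      // rethrow so the caller still sees the failure
    } finally {
      tx.close();    // runs on both the success and the failure path
    }
  }

  public static void main(String[] args) {
    RecordingTransaction ok = new RecordingTransaction();
    runInTransaction(ok, () -> { });
    System.out.println(ok.calls);     // [begin, commit, close]

    RecordingTransaction failed = new RecordingTransaction();
    try {
      runInTransaction(failed, () -> {
        throw new ToyChannelException("channel full");
      });
    } catch (ToyChannelException expected) {
      // the failure reaches the caller after rollback() and close()
    }
    System.out.println(failed.calls); // [begin, rollback, close]
  }
}
```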
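2) org.apache.flume.channel.BasicTransactionSemantics: an abstract class that implements the Transaction interface (used together with BasicChannelSemantics).
It defines an enum, State, that tracks the transaction's status: NEW (newly created), OPEN (commit or rollback may be called), COMPLETED (after commit or rollback), and CLOSED.
It declares the hook methods doBegin/doPut/doTake/doCommit/doRollback/doClose for subclasses to implement, and implements put/take/begin/commit/rollback/close on top of them.
Each of these methods uses Preconditions.checkState to verify two things: that it is called from the same thread that created the transaction, and that the transaction is in the required state. For example: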
```java
protected BasicTransactionSemantics() {
  state = State.NEW;                                // the constructor starts in state NEW
  initialThreadId = Thread.currentThread().getId();
}

public void begin() {
  Preconditions.checkState(Thread.currentThread().getId() == initialThreadId,
      "begin() called from different thread than getTransaction()!"); // must be the same thread
  Preconditions.checkState(state.equals(State.NEW),
      "begin() called when transaction is " + state + "!");           // state must be NEW
  try {
    doBegin();                                                        // delegate to doBegin()
  } catch (InterruptedException e) {
    Thread.currentThread().interrupt();
    throw new ChannelException(e.toString(), e);
  }
  state = State.OPEN;                                                 // now OPEN
}
```
The transaction lifecycle is:
begin --> put/take --> commit/rollback --> close
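The split between public methods that check preconditions and protected doXxx hooks is the template method pattern. The sketch below is a simplified, hypothetical re-creation of that pattern. The class names are made up, Guava's Preconditions is replaced by a local check, and only begin/commit/rollback/close are modelled. The rule that close() is allowed only in NEW or COMPLETED is an assumption made for this sketch; check the Flume source for the exact conditions.

```java
// Hypothetical, simplified re-creation of the BasicTransactionSemantics pattern:
// public final methods check thread and state, then delegate to protected hooks.
abstract class ToyTransactionSemantics {
  enum State { NEW, OPEN, COMPLETED, CLOSED }

  private State state = State.NEW;
  // Recorded when the transaction is constructed, like initialThreadId above.
  private final long initialThreadId = Thread.currentThread().getId();

  protected abstract void doBegin();
  protected abstract void doCommit();
  protected abstract void doRollback();
  protected abstract void doClose();

  // Local replacement for Guava's Preconditions.checkState.
  private static void check(boolean condition, String message) {
    if (!condition) {
      throw new IllegalStateException(message);
    }
  }

  private void checkThread(String method) {
    check(Thread.currentThread().getId() == initialThreadId,
        method + "() called from a different thread than the one that created the transaction");
  }

  public final void begin() {
    checkThread("begin");
    check(state == State.NEW, "begin() called when transaction is " + state);
    doBegin();
    state = State.OPEN;
  }

  public final void commit() {
    checkThread("commit");
    check(state == State.OPEN, "commit() called when transaction is " + state);
    doCommit();
    state = State.COMPLETED;
  }

  public final void rollback() {
    checkThread("rollback");
    check(state == State.OPEN, "rollback() called when transaction is " + state);
    doRollback();
    state = State.COMPLETED;
  }

  public final void close() {
    checkThread("close");
    // Assumption for this sketch: close() is legal before begin() or after commit/rollback.
    check(state == State.NEW || state == State.COMPLETED,
        "close() called when transaction is " + state);
    doClose();
    state = State.CLOSED;
  }

  public State getState() {
    return state;
  }
}

// Trivial subclass that records which hooks ran: B=begin, C=commit, R=rollback, X=close.
class LoggingTransaction extends ToyTransactionSemantics {
  final StringBuilder log = new StringBuilder();

  protected void doBegin()    { log.append('B'); }
  protected void doCommit()   { log.append('C'); }
  protected void doRollback() { log.append('R'); }
  protected void doClose()    { log.append('X'); }
}

class LifecycleDemo {
  public static void main(String[] args) {
    LoggingTransaction tx = new LoggingTransaction();
    tx.begin();
    tx.commit();
    tx.close();
    System.out.println(tx.log + " " + tx.getState()); // BCX CLOSED

    try {
      tx.begin(); // a closed transaction cannot be restarted
    } catch (IllegalStateException e) {
      System.out.println(e.getMessage()); // begin() called when transaction is CLOSED
    }
  }
}
```

Making the public methods `final` means a subclass can only customise the hooks. It cannot skip the thread and state checks.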
3) The subclasses of BasicTransactionSemantics each belong to a specific channel:
FileChannel --> FileBackedTransaction
MemoryChannel --> MemoryTransaction
SpillableMemoryChannel --> SpillableMemoryTransaction
On the channel side, each concrete BasicTransactionSemantics subclass implements its own doPut/doTake/doCommit/doRollback methods.
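To show what those hooks do, here is a simplified, single-threaded sketch in the spirit of MemoryChannel's transaction. It is an illustration built on assumptions, not Flume's code. Puts are staged in a putList and become visible only on commit. Takes are remembered in a takeList so that a rollback can return them to the channel. Locking, capacity limits, and counters are left out, events are plain strings, and `ToyMemoryChannel`/`ToyMemoryTransaction` are hypothetical names.

```java
import java.util.ArrayDeque;
import java.util.Deque;

class ToyMemoryChannel {
  // Events visible to consumers after a successful commit.
  private final Deque<String> queue = new ArrayDeque<>();

  int size() { return queue.size(); }

  ToyMemoryTransaction newTransaction() { return new ToyMemoryTransaction(); }

  class ToyMemoryTransaction {
    private final Deque<String> putList = new ArrayDeque<>();  // staged puts
    private final Deque<String> takeList = new ArrayDeque<>(); // events taken in this transaction

    void doPut(String event) {
      putList.addLast(event); // not visible to takers until commit
    }

    String doTake() {
      String event = queue.pollFirst(); // null if the channel is empty
      if (event != null) {
        takeList.addLast(event);        // remember it in case of rollback
      }
      return event;
    }

    void doCommit() {
      queue.addAll(putList); // publish the staged puts
      putList.clear();
      takeList.clear();      // taken events are now gone for good
    }

    void doRollback() {
      // Return taken events to the head of the queue in their original order.
      while (!takeList.isEmpty()) {
        queue.addFirst(takeList.removeLast());
      }
      putList.clear();       // discard the staged puts
    }
  }
}

class MemoryTxDemo {
  public static void main(String[] args) {
    ToyMemoryChannel ch = new ToyMemoryChannel();
    ToyMemoryChannel.ToyMemoryTransaction put = ch.newTransaction();
    put.doPut("e1");
    put.doPut("e2");
    System.out.println(ch.size()); // 0: staged, not yet committed
    put.doCommit();
    System.out.println(ch.size()); // 2

    ToyMemoryChannel.ToyMemoryTransaction take = ch.newTransaction();
    System.out.println(take.doTake()); // e1
    take.doRollback();
    System.out.println(ch.size()); // 2: e1 is back in the channel
  }
}
```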
Next, let's trace how these hooks are reached when a channel instance's methods are called, using put as the example (take follows the same path):
1) org.apache.flume.channel.ChannelProcessor calls processEvent (for a single event) or processEventBatch (for a batch of events).
For example, in processEvent:
```java
// Process required channels
List<Channel> requiredChannels = selector.getRequiredChannels(event);
for (Channel reqChannel : requiredChannels) {
  Transaction tx = reqChannel.getTransaction();
  Preconditions.checkNotNull(tx, "Transaction object must not be null"); // the transaction must not be null
  try {
    tx.begin();            // start the transaction
    reqChannel.put(event); // calls the channel's put, here org.apache.flume.channel.BasicChannelSemantics.put
    tx.commit();           // commit; both put and take go through a commit step
  } catch (Throwable t) {
    tx.rollback();
    if (t instanceof Error) {
      LOG.error("Error while writing to required channel: " + reqChannel, t);
      throw (Error) t;
    } else {
      throw new ChannelException("Unable to put event on required " +
          "channel: " + reqChannel, t);
    }
  } finally {
    if (tx != null) {
      tx.close();
    }
  }
}
```
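In this loop, each required channel gets its own, independent transaction. If the put to the second channel fails, the commit already made on the first channel is not undone. The hypothetical sketch below demonstrates this. Its toy channel folds the transaction methods into the channel itself, and `ToyChannel`, `failOnPut`, and `ToyChannelProcessor` are illustrative names, not Flume APIs. For brevity it catches RuntimeException rather than distinguishing Error from other Throwables.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

// Hypothetical toy channel: transaction methods are folded into the channel for brevity.
class ToyChannel {
  final String name;
  final boolean failOnPut; // hypothetical switch that simulates a failing channel
  final List<String> committed = new ArrayList<>();
  private final List<String> staged = new ArrayList<>();

  ToyChannel(String name, boolean failOnPut) {
    this.name = name;
    this.failOnPut = failOnPut;
  }

  void begin()    { staged.clear(); }
  void commit()   { committed.addAll(staged); staged.clear(); }
  void rollback() { staged.clear(); }
  void close()    { }

  void put(String event) {
    if (failOnPut) {
      throw new IllegalStateException("channel " + name + " is full");
    }
    staged.add(event);
  }
}

class ToyChannelProcessor {
  // Mirrors the loop above: one independent transaction per required channel.
  static void processEvent(String event, List<ToyChannel> requiredChannels) {
    for (ToyChannel ch : requiredChannels) {
      try {
        ch.begin();
        ch.put(event);
        ch.commit();
      } catch (RuntimeException e) {
        ch.rollback(); // only this channel's staged put is discarded
        throw new RuntimeException("Unable to put event on required channel: " + ch.name, e);
      } finally {
        ch.close();
      }
    }
  }
}

class ProcessEventDemo {
  public static void main(String[] args) {
    ToyChannel a = new ToyChannel("a", false);
    ToyChannel b = new ToyChannel("b", true);
    ToyChannel c = new ToyChannel("c", false);
    try {
      ToyChannelProcessor.processEvent("e1", Arrays.asList(a, b, c));
    } catch (RuntimeException e) {
      System.out.println(e.getMessage()); // Unable to put event on required channel: b
    }
    System.out.println(a.committed); // [e1]: the commit on "a" is not undone
    System.out.println(c.committed); // []: "c" was never reached
  }
}
```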
2) This calls put in the parent class, org.apache.flume.channel.BasicChannelSemantics:

```java
public void put(Event event) throws ChannelException {
  BasicTransactionSemantics transaction = currentTransaction.get();
  Preconditions.checkState(transaction != null,
      "No transaction exists for this thread");
  transaction.put(event); // calls put on the concrete BasicTransactionSemantics implementation
}
```
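`currentTransaction` holds one transaction per thread (as far as I know, it is a ThreadLocal field in BasicChannelSemantics). put() looks up the transaction bound to the calling thread, which is why it fails with "No transaction exists for this thread" when getTransaction() was never called on that thread. The sketch below models this with java.lang.ThreadLocal. It assumes, beyond what the code above shows, that getTransaction() reuses the thread's transaction until it is closed and then creates a new one. `ThreadBoundChannel` and `ToyTx` are hypothetical names.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical minimal transaction: a closed flag plus the events put through it.
class ToyTx {
  boolean closed;
  final List<String> puts = new ArrayList<>();
}

class ThreadBoundChannel {
  // One transaction slot per thread, like currentTransaction above.
  private final ThreadLocal<ToyTx> currentTransaction = new ThreadLocal<>();

  ToyTx getTransaction() {
    ToyTx tx = currentTransaction.get();
    // Assumption: reuse the thread's transaction until it is closed, then replace it.
    if (tx == null || tx.closed) {
      tx = new ToyTx();
      currentTransaction.set(tx);
    }
    return tx;
  }

  void put(String event) {
    ToyTx tx = currentTransaction.get(); // only sees the calling thread's transaction
    if (tx == null) {
      throw new IllegalStateException("No transaction exists for this thread");
    }
    tx.puts.add(event);
  }
}

class ThreadLocalDemo {
  public static void main(String[] args) throws InterruptedException {
    ThreadBoundChannel ch = new ThreadBoundChannel();
    ToyTx tx = ch.getTransaction();
    ch.put("e1");
    System.out.println(tx.puts); // [e1]

    Thread other = new Thread(() -> {
      try {
        ch.put("e2"); // this thread never called getTransaction()
      } catch (IllegalStateException e) {
        System.out.println("other thread: " + e.getMessage());
      }
    });
    other.start();
    other.join();
  }
}
```

Binding the transaction to the thread lets the channel's put/take signatures stay free of a transaction parameter. It also explains why BasicTransactionSemantics insists that every call come from the thread that created the transaction.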
3) This in turn calls put in org.apache.flume.channel.BasicTransactionSemantics (the subclasses do not override put):

```java
protected void put(Event event) {
  Preconditions.checkState(Thread.currentThread().getId() == initialThreadId,
      "put() called from different thread than getTransaction()!");
  Preconditions.checkState(state.equals(State.OPEN),
      "put() called when transaction is %s!", state);
  Preconditions.checkArgument(event != null,
      "put() called with null event!");
  // the checks above verify the calling thread, the transaction state, and the argument

  try {
    doPut(event); // delegate to the subclass's doPut
  } catch (InterruptedException e) {
    Thread.currentThread().interrupt();
    throw new ChannelException(e.toString(), e);
  }
}
```
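The catch block here follows a standard Java idiom. When a blocking call throws InterruptedException, the thread's interrupt status has already been cleared. The code therefore sets it again with Thread.currentThread().interrupt() before wrapping the exception in an unchecked ChannelException, so code further up the stack can still notice the interrupt. Below is a minimal standalone sketch of the idiom; `InterruptedChannelException` and `InterruptibleAction` are hypothetical names standing in for ChannelException and doPut.

```java
// Hypothetical unchecked wrapper standing in for ChannelException.
class InterruptedChannelException extends RuntimeException {
  InterruptedChannelException(String message, Throwable cause) {
    super(message, cause);
  }
}

// Hypothetical functional interface for an action that may be interrupted, such as doPut.
interface InterruptibleAction {
  void run() throws InterruptedException;
}

class Interrupts {
  static void runRestoringInterrupt(InterruptibleAction action) {
    try {
      action.run();
    } catch (InterruptedException e) {
      // Restore the interrupt status (blocking calls clear it when they throw).
      Thread.currentThread().interrupt();
      throw new InterruptedChannelException(e.toString(), e);
    }
  }

  public static void main(String[] args) {
    try {
      runRestoringInterrupt(() -> {
        throw new InterruptedException("stop");
      });
    } catch (InterruptedChannelException e) {
      // Thread.interrupted() reports, and then clears, the restored flag.
      System.out.println(Thread.interrupted()); // true
    }
  }
}
```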
4) Finally, doPut of the channel's concrete transaction class is called. For example, here is doPut in MemoryTransaction, an inner class of org.apache.flume.channel.MemoryChannel:
```java
protected void doPut(Event event) throws InterruptedException {
  channelCounter.incrementEventPutAttemptCount();
  int eventByteSize = (int) Math.ceil(estimateEventSize(event) / byteCapacitySlotSize);

  if (!putList.offer(event)) {
    throw new ChannelException(
        "Put queue for MemoryTransaction of capacity " +
        putList.size() + " full, consider committing more frequently, " +
        "increasing capacity or increasing thread count");
  }
  putByteCounter += eventByteSize;
}
```
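Two details of doPut are easy to miss. First, assuming putList is a capacity-bounded deque, offer() does not block: it returns false when the deque is full, which is what triggers the "full, consider committing more frequently" exception. Second, the event size is rounded up to whole slots of byteCapacitySlotSize bytes. If both operands of that division are integral types, Java truncates before Math.ceil runs, so the ceil has no effect. The sketch divides as double so the rounding actually happens. `BoundedPutList` and its constructor parameters are hypothetical; event bodies are plain byte arrays and the transaction capacity is passed in explicitly.

```java
import java.util.concurrent.LinkedBlockingDeque;

// Hypothetical simplification of MemoryTransaction.doPut: bounded staging plus slot accounting.
class BoundedPutList {
  private final LinkedBlockingDeque<byte[]> putList;
  private final int byteCapacitySlotSize; // bytes per accounting slot
  private int putByteCounter;             // slots reserved by this transaction so far

  BoundedPutList(int transCapacity, int byteCapacitySlotSize) {
    this.putList = new LinkedBlockingDeque<>(transCapacity); // bounded capacity
    this.byteCapacitySlotSize = byteCapacitySlotSize;
  }

  void doPut(byte[] body) {
    // Divide as double so Math.ceil can round up (250 bytes with 100-byte slots -> 3 slots).
    int eventByteSize = (int) Math.ceil((double) body.length / byteCapacitySlotSize);
    if (!putList.offer(body)) { // offer() returns false instead of blocking when full
      throw new IllegalStateException("Put queue of capacity " + putList.size()
          + " full, consider committing more frequently");
    }
    putByteCounter += eventByteSize;
  }

  int getPutByteCounter() { return putByteCounter; }

  int size() { return putList.size(); }
}

class PutListDemo {
  public static void main(String[] args) {
    System.out.println((int) Math.ceil(250 / 100));          // 2: integer division truncates first
    System.out.println((int) Math.ceil(250 / (double) 100)); // 3

    BoundedPutList putList = new BoundedPutList(2, 100);
    putList.doPut(new byte[250]);
    putList.doPut(new byte[100]);
    System.out.println(putList.getPutByteCounter()); // 4
    try {
      putList.doPut(new byte[1]); // third event, but the capacity is 2
    } catch (IllegalStateException e) {
      System.out.println(e.getMessage());
    }
  }
}
```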