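org.apache.flume.channel.ChannelProcessor performs the actual delivery of Events into Channels (it is used from within a Source). You can think of it as a proxy for the channels. It decides which Channels an Event is put into and how it is put (as a batch or one at a time), and before the put it runs the Event through the Interceptors.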
Which Channels an Event is put into is controlled by the ChannelSelector. Depending on the selector setting, there are currently two main kinds:
```
REPLICATING  -> org.apache.flume.channel.ReplicatingChannelSelector
MULTIPLEXING -> org.apache.flume.channel.MultiplexingChannelSelector
```
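REPLICATING sends every Event to each of the source's channels, so each channel holds a complete copy of the stream.
MULTIPLEXING sends each Event only to the channels mapped for it, somewhat like hash partitioning except that routing is driven by a header value, so each channel holds only a subset of the stream.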
org.apache.flume.channel.MultiplexingChannelSelector chooses channels from the header setting (the event header to route on, flume.selector.header by default) together with the mapping, default, and optional settings.
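To make the header/mapping/default/optional rules concrete, here is a minimal sketch of how a multiplexing selector can resolve channels for one event. It is not Flume's class. Channels are plain name strings, an event is reduced to its header map, and the names `MultiplexingSketch`, `requiredChannels` and `optionalChannels` are hypothetical. The fallback rules (a missing or unmapped header value goes to the default channels, and an unmapped optional lookup gives an empty list) are my reading of the selector's behavior and are worth checking against the Flume version you use.

```java
import java.util.Collections;
import java.util.List;
import java.util.Map;

// Hypothetical, simplified model of multiplexing selection (not Flume's API):
// channels are represented by name and an event by its header map.
class MultiplexingSketch {
  private final String headerName;                         // the "header" setting
  private final Map<String, List<String>> mapping;         // header value -> required channels
  private final Map<String, List<String>> optionalMapping; // header value -> optional channels
  private final List<String> defaultChannels;              // used when nothing matches

  MultiplexingSketch(String headerName, Map<String, List<String>> mapping,
      Map<String, List<String>> optionalMapping, List<String> defaultChannels) {
    this.headerName = headerName;
    this.mapping = mapping;
    this.optionalMapping = optionalMapping;
    this.defaultChannels = defaultChannels;
  }

  List<String> requiredChannels(Map<String, String> headers) {
    String value = headers.get(headerName);
    if (value == null || value.trim().isEmpty()) {
      return defaultChannels;                              // no routing header: use default
    }
    List<String> channels = mapping.get(value);
    return channels != null ? channels : defaultChannels;  // unmapped value: default too
  }

  List<String> optionalChannels(Map<String, String> headers) {
    String value = headers.get(headerName);
    List<String> channels = value == null ? null : optionalMapping.get(value);
    return channels != null ? channels : Collections.<String>emptyList();
  }
}
```

For example, with `State` as the header, `US` mapped to `c1 c2` and `c1` as the default, an event whose `State` header is `US` goes to `c1` and `c2`, while an event with no `State` header (or an unmapped value) goes only to `c1`.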
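Now look at the implementation of org.apache.flume.channel.ReplicatingChannelSelector. It keeps two channel lists, optional and required, which are returned by getOptionalChannels and getRequiredChannels respectively. If optional is set, optionalChannels contains the channels listed there, and requiredChannels is every channel from getAllChannels minus those in optionalChannels: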
```java
public void configure(Context context) {
  // configure() is where requiredChannels/optionalChannels are set up.
  // Read the "optional" setting.
  String optionalList = context.getString(CONFIG_OPTIONAL);
  // Initially requiredChannels is every channel (getAllChannels).
  requiredChannels = new ArrayList<Channel>(getAllChannels());
  Map<String, Channel> channelNameMap = getChannelNameMap();
  if (optionalList != null && !optionalList.isEmpty()) {
    // optional is set: split it on whitespace
    for (String optional : optionalList.split("\\s+")) {
      Channel optionalChannel = channelNameMap.get(optional);
      // Remove the optional channel from requiredChannels
      requiredChannels.remove(optionalChannel);
      // and add it to optionalChannels (only once)
      if (!optionalChannels.contains(optionalChannel)) {
        optionalChannels.add(optionalChannel);
      }
    }
  }
}
```
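The same required/optional split can be reproduced without Flume types. In this sketch (hypothetical class `ReplicatingSplitSketch`), channels are plain names instead of `Channel` objects looked up through `getChannelNameMap()`. Unlike the code above, the optional list is trimmed before splitting, so leading whitespace does not produce an empty first name.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical re-statement of the split done in configure(), using channel names.
class ReplicatingSplitSketch {
  final List<String> required = new ArrayList<>();
  final List<String> optional = new ArrayList<>();

  ReplicatingSplitSketch(List<String> allChannels, String optionalList) {
    required.addAll(allChannels);              // every channel starts as required
    if (optionalList != null && !optionalList.trim().isEmpty()) {
      // trim() is an addition: " c2".split("\\s+") would yield an empty first name
      for (String name : optionalList.trim().split("\\s+")) {
        required.remove(name);                 // moved out of the required list
        if (!optional.contains(name)) {
          optional.add(name);                  // listed once even if repeated
        }
      }
    }
  }
}
```

With all channels `c1 c2 c3` and optional set to `c3 c3`, `required` ends up as `[c1, c2]` and `optional` as `[c3]`.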
ChannelProcessor is first invoked from the SourceRunner, for example in the start method of org.apache.flume.source.EventDrivenSourceRunner:
```java
public void start() {
  Source source = getSource();                        // get the Source object
  ChannelProcessor cp = source.getChannelProcessor(); // get its ChannelProcessor
  cp.initialize();                                    // call ChannelProcessor.initialize
  source.start();                                     // then call Source.start
  lifecycleState = LifecycleState.START;
}
```
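Calling `cp.initialize()` before `source.start()` means the interceptors are ready before the source can produce anything. The toy model below makes that ordering observable. Its processor refuses events until it is initialized, and its source emits one event as soon as it starts. All three classes are hypothetical stand-ins, not Flume's `ChannelProcessor`, `Source` or `SourceRunner`.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical stand-ins for ChannelProcessor, Source and SourceRunner.
class StartupProcessor {
  private boolean initialized;
  final List<String> delivered = new ArrayList<>();

  void initialize() {
    initialized = true;                          // e.g. interceptors set up their state here
  }

  void processEvent(String body) {
    if (!initialized) {
      throw new IllegalStateException("processor not initialized");
    }
    delivered.add(body);
  }
}

class StartupSource {
  private final StartupProcessor processor;

  StartupSource(StartupProcessor processor) {
    this.processor = processor;
  }

  StartupProcessor getChannelProcessor() {
    return processor;
  }

  void start() {
    processor.processEvent("first-event");       // this toy source emits as soon as it starts
  }
}

class StartupRunner {
  private final StartupSource source;

  StartupRunner(StartupSource source) {
    this.source = source;
  }

  void start() {
    source.getChannelProcessor().initialize();   // same order as EventDrivenSourceRunner.start()
    source.start();
  }
}
```

Starting a `StartupSource` directly, without the runner, throws `IllegalStateException`, which is the situation the initialize-then-start order avoids.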
In the org.apache.flume.source.ExecSource.ExecRunnable class, processEventBatch is called to insert the data in batches:
```java
while ((line = reader.readLine()) != null) {
  counterGroup.incrementAndGet("exec.lines.read");
  eventList.add(EventBuilder.withBody(line.getBytes(charset)));
  if (eventList.size() >= bufferCount) {
    channelProcessor.processEventBatch(eventList);
    eventList.clear();
  }
}
```
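The excerpt stops at the end of the loop, so it does not show what happens to lines left in a batch that never reaches `bufferCount`. The sketch below (hypothetical `LineBatcher`, with a `Consumer` standing in for `channelProcessor.processEventBatch`) adds an explicit flush of that tail after the loop. Treat it as an illustration of the batching pattern, not as ExecSource's exact code.

```java
import java.io.BufferedReader;
import java.io.IOException;
import java.io.UncheckedIOException;
import java.util.ArrayList;
import java.util.List;
import java.util.function.Consumer;

class LineBatcher {
  // Reads lines and hands them to 'sink' in batches of at most bufferCount.
  // 'sink' is a hypothetical stand-in for channelProcessor.processEventBatch.
  static void readInBatches(BufferedReader reader, int bufferCount,
      Consumer<List<String>> sink) {
    List<String> batch = new ArrayList<>();
    try {
      String line;
      while ((line = reader.readLine()) != null) {
        batch.add(line);
        if (batch.size() >= bufferCount) {
          sink.accept(new ArrayList<>(batch)); // copy: batch is cleared right after
          batch.clear();
        }
      }
    } catch (IOException e) {
      throw new UncheckedIOException(e);
    }
    if (!batch.isEmpty()) {
      sink.accept(new ArrayList<>(batch));     // flush the last, partially filled batch
    }
  }
}
```

The copy passed to `sink` matters only because a consumer in this sketch may keep the list. The original can clear `eventList` right after the call, presumably because `processEventBatch` does not hold on to it. With five lines and `bufferCount = 2`, the sink receives batches of 2, 2 and 1 lines.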
Now for the implementation of ChannelProcessor itself.
First, its two important fields:
```java
private final ChannelSelector selector;
private final InterceptorChain interceptorChain;
```
The initialize method calls InterceptorChain.initialize to initialize interceptorChain:
```java
public void initialize() {
  interceptorChain.initialize();
}
```
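`interceptorChain` applies its interceptors one after another, and `processEvent` (below) returns early when the result is `null`. Here is a minimal sketch of such a chain. It assumes an interceptor may return `null` to drop an event and that the chain stops as soon as that happens. `SketchInterceptor` and `SketchInterceptorChain` are hypothetical and use `String` in place of `Event`.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical minimal interceptor contract (not Flume's Interceptor interface).
interface SketchInterceptor {
  default void initialize() {}
  String intercept(String event);   // return null to drop the event
}

class SketchInterceptorChain {
  private final List<SketchInterceptor> interceptors = new ArrayList<>();

  void setInterceptors(List<SketchInterceptor> list) {
    interceptors.clear();
    interceptors.addAll(list);
  }

  void initialize() {
    for (SketchInterceptor interceptor : interceptors) {
      interceptor.initialize();
    }
  }

  String intercept(String event) {
    for (SketchInterceptor interceptor : interceptors) {
      if (event == null) {
        return null;                // dropped by an earlier interceptor: stop here
      }
      event = interceptor.intercept(event);
    }
    return event;                   // may be null if the last interceptor dropped it
  }
}
```

Chaining an upper-casing interceptor with one that drops anything starting with `DROP` turns `hello` into `HELLO!` (the second one also appends `!`), while `drop me` comes out as `null`.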
The configure method calls configureInterceptors, which builds the InterceptorChain from the interceptors setting:
```java
private void configureInterceptors(Context context) {
  List<Interceptor> interceptors = Lists.newLinkedList();
  // Read the "interceptors" setting
  String interceptorListStr = context.getString("interceptors", "");
  if (interceptorListStr.isEmpty()) {
    return;
  }
  // Split the interceptor names on whitespace
  String[] interceptorNames = interceptorListStr.split("\\s+");
  Context interceptorContexts =
      new Context(context.getSubProperties("interceptors."));

  // run through and instantiate all the interceptors specified in the Context
  InterceptorBuilderFactory factory = new InterceptorBuilderFactory();
  for (String interceptorName : interceptorNames) {
    Context interceptorContext = new Context(
        interceptorContexts.getSubProperties(interceptorName + "."));
    String type = interceptorContext.getString("type");
    if (type == null) {
      LOG.error("Type not specified for interceptor " + interceptorName);
      throw new FlumeException("Interceptor.Type not specified for " +
          interceptorName);
    }
    try {
      // Get the Interceptor builder for the configured type
      Interceptor.Builder builder = factory.newInstance(type);
      builder.configure(interceptorContext);
      interceptors.add(builder.build());
    ......
  }
  interceptorChain.setInterceptors(interceptors);
}
```
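The naming convention that `configureInterceptors` relies on can be illustrated with plain maps. Interceptor names are listed, whitespace-separated, under `interceptors`, and each one's settings live under `interceptors.<name>.`, including its `type`. In this sketch, `subProperties` is a hypothetical helper that mimics how `getSubProperties` is used above (keep the keys with a given prefix and strip it). A missing type raises `IllegalArgumentException` rather than `FlumeException`.

```java
import java.util.HashMap;
import java.util.LinkedHashMap;
import java.util.Map;

class InterceptorConfigSketch {
  // Hypothetical helper: keep the keys that start with prefix, with the prefix stripped.
  static Map<String, String> subProperties(Map<String, String> props, String prefix) {
    Map<String, String> result = new HashMap<>();
    for (Map.Entry<String, String> e : props.entrySet()) {
      if (e.getKey().startsWith(prefix)) {
        result.put(e.getKey().substring(prefix.length()), e.getValue());
      }
    }
    return result;
  }

  // Returns interceptor name -> type, in the order given by "interceptors".
  static Map<String, String> interceptorTypes(Map<String, String> sourceProps) {
    Map<String, String> types = new LinkedHashMap<>();
    String list = sourceProps.getOrDefault("interceptors", "");
    if (list.isEmpty()) {
      return types;
    }
    Map<String, String> all = subProperties(sourceProps, "interceptors.");
    for (String name : list.split("\\s+")) {
      String type = subProperties(all, name + ".").get("type");
      if (type == null) {
        throw new IllegalArgumentException("Interceptor.Type not specified for " + name);
      }
      types.put(name, type);
    }
    return types;
  }
}
```

Given the properties `interceptors=i1 i2`, `interceptors.i1.type=timestamp` and `interceptors.i2.type=host`, `interceptorTypes` maps `i1` to `timestamp` and `i2` to `host`, in that order. Any other `interceptors.i2.*` keys would end up in the sub-properties handed to that interceptor's builder.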
ChannelProcessor also provides two methods for inserting data: processEventBatch, which inserts a batch of Events (its parameter is List<Event> events), and processEvent, which inserts a single Event.
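processEventBatch is not shown in this post. As far as I can tell, it differs from processEvent mainly in that it first groups the intercepted events by destination channel and then writes each channel's group in a single transaction, instead of opening one transaction per event. The sketch below shows only that grouping step. Events are strings, and the `selector` function is a hypothetical stand-in for `ChannelSelector.getRequiredChannels`.

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.function.Function;

class BatchGroupingSketch {
  // Groups a batch by destination channel, keeping event order within each channel,
  // so that each channel can then be written in one transaction for its whole group.
  // 'selector' is a hypothetical stand-in for ChannelSelector.getRequiredChannels.
  static Map<String, List<String>> groupByChannel(List<String> events,
      Function<String, List<String>> selector) {
    Map<String, List<String>> perChannel = new LinkedHashMap<>();
    for (String event : events) {
      for (String channel : selector.apply(event)) {
        perChannel.computeIfAbsent(channel, c -> new ArrayList<>()).add(event);
      }
    }
    return perChannel;
  }
}
```

If events starting with `a` go to `c1` and `c2` and all others go to `c2`, the batch `a1 b1 a2` becomes `c1 -> [a1, a2]` and `c2 -> [a1, b1, a2]`, which means two transactions instead of five single-event puts.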
Here is the implementation of processEvent:
```java
public void processEvent(Event event) {
  // Run the Event through InterceptorChain.intercept
  event = interceptorChain.intercept(event);
  if (event == null) {
    return;
  }

  // Process required channels
  // Get the required channels from the ChannelSelector
  List<Channel> requiredChannels = selector.getRequiredChannels(event);
  // Put the event into each required channel, each put in its own transaction
  for (Channel reqChannel : requiredChannels) {
    Transaction tx = reqChannel.getTransaction();
    Preconditions.checkNotNull(tx, "Transaction object must not be null");
    try {
      tx.begin();
      reqChannel.put(event);
      tx.commit();
    } catch (Throwable t) {
      tx.rollback();
      if (t instanceof Error) {
        LOG.error("Error while writing to required channel: " +
            reqChannel, t);
        throw (Error) t;
      } else {
        throw new ChannelException("Unable to put event on required " +
            "channel: " + reqChannel, t);
      }
    } finally {
      if (tx != null) {
        tx.close();
      }
    }
  }

  // Process optional channels
  // Same procedure for the optional channels, except that failures are only logged
  List<Channel> optionalChannels = selector.getOptionalChannels(event);
  for (Channel optChannel : optionalChannels) {
    Transaction tx = null;
    try {
      tx = optChannel.getTransaction();
      tx.begin();
      optChannel.put(event);
      tx.commit();
    } catch (Throwable t) {
      tx.rollback();
      LOG.error("Unable to put event on optional channel: " + optChannel, t);
      if (t instanceof Error) {
        throw (Error) t;
      }
    } finally {
      if (tx != null) {
        tx.close();
      }
    }
  }
}
```
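The practical difference between the two loops is how failures surface. A failure on a required channel is rethrown to the caller (wrapped in `ChannelException`). A failure on an optional channel is only logged, and only an `Error` escapes. The sketch below models just that policy with hypothetical classes (`SketchChannel`, `DeliverySketch`) and no real transactions. `IllegalStateException` stands in for `ChannelException`.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical channel: put either records the event or fails.
class SketchChannel {
  final String name;
  final boolean failOnPut;
  final List<String> received = new ArrayList<>();

  SketchChannel(String name, boolean failOnPut) {
    this.name = name;
    this.failOnPut = failOnPut;
  }

  void put(String event) {
    if (failOnPut) {
      throw new RuntimeException("put failed on " + name);
    }
    received.add(event);
  }
}

class DeliverySketch {
  static void process(String event, List<SketchChannel> required, List<SketchChannel> optional) {
    for (SketchChannel ch : required) {
      try {
        ch.put(event);
      } catch (RuntimeException e) {
        // required channel: propagate (Flume wraps this in ChannelException)
        throw new IllegalStateException("Unable to put event on required channel: " + ch.name, e);
      }
    }
    for (SketchChannel ch : optional) {
      try {
        ch.put(event);
      } catch (RuntimeException e) {
        // optional channel: log and move on to the next one
        System.err.println("Unable to put event on optional channel: " + ch.name);
      }
    }
  }
}
```

Catching only `RuntimeException` lets `Error`s propagate, which matches the rethrow in both loops above. Also, because each required channel commits in its own transaction, a failure on a later required channel leaves the earlier ones already committed. If the source then retries the event, those channels may receive it twice.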