org.apache.flume.channel.ChannelProcessor 用于实际的Event到Channel的操作(在Source中用到),可以把它想象成channel的proxy,用于控制把Event put到哪些Channel中,以及怎么put(bacth或者单个),同时在put之前会使用 Interceptor对Event进行处理。
把Event put到哪些Channel中是由ChannelSelector 控制的,根据selector的设置,目前主要有两种:

1
2
REPLICATING->org.apache.flume.channel.ReplicatingChannelSelector,
MULTIPLEXING->org.apache.flume.channel.MultiplexingChannelSelector;

REPLICATING 把Event发送到每一个对应的channel上,每个channel都有完整的一份。
MULTIPLEXING 把Event发送到设置的映射的channel上,类似于hash,每个channel包含一部分
org.apache.flume.channel.MultiplexingChannelSelector会根据header(默认为flume.selector.header),mapping,default,optional的设置获取channel。

这里看下org.apache.flume.channel.ReplicatingChannelSelector的实现,可以看出有两个channel列表,optional和require,分布对应getOptionalChannels和getRequiredChannels方法,如果设置了optional,optionalChannels为optional的设置,requiredChannels为getAllChannels的设置减去optionalChannels的设置

1
2
3
4
5
6
7
8
9
10
11
12
13
14
   public  void  configure(Context context) {  //通过configure配置requiredChannels 
     String optionalList = context.getString(CONFIG_OPTIONAL);  //根据optional的设置
     requiredChannels =  new  ArrayList<Channel>(getAllChannels());  //初始时requiredChannels 即为getAllChannels
     Map<String, Channel> channelNameMap = getChannelNameMap();
     if (optionalList !=  null  && !optionalList.isEmpty()) {  //如果optional的设置不为空
       for (String optional : optionalList.split( "\\s+" )) {  //对optional按空格进行split
         Channel optionalChannel = channelNameMap.get(optional);
         requiredChannels.remove(optionalChannel);  //从requiredChannels 数组中去除optionalChannel
         if  (!optionalChannels.contains(optionalChannel)) {
           optionalChannels.add(optionalChannel);  //添加到optionalChannels
         }
       }
     }
   }

ChannelProcessor的初始调用在SourceRunner中,比如在org.apache.flume.source.EventDrivenSourceRunner的start方法:

1
2
3
4
5
6
7
   public  void  start() {
     Source source = getSource();  //通过getSource获取Source对象
     ChannelProcessor cp = source.getChannelProcessor();  //获取ChannelProcessor 对象
     cp.initialize();  //调用ChannelProcessor.initialize方法
     source.start();  //调用Source.start方法
     lifecycleState = LifecycleState. START;
   }

而在org.apache.flume.source.ExecSource.ExecRunnable类中会调用其processEventBatch方法,进行批量插入数据

1
2
3
4
5
6
7
8
            while  ((line = reader.readLine()) !=  null ) {
             counterGroup.incrementAndGet( "exec.lines.read"  );
             eventList.add(EventBuilder. withBody(line.getBytes(charset)));
             if (eventList.size() >= bufferCount ) {
               channelProcessor.processEventBatch(eventList);
               eventList.clear();
             }
           }

看下ChannelProcessor的具体实现:
首先两个重要的属性

1
2
   private  final  ChannelSelector selector ;
   private  final  InterceptorChain interceptorChain ;

initialize方法调用InterceptorChain.initialize方法,初始化interceptorChain

1
2
3
   public  void  initialize() {
     interceptorChain.initialize();
   }

configure方法调用configureInterceptors方法,用于根据interceptors设置InterceptorChain 

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
private  void  configureInterceptors(Context context) {
     List<Interceptor> interceptors = Lists.newLinkedList();
     String interceptorListStr = context.getString(  "interceptors" ""  );  //获取interceptors的设置
     if  (interceptorListStr.isEmpty()) {
       return ;
     }
     String[] interceptorNames = interceptorListStr.split(  "\\s+" );  //根据空格分隔
     Context interceptorContexts =
         new  Context(context.getSubProperties( "interceptors."  ));
     // run through and instantiate all the interceptors specified in the Context
     InterceptorBuilderFactory factory =  new  InterceptorBuilderFactory();
     for  (String interceptorName : interceptorNames) {
       Context interceptorContext =  new  Context(
           interceptorContexts.getSubProperties(interceptorName +  "." ));
       String type = interceptorContext.getString(  "type" );
       if  (type ==  null ) {
         LOG.error( "Type not specified for interceptor "  + interceptorName);
         throw  new  FlumeException( "Interceptor.Type not specified for "  +
           interceptorName);
       }
       try  {
         Interceptor.Builder builder = factory.newInstance(type);  //根据type的设置获取Interceptor
         builder.configure(interceptorContext);
         interceptors.add(builder.build());
......
     }
     interceptorChain.setInterceptors(interceptors);
   }

另外提供了两个插入数据的方法,processEventBatch和processEvent,processEventBatch用于插入一批Event(参数是List<Event> events),processEvent用于插入一个Event。
看下processEvent的实现:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
   public  void  processEvent(Event event) {
     event = interceptorChain.intercept(event);  //调用InterceptorChain.intercept对Event进行处理
     if  (event ==  null ) {
       return ;
     }
     // Process required channels
     List<Channel> requiredChannels = selector.getRequiredChannels(event);  // 根据ChannelSelector获取requiredChannels 
     for  (Channel reqChannel : requiredChannels) {  // 对requiredChannels 中的每一个channel执行对应的put操作,每个操作都在一个事务内
       Transaction tx = reqChannel.getTransaction();
       Preconditions.checkNotNull(tx,  "Transaction object must not be null" );
       try  {
         tx.begin();
         reqChannel.put(event);
         tx.commit();
       catch  (Throwable t) {
         tx.rollback();
         if  (t  instanceof  Error) {
           LOG.error( "Error while writing to required channel: "  +
               reqChannel, t);
           throw  (Error) t;
         else  {
           throw  new  ChannelException( "Unable to put event on required "  +
               "channel: "  + reqChannel, t);
         }
       finally  {
         if  (tx !=  null ) {
           tx.close();
         }
       }
     }
     // Process optional channels
     List<Channel> optionalChannels = selector.getOptionalChannels(event);  //同样对optionalChannels做相同的操作
     for  (Channel optChannel : optionalChannels) {
       Transaction tx =  null ;
       try  {
         tx = optChannel.getTransaction();
         tx.begin();
         optChannel.put(event);
         tx.commit();
       catch  (Throwable t) {
         tx.rollback();
         LOG.error( "Unable to put event on optional channel: "  + optChannel, t);
         if  (t  instanceof  Error) {
           throw  (Error) t;
         }
       finally  {
         if  (tx !=  null ) {
           tx.close();
         }
       }
     }
   }