flume可以监控并管理组件的运行状态,在组件关闭的时候可以自动拉起来,原理是通过启动一个计划任务线程池(monitorService ,线程的最大数量为30),运行监控线程(MonitorRunnable线程),每隔3s判断组件(包括Channel,SinkRunner)的状态是否符合要求(可用的状态由两种START和STOP),根据不同的要求调用对应组件不同的方法,START会调用start方法,STOP会调用stop方法,如果想监控一个组件的状态,只需对这个组件调用supervise方法即可,如果想停止监控一个组件,只需对这个组件调用unsupervise方法即可,同时有一个线程每隔两小时移除已经不再监控(调用了unsupervise方法)的组件的检查任务。

这个功能主要是通过org.apache.flume.lifecycle.LifecycleSupervisor实现
在org.apache.flume.node.Application类的构造函数中会初始化LifecycleSupervisor类的对象:

1
2
3
4
   public  Application(List<LifecycleAware> components) {
     this . components = components;
     supervisor =  new  LifecycleSupervisor();
   }

flume进程启动时调用

1
2
3
4
5
6
7
org.apache.flume.node.Application.main--->org.apache.flume.node.Application.start
   public  synchronized  void  start() {  //start方法会对每一个组件调用LifecycleSupervisor.supervise方法,参数为组件,AlwaysRestartPolicy和START状态(即期望的状态为START)
     for (LifecycleAware component : components) {
       supervisor.supervise(component,
           new  SupervisorPolicy.AlwaysRestartPolicy(), LifecycleState.START); 
     }
   }

分析LifecycleSupervisor类:
org.apache.flume.lifecycle.LifecycleSupervisor实现了LifecycleAware接口,本身也有一个生命周期的概念(提供start/stop等方法)

其定义了几个重要的内部类:
1.MonitorRunnable,实现了Runnable接口的线程类
1)3个属性ScheduledExecutorService monitorService,LifecycleAware lifecycleAware,Supervisoree supervisoree;
2)主要的run方法分析

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
     public  void  run() {
       long  now = System.currentTimeMillis();
       try  {
         if  (supervisoree.status.firstSeen ==  null ) {
           logger.debug( "first time seeing {}" , lifecycleAware);
           supervisoree.status.firstSeen = now;  //第一次开始运行时,设置firstSeen为System.currentTimeMillis()
         }
         supervisoree.status.lastSeen = now;  //设置lastSeen为now
         synchronized  (lifecycleAware) {
           if  (supervisoree.status.discard) {  //如果Status的discard或者error的值为true,会直接退出
...
             return ;
           else  if  (supervisoree.status.error) {
...
             return ;
           }
           supervisoree.status.lastSeenState = lifecycleAware.getLifecycleState();  //设置lastSeenState的值
           if  (!lifecycleAware.getLifecycleState().equals(
               supervisoree.status.desiredState)) {  //如果获取的lifecycleAware对象状态不是想设置的desiredState状态
...
             switch  (supervisoree.status.desiredState) {  //根据设置的desiredState状态调用lifecycleAware的不同方法,desiredState的值只有两种START和STOP
               case  START:
                 try  {
                   lifecycleAware.start();  //状态为START时设置运行start方法
                 catch  (Throwable e) {
...
                   supervisoree.status.failures++;  //start方法异常时failures的值加1
                 }
                 break ;
               case  STOP:
                 try  {
                   lifecycleAware.stop();  //状态为STOP时设置运行stop方法
                 catch  (Throwable e) {
...
                   supervisoree.status.failures++;  //stop方法异常时failures的值加1
                 }
                 break ;
               default :
...
             }
             if  (!supervisoree.policy.isValid(lifecycleAware, supervisoree.status)) {
                //调用SupervisorPolicy的isValid方法,比如OnceOnlyPolicy 的isValid的方法会判断Status.failures 的值,如果为0则返回true,否则返回false
               logger.error(
                   "Policy {} of {} has been violated - supervisor should exit!" ,
                   supervisoree.policy, lifecycleAware);
             }
           }
         }
       catch (Throwable t) {
...      
       }
...
     }

2.Purger,实现了Runnable接口的线程类
run方法:

1
2
3
4
5
6
     public  void  run() { 
       if (needToPurge){   //如果needToPurge设置为true
         monitorService.purge();  //ScheduledThreadPoolExecutor.purge方法用于从工作队列中删除已经cancel的java.util.concurrent.Future对象(释放队列空间)
         needToPurge =  false  //并设置needToPurge为false
       }
     }

3.Status内部类定义了几个状态属性,代表了Supervisoree的状态

1
2
3
4
5
6
7
     public  Long firstSeen;
     public  Long lastSeen;
     public  LifecycleState lastSeenState;
     public  LifecycleState desiredState;
     public  int  failures ;
     public  boolean  discard ;
     public  volatile  boolean  error ;

4. SupervisorPolicy 是抽象类,定义了抽象方法isValid(LifecycleAware object, Status status),包含两个扩展类AlwaysRestartPolicy 和OnceOnlyPolicy 
AlwaysRestartPolicy 的isValid会一直返回true,OnceOnlyPolicy 的isValid的方法会判断Status.failures 的值,如果为0则返回true,否则返回false

5.Supervisoree包含SupervisorPolicy 和Status属性


主要的方法分析:
在构造方法中初始化几个重要的属性:

1
2
3
4
5
6
7
8
9
10
11
12
13
   public  LifecycleSupervisor() {
     lifecycleState = LifecycleState.IDLE;
     supervisedProcesses =  new  HashMap<LifecycleAware, Supervisoree>();  // supervisedProcesses 用于存放LifecycleAware和Supervisoree对象的键值对,代表已经管理的组件
     monitorFutures =  new  HashMap<LifecycleAware, ScheduledFuture<?>>();  //monitorFutures 用于存放LifecycleAware对象和ScheduledFuture对象的键值对
     monitorService =  new  ScheduledThreadPoolExecutor( 10 ,
         new  ThreadFactoryBuilder().setNameFormat(
             "lifecycleSupervisor-"  + Thread.currentThread().getId() +  "-%d" )
             .build());  // monitorService 用于调用Purger线程,定时移除线程池中已经cancel的task
     monitorService.setMaximumPoolSize( 20 );
     monitorService.setKeepAliveTime( 30 , TimeUnit.SECONDS);
     purger =  new  Purger();
     needToPurge =  false // 初始时为false,在有task cancel的时候设置为true
   }

  
start方法用于启动检测线程池:

1
2
3
4
5
6
   public  synchronized  void  start() {
....
     monitorService.scheduleWithFixedDelay( purger,  2 2 , TimeUnit. HOURS);  //在两小时后每隔两小时运行一次Purger,释放线程池的工作队列
     lifecycleState = LifecycleState. START;  //设置状态为START
...
   }


stop方法首先关闭线程池,然后关闭各个组件
1)线程池关闭

1
monitorService.shutdown();

2)各组件关闭

1
2
3
4
5
6
7
    for  final  Entry<LifecycleAware, Supervisoree> entry : supervisedProcesses
         .entrySet()) {  //遍历supervisedProcesses中的各个组件
       if  (entry.getKey(). getLifecycleState().equals(LifecycleState.START)) {  //如果组件的当前状态是START,则首先设置其需要变成的状态为STOP,并调用组件的stop方法
         entry.getValue(). status. desiredState = LifecycleState.STOP; 
         entry.getKey().stop();
       }
     }


supervise方法用于监控对应的组件,有3个参数LifecycleAware lifecycleAware, SupervisorPolicy policy, LifecycleState desiredState

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
  public  synchronized  void  supervise(LifecycleAware lifecycleAware,
       SupervisorPolicy policy, LifecycleState desiredState) {
     if ( this . monitorService.isShutdown()
         ||  this .monitorService .isTerminated()
         ||  this .monitorService .isTerminating()){  //检测监控线程池是否正常
       throw  new  FlumeException( "Supervise called on "  + lifecycleAware +  " "  +
           "after shutdown has been initiated. "  + lifecycleAware +  " will not"  +
           " be started" );
     }
     Preconditions.checkState(!supervisedProcesses .containsKey(lifecycleAware),
         "Refusing to supervise "  + lifecycleAware +  " more than once"  );  //检测是否已经管理
.....
     Supervisoree process =  new  Supervisoree();  //初始化Supervisoree对象
     process.status =  new  Status();  //并实例化Supervisoree对象的Status属性
     process.policy = policy;  //设置Supervisoree的属性
     process.status.desiredState = desiredState;
     process.status.error =  false ;
     MonitorRunnable monitorRunnable =  new  MonitorRunnable();  //初始化一个MonitorRunnable 对象(线程),并设置对象的属性
     monitorRunnable.lifecycleAware = lifecycleAware;
     monitorRunnable.supervisoree = process;
     monitorRunnable.monitorService = monitorService;
     supervisedProcesses.put(lifecycleAware, process);  //向supervisedProcesses中插入键值对,代表已经开始管理的组件
     ScheduledFuture<?> future = monitorService.scheduleWithFixedDelay(
         monitorRunnable,  0 3 , TimeUnit. SECONDS);  // 设置计划任务线程池,每隔3s之后运行monitorRunnable
     monitorFutures.put(lifecycleAware, future);  // 向monitorFutures中插入键值对
   }


unsupervise方法用于停止组件并从监控容器中去除:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
     synchronized  (lifecycleAware) {
     Supervisoree supervisoree = supervisedProcesses.get(lifecycleAware);  //从已经管理的Supervisoree  hashmap中获取Supervisoree对象
     supervisoree.status.discard =  true ;   //设置Supervisoree对象的Status属性的discard 值为discard
       this .setDesiredState(lifecycleAware, LifecycleState.STOP); 
       //调用setDesiredState方法,设置Supervisoree对象的Status属性的desiredState 值为STOP(supervisoree.status.desiredState = desiredState)
       logger.info( "Stopping component: {}" , lifecycleAware);
       lifecycleAware.stop();  //调用组件的stop方法
     }
     supervisedProcesses.remove(lifecycleAware);  //从supervisedProcesses hashmap中移除这个组件
     monitorFutures.get(lifecycleAware).cancel( false ); 
     //调用组件对应的ScheduledFuture的cancel方法取消任务(A Future represents the result of an asynchronous computation.
  ,cancel :Attempts to cancel execution of  this  task.)
     needToPurge =  true //设置needToPurge 的属性为true,这样就可以在purge中删除已经cancel的ScheduledFuture对象
     monitorFutures.remove(lifecycleAware);