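Flume can monitor and manage the running state of its components, and automatically restart a component that has stopped. It does this by starting a scheduled thread pool (monitorService, created with a core size of 10, a maximum size of 20 and a keep-alive time of 30 seconds) that runs monitor tasks (MonitorRunnable). Every 3 seconds each task checks whether its component (for example a Channel or a SinkRunner) is in the desired state. There are two desired states, START and STOP, and the task calls the matching method on the component: start() for START and stop() for STOP. To monitor a component, call supervise on it; to stop monitoring it, call unsupervise on it. In addition, a task runs every two hours to remove the check tasks of components that are no longer monitored (those for which unsupervise has been called).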
This functionality is implemented mainly by org.apache.flume.lifecycle.LifecycleSupervisor.
The constructor of org.apache.flume.node.Application creates the LifecycleSupervisor instance:
    public Application(List<LifecycleAware> components) {
      this.components = components;
      supervisor = new LifecycleSupervisor();
    }
When the Flume process starts, the call chain is org.apache.flume.node.Application.main ---> org.apache.flume.node.Application.start:
    public synchronized void start() {
      // start() calls LifecycleSupervisor.supervise for every component, passing the component,
      // an AlwaysRestartPolicy, and START as the desired state
      for (LifecycleAware component : components) {
        supervisor.supervise(component,
            new SupervisorPolicy.AlwaysRestartPolicy(), LifecycleState.START);
      }
    }
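Analysis of the LifecycleSupervisor class:
org.apache.flume.lifecycle.LifecycleSupervisor implements the LifecycleAware interface, so it has a lifecycle of its own (it provides start/stop and related methods).
It defines several important inner classes:
1. MonitorRunnable, a task class that implements Runnable
1) Three fields: ScheduledExecutorService monitorService, LifecycleAware lifecycleAware, Supervisoree supervisoree;
2) Analysis of its main method, run: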
    public void run() {
      long now = System.currentTimeMillis();
      try {
        if (supervisoree.status.firstSeen == null) {
          logger.debug("first time seeing {}", lifecycleAware);
          // on the first run, set firstSeen to System.currentTimeMillis()
          supervisoree.status.firstSeen = now;
        }
        // set lastSeen to now
        supervisoree.status.lastSeen = now;
        synchronized (lifecycleAware) {
          // return immediately if discard or error is set on the Status
          if (supervisoree.status.discard) {
            ...
            return;
          } else if (supervisoree.status.error) {
            ...
            return;
          }
          // record lastSeenState
          supervisoree.status.lastSeenState = lifecycleAware.getLifecycleState();
          // the component's current state differs from desiredState
          if (!lifecycleAware.getLifecycleState().equals(
              supervisoree.status.desiredState)) {
            ...
            // call the method that matches desiredState; desiredState is either START or STOP
            switch (supervisoree.status.desiredState) {
              case START:
                try {
                  // desired state is START: call start()
                  lifecycleAware.start();
                } catch (Throwable e) {
                  ...
                  // start() threw: increment failures
                  supervisoree.status.failures++;
                }
                break;
              case STOP:
                try {
                  // desired state is STOP: call stop()
                  lifecycleAware.stop();
                } catch (Throwable e) {
                  ...
                  // stop() threw: increment failures
                  supervisoree.status.failures++;
                }
                break;
              default:
                ...
            }
            // ask the SupervisorPolicy whether it still holds; OnceOnlyPolicy.isValid, for example,
            // returns true if Status.failures is 0 and false otherwise
            if (!supervisoree.policy.isValid(lifecycleAware, supervisoree.status)) {
              logger.error(
                  "Policy {} of {} has been violated - supervisor should exit!",
                  supervisoree.policy, lifecycleAware);
            }
          }
        }
      } catch (Throwable t) {
        ...
      }
      ...
    }
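The heart of run() is a reconcile step: compare the observed state with the desired state, call start() or stop(), and count failures. The following is a minimal, self-contained sketch of that idea, not Flume's code. State, Component, Status and FlakyComponent are hypothetical stand-ins for LifecycleState, LifecycleAware and Status, and the sketch catches RuntimeException where the original catches Throwable.

```java
final class ReconcileSketch {
    enum State { IDLE, START, STOP }

    /** Hypothetical stand-in for Flume's LifecycleAware. */
    interface Component {
        void start();
        void stop();
        State getState();
    }

    /** Hypothetical stand-in for Status, reduced to the fields used here. */
    static final class Status {
        State desiredState;
        State lastSeenState;
        int failures;
        boolean discard;
    }

    /** One monitoring pass: move the component toward status.desiredState. */
    static void reconcile(Component component, Status status) {
        synchronized (component) {
            if (status.discard) {
                return; // the component is no longer supervised
            }
            status.lastSeenState = component.getState();
            if (status.lastSeenState == status.desiredState) {
                return; // already in the desired state
            }
            try {
                switch (status.desiredState) {
                    case START: component.start(); break;
                    case STOP: component.stop(); break;
                    default: break;
                }
            } catch (RuntimeException e) {
                status.failures++; // count the failed attempt; the next pass retries
            }
        }
    }

    /** Test component whose first failFirst calls to start() throw. */
    static final class FlakyComponent implements Component {
        private int remainingFailures;
        private State state = State.IDLE;

        FlakyComponent(int failFirst) { remainingFailures = failFirst; }

        public void start() {
            if (remainingFailures > 0) {
                remainingFailures--;
                throw new IllegalStateException("start failed");
            }
            state = State.START;
        }

        public void stop() { state = State.STOP; }

        public State getState() { return state; }
    }

    /** Runs reconcile passes until the component is started; returns the failure count. */
    static int failuresUntilStarted(int failFirst) {
        FlakyComponent component = new FlakyComponent(failFirst);
        Status status = new Status();
        status.desiredState = State.START;
        while (component.getState() != State.START) {
            reconcile(component, status);
        }
        return status.failures;
    }

    public static void main(String[] args) {
        System.out.println(failuresUntilStarted(2));
    }
}
```

In the sketch, as in the code above, a failed start() does not stop supervision by itself: the failure is only counted, and it is the policy check that decides whether that count is still acceptable.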
2. Purger, a task class that implements Runnable
Its run method:
    public void run() {
      // only when needToPurge has been set to true
      if (needToPurge) {
        // ScheduledThreadPoolExecutor.purge removes cancelled java.util.concurrent.Future objects
        // from the work queue, freeing queue space
        monitorService.purge();
        // then reset needToPurge to false
        needToPurge = false;
      }
    }
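To see why a Purger is useful, it helps to look at what cancel() leaves behind. With the JDK's default settings, a cancelled task stays in the ScheduledThreadPoolExecutor work queue until its delay elapses, and purge() removes it earlier. The sketch below uses only JDK classes; PurgeDemo and queueSizesAroundPurge are names made up for this illustration.

```java
import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.ScheduledThreadPoolExecutor;
import java.util.concurrent.TimeUnit;

final class PurgeDemo {
    /** Returns { queue size after cancel, queue size after purge }. */
    static int[] queueSizesAroundPurge() {
        ScheduledThreadPoolExecutor pool = new ScheduledThreadPoolExecutor(1);
        try {
            // A task that would first run in one hour, so it sits in the queue for the whole demo.
            ScheduledFuture<?> future = pool.schedule(() -> { }, 1, TimeUnit.HOURS);
            future.cancel(false);
            int afterCancel = pool.getQueue().size(); // the cancelled task is still queued
            pool.purge();                             // drop cancelled futures from the queue
            int afterPurge = pool.getQueue().size();
            return new int[] { afterCancel, afterPurge };
        } finally {
            pool.shutdownNow();
        }
    }

    public static void main(String[] args) {
        int[] sizes = queueSizesAroundPurge();
        System.out.println("after cancel: " + sizes[0] + ", after purge: " + sizes[1]);
    }
}
```

The JDK also offers setRemoveOnCancelPolicy(true) as an alternative that removes a task at cancellation time; the code discussed here relies on the periodic purge instead.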
3. The Status inner class defines several state fields that describe the state of a Supervisoree
    public Long firstSeen;
    public Long lastSeen;
    public LifecycleState lastSeenState;
    public LifecycleState desiredState;
    public int failures;
    public boolean discard;
    public volatile boolean error;
4. SupervisorPolicy is an abstract class that declares the abstract method isValid(LifecycleAware object, Status status). It has two subclasses, AlwaysRestartPolicy and OnceOnlyPolicy.
AlwaysRestartPolicy.isValid always returns true; OnceOnlyPolicy.isValid checks Status.failures and returns true if it is 0, false otherwise.
5. Supervisoree holds a SupervisorPolicy field and a Status field
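The two policies described in item 4 can be sketched in a few lines. This is a simplified illustration rather than the Flume source: the Status class here carries only the failures field, the component parameter is typed as Object instead of LifecycleAware, and the two static helper methods exist only for the demonstration.

```java
final class PolicySketch {
    /** Hypothetical stand-in for Status; only failures matters to the policies. */
    static final class Status {
        int failures;
    }

    /** Decides whether supervision of a component is still within policy. */
    abstract static class SupervisorPolicy {
        abstract boolean isValid(Object component, Status status);
    }

    /** Never gives up, regardless of how many attempts have failed. */
    static final class AlwaysRestartPolicy extends SupervisorPolicy {
        @Override
        boolean isValid(Object component, Status status) {
            return true;
        }
    }

    /** Valid only while no start or stop attempt has failed. */
    static final class OnceOnlyPolicy extends SupervisorPolicy {
        @Override
        boolean isValid(Object component, Status status) {
            return status.failures == 0;
        }
    }

    static boolean alwaysRestartValid(int failures) {
        Status status = new Status();
        status.failures = failures;
        return new AlwaysRestartPolicy().isValid(new Object(), status);
    }

    static boolean onceOnlyValid(int failures) {
        Status status = new Status();
        status.failures = failures;
        return new OnceOnlyPolicy().isValid(new Object(), status);
    }

    public static void main(String[] args) {
        System.out.println(alwaysRestartValid(3) + " " + onceOnlyValid(0) + " " + onceOnlyValid(1));
    }
}
```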
Analysis of the main methods:
The constructor initializes several important fields:
    public LifecycleSupervisor() {
      lifecycleState = LifecycleState.IDLE;
      // supervisedProcesses maps each LifecycleAware to its Supervisoree: the components under supervision
      supervisedProcesses = new HashMap<LifecycleAware, Supervisoree>();
      // monitorFutures maps each LifecycleAware to the ScheduledFuture of its monitor task
      monitorFutures = new HashMap<LifecycleAware, ScheduledFuture<?>>();
      // monitorService runs the MonitorRunnable tasks and the periodic Purger,
      // which removes cancelled tasks from the pool's queue
      monitorService = new ScheduledThreadPoolExecutor(10,
          new ThreadFactoryBuilder().setNameFormat(
              "lifecycleSupervisor-" + Thread.currentThread().getId() + "-%d")
              .build());
      monitorService.setMaximumPoolSize(20);
      monitorService.setKeepAliveTime(30, TimeUnit.SECONDS);
      purger = new Purger();
      // initially false; set to true when a task is cancelled
      needToPurge = false;
    }
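The pool configuration in the constructor can be reproduced with JDK classes alone. The sketch below is an approximation under stated assumptions: it replaces Guava's ThreadFactoryBuilder with a hand-written ThreadFactory that produces the same name pattern, and MonitorPoolSketch, newMonitorPool and workerThreadName are names invented for the example.

```java
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ScheduledThreadPoolExecutor;
import java.util.concurrent.ThreadFactory;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import java.util.concurrent.atomic.AtomicInteger;

final class MonitorPoolSketch {
    /** Builds a pool configured like monitorService: core 10, max 20, keep-alive 30 s. */
    static ScheduledThreadPoolExecutor newMonitorPool() {
        AtomicInteger counter = new AtomicInteger();
        long creatorId = Thread.currentThread().getId();
        // Plain-JDK replacement for ThreadFactoryBuilder.setNameFormat("lifecycleSupervisor-<id>-%d")
        ThreadFactory factory = runnable -> new Thread(runnable,
                "lifecycleSupervisor-" + creatorId + "-" + counter.getAndIncrement());
        ScheduledThreadPoolExecutor pool = new ScheduledThreadPoolExecutor(10, factory);
        pool.setMaximumPoolSize(20);
        pool.setKeepAliveTime(30, TimeUnit.SECONDS);
        return pool;
    }

    /** Runs one task on a fresh pool and returns the name of the worker thread that ran it. */
    static String workerThreadName() {
        ScheduledThreadPoolExecutor pool = newMonitorPool();
        try {
            Callable<String> task = () -> Thread.currentThread().getName();
            return pool.submit(task).get(5, TimeUnit.SECONDS);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(e);
        } catch (ExecutionException | TimeoutException e) {
            throw new IllegalStateException(e);
        } finally {
            pool.shutdownNow();
        }
    }

    public static void main(String[] args) {
        System.out.println(workerThreadName());
    }
}
```

One caveat worth knowing: the JDK documentation for ScheduledThreadPoolExecutor states that it acts as a fixed-size pool of corePoolSize threads with an unbounded queue, so adjusting maximumPoolSize has no useful effect. Under that reading, the setting that actually bounds the number of monitor threads is the core size of 10.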
The start method starts the monitoring thread pool by scheduling the Purger:
    public synchronized void start() {
      ....
      // run the Purger after 2 hours and then every 2 hours, freeing the pool's work queue
      monitorService.scheduleWithFixedDelay(purger, 2, 2, TimeUnit.HOURS);
      // set the state to START
      lifecycleState = LifecycleState.START;
      ...
    }
The stop method first shuts down the thread pool, then stops the components
1) Shutting down the thread pool
    monitorService.shutdown();
2) Stopping the components
    // iterate over the components in supervisedProcesses
    for (final Entry<LifecycleAware, Supervisoree> entry : supervisedProcesses
        .entrySet()) {
      // if the component is currently in START, set its desired state to STOP and call its stop method
      if (entry.getKey().getLifecycleState().equals(LifecycleState.START)) {
        entry.getValue().status.desiredState = LifecycleState.STOP;
        entry.getKey().stop();
      }
    }
The supervise method starts monitoring a component. It takes 3 parameters: LifecycleAware lifecycleAware, SupervisorPolicy policy, LifecycleState desiredState
    public synchronized void supervise(LifecycleAware lifecycleAware,
        SupervisorPolicy policy, LifecycleState desiredState) {
      // check that the monitoring thread pool is still usable
      if (this.monitorService.isShutdown()
          || this.monitorService.isTerminated()
          || this.monitorService.isTerminating()) {
        throw new FlumeException("Supervise called on " + lifecycleAware + " " +
            "after shutdown has been initiated. " + lifecycleAware + " will not" +
            " be started");
      }
      // check whether the component is already supervised
      Preconditions.checkState(!supervisedProcesses.containsKey(lifecycleAware),
          "Refusing to supervise " + lifecycleAware + " more than once");
      .....
      // create the Supervisoree
      Supervisoree process = new Supervisoree();
      // and instantiate its Status
      process.status = new Status();
      // set the Supervisoree's fields
      process.policy = policy;
      process.status.desiredState = desiredState;
      process.status.error = false;
      // create a MonitorRunnable (the monitor task) and set its fields
      MonitorRunnable monitorRunnable = new MonitorRunnable();
      monitorRunnable.lifecycleAware = lifecycleAware;
      monitorRunnable.supervisoree = process;
      monitorRunnable.monitorService = monitorService;
      // record the component in supervisedProcesses as supervised
      supervisedProcesses.put(lifecycleAware, process);
      // schedule monitorRunnable: the first run is immediate, each later run starts 3 s after the previous one ends
      ScheduledFuture<?> future = monitorService.scheduleWithFixedDelay(
          monitorRunnable, 0, 3, TimeUnit.SECONDS);
      // record the future in monitorFutures
      monitorFutures.put(lifecycleAware, future);
    }
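The effect of supervise can be tried out without Flume. The following sketch assumes a much simpler setup than the real class: a hypothetical Component with a running flag instead of a LifecycleAware, a single map instead of supervisedProcesses plus monitorFutures, IllegalStateException instead of FlumeException, and a configurable delay so the demonstration finishes quickly. It shows the two behaviors described above: a component that stops is started again by its monitor task, and a second supervise call for the same component is refused.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.ScheduledThreadPoolExecutor;
import java.util.concurrent.Semaphore;
import java.util.concurrent.TimeUnit;

final class SuperviseSketch {
    /** Hypothetical component; each start() releases one permit so callers can wait for it. */
    static final class Component {
        private volatile boolean running;
        final Semaphore starts = new Semaphore(0);

        void start() { running = true; starts.release(); }
        void crash() { running = false; }
        boolean isRunning() { return running; }
    }

    private final ScheduledThreadPoolExecutor monitorService = new ScheduledThreadPoolExecutor(2);
    private final Map<Component, ScheduledFuture<?>> monitorFutures = new HashMap<>();

    /** Schedules a periodic check that starts the component whenever it is not running. */
    synchronized void supervise(Component component, long delayMillis) {
        if (monitorService.isShutdown()) {
            throw new IllegalStateException("supervise called after shutdown");
        }
        if (monitorFutures.containsKey(component)) {
            throw new IllegalStateException("Refusing to supervise " + component + " more than once");
        }
        Runnable monitor = () -> {
            if (!component.isRunning()) {
                component.start();
            }
        };
        monitorFutures.put(component,
                monitorService.scheduleWithFixedDelay(monitor, 0, delayMillis, TimeUnit.MILLISECONDS));
    }

    void shutdown() { monitorService.shutdownNow(); }

    /** True if the component is started once and started again after a simulated crash. */
    static boolean restartsAfterCrash() {
        SuperviseSketch supervisor = new SuperviseSketch();
        Component component = new Component();
        try {
            supervisor.supervise(component, 20);
            boolean started = component.starts.tryAcquire(5, TimeUnit.SECONDS);
            component.crash();
            boolean restarted = component.starts.tryAcquire(5, TimeUnit.SECONDS);
            return started && restarted;
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            return false;
        } finally {
            supervisor.shutdown();
        }
    }

    /** True if supervising the same component twice is refused. */
    static boolean rejectsDuplicate() {
        SuperviseSketch supervisor = new SuperviseSketch();
        Component component = new Component();
        try {
            supervisor.supervise(component, 20);
            supervisor.supervise(component, 20);
            return false;
        } catch (IllegalStateException e) {
            return true;
        } finally {
            supervisor.shutdown();
        }
    }

    public static void main(String[] args) {
        System.out.println(restartsAfterCrash() + " " + rejectsDuplicate());
    }
}
```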
The unsupervise method stops a component and removes it from the monitoring containers:
    synchronized (lifecycleAware) {
      // look up the Supervisoree in the map of supervised components
      Supervisoree supervisoree = supervisedProcesses.get(lifecycleAware);
      // set discard on the Supervisoree's Status to true
      supervisoree.status.discard = true;
      // setDesiredState sets desiredState on the Status to STOP (supervisoree.status.desiredState = desiredState)
      this.setDesiredState(lifecycleAware, LifecycleState.STOP);
      logger.info("Stopping component: {}", lifecycleAware);
      // call the component's stop method
      lifecycleAware.stop();
    }
    // remove the component from the supervisedProcesses map
    supervisedProcesses.remove(lifecycleAware);
    // cancel the component's ScheduledFuture. (A Future represents the result of an asynchronous computation;
    // cancel: "Attempts to cancel execution of this task.")
    monitorFutures.get(lifecycleAware).cancel(false);
    // set needToPurge to true so that the Purger removes the cancelled ScheduledFuture
    needToPurge = true;
    monitorFutures.remove(lifecycleAware);
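The combination of a discard flag and cancel(false) is what makes unsupervise reliable: cancel(false) prevents future runs, and the flag, checked under the same lock the monitor task takes, makes any run that was already about to begin do nothing. The sketch below illustrates this with JDK classes only. UnsuperviseSketch and its members are hypothetical names, a counter stands in for the real state check, and, unlike the original, a private lock object is used instead of synchronizing on the component.

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.ScheduledThreadPoolExecutor;
import java.util.concurrent.TimeUnit;

final class UnsuperviseSketch {
    private final Object lock = new Object();
    private final ScheduledThreadPoolExecutor pool = new ScheduledThreadPoolExecutor(1);
    private final CountDownLatch firstCheck = new CountDownLatch(1);
    private ScheduledFuture<?> future;
    private boolean discard; // guarded by lock
    private int checks;      // guarded by lock

    /** Starts the periodic check. */
    void supervise(long delayMillis) {
        future = pool.scheduleWithFixedDelay(this::check, 0, delayMillis, TimeUnit.MILLISECONDS);
    }

    /** The monitor task: does nothing once discard is set. */
    private void check() {
        synchronized (lock) {
            if (discard) {
                return;
            }
            checks++;
            firstCheck.countDown();
        }
    }

    /** Marks the task as discarded, cancels it, and returns the checks performed so far. */
    int unsupervise() {
        int seen;
        synchronized (lock) {
            discard = true;
            seen = checks;
        }
        future.cancel(false); // do not interrupt a run in progress, just prevent later runs
        return seen;
    }

    int checkCount() {
        synchronized (lock) {
            return checks;
        }
    }

    /** True if no further checks are counted after unsupervise() returns. */
    static boolean noChecksAfterUnsupervise() {
        UnsuperviseSketch sketch = new UnsuperviseSketch();
        try {
            sketch.supervise(10);
            if (!sketch.firstCheck.await(5, TimeUnit.SECONDS)) {
                return false;
            }
            int atUnsupervise = sketch.unsupervise();
            Thread.sleep(100); // long enough for several periods to have elapsed
            return sketch.future.isCancelled() && sketch.checkCount() == atUnsupervise;
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            return false;
        } finally {
            sketch.pool.shutdownNow();
        }
    }

    public static void main(String[] args) {
        System.out.println(noChecksAfterUnsupervise());
    }
}
```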