1. Startup command
    nohup bin/flume-ng agent -n agent-server -f agent-server1.conf &
flume-ng is a shell script:
    agent        run a Flume agent        ---> class org.apache.flume.node.Application
    avro-client  run an avro Flume client ---> class org.apache.flume.client.avro.AvroCLIClient
    run_flume() {  # implemented in the shell script
      local FLUME_APPLICATION_CLASS
      if [ "$#" -gt 0 ]; then
        FLUME_APPLICATION_CLASS=$1
        shift
      else
        error "Must specify flume application class" 1
      fi
      if [ ${CLEAN_FLAG} -ne 0 ]; then
        set -x
      fi
      $EXEC $JAVA_HOME/bin/java $JAVA_OPTS -cp "$FLUME_CLASSPATH" \
          -Djava.library.path=$FLUME_JAVA_LIBRARY_PATH "$FLUME_APPLICATION_CLASS" $*
    }
    ......
    # finally, invoke the appropriate command
    if [ -n "$opt_agent" ] ; then  # opt_agent is set to 1 when the first argument is "agent"
      run_flume $FLUME_AGENT_CLASS $args  # FLUME_AGENT_CLASS="org.apache.flume.node.Application"
    elif [ -n "$opt_avro_client" ] ; then
      run_flume $FLUME_AVRO_CLIENT_CLASS $args
    elif [ -n "${opt_version}" ] ; then
      run_flume $FLUME_VERSION_CLASS $args
    elif [ -n "${opt_tool}" ] ; then
      run_flume $FLUME_TOOLS_CLASS $args
    else
      error "This message should never appear" 1
    fi
So starting an agent ultimately invokes the main method of the org.apache.flume.node.Application class.
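The dispatch at the end of the script can be restated as a lookup from the sub-command to the Java class whose main method gets run. The sketch below is only an illustration of that mapping: the class `CommandDispatchSketch` and its method `mainClassFor` are hypothetical names, and only the two class names quoted above are included.

```java
import java.util.LinkedHashMap;
import java.util.Map;

/** Hypothetical illustration of the flume-ng dispatch; not part of Flume. */
final class CommandDispatchSketch {
    // Sub-command -> main class, limited to the two pairs quoted in this post.
    private static final Map<String, String> MAIN_CLASSES = new LinkedHashMap<>();

    static {
        MAIN_CLASSES.put("agent", "org.apache.flume.node.Application");
        MAIN_CLASSES.put("avro-client", "org.apache.flume.client.avro.AvroCLIClient");
    }

    /** Returns the class name to launch, or fails like the script's final else branch. */
    static String mainClassFor(String command) {
        String mainClass = MAIN_CLASSES.get(command);
        if (mainClass == null) {
            throw new IllegalArgumentException("Unknown command: " + command);
        }
        return mainClass;
    }
}
```

With this mapping, `mainClassFor("agent")` yields the class that the startup command in section 1 ends up running.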
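2. The org.apache.flume.node.Application class
1) The main method first parses the arguments, mainly n, f and no-reload-conf: n is the agent (node) name, f is the configuration file, and no-reload-conf controls whether automatic reload is supported (a feature only available since 1.5.0).
n and f each take a value, while no-reload-conf is a flag without a value; if no-reload-conf is set, the configuration is not reloaded automatically.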
    Options options = new Options();
    Option option = new Option("n", "name", true, "the name of this agent");
    option.setRequired(true);
    options.addOption(option);
    option = new Option("f", "conf-file", true, "specify a conf file");
    option.setRequired(true);
    options.addOption(option);
    option = new Option(null, "no-reload-conf", false, "do not reload " +
        "conf file if changed");
    options.addOption(option);
    ....
    CommandLineParser parser = new GnuParser();
    CommandLine commandLine = parser.parse(options, args);
    File configurationFile = new File(commandLine.getOptionValue('f'));
    String agentName = commandLine.getOptionValue('n');
    // Check whether no-reload-conf was given; reload is true when it was not
    boolean reload = !commandLine.hasOption("no-reload-conf");
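To make the effect of the three options concrete without the Commons CLI dependency, here is a minimal hand-rolled sketch of the same rules: n and f are required and take a value, while no-reload-conf is a bare flag that turns reload off. The class `AgentArgsSketch` and its fields are hypothetical names for illustration and do not reproduce how Flume parses its arguments internally.

```java
/** Hypothetical stand-in for the parsed command line; not part of Flume. */
final class AgentArgsSketch {
    final String agentName;
    final String confFile;
    final boolean reload;

    private AgentArgsSketch(String agentName, String confFile, boolean reload) {
        this.agentName = agentName;
        this.confFile = confFile;
        this.reload = reload;
    }

    static AgentArgsSketch parse(String[] args) {
        String name = null;
        String conf = null;
        boolean noReload = false;
        for (int i = 0; i < args.length; i++) {
            switch (args[i]) {
                case "-n":
                case "--name":
                    name = valueAfter(args, i++);   // consume the option's value
                    break;
                case "-f":
                case "--conf-file":
                    conf = valueAfter(args, i++);   // consume the option's value
                    break;
                case "--no-reload-conf":
                    noReload = true;                // flag only, no value
                    break;
                default:
                    throw new IllegalArgumentException("Unknown option: " + args[i]);
            }
        }
        if (name == null || conf == null) {
            throw new IllegalArgumentException("Both -n and -f are required");
        }
        // Same rule as in main: reload stays on unless no-reload-conf was given.
        return new AgentArgsSketch(name, conf, !noReload);
    }

    private static String valueAfter(String[] args, int i) {
        if (i + 1 >= args.length) {
            throw new IllegalArgumentException("Missing value for " + args[i]);
        }
        return args[i + 1];
    }
}
```

Parsing the arguments of the startup command from section 1 (`-n agent-server -f agent-server1.conf`) leaves `reload` set to true, because no-reload-conf is absent.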
2) The Application is then built according to the reload flag:
    // Initialize a List<LifecycleAware> to hold the components to start;
    // it is only used when reload is supported
    List<LifecycleAware> components = Lists.newArrayList();
    Application application;
    if (reload) {
      EventBus eventBus = new EventBus(agentName + "-event-bus");
      PollingPropertiesFileConfigurationProvider configurationProvider =
          new PollingPropertiesFileConfigurationProvider(agentName,
              configurationFile, eventBus, 30);
      components.add(configurationProvider);
      application = new Application(components);
      eventBus.register(application);
    } else {
      // The case where reload is not supported
      // Instantiate a PropertiesFileConfigurationProvider from the agent name
      // and the configuration file (the values of n and f)
      PropertiesFileConfigurationProvider configurationProvider =
          new PropertiesFileConfigurationProvider(agentName,
              configurationFile);
      // Instantiate an Application object
      application = new Application();
      // Call handleConfigurationEvent
      application.handleConfigurationEvent(configurationProvider.getConfiguration());
    }
    // Call start
    application.start();
Call chain for startup without reload support:
    main--->handleConfigurationEvent-->stopAllComponents+startAllComponents-->start
3) The handleConfigurationEvent method calls stopAllComponents and startAllComponents:
    public synchronized void handleConfigurationEvent(MaterializedConfiguration conf) {
      stopAllComponents();      // stops all components
      startAllComponents(conf);
    }
The parameter of handleConfigurationEvent is a MaterializedConfiguration object (here an instance of SimpleMaterializedConfiguration).
The MaterializedConfiguration object is returned by AbstractConfigurationProvider.getConfiguration. That method parses the Flume configuration file by calling loadChannels/loadSources/loadSinks and puts the resulting Channel, SourceRunner and SinkRunner objects into the corresponding hash maps. They are finally loaded into the SimpleMaterializedConfiguration object through its addChannel/addSourceRunner/addSinkRunner methods, where stopAllComponents/startAllComponents can use them.
The stopAllComponents method shuts down all components. It calls getSourceRunners and getChannels on the MaterializedConfiguration object (concretely a SimpleMaterializedConfiguration here) to obtain the SourceRunner and Channel components to shut down, then calls LifecycleSupervisor.unsupervise on each of them.
startAllComponents does the opposite: it calls LifecycleSupervisor.supervise on each component to start its service. In addition, startAllComponents calls this.loadMonitoring() to start the service that monitors Flume's metrics (the reload-enabled path does not call this method).
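The stop-then-start pattern described above can be sketched in a few lines. This is a simplified model under stated assumptions, not Flume's code: `ReconfigureSketch`, its `Component` interface and the `log` list are hypothetical, and components are started and stopped directly rather than through LifecycleSupervisor.

```java
import java.util.ArrayList;
import java.util.List;

/** Hypothetical model of "stop everything old, then start everything new". */
final class ReconfigureSketch {
    /** Minimal lifecycle contract, loosely modelled on the idea of LifecycleAware. */
    interface Component {
        String name();
        void start();
        void stop();
    }

    private final List<Component> running = new ArrayList<>();
    final List<String> log = new ArrayList<>();   // records the order of calls

    // synchronized so that two configuration events cannot interleave
    synchronized void handleConfigurationEvent(List<Component> newComponents) {
        stopAllComponents();
        startAllComponents(newComponents);
    }

    private void stopAllComponents() {
        for (Component c : running) {
            c.stop();
            log.add("stop " + c.name());
        }
        running.clear();
    }

    private void startAllComponents(List<Component> components) {
        for (Component c : components) {
            c.start();
            log.add("start " + c.name());
            running.add(c);
        }
    }

    /** Convenience factory for a component whose start/stop do nothing. */
    static Component named(final String name) {
        return new Component() {
            public String name() { return name; }
            public void start() { }
            public void stop() { }
        };
    }
}
```

On the first event nothing is running, so the stop phase is a no-op; this is why the same method can serve both initial startup and a later reconfiguration.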
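4) The start method calls LifecycleSupervisor.supervise on every component in order to manage service state (a service that fails can be brought back up automatically). This mainly matters when reload is supported, where it starts the scheduled thread pool that checks the configuration file for updates.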
    public synchronized void start() {
      for (LifecycleAware component : components) {
        supervisor.supervise(component,
            new SupervisorPolicy.AlwaysRestartPolicy(), LifecycleState.START);
      }
    }
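The idea of supervising a component toward a desired state can be illustrated with a small model. The sketch below is an assumption-laden simplification, not LifecycleSupervisor itself: `SupervisorSketch`, `Worker` and `checkOnce` are hypothetical names, and the monitoring pass is called by hand here where a real supervisor would run it periodically on a scheduled thread.

```java
import java.util.LinkedHashMap;
import java.util.Map;

/** Hypothetical, single-threaded model of "always restart" supervision. */
final class SupervisorSketch {
    enum State { IDLE, START, STOP, ERROR }

    interface Component {
        void start();
        void stop();
        State state();
    }

    // Component -> the state the supervisor should keep it in.
    private final Map<Component, State> desired = new LinkedHashMap<>();

    void supervise(Component component, State desiredState) {
        desired.put(component, desiredState);
        checkOnce();   // bring the component to its desired state right away
    }

    void unsupervise(Component component) {
        if (desired.remove(component) != null) {
            component.stop();
        }
    }

    /** One monitoring pass: restart anything that should be running but is not. */
    void checkOnce() {
        for (Map.Entry<Component, State> entry : desired.entrySet()) {
            Component c = entry.getKey();
            if (entry.getValue() == State.START && c.state() != State.START) {
                c.start();
            }
        }
    }

    /** Simple component that counts how often it has been started. */
    static final class Worker implements Component {
        State state = State.IDLE;
        int starts = 0;

        public void start() { starts++; state = State.START; }
        public void stop() { state = State.STOP; }
        public State state() { return state; }
    }
}
```

If a supervised `Worker` is put into the ERROR state, the next `checkOnce()` pass starts it again, which is the "bring it back up automatically" behaviour mentioned in step 4.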
For the implementation of supervise, see (http://caiguangguang.blog.51cto.com/1652935/1619527)
Call chain for startup with reload support: main--->EventBus.register-->start
For the implementation of reload, see (http://caiguangguang.blog.51cto.com/1652935/1619523)
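As a rough picture of the reload path (register a listener, then let a poller publish configuration changes to it), here is a minimal sketch. It is not the Flume implementation, which is covered in the linked post: `ReloadSketch` is a hypothetical stand-in for the EventBus plus polling provider, and the modification time is supplied through a `LongSupplier` as an assumption so the example stays deterministic.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.LongSupplier;

/** Hypothetical stand-in for "poll the conf file and notify subscribers". */
final class ReloadSketch {
    private final LongSupplier lastModified;            // e.g. file::lastModified
    private final List<Runnable> subscribers = new ArrayList<>();
    private long lastSeen = 0L;

    ReloadSketch(LongSupplier lastModified) {
        this.lastModified = lastModified;
    }

    /** Counterpart of registering the application on the event bus. */
    void register(Runnable subscriber) {
        subscribers.add(subscriber);
    }

    /**
     * One polling pass; a real poller would run this on a fixed schedule.
     * Returns true if a change was detected and published.
     */
    boolean pollOnce() {
        long modified = lastModified.getAsLong();
        if (modified > lastSeen) {
            lastSeen = modified;
            for (Runnable subscriber : subscribers) {
                subscriber.run();   // e.g. re-run the stop/start cycle
            }
            return true;
        }
        return false;
    }
}
```

In this model the first pass always publishes, because the initial `lastSeen` is 0; that first notification plays the role of the initial configuration load, and later passes only fire when the timestamp advances.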
This article is reposted from 菜菜光's 51CTO blog. Original link: http://blog.51cto.com/caiguangguang/1619532. To repost it, please contact the original author.