flume源码分析1-启动过程

简介:

 1.启动命令

1
nohup bin/flume-ng agent -n agent-server  -f  agent-server1.conf &

flume-ng是一个shell脚本:

1
2
   agent                 run a Flume agent  ---> org.apache.flume.node.Application 类
   avro-client           run an avro Flume client ---> org.apache.flume.client.avro.AvroCLIClient 类
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
run_flume() { #shell脚本实现
   local FLUME_APPLICATION_CLASS
   if  "$#"  -gt  0  ]; then
     FLUME_APPLICATION_CLASS=$ 1
     shift
   else
     error  "Must specify flume application class"  1
   fi
   if  [ ${CLEAN_FLAG} -ne  0  ]; then
     set -x
   fi
   $EXEC $JAVA_HOME/bin/java $JAVA_OPTS -cp  "$FLUME_CLASSPATH"  \
       -Djava.library.path=$FLUME_JAVA_LIBRARY_PATH  "$FLUME_APPLICATION_CLASS"  $*
}
......
finally , invoke the appropriate command
if  [ -n  "$opt_agent"  ] ; then  #如果第一个参数为agent时,opt_agent取值为 1
   run_flume $FLUME_AGENT_CLASS $args #FLUME_AGENT_CLASS= "org.apache.flume.node.Application"
elif [ -n  "$opt_avro_client"  ] ; then
   run_flume $FLUME_AVRO_CLIENT_CLASS $args
elif [ -n  "${opt_version}"  ] ; then
   run_flume $FLUME_VERSION_CLASS $args
elif [ -n  "${opt_tool}"  ] ; then
   run_flume $FLUME_TOOLS_CLASS $args
else
   error  "This message should never appear"  1
fi

最终启动的时候调用org.apache.flume.node.Application类的main方法
2.org.apache.flume.node.Application类
1)调用main方法,首先会解析参数,主要是n和f以及no-reload-conf,n为节点名称,f为配置文件,no-reload-conf代表是否支持自动reload(1.5.0才有的功能)
n/f 都有设置的值,no-reload-conf没有设置的项,如果设置了no-reload-conf代表不能自动reload

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
       Options options =  new  Options();
       Option option =  new  Option( "n"  "name"  true "the name of this agent" );
       option.setRequired(  true );
       options.addOption(option);
       option =  new  Option( "f"  "conf-file"  true "specify a conf file" );
       option.setRequired(  true );
       options.addOption(option);
       option =  new  Option( null  "no-reload-conf"  false "do not reload "  +
         "conf file if changed" );
       options.addOption(option);
....
       CommandLineParser parser =  new  GnuParser();
       CommandLine commandLine = parser.parse(options, args);
       File configurationFile =  new  File(commandLine.getOptionValue( 'f'  ));
       String agentName = commandLine.getOptionValue(  'n' );
       boolean  reload = !commandLine.hasOption(  "no-reload-conf" );   //获取是否含有no-reload-conf的设置,如果没有设置no-reload-conf则reload为true

2)   

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
  List<LifecycleAware> components = Lists.newArrayList();  //初始化一个List<LifecycleAware>对象,用来存放需要启动的组件,这个只有在支持reload的情况才会使用
       Application application;
       if (reload) {
         EventBus eventBus =  new  EventBus(agentName +  "-event-bus"  );
         PollingPropertiesFileConfigurationProvider configurationProvider =
             new  PollingPropertiesFileConfigurationProvider(agentName,
                 configurationFile, eventBus,  30 );
         components.add(configurationProvider);
         application =  new  Application(components);
         eventBus.register(application);
       else  //不知道reload的情况
         PropertiesFileConfigurationProvider configurationProvider =
             new  PropertiesFileConfigurationProvider(agentName,
                 configurationFile);  //实例化一个PropertiesFileConfigurationProvider 对象,参数是agent的名称和配置文件(即n和f的设置)
         application =  new  Application();  //实例化一个Application对象
         application.handleConfigurationEvent(configurationProvider.getConfiguration());  //调用handleConfigurationEvent方法
       }
       application.start();  // 调用start方法

不支持reload的启动方法调用:

1
main--->handleConfigurationEvent-->stopAllComponents+startAllComponents-->start

3)handleConfigurationEvent方法调用stopAllComponents和startAllComponents方法

1
2
3
4
   public  synchronized  void  handleConfigurationEvent(MaterializedConfiguration conf) {
     stopAllComponents();  //用于
     startAllComponents(conf);
   }

这里handleConfigurationEvent方法的参数为MaterializedConfiguration对象(这里为SimpleMaterializedConfiguration实例)
MaterializedConfiguration对象由AbstractConfigurationProvider.getConfiguration方法返回,在AbstractConfigurationProvider.getConfiguration方法中通过
调用loadChannels/loadSources/loadSinks方法来解析flume的配置文件,同时把对应的Channel,SourceRunner,SinkRunner放到对应的hashmap中,并最终通过SimpleMaterializedConfiguration的addChannel/addSourceRunner/addSinkRunner加载到SimpleMaterializedConfiguration对象中,然后供stopAllComponents/startAllComponents使用

stopAllComponents方法用于关闭所有的组件,
其通过调用MaterializedConfiguration对象(这里具体实现类为SimpleMaterializedConfiguration)的getSourceRunners和getChannels来获取需要关闭的SourceRunner和Channel组件对象,然后对各个组件对象调用LifecycleSupervisor.unsupervise来关闭组件,而startAllComponents正好相反,其对各个组件对象调用LifecycleSupervisor.supervise方法用于启动各个组件服务,另外
startAllComponents方法会调用this.loadMonitoring()方法启动监控flume的metrics的服务(而支持reload的方式不会调用这个方法)

4)start方法会对每一个组件调用LifecycleSupervisor.supervise方法,来进行服务的状态管理(在服务异常时可以自动拉起),这个主要是对支持reload的设置有用,
用来启动检测文件更新的计划任务线程池

1
2
3
4
5
   public  synchronized  void  start() {
     for (LifecycleAware component : components) {
       supervisor.supervise(component,
           new  SupervisorPolicy.AlwaysRestartPolicy(), LifecycleState.START);
     }

  }
supervise的实现参见(http://caiguangguang.blog.51cto.com/1652935/1619527)
支持reload的启动方法调用:main--->EventBus.register-->start方法
reload的实现参见(http://caiguangguang.blog.51cto.com/1652935/1619523)



本文转自菜菜光 51CTO博客,原文链接:http://blog.51cto.com/caiguangguang/1619532,如需转载请自行联系原作者

相关文章
|
Java Shell Apache
Flume-NG源码分析-整体结构及配置载入分析
弦外之音 很多朋友都在问我,经常看各种框架的源码会不会感到很枯燥,是什么东西在驱动着我一直看下去。其实我想说的很简单,作为一个程序员,不管你工作了多少年,能够经常学习和借鉴国内外优秀框架设计思想和程序架构,我想对我们来说是最直接的提高。
1186 0
|
6月前
|
存储 分布式计算 监控
【Flume】Flume 监听日志文件案例分析
【4月更文挑战第4天】【Flume】Flume 监听日志文件案例分析
|
6月前
|
存储 运维 监控
【Flume】flume 日志管理中的应用
【4月更文挑战第4天】【Flume】flume 日志管理中的应用
|
消息中间件 数据采集 SQL
1、电商数仓(用户行为采集平台)数据仓库概念、用户行为日志、业务数据、模拟数据、用户行为数据采集模块、日志采集Flume(一)
1、电商数仓(用户行为采集平台)数据仓库概念、用户行为日志、业务数据、模拟数据、用户行为数据采集模块、日志采集Flume(一)
|
3月前
|
存储 数据采集 数据处理
【Flume拓扑揭秘】掌握Flume的四大常用结构,构建强大的日志收集系统!
【8月更文挑战第24天】Apache Flume是一个强大的工具,专为大规模日志数据的收集、聚合及传输设计。其核心架构包括源(Source)、通道(Channel)与接收器(Sink)。Flume支持多样化的拓扑结构以适应不同需求,包括单层、扇入(Fan-in)、扇出(Fan-out)及复杂多层拓扑。单层拓扑简单直观,适用于单一数据流场景;扇入结构集中处理多源头数据;扇出结构则实现数据多目的地分发;复杂多层拓扑提供高度灵活性,适合多层次数据处理。通过灵活配置,Flume能够高效构建各种规模的数据收集系统。
76 0
|
1月前
|
SQL 分布式计算 Hadoop
Hadoop-19 Flume Agent批量采集数据到HDFS集群 监听Hive的日志 操作则把记录写入到HDFS 方便后续分析
Hadoop-19 Flume Agent批量采集数据到HDFS集群 监听Hive的日志 操作则把记录写入到HDFS 方便后续分析
47 2
|
1月前
|
存储 数据采集 分布式计算
Hadoop-17 Flume 介绍与环境配置 实机云服务器测试 分布式日志信息收集 海量数据 实时采集引擎 Source Channel Sink 串行复制负载均衡
Hadoop-17 Flume 介绍与环境配置 实机云服务器测试 分布式日志信息收集 海量数据 实时采集引擎 Source Channel Sink 串行复制负载均衡
44 1
|
3月前
|
存储 分布式计算 大数据
【Flume的大数据之旅】探索Flume如何成为大数据分析的得力助手,从日志收集到实时处理一网打尽!
【8月更文挑战第24天】Apache Flume是一款高效可靠的数据收集系统,专为Hadoop环境设计。它能在数据产生端与分析/存储端间搭建桥梁,适用于日志收集、数据集成、实时处理及数据备份等多种场景。通过监控不同来源的日志文件并将数据标准化后传输至Hadoop等平台,Flume支持了性能监控、数据分析等多种需求。此外,它还能与Apache Storm或Flink等实时处理框架集成,实现数据的即时分析。下面展示了一个简单的Flume配置示例,说明如何将日志数据导入HDFS进行存储。总之,Flume凭借其灵活性和强大的集成能力,在大数据处理流程中占据了重要地位。
88 3
|
6月前
|
SQL 数据采集 数据挖掘
nginx+flume网络流量日志实时数据分析实战
nginx+flume网络流量日志实时数据分析实战
184 0