Flume-ng启动过程分析

简介: <div> <div style="orphans:2; widows:2; font-size:13px; margin:0px; font-family:Monaco,Menlo,'Ubuntu Mono',Consolas,source-code-pro,SimSun,Song,宋体,幼圆,Heiti,黑体,文泉驿等宽正黑,文泉驿正黑,monospace"> <h3 style=

Application.java

入口函数main():

    ...
    //加载flume的配置文件,初始化Sink,Source,Channel的工厂类
    PropertiesFileConfigurationProvider configurationProvider =
            new PropertiesFileConfigurationProvider(agentName,
                configurationFile);
    application = new Application();
    //configurationProvider.getConfiguration()中实例化Sink,Source,Channel
    application.handleConfigurationEvent(configurationProvider.getConfiguration());-------getConfiguration------>
            //Map用于存储所有Sink,Source,Channel
            Map<String, ChannelComponent> channelComponentMap = Maps.newHashMap();
            Map<String, SourceRunner> sourceRunnerMap = Maps.newHashMap();
            Map<String, SinkRunner> sinkRunnerMap = Maps.newHashMap();
            //先实例化channel
            loadChannels(agentConf, channelComponentMap);
            //将Source对应的channel注册到ChannelSelector,Source通过ChannelSelector获取Channel
            loadSources(agentConf, channelComponentMap, sourceRunnerMap);
            //向Sink注册Channel
            loadSinks(agentConf, channelComponentMap, sinkRunnerMap);
            ...
            conf.addChannel(channelName, channelComponent.channel);
            ...
            for(Map.Entry<String, SourceRunner> entry : sourceRunnerMap.entrySet()) {
              conf.addSourceRunner(entry.getKey(), entry.getValue());
            }
            for(Map.Entry<String, SinkRunner> entry : sinkRunnerMap.entrySet()) {
              conf.addSinkRunner(entry.getKey(), entry.getValue());
            }
            ...
            return conf

    ...
    //application.handleConfigurationEvent(conf)---->
            stopAllComponents();
            startAllComponents(conf);
    final Application appReference = application;
    //关闭程序时,调用的钩子
    Runtime.getRuntime().addShutdownHook(new Thread("agent-shutdown-hook") {
    @Override
    public void run() {
      appReference.stop();
    }
  });

startAllComponents(conf):

//通过LifecycleSupervisor类启动组件//启动MonitorRunnable,监控Channelfor (Entry<String, Channel> entry :
      materializedConfiguration.getChannels().entrySet()) {
      try{
        logger.info("Starting Channel " + entry.getKey());
        supervisor.supervise(entry.getValue(),
            new SupervisorPolicy.AlwaysRestartPolicy(), LifecycleState.START);
      } catch (Exception e){
        logger.error("Error while starting {}", entry.getValue(), e);
      }
    }

    //等待启动
    for(Channel ch: materializedConfiguration.getChannels().values()){
      while(ch.getLifecycleState() != LifecycleState.START
          && !supervisor.isComponentInErrorState(ch)){
        try {
          logger.info("Waiting for channel: " + ch.getName() +
              " to start. Sleeping for 500 ms");
          Thread.sleep(500);
        } catch (InterruptedException e) {
          logger.error("Interrupted while waiting for channel to start.", e);
          Throwables.propagate(e);
        }
      }
    }

    //启动MonitorRunnable,监控sink
    for (Entry<String, SinkRunner> entry : materializedConfiguration.getSinkRunners()
        .entrySet()) {
      try{
        logger.info("Starting Sink " + entry.getKey());
        supervisor.supervise(entry.getValue(),
          new SupervisorPolicy.AlwaysRestartPolicy(), LifecycleState.START);
      } catch (Exception e) {
        logger.error("Error while starting {}", entry.getValue(), e);
      }
    }

    //启动MonitorRunnable,监控source
    for (Entry<String, SourceRunner> entry : materializedConfiguration
        .getSourceRunners().entrySet()) {
      try{
        logger.info("Starting Source " + entry.getKey());
        supervisor.supervise(entry.getValue(),
          new SupervisorPolicy.AlwaysRestartPolicy(), LifecycleState.START);
      } catch (Exception e) {
        logger.error("Error while starting {}", entry.getValue(), e);
      }
    }

    this.loadMonitoring();

LifecycleSupervisor.java

负责启动和监控Flume组件的类,功能如:失败重启组件
LifecycleSupervisor内部比较重要的几个变量:

//监控进程的线程池
ScheduledThreadPoolExecutor monitorService
Map<LifecycleAware, ScheduledFuture<?>> monitorFutures
Map<LifecycleAware, Supervisoree> supervisedProcesses
//启动监控
  public synchronized void supervise(LifecycleAware lifecycleAware,
      SupervisorPolicy policy, LifecycleState desiredState) {
    ...

    //组件状态
    Supervisoree process = new Supervisoree();
    process.status = new Status();
    process.policy = policy;
    process.status.desiredState = desiredState;
    process.status.error = false;

    //监控线程,调用启动组件的线程。比如
    MonitorRunnable monitorRunnable = new MonitorRunnable();---->
            lifecycleAware.start();--->
                //如果是sink
                sinRunner.start()---->
                    runnerThread = new Thread(runner);
                    runnerThread.setName("SinkRunner-PollingRunner-" +
                    policy.getClass().getSimpleName());
                    runnerThread.start();---->
                        //在新线程里循环调用
                        DefaultSinkProcessor.process();---->
                            //sink从channel中取数据,进行处理
                            sink.process();



    monitorRunnable.lifecycleAware = lifecycleAware;
    monitorRunnable.supervisoree = process;
    monitorRunnable.monitorService = monitorService;

    supervisedProcesses.put(lifecycleAware, process);

    //每隔三秒监控组件运行状况
    ScheduledFuture<?> future = monitorService.scheduleWithFixedDelay(
        monitorRunnable, 0, 3, TimeUnit.SECONDS);
    monitorFutures.put(lifecycleAware, future);
  }


目录
相关文章
|
4月前
|
消息中间件 存储 分布式计算
【Flume】Flume配置文件详细分析
【4月更文挑战第4天】【Flume】Flume配置文件详细分析
|
4月前
|
存储 分布式计算 监控
【Flume】Flume 监听日志文件案例分析
【4月更文挑战第4天】【Flume】Flume 监听日志文件案例分析
|
4月前
|
存储 消息中间件 缓存
【Flume】Flume Agent的内部原理分析
【4月更文挑战第4天】【Flume】Flume Agent的内部原理分析
|
4月前
|
存储 消息中间件 监控
【Flume】Flume在大数据分析领域的应用
【4月更文挑战第4天】【Flume】Flume在大数据分析领域的应用
|
28天前
|
存储 分布式计算 大数据
【Flume的大数据之旅】探索Flume如何成为大数据分析的得力助手,从日志收集到实时处理一网打尽!
【8月更文挑战第24天】Apache Flume是一款高效可靠的数据收集系统,专为Hadoop环境设计。它能在数据产生端与分析/存储端间搭建桥梁,适用于日志收集、数据集成、实时处理及数据备份等多种场景。通过监控不同来源的日志文件并将数据标准化后传输至Hadoop等平台,Flume支持了性能监控、数据分析等多种需求。此外,它还能与Apache Storm或Flink等实时处理框架集成,实现数据的即时分析。下面展示了一个简单的Flume配置示例,说明如何将日志数据导入HDFS进行存储。总之,Flume凭借其灵活性和强大的集成能力,在大数据处理流程中占据了重要地位。
33 3
|
28天前
|
数据采集 存储 Java
Flume Agent 的内部原理分析:深入探讨 Flume 的架构与实现机制
【8月更文挑战第24天】Apache Flume是一款专为大规模日志数据的收集、聚合及传输而设计的分布式、可靠且高可用系统。本文深入解析Flume Agent的核心机制并提供实际配置与使用示例。Flume Agent由三大组件构成:Source(数据源)、Channel(数据缓存)与Sink(数据目的地)。工作流程包括数据采集、暂存及传输。通过示例配置文件和Java代码片段展示了如何设置这些组件以实现日志数据的有效管理。Flume的强大功能与灵活性使其成为大数据处理及实时数据分析领域的优选工具。
56 1
|
4月前
|
存储 消息中间件 Kafka
【Flume】Flume 核心组件分析
【4月更文挑战第4天】【Flume】Flume 核心组件分析
|
4月前
|
监控 Apache
【Flume】 Flume 区别分析:ExecSource、Spooldir Source、Taildir Source
【4月更文挑战第4天】 Flume 区别分析:ExecSource、Spooldir Source、Taildir Source
|
4月前
|
存储 监控 数据库
【Flume】 Flume 断点续传原理分析
【4月更文挑战第4天】【Flume】 Flume 断点续传原理分析
|
4月前
|
SQL 消息中间件 分布式数据库
基于Flume+Kafka+Hbase+Flink+FineBI的实时综合案例(三)离线分析
基于Flume+Kafka+Hbase+Flink+FineBI的实时综合案例(三)离线分析
106 0