flume channel monitor实现源码分析

简介:

  对于flume的监控,只需要监控channel的性能数据即可,source和sink的性能一部分可以从channel中表现出来。
以MemoryChannel为例,在MemoryTransaction的构造函数中会实例化一个org.apache.flume.instrumentation.ChannelCounter对象

1
2
3
4
5
     public  MemoryTransaction(  int  transCapacity, ChannelCounter counter) {
       putList =  new  LinkedBlockingDeque<Event>(transCapacity);
       takeList =  new  LinkedBlockingDeque<Event>(transCapacity);
       channelCounter = counter;
     }

org.apache.flume.instrumentation.ChannelCounter 定义了几个计数器用来记录channel的性能数据

1
2
3
4
5
6
   private  static  final  String COUNTER_CHANNEL_SIZE =  "channel.current.size" //已经使用的容量大小
   private  static  final  String COUNTER_EVENT_PUT_ATTEMPT =  "channel.event.put.attempt" //source到channel尝试插入的数据(不管是否成功)
   private  static  final  String COUNTER_EVENT_TAKE_ATTEMPT =  "channel.event.take.attempt" //sink从channel尝试消费的数据(不管是否成功)
   private  static  final  String COUNTER_EVENT_PUT_SUCCESS =  "channel.event.put.success" //source到channel成功插入的数据
   private  static  final  String COUNTER_EVENT_TAKE_SUCCESS =  "channel.event.take.success" //sink从channel成功消费的数据
   private  static  final  String COUNTER_CHANNEL_CAPACITY =  "channel.capacity" ;   //总容量大小

并封装了相关的方法,来操作这些性能计数器:

wKiom1T1w3CynIp-AAFVrItasOA700.jpg

比如channel.event.put.attempt的由getEventPutAttemptCount和incrementEventPutAttemptCount操作:

1
2
3
4
5
6
   public  long  incrementEventPutAttemptCount() {  //用于数量增加1
     return  increment( COUNTER_EVENT_PUT_ATTEMPT);
   }
   public  long  getEventPutAttemptCount() {  //用于获取值
     return  get( COUNTER_EVENT_PUT_ATTEMPT);
   }

而在channel的相关操作中会使用到这些方法:

1
2
3
4
5
6
7
8
9
10
11
     protected  void  doPut(Event event)  throws  InterruptedException {
       channelCounter.incrementEventPutAttemptCount();  //比如在插入数据的操作开始时,增加channel.event.put.attempt的值
       int  eventByteSize = ( int )Math.ceil(estimateEventSize(event)/ byteCapacitySlotSize);
       if  (! putList.offer(event)) {
         throw  new  ChannelException(
           "Put queue for MemoryTransaction of capacity "  +
             putList.size() +  " full, consider committing more frequently, "  +
             "increasing capacity or increasing thread count"  );
       }
       putByteCounter += eventByteSize;
     }


counter的注册使用,以memorychannel相关为例:
ChannelCounter扩展了MonitoredCounterGroup类并实现了ChannelCounterMBean接口
MonitoredCounterGroup是一个抽象类,其具体的实现类定义了具体的组件的性能计数器和对应的封装方法

wKiom1T1w5Oimh3vAAEQLzntWLo002.jpg

ChannelCounter中包含的所有的可用的counter:

1
2
3
4
5
private  static  final  String[] ATTRIBUTES = {
     COUNTER_CHANNEL_SIZE, COUNTER_EVENT_PUT_ATTEMPT,
     COUNTER_EVENT_TAKE_ATTEMPT, COUNTER_EVENT_PUT_SUCCESS,
     COUNTER_EVENT_TAKE_SUCCESS, COUNTER_CHANNEL_CAPACITY
   };

ChannelCounter的构造方法调用MonitoredCounterGroup的构造方法:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
   public  ChannelCounter(String name) {
     super (MonitoredCounterGroup.Type. CHANNEL, name, ATTRIBUTES );  //调用MonitoredCounterGroup构造方法
   }
MonitoredCounterGroup构造方法:
private  final  Map<String, AtomicLong> counterMap;
....
   protected  MonitoredCounterGroup(Type type, String name, String... attrs) {
     this . type = type;
     this . name = name;
     Map<String, AtomicLong> counterInitMap =  new  HashMap<String, AtomicLong>();  // 声明一个初始的hashmap,用来存放counter name到value的对应关系
     // Initialize the counters
     for  (String attribute : attrs) {
       counterInitMap.put(attribute,  new  AtomicLong(0L));  //初始value都为0
     }
     counterMap = Collections.unmodifiableMap(counterInitMap);  //返回hashmap不可更改的映射视图
     startTime =  new  AtomicLong(0L);
     stopTime =  new  AtomicLong(0L);
   }

这里Type是一个enum类型,可取值:

1
2
3
4
5
6
7
8
9
10
   public  static  enum  Type {
     SOURCE,
     CHANNEL_PROCESSOR,
     CHANNEL,
     SINK_PROCESSOR,
     SINK,
     INTERCEPTOR,
     SERIALIZER,
     OTHER
   };

在MemoryChannel中的start方法中启动:

1
2
3
4
5
6
7
   public  synchronized  void  start() {
     channelCounter.start();  //调用MonitoredCounterGroup的start方法
     channelCounter.setChannelSize( queue.size());
     channelCounter.setChannelCapacity(Long. valueOf(
             queue.size() + queue.remainingCapacity()));
     super .start();
   }

MonitoredCounterGroup.start:

1
2
3
4
5
6
7
8
9
   public  void  start() {
     register();  //调用register方法注册counter,主要是调用ManagementFactory. getPlatformMBeanServer().registerMBean( this, objName);进行注册mbean操作,把ChannelCounter对象作为mbean进行注册
     stopTime.set(0L);
     for  (String counter : counterMap.keySet()) {
       counterMap.get(counter).set(0L);  //设置值都为0
     }
     startTime.set(System. currentTimeMillis());
     logger.info(  "Component type: "  + type +  ", name: "  + name +  " started"  );
   }

这样就可以通过jmx获取的监控项



本文转自菜菜光 51CTO博客,原文链接:http://blog.51cto.com/caiguangguang/1617019,如需转载请自行联系原作者

相关文章
|
3月前
|
分布式计算 Java Hadoop
Hadoop-18 Flume HelloWorld 第一个Flume尝试!编写conf实现Source+Channel+Sink 控制台查看收集到的数据 流式收集
Hadoop-18 Flume HelloWorld 第一个Flume尝试!编写conf实现Source+Channel+Sink 控制台查看收集到的数据 流式收集
43 1
|
3月前
|
存储 数据采集 分布式计算
Hadoop-17 Flume 介绍与环境配置 实机云服务器测试 分布式日志信息收集 海量数据 实时采集引擎 Source Channel Sink 串行复制负载均衡
Hadoop-17 Flume 介绍与环境配置 实机云服务器测试 分布式日志信息收集 海量数据 实时采集引擎 Source Channel Sink 串行复制负载均衡
63 1
|
5月前
|
数据采集 存储 Apache
Flume核心组件大揭秘:Agent、Source、Channel、Sink,一文掌握数据采集精髓!
【8月更文挑战第24天】Flume是Apache旗下的一款顶级服务工具,专为大规模日志数据的收集、聚合与传输而设计。其架构基于几个核心组件:Agent、Source、Channel及Sink。Agent作为基础执行单元,整合Source(数据采集)、Channel(数据暂存)与Sink(数据传输)。本文通过实例深入剖析各组件功能与配置,包括Avro、Exec及Spooling Directory等多种Source类型,Memory与File Channel方案以及HDFS、Avro和Logger等Sink选项,旨在提供全面的Flume应用指南。
336 1
|
消息中间件 数据采集 存储
大数据数据采集的数据采集(收集/聚合)的Flume之数据采集流程的Channel的Kafka Channel
在大数据处理和管理中,数据采集是非常重要的一环。为了更加高效地进行数据采集,Flume作为一种流式数据采集工具得到了广泛的应用。其中,Flume的Channel模块是实现数据缓存和传输的核心模块之一。本文将介绍Flume中的Kafka Channel,讲解其数据采集流程。
209 0
|
缓存 中间件
【Flume中间件】(8)channel选择器副本机制
【Flume中间件】(8)channel选择器副本机制
194 1
【Flume中间件】(8)channel选择器副本机制
|
数据采集 存储 缓存
大数据数据采集的数据采集(收集/聚合)的Flume之数据采集流程的Channel的File Channel
在大数据处理和管理中,数据采集是非常重要的一环。为了更加高效地进行数据采集,Flume作为一种流式数据采集工具得到了广泛的应用。其中,Flume的Channel模块是实现数据缓存和传输的核心模块之一。本文将介绍Flume中的File Channel,讲解其数据采集流程。
160 0
|
数据采集 存储 缓存
大数据数据采集的数据采集(收集/聚合)的Flume之数据采集流程的Channel的JDBC Channel
在大数据处理和管理中,数据采集是非常重要的一环。为了更加高效地进行数据采集,Flume作为一种流式数据采集工具得到了广泛的应用。其中,Flume的Channel模块是实现数据缓存和传输的核心模块之一。本文将介绍Flume中的JDBC Channel,讲解其数据采集流程。
293 0
|
存储 数据采集 大数据
大数据数据采集的数据采集(收集/聚合)的Flume之数据采集流程的Channel的Memory Channel
在Flume中,Channel是用于存储从Source采集的数据并传输至Sink的组件。Memory Channel是其中一种常见的Channel类型。它将事件存储在内存中,并提供快速的读写和处理能力。本文将介绍Memory Channel的配置和数据传输流程。
170 0
|
存储 数据采集 消息中间件
大数据数据采集的数据采集(收集/聚合)的Flume之基本组件的Sink:从Channel中取数据
在Flume中,Sink是数据采集和传输过程中的最终组件。它负责从Channel缓冲区中获取数据并将其存储到目标存储系统中。
281 0
|
存储 数据采集 缓存
大数据数据采集的数据采集(收集/聚合)的Flume之基本组件的Channel:临时存储数据的管道
在Flume中,Channel是数据采集和传输过程中的一个重要组件。它负责存储从Source获取的数据,并将其转发给Sink进行处理和存储。
164 0