对于flume的监控,只需要监控channel的性能数据即可,source和sink的性能一部分可以从channel中表现出来。
以MemoryChannel为例,在MemoryTransaction的构造函数中会实例化一个org.apache.flume.instrumentation.ChannelCounter对象
1
2
3
4
5
|
public
MemoryTransaction(
int
transCapacity, ChannelCounter counter) {
putList =
new
LinkedBlockingDeque<Event>(transCapacity);
takeList =
new
LinkedBlockingDeque<Event>(transCapacity);
channelCounter = counter;
}
|
org.apache.flume.instrumentation.ChannelCounter 定义了几个计数器用来记录channel的性能数据
1
2
3
4
5
6
|
private
static
final
String COUNTER_CHANNEL_SIZE =
"channel.current.size"
;
//已经使用的容量大小
private
static
final
String COUNTER_EVENT_PUT_ATTEMPT =
"channel.event.put.attempt"
;
//source到channel尝试插入的数据(不管是否成功)
private
static
final
String COUNTER_EVENT_TAKE_ATTEMPT =
"channel.event.take.attempt"
;
//sink从channel尝试消费的数据(不管是否成功)
private
static
final
String COUNTER_EVENT_PUT_SUCCESS =
"channel.event.put.success"
;
//source到channel成功插入的数据
private
static
final
String COUNTER_EVENT_TAKE_SUCCESS =
"channel.event.take.success"
;
//sink从channel成功消费的数据
private
static
final
String COUNTER_CHANNEL_CAPACITY =
"channel.capacity"
;
//总容量大小
|
并封装了相关的方法,来操作这些性能计数器:
比如channel.event.put.attempt的由getEventPutAttemptCount和incrementEventPutAttemptCount操作:
1
2
3
4
5
6
|
public
long
incrementEventPutAttemptCount() {
//用于数量增加1
return
increment( COUNTER_EVENT_PUT_ATTEMPT);
}
public
long
getEventPutAttemptCount() {
//用于获取值
return
get( COUNTER_EVENT_PUT_ATTEMPT);
}
|
而在channel的相关操作中会使用到这些方法:
1
2
3
4
5
6
7
8
9
10
11
|
protected
void
doPut(Event event)
throws
InterruptedException {
channelCounter.incrementEventPutAttemptCount();
//比如在插入数据的操作开始时,增加channel.event.put.attempt的值
int
eventByteSize = (
int
)Math.ceil(estimateEventSize(event)/ byteCapacitySlotSize);
if
(! putList.offer(event)) {
throw
new
ChannelException(
"Put queue for MemoryTransaction of capacity "
+
putList.size() +
" full, consider committing more frequently, "
+
"increasing capacity or increasing thread count"
);
}
putByteCounter += eventByteSize;
}
|
counter的注册使用,以memorychannel相关为例:
ChannelCounter扩展了MonitoredCounterGroup类并实现了ChannelCounterMBean接口
MonitoredCounterGroup是一个抽象类,其具体的实现类定义了具体的组件的性能计数器和对应的封装方法
ChannelCounter中包含的所有的可用的counter:
1
2
3
4
5
|
private
static
final
String[] ATTRIBUTES = {
COUNTER_CHANNEL_SIZE, COUNTER_EVENT_PUT_ATTEMPT,
COUNTER_EVENT_TAKE_ATTEMPT, COUNTER_EVENT_PUT_SUCCESS,
COUNTER_EVENT_TAKE_SUCCESS, COUNTER_CHANNEL_CAPACITY
};
|
ChannelCounter的构造方法调用MonitoredCounterGroup的构造方法:
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
|
public
ChannelCounter(String name) {
super
(MonitoredCounterGroup.Type. CHANNEL, name, ATTRIBUTES );
//调用MonitoredCounterGroup构造方法
}
MonitoredCounterGroup构造方法:
private
final
Map<String, AtomicLong> counterMap;
....
protected
MonitoredCounterGroup(Type type, String name, String... attrs) {
this
. type = type;
this
. name = name;
Map<String, AtomicLong> counterInitMap =
new
HashMap<String, AtomicLong>();
// 声明一个初始的hashmap,用来存放counter name到value的对应关系
// Initialize the counters
for
(String attribute : attrs) {
counterInitMap.put(attribute,
new
AtomicLong(0L));
//初始value都为0
}
counterMap = Collections.unmodifiableMap(counterInitMap);
//返回hashmap不可更改的映射视图
startTime =
new
AtomicLong(0L);
stopTime =
new
AtomicLong(0L);
}
|
这里Type是一个enum类型,可取值:
1
2
3
4
5
6
7
8
9
10
|
public
static
enum
Type {
SOURCE,
CHANNEL_PROCESSOR,
CHANNEL,
SINK_PROCESSOR,
SINK,
INTERCEPTOR,
SERIALIZER,
OTHER
};
|
在MemoryChannel中的start方法中启动:
1
2
3
4
5
6
7
|
public
synchronized
void
start() {
channelCounter.start();
//调用MonitoredCounterGroup的start方法
channelCounter.setChannelSize( queue.size());
channelCounter.setChannelCapacity(Long. valueOf(
queue.size() + queue.remainingCapacity()));
super
.start();
}
|
MonitoredCounterGroup.start:
1
2
3
4
5
6
7
8
9
|
public
void
start() {
register();
//调用register方法注册counter,主要是调用ManagementFactory. getPlatformMBeanServer().registerMBean( this, objName);进行注册mbean操作,把ChannelCounter对象作为mbean进行注册
stopTime.set(0L);
for
(String counter : counterMap.keySet()) {
counterMap.get(counter).set(0L);
//设置值都为0
}
startTime.set(System. currentTimeMillis());
logger.info(
"Component type: "
+ type +
", name: "
+ name +
" started"
);
}
|
这样就可以通过jmx获取的监控项
本文转自菜菜光 51CTO博客,原文链接:http://blog.51cto.com/caiguangguang/1617019,如需转载请自行联系原作者