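org.apache.flume.SinkProcessor is an interface that extends the LifecycleAware and Configurable interfaces. It is an abstraction layer over multiple sinks (similar to a proxy) and is the object handed to a SinkRunner.
Abstract methods:
process: similar to Sink.process, except that the implementation also chooses which Sink to call
setSinks: sets the list of sinks
Concrete classes:
org.apache.flume.sink.SinkProcessorFactory: a factory (the factory design pattern) that returns SinkProcessor objects. SinkGroup, for example, calls its getProcessor method to obtain a SinkProcessor.
getProcessor picks the implementation from the configured type and SinkProcessorType, then calls processor.setSinks(sinks); to hand over the sink list.
org.apache.flume.conf.sink.SinkProcessorType is an enum that maps each processor type to a class name: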
    OTHER        -> null
    FAILOVER     -> org.apache.flume.sink.FailoverSinkProcessor
    DEFAULT      -> org.apache.flume.sink.DefaultSinkProcessor   // the default
    LOAD_BALANCE -> org.apache.flume.sink.LoadBalancingSinkProcessor
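As a rough illustration of how an enum can map a configured type string to an implementation class name, here is a minimal sketch. `ProcessorKind` and `ProcessorLookup.classNameFor` are hypothetical stand-ins, not Flume's SinkProcessorType or SinkProcessorFactory code; only the four type-to-class pairs come from the mapping above. Treating an unknown type string as a custom class name is an assumption made for the example.

```java
import java.util.Locale;

// Hypothetical stand-in for an enum that maps processor types to class names.
enum ProcessorKind {
  OTHER(null),
  FAILOVER("org.apache.flume.sink.FailoverSinkProcessor"),
  DEFAULT("org.apache.flume.sink.DefaultSinkProcessor"),
  LOAD_BALANCE("org.apache.flume.sink.LoadBalancingSinkProcessor");

  private final String className;

  ProcessorKind(String className) {
    this.className = className;
  }

  String getClassName() {
    return className;
  }
}

final class ProcessorLookup {
  private ProcessorLookup() {}

  // Resolve a configured type such as "failover" to a class name.
  static String classNameFor(String type) {
    if (type == null || type.trim().isEmpty()) {
      return ProcessorKind.DEFAULT.getClassName(); // nothing configured: use the default
    }
    String trimmed = type.trim();
    try {
      ProcessorKind kind = ProcessorKind.valueOf(trimmed.toUpperCase(Locale.ROOT));
      // Assumption: OTHER carries no class name, so the configured string is used as-is.
      return kind == ProcessorKind.OTHER ? trimmed : kind.getClassName();
    } catch (IllegalArgumentException e) {
      // Assumption: a name that is not an enum constant is a custom class name.
      return trimmed;
    }
  }
}
```

With this sketch, `ProcessorLookup.classNameFor("failover")` resolves to the FailoverSinkProcessor class name, and a missing type falls back to the DEFAULT entry.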
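org.apache.flume.sink.AbstractSinkProcessor implements the SinkProcessor interface.
org.apache.flume.sink.DefaultSinkProcessor implements SinkProcessor and ConfigurableComponent. It is the processor used when no sink group is configured. It does no extra work and simply proxies to its single sink (it calls process directly):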
    @Override
    public Status process() throws EventDeliveryException {
      return sink.process(); // delegate straight to Sink.process
    }

    @Override
    public void setSinks(List<Sink> sinks) {
      Preconditions.checkNotNull(sinks);
      // setSinks also verifies that there is exactly one sink and fails otherwise
      Preconditions.checkArgument(sinks.size() == 1, "DefaultSinkPolicy can "
          + "only handle one sink, "
          + "try using a policy that supports multiple sinks");
      sink = sinks.get(0);
    }
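The pass-through behaviour can be tried outside Flume with a small stand-alone sketch. `SimpleSink` and `PassThroughProcessor` are hypothetical names, and a plain String stands in for Flume's Status. Only the "exactly one sink, then delegate" logic mirrors the excerpt above.

```java
import java.util.List;
import java.util.Objects;

// Hypothetical stand-in for a sink; returns a status string instead of Flume's Status.
interface SimpleSink {
  String process();
}

// Sketch of a processor that proxies to exactly one sink.
final class PassThroughProcessor {
  private SimpleSink sink;

  void setSinks(List<SimpleSink> sinks) {
    Objects.requireNonNull(sinks, "sinks");
    if (sinks.size() != 1) {
      throw new IllegalArgumentException("can only handle one sink, got " + sinks.size());
    }
    sink = sinks.get(0);
  }

  String process() {
    return sink.process(); // no selection logic, just delegate
  }
}
```

Passing a list with two sinks to `setSinks` is rejected, which is the same guard the real class applies before storing its single sink.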
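Implementation class org.apache.flume.sink.FailoverSinkProcessor // subclass of AbstractSinkProcessor
How it works:
It keeps two containers, one for live sinks and one for failed sinks. Once a failed sink has passed its cooldown time, its process method is called to check whether it has recovered. If process returns READY, the sink is put back among the live sinks. Live sinks are ordered by priority, and the one with the highest priority is used as the active sink.
setSinks must be called before configure, and sinks cannot be added at runtime.
To use it, configure the following:
1) Define the sinkgroups
2) Set the sink group's processor.type to failover
3) Give each sink a priority with processor.priority.<sinkName>, and make every value unique. This matters: liveSinks is a map whose key is the priority and whose value is the sink, the sink is looked up by priority, and a sink whose priority duplicates an earlier one is not added.
4) Optionally set the upper bound on the failover penalty time with processor.maxpenalty (default 30000, in milliseconds)
Example: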
    host1.sinkgroups = group1
    host1.sinkgroups.group1.sinks = sink1 sink2
    host1.sinkgroups.group1.processor.type = failover
    host1.sinkgroups.group1.processor.priority.sink1 = 5
    host1.sinkgroups.group1.processor.priority.sink2 = 10
    host1.sinkgroups.group1.processor.maxpenalty = 10000
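Source code analysis:
An inner class FailedSink is defined first. It implements Comparable and holds the sink, its priority and related state, so failed sinks can be compared and ordered.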
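To make the role of such a Comparable entry concrete, here is a minimal sketch of a failed-sink record ordered by the time it may next be retried. `FailedEntry` and `BASE_PENALTY_MS` are hypothetical names, and the rule "the penalty doubles per consecutive failure and is capped at the max penalty" is an assumption for illustration rather than a copy of Flume's code. The clock is passed in explicitly so the behaviour is easy to check.

```java
// Hypothetical entry for a failed sink, ordered by the time it may be retried.
final class FailedEntry implements Comparable<FailedEntry> {
  static final long BASE_PENALTY_MS = 1000L; // assumed base penalty

  final String sinkName;
  final int priority;
  private int sequentialFailures;
  private long refresh; // earliest retry time in milliseconds

  FailedEntry(String sinkName, int priority, int failures, long now, long maxPenaltyMs) {
    this.sinkName = sinkName;
    this.priority = priority;
    this.sequentialFailures = failures;
    adjustRefresh(now, maxPenaltyMs);
  }

  long getRefresh() {
    return refresh;
  }

  // Record one more consecutive failure and push the retry time further out.
  void incFails(long now, long maxPenaltyMs) {
    sequentialFailures++;
    adjustRefresh(now, maxPenaltyMs);
  }

  // Penalty doubles with each consecutive failure and is capped at maxPenaltyMs.
  private void adjustRefresh(long now, long maxPenaltyMs) {
    int shift = Math.min(sequentialFailures, 30); // keep the shift well inside long range
    refresh = now + Math.min(maxPenaltyMs, (1L << shift) * BASE_PENALTY_MS);
  }

  @Override
  public int compareTo(FailedEntry other) {
    return Long.compare(refresh, other.refresh); // earliest retry time sorts first
  }
}
```

Because entries compare by retry time, a `java.util.PriorityQueue<FailedEntry>` always exposes the sink that becomes eligible soonest at its head, which is the property the retry loop in process relies on.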
The configure method creates the two containers, one for live sinks and one for failed sinks:
    private static final String PRIORITY_PREFIX = "priority.";
    private static final String MAX_PENALTY_PREFIX = "maxpenalty";
    private Map<String, Sink> sinks;
    private Sink activeSink;
    private SortedMap<Integer, Sink> liveSinks; // live sinks: key is the priority, value is the sink
    private Queue<FailedSink> failedSinks;      // failed sinks
    private int maxPenalty;

    @Override
    public void configure(Context context) {
      // TreeMap keeps priority -> sink sorted by key (ascending by default)
      liveSinks = new TreeMap<Integer, Sink>();
      // failed sinks live in a priority queue
      failedSinks = new PriorityQueue<FailedSink>();
      Integer nextPrio = 0;
      // read the configured maxpenalty
      String maxPenaltyStr = context.getString(MAX_PENALTY_PREFIX);
      if (maxPenaltyStr == null) {
        maxPenalty = DEFAULT_MAX_PENALTY; // not set: use the default, 30000
      } else {
        try {
          maxPenalty = Integer.parseInt(maxPenaltyStr);
        } catch (NumberFormatException e) {
          logger.warn("{} is not a valid value for {}",
              new Object[] { maxPenaltyStr, MAX_PENALTY_PREFIX });
          maxPenalty = DEFAULT_MAX_PENALTY; // malformed value: also use the default
        }
      }
      for (Entry<String, Sink> entry : sinks.entrySet()) {
        String priStr = PRIORITY_PREFIX + entry.getKey();
        Integer priority;
        try {
          // read each sink's priority from the configuration
          priority = Integer.parseInt(context.getString(priStr));
        } catch (Exception e) {
          // missing or unparseable priority: assign -1, -2, ... in turn
          priority = --nextPrio;
        }
        if (!liveSinks.containsKey(priority)) {
          // this priority is not taken yet, so register the sink under it
          liveSinks.put(priority, sinks.get(entry.getKey()));
        } else {
          logger.warn("Sink {} not added to FailverSinkProcessor as priority"
              + "duplicates that of sink {}", entry.getKey(),
              liveSinks.get(priority));
        }
      }
      // the last entry, i.e. the sink with the highest priority, becomes the active sink
      activeSink = liveSinks.get(liveSinks.lastKey());
    }
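The effect of this priority handling is easy to reproduce with plain collections. The sketch below uses sink names instead of Sink objects, and `PrioritySelectionSketch.buildLive` is a hypothetical helper, not part of Flume. It follows the same three rules as the excerpt: a missing or unparseable priority becomes -1, -2 and so on, a duplicate priority is dropped, and the entry under `lastKey()` is the active one.

```java
import java.util.LinkedHashMap;
import java.util.Map;
import java.util.SortedMap;
import java.util.TreeMap;

final class PrioritySelectionSketch {
  private PrioritySelectionSketch() {}

  // Build priority -> sink name from sink name -> configured priority string.
  static SortedMap<Integer, String> buildLive(Map<String, String> configured) {
    SortedMap<Integer, String> live = new TreeMap<>();
    int nextPrio = 0;
    for (Map.Entry<String, String> e : configured.entrySet()) {
      int priority;
      try {
        priority = Integer.parseInt(e.getValue());
      } catch (NumberFormatException ex) {
        priority = --nextPrio; // missing or invalid: -1, -2, ...
      }
      // keeps the first sink seen for a priority; later duplicates are dropped
      live.putIfAbsent(priority, e.getKey());
    }
    return live;
  }

  public static void main(String[] args) {
    Map<String, String> configured = new LinkedHashMap<>();
    configured.put("sink1", "5");
    configured.put("sink2", "10");
    configured.put("sink3", "10"); // duplicate priority: dropped
    configured.put("sink4", null); // no priority configured: gets -1
    SortedMap<Integer, String> live = buildLive(configured);
    System.out.println(live);                     // {-1=sink4, 5=sink1, 10=sink2}
    System.out.println(live.get(live.lastKey())); // sink2
  }
}
```

Note that sink3 silently disappears from the group here, which is why unique priorities matter in a failover configuration.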
The process method:
    public Status process() throws EventDeliveryException {
      // Retry any failed sinks that have gone through their "cooldown" period
      Long now = System.currentTimeMillis();
      // peek() returns the head of the queue without removing it (null if the queue is empty)
      while (!failedSinks.isEmpty() && failedSinks.peek().getRefresh() < now) {
        // poll() removes and returns the head of the queue (null if the queue is empty)
        FailedSink cur = failedSinks.poll();
        Status s;
        try {
          s = cur.getSink().process(); // try the failed sink again
          if (s == Status.READY) {
            // the sink is READY: put it back into liveSinks and re-select the active sink
            liveSinks.put(cur.getPriority(), cur.getSink());
            activeSink = liveSinks.get(liveSinks.lastKey());
            logger.debug("Sink {} was recovered from the fail list",
                cur.getSink().getName());
          } else {
            // if it's a backoff it needn't be penalized.
            failedSinks.add(cur); // otherwise it goes back into failedSinks
          }
          return s;
        } catch (Exception e) {
          cur.incFails();
          failedSinks.add(cur);
        }
      }

      Status ret = null;
      while (activeSink != null) {
        try {
          ret = activeSink.process(); // call process on the active sink
          return ret;
        } catch (Exception e) {
          logger.warn("Sink {} failed and has been sent to failover list",
              activeSink.getName(), e);
          // move the active sink from liveSinks to failedSinks and pick the new last entry
          activeSink = moveActiveToDeadAndGetNext();
        }
      }

      throw new EventDeliveryException("All sinks failed to process, "
          + "nothing left to failover to");
    }
The moveActiveToDeadAndGetNext method removes the last entry (the active sink) from liveSinks, adds it to failedSinks, and returns the new last entry:
    private Sink moveActiveToDeadAndGetNext() {
      Integer key = liveSinks.lastKey(); // priority of the current active sink
      failedSinks.add(new FailedSink(key, activeSink, 1));
      liveSinks.remove(key); // drop it from liveSinks
      if (liveSinks.isEmpty()) return null;
      if (liveSinks.lastKey() != null) {
        return liveSinks.get(liveSinks.lastKey()); // the new last entry
      } else {
        return null;
      }
    }
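Putting the pieces together, the failover flow can be simulated end to end with a small self-contained sketch. `ToggleSink` and `FailoverSketch` are hypothetical stand-ins, the clock is passed in as a parameter so the run is deterministic, and a fixed cooldown replaces the growing penalty. It is meant to show the interplay of the sorted map and the retry queue, not to reproduce Flume's implementation.

```java
import java.util.Comparator;
import java.util.PriorityQueue;
import java.util.TreeMap;

// Hypothetical sink stand-in whose health can be toggled.
final class ToggleSink {
  final String name;
  boolean healthy = true;

  ToggleSink(String name) {
    this.name = name;
  }

  String process() {
    if (!healthy) {
      throw new IllegalStateException(name + " is down");
    }
    return name;
  }
}

// Sketch of failover: highest-priority live sink first, failed sinks retried after a cooldown.
final class FailoverSketch {
  private static final class Failed {
    final int priority;
    final ToggleSink sink;
    final long retryAt;

    Failed(int priority, ToggleSink sink, long retryAt) {
      this.priority = priority;
      this.sink = sink;
      this.retryAt = retryAt;
    }
  }

  private final TreeMap<Integer, ToggleSink> live = new TreeMap<>();
  private final PriorityQueue<Failed> failed =
      new PriorityQueue<>(Comparator.comparingLong((Failed f) -> f.retryAt));
  private final long cooldownMs; // assumed fixed cooldown

  FailoverSketch(long cooldownMs) {
    this.cooldownMs = cooldownMs;
  }

  void addSink(int priority, ToggleSink sink) {
    live.put(priority, sink);
  }

  // Returns the name of the sink that handled the call at time `now`.
  String process(long now) {
    // First retry failed sinks whose cooldown has expired.
    while (!failed.isEmpty() && failed.peek().retryAt < now) {
      Failed cur = failed.poll();
      try {
        String handledBy = cur.sink.process();
        live.put(cur.priority, cur.sink); // recovered: back among the live sinks
        return handledBy;
      } catch (RuntimeException e) {
        failed.add(new Failed(cur.priority, cur.sink, now + cooldownMs));
      }
    }
    // Then use the live sink with the highest priority, failing over on error.
    while (!live.isEmpty()) {
      Integer key = live.lastKey();
      ToggleSink active = live.get(key);
      try {
        return active.process();
      } catch (RuntimeException e) {
        live.remove(key);
        failed.add(new Failed(key, active, now + cooldownMs));
      }
    }
    throw new IllegalStateException("all sinks failed");
  }
}
```

With sink1 at priority 5 and sink2 at priority 10, calls go to sink2 until it fails, then to sink1, and return to sink2 once its cooldown has passed and it processes successfully again.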
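org.apache.flume.sink.LoadBalancingSinkProcessor // subclass of AbstractSinkProcessor
Two selection strategies are built in, ROUND_ROBIN (the default) and RANDOM. You can also plug in your own selector by implementing the SinkSelector interface.
The process method walks the sinks returned by the iterator from createSinkIterator until one of them returns normally, then leaves the loop. If no sink is usable it throws an exception. backoff defaults to false, which means every call considers all sinks. If it is set to true, a failed sink is backed off and only rejoins the list of usable sinks after a period of time.
Example: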
    host1.sinkgroups.group1.sinks = sink1 sink2
    host1.sinkgroups.group1.processor.type = load_balance
    # random or round_robin (default: round_robin); a custom selector class that implements SinkSelector also works
    host1.sinkgroups.group1.processor.selector = <selector type>
    host1.sinkgroups.group1.processor.selector.selector_property = <value>
Source code analysis:
An inner interface SinkSelector is defined. Its main abstract methods are
createSinkIterator (returns an iterator over the usable sinks) and informSinkFailed.
The configure method:
    private SinkSelector selector;
    ....
    public void configure(Context context) {
      // there must be more than one sink
      Preconditions.checkState(getSinks().size() > 1,
          "The LoadBalancingSinkProcessor cannot be used for a single sink. "
          + "Please configure more than one sinks and try again.");

      // selector setting, ROUND_ROBIN by default
      String selectorTypeName = context.getString(CONFIG_SELECTOR,
          SELECTOR_NAME_ROUND_ROBIN);

      // backoff setting, false by default
      Boolean shouldBackOff = context.getBoolean(CONFIG_BACKOFF, false);

      selector = null;

      if (selectorTypeName.equalsIgnoreCase(SELECTOR_NAME_ROUND_ROBIN)) {
        // ROUND_ROBIN: create a RoundRobinSinkSelector
        selector = new RoundRobinSinkSelector(shouldBackOff);
      } else if (selectorTypeName.equalsIgnoreCase(SELECTOR_NAME_RANDOM)) {
        // RANDOM: create a RandomOrderSinkSelector
        selector = new RandomOrderSinkSelector(shouldBackOff);
      } else {
        try {
          // custom selector: loaded by class name, must implement SinkSelector
          @SuppressWarnings("unchecked")
          Class<? extends SinkSelector> klass = (Class<? extends SinkSelector>)
              Class.forName(selectorTypeName);
          selector = klass.newInstance();
        } catch (Exception ex) {
          throw new FlumeException("Unable to instantiate sink selector: "
              + selectorTypeName, ex);
        }
      }

      selector.setSinks(getSinks());
      selector.configure(
          new Context(context.getSubProperties(CONFIG_SELECTOR_PREFIX)));

      LOGGER.debug("Sink selector: " + selector + " initialized");
    }
The process method:
    public Status process() throws EventDeliveryException {
      Status status = null;

      // ask the SinkSelector implementation for an iterator over the usable sinks
      Iterator<Sink> sinkIterator = selector.createSinkIterator();
      while (sinkIterator.hasNext()) {
        // call process on each sink in turn
        Sink sink = sinkIterator.next();
        try {
          status = sink.process();
          break; // leave the loop at the first sink that returns a status
        } catch (Exception ex) {
          // the sink failed: tell the selector through informSinkFailed
          selector.informSinkFailed(sink);
          LOGGER.warn("Sink failed to consume event. "
              + "Attempting next sink if available.", ex);
        }
      }

      if (status == null) {
        // only when every sink has failed is an exception thrown
        throw new EventDeliveryException("All configured sinks have failed");
      }

      return status;
    }
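The combination of a rotating iterator and "first sink that succeeds wins" can be sketched without any Flume types. In the example below each `Supplier<String>` stands in for a sink's process call, and `RoundRobinBalancerSketch` is a hypothetical name. The rotation rule (start one position later on every call) is an assumption about round-robin ordering in general, not a transcription of RoundRobinSinkSelector.

```java
import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;
import java.util.function.Supplier;

// Sketch: round-robin ordering plus "first sink that succeeds wins".
final class RoundRobinBalancerSketch {
  private final List<Supplier<String>> sinks; // each supplier stands in for Sink.process()
  private int nextHead = 0;

  RoundRobinBalancerSketch(List<Supplier<String>> sinks) {
    if (sinks.isEmpty()) {
      throw new IllegalArgumentException("at least one sink is required");
    }
    this.sinks = new ArrayList<>(sinks);
  }

  // Iterator over all sinks, starting one position later on each call.
  private Iterator<Supplier<String>> createSinkIterator() {
    List<Supplier<String>> order = new ArrayList<>(sinks.size());
    for (int i = 0; i < sinks.size(); i++) {
      order.add(sinks.get((nextHead + i) % sinks.size()));
    }
    nextHead = (nextHead + 1) % sinks.size();
    return order.iterator();
  }

  String process() {
    String status = null;
    Iterator<Supplier<String>> it = createSinkIterator();
    while (it.hasNext()) {
      try {
        status = it.next().get();
        break; // stop at the first sink that returns normally
      } catch (RuntimeException ex) {
        // this sink failed: fall through and try the next one in this round
      }
    }
    if (status == null) {
      throw new IllegalStateException("All configured sinks have failed");
    }
    return status;
  }
}
```

With three sinks where the middle one always throws, successive calls are served by the first, third, third and first sink: the failing sink is still visited each round, which matches the behaviour described for backoff set to false.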
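Two SinkSelector implementations are defined: RoundRobinSinkSelector and RandomOrderSinkSelector.
Taking RoundRobinSinkSelector as an example:
createIterator --> getIndexList // returns the currently active sinks
informSinkFailed only has an effect when backoff is set to true (the default is false). It mainly updates FailureState fields such as restoreTime, which getIndexList then consults.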
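The relationship between informSinkFailed, restoreTime and getIndexList can be illustrated with a minimal sketch. `BackoffIndexSketch` is a hypothetical class, the clock is passed in explicitly, and a fixed penalty is assumed for simplicity; the real selector may well grow the penalty with repeated failures.

```java
import java.util.ArrayList;
import java.util.List;

// Sketch of backoff bookkeeping: a failed sink is skipped until its restore time.
final class BackoffIndexSketch {
  private final long[] restoreTime; // per sink: earliest time it is usable again
  private final boolean backoff;
  private final long penaltyMs;     // assumed fixed penalty

  BackoffIndexSketch(int sinkCount, boolean backoff, long penaltyMs) {
    this.restoreTime = new long[sinkCount];
    this.backoff = backoff;
    this.penaltyMs = penaltyMs;
  }

  // Records the failure only when backoff is enabled.
  void informSinkFailed(int index, long now) {
    if (backoff) {
      restoreTime[index] = now + penaltyMs;
    }
  }

  // Indexes of the sinks that are usable at time `now`.
  List<Integer> getIndexList(long now) {
    List<Integer> active = new ArrayList<>();
    for (int i = 0; i < restoreTime.length; i++) {
      if (restoreTime[i] <= now) {
        active.add(i);
      }
    }
    return active;
  }
}
```

With backoff enabled, a sink reported as failed drops out of the index list until its restore time passes. With backoff disabled, informSinkFailed changes nothing and every sink stays in the list.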
This article is reposted from 菜菜光's 51CTO blog. Original link: http://blog.51cto.com/caiguangguang/1617025. To repost it, please contact the original author.