flume源码学习7-SinkProcessor相关类

简介:

org.apache.flume.SinkProcessor 扩展了LifecycleAware, Configurable接口的接口类,操作多个sink的抽象层(类似于proxy),用来分配给SinkRunner对象
抽象方法:
process 和Sink 的process方法类似(内部实现增加了选择Sink的功能)
setSinks 设置sinks
具体实现类:

wKioL1T1xzqgIGiBAAC6r_hdhZE689.jpg



org.apache.flume.sink.SinkProcessorFactory 设计模式的工厂模式,用于返回SinkProcessor对象(比如SinkGroup中就会调用这个类的getProcessor方法返回SinkProcessor对象)
提供getProcessor方法,根据type的设置和SinkProcessorType返回SinkProcessor对象,并使用processor.setSinks(sinks);设置Sink列表



其中org.apache.flume.conf.sink.SinkProcessorType是一个enum类,定义了processor type到类名的对应关系:

1
2
3
4
OTHER-> null
FAILOVER->org.apache.flume.sink.FailoverSinkProcessor
DEFAULT->org.apache.flume.sink.DefaultSinkProcessor  //默认
LOAD_BALANCE->org.apache.flume.sink.LoadBalancingSinkProcessor



org.apache.flume.sink.AbstractSinkProcessor实现了SinkProcessor接口



org.apache.flume.sink.DefaultSinkProcessor实现了SinkProcessor和ConfigurableComponent接口,在没有使用sink group时使用的processor,不会做额外的操作,只是简单的proxy的操作(直接process)

1
2
3
4
5
6
7
8
9
10
11
12
   @Override
   public  Status process()  throws  EventDeliveryException {
     return  sink.process();  //直接调用Sink.process方法
   }
   @Override
   public  void  setSinks(List<Sink> sinks) {
     Preconditions.checkNotNull(sinks);
     Preconditions.checkArgument(sinks.size() ==  1 "DefaultSinkPolicy can "
         "only handle one sink, "
         "try using a policy that supports multiple sinks"  );  //同时在setSinks方法中会检测对应的Sink是否为1个,如果不为1个会报错
     sink = sinks.get( 0 );
   }



实现类org.apache.flume.sink.FailoverSinkProcessor  //AbstractSinkProcessor的子类
原理:
内部定义了两个容器,分布存储存活的Sink和失败的Sink,如果失败的Sink中的元素过了cooldown时间,会调用其process方法,判断是否已经恢复正常,如果已经恢复正常就会加入到存活的Sink中,在存活的Sink中,会根据priority进行排序,并获取priority最大的那个Sink作为active sink
setSinks必须在configure之前运行,运行过程中不能添加sink
使用时,需要配置:
1)设置sinkgroups
2)设置sinkgroups的processor.type 为 failover
3)为每一个sink设置惩罚因子 processor.priority,并且设置值是唯一的(zmap中key是priority,value是sink,需要重priority查找sink),这一点比较重要
4)可以设置上线failover时间 processor.maxpenalty(默认30000s)
例子:

1
2
3
4
5
6
host1.sinkgroups = group1
host1.sinkgroups.group1.sinks = sink1 sink2
host1.sinkgroups.group1.processor.type = failover
host1.sinkgroups.group1.processor.priority.sink1 =  5
host1.sinkgroups.group1.processor.priority.sink2 =  10
host1.sinkgroups.group1.processor.maxpenalty =  10000

源码分析:
首先定义了一个内部类FailedSink实现了Comparable接口,包含了sink和priority等信息,可以用来做Sink的对比排序操作
在configure方法中定义了两个容器分布用来存放 live的sink和failed的sink

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
   private  static  final  String PRIORITY_PREFIX =  "priority." ;
   private  static  final  String MAX_PENALTY_PREFIX =  "maxpenalty" ;
   private  Map<String, Sink> sinks;
   private  Sink activeSink;
   private  SortedMap<Integer, Sink> liveSinks;  //存放 live的sink,key是priority,value是sink
   private  Queue<FailedSink> failedSinks;  //存放failed的sink
   private  int  maxPenalty ;
   @Override
   public  void  configure(Context context) {
     liveSinks =  new  TreeMap<Integer, Sink>();  //使用TreeMap存储priority到sink的对应关系, TreeMap是一个按key排序的map( 默认的排序为升序 )
     failedSinks =  new  PriorityQueue<FailedSink>();  //使用优先级队列
     Integer nextPrio =  0 ;
     String maxPenaltyStr = context.getString( MAX_PENALTY_PREFIX);  //获取设置的最大的maxpenalty 时间
     if (maxPenaltyStr ==  null ) {
       maxPenalty = DEFAULT_MAX_PENALTY;  //如果没有设置值,使用默认值30000
     else  {
       try  {
         maxPenalty = Integer.parseInt(maxPenaltyStr);
       catch  (NumberFormatException e) {
         logger.warn( "{} is not a valid value for {}"  ,
                 new  Object[] { maxPenaltyStr, MAX_PENALTY_PREFIX });
         maxPenalty  = DEFAULT_MAX_PENALTY ;  //如果设置格式错误,也使用默认值
       }
     }
     for  (Entry<String, Sink> entry : sinks.entrySet()) {
       String priStr = PRIORITY_PREFIX + entry.getKey();
       Integer priority;
       try  {
         priority =  Integer.parseInt(context.getString(priStr));  //从配置信息中获取每个sink的priority值
       catch  (Exception e) {
         priority = --nextPrio;
       }
       if (!liveSinks.containsKey(priority)) {  // 查看liveSinks是否含有这个priority设置的项
         liveSinks.put(priority, sinks.get(entry.getKey()));
       else  {
         logger.warn( "Sink {} not added to FailverSinkProcessor as priority"  +
             "duplicates that of sink {}" , entry.getKey(),
             liveSinks.get(priority));
       }
     }
     activeSink = liveSinks.get(liveSinks.lastKey());  // 获取最后一个Sink作为active  sink(即priority最大的Sink)
   }

process方法:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
public  Status process()  throws  EventDeliveryException {
     // Retry any failed sinks that have gone through their "cooldown " period
     Long now = System.currentTimeMillis();
     while (! failedSinks.isEmpty() && failedSinks.peek().getRefresh() < now) {  //peek方法用于检索该队列的头部,但不会将其删除,如果此队列为空,则返回 null
       FailedSink cur = failedSinks.poll();  //获取并移除此队列的头,如果此队列为空,则返回 null
       Status s;
       try  {
         s = cur.getSink().process();  //调用对应的process方法
         if  (s  == Status.READY) {  //如果sink处于READY状态
           liveSinks.put(cur.getPriority(), cur.getSink());  //则插入到liveSinks 中
           activeSink = liveSinks .get(liveSinks .lastKey());  //并尝试获取一次activeSink
           logger.debug( "Sink {} was recovered from the fail list"  ,
                   cur.getSink().getName());
         else  {
           // if it's a backoff it needn't be penalized.
           failedSinks.add(cur);  //否则继续加入到failedSinks 中
         }
         return  s;
       catch  (Exception e) {
         cur.incFails();
         failedSinks.add(cur);
       }
     }
     Status ret =  null ;
     while ( activeSink !=  null  ) {
       try  {
         ret = activeSink.process();  //对activeSink 调用process方法
         return  ret;
       catch  (Exception e) {
         logger.warn( "Sink {} failed and has been sent to failover list"  ,
                 activeSink.getName(), e);
         activeSink = moveActiveToDeadAndGetNext();  //moveActiveToDeadAndGetNext用于从liveSinks中取出最后一个并添加到failedSinks中,同时获取新的最后一项
       }
     }
     throw  new  EventDeliveryException( "All sinks failed to process, "  +
         "nothing left to failover to" );
   }

moveActiveToDeadAndGetNext方法用于从liveSinks中取出最后一个并添加到failedSinks中,同时获取新的最后一项

1
2
3
4
5
6
7
8
9
10
11
   private  Sink moveActiveToDeadAndGetNext() {
     Integer key = liveSinks.lastKey();  //获取当前activeSink 的priority值
     failedSinks.add(  new  FailedSink(key, activeSink ,  1 ));
     liveSinks.remove(key);  //从liveSinks中删除这一项
     if (liveSinks.isEmpty())  return  null  ;
     if (liveSinks.lastKey() !=  null ) {
       return  liveSinks.get(liveSinks .lastKey());  //取出新的最后一项
     else  {
       return  null ;
     }
   }



org.apache.flume.sink.LoadBalancingSinkProcessors//AbstractSinkProcessor的子类
两种选择方式ROUND_ROBIN(默认)/RANDOM,可以自定义自己的selector,只要实现SinkSelector 接口即可。
在process方法中循环调用每一个sink(createSinkIterator返回的迭代器),知道遇到可以正确返回的Sink并退出循环,如果所有的sink都不可用则抛出异常,默认时backoff的设置为false,这导致每一次循环都会检测所有的Sink,如果设置为true,会设置失败的Sink为backoff,一段时间后再加入可用的Sink列表中

例子:

1
2
3
4
host1.sinkgroups.group1.sinks = sink1 sink2
host1.sinkgroups.group1.processor.type = load_balance
host1.sinkgroups.group1.processor.selector = <selector type>  //random或者round_robin,默认是round_robin,也可以实现自己的selector (实现接口即可)SinkSelector
host1.sinkgroups.group1.processor.selector.selector_property = <value>

源码分析:
定义了一个内部接口类SinkSelector,主要包含的抽象方法:
createSinkIterator(返回可用的sink的迭代器)和informSinkFailed
configure方法:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
private  SinkSelector selector;
....
   public  void  configure(Context context) {
     Preconditions.checkState(getSinks().size() >  1 ,
         "The LoadBalancingSinkProcessor cannot be used for a single sink. "
         "Please configure more than one sinks and try again."  );  //sink的数量必须大于1
     String selectorTypeName = context.getString( CONFIG_SELECTOR,
         SELECTOR_NAME_ROUND_ROBIN);  //获取selector的设置,默认是ROUND_ROBIN
     Boolean shouldBackOff = context.getBoolean( CONFIG_BACKOFF,  false  );  //获取backoff的设置,默认是false
     selector =  null ;
     if  (selectorTypeName.equalsIgnoreCase( SELECTOR_NAME_ROUND_ROBIN)) {   //如果设置为ROUND_ROBIN,生成RoundRobinSinkSelector对象
       selector =  new  RoundRobinSinkSelector(shouldBackOff);
     else  if  (selectorTypeName.equalsIgnoreCase(SELECTOR_NAME_RANDOM)) {  //如果设置为RANDOM,生成RandomOrderSinkSelector对象
       selector =  new  RandomOrderSinkSelector(shouldBackOff);
     else  {
       try  {
         @SuppressWarnings ( "unchecked"  )
         Class<?  extends  SinkSelector> klass = (Class<?  extends  SinkSelector>)
             Class.forName(selectorTypeName);  //自定义的类型的获取,自定义类型需要扩展SinkSelector类
         selector = klass.newInstance();
       catch  (Exception ex) {
         throw  new  FlumeException( "Unable to instantiate sink selector: "
             + selectorTypeName, ex);
       }
     }
     selector.setSinks(getSinks());
     selector.configure(
         new  Context(context.getSubProperties(CONFIG_SELECTOR_PREFIX)));
     LOGGER.debug(  "Sink selector: "  + selector +  " initialized"  );
   }

process方法:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
   public  Status process()  throws  EventDeliveryException {
     Status status =  null ;
     Iterator<Sink> sinkIterator = selector.createSinkIterator(); //调用对应SinkSelector实现类的createSinkIterator方法,返回可用的sink的迭代器
     while  (sinkIterator.hasNext()) {  //循环调用对应每一个sink的process方法
       Sink sink = sinkIterator.next();
       try  {
         status = sink.process();
         break //如果遇到第一个可以返回status的即退出循环
       catch  (Exception ex) {
         selector.informSinkFailed(sink);   //如果sink失败调用对应selector的informSinkFailed方法
         LOGGER.warn( "Sink failed to consume event. "
             "Attempting next sink if available."  , ex);
       }
     }
     if  (status ==  null ) {  //如果所有的都出现问题,才抛出异常
       throw  new  EventDeliveryException( "All configured sinks have failed"  );
     }
     return  status;
   }

定义两个SinkSelector的实现类:RoundRobinSinkSelector和RandomOrderSinkSelector

wKiom1T1xk7gNCPVAADzrEzEkxE386.jpg

以RoundRobinSinkSelector为例:
createIterator-->getIndexList  //返回当前活动的对象
informSinkFailed 如果backoff设置为了true才有效(默认为false),主要是设置restoreTime等FailureState属性(在getIndexList  中会使用这个属性)



本文转自菜菜光 51CTO博客,原文链接:http://blog.51cto.com/caiguangguang/1617025,如需转载请自行联系原作者

相关文章
|
6天前
|
Shell
Flume【问题记录 01】【at org.apache.flume.node.Application.main(Application.java:xxx) 类问题整理+其他类型问题总结】【避坑指南】
【2月更文挑战第17天】Flume【问题记录 01】【at org.apache.flume.node.Application.main(Application.java:xxx) 类问题整理+其他类型问题总结】【避坑指南】
85 2
|
9月前
|
SQL 分布式计算 监控
Flume学习--1、Flume概述、Flume入门、(一)
Flume学习--1、Flume概述、Flume入门、(一)
|
9月前
|
存储 Java 分布式数据库
Flume学习---3、自定义Interceptor、自定义Source、自定义Sink
Flume学习---3、自定义Interceptor、自定义Source、自定义Sink
|
9月前
|
监控 负载均衡
Flume学习---2、Flume进阶(事务)、负载均衡、故障转移、聚合(二)
Flume学习---2、Flume进阶(事务)、负载均衡、故障转移、聚合(二)
|
9月前
|
SQL 存储 分布式计算
Flume学习---2、Flume进阶(事务)、负载均衡、故障转移、聚合(一)
Flume学习---2、Flume进阶(事务)、负载均衡、故障转移、聚合(一)
|
9月前
|
SQL 存储 分布式计算
Flume学习---2、Flume进阶(事务)、负载均衡、故障转移、聚合
Flume学习---2、Flume进阶(事务)、负载均衡、故障转移、聚合
|
9月前
|
JSON 监控 Unix
Flume学习--1、Flume概述、Flume入门、(二)
Flume学习--1、Flume概述、Flume入门、(二)
|
存储 消息中间件 数据采集
大数据开发笔记(六):Flume基础学习
Flume是数据采集,日志收集的框架,通过分布式形式进行采集,(高可用分布式)
254 0
大数据开发笔记(六):Flume基础学习
|
消息中间件 SQL 数据采集
|
分布式计算 监控 Java
02. Spark Streaming实时流处理学习——分布式日志收集框架Flume
2. 分布式日志收集框架Flume 2.1 业务现状分析 如上图,大量的系统和各种服务的日志数据持续生成。用户有了很好的商业创意想要充分利用这些系统日志信息。比如用户行为分析,轨迹跟踪等等。如何将日志上传到Hadoop集群上?对比方案存在什么问题,以及有什么优势? 方案1: 容错,负载均衡,高延时等问题如何消除? 方案2: Flume框架 2.
2398 0