开发者学堂课程【Flume 基础应用实战-企业全场景解决方案:Apache Flume-自定义拦截器-代码逻辑梳理】学习笔记,与课程紧密联系,让用户快速学习知识。
课程地址:https://developer.aliyun.com/learning/course/715/detail/12776
Apache Flume-自定义拦截器-代码逻辑梳理
自定义拦截器的具体代码实现
通过最终的调用方式,可以看到它首先调的是自定类当中的一个叫做 Builder 的内部类,它需要去解析相关的参数,接下来落实到代码层面看一下怎么去完善。
打开参考资料,里面有一个叫做拦截器的案例,打开案例二,这时需要创建一个Maven 工程导入开发制定拦截器相关的架包,这里有一个样例,里面需要的依赖比较简单,一个是 Flume 当中的 sdk 包,一个是 Flume 当中的 ng-core 核心包。
下面需要两个插件,一个是指定编译器版本的1.8,一个是用来打一个 jar 包的叫做maven-shade-plugins,这些都是固定的。
创建好之后,打开开发工具来看一下,整个开发过程当中,需要写相关的代码去实现对应的 Flume 的接口,这个接口并不是我们自己指定的,它有官方的要求。
比如这里,首先写一个类,这个类需要去继承 interceptor,但是这个类怎么去看? 点击左边叫做 structure 的模块,看一下这个类中有哪些结构。
这个类当中有三个内部类,一个叫做 Builder,一个叫做 Constants,一个叫做StringUtils。这个 StringUtils 当中就做了一个 MD5 的加密,包括字节转数组,字节转 number 相关的方法分装,相当于一个工具类,而 Constants 相当于定了一个常量,重点还是看 Builder 这个内部类,因为从最终调用的方式来看,不管是去调用它自带的拦截器,还是开发拦截器,它都是首先调的是 Builder 这个内部类,如果这个内部类没有实现,外面的类也不会去调用。
可以看一下它怎么调用的,打开这个 Builder,可以发现当中有几个方法,一个叫做 build,一个叫做 confugure ,整个自定义拦截器首先调的方法就是 confugure 这个方法。
看到方法,首先来到 Builder 类当中看一下,可以发现这个类,首先要给它定义为一个静态的 ,从 java 语法层面上,因为它是自定义拦截器的一个内部类,在调用的时候,又是首先调用它,如果它不是静态的,意味着获取不到它的实例,因为外面的类还没有,所以说要把它定义为静态的,它要去实现 interceptor 接口下的Builder 接口,这是接口当中的接口,这都是它的开发规范。在这个类当中,首先定义了四个变量。
第一个叫做字段之间的分隔符,第二个叫做分隔后面的下标,第三个下标之间的分割符,第四个需要加密字段的下标,可以发现这四个字段属性正是配置当中的四个属性,说明我们需要把这里配置的四个参数属性值接收到,这里使用一个叫configure 的方法,可以获取到 Flume 当中的上下文,通过 apr 就可以获取到指定的参数。
可以明确在自定义开发 flume 拦截器的时候,如果某些属性,某些配置,不想在代码当中写死,就可以开放出来一个接口,让用户在调用的时候进行配置,我们在当中获取到就可以了。
这里就获取到字段之间的分隔符,包括下标分割符,但是我们并没有写得比较懒散,而是做了一个规范化的开发。
这里定了一个常量,这个 fields_separator 是小写的形式,这个值就是刚才调用的时候当中它的值,根据这几个属性值就可以拿到对应传过来的相关参数。
后面还做了一个 default 默认值指定,就是如果用户没有去调用的时候配置这四个参数,这里会有默认的值,通过 configure 方法就可以获取到采集方案当中的四个属性,然后它赋给 Build 这个方法的内部的成员变量。
接下来调用 Build 方法:
在这个方法当中,通过内部类创建了外部类的实例,再去 new 它的时候,把刚才解析到的四个参数,以构造的方式传递给它,这样就来到了外部类当中。
第一个方法是 configure:通过 Flume 上下文环境变量读取采集方案中配置的属性,如果没有配置,会有默认值。第一步通过 configure 解析出来配置的相关参数,接下来这个参数要传递给拦截器。
第二个方法是 build: 把解析的参数传递给第一步拦截器的实例对象。之后就把解到的参数传递给外部的 interceptor 类,而这个类要去继承 interceptor 这个接口。
同样,在这个外部类当中,也定义了四个成员变量属性,分别是字段之间的分隔符,下标之间的分隔符,以及加密的下标。这个外部类,它有一个拦截器,应该是一个构造函数。第三步,通过自定义拦截器构造函数的构造器获取到配置文件中的参数。
它通过内部类获取参数,然后传递给外部类,拿到这四个参数之后,做了一个处理,比如字段之间做一个去空的操作,但是不要忘了当中有两个字段是用 Unicode编码形式表示,一个是字段之间的分隔符,还有一个下标之间的分隔符逗号,如果是普通的编码,直接复制给成员变量,如果是加密的 Unicode 编码,就做一个判断。
如果把 Unicode 转为 string,,转完之后,已经获得的拦截器的实例,并且已经获取到我们需要干什么事——数据分隔符是什么、加密哪些字段、保留哪些字段等。
接下来就是写具体拦截的内容,在拦截器外部拦截中,重点有两个方法需要注意,一个叫做 intercept,一个叫 intercept event ,它拦截的时候会拦截到数据 event量会调用这两种方法。这两种方法其实本质是一样的,就是参数不一样,一个是传递1个 event,一个传递的是 Listvent>,因为有可能拦截的是一个 event 的对象,也可能是一个批次。
其实针对一个批次的拦截也很简单,因为获取一个批次的 event 的去遍历它,把每一个再去分别调用,就相当于一样了。所以在下面这里没有做具体代码的实现。
首先去创建一个历史的 Listvent>,创建一个集合,然后针对这一个批次 event做一个 for 循环之后,把里面的每个 event 都拿出来去调用 intercept 方法,最终实现逻辑就在这一块,这个方法就是 event 批处理方法,最终要调用 Listvent> 方法。
接下来重点调的方法就是 intercept 方法,这里要写一个非常重要的注释——该方法就是自定义拦截器最终具体功能实现的地方。要做的就是拦截数据,把里面做一个解析,拦截到的就是一个个 event。
首先对数据 Get body,因为拿到的是一个 event 对象,而数据位于 body 当中,然后把它转为字符串的形式,这里需要注意。
接下来把这两个参数做一个分隔,一个是字段之间的分隔符,把数据按照字段进行切割,分隔符是 -t,这样就会返回一个字符串的数组。另外一个是角标的分隔,角标传入进来,还是135这样的字符算法,所以需要切割。
切割完之后,首先用了一个新的字符串,接下来针对数据做一个遍历,遍历切割的下标,这时候做一个判断,因为最终的结果需要做的是把指定的字段保留下来,并且针对指定的字段做一个加密,所以要判断遍历的字段是不是指定的字段。如果等于加密的字段,就做一个加密处理,如果不是加密的,直接把它添加到当中,因为加密的只有一个。所以说首先是0,0当然是等于它,等于它之后做一个加密,然后保存在新的字符串中。
简单来说,就是遍历需要保留下来的下标数组,这个叫做字符下标切割,遍历之后,匹配到加密地就进行加密,当中进行加密的字段,只有几个01356,遍历这个数组,在遍历01356当中,0匹配到之后就拿了0做一个加密,剩下的1356不匹配,就直接把它保留下来放在字符串中就可以。
而其他的247没有被保留,直接被舍弃了。我们在进行添加的时候,还做了一个小插曲,因为它们原来字段的数据之间的分隔符是为 -t,因此,最终输出的时候也需要加个 -t,所以说在进行添加的时候,要在每个字段之间也加上分隔符 -t。
所以说这最后一个循环是指定最终输出的字符串之间字段分隔符,也就说如果遍历出来没到结束,每个字段之间都会加上一个分隔符,这样就完成了指定保留字段的获取,并且对第一个字段0做了一个加密,那么拦截要完成之后,千万不要忘记把拦截完之后的数据保存在 event body 当中,这是最后一步,然后 return 释放回去,这样就完成了一个字段的拦截。
梳理:
通过类获取到用户的配置参数之后,最终在 intercept 当中做一个具体的拦截操作,通过 get body 获取到数据的内容转为字符串进行切割,然后遍历并判断哪些数字需要保留,然后该加密的加密,该保留的保留,不需要的直接过滤掉,最终再把它给返回回去,这就完成了拦截器的功能。
这就是通过 java代 码实现自定义代码的自定义拦截器的核心逻辑,当中要注意的就是静态内部类的使用,其他的相关规范都是它的一个相关的指定,不需要过多改变,只要按照这个规范来就可以。