Apache Flume-自定义拦截器-代码逻辑梳理|学习笔记

简介: 快速学习 Apache Flume-自定义拦截器-代码逻辑梳理

开发者学堂课程【Flume 基础应用实战-企业全场景解决方案Apache Flume-自定义拦截器-代码逻辑梳理】学习笔记,与课程紧密联系,让用户快速学习知识。  

课程地址:https://developer.aliyun.com/learning/course/715/detail/12776


Apache Flume-自定义拦截器-代码逻辑梳理


自定义拦截器的具体代码实现

通过最终的调用方式,可以看到它首先调的是自定类当中的一个叫做 Builder 的内部类,它需要去解析相关的参数,接下来落实到代码层面看一下怎么去完善

打开参考资料,里面有一个叫做拦截器的案例,打开案例二,这时需要创建一个Maven 工程导入开发制定拦截器相关的架包,这里有一个样例,里面需要的依赖比较简单,一个是 Flume 当中的 sdk 一个是 Flume 当中的 ng-core 核心

下面需要两个插件,一个是指定编译器版本的1.8个是用来打一个 jar 包的叫做maven-shade-plugins,这些都是固定的

创建好之后,打开开发工具来看一下整个开发过程当中,需要写相关的代码去实现对应的 Flume 的接口,这个接口并不是我们自己指定的,它有官方的要求

图片1.png

比如这里,首先写一个类,这个类需要去继承 interceptor,但是这个类怎么去看?  点击左边叫做 structure 模块看一下这个类中有哪些结构

图片2.png

这个类当中有三个内部类,一个叫做 Builder,一个叫做 Constants,一个叫做StringUtils。这个 StringUtils 当中就做了一个 MD5 的加密,包括字节转数组,字节转 number 相关的方法分装,相当一个工具类,而 Constants 相当于定了一个常量,重点还是看 Builder 这个内部类因为从最终调用的方式来看,不管是去调用它自带的拦截器,还是开发拦截器,它都是首先调的是 Builder 这个内部类如果这个内部类没有实现外面类也不会去调用。

可以看一下它怎么调用的打开这个 Builder可以发现当中有几个方法,一个叫做 build,一个叫做 confugure ,整个自定义拦截器首先调的方法就是 confugure 这个方法。

看到方法,首先来到 Builder 类当中看一下,可以发现这个类,首先要给它定义为一个静态的从 java 语法层面上,因为它是自定义拦截器的一个内部类,在调用的时候,又是首先调用它如果它不是静态的,意味着获取不到它的实例因为外面类还没有所以说要把它定义为静态的,它要去实现 interceptor 接口下的Builder 接口,这是接口当中的接口,这都是它的开发规范在这个类当中,首先定义了四个变量。

第一个叫做字段之间的分隔第二个叫做分隔后面的下标第三个下标之间的分割符第四个需要加密字段的下标,可以发现这四个字段属性正是配置当中的四个属性,说明我们需要把这里配置的四个参数属性值接收到这里使用一个叫configure 方法,可以获取到 Flume 当中的上下文,通过 apr 就可以获取到指定的参数

可以明确在自定义开发 flume 拦截器的时候,如果某些属性,某些配置不想在代码当中写死就可以开放出来一个接口,让用户调用的时候进行配置,我们在当中获取到就可以了。

这里就获取到字段之间的符,包括下标分割符,但是我们并没有写得比较懒散,而是做了一个规范化的开发。

图片3.png

这里定了一个常量,这个 fields_separator 小写的形式这个值就是刚才调用的时候当中它的值,根据这几个属性值就可以拿到对应传过来的相关参

图片4.png

后面还做了一个 default 默认值指定,就是如果用户没有去调用的时候配置这四个参数这里会有默认的值,通过 configure 方法就可以获取到采集方案当中的四个属性,然后它Build 这个方法的内部的成员变量

接下来调用 Build 方法

图片5.png

在这个方法当中,通过内部类创建外部类的实例,再去 new 它的时候把刚才解析到的四个参数以构造的方式传递给,这样就来到了外部类当中

第一个方法是 configure通过 Flume 上下文环境变量读取采集方案中配置的属性如果没有配置会有默认值第一步通过 configure 解析出来配置的相关参数,接下来这个参数要传递给拦截器

第二个方法是 build: 把解析的参数传递给第一步拦截器的实例对象之后就把解到参数传递给外部的 interceptor 类而这个类要去继承 interceptor 这个接口

同样,在这个外部类当中,也定义了四个成员变量属性分别是字段之间的分下标之间分隔符,以及加密的下标这个外部类,它有一个拦截器,应该是一个构造函数。第三步通过自定义拦截器构造函数构造器获取到配置文件中的参数

它通过内部类获取参数,然后传递给外部类,拿到这四个参数之后做了一个处理,比如字段之间做一个去空的操作但是不要忘了当中有两个字段是用 Unicode编码形式表示,一个是字段之间的分符,还有一个下标之间的分隔符逗号如果是普通的编码,直接复制给成员变量,如果是加密的 Unicode 编码,做一个判断。

图片6.png

如果把 Unicode 转为 string转完之后,已经获得的拦截器的实例,并且已经获取到我们需要干什么事——数据分隔符是什么、加密哪些字段保留哪些字段等。

接下来就是写具体拦截的内容,在拦截器外部拦截中,重点有两个方法需要注意,一个叫做 intercept一个叫 intercept event  ,它拦截的时候拦截到数据 event量会调用这两种方法。这两种方法其实本质是一样的,就是参数不一样,一个是传递1个 event一个传递的是 Listvent>因为有可能拦截的是event 的对象,也可能是一个批次

其实针对一个批次的拦截也很简单,因为获取一个批次event 的去遍历把每一个再去分别调用就相当于一样了。所以在下面这里没有做具体代码的实现

首先去创建一个历史的 Listvent>,创建一个集合,然后针对这一个批次 event做一个 for 循环之后,把里面的每个 event 都拿出来去调用 intercept 方法,最终实现逻辑就在这一块,这个方法就是 event 批处理方法最终要调用 Listvent> 方法。

接下来重点调的方法就intercept 方法,这里要写一个非常重要的注释——该方法就是自定义拦截器最终具体功能实现的地方。要做的就是拦截数据,把里面做一个解析,拦截到就是一个个 event。

首先对数据 Get body因为拿到的是一个 event 对象,数据位于 body 当中,然后把它转为字符串的形式这里需要注意

接下来这两个参数做一个分隔,一个是字段之间的分符,把数据按照字段进行切割,符是 -t,这样就会返回一个字符串的数组。另外一个是角标的分,角标传入进来,还是135这样的字符算法,所以需要切割

切割完之后,首先用了一个新的字符串,接下来针对数据做一个遍历遍历切割的下标,这时候做一个判断,因为最终的结果需要做的是把指定的字段保留下来,并且针对指定的字段做一个加密,所以要判断遍历的字段是不是指定的字段。如果等于加密的字段,就做一个加密处理,如果不是加密的,直接把它添加到当中,因为加密的只有一个。所以说首先是00当然是等于它,等于它之后做一个加密,然后保存在新的字符串

图片7.png

简单来说,就是遍历需要保留下来的下标数组,这个叫做字符下标切割,遍历之后,匹配到加密地进行加密,当中进行加密的字段,只有几个01356遍历这个数组遍历01356当中,0匹配到之后就拿了0做一个加密剩下的1356不匹配,直接把它保留下来放在字符串可以

其他的247没有保留直接被舍弃了我们在进行添加的时候,还做了一个小插曲,因为它们原来字段的数据之间的分隔符是为 -t,因此,最终输出的时候也需要加个 -t,所以说在进行添加的时候,在每个段之间也加上分-t。

所以说这最后一个循环是指定最终输出的字符串之间字段隔符,也就说如果遍历出来没到结束,每个字段之间加上一个分符,这样就完成了指定保留字段的获取,并且对第一个字段0做了一个加密,那么拦截要完成之后千万不要忘记把拦截完之后的数据保存在 event body 当中,这是最后一步然后 return 释放回去这样就完成了一个字段的拦截

梳理:

通过类获取到用户的配置参数之后,最终在 intercept 当中做一个具体的拦截操作通过 get body 获取到数据的内容转为字符串进行切割,然后遍历并判断哪些数字需要保留,然后该加密的加密,该保留的保留,不需要的直接过滤掉,最终再把它给返回回去,就完成了拦截器的功能

这就是通过 java代 码实现自定代码的自定义拦截器的核心逻辑,当中要注意的就是静态内部类的使用,其他的相关规范都是它的一个相关的指定不需要过多改变,只要按照这个规范来就可以

相关文章
|
2月前
|
Shell
Flume【问题记录 01】【at org.apache.flume.node.Application.main(Application.java:xxx) 类问题整理+其他类型问题总结】【避坑指南】
【2月更文挑战第17天】Flume【问题记录 01】【at org.apache.flume.node.Application.main(Application.java:xxx) 类问题整理+其他类型问题总结】【避坑指南】
72 2
|
2月前
|
Java Linux
Flume【环境搭建 01】CentOS Linux release 7.5 安装配置 apache-flume-1.9.0 并验证
【2月更文挑战第16天】Flume【环境搭建 01】CentOS Linux release 7.5 安装配置 apache-flume-1.9.0 并验证
42 0
|
2月前
|
XML Java Apache
Apache Flink自定义 logback xml配置
Apache Flink自定义 logback xml配置
170 0
|
2月前
|
消息中间件 分布式计算 Kafka
硬核!Apache Hudi中自定义序列化和数据写入逻辑
硬核!Apache Hudi中自定义序列化和数据写入逻辑
44 1
|
6月前
|
SQL Java 应用服务中间件
Apache Doris 自定义C++ UDF之流程详解(1)
Apache Doris 自定义C++ UDF之流程详解(1)
129 0
|
5月前
|
数据采集 消息中间件 缓存
Apache Flume及快速安装
Apache Flume及快速安装
42 0
|
5月前
|
消息中间件 存储 分布式计算
Hadoop学习笔记(HDP)-Part.20 安装Flume
01 关于HDP 02 核心组件原理 03 资源规划 04 基础环境配置 05 Yum源配置 06 安装OracleJDK 07 安装MySQL 08 部署Ambari集群 09 安装OpenLDAP 10 创建集群 11 安装Kerberos 12 安装HDFS 13 安装Ranger 14 安装YARN+MR 15 安装HIVE 16 安装HBase 17 安装Spark2 18 安装Flink 19 安装Kafka 20 安装Flume
58 0
Hadoop学习笔记(HDP)-Part.20 安装Flume
|
9月前
|
应用服务中间件 Apache nginx
Apache Doris 自定义C++ UDF之流程详解(1)2
Apache Doris 自定义C++ UDF之流程详解(1)2
189 0
Apache Doris 自定义C++ UDF之流程详解(1)2
|
9月前
|
SQL IDE Java
Apache Doris 自定义C++ UDF之流程详解(1)1
Apache Doris 自定义C++ UDF之流程详解(1)1
136 0
|
数据采集 Java Shell
flume自定义拦截器
向文件中定时新增日期数据,采集该文件, 通过自定义source拦截器给日期数据加上自己姓名作为前缀,输出到控制台。 #### 分析: 需求很简单,主要在于练习flume自定义拦截器的流程,我们需要使用java来写flume拦截器的流程需求,然后使用maven将程序打包成jar包。放到采集服务器的flume安装路径的/lib路径下,然后运行。
166 0
flume自定义拦截器

推荐镜像

更多