开发者学堂课程【Flume 基础应用实战-企业全场景解决方案:Apache Flume-自定义拦截器-功能实现】学习笔记,与课程紧密联系,让用户快速学习知识。
课程地址:https://developer.aliyun.com/learning/course/715/detail/12777
Apache Flume-自定义拦截器-功能实现
自定义拦截器最终功能的实现
当开发好相关代码之后,需要把自定义拦截器的程序,打成一个 jar 包,因为配置的相关的 Maven 插件,所以点击选择 Maven project,选择 lifestyle 当中的Package 双击,这样就会在工程的 Package 目录下打上一个相关的jar包,静静地等待一会,显示 build success,可以发现在 target 目录下有了一个 jar 包。
把这个 jar 包拖出来,这就是自定义拦截器的 jar 包。有了 jar 包之后,需要把它 flume 的 lib 下,来到第一台机器上,第一步 cd 到 lib 当中,在这个文件夹路径下把 jar 包拖曳进来。
拖拽之后,接下来就是配置方案的编写。打开参考资料,有一个叫做 spool-interceptor-hdfs.conf 的文件 ,
打开看一下,里面列举出来了采用自定义拦截器收集数据的方案。
首先定义了三个组件的名字。
Channel 是内存的 channel 缓存。
Source 绑定了 channel 为 c1,路径叫做 spooldir 类型,监控文件夹,这个文件在 /root/logs3/,下面是 batchsize,50一个批次。
拦截器的内容是 i1,i2 两个拦截器,一个是它自带的时间拦截器,一个是自定义拦截器,注意包名一定要跟开发的时候保持一致,叫做 cn.itcast.interceptor ,所以这也叫 Interceptor&Builder,我们的内部类。
下面这些根据需求可以进行相关的修改。
接下来把这个采集方案放在 Flume 的 conf 路径下。
打开 Flume,首先 cd 到 conf 当中,编辑一下方案名字,配置好之后,一定要注意打开 insert 的编辑模式,把里面的参数复制过来,下面这个命令不要复制过来,复制一遍,做一个相关的粘贴,要保证当中的 ip 路径相关的参数包括包的路径保持一致,不然就会出错,接下来做一个保存完之后,准备下数据。
它放在 /root/logs4/ 路径下下,因此来到 logs4 创建文件夹。
cd 到 logs4 当中,把刚才的数据文件上传过来,接下来就回到 Flume 当中,执行一下 Flume,看一下它能不能进行数据的收集,收集到 Flume 上面之后,并且把数据做一个拦截加密的过程。接下来打开文件当中的启动命令,复制一下,做一个执行。
现在这些把文件写在这个路径下,前缀后面叫 itcasr ,后缀叫做 dat,处理完之后,看一下数据最终的结果长什么样。
现在没有变,因为还是一个临时文件,它没有发生滚动。
滚动控制条件为20秒,可以发现现在滚动完毕。最后把这个文件下载下来,验证一下到底有没有对数据做相关的加密工作,右键打开来看一下,可以发现左边是原始的数据,当中有手机号,7个字段,但数字当中,手机号做了一个 MD5 加密,而且只保留1356这四个字段,这样就满足需求了。
在企业当中,这一个自定义拦截器用的比较多,因为每家公司可能都有自己的需求,不管是针对什么加密,什么拦截,什么处理,只要按照它的规范来开发代码就可以满足于相关的需求,这就是我们自定义拦截器的功能实现。