Apache Flume- 自定义拦截器-需求描述|学习笔记

本文涉及的产品
日志服务 SLS,月写入数据量 50GB 1个月
简介: 快速学习 Apache Flume- 自定义拦截器-需求描述

开发者学堂课程【Flume 基础应用实战-企业全场景解决方案 Apache Flume- 自定义拦截器-需求描述】学习笔记,与课程紧密联系,让用户快速学习知识。  

课程地址:https://developer.aliyun.com/learning/course/715/detail/12775


Apache Flume- 自定义拦截器-需求描述

 

目录

一、案例背景介绍

二、自定义拦截器

 

一、案例背景介绍:

Flume 作为一个大数据的采集、传输软件,如果没有拦截器就意味着它收集到什么样的数据,传递到目的地就还是什么样的数据。Flume 有各种自带的拦截器,比如:静态拦截器、TimestampInterceptor(可以往 event 的 header 当中插入自定义的kv 标识对)、HostInterceptor、 RegexExtractorInterceptor 等,通过使用不同的拦截器,实现不同的功能。

但是以上的这些拦截器,不能改变原有日志数据的内容或者对日志信息添加一定的处理逻辑,当一条日志信息有几十个甚至上百个字段的时候,在传统的 Flume 处理下,收集到的日志还是会有对应这么多的字段,也不能对你想要的字段进行对应的处理。

 

二、自定义拦截器

1、定义:根据实际业务的需求,为了更好的满足数据在应用层的处理,通过自定义 Flume 拦截器,过滤掉不需要的字段,并对指定字段加密处理,将源数据进行预处理。减少了数据的传输量,降低了存储的开销。

2、举例:比如我们是一个电商公司,收集的数据都是用户的订单信息,当中有用户的手机号、地址,要想把这些信息加密,可以在 flume 当中做一些简单的处理,接下来看一下这个需求的具体实现。

图片1.png

首先看一下项目最终要达到的效果,比如说原始的数据如上图,第一个字段表示手机号,后面有7个字段,每一行记录就是一条记录,需要对记录做一个采集,但是采集过程当中需要做两件事。

第一,留下需要的字段,如1356,舍弃不需要的字段,如247,第二,前面的是手机号,需要在采集中把手机号做一个 M D5 的加密,保证它的安全性,这属于用户个人隐私信息,该怎么去完成这个需求?

如果使用原始的 flume,不加拦截器,应该没有这方面能力,这时就可以去开发一个拦截器来满足于我们的需求,去拦截只需要的字段,并且对指定字段进行加密。

3、自定义拦截器的需求描述:

现在先不看代码,直接到最后一步看一下,假如现在已经开发好了,在 flume 采集当中,该怎么去配置。

图片2.png

在讲义上有一个核心的配置,比如配置相关的采集方案,首先来指定三个组件,channel、source、sink 分别为 c1 、r1、 s1,因为 source 对接数据,所以要把拦截器添加在 source 中。

图片3.png

重点看一下 source 该怎么去实现,下面这个配置,source 绑定的 channel 为 c,它的类型叫 spooldir,说明它可以去监控一个目录,这个目录下如果有文件的变化,它就会把变化的内容收集到。

图片4.png

重点在下面,给 C1 绑定了两个拦截器 interceptor,一个叫 i1,一个叫 i2,其中 i2的类型叫做 timestamp 是一个时间戳拦截器,这是它自定义的拦截器,或者说叫它内带的拦截器。可以去拦截时间相关功能,我们开发的自定义拦截器叫 i1,但我们现在并不知道它里面是怎么实现的,看一下该怎么去配置,首先需要去指定 i1 拦截器的类型,可以发现这个类型、这个包就是我们自定义开发的实现,叫cn.itcast,叫做 CustomParameterInterceptor,当中还有一个内部类叫做Builder,它最终应该调的是这个类当中的一个内部类,叫做 Builder类,可以猜想一下,它是通过这个类去构造出来外部 Interceptor 这个拦截器的实例。

下面还跟了四个配置参数,好像看起来有一点闷缺,但是可以认真梳理一下,看一下刚才那个数据结果,传递的这样一条是一个记录,当中不同的字段之间是以制表符来分割的,那怎么去知道要去加密哪些字段 、拦截哪些字段、释放哪些字段呢?可以通过相关的参数来指定,这些参数既然通过这里可以开启配置功能,就意味着这些参数会随着 flume 的运行传递到类当中,里面就有相关的代码去解析这些参数。

看一下这几个参数表示什么意思?

图片5.png

第一个 fileds_separator,叫做字段之间的分割符,这里是 unicode 编码来表示的。首先需要知道处理数据,它们字段之间的分割符是什么,是制表符,就是 -t,当然这里要以 unicode 编码来表示。第二,要传入一个 index,所以需要去保留哪些数据,哪些数据不需要管。这里传入了0,1,3,5,6,从下标开始手机号100,300,500,600,这些字段需要留下,0,1,3,5,6保留字的一个角标,角标之间是也有分割符,这里以逗号来分割,用 unicode 编码来表示 u002c。

图片6.png

接下来看一下最后一个字段配置属性,给每个字段进行加密,加密字段的角标是0,0表示第一个字段,这些相当于没有在代码中写死,而是以参数的形式开放给用户指定,意味着用户可以在这里去指定,根据你的业务需求指定加密哪些字段,哪些字段分割符,哪些进行抛弃,哪些进行相关的使用,这样就显得更加灵活,如果想做一个商业版的,可以把这一块写一个说明文档释放出去,别人根据规则就知道要填写什么了。

把这些参数配置完之后,当 flume 去加载拦截器的时候,就可以根据里面的相关参数,去切割数据,并且加密指定的字段,保留指定的字段,达到最终的功能。

因此,拦截器从最终效果来看,它就是 flume 得用的一个类,而具体的功能需要留到代码当中去实现,这是从配置最终使用的层面来反推它内部所需要的东西——接收这些参数,解析参数,拦截功能,然后拦截数据,去对数据做相关处理,再把数据释放掉。

这就是我们的需求,通过自定义拦截去拦截所需要的的字段,并且对指定的字段进行加密。

相关文章
|
Shell
Flume【问题记录 01】【at org.apache.flume.node.Application.main(Application.java:xxx) 类问题整理+其他类型问题总结】【避坑指南】
【2月更文挑战第17天】Flume【问题记录 01】【at org.apache.flume.node.Application.main(Application.java:xxx) 类问题整理+其他类型问题总结】【避坑指南】
577 2
|
Java Linux
Flume【环境搭建 01】CentOS Linux release 7.5 安装配置 apache-flume-1.9.0 并验证
【2月更文挑战第16天】Flume【环境搭建 01】CentOS Linux release 7.5 安装配置 apache-flume-1.9.0 并验证
248 0
|
存储 数据采集 监控
Flume 拦截器概念及自定义拦截器的运用
Apache Flume 的拦截器是事件处理组件,位于Source和Channel之间,用于在写入Channel前对数据进行转换、提取或删除。它们支持数据处理和转换、数据增强、数据过滤以及监控和日志功能。要创建自定义拦截器,需实现Interceptor接口,包含initialize、intercept、intercept(List<Event>)和close方法。配置拦截器时,通过Builder模式实现Interceptor.Builder接口。在Flume配置文件中指定拦截器全类名,如`TestInterceptor$Builder`,然后启动Flume进行测试。
565 0
|
中间件 Java 数据库连接
【Flume中间件】(13)自定义Source
【Flume中间件】(13)自定义Source
272 74
|
中间件
【Flume中间件】(14)自定义Sink
【Flume中间件】(14)自定义Sink
202 63
|
中间件
【Flume中间件】(12)自定义拦截器
【Flume中间件】(12)自定义拦截器
194 53
|
数据采集 消息中间件 缓存
Apache Flume及快速安装
Apache Flume及快速安装
224 0
|
存储 Java 分布式数据库
Flume学习---3、自定义Interceptor、自定义Source、自定义Sink
Flume学习---3、自定义Interceptor、自定义Source、自定义Sink
|
数据采集 Java Shell
flume自定义拦截器
向文件中定时新增日期数据,采集该文件, 通过自定义source拦截器给日期数据加上自己姓名作为前缀,输出到控制台。 #### 分析: 需求很简单,主要在于练习flume自定义拦截器的流程,我们需要使用java来写flume拦截器的流程需求,然后使用maven将程序打包成jar包。放到采集服务器的flume安装路径的/lib路径下,然后运行。
280 0
flume自定义拦截器
|
分布式计算 Hadoop
flume报错记录:java.net.ConnectException:拒绝连接; For more details see: http://wiki.apache.org/hadoop/ConnectionRefused
flume报错记录:java.net.ConnectException:拒绝连接; For more details see: http://wiki.apache.org/hadoop/ConnectionRefused
778 0
flume报错记录:java.net.ConnectException:拒绝连接; For more details see:  http://wiki.apache.org/hadoop/ConnectionRefused

热门文章

最新文章

推荐镜像

更多