开发者学堂课程【Flume 基础应用实战-企业全场景解决方案:Apache Flume-- 自定义 sink(扩展)--数据写入本地】学习笔记,与课程紧密联系,让用户快速学习知识。
课程地址:https://developer.aliyun.com/learning/course/715/detail/12780
Apache Flume--自定义 sink(扩展)--数据写入本地
flume 中自定义 sink 组件的操作
sink 在服务当中主要充当把数据写到哪里的一个功能,把它称之为数据的目的地,也称之为数据的下沉地,官方提供了很多 sink 的种类组件,包括把数据写到 hdfs等。
这么多组件当中,如果说有一个需求满足不了,官方也提供了相关的结果和方法,让我们去自定义实现自己的 sink,比如看一下这个需求,首先不用关心数据从哪里来的,从哪里来跟 source 相关,在这里是指从网络端口当中接触数据,就是说在网络当中绑定一个端口,这个端口会源源不断地有数据进来,我们要做的是制定一个 sink 组件,把从端口当中说到的数据保存在本地磁盘中某个文件中去,就是写到 linux 机器上去,接下来看如何实现这个需求。
针对 sink 的实现,原理比较简单,apr 也比较方便,官方要求需要去继承一个叫做Abstractsink 的类,而且要实现 configurable 的一个接口,里面有一个方法用于初始化,接下来看一下具体逻辑。
打开编译器看一下代码逻辑,在这里定一个类,叫做 mysink,去继承 Abstractsink 这个抽象类,然后实现 configurable。
这里定义了几个属性,上下文 context (用于解析相关参数)、还有文件的路径、文件的名字、还有个 file 对象。我们要做的是把数据写到本地的磁盘上,得到名字和路径,通常来说不应该写死,而是通过采集方案动态地传输进来。
回到讲义中来看一下,在去搭建这样的 sink 的时候,需要在方案当中做一个这样的配置。
filePath 和 fileName 就要通过 Flume 的 context 解析到。
所以在第一个方法 configure 当中,主要就是通过上下文去解析采集方案中的参数,获得文件路径和文件名字,然后通过这个文件路径构建一个 file 的对象,判断一下这个路径在不在,如果不存在,帮它 make 把它创建一下。如果存在,就不创建。
初始化之后,它的整个逻辑是这个叫做 process 方法,这个方法会被周期重复地调用或者反复地调用,这个方法在 Flume 的工作流程中 source 把数据读取到放在event 当中,然后传入到 channel 当中,因此针对 sink 组件,主要是从 channel当中读取 event,然后把数据放到所需要的地方中去写,所以说首先是 get channel获取到 channel。
然后这里有一个 transaction 的概念,通过配置大家可以感受到,在去配置一个channel 的时候,通常会这有几个参数。
Capacity,最大可以存储 event 的数量,下面的很重要叫做 transactionCapacity,叫做事物容量100,也就是说,在这100个 event 当中,要么一起成功,要么一起失败,如果有问题就做一个回滚,保证数据的安全性,不重复不遗漏。
首先获得一个 transaction,然后开始 transaction,这里用 while(true)做了一个死循环,这就是读数据的,因为 channel 里面有没有数据不是 sink 可以决定的,而在于 source 数据源有没有读取数据,所以说这里不断的读取,读取的数据叫做 event 的对象,如果没有,就一直读取,但如果 event 的对象不为空,说明读到数据了,读到数据后直接 break 返回。
这样就拿到了 event 对象,之后注意数据是封装在 event 的 body 当中,所以说通过它的 body 数据体获得字节数据,然后把它变成字符串,调用工具类,把它写到指定路径下,指定文件当中,并且是一个最佳模式往里面写。这是提供的一个fileutils 工具类,写成功之后,做一个 commit 事物的提交。
如果出现异常,这里做了一个 try catch 操作,做一个事物的 rollback 回滚,非常重要。
整体来看需要注意两点:
(1) 在往 sink 写的时候有一个 transaction 事物的概念,要控制事物开始结束,以及出现问题怎么回本,事物怎么关闭。
(2) 数据是以 event 形式存在的,真正的数据是放在 event 的 body 当中,是一个byte 数组存在的,因此需要去获得 body,把它转成所需要的样子。
这里是往文件去写,还可以调其他工具,只要能够写出具体逻辑,写到哪里都可以,所以说自定义 sink 组件非常方便。写好代码之后就针对这个工程,打一个 jar包,选择 maven package project,选择 package 双击一下就可以打成一个 jar包。
工程 target 下有一个 example-flume-1.1.jar,把它复制出来放在 Flume 的 lib 路径下。
把 jar 包拖拽上传。之后 cd.. 到 conf 当中,编写采集方案。
上面针对三个组件起个名字,source 是使用它内置的 source 叫做 netcat,监听一个网络端口的数据,把数据从端口中采集到放在 channel 当中,sink 就是我们自定义的 sink,路径就是刚才写的类的全路径,文件的 path 可以自己根据需求来改,接下来把采集方案放在 conf 路径下。
有了之后,cd.. 返回上一级,也就是 Flume 的根目录下。因为这里没有配置 Flume的环境变量,只能在根目录下进行相关的启动。
接下来回到我义上把启动 Flume 的命令做一个执行。
回到服务器当中,做一个执行。
这样就开始执行了,它绑定的是本机5、6、7、8端口。
接下来模拟往端口发送数据,这里用一个叫做 telnet 的 linux 小命令,如果没有telnet,使用 yum 在线安装一下,如果有,直接 telenet 到指定机器,指定端口上,接下来克隆一个窗口来看一下。
这样就通过工具连接到 node-1 端口上,接下来发送数据看它能不能采集到并写到本地文件当中。
比如发送一个 hello flume,这里写 ok, 说明把数据发过去了,然后可以验证它有没有收到数据,多写几个,如果写错了,自己注意按住 ctrl 键进行删除,再克隆一个窗口,来到路径 /export/servers 下。
这时候多了一个文件,叫做 filesink.txt,用 vim 编辑器读取一下里面的内容。
可以看到数据已经在里面了,这样就完成了数据的采集,当然针对文件的格式是否要追加以及其他的细节等,都可以经过各种方式进行改变。
整体逻辑非常清晰,重点就在于去实现这一个类,然后里边有一个接口,这样就完成了自定义 sink 组件的操作,在企业当中,如果真的需要去满足自己的业务需求,就可以按照这个规范进行相关的开发编写。