Apache Flume-- 自定义 sink(扩展)--数据写入本地|学习笔记

简介: Apache Flume-- 自定义 sink(扩展)--数据写入本地

开发者学堂课程【Flume 基础应用实战-企业全场景解决方案Apache Flume-- 自定义 sink(扩展)--数据写入本地】学习笔记,与课程紧密联系,让用户快速学习知识。  

课程地址:https://developer.aliyun.com/learning/course/715/detail/12780


Apache Flume--自定义 sink(扩展)--数据写入本地


flume 中自定义 sink 组件的操作

sink 在服务当中主要充当把数据写到哪里一个功能,把它称之为数据的目的地也称之为数据的下沉地官方提供了很多 sink 的种类组件,包括把数据写到 hdfs等。

这么多组件当中,如果说有一个需求满足不了官方也提供了相关的结果和方法,让我们去自定义实现自己的 sink,比如看一下这个需求,首先不用关心数据从哪里来的,从哪里来跟 source 相关在这里是指从网络端口当中接触数据就是说在网络当中绑定一个端口,这个端口会源源不断地有数据进来,我们要做的是制定一个 sink 组件,把从端口当中说到的数据保存在本地磁盘中某个文件中去,就是写到 linux 机器上去,接下来看如何实现这个需求。

针对 sink 的实现原理比较简单,apr 也比较方便官方要求需要去继承一个叫做Abstractsink 的类,而且要实现 configurable 的一个接口里面有一个方法用于初始化,接下来看一下具体逻辑。

打开编译器看一下代码逻辑,在这里定一个类,叫做 mysink去继承 Abstractsink 这个抽象类,然后实configurable

图片1.png

这里定义了几个属性,上下文 context (用于解析相关参数)、还有文件的路径文件的名字还有个 file 对象。我们要做的是把数据写到本地的磁盘上,得到名字和路径,通常来说不应该写死而是通过采集方案动态地传输进来

回到讲义中来看一下,在去搭建这样的 sink 的时候,需要在方案当中做一个这样配置

图片2.png

filePath fileName 就要通过 Flume 的 context 解析到

图片3.png

所以在第一个方法 configure 当中主要就是通过上下文去解析采集方案中的参数获得文件路径文件名字,然后通过这个文件路径构建一个 file 的对象,判断一下这个路径在不在如果不存在,帮它 make 把它创建一下。如果存在,就不创建

初始化之后它的整个逻辑是这个叫做 process 方法,这个方法会被周期重复调用或者反复地调用这个方法Flume 的工作流程source 把数据读取到放在event 当中,然后传入到 channel 当中,因此针对 sink 组件主要是从 channel当中读取 event然后把数据放到所需要的地方中去写,所以说首先是 get channel获取到 channel。

然后这里有一个 transaction 的概念,通过配置大家可以感受到,在去配置一个channel 的时候,通常会有几个参数

Capacity最大可以存储 event 的数量,下面很重要叫做 transactionCapacity叫做事物容量100也就是说,在这100个 event 当中,要么一起成功,要么一起失败,如果有问题做一个回滚保证数据的安全性,不重复不遗漏

首先获得一个 transaction,然后开始 transaction这里用 while(true)做了一个死循环,这就是读数据的因为 channel 里面有没有数据不是 sink 可以决定的而在于 source 数据有没有读取数据,所以说这里不断的读取,读取的数据叫做 event 的对象,如果没有,就一直读取,但如果 event 的对象不为空说明读到数据了,读到数据直接 break 返回

图片4.png

这样就拿到event 对象之后注意数据是封装在 event 的 body 当中,所以说通过它的 body 数据体获得字节数据,然后把它变字符串,调用工具类,把它写到指定路径下,指定文件当中并且是一个最佳模式里面写这是提供的一个fileutils 工具类,写成功之后,做一个 commit 事物的提交

图片5.png

如果出现异常这里做了一个 try catch 操作,做一个事物的 rollback 回滚,非常重要

整体来看需要注意两点:

(1) sink 写的时候有一个 transaction 物的概念,要控制事开始结束以及出现问题怎么回本事物怎么关闭

(2) 数据是以 event 形式存在的,真正的数据是放在 event body 当中,是一个byte 数组存在的因此需要去获body把它转成所需要的样子

图片6.png

这里是往文件去写还可以调其他工具只要能够写出具体逻辑写到哪里都可以所以说自定义 sink 组件非常方便。写好代码之后就针对这个工程,打一个 jar选择 maven package project,选package 双击一下就可以打成一个 jar包。

图片7.png

工程 target 下有一个 example-flume-1.1.jar,把它复制出来放在 Flume lib 路径下

图片8.png

jar 拖拽上传之后 cd.. conf 当中,编写采集方案。

 

图片9.png

上面针对三个组件起个名字,source 是使用它内置的 source 叫做 netcat监听一个网络端口的数据,把数据从端口中采集到放在 channel 当中,sink 就是我们自定sink路径就是刚才写的类的全路径文件的 path 可以自己根据需求来改,接下来把采集方案放在 conf 路径下。

图片10.png

有了之后,cd.. 返回上一级,就是 Flume 的根目录下。因为这里没有配置 Flume的环境变量,只能在根目录下进行相关的启动

接下来回到我义上把启动 Flume 的命令做一个执行

回到服务器当中,做一个执行

这样就开始执行了,它绑定的是本机5678端口

接下来模拟端口发送数据,这里用一个叫做 telnet 的 linux 小命,如果没有telnet使用 yum 在线安装一下,如果有,直接 telenet 到指定机器指定端口上,接下来克隆一个窗口来看一下

样就通过工具连接到 node-1 端口上,接下来发送数据看它能不能采集到写到本地文件当中

比如发送一hello flume这里写 ok, 说明把数据发过去了,然后可以验证它有没有收到数据多写几个,如果写错了,自己注意按住 ctrl 键进行删除,再克隆一个窗口,来到路径 /export/servers 下

图片11.png

这时候多了一个文件,叫做 filesink.txtvim 编辑器读取一下里面的内容。

可以看到数据已经在里面了,这样就完成数据的采集,当然针对文件的格式是否要追加以及其他的细节等,都可以经过各种方式进行改变

整体逻辑非常清晰重点就在于去实现这一个类然后里边有一个接口,这样就完成了自定义 sink 组件的操作,在企业当中,如果真的需要去满足自己的业务需求,就可以按照这个规范进行相关的开发编写。

 

相关文章
|
1月前
|
分布式计算 Java Hadoop
Hadoop-18 Flume HelloWorld 第一个Flume尝试!编写conf实现Source+Channel+Sink 控制台查看收集到的数据 流式收集
Hadoop-18 Flume HelloWorld 第一个Flume尝试!编写conf实现Source+Channel+Sink 控制台查看收集到的数据 流式收集
30 1
|
6月前
|
Shell
Flume【问题记录 01】【at org.apache.flume.node.Application.main(Application.java:xxx) 类问题整理+其他类型问题总结】【避坑指南】
【2月更文挑战第17天】Flume【问题记录 01】【at org.apache.flume.node.Application.main(Application.java:xxx) 类问题整理+其他类型问题总结】【避坑指南】
302 2
|
1月前
|
存储 数据采集 分布式计算
Hadoop-17 Flume 介绍与环境配置 实机云服务器测试 分布式日志信息收集 海量数据 实时采集引擎 Source Channel Sink 串行复制负载均衡
Hadoop-17 Flume 介绍与环境配置 实机云服务器测试 分布式日志信息收集 海量数据 实时采集引擎 Source Channel Sink 串行复制负载均衡
44 1
|
2月前
|
SQL 消息中间件 Java
兼容Trino Connector,扩展Apache Doris数据源接入能力|Lakehouse 使用手册(四)
通过兼容 Connector 插件,Apache Doris 能够支持 Trino/Presto 可对接的所有数据源,而无需改动 Doris 的内核代码。
兼容Trino Connector,扩展Apache Doris数据源接入能力|Lakehouse 使用手册(四)
|
3月前
|
数据采集 存储 Apache
Flume核心组件大揭秘:Agent、Source、Channel、Sink,一文掌握数据采集精髓!
【8月更文挑战第24天】Flume是Apache旗下的一款顶级服务工具,专为大规模日志数据的收集、聚合与传输而设计。其架构基于几个核心组件:Agent、Source、Channel及Sink。Agent作为基础执行单元,整合Source(数据采集)、Channel(数据暂存)与Sink(数据传输)。本文通过实例深入剖析各组件功能与配置,包括Avro、Exec及Spooling Directory等多种Source类型,Memory与File Channel方案以及HDFS、Avro和Logger等Sink选项,旨在提供全面的Flume应用指南。
124 1
|
6月前
|
Java Linux
Flume【环境搭建 01】CentOS Linux release 7.5 安装配置 apache-flume-1.9.0 并验证
【2月更文挑战第16天】Flume【环境搭建 01】CentOS Linux release 7.5 安装配置 apache-flume-1.9.0 并验证
126 0
|
5月前
|
消息中间件 数据挖掘 Kafka
使用 Flume 将 CSV 数据导入 Kafka:实现实时数据流
使用 Flume 将 CSV 数据导入 Kafka:实现实时数据流
126 2
|
6月前
|
存储 数据采集 监控
Flume 拦截器概念及自定义拦截器的运用
Apache Flume 的拦截器是事件处理组件,位于Source和Channel之间,用于在写入Channel前对数据进行转换、提取或删除。它们支持数据处理和转换、数据增强、数据过滤以及监控和日志功能。要创建自定义拦截器,需实现Interceptor接口,包含initialize、intercept、intercept(List<Event>)和close方法。配置拦截器时,通过Builder模式实现Interceptor.Builder接口。在Flume配置文件中指定拦截器全类名,如`TestInterceptor$Builder`,然后启动Flume进行测试。
175 0
|
6月前
|
Apache PHP
百度搜索:蓝易云【Apache环境php安装扩展swoole。】
通过以上步骤,你就可以在Apache环境中成功安装和配置Swoole扩展了。请确保你按照正确的步骤进行操作,并根据你的系统和环境进行相应的调整。如果遇到问题,你可以参考Swoole官方文档或社区的支持资源来获取更多帮助。
54 1
|
6月前
|
SQL 消息中间件 存储
案例:Flume消费Kafka数据保存Hive
案例:Flume消费Kafka数据保存Hive
203 0

推荐镜像

更多