Apache Flume-- 自定义 sink(扩展)--数据写入本地|学习笔记

简介: Apache Flume-- 自定义 sink(扩展)--数据写入本地

开发者学堂课程【Flume 基础应用实战-企业全场景解决方案Apache Flume-- 自定义 sink(扩展)--数据写入本地】学习笔记,与课程紧密联系,让用户快速学习知识。  

课程地址:https://developer.aliyun.com/learning/course/715/detail/12780


Apache Flume--自定义 sink(扩展)--数据写入本地


flume 中自定义 sink 组件的操作

sink 在服务当中主要充当把数据写到哪里一个功能,把它称之为数据的目的地也称之为数据的下沉地官方提供了很多 sink 的种类组件,包括把数据写到 hdfs等。

这么多组件当中,如果说有一个需求满足不了官方也提供了相关的结果和方法,让我们去自定义实现自己的 sink,比如看一下这个需求,首先不用关心数据从哪里来的,从哪里来跟 source 相关在这里是指从网络端口当中接触数据就是说在网络当中绑定一个端口,这个端口会源源不断地有数据进来,我们要做的是制定一个 sink 组件,把从端口当中说到的数据保存在本地磁盘中某个文件中去,就是写到 linux 机器上去,接下来看如何实现这个需求。

针对 sink 的实现原理比较简单,apr 也比较方便官方要求需要去继承一个叫做Abstractsink 的类,而且要实现 configurable 的一个接口里面有一个方法用于初始化,接下来看一下具体逻辑。

打开编译器看一下代码逻辑,在这里定一个类,叫做 mysink去继承 Abstractsink 这个抽象类,然后实configurable

图片1.png

这里定义了几个属性,上下文 context (用于解析相关参数)、还有文件的路径文件的名字还有个 file 对象。我们要做的是把数据写到本地的磁盘上,得到名字和路径,通常来说不应该写死而是通过采集方案动态地传输进来

回到讲义中来看一下,在去搭建这样的 sink 的时候,需要在方案当中做一个这样配置

图片2.png

filePath fileName 就要通过 Flume 的 context 解析到

图片3.png

所以在第一个方法 configure 当中主要就是通过上下文去解析采集方案中的参数获得文件路径文件名字,然后通过这个文件路径构建一个 file 的对象,判断一下这个路径在不在如果不存在,帮它 make 把它创建一下。如果存在,就不创建

初始化之后它的整个逻辑是这个叫做 process 方法,这个方法会被周期重复调用或者反复地调用这个方法Flume 的工作流程source 把数据读取到放在event 当中,然后传入到 channel 当中,因此针对 sink 组件主要是从 channel当中读取 event然后把数据放到所需要的地方中去写,所以说首先是 get channel获取到 channel。

然后这里有一个 transaction 的概念,通过配置大家可以感受到,在去配置一个channel 的时候,通常会有几个参数

Capacity最大可以存储 event 的数量,下面很重要叫做 transactionCapacity叫做事物容量100也就是说,在这100个 event 当中,要么一起成功,要么一起失败,如果有问题做一个回滚保证数据的安全性,不重复不遗漏

首先获得一个 transaction,然后开始 transaction这里用 while(true)做了一个死循环,这就是读数据的因为 channel 里面有没有数据不是 sink 可以决定的而在于 source 数据有没有读取数据,所以说这里不断的读取,读取的数据叫做 event 的对象,如果没有,就一直读取,但如果 event 的对象不为空说明读到数据了,读到数据直接 break 返回

图片4.png

这样就拿到event 对象之后注意数据是封装在 event 的 body 当中,所以说通过它的 body 数据体获得字节数据,然后把它变字符串,调用工具类,把它写到指定路径下,指定文件当中并且是一个最佳模式里面写这是提供的一个fileutils 工具类,写成功之后,做一个 commit 事物的提交

图片5.png

如果出现异常这里做了一个 try catch 操作,做一个事物的 rollback 回滚,非常重要

整体来看需要注意两点:

(1) sink 写的时候有一个 transaction 物的概念,要控制事开始结束以及出现问题怎么回本事物怎么关闭

(2) 数据是以 event 形式存在的,真正的数据是放在 event body 当中,是一个byte 数组存在的因此需要去获body把它转成所需要的样子

图片6.png

这里是往文件去写还可以调其他工具只要能够写出具体逻辑写到哪里都可以所以说自定义 sink 组件非常方便。写好代码之后就针对这个工程,打一个 jar选择 maven package project,选package 双击一下就可以打成一个 jar包。

图片7.png

工程 target 下有一个 example-flume-1.1.jar,把它复制出来放在 Flume lib 路径下

图片8.png

jar 拖拽上传之后 cd.. conf 当中,编写采集方案。

 

图片9.png

上面针对三个组件起个名字,source 是使用它内置的 source 叫做 netcat监听一个网络端口的数据,把数据从端口中采集到放在 channel 当中,sink 就是我们自定sink路径就是刚才写的类的全路径文件的 path 可以自己根据需求来改,接下来把采集方案放在 conf 路径下。

图片10.png

有了之后,cd.. 返回上一级,就是 Flume 的根目录下。因为这里没有配置 Flume的环境变量,只能在根目录下进行相关的启动

接下来回到我义上把启动 Flume 的命令做一个执行

回到服务器当中,做一个执行

这样就开始执行了,它绑定的是本机5678端口

接下来模拟端口发送数据,这里用一个叫做 telnet 的 linux 小命,如果没有telnet使用 yum 在线安装一下,如果有,直接 telenet 到指定机器指定端口上,接下来克隆一个窗口来看一下

样就通过工具连接到 node-1 端口上,接下来发送数据看它能不能采集到写到本地文件当中

比如发送一hello flume这里写 ok, 说明把数据发过去了,然后可以验证它有没有收到数据多写几个,如果写错了,自己注意按住 ctrl 键进行删除,再克隆一个窗口,来到路径 /export/servers 下

图片11.png

这时候多了一个文件,叫做 filesink.txtvim 编辑器读取一下里面的内容。

可以看到数据已经在里面了,这样就完成数据的采集,当然针对文件的格式是否要追加以及其他的细节等,都可以经过各种方式进行改变

整体逻辑非常清晰重点就在于去实现这一个类然后里边有一个接口,这样就完成了自定义 sink 组件的操作,在企业当中,如果真的需要去满足自己的业务需求,就可以按照这个规范进行相关的开发编写。

 

相关文章
|
分布式计算 Java Hadoop
Hadoop-18 Flume HelloWorld 第一个Flume尝试!编写conf实现Source+Channel+Sink 控制台查看收集到的数据 流式收集
Hadoop-18 Flume HelloWorld 第一个Flume尝试!编写conf实现Source+Channel+Sink 控制台查看收集到的数据 流式收集
265 1
|
存储 人工智能 数据处理
Apache Doris 2025 Roadmap:构建 GenAI 时代实时高效统一的数据底座
秉承“以场景驱动创新” 的核心理念,持续深耕三大核心场景的关键能力,并对大模型 GenAI 场景的融合应用进行重点投入,为智能时代构建实时、高效、统一的数据底座。
625 10
Apache Doris 2025 Roadmap:构建 GenAI 时代实时高效统一的数据底座
|
存储 运维 监控
从 ClickHouse 到 Apache Doris:在网易云音乐日增万亿日志数据场景下的落地
日志数据已成为企业洞察系统状态、监控网络安全及分析业务动态的宝贵资源。网易云音乐引入 Apache Doris 作为日志库新方案,替换了 ClickHouse。解决了 ClickHouse 运维复杂、不支持倒排索引的问题。目前已经稳定运行 3 个季度,规模达到 50 台服务器, 倒排索引将全文检索性能提升7倍,2PB 数据,每天新增日志量超过万亿条,峰值写入吞吐 6GB/s 。
1034 5
从 ClickHouse 到 Apache Doris:在网易云音乐日增万亿日志数据场景下的落地
|
存储 SQL 数据挖掘
数据无界、湖仓无界, Apache Doris 湖仓一体解决方案全面解读(上篇)
湖仓一体架构融合了数据湖的低成本、高扩展性,以及数据仓库的高性能、强数据治理能力,高效应对大数据时代的挑战。为助力企业实现湖仓一体的建设,Apache Doris 提出了数据无界和湖仓无界核心理念,并结合自身特性,助力企业加速从 0 到 1 构建湖仓体系,降低转型过程中的风险和成本。本文将对湖仓一体演进及 Apache Doris 湖仓一体方案进行介绍。
1337 1
数据无界、湖仓无界, Apache Doris 湖仓一体解决方案全面解读(上篇)
|
SQL 消息中间件 大数据
大数据-159 Apache Kylin 构建Cube 准备和测试数据(一)
大数据-159 Apache Kylin 构建Cube 准备和测试数据(一)
409 1
|
SQL 大数据 Apache
大数据-159 Apache Kylin 构建Cube 准备和测试数据(二)
大数据-159 Apache Kylin 构建Cube 准备和测试数据(二)
247 1
|
分布式计算 监控 大数据
大数据-148 Apache Kudu 从 Flink 下沉数据到 Kudu
大数据-148 Apache Kudu 从 Flink 下沉数据到 Kudu
354 1
|
6月前
|
人工智能 数据处理 API
阿里云、Ververica、Confluent 与 LinkedIn 携手推进流式创新,共筑基于 Apache Flink Agents 的智能体 AI 未来
Apache Flink Agents 是由阿里云、Ververica、Confluent 与 LinkedIn 联合推出的开源子项目,旨在基于 Flink 构建可扩展、事件驱动的生产级 AI 智能体框架,实现数据与智能的实时融合。
1146 6
阿里云、Ververica、Confluent 与 LinkedIn 携手推进流式创新,共筑基于 Apache Flink Agents 的智能体 AI 未来
|
存储 Cloud Native 数据处理
从嵌入式状态管理到云原生架构:Apache Flink 的演进与下一代增量计算范式
本文整理自阿里云资深技术专家、Apache Flink PMC 成员梅源在 Flink Forward Asia 新加坡 2025上的分享,深入解析 Flink 状态管理系统的发展历程,从核心设计到 Flink 2.0 存算分离架构,并展望未来基于流批一体的通用增量计算方向。
561 0
从嵌入式状态管理到云原生架构:Apache Flink 的演进与下一代增量计算范式
|
8月前
|
SQL 人工智能 数据挖掘
Apache Flink:从实时数据分析到实时AI
Apache Flink 是实时数据处理领域的核心技术,历经十年发展,已从学术项目成长为实时计算的事实标准。它在现代数据架构中发挥着关键作用,支持实时数据分析、湖仓集成及实时 AI 应用。随着 Flink 2.0 的发布,其在流式湖仓、AI 驱动决策等方面展现出强大潜力,正推动企业迈向智能化、实时化的新阶段。
967 9
Apache Flink:从实时数据分析到实时AI

推荐镜像

更多