Apache Flume-- 自定义 sink(扩展)--数据写入本地|学习笔记

简介: Apache Flume-- 自定义 sink(扩展)--数据写入本地

开发者学堂课程【Flume 基础应用实战-企业全场景解决方案Apache Flume-- 自定义 sink(扩展)--数据写入本地】学习笔记,与课程紧密联系,让用户快速学习知识。  

课程地址:https://developer.aliyun.com/learning/course/715/detail/12780


Apache Flume--自定义 sink(扩展)--数据写入本地


flume 中自定义 sink 组件的操作

sink 在服务当中主要充当把数据写到哪里一个功能,把它称之为数据的目的地也称之为数据的下沉地官方提供了很多 sink 的种类组件,包括把数据写到 hdfs等。

这么多组件当中,如果说有一个需求满足不了官方也提供了相关的结果和方法,让我们去自定义实现自己的 sink,比如看一下这个需求,首先不用关心数据从哪里来的,从哪里来跟 source 相关在这里是指从网络端口当中接触数据就是说在网络当中绑定一个端口,这个端口会源源不断地有数据进来,我们要做的是制定一个 sink 组件,把从端口当中说到的数据保存在本地磁盘中某个文件中去,就是写到 linux 机器上去,接下来看如何实现这个需求。

针对 sink 的实现原理比较简单,apr 也比较方便官方要求需要去继承一个叫做Abstractsink 的类,而且要实现 configurable 的一个接口里面有一个方法用于初始化,接下来看一下具体逻辑。

打开编译器看一下代码逻辑,在这里定一个类,叫做 mysink去继承 Abstractsink 这个抽象类,然后实configurable

图片1.png

这里定义了几个属性,上下文 context (用于解析相关参数)、还有文件的路径文件的名字还有个 file 对象。我们要做的是把数据写到本地的磁盘上,得到名字和路径,通常来说不应该写死而是通过采集方案动态地传输进来

回到讲义中来看一下,在去搭建这样的 sink 的时候,需要在方案当中做一个这样配置

图片2.png

filePath fileName 就要通过 Flume 的 context 解析到

图片3.png

所以在第一个方法 configure 当中主要就是通过上下文去解析采集方案中的参数获得文件路径文件名字,然后通过这个文件路径构建一个 file 的对象,判断一下这个路径在不在如果不存在,帮它 make 把它创建一下。如果存在,就不创建

初始化之后它的整个逻辑是这个叫做 process 方法,这个方法会被周期重复调用或者反复地调用这个方法Flume 的工作流程source 把数据读取到放在event 当中,然后传入到 channel 当中,因此针对 sink 组件主要是从 channel当中读取 event然后把数据放到所需要的地方中去写,所以说首先是 get channel获取到 channel。

然后这里有一个 transaction 的概念,通过配置大家可以感受到,在去配置一个channel 的时候,通常会有几个参数

Capacity最大可以存储 event 的数量,下面很重要叫做 transactionCapacity叫做事物容量100也就是说,在这100个 event 当中,要么一起成功,要么一起失败,如果有问题做一个回滚保证数据的安全性,不重复不遗漏

首先获得一个 transaction,然后开始 transaction这里用 while(true)做了一个死循环,这就是读数据的因为 channel 里面有没有数据不是 sink 可以决定的而在于 source 数据有没有读取数据,所以说这里不断的读取,读取的数据叫做 event 的对象,如果没有,就一直读取,但如果 event 的对象不为空说明读到数据了,读到数据直接 break 返回

图片4.png

这样就拿到event 对象之后注意数据是封装在 event 的 body 当中,所以说通过它的 body 数据体获得字节数据,然后把它变字符串,调用工具类,把它写到指定路径下,指定文件当中并且是一个最佳模式里面写这是提供的一个fileutils 工具类,写成功之后,做一个 commit 事物的提交

图片5.png

如果出现异常这里做了一个 try catch 操作,做一个事物的 rollback 回滚,非常重要

整体来看需要注意两点:

(1) sink 写的时候有一个 transaction 物的概念,要控制事开始结束以及出现问题怎么回本事物怎么关闭

(2) 数据是以 event 形式存在的,真正的数据是放在 event body 当中,是一个byte 数组存在的因此需要去获body把它转成所需要的样子

图片6.png

这里是往文件去写还可以调其他工具只要能够写出具体逻辑写到哪里都可以所以说自定义 sink 组件非常方便。写好代码之后就针对这个工程,打一个 jar选择 maven package project,选package 双击一下就可以打成一个 jar包。

图片7.png

工程 target 下有一个 example-flume-1.1.jar,把它复制出来放在 Flume lib 路径下

图片8.png

jar 拖拽上传之后 cd.. conf 当中,编写采集方案。

 

图片9.png

上面针对三个组件起个名字,source 是使用它内置的 source 叫做 netcat监听一个网络端口的数据,把数据从端口中采集到放在 channel 当中,sink 就是我们自定sink路径就是刚才写的类的全路径文件的 path 可以自己根据需求来改,接下来把采集方案放在 conf 路径下。

图片10.png

有了之后,cd.. 返回上一级,就是 Flume 的根目录下。因为这里没有配置 Flume的环境变量,只能在根目录下进行相关的启动

接下来回到我义上把启动 Flume 的命令做一个执行

回到服务器当中,做一个执行

这样就开始执行了,它绑定的是本机5678端口

接下来模拟端口发送数据,这里用一个叫做 telnet 的 linux 小命,如果没有telnet使用 yum 在线安装一下,如果有,直接 telenet 到指定机器指定端口上,接下来克隆一个窗口来看一下

样就通过工具连接到 node-1 端口上,接下来发送数据看它能不能采集到写到本地文件当中

比如发送一hello flume这里写 ok, 说明把数据发过去了,然后可以验证它有没有收到数据多写几个,如果写错了,自己注意按住 ctrl 键进行删除,再克隆一个窗口,来到路径 /export/servers 下

图片11.png

这时候多了一个文件,叫做 filesink.txtvim 编辑器读取一下里面的内容。

可以看到数据已经在里面了,这样就完成数据的采集,当然针对文件的格式是否要追加以及其他的细节等,都可以经过各种方式进行改变

整体逻辑非常清晰重点就在于去实现这一个类然后里边有一个接口,这样就完成了自定义 sink 组件的操作,在企业当中,如果真的需要去满足自己的业务需求,就可以按照这个规范进行相关的开发编写。

 

相关文章
|
2月前
|
分布式计算 Java Hadoop
Hadoop-18 Flume HelloWorld 第一个Flume尝试!编写conf实现Source+Channel+Sink 控制台查看收集到的数据 流式收集
Hadoop-18 Flume HelloWorld 第一个Flume尝试!编写conf实现Source+Channel+Sink 控制台查看收集到的数据 流式收集
36 1
|
4月前
|
物联网 数据管理 Apache
拥抱IoT浪潮,Apache IoTDB如何成为你的智能数据守护者?解锁物联网新纪元的数据管理秘籍!
【8月更文挑战第22天】随着物联网技术的发展,数据量激增对数据库提出新挑战。Apache IoTDB凭借其面向时间序列数据的设计,在IoT领域脱颖而出。相较于传统数据库,IoTDB采用树形数据模型高效管理实时数据,具备轻量级结构与高并发能力,并集成Hadoop/Spark支持复杂分析。在智能城市等场景下,IoTDB能处理如交通流量等数据,为决策提供支持。IoTDB还提供InfluxDB协议适配器简化迁移过程,并支持细致的权限管理确保数据安全。综上所述,IoTDB在IoT数据管理中展现出巨大潜力与竞争力。
117 1
|
2月前
|
SQL 消息中间件 大数据
大数据-159 Apache Kylin 构建Cube 准备和测试数据(一)
大数据-159 Apache Kylin 构建Cube 准备和测试数据(一)
72 1
|
2月前
|
SQL 大数据 Apache
大数据-159 Apache Kylin 构建Cube 准备和测试数据(二)
大数据-159 Apache Kylin 构建Cube 准备和测试数据(二)
87 1
|
2月前
|
分布式计算 监控 大数据
大数据-148 Apache Kudu 从 Flink 下沉数据到 Kudu
大数据-148 Apache Kudu 从 Flink 下沉数据到 Kudu
80 1
|
2月前
|
存储 数据采集 分布式计算
Hadoop-17 Flume 介绍与环境配置 实机云服务器测试 分布式日志信息收集 海量数据 实时采集引擎 Source Channel Sink 串行复制负载均衡
Hadoop-17 Flume 介绍与环境配置 实机云服务器测试 分布式日志信息收集 海量数据 实时采集引擎 Source Channel Sink 串行复制负载均衡
56 1
|
3月前
|
SQL 消息中间件 Java
兼容Trino Connector,扩展Apache Doris数据源接入能力|Lakehouse 使用手册(四)
通过兼容 Connector 插件,Apache Doris 能够支持 Trino/Presto 可对接的所有数据源,而无需改动 Doris 的内核代码。
兼容Trino Connector,扩展Apache Doris数据源接入能力|Lakehouse 使用手册(四)
|
4月前
|
存储 消息中间件 人工智能
AI大模型独角兽 MiniMax 基于阿里云数据库 SelectDB 版内核 Apache Doris 升级日志系统,PB 数据秒级查询响应
早期 MiniMax 基于 Grafana Loki 构建了日志系统,在资源消耗、写入性能及系统稳定性上都面临巨大的挑战。为此 MiniMax 开始寻找全新的日志系统方案,并基于阿里云数据库 SelectDB 版内核 Apache Doris 升级了日志系统,新系统已接入 MiniMax 内部所有业务线日志数据,数据规模为 PB 级, 整体可用性达到 99.9% 以上,10 亿级日志数据的检索速度可实现秒级响应。
AI大模型独角兽 MiniMax 基于阿里云数据库 SelectDB 版内核 Apache Doris 升级日志系统,PB 数据秒级查询响应
|
3月前
|
存储 大数据 数据挖掘
【数据新纪元】Apache Doris:重塑实时分析性能,解锁大数据处理新速度,引爆数据价值潜能!
【9月更文挑战第5天】Apache Doris以其卓越的性能、灵活的架构和高效的数据处理能力,正在重塑实时分析的性能极限,解锁大数据处理的新速度,引爆数据价值的无限潜能。在未来的发展中,我们有理由相信Apache Doris将继续引领数据处理的潮流,为企业提供更快速、更准确、更智能的数据洞察和决策支持。让我们携手并进,共同探索数据新纪元的无限可能!
162 11
|
4月前
|
数据采集 存储 Apache
Flume核心组件大揭秘:Agent、Source、Channel、Sink,一文掌握数据采集精髓!
【8月更文挑战第24天】Flume是Apache旗下的一款顶级服务工具,专为大规模日志数据的收集、聚合与传输而设计。其架构基于几个核心组件:Agent、Source、Channel及Sink。Agent作为基础执行单元,整合Source(数据采集)、Channel(数据暂存)与Sink(数据传输)。本文通过实例深入剖析各组件功能与配置,包括Avro、Exec及Spooling Directory等多种Source类型,Memory与File Channel方案以及HDFS、Avro和Logger等Sink选项,旨在提供全面的Flume应用指南。
253 1

推荐镜像

更多
下一篇
DataWorks