Apache Flume-自定义 source(扩展)|学习笔记

本文涉及的产品
RDS MySQL Serverless 基础系列,0.5-2RCU 50GB
云数据库 RDS MySQL,集群系列 2核4GB
推荐场景:
搭建个人博客
云解析 DNS,旗舰版 1个月
简介: 快速学习 Apache Flume-自定义 source(扩展),具体实现代码逻辑 Flume 提供了很多内置的 source、sink、channel。但是在某些场合下,它自带的组件可能不满足需求,为此 Flume 官方也提供了相关的接口,我们可以按照它的接口和规范进行开发,实现自己的需求。

开发者学堂课程【Flume 基础应用实战-企业全场景解决方案Apache Flume-自定义 source(扩展)】学习笔记,与课程紧密联系,让用户快速学习知识。  

课程地址:https://developer.aliyun.com/learning/course/715/detail/12778


Apache Flume-自定义 source(扩展)


目录

一、 自定义 source 的功能完成

二、 自定义 source 的需求描述

三、 自定义 source 的代码逻辑梳理


一、自定义 source 的功能完成

source 在服中充当数据的角色,对接各种各样的数据,收集数据,把收集到数据变成 event 对象放入到 channel 中。虽然官方带的 source 组件很多,但是在实际中,万一我们的场景需要的一个组件它没有,我们就可以用官方提供source 开发的模板,进行相关的开发。

三、 在制定当中,有一个类型叫做 Pollable Source它底层是一个通过线程去不断地调用 process 方法,相当于主动地拉取数据在这个接口当中,就可以自定义实现相关的逻辑。如何读数读什么数据跟的具体方法实现相关此外,它要继承一个叫做 configurable 的一个接口,用于一些配置的初始化或者一些属性参数的配置。


二、自定义 source 的需求描述

实时地监控 MySQL 中的一张表,如果这张表当中有数据发生改变,就把这个数据采集传输到 hdfs 或者其他的存储框架当中。

简单来讲,构建一张表比如昨天这个表的数据可能到了100,把它收集到100今天如果数据发生改变,又插入两条数据变成102,把这变化的这条数据也收集进去


三、自定义 source 的逻辑梳理

我们要去开发一个专门的类去继承 abstract source 并且实现这两个接口当中最重要的就是两个方法,一个叫做 configure有一个上下环境变量,用于环境的一些相关初始化一些参数的解析。另外一个方法,叫做 process是周期性主动调的方法,在这个方法中,我们做的就是去读取 mySQL 当中的数据,然后数据封装成 event 放入到 channel 当中,这个方法可以想象它会被循环的调用,在这里为了方便,从 mySQL 中读数据,把数据装,还要更新读取到哪里,一个偏移量。它的逻辑比较复杂地写在一个类当中,也不太靠谱,所以在这里专门定一个类似于工具的类,叫做 Query MySQL,专门用来跟 MySQL 打交道,里面可以定义各种各样的方法,然后就可以通过外部定义的 source 去调用这个工具类来进行相关的交互


四、具体实现代码逻辑

首先创建一下 mysql 数据库和 mysql 数据库表,讲义上有相关的语句,建议大家从参考资料中去获取。在资料的软件下面选择 flume,里面有自定义组件案例自定义 source这里面有一个 sql 脚本,打开数据库进行初始化,连接到服务器上选择运行 sql 文件,运行完成之做一个刷新又多了一个数据库

在这个数据库中,首先定义的第一个叫做 student 的表,表当中没有其他内容,只有两个字段,一个叫 id,一个叫 name,里面有四条数据

图片1.png

第二个表叫 Flume_meta,flume 的一个原数据,也就是说后面进行查询进行数据收集,收集到哪里,表格里面要进行实时的更新,比如现在没有数据说明这个表的数据还没有读取,如果有读取,就会有记录。这两个字段,第一个叫做source_tab 哪张表,第二字段叫做 currentindex 当前的缩影,当前的序列通过这两个表,可以知道数据有没有变化比如查询一下当前数据库的最大值,发现是4,而现在也读到是4说明就没有变化

接下来看一下具体的代码逻辑,打开开发工具首先看一下第一个类叫做mysqisource是我们自定义的一个 source 类,按照它的规范需要去继承abstractsource,并且实现两个接口在这两个接口当中,方法比较多,重要的还是实现两个方法,一个叫做 configure,一个叫做 process

图片2.png

接下来梳理一下里面逻辑,首先可以发现在进行初始化方法的时候,干了一件事去 new 了一个类叫做 query mysql,把上下文环境变量构造了一个实例,可以想象一下这个对象当中应该会封装着大量的逻辑就相当于 sqlSourceHelper 这个用来跟数据库进行各种交互的一个工具类

接下来看一下整个 source 执行的叫做 process 方法,会被不断地调用,它只是去查询数据库表,如果有数据,把数据放event 注意 event 的对象是有头有 body 的一个结构,所以获取的数据应该把它 set body 当中变成字节,还可以根据自己的需求添加 header,最后把 event 返回写入到 channel 当中就可以。

图片3.png

从这里看上面这一块逻辑很简单,就是查询数据,把数据封装成一个 event 对象写入了 channel,这都是固定 apr最重要的在后面,当读取完一批数据之后要注意把数据表中的偏移量做一个更新,为什么要更新?比如今天从零开始读这个表,然后读了四条记录那么明天开始这四条记录可能不能再去读取了,不然数据就重复,所以要把今天读取的这四条记录更新到这个表当中,记录个表叫做student 当前记录为四,明天一看就从第五去读这里所有的东西都是以方法封装因此接下来回到封装的工具当中来看一下到底做了那些事

图片4.png

首先在这个类当中,我们开启了日志上面定义的一些属性,用于相关的参数或者当前需求参数大概包括去查询的间隔时间开始 I D结束 I D每次查询返回的条数下面就是要操作哪个表查询哪些,有没有用户传入了查询语句,如果用户没有要不要自己构建查询语句字符编码等,上下文是用来解析相关配置参数的这些参数按照标准开发应该把选择权交给用户根据需求去设定相关的开I D哪个表,哪个时间,但是要考虑一点如果用户没有去设置,就使用默认值,简单来讲就是如果说用户没有修改,我们就没有传入参数,就使用这些默认值,当然默认值只是最终用来兜底的

接下来看下面的具体逻辑。

首先第一块就是加载静态资源,在工程的静态中有一个 jdbc.properties主要是加载跟数据库相关的四大属性——驱动地址用户名和密码获得资源之后第二步就是一个方法叫做初始化和数据库的连接,这里采用的是非常传统的 jdbc 方式,用DriverManager 去获得一个,刚才这里已经注册了一个驱动有地址有用户名用密码就可以获得一个就会通过这个方法返回整个代码中的逻辑就是通过去获得过数据库的各种交互增删改查操作

图片5.png

接下来看一下第一个叫做 query mysql 构造方法,构造这个类构造方是读取flume 配置当中的参数

这里通过读这些参数打开讲义看一下当我们开发好之后,用户要想使用这个source就要去配置一个 Flume 的采集方案,在这个采集方案当中,source 类型,就是自己这个类名的一个全路径,然后地址接,这里定义了好多类名属性,包括的地址,用户名,密码,操作哪个表,哪些字段被选择要不要去对哪一个自增长的字段是谁查询时间,些属性既然开放了给用户去填写应该就要来获取到,所以说在构造当中我们干的一件事就是获取配置文件当中的参数,可以看一下这个叫做根据 C 点 to select大家发现这里就是 columns.to. select,所以说这里就根据它获得它对应值。

但如果用户这些没有配置在后面要把默认值获取到,算是做一个保底的配置如果用户没有在 Flume 配置当中进行相关的属性指定就使用默认值来读取

图片6.png

有了这些之后,接下检测配置信息,这个相当于做了一个保证,如果默认值也忘了写用户也没有赋值,这时候进行各种参验,比如表空的地址也是空的的用户密码都为空的,这时直接抛出一个异常说配置有问题,抛出对应的错误信息这相当于给用户一个友好化的提示

图片7.png

在校验好配置文件之后,做的第一件事就是获取数据库当前的 ID因为只有根据当前 ID 才能开始后面相关的操作,去构一个 build query怎么去getStatusDBIndex这个方法也在当前这个类当中,点进去看一下。刚才说过当前读到哪里操作哪里针对这个表的相关信息都放在这一个叫做 Flume meta表当中,所以查取当前 ID 值就是从 meta 表进行查询

图片8.png

所以构建了一个查询的语句。Select current index 就是当前的缩影,然后from flume data 等于只查询的那个表只有表明是不固定的,其他都是写这个值查询出来之后如果不为空,就返回查询的值,如果为空就查回默认的值,可以发现当前这个里面就是为空的,它可以查回默认值。点进去看一下默认值默认值在最上面,它由用户传

值通过调用 startfrom 传输进来的

这里调用了 startfrom。它通过获取用户的传入的参数或者默认值点进去发现默认值是零

也就是说如果当前从数据库中查询出来为空,默认值就是零就从第一条开始进行查询

接下来就再回到这里已经获取到当前 ID,接下来就去构建查询 sql 语句当前查询到哪里,接着就基于此开始构建查询语句,下面调的方法叫做 build query。

图片9.png

build query 当中起来很复杂,其实没有什么,已经获取到当前的 ID,然后就是拼接 sql这个 sql 也做了一个判断,定义了一个 custom query,如果用户没有传入sql我们帮他拼接查询哪些字段、来自那个表字段表都是属性如果用户经传过来一个 sqlsql 就是用户指定的,所以说这一段逻辑判断就是用户如果不写 sql,为空,我们帮用户如果写了,就用户的

图片10.png

接下来就是把接口做一个拼接,用一个 stringbuilder 来做拼接,加上 where 条件where id 大于刚才查询的当前 id,比如这里记录的当前 id 为四,肯定要大于四开始,从第五条开始查询。但是现在没有,所以默认是零,大于零开始从一开始,所这一块比较灵活这样就开始构建询语句这是一块逻辑

图片11.png

在进行相关获取的时候,第一行就是 sqlSourceHelper 去 executQuery,在初始中,主要查询一下当前 id 偏移量构建查询 sql,一旦这个方法 process 调用,就开始查询数据库的表

图片12.png

这里面就是非常原始的 jdbc,刚才已经获得连接,获得一个 preparestatement 站位,然后进行 executeQuery 执行 sql 语句,把 Sql 语句做一个结果遍历放入 list 集合当中,然后把它返回这一段逻辑就是自行查询的结果。

图片13.png

有了结果之后把结果做一个解析,遍历一下结果,把结果放在 channel 当中写出去。但是要注意最终应该更新一下偏移量的值,因为如果当前是空那默认值就是零从零开始进行读取读1234,把这四条读完之后要把当前的四索引更新到这里,就是说当前读了几条记录,要把最高的一个放在这里面。因为下一次需要从大于4的开始,从5开始读

所以说最后要调一个方法叫 updateOffset2DB 更新 result size,把新读取的加进去。

图片14.png

最后这一块又是一个 insert into flum_data往 meta 表当中进行数据的更新更新的值就是刚才查询结果 size 的值

简单概括:source 启动之后,在初始化的过程当中,去解析用户配置采集方案中的参数,然后构建查询语句很多参数如果用户不写不指定,包括 sql 语句,需要我们帮构建构建之后就等待着 process 方法的执行。当这个方法被调的时候,执行 sql 的查询,把结果封装成 event 放到 channel 当中,最终再更新一下表中的偏移量

当然这一个方法会进行重复调用,当第一次执行完之后,下面有一个可以控制的等待时长(有默认值)

图片15.png

这是针对这一个需求所做的描述,虽然代码很多,但大量地都是业务代码,主要就是传统的 jdbc 数据的一个查询操作,要弄清楚在哪一个方法当中去编写相关的业务逻辑。

相关实践学习
如何在云端创建MySQL数据库
开始实验后,系统会自动创建一台自建MySQL的 源数据库 ECS 实例和一台 目标数据库 RDS。
全面了解阿里云能为你做什么
阿里云在全球各地部署高效节能的绿色数据中心,利用清洁计算为万物互联的新世界提供源源不断的能源动力,目前开服的区域包括中国(华北、华东、华南、香港)、新加坡、美国(美东、美西)、欧洲、中东、澳大利亚、日本。目前阿里云的产品涵盖弹性计算、数据库、存储与CDN、分析与搜索、云通信、网络、管理与监控、应用服务、互联网中间件、移动服务、视频服务等。通过本课程,来了解阿里云能够为你的业务带来哪些帮助     相关的阿里云产品:云服务器ECS 云服务器 ECS(Elastic Compute Service)是一种弹性可伸缩的计算服务,助您降低 IT 成本,提升运维效率,使您更专注于核心业务创新。产品详情: https://www.aliyun.com/product/ecs
相关文章
|
2月前
|
SQL 分布式计算 监控
Hadoop-20 Flume 采集数据双写至本地+HDFS中 监控目录变化 3个Agent MemoryChannel Source对比
Hadoop-20 Flume 采集数据双写至本地+HDFS中 监控目录变化 3个Agent MemoryChannel Source对比
63 3
|
2月前
|
分布式计算 Java Hadoop
Hadoop-18 Flume HelloWorld 第一个Flume尝试!编写conf实现Source+Channel+Sink 控制台查看收集到的数据 流式收集
Hadoop-18 Flume HelloWorld 第一个Flume尝试!编写conf实现Source+Channel+Sink 控制台查看收集到的数据 流式收集
32 1
|
7月前
|
Shell
Flume【问题记录 01】【at org.apache.flume.node.Application.main(Application.java:xxx) 类问题整理+其他类型问题总结】【避坑指南】
【2月更文挑战第17天】Flume【问题记录 01】【at org.apache.flume.node.Application.main(Application.java:xxx) 类问题整理+其他类型问题总结】【避坑指南】
325 2
|
2月前
|
存储 数据采集 分布式计算
Hadoop-17 Flume 介绍与环境配置 实机云服务器测试 分布式日志信息收集 海量数据 实时采集引擎 Source Channel Sink 串行复制负载均衡
Hadoop-17 Flume 介绍与环境配置 实机云服务器测试 分布式日志信息收集 海量数据 实时采集引擎 Source Channel Sink 串行复制负载均衡
48 1
|
4月前
|
数据采集 存储 Apache
Flume核心组件大揭秘:Agent、Source、Channel、Sink,一文掌握数据采集精髓!
【8月更文挑战第24天】Flume是Apache旗下的一款顶级服务工具,专为大规模日志数据的收集、聚合与传输而设计。其架构基于几个核心组件:Agent、Source、Channel及Sink。Agent作为基础执行单元,整合Source(数据采集)、Channel(数据暂存)与Sink(数据传输)。本文通过实例深入剖析各组件功能与配置,包括Avro、Exec及Spooling Directory等多种Source类型,Memory与File Channel方案以及HDFS、Avro和Logger等Sink选项,旨在提供全面的Flume应用指南。
172 1
|
7月前
|
Java Linux
Flume【环境搭建 01】CentOS Linux release 7.5 安装配置 apache-flume-1.9.0 并验证
【2月更文挑战第16天】Flume【环境搭建 01】CentOS Linux release 7.5 安装配置 apache-flume-1.9.0 并验证
130 0
|
SQL Java 应用服务中间件
Apache Doris 自定义C++ UDF之流程详解(1)
Apache Doris 自定义C++ UDF之流程详解(1)
262 0
|
7月前
|
监控 Apache
【Flume】 Flume 区别分析:ExecSource、Spooldir Source、Taildir Source
【4月更文挑战第4天】 Flume 区别分析:ExecSource、Spooldir Source、Taildir Source
|
7月前
|
消息中间件 分布式计算 Kafka
硬核!Apache Hudi中自定义序列化和数据写入逻辑
硬核!Apache Hudi中自定义序列化和数据写入逻辑
153 1
|
7月前
|
XML Java Apache
Apache Flink自定义 logback xml配置
Apache Flink自定义 logback xml配置
349 0

推荐镜像

更多