开发者学堂课程【Flume 基础应用实战-企业全场景解决方案:Apache Flume-自定义 source(扩展)】学习笔记,与课程紧密联系,让用户快速学习知识。
课程地址:https://developer.aliyun.com/learning/course/715/detail/12778
Apache Flume-自定义 source(扩展)
目录
一、 自定义 source 的功能完成
二、 自定义 source 的需求描述
三、 自定义 source 的代码逻辑梳理
一、自定义 source 的功能完成
source 在服务中充当数据源的角色,对接各种各样的数据源,收集数据,把收集到的数据变成 event 对象放入到 channel 中。虽然官方带的 source 组件很多,但是在实际中,万一我们的场景需要的一个组件它没有,我们就可以用官方提供的 source 开发的模板,进行相关的开发。
三、 在制定当中,有一个类型叫做 Pollable Source,它底层是一个通过线程去不断地调用 process 方法,相当于主动地拉取数据,在这个接口当中,就可以自定义实现相关的逻辑。如何读数、读什么数据跟的具体方法实现相关,此外,它要继承一个叫做 configurable 的一个接口,用于一些配置的初始化或者一些属性参数的配置。
二、自定义 source 的需求描述
实时地监控 MySQL 中的一张表,如果这张表当中有数据发生改变,就把这个数据采集传输到 hdfs 或者其他的存储框架当中。
简单来讲,构建一张表,比如昨天这个表的数据可能到了100,就把它收集到100,今天如果数据发生改变,又插入两条数据变成102,就把这变化的这条数据也收集进去。
三、自定义 source 的逻辑梳理
我们要去开发一个专门的类去继承 abstract source ,并且实现这两个接口,当中最重要的就是两个方法,一个叫做 configure,有一个上下文环境变量,用于环境的一些相关初始化一些参数的解析。另外一个方法,叫做 process,是周期性主动调的方法,在这个方法中,我们做的就是去读取 mySQL 当中的数据,然后把数据封装成 event 放入到 channel 当中,这个方法可以想象它会被循环的调用,在这里为了方便,从 mySQL 中读数据,把数据封装,还要更新读取到哪里,一个偏移量。它的逻辑比较复杂地写在一个类当中,也不太靠谱,所以在这里专门定一个类似于工具的类,叫做 Query MySQL,专门用来跟 MySQL 打交道,里面可以定义各种各样的方法,然后就可以通过外部定义的 source 去调用这个工具类来进行相关的交互。
四、具体实现代码逻辑
首先创建一下 mysql 数据库和 mysql 数据库表,讲义上有相关的语句,建议大家从参考资料中去获取。在资料的软件下面选择 flume,里面有自定义组件案例自定义 source,这里面有一个 sql 脚本,打开数据库进行初始化,连接到服务器上,选择运行 sql 文件,运行完成之后做一个刷新,又多了一个数据库。
在这个数据库中,首先定义的第一个是叫做 student 的表,表当中没有其他内容,只有两个字段,一个叫 id,一个叫 name,里面有四条数据。
第二个表叫 Flume_meta,flume 的一个原数据,也就是说后面进行查询,进行数据收集,收集到哪里,表格里面要进行实时的更新,比如现在没有数据说明这个表的数据还没有被读取,如果有读取,就会有记录。这两个字段,第一个叫做source_tab 哪张表,第二字段叫做 currentindex 当前的缩影,当前的序列。通过这两个表,可以知道数据有没有变化。比如查询一下当前数据库的最大值,发现是4,而现在也读到是4 ,说明就没有变化。
接下来看一下具体的代码逻辑,打开开发工具,首先看一下第一个类叫做mysqisource,这是我们自定义的一个 source 类,按照它的规范需要去继承abstractsource,并且实现两个接口,在这两个接口当中,方法比较多,重要的还是实现两个方法,一个叫做 configure,一个叫做 process。
接下来梳理一下里面的逻辑,首先可以发现在进行初始化方法的时候,干了一件事,去 new 了一个类叫做 query mysql,把上下文环境变量构造了一个实例,可以想象一下这个对象当中应该会封装着大量的逻辑,就相当于 sqlSourceHelper 这个类用来跟数据库进行各种交互的一个工具类。
接下来看一下整个 source 执行的叫做 process 的方法,它会被不断地调用,它只是去查询数据库表,如果有数据,把数据放到 event 中,注意 event 的对象是有头有 body 的一个结构,所以获取的数据应该把它 set 到 body 当中变成字节,还可以根据自己的需求添加 header,最后把 event 返回值写入到 channel 当中就可以。
从这里看,上面这一块逻辑很简单,就是查询数据,把数据封装成一个 event 对象写入了 channel,这都是固定 apr。最重要的在后面,当读取完一批数据之后,要注意把数据表中的偏移量做一个更新,为什么要更新?比如今天从零开始读这个表,然后读了四条记录,那么明天开始这四条记录可能不能再去读取了,不然数据就会重复,所以要把今天读取的这四条记录更新到这个表当中,记录这个表叫做student 当前记录为四,明天一看就从第五去读。这里所有的东西都是以方法封装。因此接下来回到封装的工具当中来看一下到底做了那些事。
首先在这个类当中,我们开启了日志、上面定义的一些属性,用于相关的参数或者当前需求参数。大概包括去查询的间隔时间、开始 I D、结束 I D、每次查询返回的条数,下面就是要操作哪个表,查询哪些列,有没有用户传入了查询语句,如果用户没有,要不要自己构建查询语句,字符编码集等,上下文是用来解析相关配置参数的,这些参数按照标准开发,应该把选择权交给用户,根据需求去设定相关的开始 I D,哪个表,哪个时间,但是要考虑一点,如果用户没有去设置,就使用默认值,简单来讲就是如果说用户没有修改,我们就没有传入参数,就使用这些默认值,当然默认值只是最终用来兜底的。
接下来看下面的具体逻辑。
首先第一块就是加载静态资源,在工程的静态中有一个 jdbc.properties,主要是加载跟数据库相关的四大属性——驱动地址、用户名和密码,获得资源之后,第二步就是一个方法叫做初始化和数据库的连接,这里采用的是非常传统的 jdbc 方式,用DriverManager 去获得一个连接,刚才这里已经注册了一个驱动,有地址有用户名,用密码就可以获得一个连接,连接就会通过这个方法返回,整个代码中的逻辑就是通过连接去获得过数据库的各种交互,增删改查操作。
接下来看一下第一个叫做 query mysql 的构造方法,构造这个类。构造方法是读取flume 配置当中的参数。
这里通过读这些参数,打开讲义看一下,当我们开发好之后,用户要想使用这个source,就要去配置一个 Flume 的采集方案,在这个采集方案当中,source 类型,就是自己这个类名的一个全路径,然后地址连接,这里定义了好多类名属性,包括连接的地址,用户名,密码,操作哪个表,哪些字段被选择,要不要去对哪一个自增长的字段是谁、查询时间,这些属性既然开放了给用户去填写,应该就要来获取到,所以说在构造当中我们干的一件事就是获取配置文件当中的参数,可以看一下这个叫做根据 C 点 to select,大家发现这里就是 columns.to. select,所以说这里就根据它获得它对应的值。
但如果用户这些没有配置,在后面要把默认值获取到,算是做一个保底的配置。如果用户没有在 Flume 配置当中进行相关的属性指定,就使用默认值来读取。
有了这些之后,接下来检测配置信息,这个相当于做了一个保证,如果默认值也忘了写,用户也没有赋值,这时候进行各种参验,比如表是空的、连接地址也是空的,连接的用户密码都为空的,这时直接抛出一个异常,说配置有问题,抛出对应的错误信息,这相当于给用户一个友好化的提示。
在校验好配置文件之后,要做的第一件事就是获取数据库当前的 ID,因为只有根据当前 ID 才能开始后面相关的操作,去构建一个 build query,怎么去getStatusDBIndex,这个方法也在当前这个类当中,点进去看一下。刚才说过,当前读到哪里,操作哪里,针对这个表的相关信息,都放在这一个叫做 Flume meta表当中,所以查取当前 ID 值就是从 meta 表进行查询。
所以构建了一个查询的语句。Select current index 就是当前的缩影,然后,from flume data 等于只查询的那个表,只有表明是不固定的,其他都是写死的。这个值查询出来之后,如果不为空,就返回查询的值,如果为空,就查回默认的值,可以发现当前这个里面就是为空的,它可以查回默认值。点进去看一下默认值,默认值在最上面,它由用户传进来。
值通过调用 startfrom 传输进来的。
这里调用了 startfrom。它通过获取用户的传入的参数或者默认值,点进去发现默认值是零。
也就是说如果当前从数据库中查询出来为空,默认值就是零。就从第一条开始进行查询。
接下来就是再回到这里,已经获取到当前 ID,接下来就去构建查询 sql 语句。当前查询到哪里,接着就基于此开始构建查询语句,下面调的方法叫做 build query。
build query 当中看起来很复杂,其实没有什么,已经获取到当前的 ID,然后就是拼接 sql,这个 sql 也做了一个判断,定义了一个 custom query,如果用户没有传入sql,我们就帮他拼接,查询哪些字段、来自那个表和字段表都是属性,如果用户已经传过来一个 sql,sql 就是用户指定的,所以说这一段逻辑判断就是用户如果不写 sql,为空,我们帮他写,用户如果写了,就用用户的。
接下来就是把接口做一个拼接,用一个 stringbuilder 来做拼接,加上 where 条件,where id 大于刚才查询的当前 id,比如这里记录的当前 id 为四,肯定要从大于四开始,从第五条开始查询。但是现在没有,所以默认是零,从大于零开始也就是从一开始,所以这一块比较灵活。这样就开始构建查询语句,这是一块逻辑。
在进行相关获取的时候,第一行就是 sqlSourceHelper 去 executQuery,在初始化中,主要查询一下当前 id 偏移量,构建查询 sql,一旦这个方法 process 被调用,就开始查询数据库的表。
这里面就是非常原始的 jdbc,刚才已经获得连接,获得一个 preparestatement 的站位,然后进行 executeQuery 执行 sql 语句,把 Sql 语句做一个结果遍历放入 list 集合当中,然后把它返回,这一段逻辑就是自行查询的结果。
有了结果之后,把结果做一个解析,遍历一下结果,把结果放在 channel 当中写出去。但是要注意最终应该更新一下偏移量的值,因为如果当前是空,那默认值就是零,从零开始进行读取读1、2、3、4,把这四条读完之后,要把当前的四的索引更新到这里,就是说当前读了几条记录,要把最高的一个放在这里面。因为下一次需要从大于4的开始,从5开始读。
所以说最后要调一个方法叫 updateOffset2DB 更新 result size,把新读取的加进去。
最后这一块又是一个 insert into flum_data,往 meta 表当中进行数据的更新,更新的值就是刚才查询结果 size 的值。
简单概括:当 source 启动之后,在初始化的过程当中,会去解析用户配置采集方案中的参数,然后构建查询语句,很多参数如果用户不写不指定,包括 sql 语句,需要我们帮助构建,构建之后就等待着 process 方法的执行。当这个方法被调用的时候,执行 sql 的查询,把结果封装成 event 放到 channel 当中,最终再更新一下表中的偏移量。
当然这一个方法会进行重复调用,当第一次执行完之后,下面有一个可以控制的等待时长(有默认值)。
这是针对这一个需求所做的描述,虽然代码很多,但大量地都是业务代码,主要就是传统的 jdbc 数据的一个查询操作,要弄清楚在哪一个方法当中去编写相关的业务逻辑。