开发者学堂课程【NiFi 知识精讲与项目实战(第三阶段):离线同步 mysql 数据到 HDFS1】学习笔记,与课程紧密联系,让用户快速学习知识。
课程地址:https://developer.aliyun.com/learning/course/707/detail/12544
离线同步 mysql 数据到 HDFS1
内容介绍
一、课堂目标
二、案例一 离线同步 MySQL 数据到的 HDFS
三、操作演示
一、课堂目标
本节课主要学习 NiFi 的一些典型案例。本课程主要有4个目标,第一点,要实现离线同步 MySQL 的数据到 HDFS 当中。第二点,要实现把 Json 的内容转化为 Hive 所支持的文本格式,以实现 Hive 外部表的数据同步。第三点,要实现一个实时同步 MySQL 数据到 Hive 数据库,这样的一个功能。第四点,是掌握 Kafka 在 NiFi 中的使用。 这几个案例发现它都是和大数据相关的一些案例。在第四点,虽然它只是针对 Kafka,Kafka 在大数据中系统当中占用重要的一个地位。Kafka 的吞吐量以及它的各种流量学风以及结构合这样的一些东西,会在数仓以及大数据的实时计算当中占有很重要的一个地位。
二、案例一 离线同步 MySQL 数据到的 HDFS
1. 准备阶段
首先进入第一个案例,离线同步 MySQL数据到的 HDFS 当中,这个案例会从以下三个方面来进行分析。首先会整个处理器流程是什么样的,然后会针对需要用到的这些处理器,进行一个详细的说明。最后是实际的去操作,来实现这个 MySQL 同步到 HDFS 这样的一个功能。 在大数据仓库的这个系统当中,经常会进行数据同步的一些操作,可以使用 NiFi 灵活的全流程这个操作,来实现具体操作。
在做这个功能之前,需要先做一些准备工作。第一点,需要有一个 MySQL 服务,并且启动成功,这个 MySQL 服务要使用5.7版本,然后需要在 MySQL 当中运行,给大家提供一个 SQL 语句。这个 SQL 语句运行完以后,提供案例中所需要的 MySQL 数据库表,已经创建成功,可以直接进行同步。
第二点,是需要启动的一个 Hadoop 集群,因为要把这个数据同步到 HDFS 之中,所以说必须要有个 Hadoop 集群,而且还要求这个 Hadoop 集群与的 NIFI 集群是可以互相访问的,也是他们俩是在一个同样的相同的移动网段。这两个准备工作做完以后,可以开始进行的案例。
2.处理器流程
首先来看这个案例的处理器流程。MySQL 同步到 HDFS 的流程,首先要使用的 QueryDatabaseTable 这个处理器,然后通过框架分析处理器读取这个数据以后,通过 ConvertAwhereoToJSON 来把读取出来的数据转换为从 awhereo而转化为 json 格式。再通过 Splitjson 阶层处理器,把数据随机的进行一个切割,获取到每一个单组的数据,也是每一行的数据。最后针对每一行的数据,通过的 putHDFS 把它写入到的 HDFS 当中。
(1)QueryDatabaseTable 属性说明
名称 |
默认值 |
描述 |
Database connection Pooling service | 用于获得与数据库的连接的ControllerService<br>DBCPConnectionPoolLookup<br />DBCPConnectionPoo<br/>HiveConnectionPool | |
Database Type | 泛型 |
数据库的类型/风格,用于生成特定于数据库的代码。在许多情况下,通用类型就足够 了,但是某些数据库(例如 Oracle)要自定义SQL 子句。<br>Genericbr/>orace<br />oracle 12+<br />Ms sQL 2012+<br />Ms sQL 2008<br/>MysQL |
Table Name | 要查询的数据库表的名称。使用自定义查询时,此属性用于别名查询,并在 FlowFile 上显示为属性。支持表达式语言:true(仅使 用变量注册表进行评估) | |
columns to Return | 查询中要使用的列名的逗号分隔列表。如果 您的数据库需要对名称进行特殊处理(例如,引号),则每个名称都应包括这种处理。如果未提供任何列名,则将返回指定表中的所有列。注意:对于给定的表使用一致的列名很重要,这样增量提取能正常工作。支持表达式语言: true(仅使用变量注册表进行评估) | |
Additional WHERE clause | 构建 SQL 查询时要在 WHERE 条件中添加的自定义子句。支持表达式语言: true(仅使用变量注册表进行评估) | |
Maximum- value Columns | 列名的逗号分隔列表。自处理器开始运行以来,处理器踪返回的每一列的最大值。使用多列意味着列列表的顺序,并且期望每列的值比前一列的值增长得更慢。因此,使用多个列意味着列的层次结构,通常用于分区表。该处理器只能用于检索自上次检索以来已添加/更新的那些行。请注意,某些 JDBC 类型(例如位布尔值)不利于保持最大值,因此这些类型的列不应在此属性中列出,并且会在处理期间导致错误。如果未提供任何列,则将考虑表中的所有行,这可能会对性能产生影响。注意:对于给定的表使用一致的最大值列名称很重要,这样增量提取才能正常工作。支持表达式语言:true(仅使用变量注册表进行评估) | |
Max Wait Time | 0秒 |
正在运行的 SQL 选择查询所允许的最长时间,零表示没 有制。少于1秒的最长时间将等于零。支持表达式语言:true(仅使用变量注册表进行评估) |
Fetch size | 0 |
一次要从结果集中获取的结果行数。这是对数据库驱动程 的提示,可能不被尊重和/或精确。如果指定的值为零,则忽略提示。支持表达式语言:true(仅使用变量注册表进行评估) |
Max Rows Per Flow File | 0 |
一个 FlowFile 中将包含的最大结果行数。这将使您可以 将非常大的结果集分解为多个 FlowFiles。如果指定的值为零,那么所有行都将在单个 FlowFile 中返回。支持 O.式语言: true(仅使用变量注册表进行评估) |
Fetch Size | 一次要从结果集中获取的结果行数。这是对数据库驱动程 序的提示,可能不被尊重和/或精确。如果指定的值为零,则忽略提示。支持表达式语言: true(仅使用变量注册表进行评估) | |
Max Rows Per Flow File | 一个 FlowFile 中将包含的最大结果行数。这将使您可以 将非常大的结果集分解为多个 FlowFiles。如果指定的值为零,那么所有行都将在单个 FlowFile 中返回。支持表达式语言: true(仅使用变量注册表进行评估) | |
Transaction lsolation Level | 如里一个值的小数位数超过中写入那些未定义的数字时,将使用“默认小数位数"。如果一个值的小数位数超过.中是否对DECIMAL/NUMBER,DATET和TIMESTAMP 列使用 Awhereo 逻辑类型。如果禁用,则写为字符串。如果启用,则使用逻辑类型并将其写为其基础类型,DECIMAL/NUMBER 为逻辑"十进制:以具有附加精度和小数位元数据的字节形式写入,DATE 为逻辑” date-millis":以int表示天自Unix时代(1970-01-01)起,TIME 为逻辑'time-millis':写为 int,表示自Unix 纪元以来的毫秒数;TIMESTAMP 为辑'timestamp-millis':写为长时,表示自 Unix 纪元以来的毫秒数。如果书面Awhereo 记录的阅读者也知道这些逻辑类型,则可以根据阅读器的实现在更多上下文中反序列化这些值。<br>真正<br /> |
接下来分析这4个处理器具体的一些属性。
①QueryDatabaseTable 处理器,是为查询的数据库表看它的描述信息。QueryDatabaseTable 是用来通过生成一个SQL 语句来进行查询,或者他使用前置的 QueryData 传输 SQL 语句。通过这样的两种方式来执行查询。QueryDatabaseTable 它输出的数据是 Awhereo 格式的,而且它的属性都支持的 NiFi 表达式语言。 QueryDatabaseTable 它所包含一些属性。第一个属性,database collection pooling service 这个属性主要是用来配置的数据库连接池,然后这个组件可以使用这个数据库连接池来和的数据库进行通信。第二个属性,Database Type 这个属性主要是为配置究竟使用的是什么数据库类型。Oracle 还是 mysql,或者是 MSsql,这里面可以使用mysql。第三个属性,Table Name,这个属性主要是指定,去查询数据的表明。这个如果说使用自定义查询的时候,这个相当于是一个别名,在后面会给大家来讲解。
②columns to Return,这个是要查询的一些字段,还有一个是 where clause 指 SQL 语句的条件,写在这里。然后还有一个叫 custom Query,这个它是 SQL 语句。
这既然是一个完整的 SQL 语句,那还要写这个查询的列以及 where 条件,原因是他们是2选1,你使用上面的这些数据,这些项目那你不需要写 custom Query。如果使用 custom Query,上面这两个不需要进行设置。但是 Table Name 它是必须要设置的。如果 Table Name 和这两个配合使用,它是表明如果 Table Name 和 custom Query 配合使用,相当于是一个别名,他会把这个 SQL 语句封装成一个子查询,然后把它的结果作为一个临时表,这个临时表表明,即在这里配置的表明。
③Maximum-value Columns 属性主要是配置查询出来数据,一次查询数据,返回的最大的这个行数。如果说超过所涉及的这个数值以后,他会 把那多余的数据给剔除掉。还有一些其他的信息,是最大的等待时间,以及这个输出的大小等这些属性。这些属性一般都不需要更改直接使用,它的默认值可以。
use Awhereo logical。这个属性它是需要刚才所说的这个 custom Query 来配合使用,当使用 custom Query 的时候,需要修改这个对象,默认的话,它这里面默认值为 false,如果要使用 custom Query 的时候,这个值需要改成ture,它可以避免这个 custom Query 因为一些这个字符的问题而产生一些错误。
(2)conver Awhereo Jason 属性配置
conver Awhereo Jason 将发布 Awhereo 格式的数据转化为 Jason 格式,接下来来看描述信息,这个处理器是将awhereo 转化为 json,然后生成的基层数据和原始的 awhereo 文档它具有相同的这个阶层结构,然后输出的 jason编码格式是 UTF-8编码格式。然后如果说 Flow Flie 中的这个 Awhereo 它包含有多个记录,生成的 Json 是一个 Json Array,如果说在的这个 Array 记录当中,它是一个单个的对接的话,一个单个数据,可以通过队列项转化为Json Arrary,如果说传出传入这个 Flow Flie,他没有任何数据,输出的是一个空的 Json 对象,单独的一个 Awhereo 记录,Flow Flie 根据配置项,把它包装在的 Json Awhereo 当中去。
看它具体属性,第一个属性是 Json 容器选项,默认值是数组,也说默认输出的 Json 是一个 Awhereo,是一个 Json数组,不是一个 Json Object。当然可以把它改变,改成是不是数组,而是 Object 这种类型。如果说第二个选项,来看第二个选择,第二个选项是包装单条记录默认是 false,也是说如果说当它是 false 的时候,输出的是单条记录,它是一个 JSON Object 的类型。如果把它改成出的话,进入来的这个数据它是单调的,或者是空的,他会把这个数据的单条记录,或者说空记录也作为一个数组的形式往后传递。通常建议大家在这里把它改为出处理,因为这会对后续的处理比较方便。全部统一都按照这种接着 Json Awhereo 格式输出,否则的话还需要进行一个复杂的判断
名称 |
默认值 |
允许值 |
描述 |
JSOn 容器选项 | 数组 | 没有数组 | 确定如何显示记录流:作单个 Object 序列(无)(即,将每个 object 写入新行),或者作为Objects 数组(array). |
包装单条记录 | 假 |
真正假 |
确定是否将空记录或单个记录的结果输出包装在” JSON 容器选项"指定的容器数组中 |
Awhereo 模式 |
如果 Awhereo 记录不包含架构(仅基准),则必须在此处指定。 |
(3)Splitjson 处理器
Splitjson 处理器,它主要会使用一个叫做 Json Path 这样的一个表达式来指定,去根据哪个 Json 元素来进行切割。它的功能是相当于把的监测数组分割成每一个单独的文件,每一个文件当中的是一个 Json object。然后如果说切割成功的话,会把切割以后的 Json object,输出到一个叫 split 的这样的关系中。如果说是失败的话,没有匹配成功的话,他会输出到 faliure 也是失败这个关系当中,如果成功的时候,除会把这个成功的信息穿上 split 之外的,原始的文件也会传到一个叫做这个 original 这样的一个关系当中,一般来说用不到这个关系,除非是真的需要原始数据。
①属性
第一个属性,Jasonpath expression。这个主要是是填写一个Json表达式来指定这个 SplitJson 处理器,它到底是从Json 里边的哪个位置,那一个元素开始进行往下分割。第二个属性,null value Representation。是说当的输出数据为空的时候,输出的什么样的数据?第一个选项,空数据。第二个选项是一个字符串叫做 null 的这样一个字母串。
属性名称 |
默认值 |
可选值 |
描述 |
JsonPath Expression | 一个] sonPath 表达式,它指定用以分割的数组元素。 | ||
Null Value Representation | 1 |
emptystring<br> thestring 'null' | 指定结果为空值时的表示形式。 |
(4)PutHDFS 处理器
再往下是的 putHDFS 处理器。putHDFS 是将 Hadoop 数据写入到 HDFS 分布式文件存储系统当中,来看它核心的属性,首先第一个,是 Hadoop Configuration Resources 也是 Hadoop 的配置文件资源,那这个地方需要把 Hadoop 的 core-site .xml 以及 hdf-site .xml,把这两个文件指定给这个配置项,让它能够通过这里面所配置的这个目录读取到这两个配置文件,读取到这两个配置文件以后,它可以才可以进行这个后续的一系列操作,知道要往哪去写。然后再往下是还有一个叫做 Directory 的配置项,这个属性它主要是配置 HDFS 做目录,也是说这个写入文件的时候,他应该写到 HDFS 和哪个目录像,是在这儿进行配置,还有一个比较重要的是这个 Conflict resolution。Stategy。这个是主要是指当的这个 Hdfs 这个目录当中已经有重名的文件的时候,可以通过在这设置这个属性来更改,想要做的一些决策。了解需要用到的处理器以后,接下来一起来过,所要进行的一些操作。
名称 | 默认值 br> | 允许值 |
描述 |
Hadoop Configuration Resources | 由文件或逗号分隔的文件列表,其中包含 Hadoop 文件系统配置。否则,Hadoop 将在类路径中搜索” core-site.xml"和 hdfs-site.xml"文件,或者将恢复为默认配置。要使用 swebhdfs,请参阅 PutHDFS 文档的“其他详细信息""部分。支持表达式语言: true(仅使用变量注册表进行评估) | ||
Kerberos credentials Service | 指定应用于 Kerberos 身份验证的Kerberos 凭据 控制器服务 | ||
Kerberos Principal | Kerberos 主体作为身份验证。需要在您的 nifi.properties中设置nifi.kerberos.krb5.file。支持的表达语言: true(仅使用变量注册表进行评估) | ||
Kerberos Keytab | 与主体关联的 Kerberos 密钥表。需要在您的 nifi.properties中设置nifi.kerberos.krb5.file。支持的表达语言:true(仅使用变量注册表进行评估) | ||
Kerberos Relogin Period | 4小时 |
尝试重新登录 kerberos 之前应该经过的时间。 此属性已被弃用,并且对处理没有影响。现在,重新登录会自动发生。支持表达式语言: true(仅使用变量注册表进行评估) | |
Additional Classpath Resources | 以逗号分隔的文件和/或目录的路径列表,该列 表将添加到类路中,并用于加载本机库。指定目录时,该目录中所有具有的文件都将添加到类路径中,但不包括其他子目录。 |
三、操作演示
第一步,先创建一个处理器组,这个处理器组的名称可以叫做 MySQL HDFS。
创建完组之后,可以创建的第一个处理器 quarry database table。
创建完这个处理器以后,这个处理器要想进行配置,可以看到这个处理器它必须要有一个连接池,需要创建一个连接池对象,
然后在这个连接池对象当中,去配置的数据库地址以及账号密码和驱动文件等等这些信息。
处理完之后,然后再来启动连接池,启动连接池以后,可以回到的这个 quarry database table 上面,选中这个新建的这个处理器,然后将的这个调度,把它的这个时间修改,如果它默认是0的话,它会不断的执行,这对的这个服务会造成非常大的压力,因为都知道查询数据库他这个会有的时候可能会非常的慢,如果他不断的进行调用的话,可能会导致这个服务器压力崩溃。
然后还要进行配置这个 SQL 语句,是说去查询哪个表哪些字段,把这个 Configure processor 这个处理器上进行查询,
查询以后,然后再接下来是把查询出来的数据转换成 Conver Awhereo TO Json。这个时候创建这个处理器,创建完之后,这两个处理器连接到一起,然后设置为负载均衡消费的方式,来提升这个处理的效率,
然后开始把这个 json 数组通过 SpilitionJson 切割成不同的单独的这个 Json objet。这里面直接使用默认的配置,是从根结点开始进行切割。
切割以后,再去这个 Conver Awhereo TO Json 和 SpilitionJson 把它们两个进行一个连接。最后是的这个 putHDFS把它添加进来,然后要做一个自连接,因为它是最后一个处理器,不做自连接的话,他会无法启动无法使用。
然后在的配置项里面,需要可以配置这个 HDFS 它的一些配置文件,让的这个 NiFi 能够读取出来。这里面大家要注意一点,是这里面配的这个文件数是这个文件地址,是这个文件目录,它必须是在的 NiFi 服务器当中,因为他读的时候,写的这个是本地目录,所以说他读的时候他肯定是读 NiFi 自己的。如果说这个不想使用这种方式,也可以采用读取的这种 htfs 文件,这个存储,也是可以的。,然后还有一个是的这个 Directory,这个是要写入到 HDFS 当中那个目录,然后这个是最后一个这个 append 配置项,是这个策略是进行替换覆盖写入还是进行 append 添加或者是忽略。,到这这个处理器整个这个操作基本上操作完,最后是去运行,查看最终的这个结果是不是正常,一个一个的逐个启动的处理器,然后看看的消息内容是否正常。每一个都是逐个的启动来再进行查看,最后启动 Puthdfs 开发数据,然后再查看的 HDFS 当中,观察 puthdfs 是不是已经成功写入的数据文件。这个是操作的简要的流程。