开发者学堂课程【NiFi 知识精讲与项目实战(第三阶段):实时同步 Mysql 到 Hive-2】学习笔记,与课程紧密联系,让用户快速学习知识。
课程地址:https://developer.aliyun.com/learning/course/707/detail/12550
实时同步 Mysql 到 Hive-2
内容介绍:
一、操作
二、Windows 开启 binlog
三、实时获取 Mysql 变更
四、根据条件路由
五、提取关键属性
六、写入 Hive
一、操作
首先开启 Mysql 的 binlog 日志,Capture change Mysql 处理器,是监听 mysql 实施的变更,实时的事件信息,要想做到实时的监听,就要必须提前开启 MySQL bin log 日志。
1.开始 Mysql 的 binlog 日志
Mysql 的版本号要求5.7
2.登录 Mysql 查看日志状态
# mysq1 -u root -p123456(登录mysql)mysql> show variables ike '%1og_ bin% ;(查看日志状态)
如果显示 off,即没有打开;如果显示 on,即已经打开。Sql 语句可以在 linux 客户端上去执行,也可以在 window 上面去执行。
3.退出 MySQL 登录
Mysql>exit
4.Linux 开始 binlog
编辑配置文件
vi /etc/my.cnf(linux 直接在 etc 目录下)行尾加上 server_id = 1
1og- bin = mysql-bin(用别的文件名也可以,一般都是使用 mysql-bin)
bin1og. format = row
server-id :表示 mysql 单个结点的 id 标识,单个节点可以随意写,多个节点不能重复,log bin 指定 binlog 日志文件的名字为 mysql-bin,以及其存储路径。
5.重启服务
两种重启方式,主要看 linux 支持哪一种方式,重启以后再去登录执行 sql 状态就会显示为 on,说明 log 日志成功开启。
systemct1 restart mysq1d.service 或者 service mysq1d restart
重新登录查询开启状态
二、Windows 开启 binlog
1.修改配置文件
找到 mysq| 配置文件 my.ini 所在目录,它的位置一般在 C:\ProgramData\MySQL\MySQL Server 5.7。
注意目录不是 C:\ Program Files \MySQL\MySQL Server 5.7.
server_id = 1
Log_bin = mysq1-bin
bin1og_ format = row
加上三行代码即可生效。
2.通过 net stop 重启 Mysql 服务,也可以在服务界面进行重启
3.执行 sql 语句,查询开启状态,如果显示为 on 代表开启成功
开启 mysqlbinlog日志以后,还需要开启 MySQL 远程访问权限。
4.开启 Mysql 远程访问权限
如果不开启 MySQL 远程访问权限,只限于本地访问的话,不同的 linux 虚拟机,去读取 MySQL,或者读取WINDOWS 的 MySQL,就会因为 MySQL 自身的权限问题,而导致失败,所以要通过这语句来开启访问权限,开启的是用户远程访问的权限,赋予了权限以后,最后用 flash frivileges 更新权限,提交生效。GRANT ALL PRIVILEGES ON ** TO 'root'@'%' IDENTIFIED BY'
123456' WITH GRANT OPTION;
FLUSH PRIVILEGES;
三、实时获取 Mysql 变更
1.创建处理器组
组名:MySQLTOHive_Timely,进行实时同步。
2.创建 CaptureChangeMysql
在创建 MySQL 时,需要先有一个缓存服务,所以先创建缓存服务。缓存服务涉及到服务端和客户端,先去创建服务端再去创建客户端,客户端连接服务端。CaptureChangeMySQL 的配置中需要DistributedMapCacheClientService. DistributedMapCacheServer 处理器,一并创建。
3.配置 DistributedMapCacheServer
DistributedMapCacheServer 为服务端,服务端的配置项都不需要修改,直接使用默认配置即可。
4.配置 DistributedMapCacheClientService
客户端第一个配置项,要把它设置为 nifi 的服务器地址
5.启动 Cache 服务和客户端
在模拟的集群模式下,因为三台服务都在同一台主机,所以会存在端口冲突的问题,但是并不影响使用。因为三台服务只要有一个节点的缓存服务启动正常就可以使用。
出现错误的原因是因为现在的集群是在单台主机下午模拟,都在同一个主机上,端口号都一样,所以会出现端口号冲突的问题。
6.配置 CaptureChangeMySQL
MysQL Hosts = 192.168.52.6:3306
MysQL Driver class Name = com.mysq1.jdbc.Driver
MysQLDriverLocation(s)=/export/down1oad/jars/mysq1-connector-java-5.1.40.jarusername = root
Password = 123456
Include Begin/commit Events = true
Include DDL Events = true
刚刚创建好的缓存服务,在此时把它进行选择,然后是 MySQL 的 url 地址以及驱动和驱动程序、提供夹包位置,都设置完成以后进行保存。
7.启动 CaptureChangeMySQL
启动以后会出现一个错误信息,有一个链接使用了,相同的 server ID,是因为监听处理器是在三个季群上面全部都有的。启动以后三个集群上面节点都会同时去进行监听,这三个监听的处理器信息是完全一样,就会造成 server ID冲突的问题。可以运用之前学过的知识来解决以上问题。
问题解决以后就可以正常运行,还有输出的 flow file 信息。
创建成功以后就可以通过 RouteOnAttribute 处理器将采集到的数据请求进行分发路由。
四、根据条件路由
1.RouteOnAttribute 多线程消费
根据自己的服务器硬件配置,以及数据的更新速率,进行评估后填写。根据自定义的属性进行分发,里面涉及到多线程处理,接收到数据以后,会通过三个线程并行进行数据的处理和分发。
2.NIFI 表达式
后面会使用 equals 语句进行判断,equals 主要是判断两个字符串是否相等,它直接比较两个 string 的值,参数就是传入的 string,返回的是布尔值,即 ture 或者是 false。${filename:equals('hello. txt ')}判断 filename 的值是否等于hello.text。
NiF i表达式官网: htp://ifiapache.org/docs/nifi-docs/html/expression-language-guide.html
之前已经了解过 NiFi 表达式语言,这里仅针对 equals 函数进行说明。
NiFi 表达式的 equals 函数
equals
说明: equals 函数使用非常广泛,它确定其主题是否等于另一个 String 值。 请注意,该 equals 函数直接比较两个String 值。注意不要将此函数与 matchs 函数混淆,后者会根据正则表达式评估其主题。
学科类型:任意
参数:
●value:用于比较 Subject 的值。必须与主题类型相同。
返回类型:布尔值
示例:我们可以使用表达式${filename:equals('hello. txt ')}
检查FlowFile的文件名是否为“hello.txt",或者可以检查属性 he11o 的值是否等于属性的值filename: ${he11o:equa1s( ${filename} )}。
3.设置自定义属性
设置自定义属性,涉及到 equals 函数,通过 cdc.event.type 类型,判断它是不等于 insert,如果等于 insert,就把数据标识为 insert 类型。如果等于 update 就标示为 update 类型。其它的类型不需要处理,不需要处理就不用进行数据标记,把其它的数据类型都划分到不处理队列中。这里 cdc.event.type 和之前监听到的数据有关,监听的数据在输出的时候就有叫做 cdc.event.type 的内容,通过 NIFI 表达式获取到 cdc.event.type 的值,来判断这条处理信息到底是什么样的类型。通过 NIFI 表达式判断出来类型信息以后,用 route 进行路由。
4.运行并查看输出
{
"type": "insert" ,
"timestamp": 1582484253000,
"bin1og_ filename": "mysq1-bin .000005",
"binlog_ position": 375,
"database": "nifi_ test",
"table_ name": "nifi_hive_streaming" ,
"tab1e_ ,id": 108,
"columns": [
{
"id": 1,
"name": "id" ,
"column_type": 4,
"value": 7
},
{
"id": 2,
"name": "name",
"column_ type": 12,
"value": "testName5"
},
{
"id": 3,
"name": "day_time",
"co1umn_type": 91,
"value": "2020-02-24"
}
]
}
数据的类型是 Jason 格式的,Type 为 insert 或者是 update,不可能是 select,Delete 等等,只会路由这两种数据,在路由的过程当中,会包括数据库的信息和表的名字以及不同数据包括 ID、name 等等。接下来需要提取数据,转化为 SQL 插入到 HIVE 之中,如何提取 Jason 字符串中的值和对应的变量,这时候就用到了一个组件即evaluejasonPath,之前都是简单了解 evaluejasonPath,没有进行深入的了解,接下来了解 jason 表达式应如何使用。
五、提取关键属性
EvaluatelsonPath 等处理器在提取数据时,可以使用 JsonPath 表达式, 来灵活的获取信息。
1.JasonPath 表达式
(1)简介
类似于 XPath 在 xml 文档中的定位,jsonPath 表达式通常是用来路径检索或设置 son 的。XPath 主要是用来检索xml。
JsonPath 中的“根成员对象“始终称为$,无论是对象还是数组。
其表达式可以接受"dot-notation"和"bracket-notation"格式,例如
$.store. book [0].title. S['store']['book'][0]['title']
$为根节点,store 为变量,它的意思是根节点有一个名为 store,的对象还有一个顾客对象,book 是数组。取 book数组里面的第一个对象,再取它 title 的值,后面的表达式就不再是.而是[]的形式,.和[]的意思是一样的。
(2)操作符
符号 |
描述 |
$ |
查询的根节点对象,用于表示一个 json 数据, 可以是数组或对象 |
@ |
过滤器断言(filter predicate)处理的当前节点对象,类似于 java 中的 this 字段 |
* |
通配符,可以表示一个名字或数字 |
.. |
可以理解为递归搜索,Deep scan. Available anywhere a name is required. |
<name> |
表示一个子节点 |
['<name>'(, '<name>')] |
表示一个或多个子节点 |
[<number> (, <number>)] |
获取数组下标,出现 number 就代表一定是数组对象。表示一个或多个数组下标 |
[start:end] |
获取一个区间,数组片段,区间为[start,end),不包含end |
[?(<expression>)] |
进行搜索和过滤,可以组合使用,如果使用[?(<@expression>)]判断当前对象的某一个属性或者属性是否满足某一个条件。满足的话会把符合条件的数据进行返回。 过滤器表达式,表达式结果必须是 boolean |
(3)函数
名称 |
描述 |
输出 |
Min() |
获取数值类型数组的最小值 |
Double |
max() |
获取数值类型数组的最大值 |
Double |
Avg() |
获取数值类型数组的平均值 |
Double |
stddev() |
获取数值类型数组的标准差 |
Double |
length() |
获取数值类型数组的长度 |
Integer |
(4)过滤器
过滤器在 Jason 里面是比较高级的用法,和写代码是类似的。过滤器是用于过滤数组的逻辑表达式,一个通常的表达式形如: [?(@.age> 18)],@是当前对象,总的意思是,在整个对象里面如果年龄>18,就把对象返回到数组对象之中,如果不满足这条件,就不返回,起到过滤的效果。 可以通过逻辑表达式&&或| |组合多个过滤器表达式,例如[(@.price < 10 && @.category == fiction)]
,先判断 price<10,加上一个且 Category=fiction,这就是两个判断组成的且判断。字符串必须用单引号或双引号包围,不包围就会被认定为是变量,会出现报错。例如
[?(@.color == 'blue')] or [?(@.color == "blue")].
操作符 |
描述 |
== |
等于符号,但数字1不等于字符1(note that 1 is not equal to 11 |
!= |
不等于符号 |
< |
小于符号 |
<= |
小于等于符号 |
> |
大于符号 |
>= |
大于等于符号 |
=- |
判断是否符合正则表达式,例如[?(@.name =- /foo.*?/i] |
in |
所属符号,例如[?(@.size in ['S', 'M])] |
nin |
排除符号 |
size |
size of left (array or string) should match right |
empty |
判空符号 |
以上是 JasonPath 的判断操作符。
(5)示例{
"store": {
"book": [
{
"category": "reference",
"author": "Nige1 Rees",
"title": "sayings of the Century" ,
"price": 8.95
{
"category": "fiction",
"author": "Evelyn waugh" ,
"title": "Sword of Honour",
"price": 12.99|
}
{
"category": "fiction",
"author": "Herman Me1ville" ,
"tit1e": "Moby Dick",
"isbn": "0-553-21311-3",
"price": 8.99
{
"category": "fiction"
,"author": "I。 R. R. Tolkien",
"tit1e": "The Lord of the Rings",
"isbn": "0-395-19395-8" ,
"price": 22.99
],
"bicycle": {
"color": "red"
"price": 19.95
}
}
"expensive":10
}
ason 对象里面又包含了子对象 store,store 下面包含了一个数组 book 以及对象 bicycle,book 数组里面包含了不同的子对象。
JasonPath(点击链接测试) |
结果 |
$.store.book[*].author <br/> 或<br/> $..author |
获取 json 中 store 下 book 下的所有 author 值。[*]标 book 里面任意一个对象,获取 store 的 book 里面,所有对象的 author 字段,返回的肯定是一个数组。..的含义是递归搜索,搜索它的所有下级,只要下级包含 author 信息就会返回,如果在 store 里面也包含了 author 信息,二者是不相等的,只要包含author 就返回。使用$.store.book[*].author <br/> 会更加精确。 |
$.store.*显示所有叶子节点值 |
所有的东西,书籍和自行车 |
$.store..price |
获取 json 中 store下所有 price 的值,返回数组之中 |
$.book[2] |
获取 json 中 book 数组的第3个值 |
$..book[-2] |
倒数的第二本书,没有-0的说法 |
$..book[,1]<br/> 或<br/> $.book[:2] |
前两本书 :代表最开始,相当于0的意思,2代表取两个 |
$..book[1:2] |
从索引1 (包括)取到索引2 (排除)的所有图书,不包含第二个索引 |
$.book[-2:] |
获取 json 中 book 数组的最后两个值 |
$.book[2:] |
获取 json 中 book 数组的第3个到最后一个的区间值 |
$.book[?(@.isbn)] |
@指的是 this,this 指的是 BOOK 下一级,获取 json中 book 数组中包含 isbn 的所有值 |
$store book[?(@.price < 10)] |
获取 jason 中 book 数组中 price<10的所有值,最终返回的是数组 |
$.book[?(@.price <=$['expensive])] |
获取 json 中 book 数组中 pricec=expensive 的所有值,expensive 相当于传递某一个变量,如果小于变量值,就代表成立。$是根节点,根节点里面的expensive 数据和 price 进行对比。 |
$.book[?(@.author =- /.*REES/i] |
获取 json 中 book 数组中的作者以 REES 结尾的所有值(REES 不区分大小写) |
$.* |
逐层列出 json 中的所有值,层级由外到内,是一个完整的 Jason |
$..book.length() |
获取 json 中 book 数组的长度,book 数组一共有多少个 |
以上为 jasonpath 表达式的用法。
2.提取 json 属性到 Attribute
属性当中使用了 jason 表达式,jasonpath 表达式进行获取,$是根节点,$database 指的是数据库名称,Columns 下面第二个值得 value 就是 date_time,Columns 下面的第一个值的 value 就是 ID,Columns 下面第二个 value 值是 name,获取到不同的属性值,获取到这些关键信息以后,就可以把这些关键信息转变为 SQL 语句进行插入,运行以后就可以看到这些数据已经获取到了。
3.运行并查看输出
4、ReplaceText 转换 sql
配置 ReplaceText
在 Replacement value 当中写了一个sql语句,SQL 语句的 value 值用到了刚才提取到的变量信息。Replacement value = insert into myhive .nifi. _hive (id,name ,day_ time) values(S{id},'S{name}','${day_ time}'
通过这些变量,拼接成为SQL语句然后写入到hive。Database Connection URL = jdbc :hive2://192.168.52.120: 10000
Hive Confi guration Resources = /export/down oad/config/core-site . xml,/export/download/config/hdfs-site .xm1 ,/export/down1oad/config/hive-site .xm1
六、写入 Hive
1.创建 PutHiveQL
PutHiveQL 也需要连接池,和 mysql 是类似的,所以先要创建一个连接池对象。
2.创建配置 HiveConnectonPool
连接池的配置对象非常简单,和 mysql 是一致的,首先是 JDBC URL 连接地址,第二个是 HIVE 的配置文件,主要包含 hive-site、hdfs-site.xml 以及 core-site.xml 都要配置进来,否则会出现报错。配置完成后,记得启用HiveConnectonPool
3.PutHiveQL 关联 HiveConnectonPool
通过以上操作就可以写入 hive 了。写入完成后,通过观察 hive 表的数据确认程序是否能够正常运转。