开发者学堂课程【NiFi 知识精讲与项目实战(第三阶段):实时同步 Mysql 到 Hive-1】学习笔记,与课程紧密联系,让用户快速学习知识。
课程地址:https://developer.aliyun.com/learning/course/707/detail/12549
实时同步 Mysql 到 Hive-1
内容介绍:
一、处理器流程
二、CaptureChangeMySQL
三、DistributeMapCacheServer
四、DistributedMapCacheClientService
五、RouteOnAttribute
六、PutHiveQL
七、HiveConnectionPool
此案例和之前案例的区别是:之前的案例是临线的同步,而这案例是实时同步,之前是同步到 HDFS 而这是直接写入到 hive 当中去。
一、处理器流程
首先开启 binlog 的日志,开始以后就可以通过 Capture changemysql 处理器,进行监听,所有 mysql 的操作,它全部都可以监听到,监听到以后可以根据某一种类型或者是某几种类型的操作去进行特殊的处理。这时候就可以使用RouteOnAttribute,来根据不同的操作类型进行不同的路由分发,然后再使用 evaluatejasonpath 提取所需要的属性信息,evaluatejasonpath,可能会使用到 jasonpath 表达式,在案例当中将会讲解。使用 replacetext 来把获取到的属性利用起来拼接成所需要的 SQL 语句,最后是通过 SQL 语句把数据写入到 Hive 数据库当中。
1.准备工作
先创建好 Mysql 表,如果之前只想过 mysql 的 sql 文件,那么表是已经创建好了的,还要创建一个 hive 表,Hive 表和 MySQL 表是相对应的,它们的字段都是一致的。
Mysql 建库建表:
create table nifi_test.nifi_hive
(
Id int auto_increment
primary key,
namevarchar(64) null,
day_time date null
);
Hive 建表:
CREATE TABLE myhive.nifi_hive(id int , name string,day_time string)
STORED AS ORC
TBLPROPERTIES( 'transactiona1'='true ');
替换 Hive 支持 nar 包:
上传文件 NiFi\资料\nifi 安装包\nifi-hive-nar-1.9.2.nar,将其替换到 NiFi 服务的 lib 目录下,并重启 NiFi 集群。
nifi-hive-nar-1.9.2.nar 是新的版本,它所支持的 hive 已经变成 hive2版本了,不再支持 Hive1版本,这时候如果不去替换 nar 包,直接去操作的话,是会出现问题的,它会提示有一些 lib 不存在,把 nar 包替换完以后,才可以正常连接到 hive 数据库。
替换的方法是 nar 包,在资料 nifi 安装包文件下,把它上传到 nifi 的 lib 目录下,有一个名为 lib 的文件夹,重新启动 NiFi 集群。接下来把过程中涉及到的处理器做一下了解。
二、CaptureChangeMySQL
1.描述
它主要的目的是实时监测 MySQL 的事件,从 MySQL 数据库检索更改数据捕获(CDC)事件。CDC 事件包括 INSERT,UPDATE,DELETE 操作。事件将作为单独的流文件输出,并按操作发生的时间排序。
2.属性配置
在下面的列表中,必需属性的名称以粗体显示。其它任何属性(非粗体)均视为可选。该表还指示任何默认值,属性是否支持NiFi表达式语言以及属性是否被视为“敏感",这意味着将加密其值。在敏感属性中输入值之前,请确保nifi.properties 文件具有属性 nifi.sensitive.props.key 的条目。
名称 |
默认值 |
允许值 |
描述 |
MysQL Hosts |
与 MySQL 群集中的节点相对应的主机名/端口条目的列表。条目应使用冒号(例如host1: port,host2: port 等)以逗号分隔。例如mysql.myhost.com:3306。该处理器将尝试按顺序连接到列表中的主机。如果一个节点发生故障并为集群启用了故障转移,则处理器将连接到活动节点(假定在此属性中指定了其主机条目。MySQL 连接的默认端口为3306。支持表达式语言: true (将为仅使用变量注册表进行评估) |
||
MysQL Driver class Name |
MySQL 数据库驱动程序类的类名称支持表达式语言;true(仅使用变量注册表进行评估) |
||
MySQL Driver Location(s) |
包含 MySQL 驱动程序 JAR 及其依赖项(如果有)的文件/文件夹和/或URL 的逗号分隔列表。例如,“/ var / tmp /mysql-connector-java-5.1.38-bin.jar”支持表达式语言:true(仅使用变量注册表进行评估) |
||
Username |
访问 MySQL 集群的用户名支持表达式语言: true(仅使用变量注册表进行评估) |
||
Password |
访问 MysQL 集群的密码敏感属性: true 支持表达式语言:true(仅使用变量注册表进行评估) |
||
Server ID |
连接到 MySQL 复制组的客户端实际上是一个简化的从属服务器(服务器),并且服务器 ID 值在整个复制组中必须是唯一的(即不同于任何主服务器或从属服务器使用的任何其它服务器 ID)。因此,每个CaptureChangeMySQL 实例在复制 组中必须具有唯一的服务器 ID。 如果未指定服务器 ID,则默认值为65535。支持表达式语言: true (仅使用变量注册表进行评估) |
||
Database/Schema Name Pattern |
用于将数据库(或模式,取决于RDBMS 术语)与 CDC 事件列表进行匹配的正则表达式(regex) 。正则表达式必须与存储在 RDBMS中的数据库名称匹配。如果未设置该属性,则数据库名称将不会用于过滤CDC 事件。注意: DDL 事件(即使它们影响不同的数据库)也与会话用来执行 DDL 的数据库相关联。这意味着,如果与-个数据库建立了连接,但针对另一个数据库发出了DDL,则连接的数据库将是与指定模式匹配的数据库。 |
||
Table Name Pattern |
用于影响影响匹配表的 CDC 事件的正则表达式(regex) 。 正则表达式必须与存储在数据库中的表名匹配。如果未设置该属性,则不会基于表名过滤任何事件。 |
||
Max Wait Time |
30s |
建立连接所允许的最长时间,零表示实际上没有限制。支持表达式语言: true (仅使用变量注册表进行评估) |
|
Distributed Map Cache Client |
标识用于保留有关处理器所需的各种表,列等的信息的分布式映射缓存客户端控制器服务。如果未指定客户端,则生成的事件将不包括列类型或名称信息。 |
MySQL hosts 是 MySQL 集群的主机和端口号,它就是常说的 JDBC 连接 URL。MySQLdriverclassname 是驱动命名,MySQLdriverlocations 是夹包地址,之前的案例当中已经演示过直接可以把路径写入就可以。
username 是用户名,password 是密码,Distributed Map Cache Client 是缓存客户端,主要就是把实时读取的数据放到缓存当中去,然后通过后续的处理,不断地从缓存中读取数据,如果直接发送到 Dateup Flow,就会涉及到一个问题,CaptureChangeMySQL 可能会去存储当前读取到哪一个位置。比如监听到数据是今天12点钟的数据,如果说服务器停止运行了,到明天又启动的时候,这过程当中,需要重新上次时间点进行获 MySQL 数据的变更,但是如果没有缓存的话,可能无法记录下该从哪里读取,这时候要么重新去读取,要么从最新的数据去读取,所以需要一个缓存。使用 CacheCache 客户端首先要先去创建组件,接下来就要了解一下 cache 组件。
三、DistributeMapCacheServer
Cache 组件有两个,一个是 server 端,一个是 client 端,从名称可以看出来一个是 client service,一个是 server,顺序上一定是先有 server,然后客户端才能连接 service 进行操作。
1.描述
提供可通过套接字访问的映射(键/值) 缓存。类似于 redis,与该服务的交互通常是通过 DitributedMapCacheClient 服务完成的。
2.属性配置
在下面的列表中,必需属性的名称以粗体显示。其它任何属性(非粗体)均视为可选。该表还指示任何默认值。
名称 |
默认值 |
允许值 |
描述 |
端口 |
4557 |
侦听传入连接的端口 |
|
最大缓存条目 |
10000 |
缓存可以容纳的最大缓存条目数 |
|
驱逐策略 |
最少使用 |
最少使用最近最少使用先进先出 |
确定应使用哪种策略从缓存中逐出值以为条目腾出空间 |
持久性目录 |
如果指定,则缓存将保留在给定目录中; 如果未指定,则高速缓存将仅在内存中 |
||
SSL 上下文服务 |
StandardRestrictedSSLContextService |
如果指定,此服务将用于创建SSL上下文,以用于保护通信;如果未指定,则通信将安全 |
端口号和最大缓存条目都不需要进行修改,创建一整个集群只需要创建一个 cacheserver,不需要出现一个处理器组创建一个十个处理器组创建十个的情况,十个处理器组可以使用一个 cacheserver,和 redis 服务是一样的,redis 服务是可以共用的,如果数据量很大,可以去创建多个。如果太大的话,不太适合使用本地的划分,缓存是 nifi 本地缓存,如果需要的缓存非常大,建议使用 redis 提供足够的缓存空间。有了 cacheserver 以后,就要通过 client 进行连接。连接的时候需要肯定IP和端口号。
四、DistributedMapCacheClientService
1.描述
提供与 DistributedMapCacheSenyer 通信的功能。可以使用它来在 NiFi 群集中的节点之间共享地图
2.属性配置
在下面的列表中,必需属性的名称以粗体显示。其它任何属性(非粗体)均视为可选。该表还指示任何默认值。
名称 |
默认值 |
描述 |
服务器主机名(IP 地址) |
一般来说填写 nifi 服务器的 IP 即可,本机可以写 local host |
运行 DistributedMapCacheServer服务的服务器的名称 |
服务器端口 |
4557(端口号要保持和 server一致,直接使用即可,无需变更) |
与 DistributedMapCacheServer 服务通信时将使用的远程服务器上的端口 |
SSL 上下文服务 |
如果指定,则表示用于与远程服务器通信的 SSL 上下文服务。如果未指定,通讯将不会被加密<br> StandardSSLContextService<br/> StandardRestrictedSSLContextService |
|
通讯超时 |
30秒 |
指定在无法发送或接收数据时确定存在通信故障之前与远程服务器通信之前要等待多长时间 |
两个 cache 服务和 cache 服务创建好以后,就可以在 Capturechangemysql 处理器之中进行配置,去使用 cache client 操纵缓存信息,这三个是配合使用的。
五、RouteOnAttribute
1.描述
该处理器使用属性表达式语言,根据流文件的属性去计算然后进行路由。该处理器往往用于判断逻辑。
2.属性配置
在下面的列表中,必需属性的名称以粗体显示。任何其它属性(不是粗体)都被认为是可选的,并且指出属性默认值(如果有默认值), 以及属性是否支持表达式语言。
属性配置 |
默认值 |
可选值 |
描述 |
Routing Strategy |
Route to Property name |
Route to Property nameRoute to 'matched' if all matchRoute to 'matched' if any matches |
指定如何确定在计算表达式语言时使用哪个关系 |
3.动态属性
该处理器允许用户指定属性的名称和值
属性名称 |
属性值 |
描述 |
用户自由定义的属性名称(Relationship Name) |
用户自由定义的属性值 (Atribute Expression Language) |
将其属性与动态属性值中指定的属性表达式语言相匹配的流文件路由到动态属性键中指定的关系.支持表达式语言:true |
4.连接关系
名称 |
描述 |
Unmatched |
不匹配任何用户定义表达式的流文件将被路由到这里 |
5.自定义连接关系
可以根据用户配置处理器的方式创建动态连接关系。
Name |
Description |
动态属性的属性名 |
匹配动态属性的属性表达式语言的流文件 |
主要是根据刘文件的属性,做一个路由分发工作,作用是根据事件的内容,和不同的信息,去进行关联关系的分发,处理器可以添加,自定义的属性,也就是动态属性,属性来源于 flow file 的属性,Flow file 通过 nifi 表达式直接进行调用,添加到处理器的自定义属性当中,然后根据表达式,实际的值进行不同的路由,实际上就是通过 flow file 的内容进行分发。
六、PutHiveQL
1.描述
主要是向 HIVE 中进行插入或者更新操作。执行 HiveQL DDL/DML 命令(例如,UPDATE, INSERT) 。预期传入 File 的内容是要执行的 HiveQL 命令。HiveQL 命令可以使用?转义参数。在这种情况下,要使用的参数必须作为 FlowFile 属性存在,命名约定为 hiveql.args.N.type和 hiveql.args.N.value,其中 N 是一个正整数。hiveql.args.N.type 应该是指示 JDBC 类型的数字。FlowFile 的内容应采用 UTF-8格式。因为新版本不再支持 hive 1.1,所以要提前把 nar 包更新上去。
2.属性配置
在下面的列表中,必需属性的名称以粗体显示。其它任何属性(非粗体)均视为可选。该表还指示任何默认值。
名称 |
默认值<br> |
描述 |
**Hive Database Connection Pooling Service**<br/> |
I Hive Controller Service,用于获取与 Hive 数据库的连接 |
|
Batch Size |
100 |
在单个事务中放入数据库的首选FlowFiles 数 指定记录数据的字符集。 |
Character Set |
UTF-8 |
指定记录数据的字符集。 |
Statement Delimiter |
; |
语句分隔符,用于在多语句脚本中分隔 SQL 语句 |
Rollback On Failure |
False |
指定如何处理错误。默认情况下(false) ,如果在处理 FlowFile 时发生错误,则 FlowFile 将根据错误类型路由到失败”或“重试”关系,处理器可以继续下一个 FlowFile。相反,您可能想回滚当前已处理的 FlowFile,并立即停止进一步的外 理。在这种情况下,您可以通过启用此"回滚失败”属性来实现。如果启用,失败的 FlowFiles 将保留在输入关系中,而不会受到惩罚,并会反复处理,直到成功处理或通过其它方式将其删除为止。重要的是要设置足够的"有效期限”,以免重试次数过多。 |
操纵 hive 数据库需要有连接池,和 my circle 是一样的,可以选择去创建 hive 连接池,配置 hive 信息。其它的配置项都可以直接使用默认值,通过这种方式就可以通过接收之前,传送过来的,填写好的 SQL 语句,直接去操作 hive,因为用过好多次 replacetext 处理器,所以大纲里面没有再提及。PuthiveQL 要使用连接池,需要使用hiveconnectionpool。
七、HiveConnectionPool
1.描述
为 Apache Hive 提供数据库连接池服务。可以从池中请求连接,使用后返回连接。
2.属性配置
在下面的列表中,必需属性的名称以粗体显示。其它任何属性(非粗体)均视为可选。该表还指示任何默认值,属性是否支持 NiFi 表达式语言。
DatebaseConnectionURL 配置的是 JDBC 的地址链接,类似于 mysql,配置端口号、IP 地址,HiveConfigurationResources 主要配置 hive 的信息,文件提及的主要是 hive-site.xml,实际中配置的是一定要把hadoop 的链接也配置进来。Hive 中一般没有设置账户密码,如果设置了就可以填写进去。
名称 |
默认值<br> |
描述 |
** Database Connection URL |
用于连接数据库的数据库连接URL。可能包含数据库系统名称,主机,端口,数据库名称和一些参数。数据库连接 URL 的确切语法由Hive 文档指定。例如,当连接到安全的 Hive 服务器时,通常将服务器主体作为连接参数包括在内。支持表达式语言: true (仅使用变量注册表进行评估) |
|
Hive Configuration Resources |
包含 Hive 配置(例如,hive-site.xml) 的文件或文件的逗号分隔列表。否则,Hadoop 将在类路径中索”hive-site.xml"文件,或恢复为默认配置。请注意,例如要启用Kerberos 身份验证,必须在配置文件中设置适当的属性。请参阅Hive 文档以获取更多详细信息。支持表达式语言: true (仅使用变量注册表进 行评估) |
|
Database User |
数据库用户名支持表达式语言: true (仅使用变量注册表进行评估) |
|
Password |
数据库用户的密码敏感属性: true 支持表达式语言: true (仅使用变量注册表 进行评估) |
|
Max Wait Time |
500毫秒 |
池在失败之前将等待(如果没有可用连接时)返回连接的最大时间,或者无限期 等待-1。支持表达式语言: true (仅使用变量注册表进行评估) |
Max Total Connections |
8 |
可以同时从该池分配的活动连接的最大数量,或者为无限制的最大数量。支持表达式语言: true (仅使用变量注册表进行评估) |
Validation query |
验证查询,用于在返回连接之前对其进行验证。当借用的连接无效时,它将被丢弃并返回新的有效连接。注意:使用验证可能会降低性能。支持表达式语言:true (仅使用变量注册表进行评估) |