开发者学堂课程【NiFi 知识精讲与项目实战(第一阶段):了解 NiFi 处理器】学习笔记,与课程紧密联系,让用户快速学习知识。
课程地址:https://developer.aliyun.com/learning/course/705/detail/12513
了解 NiFi 处理器
内容介绍
一、NiFi 处理器
二、总结
一、NiFi处理器
1. 查看处理器
首先查看处理器,在菜单栏当中,选中 process 图标,把 processor 拖入到工作区当中:
弹出所有处理器的选择界面:
2.常用处理器
NiFi 官方提供了293个处理器,整合了大部分常用的技术,其他技术在不断的完善增加当中。在 filter 输入栏当中填写所需要的 NiFi 处理器的名称进行搜索,进行添加,常用的处理器包含:
(1)ExecuteScript:执行脚本处理器,支持: clojure, ecmascript, groovy, lua, python,ruby
(2)QueryDatabaseTable:数据库查询处理器,支持: mysql
(3)ConvertAvroToJSON: avro 数据格式转换为 JSON,avro 是一种常见的格式。在 NiFi 中使用较多。从数据库中查询出来的数据是 avro 格式,需要将其转换成 JSON 格式以后才方便处理 。
(4)SplitJson:该处理器主要用于切分 JSON,将 JSON 文件拆分为多个单独的 FlowFile, JSON 数据可能是一个数组。 可以用该处理器将数组拆分成不同的 JSON 对象。 每个 JSON 对象就是一个单独的 FlowFile。用于由JsonPath 表达式指定的数组元素,也可以指定帧,JsonPath 表达式使用也较多。以上3个组件经常结合在一起使用。 首先是查询数据库数据,将查出来的 avro 格式数据 ,通过 ConvertAvroToJSON 转换成 JSON 格式,然后再通过splitJSON,拆分成单独的对象。
(5)EvaluateJsonPath:根据 FlowEile 的内容评估一个或多个 JsonPath 表达式。表达式的结果将分配给 Flow 属性,或者写入 FlowFile 本身的内容,方便后续提取使用。具体取决于处理器的配置。
(6)ReplaceText:文本组装与替换,支持正则表达式,通过正则表达式进行文本的替换。
(7)PutHDFS:将 FlowFile 数据写入 Hadoop 分布式文件系统(HDFS)
(8)PutHiveQL:写入 hive 数据库当中。通过 ddI/dml sql 脚本来写入数据库当中。执行 hive ddI/dml 命令,如: insert, update
(9)Publishkafka_2_0∶用于推送 kafka 消息。根据配置将消息发送到 kafka topic,该处理器也是较为关键的处理器。
(10)SelectHiveQL:执行 hive select 语句并获取结果,查询 Hive 的数据。与 put 形成对比,Put 主要是 insert 和 update。
(11)PutSQL:执行 SQL 的 insert 或 update 命令。支持多种关系型数据库。
(12)GetFile:从目录中的文件创建 FlowFiles。从某个目录当中读取文件,将文件里面的内容转化为 FlowFiles。
(13)PutFile:将 FlowFile 数据写入本地磁盘目录当中。
(14)GetHDFS:从 Hadoop 分布式文件系统获取文件,作为 NiFi flow files。
(15)capturechangeMysQL:从 MySQL 数据库中 binlog 日志,检索更改数据捕获(CDC)事件。CDC 事件包括INSERT,UPDATE,DELETE 操作。查询到之后,作为 FLOW file 文件传输给处理器。事件作为单个流文件输出,文件按操作发生的时间排序。该功能可以实现实时采集 MySQL 离线数据。
(16)Executestreamcommand :—般用于执行 sh 脚本,spark 代码写完之后需要把 spark jar 包上传到集群当中,通过命令进行 update 提交和运行。如果使用 NiFi,即可通过该处理器定时自动执行 spark 脚本。 以上是常用的处理器。
3.配置处理器
首先添加一个处理器,例如添加 get file 处理器。如图所示:
在菜单的最左边选中 processor 选中,之后拖到工作区就出现了 processor 选择框,此时可以输入 processor 名称进行检索。例如,要添加 get file 处理器:
点击 add,就能够将该处理器成功添加到面板当中。添加处理器之后,还要对处理器进行配置。通过右键的方式来进行配置,右键以后有很多的菜单按钮:
(1)configure(配置)︰此选项允许用户建立或更改处理器的配置。首先是 config 配置,点击按钮以后,就会打开选项配置框。
(2)Start(启动或停止)︰此选项允许用户启动或停止处理器;该选项可以是 Start 或 Stop,具体取决于处理器的当前状态。它可以把处理器禁用掉,禁用以后按钮就变成 ennable,再右键选择 ennable 处理器,又变为启用状态。
(3)Disable(启用或禁用)︰此选项允许用户启用或启用处理器;该选项将为“启用"或"禁用”,具体取决于处理器的当前状态。
(4)View data provenance(查看数据来源)︰此选项显示 NiFi 数据来源表,其中包含有关通过该处理器路由FlowFiles 的数据来源事件的信息。以及 FlowFiles 的属性及内容信息。
(5)View status history(查看状态历史记录)︰此选项打开处理器统计信息随时间的图形表示。主要是统计信息,统计了数据随着时间的流转的图表,图表展示的较全。
(6)View usage(查看用法):此选项将用户带到处理器的使用文档。相当于用户手册,如果不会用处理器,可以通过该按钮,来查看使用和配置方法。
(7)View connection →Upstream(查看连接→上游)︰此选项允许用户查看和“跳转"入处理器的上游连接。当处理器连接进出其他进程组时,这尤其有用。查看当前处理器的上游处理器是什么,选中按钮以后,即可自动跳转到上一个处理器或者上一个处理器组。
(8)View connection → Downstream(查看连接→下游)︰此选项允许用户查看和"跳转"到处理器外的下游连接。当处理器连接进出其他进程组时,这尤其有用。点击下游按钮以后,就会自动跳转到所连接的下一个处理器,或者处理器组,当关联比较复杂,或者分组的时候,通过这两个按钮可以快速定位到连接。
(9) Centere in view(视图中心)︰此选项将画布的视图置于给定的处理器上。主要是把当前的处理器,定位到左侧视图的中心。
(10)Change color(更改颜色)︰此选项允许用户更改处理器的颜色,这可以使大流量的可视化管理更容易。主要是外观上的选项,在推荐用默认即可,如果有独特爱好,可以进行变更。
(11)Group(添加到组)︰把当前处理器添加到组
(12)Create template (创建模板) :此选项允许用户从所选处理器创建模板。把所选中的处理器,创建单独的模板文件,相当于备份,后续需要使用的时候,可以直接把模板上传直接使用,配置还会进行保留。
(13)Copy (复制) :此选项将所选处理器的副本放在剪贴板上,以便可以通过右键单击工作区并选择“粘贴”将其粘贴到工作区上的其他位置。复制粘贴操作也可以使用按键 Curl-C 和 Crl-V 完成。
(14)Delete (删除) :此选项允许从画布中删除处理器。 部分按钮无法操作展示,在后续案例展示当中,会为用户演示如何操作使用按钮。
使用配置按钮配置处理器,通过 config 按钮打开配置选项框:
在配置选项框当中有4个设置:
4.SETTINGS 设置
setting 选项卡中的名称默认与处理器类型相同,可以进行修改。当同类型处理器存在2个或多个的时候,名称可能会重复,此时需要修改。
enable 表示当前处理器是否可用,如果去掉勾选就不可用。
处理器的 ID、类型等信息。
再往下是处理器可能发生的事件,事件可以指示数据在稍后的时间进行处理,也就是延迟,例如30秒,意思是启动30秒之后,才会进行处理器的执行,一般保留默认值即可,不需要修改,除非需要进行延迟执行。
YieldDuration,默认情况下,也要使用默认值,尽量不修改,作用是阻止处理器运行一段时间,处理器运行的时候,会产生间隔,采用默认值。
日志级别,在调试的时候,可以改成 warn,在生产环境生产的时候,如果担心日志文件过多,可以改成 info 或者ever 级别。 自动终止关系,选中什么操作结果回到自身,例如勾选 success,那么在成功之后就会回到自身停止操作,不再往下一个处理器进行流传,一般不会勾选或只勾选失败。成功就继续往后执行,调整到下一个处理器。
在 setting 页面中,最关键的是 name 以及 enable,其属性暂时不修改。较核心的是处理连接关系,主要是处理的不同结果,路由到不同地方。
5.SCHEDULING 定时调度
主要涉及如何运行,何时运行处理器。在处理流程当中只有第一个处理器需要配置界面。 如果是第二个处理器,就算配置也不生效,因为是由上一个处理器决定,所以一个流程当中只需要在第一个处理器当中配置调度界面即可。
Scheduling Strategy 主要是调度的策略,一共有两种策略,第一种是 timer driven 是默认策略主要是时间驱动,处理器运行时间间隔的方式。例如选中时填写30秒,意思是每30秒处理器会运行一次,如果改成30毫秒,就是30毫秒运行一次。第二种是 current driven,使用 current 表达式进行定时任务执行。如果不了解,可以进行信息查询,有许多现成脚本提供使用。两个驱动都是用于定时调度,第一种是时间间隔,第二种通过 Current 表达式,第二种会更灵活。左侧是任务数,也就是线程数,配置越大即可采用越多线程并行执行。excussion 执行器,一般单节点默认是All nodes,如果是集群模式,就会有 primary node 和 All nodes 两种模式, primary nodes 是指在主节点执行当前处理器任务,如果是 All nodes 就是负载均衡的分布式的执行处理器任务。部分处理器支持 All nodes 执行,部分处理器只支持 primary nodes,例如查询 Mysql 处理器。该处理器仅支持 Primary nodes 模式,如果在所有节点都执行,就会产生重复的查询操作。如果需要负载均衡执行,有其他办法进行处理。
6.PROPERTIES 配置属性
每个处理器的属性不同,不同的处理器属性值可以为独特的业务进行服务。此处不展开介绍,因为处理器有很多在具体应用时针对不同处理器进行讲解。
7.COMMENTS 注释
该面板就是将注释信息写入,便于开发维护以及团队沟通。
二、总结
1.NiFi 的安装和启动
NiFi 的安装和启动,有以下步骤:
安装需要准备运行环境,在环境当中提供了 VML 虚拟机,需要能够进正常配置。GVM+gdk,已经安装过可以直接使用。其次需要下载安装包。在资料中已经提供,可以直接使用。接下来是修改配置,主要是修改端口号。最后开始解压运行,解压是通过 Linux 脚本进行解压,解压之后就可以启动,通过 NiFi 目录下1.9.0\bin,在 bin 目录下,有NiFi.sh 脚本,通过 star 命令进行启动,启动以后,即可通过访问 IP 地址+端口号58080- NiFi 进行访问,以上步骤就是 NiFi 的安装和启动。
2.NiFi 处理器的了解
在处理器中,了解如何查看处理器、如何配置处理器以及常用的处理器。处理器的重点是配置项,在配置项当中关键的点是设置页、任务调度页、属性页,注释页。调度是重点,它的属性能够掌握,属性也是重点,但是每个处理器属性不同,所以在之后用到不同处理器的时候再进行讲解。