开发者学堂课程【NiFi 知识精讲与项目实战(第三阶段):离线同步 mysql 数据到 HDFS2】学习笔记,与课程紧密联系,让用户快速学习知识。
课程地址:https://developer.aliyun.com/learning/course/707/detail/12545
离线同步 mysql 数据到 HDFS2
内容介绍
一、实际操作演示
一、实际操作演示
将 MySQL 的数据同步到到 HDFS
1.具体操作
首先来新建一个处理器组,便于操作和理解,将处理器组命名为 MySQL HDFS。
创建成功以后,双击进入到的处理组当中,然后按照之前的流程来进行创建。 首先第一点要先得到数据,通过 quarry database table 处理器来进行查询,处理器添加完毕。
添加完毕以后,要对它进行配置来读取数据。首先配置要在第一个节点,这里要把它改的大一点,否则的话查询速度会非常的快,是不间断的去进行查询,这会对的服务器而造成非常非常大的压力。 如果说现在是在的个人电脑上运行的服务,时候甚至可能会造成整个 WiFi 服务宕机。改成9999 sec,意思也是说我只有当重新启动的时候,它才会运行第二次,否则的话它只运行第一次。这对的测试来说已经足够。
然后是它的属性配置,属性配置这里面有一个很重要的一点是说它需要使用一个数据库连接池对象。
查看数据库连接池对象可以点击它的 value 值,这是弹出一个下拉框,在下拉框当中可以看到它提示没有值,没有数据。 那还有一个再往下看,还有一个是 Create new service。
那这时候既然没有,那要创建一个新的的连接池,这时候它弹出一个选择框。在选择框当中是提供各种不同的数据库连接池对象,这里面直接使用 dbcp connection,使用来进行创建。名字可以把它修改成 MySQL、 Connection库,mySQL 数据库连接池。
创建完以后,发现它已经进入进来到的默认选中到的配置相当中。 此时要注意时候它并不是可以直接用,因为只是创建还没有配置,可以看到在属性配置向右侧有一个构图的箭头,点击箭头。
它提示是否保存的修改信息,在配置之前要先保存,点 yes。
然后跳转到连接池的配置界面,在连接池配置界面,可以看到是刚才所创建的名称数据库连接池。
2.配置方法
在有一个右侧有一个 Configure,这样的一个按钮,点击按钮可以进行配置。点击按钮以后打开,配置选项界面,然后点击 properties,在里面是数据库连接是所需要的一些配置
第一个 Data Connection ul 是数据库的连接 UL 地址,数 Driver class name 是数据库的驱动类名。数据库驱动的地址目录,也是的架包。再往后还有一个是的数据库用户和密码,把这些属性配置上,那 MySQL,基本上可以进行连接。 UL 地址和账号密码,是和每个人的数据库保持一致,不可能大家都一模一样,因为使用的 uL 和账号和密码都不一样,这里使用的是 window mysql,如果使用 linlings MySQL 也可以,但是注意要是5.7版本以上才可以。
讲义中已经把的数据库配置的地址已经列出来。是数据库的一个连接地址。52.6是 windows 的 IP 地址,复制过来,在地址当中数据库 niFi-test,是之前在做环境准备的时候,运行那个 MySQL 语句创建的数据库。
把地址先复制过来,保存,若是存在访问不通现象,是由于长时间没有访问所造成。
可以重新执行。现在的 CPU 占用非常高,是来查看的 cluster。Cluster 都是正常的,继续进行配置,刚才进入到的配置页面以后,可以从这儿去访问 MySQL Connection 。 还有一种方式访问,在工作区上直接右键点 Configure 。
这时候可以看到打开 Configure 之后,在面板里面的第二个 tape 页,点击是 MySQL 连接池,继续去进行配置。在这里面填写 ul 地址。然后接下来是数据库驱动程序,驱动程序也直接把它复制过来,是 mysql.jbdc.Driver。架包的位置放在目录下,但是要注意,架包要在 niFi 服务器上存放,因为这是本地磁盘读取目录,读取的驱动。 所以说要上传到的 niFi 集群当中。
目前没有文件,那必须要先创建,需要把文件先上传进来。看目录现在还没有,那可以创建目录,创建一个 jar 文件夹,进入这样的目录下,再进行上传。架包位置在 MySQL 资料当中有一个 MySQL 架包。
把架包拖动进来,上传到的 NIFI 服务器上,这时候给他设置权限,此时文件已经存在,它可以正常使用。再接下来是的账号和密码,我的数据库账号和密码是 root 和123456,设置的时候,大家按照自己的数据库账号进行设置可以。在设置之前最好是通过自己的 mySQL 客户端进行连接测试,测试账号密码它是否有效。 如果测试的时候发现账号密码不正确的话,记得要找到账号正确密码再进行设置。
然后点击 APPLY,连接池原来这有一个感叹号的一个提醒警告现在已经没有,但是现在状态是 Disable 状态为不可用状态,禁用状态启动,在这里有闪电的符号,点击闪电 endable,然后这里面直接去点 endable 可以。
此时可以看到它的状态已经发生变化,将它关闭。数据库连接池配置完之后,继续配置。设置表名,查询表名 user-infor,此数据库在给创建的 sql 里面,已经有表表的数据结构,一共5个字段。Id 、 name、手机号、EMAIL 以及一个子 Json 数据。
表名配置完成后,还要再配置的 sql 语句进行查询。有两种方式,一种方式是通过默认的组合进行查询,还有一种方式是通过 Custom query。首先使用默认的方式,让表名和反馈的字段以及 where 条件组合的方式来进行上去,那可以这样,查询的字段包含有 ID、name(注意字段要和的数据库保持一致,如果不一致的话,他也是会查询报错的)、moblie, EMAIL,son-Josn,。然后点击 ok,主要来查询这几个字段。这里 where 条件是不需要添加的,可以加 limit 试一下。例如查询15条,查询出来表格中一共有21条数据。
只查询五条,limit5 后观察是否成功,where 条件无非是加到 SQL 语句服务表明的后面,但是其实它是wherelimit5,wherelimit5 的话,是不行的会报错,可以这样来讲,where1=1,然后再加上 limit5,通过这样的方式进行拼接,不会有错误。 where1=1,然后 limit5,然后点击 apply。那时候的 Query database table 已经创建并且配置成功。再往下,需要把它所读取出来的数据转换为 JSON 格式。因为读出来是一个 Avor 格式,转换需要创建一个 convert up to Json 这样的一个处理器。从这里拖出来,convert 可以选中第二个,这时候添加成功的第二个处理器,然后添加成功之后,可以先把这两个处理器进行一个连接。点击 add 这时候已经连接成功,那可以看现在服务器上没有正在运行的,现在都是处于停滞的状态,连接以后,还要注意设置一点,是第一个处理器, 它是单节点。如果不设置单节点运行,让他在整个集群都运行,那三个节点会同时运行产品数据重复。
在 SCHEDULING 进行设置,但是可以发现处理器仅支持 Primary node,All node 这种模式根本是无法选中。所以说数据它只支持单节点运行,是为避免这种情况,重复查询完之后,连接到 convert up to Json,在往后的这些数据处理的过程中,希望它能够通过负载均衡的方式来进行处理和消费。 也是说后续的这些处理,希望它能够来进行,把整个集群都利用上,这时候可以设置链接,去把它改为负载均衡的这样一个类型。在这里面选中轮巡的方式来进行负载均衡,然后消息内容也让它进行加密,加密来进行发送。这时候会发现这里多一个图标,负载系统的图标。
连接完以后,再接下来是配置 convert up to Json。 来点击看。
在属性信息里面,首先第一个配置项,按照阶层来进行转换。还有第二个,是如果是单独的 Record 的话, 选择中选择 ture。,这样处理的话,他可以已经可以把的 Awhereo 转化成 Json。把 Awhereo 转换成 Json 以后,可以去修改或读取的这些数据库数据。但是这样的还不够,因为想要往 hdfs 写的时候,是想一条一条的去写一条一条的去处理。需要对 JOSN 的数组进行拆分,把它拆分成一条一条的规定,可以通过拖动添加一个处理器叫做 splite,进行拆分