实时同步 Mysql 到 Hive-2 | 学习笔记

本文涉及的产品
云数据库 RDS MySQL,集群系列 2核4GB
推荐场景:
搭建个人博客
RDS MySQL Serverless 基础系列,0.5-2RCU 50GB
云数据库 RDS MySQL,高可用系列 2核4GB
简介: 快速学习实时同步 Mysql 到 Hive-2

开发者学堂课程【NiFi 知识精讲与项目实战(第三阶段):实时同步 Mysql 到 Hive-2学习笔记,与课程紧密联系,让用户快速学习知识。

课程地址:https://developer.aliyun.com/learning/course/707/detail/12550


实时同步 Mysql 到 Hive-2


内容介绍:

一、操作

二、Windows 开启 binlog

三、实时获取 Mysql 变更

四、根据条件路由

五、提取关键属性

六、写入 Hive


一、操作

首先开启 Mysql 的 binlog 日志,Capture change Mysql 处理器,是监听 mysql 实施的变更,实时的事件信息,要想做到实时的监听,就要必须提前开启 MySQL bin log 日志。

1.开始 Mysql 的 binlog 日志

Mysql 的版本号要求5.7

2.登录 Mysql 查看日志状态

# mysq1 -u root -p123456(登录mysql)mysql> show variables ike '%1og_ bin% ;(查看日志状态)

image.png

如果显示 off,即没有打开;如果显示 on,即已经打开。Sql 语句可以在 linux 客户端上去执行,也可以在 window 上面去执行。

3.退出 MySQL 登录

Mysql>exit

4.Linux 开始 binlog

编辑配置文件

vi /etc/my.cnf(linux 直接在 etc 目录下)行尾加上 server_id = 1

1og- bin = mysql-bin(用别的文件名也可以,一般都是使用 mysql-bin)

bin1og. format = row

server-id :表示 mysql 单个结点的 id 标识,单个节点可以随意写,多个节点不能重复,log bin 指定 binlog 日志文件的名字为 mysql-bin,以及其存储路径。

5.重启服务

两种重启方式,主要看 linux 支持哪一种方式,重启以后再去登录执行 sql 状态就会显示为 on,说明 log 日志成功开启。

systemct1 restart mysq1d.service 或者 service mysq1d restart

重新登录查询开启状态

image.png


二、Windows 开启 binlog

1.修改配置文件

找到 mysq| 配置文件 my.ini 所在目录,它的位置一般在 C:\ProgramData\MySQL\MySQL Server 5.7。

注意目录不是 C:\ Program Files \MySQL\MySQL Server 5.7.

server_id = 1

Log_bin = mysq1-bin

bin1og_ format = row

加上三行代码即可生效。

image.png

image.png

2.通过 net stop 重启 Mysql 服务,也可以在服务界面进行重启

image.png

3.执行 sql 语句,查询开启状态,如果显示为 on 代表开启成功

image.png

开启 mysqlbinlog日志以后,还需要开启 MySQL 远程访问权限。

4.开启 Mysql 远程访问权限

如果不开启 MySQL 远程访问权限,只限于本地访问的话,不同的 linux 虚拟机,去读取 MySQL,或者读取WINDOWS 的 MySQL,就会因为 MySQL 自身的权限问题,而导致失败,所以要通过这语句来开启访问权限,开启的是用户远程访问的权限,赋予了权限以后,最后用 flash frivileges 更新权限,提交生效。GRANT ALL PRIVILEGES ON ** TO 'root'@'%' IDENTIFIED BY'

123456' WITH GRANT OPTION;

FLUSH PRIVILEGES;


三、实时获取 Mysql 变更

1.创建处理器组

组名:MySQLTOHive_Timely,进行实时同步。

2.创建 CaptureChangeMysql

在创建 MySQL 时,需要先有一个缓存服务,所以先创建缓存服务。缓存服务涉及到服务端和客户端,先去创建服务端再去创建客户端,客户端连接服务端。CaptureChangeMySQL 的配置中需要DistributedMapCacheClientService. DistributedMapCacheServer 处理器,一并创建。

3.配置 DistributedMapCacheServer

image.png

DistributedMapCacheServer 为服务端,服务端的配置项都不需要修改,直接使用默认配置即可。

4.配置 DistributedMapCacheClientService

image.png

客户端第一个配置项,要把它设置为 nifi 的服务器地址

5.启动 Cache 服务和客户端

在模拟的集群模式下,因为三台服务都在同一台主机,所以会存在端口冲突的问题,但是并不影响使用。因为三台服务只要有一个节点的缓存服务启动正常就可以使用。

出现错误的原因是因为现在的集群是在单台主机下午模拟,都在同一个主机上,端口号都一样,所以会出现端口号冲突的问题。

image.png

6.配置 CaptureChangeMySQL

image.pngMysQL Hosts = 192.168.52.6:3306

MysQL Driver class Name = com.mysq1.jdbc.Driver

MysQLDriverLocation(s)=/export/down1oad/jars/mysq1-connector-java-5.1.40.jarusername = root

Password = 123456

Include Begin/commit Events = true

Include DDL Events = true

刚刚创建好的缓存服务,在此时把它进行选择,然后是 MySQL 的 url 地址以及驱动和驱动程序、提供夹包位置,都设置完成以后进行保存。

7.启动 CaptureChangeMySQL

image.png

启动以后会出现一个错误信息,有一个链接使用了,相同的 server ID,是因为监听处理器是在三个季群上面全部都有的。启动以后三个集群上面节点都会同时去进行监听,这三个监听的处理器信息是完全一样,就会造成 server ID冲突的问题。可以运用之前学过的知识来解决以上问题。

问题解决以后就可以正常运行,还有输出的 flow file 信息。

image.png

创建成功以后就可以通过 RouteOnAttribute 处理器将采集到的数据请求进行分发路由。


四、根据条件路由

1.RouteOnAttribute 多线程消费

image.png

根据自己的服务器硬件配置,以及数据的更新速率,进行评估后填写。根据自定义的属性进行分发,里面涉及到多线程处理,接收到数据以后,会通过三个线程并行进行数据的处理和分发。

2.NIFI 表达式

后面会使用 equals 语句进行判断,equals 主要是判断两个字符串是否相等,它直接比较两个 string 的值,参数就是传入的 string,返回的是布尔值,即 ture 或者是 false。${filename:equals('hello. txt ')}判断 filename 的值是否等于hello.text。

NiF i表达式官网: htp://ifiapache.org/docs/nifi-docs/html/expression-language-guide.html

之前已经了解过 NiFi 表达式语言,这里仅针对 equals 函数进行说明。

NiFi 表达式的 equals 函数

equals

说明: equals 函数使用非常广泛,它确定其主题是否等于另一个 String 值。 请注意,该 equals 函数直接比较两个String 值。注意不要将此函数与 matchs 函数混淆,后者会根据正则表达式评估其主题。

学科类型:任意

参数:

●value:用于比较 Subject 的值。必须与主题类型相同。

返回类型:布尔值

示例:我们可以使用表达式${filename:equals('hello. txt ')}检查FlowFile的文件名是否为“hello.txt",或者可以检查属性 he11o 的值是否等于属性的值filename: ${he11o:equa1s( ${filename} )}。

3.设置自定义属性

image.png

设置自定义属性,涉及到 equals 函数,通过 cdc.event.type 类型,判断它是不等于 insert,如果等于 insert,就把数据标识为 insert 类型。如果等于 update 就标示为 update 类型。其它的类型不需要处理,不需要处理就不用进行数据标记,把其它的数据类型都划分到不处理队列中。这里 cdc.event.type 和之前监听到的数据有关,监听的数据在输出的时候就有叫做 cdc.event.type 的内容,通过 NIFI 表达式获取到 cdc.event.type 的值,来判断这条处理信息到底是什么样的类型。通过 NIFI 表达式判断出来类型信息以后,用 route 进行路由。

4.运行并查看输出

{

"type": "insert" ,

"timestamp": 1582484253000,

"bin1og_ filename": "mysq1-bin .000005",

"binlog_ position": 375,

"database": "nifi_ test",

"table_ name": "nifi_hive_streaming" ,

"tab1e_ ,id": 108,

"columns": [

{

"id": 1,

"name": "id" ,

"column_type": 4,

"value": 7

},

{

"id": 2,

"name": "name",

"column_ type": 12,

"value": "testName5"

},

{

"id": 3,

"name": "day_time",

"co1umn_type": 91,

"value": "2020-02-24"

}

]

}

数据的类型是 Jason 格式的,Type 为 insert 或者是 update,不可能是 select,Delete 等等,只会路由这两种数据,在路由的过程当中,会包括数据库的信息和表的名字以及不同数据包括 ID、name 等等。接下来需要提取数据,转化为 SQL 插入到 HIVE 之中,如何提取 Jason 字符串中的值和对应的变量,这时候就用到了一个组件即evaluejasonPath,之前都是简单了解 evaluejasonPath,没有进行深入的了解,接下来了解 jason 表达式应如何使用。


五、提取关键属性

EvaluatelsonPath 等处理器在提取数据时,可以使用 JsonPath 表达式, 来灵活的获取信息。

1.JasonPath 表达式

(1)简介

类似于 XPath 在 xml 文档中的定位,jsonPath 表达式通常是用来路径检索或设置 son 的。XPath 主要是用来检索xml。

JsonPath 中的“根成员对象“始终称为$,无论是对象还是数组。

其表达式可以接受"dot-notation"和"bracket-notation"格式,例如

$.store. book [0].title. S['store']['book'][0]['title']

$为根节点,store 为变量,它的意思是根节点有一个名为 store,的对象还有一个顾客对象,book 是数组。取 book数组里面的第一个对象,再取它 title 的值,后面的表达式就不再是.而是[]的形式,.和[]的意思是一样的。

(2)操作符

符号

描述

$

查询的根节点对象,用于表示一个 json 数据, 可以是数组或对象

@

过滤器断言(filter predicate)处理的当前节点对象,类似于 java 中的 this 字段

*

通配符,可以表示一个名字或数字

..

可以理解为递归搜索,Deep scan. Available anywhere a name is required.

<name>

表示一个子节点

['<name>'(, '<name>')]

表示一个或多个子节点

[<number> (, <number>)]

获取数组下标,出现 number 就代表一定是数组对象。表示一个或多个数组下标

[start:end]

获取一个区间,数组片段,区间为[start,end),不包含end

[?(<expression>)]

进行搜索和过滤,可以组合使用,如果使用[?(<@expression>)]判断当前对象的某一个属性或者属性是否满足某一个条件。满足的话会把符合条件的数据进行返回。

过滤器表达式,表达式结果必须是 boolean

(3)函数

名称

描述

输出

Min()

获取数值类型数组的最小值

Double

max()

获取数值类型数组的最大值

Double

Avg()

获取数值类型数组的平均值

Double

stddev()

获取数值类型数组的标准差

Double

length()

获取数值类型数组的长度

Integer

(4)过滤器

过滤器在 Jason 里面是比较高级的用法,和写代码是类似的。过滤器是用于过滤数组的逻辑表达式,一个通常的表达式形如: [?(@.age> 18)],@是当前对象,总的意思是,在整个对象里面如果年龄>18,就把对象返回到数组对象之中,如果不满足这条件,就不返回,起到过滤的效果。 可以通过逻辑表达式&&或| |组合多个过滤器表达式,例如[(@.price < 10 && @.category == fiction)],先判断 price<10,加上一个且 Category=fiction,这就是两个判断组成的且判断。字符串必须用单引号或双引号包围,不包围就会被认定为是变量,会出现报错。例如

[?(@.color == 'blue')] or [?(@.color == "blue")].

操作符

描述

==

等于符号,但数字1不等于字符1(note that 1 is not equal to 11

!=

不等于符号

<

小于符号

<=

小于等于符号

>

大于符号

>=

大于等于符号

=-

判断是否符合正则表达式,例如[?(@.name =- /foo.*?/i]

in

所属符号,例如[?(@.size in ['S', 'M])]

nin

排除符号

size

size of left (array or string) should match right

empty

判空符号

以上是 JasonPath 的判断操作符。

(5)示例{

"store": {

"book": [

{

"category": "reference",

"author": "Nige1 Rees",

"title": "sayings of the Century" ,

"price": 8.95

{

"category": "fiction",

"author": "Evelyn waugh" ,

"title": "Sword of Honour",

"price": 12.99|

}

{

"category": "fiction",

"author": "Herman Me1ville" ,

"tit1e": "Moby Dick",

"isbn": "0-553-21311-3",

"price": 8.99

{

"category": "fiction"

,"author": "I。 R. R. Tolkien",

"tit1e": "The Lord of the Rings",

"isbn": "0-395-19395-8" ,

"price": 22.99

],

"bicycle": {

"color": "red"

"price": 19.95

}

}

"expensive":10

}

ason 对象里面又包含了子对象 store,store 下面包含了一个数组 book 以及对象 bicycle,book 数组里面包含了不同的子对象。

JasonPath(点击链接测试)

结果

$.store.book[*].author <br/>

或<br/>

$..author

获取 json 中 store 下 book 下的所有 author 值。[*]标 book  里面任意一个对象,获取 store 的 book 里面,所有对象的 author 字段,返回的肯定是一个数组。..的含义是递归搜索,搜索它的所有下级,只要下级包含 author 信息就会返回,如果在 store 里面也包含了 author 信息,二者是不相等的,只要包含author 就返回。使用$.store.book[*].author <br/>

会更加精确。

$.store.*显示所有叶子节点值

所有的东西,书籍和自行车

$.store..price

获取 json 中 store下所有 price 的值,返回数组之中

$.book[2]

获取 json 中 book 数组的第3个值

$..book[-2]

倒数的第二本书,没有-0的说法

$..book[,1]<br/>

或<br/>

$.book[:2]

前两本书

:代表最开始,相当于0的意思,2代表取两个

$..book[1:2]

从索引1 (包括)取到索引2 (排除)的所有图书,不包含第二个索引

$.book[-2:]

获取 json 中 book 数组的最后两个值

$.book[2:]

获取 json 中 book 数组的第3个到最后一个的区间值

$.book[?(@.isbn)]

@指的是 this,this 指的是 BOOK 下一级,获取 json中 book 数组中包含 isbn 的所有值

$store book[?(@.price < 10)]

获取 jason 中 book 数组中 price<10的所有值,最终返回的是数组

$.book[?(@.price <=$['expensive])]

获取 json 中 book 数组中 pricec=expensive 的所有值,expensive 相当于传递某一个变量,如果小于变量值,就代表成立。$是根节点,根节点里面的expensive 数据和 price 进行对比。

$.book[?(@.author =- /.*REES/i]

获取 json 中 book 数组中的作者以 REES 结尾的所有值(REES 不区分大小写)

$.*

逐层列出 json 中的所有值,层级由外到内,是一个完整的 Jason

$..book.length()

获取 json 中 book 数组的长度,book 数组一共有多少个

以上为 jasonpath 表达式的用法。

2.提取 json 属性到 Attribute

image.png

属性当中使用了 jason 表达式,jasonpath 表达式进行获取,$是根节点,$database 指的是数据库名称,Columns 下面第二个值得 value 就是 date_time,Columns 下面的第一个值的 value 就是 ID,Columns 下面第二个 value 值是 name,获取到不同的属性值,获取到这些关键信息以后,就可以把这些关键信息转变为 SQL 语句进行插入,运行以后就可以看到这些数据已经获取到了。

3.运行并查看输出

image.png

4、ReplaceText 转换 sql

配置 ReplaceText

image.png

在 Replacement value 当中写了一个sql语句,SQL 语句的 value 值用到了刚才提取到的变量信息。Replacement value = insert into myhive .nifi. _hive (id,name ,day_ time) values(S{id},'S{name}','${day_ time}'

通过这些变量,拼接成为SQL语句然后写入到hive。Database Connection URL = jdbc :hive2://192.168.52.120: 10000

Hive Confi guration Resources = /export/down oad/config/core-site . xml,/export/download/config/hdfs-site .xm1 ,/export/down1oad/config/hive-site .xm1


六、写入 Hive

1.创建 PutHiveQL

PutHiveQL 也需要连接池,和 mysql 是类似的,所以先要创建一个连接池对象。

2.创建配置 HiveConnectonPool

image.png


连接池的配置对象非常简单,和 mysql 是一致的,首先是 JDBC URL 连接地址,第二个是 HIVE 的配置文件,主要包含 hive-site、hdfs-site.xml 以及 core-site.xml 都要配置进来,否则会出现报错。配置完成后,记得启用HiveConnectonPool

3.PutHiveQL 关联 HiveConnectonPool

image.png

通过以上操作就可以写入 hive 了。写入完成后,通过观察 hive 表的数据确认程序是否能够正常运转。

相关实践学习
如何在云端创建MySQL数据库
开始实验后,系统会自动创建一台自建MySQL的 源数据库 ECS 实例和一台 目标数据库 RDS。
全面了解阿里云能为你做什么
阿里云在全球各地部署高效节能的绿色数据中心,利用清洁计算为万物互联的新世界提供源源不断的能源动力,目前开服的区域包括中国(华北、华东、华南、香港)、新加坡、美国(美东、美西)、欧洲、中东、澳大利亚、日本。目前阿里云的产品涵盖弹性计算、数据库、存储与CDN、分析与搜索、云通信、网络、管理与监控、应用服务、互联网中间件、移动服务、视频服务等。通过本课程,来了解阿里云能够为你的业务带来哪些帮助 &nbsp; &nbsp; 相关的阿里云产品:云服务器ECS 云服务器 ECS(Elastic Compute Service)是一种弹性可伸缩的计算服务,助您降低 IT 成本,提升运维效率,使您更专注于核心业务创新。产品详情: https://www.aliyun.com/product/ecs
相关文章
|
1月前
|
存储 SQL 关系型数据库
Mysql学习笔记(二):数据库命令行代码总结
这篇文章是关于MySQL数据库命令行操作的总结,包括登录、退出、查看时间与版本、数据库和数据表的基本操作(如创建、删除、查看)、数据的增删改查等。它还涉及了如何通过SQL语句进行条件查询、模糊查询、范围查询和限制查询,以及如何进行表结构的修改。这些内容对于初学者来说非常实用,是学习MySQL数据库管理的基础。
131 6
|
21天前
|
SQL DataWorks 关系型数据库
阿里云 DataWorks 正式支持 SelectDB & Apache Doris 数据源,实现 MySQL 整库实时同步
阿里云数据库 SelectDB 版是阿里云与飞轮科技联合基于 Apache Doris 内核打造的现代化数据仓库,支持大规模实时数据上的极速查询分析。通过实时、统一、弹性、开放的核心能力,能够为企业提供高性价比、简单易用、安全稳定、低成本的实时大数据分析支持。SelectDB 具备世界领先的实时分析能力,能够实现秒级的数据实时导入与同步,在宽表、复杂多表关联、高并发点查等不同场景下,提供超越一众国际知名的同类产品的优秀性能,多次登顶 ClickBench 全球数据库分析性能排行榜。
|
1月前
|
SQL 关系型数据库 MySQL
Mysql学习笔记(三):fetchone(), fetchmany(), fetchall()详细总结
MySQL中用于数据检索的`fetchone()`, `fetchmany()`, `fetchall()`函数的功能、SQL语句示例和应用场景。
59 3
Mysql学习笔记(三):fetchone(), fetchmany(), fetchall()详细总结
|
1月前
|
SQL Ubuntu 关系型数据库
Mysql学习笔记(一):数据库详细介绍以及Navicat简单使用
本文为MySQL学习笔记,介绍了数据库的基本概念,包括行、列、主键等,并解释了C/S和B/S架构以及SQL语言的分类。接着,指导如何在Windows和Ubuntu系统上安装MySQL,并提供了启动、停止和重启服务的命令。文章还涵盖了Navicat的使用,包括安装、登录和新建表格等步骤。最后,介绍了MySQL中的数据类型和字段约束,如主键、外键、非空和唯一等。
71 3
Mysql学习笔记(一):数据库详细介绍以及Navicat简单使用
|
1月前
|
关系型数据库 MySQL 数据库
Mysql学习笔记(四):Python与Mysql交互--实现增删改查
如何使用Python与MySQL数据库进行交互,实现增删改查等基本操作的教程。
67 1
|
1月前
|
消息中间件 监控 关系型数据库
MySQL数据实时同步到Elasticsearch:技术深度解析与实践分享
在当今的数据驱动时代,实时数据同步成为许多应用系统的核心需求之一。MySQL作为关系型数据库的代表,以其强大的事务处理能力和数据完整性保障,广泛应用于各种业务场景中。然而,随着数据量的增长和查询复杂度的提升,单一依赖MySQL进行高效的数据检索和分析变得日益困难。这时,Elasticsearch(简称ES)以其卓越的搜索性能、灵活的数据模式以及强大的可扩展性,成为处理复杂查询需求的理想选择。本文将深入探讨MySQL数据实时同步到Elasticsearch的技术实现与最佳实践。
100 0
|
3月前
|
SQL druid Java
Java数据库部分(MySQL+JDBC)(二、JDBC超详细学习笔记)(下)
Java数据库部分(MySQL+JDBC)(二、JDBC超详细学习笔记)
58 3
Java数据库部分(MySQL+JDBC)(二、JDBC超详细学习笔记)(下)
|
3月前
|
SQL Java 关系型数据库
Java数据库部分(MySQL+JDBC)(二、JDBC超详细学习笔记)(上)
Java数据库部分(MySQL+JDBC)(二、JDBC超详细学习笔记)
135 3
Java数据库部分(MySQL+JDBC)(二、JDBC超详细学习笔记)(上)
|
3月前
|
SQL 关系型数据库 MySQL
MySQL学习笔记
这篇文章是一份关于MySQL数据库操作的学习笔记,涵盖了数据库的终端操作、数据类型、建表约束、事务处理以及SQL的连接查询等基础知识点。
|
3月前
|
SQL 关系型数据库 MySQL
Java数据库部分(MySQL+JDBC)(一、MySQL超详细学习笔记)(下)
Java数据库部分(MySQL+JDBC)(一、MySQL超详细学习笔记)
39 6
下一篇
无影云桌面