实时同步 Mysql 到 Hive-1 | 学习笔记

本文涉及的产品
RDS MySQL Serverless 基础系列,0.5-2RCU 50GB
云数据库 RDS MySQL,集群系列 2核4GB
推荐场景:
搭建个人博客
云数据库 Tair(兼容Redis),内存型 2GB
简介: 快速学习实时同步 Mysql 到 Hive-1

开发者学堂课程【NiFi 知识精讲与项目实战(第三阶段):实时同步 Mysql 到 Hive-1学习笔记,与课程紧密联系,让用户快速学习知识。

课程地址:https://developer.aliyun.com/learning/course/707/detail/12549


实时同步 Mysql 到 Hive-1

内容介绍:

一、处理器流程

二、CaptureChangeMySQL

三、DistributeMapCacheServer

四、DistributedMapCacheClientService

五、RouteOnAttribute

六、PutHiveQL

七、HiveConnectionPool


此案例和之前案例的区别是:之前的案例是临线的同步,而这案例是实时同步,之前是同步到 HDFS 而这是直接写入到 hive 当中去。


一、处理器流程

首先开启 binlog 的日志,开始以后就可以通过 Capture changemysql 处理器,进行监听,所有 mysql 的操作,它全部都可以监听到,监听到以后可以根据某一种类型或者是某几种类型的操作去进行特殊的处理。这时候就可以使用RouteOnAttribute,来根据不同的操作类型进行不同的路由分发,然后再使用 evaluatejasonpath 提取所需要的属性信息,evaluatejasonpath,可能会使用到 jasonpath 表达式,在案例当中将会讲解。使用 replacetext 来把获取到的属性利用起来拼接成所需要的 SQL 语句,最后是通过 SQL 语句把数据写入到 Hive 数据库当中。

1.准备工作

先创建好 Mysql 表,如果之前只想过 mysql 的 sql 文件,那么表是已经创建好了的,还要创建一个 hive 表,Hive 表和 MySQL 表是相对应的,它们的字段都是一致的。

Mysql 建库建表:

create table nifi_test.nifi_hive

(

Id  int auto_increment

primary key,

namevarchar(64) null,

day_time date   null

);

Hive 建表:

CREATE TABLE myhive.nifi_hive(id int , name string,day_time string)

STORED AS ORC

TBLPROPERTIES( 'transactiona1'='true ');

替换 Hive 支持 nar 包:

上传文件 NiFi\资料\nifi 安装包\nifi-hive-nar-1.9.2.nar,将其替换到 NiFi 服务的 lib 目录下,并重启 NiFi 集群。

nifi-hive-nar-1.9.2.nar 是新的版本,它所支持的 hive 已经变成 hive2版本了,不再支持 Hive1版本,这时候如果不去替换 nar 包,直接去操作的话,是会出现问题的,它会提示有一些 lib 不存在,把 nar 包替换完以后,才可以正常连接到 hive 数据库。

替换的方法是 nar 包,在资料 nifi 安装包文件下,把它上传到 nifi 的 lib 目录下,有一个名为 lib 的文件夹,重新启动 NiFi 集群。接下来把过程中涉及到的处理器做一下了解。


二、CaptureChangeMySQL

1.描述

它主要的目的是实时监测 MySQL 的事件,从 MySQL 数据库检索更改数据捕获(CDC)事件。CDC 事件包括 INSERT,UPDATE,DELETE 操作。事件将作为单独的流文件输出,并按操作发生的时间排序。

2.属性配置

在下面的列表中,必需属性的名称以粗体显示。其它任何属性(非粗体)均视为可选。该表还指示任何默认值,属性是否支持NiFi表达式语言以及属性是否被视为“敏感",这意味着将加密其值。在敏感属性中输入值之前,请确保nifi.properties 文件具有属性 nifi.sensitive.props.key 的条目。

名称

默认值

允许值

描述

MysQL Hosts

与 MySQL 群集中的节点相对应的主机名/端口条目的列表。条目应使用冒号(例如host1: port,host2: port 等)以逗号分隔。例如mysql.myhost.com:3306。该处理器将尝试按顺序连接到列表中的主机。如果一个节点发生故障并为集群启用了故障转移,则处理器将连接到活动节点(假定在此属性中指定了其主机条目。MySQL 连接的默认端口为3306。支持表达式语言: true (将为仅使用变量注册表进行评估)

MysQL Driver

class Name

MySQL 数据库驱动程序类的类名称支持表达式语言;true(仅使用变量注册表进行评估)

MySQL Driver

Location(s)

包含 MySQL 驱动程序 JAR 及其依赖项(如果有)的文件/文件夹和/或URL 的逗号分隔列表。例如,“/ var / tmp /mysql-connector-java-5.1.38-bin.jar”支持表达式语言:true(仅使用变量注册表进行评估)

Username

访问 MySQL 集群的用户名支持表达式语言: true(仅使用变量注册表进行评估)

Password

访问 MysQL 集群的密码敏感属性: true 支持表达式语言:true(仅使用变量注册表进行评估)

Server ID


连接到 MySQL 复制组的客户端实际上是一个简化的从属服务器(服务器),并且服务器 ID 值在整个复制组中必须是唯一的(即不同于任何主服务器或从属服务器使用的任何其它服务器 ID)。因此,每个CaptureChangeMySQL 实例在复制

组中必须具有唯一的服务器 ID。 如果未指定服务器 ID,则默认值为65535。支持表达式语言: true (仅使用变量注册表进行评估)

Database/Schema

Name Pattern

用于将数据库(或模式,取决于RDBMS 术语)与 CDC 事件列表进行匹配的正则表达式(regex) 。正则表达式必须与存储在 RDBMS中的数据库名称匹配。如果未设置该属性,则数据库名称将不会用于过滤CDC 事件。注意: DDL 事件(即使它们影响不同的数据库)也与会话用来执行 DDL 的数据库相关联。这意味着,如果与-个数据库建立了连接,但针对另一个数据库发出了DDL,则连接的数据库将是与指定模式匹配的数据库。

Table Name Pattern

用于影响影响匹配表的 CDC 事件的正则表达式(regex) 。

正则表达式必须与存储在数据库中的表名匹配。如果未设置该属性,则不会基于表名过滤任何事件。

Max Wait Time

30s

建立连接所允许的最长时间,零表示实际上没有限制。支持表达式语言: true (仅使用变量注册表进行评估)

Distributed Map

Cache Client

标识用于保留有关处理器所需的各种表,列等的信息的分布式映射缓存客户端控制器服务。如果未指定客户端,则生成的事件将不包括列类型或名称信息。

MySQL hosts 是 MySQL 集群的主机和端口号,它就是常说的 JDBC 连接 URL。MySQLdriverclassname 是驱动命名,MySQLdriverlocations 是夹包地址,之前的案例当中已经演示过直接可以把路径写入就可以。

username 是用户名,password 是密码,Distributed Map Cache Client 是缓存客户端,主要就是把实时读取的数据放到缓存当中去,然后通过后续的处理,不断地从缓存中读取数据,如果直接发送到 Dateup Flow,就会涉及到一个问题,CaptureChangeMySQL 可能会去存储当前读取到哪一个位置。比如监听到数据是今天12点钟的数据,如果说服务器停止运行了,到明天又启动的时候,这过程当中,需要重新上次时间点进行获 MySQL 数据的变更,但是如果没有缓存的话,可能无法记录下该从哪里读取,这时候要么重新去读取,要么从最新的数据去读取,所以需要一个缓存。使用 CacheCache 客户端首先要先去创建组件,接下来就要了解一下 cache 组件。


三、DistributeMapCacheServer

Cache 组件有两个,一个是 server 端,一个是 client 端,从名称可以看出来一个是 client service,一个是 server,顺序上一定是先有 server,然后客户端才能连接 service 进行操作。

1.描述

提供可通过套接字访问的映射(键/值) 缓存。类似于 redis,与该服务的交互通常是通过 DitributedMapCacheClient 服务完成的。

2.属性配置

在下面的列表中,必需属性的名称以粗体显示。其它任何属性(非粗体)均视为可选。该表还指示任何默认值。

名称

默认值

允许值

描述

端口

4557

侦听传入连接的端口

最大缓存条目

10000

缓存可以容纳的最大缓存条目数

驱逐策略

最少使用

最少使用最近最少使用先进先出

确定应使用哪种策略从缓存中逐出值以为条目腾出空间

持久性目录

如果指定,则缓存将保留在给定目录中; 如果未指定,则高速缓存将仅在内存中

SSL 上下文服务

StandardRestrictedSSLContextService

如果指定,此服务将用于创建SSL上下文,以用于保护通信;如果未指定,则通信将安全

端口号和最大缓存条目都不需要进行修改,创建一整个集群只需要创建一个 cacheserver,不需要出现一个处理器组创建一个十个处理器组创建十个的情况,十个处理器组可以使用一个 cacheserver,和 redis 服务是一样的,redis 服务是可以共用的,如果数据量很大,可以去创建多个。如果太大的话,不太适合使用本地的划分,缓存是 nifi 本地缓存,如果需要的缓存非常大,建议使用 redis 提供足够的缓存空间。有了 cacheserver 以后,就要通过 client 进行连接。连接的时候需要肯定IP和端口号。


四、DistributedMapCacheClientService

1.描述

提供与 DistributedMapCacheSenyer 通信的功能。可以使用它来在 NiFi 群集中的节点之间共享地图

2.属性配置

在下面的列表中,必需属性的名称以粗体显示。其它任何属性(非粗体)均视为可选。该表还指示任何默认值。

名称

默认值

描述

服务器主机名(IP 地址)

一般来说填写 nifi 服务器的 IP 即可,本机可以写 local host

运行 DistributedMapCacheServer服务的服务器的名称

服务器端口

4557(端口号要保持和 server一致,直接使用即可,无需变更)

与 DistributedMapCacheServer 服务通信时将使用的远程服务器上的端口

SSL 上下文服务

如果指定,则表示用于与远程服务器通信的 SSL 上下文服务。如果未指定,通讯将不会被加密<br>

StandardSSLContextService<br/>

StandardRestrictedSSLContextService

通讯超时

30秒

指定在无法发送或接收数据时确定存在通信故障之前与远程服务器通信之前要等待多长时间

两个 cache 服务和 cache 服务创建好以后,就可以在 Capturechangemysql 处理器之中进行配置,去使用 cache client 操纵缓存信息,这三个是配合使用的。


五、RouteOnAttribute

1.描述

该处理器使用属性表达式语言,根据流文件的属性去计算然后进行路由。该处理器往往用于判断逻辑。

2.属性配置

在下面的列表中,必需属性的名称以粗体显示。任何其它属性(不是粗体)都被认为是可选的,并且指出属性默认值(如果有默认值), 以及属性是否支持表达式语言。

属性配置

默认值

可选值

描述

Routing Strategy

Route to Property name

Route to Property nameRoute to 'matched' if all

matchRoute to 'matched' if any matches

指定如何确定在计算表达式语言时使用哪个关系

3.动态属性

该处理器允许用户指定属性的名称和值

属性名称

属性值

描述

用户自由定义的属性名称(Relationship Name)

用户自由定义的属性值

(Atribute Expression

Language)

将其属性与动态属性值中指定的属性表达式语言相匹配的流文件路由到动态属性键中指定的关系.支持表达式语言:true

4.连接关系

名称

描述

Unmatched

不匹配任何用户定义表达式的流文件将被路由到这里

5.自定义连接关系

可以根据用户配置处理器的方式创建动态连接关系。

Name

Description

动态属性的属性名

匹配动态属性的属性表达式语言的流文件

主要是根据刘文件的属性,做一个路由分发工作,作用是根据事件的内容,和不同的信息,去进行关联关系的分发,处理器可以添加,自定义的属性,也就是动态属性,属性来源于 flow file 的属性,Flow file 通过 nifi 表达式直接进行调用,添加到处理器的自定义属性当中,然后根据表达式,实际的值进行不同的路由,实际上就是通过 flow file 的内容进行分发。


六、PutHiveQL

1.描述

主要是向 HIVE 中进行插入或者更新操作。执行 HiveQL DDL/DML 命令(例如,UPDATE, INSERT) 。预期传入 File 的内容是要执行的 HiveQL 命令。HiveQL 命令可以使用?转义参数。在这种情况下,要使用的参数必须作为 FlowFile 属性存在,命名约定为 hiveql.args.N.type和 hiveql.args.N.value,其中 N 是一个正整数。hiveql.args.N.type 应该是指示 JDBC 类型的数字。FlowFile 的内容应采用 UTF-8格式。因为新版本不再支持 hive 1.1,所以要提前把 nar 包更新上去。

2.属性配置

在下面的列表中,必需属性的名称以粗体显示。其它任何属性(非粗体)均视为可选。该表还指示任何默认值。

名称

默认值<br>

描述

**Hive Database Connection Pooling Service**<br/>

I Hive Controller Service,用于获取与 Hive 数据库的连接

Batch Size

100

在单个事务中放入数据库的首选FlowFiles 数

指定记录数据的字符集。

Character Set

UTF-8

指定记录数据的字符集。

Statement

Delimiter

语句分隔符,用于在多语句脚本中分隔 SQL 语句

Rollback On Failure

False

指定如何处理错误。默认情况下(false) ,如果在处理 FlowFile 时发生错误,则 FlowFile 将根据错误类型路由到失败”或“重试”关系,处理器可以继续下一个

FlowFile。相反,您可能想回滚当前已处理的 FlowFile,并立即停止进一步的外

理。在这种情况下,您可以通过启用此"回滚失败”属性来实现。如果启用,失败的 FlowFiles 将保留在输入关系中,而不会受到惩罚,并会反复处理,直到成功处理或通过其它方式将其删除为止。重要的是要设置足够的"有效期限”,以免重试次数过多。

操纵 hive 数据库需要有连接池,和 my circle 是一样的,可以选择去创建 hive 连接池,配置 hive 信息。其它的配置项都可以直接使用默认值,通过这种方式就可以通过接收之前,传送过来的,填写好的 SQL 语句,直接去操作 hive,因为用过好多次 replacetext 处理器,所以大纲里面没有再提及。PuthiveQL 要使用连接池,需要使用hiveconnectionpool。


七、HiveConnectionPool

1.描述

为 Apache Hive 提供数据库连接池服务。可以从池中请求连接,使用后返回连接。

2.属性配置

在下面的列表中,必需属性的名称以粗体显示。其它任何属性(非粗体)均视为可选。该表还指示任何默认值,属性是否支持 NiFi 表达式语言。

DatebaseConnectionURL 配置的是 JDBC 的地址链接,类似于 mysql,配置端口号、IP 地址,HiveConfigurationResources 主要配置 hive 的信息,文件提及的主要是 hive-site.xml,实际中配置的是一定要把hadoop 的链接也配置进来。Hive 中一般没有设置账户密码,如果设置了就可以填写进去。

名称

默认值<br>

描述

** Database

Connection URL

用于连接数据库的数据库连接URL。可能包含数据库系统名称,主机,端口,数据库名称和一些参数。数据库连接 URL 的确切语法由Hive 文档指定。例如,当连接到安全的 Hive 服务器时,通常将服务器主体作为连接参数包括在内。支持表达式语言: true (仅使用变量注册表进行评估)

Hive Configuration Resources

包含 Hive 配置(例如,hive-site.xml) 的文件或文件的逗号分隔列表。否则,Hadoop 将在类路径中索”hive-site.xml"文件,或恢复为默认配置。请注意,例如要启用Kerberos 身份验证,必须在配置文件中设置适当的属性。请参阅Hive 文档以获取更多详细信息。支持表达式语言: true (仅使用变量注册表进

行评估)

Database

User

数据库用户名支持表达式语言: true (仅使用变量注册表进行评估)

Password

数据库用户的密码敏感属性: true 支持表达式语言: true (仅使用变量注册表

进行评估)

Max Wait

Time

500毫秒

池在失败之前将等待(如果没有可用连接时)返回连接的最大时间,或者无限期

等待-1。支持表达式语言: true (仅使用变量注册表进行评估)

Max Total

Connections

8

可以同时从该池分配的活动连接的最大数量,或者为无限制的最大数量。支持表达式语言: true (仅使用变量注册表进行评估)

Validation query

验证查询,用于在返回连接之前对其进行验证。当借用的连接无效时,它将被丢弃并返回新的有效连接。注意:使用验证可能会降低性能。支持表达式语言:true (仅使用变量注册表进行评估)

相关实践学习
如何在云端创建MySQL数据库
开始实验后,系统会自动创建一台自建MySQL的 源数据库 ECS 实例和一台 目标数据库 RDS。
全面了解阿里云能为你做什么
阿里云在全球各地部署高效节能的绿色数据中心,利用清洁计算为万物互联的新世界提供源源不断的能源动力,目前开服的区域包括中国(华北、华东、华南、香港)、新加坡、美国(美东、美西)、欧洲、中东、澳大利亚、日本。目前阿里云的产品涵盖弹性计算、数据库、存储与CDN、分析与搜索、云通信、网络、管理与监控、应用服务、互联网中间件、移动服务、视频服务等。通过本课程,来了解阿里云能够为你的业务带来哪些帮助 &nbsp; &nbsp; 相关的阿里云产品:云服务器ECS 云服务器 ECS(Elastic Compute Service)是一种弹性可伸缩的计算服务,助您降低 IT 成本,提升运维效率,使您更专注于核心业务创新。产品详情: https://www.aliyun.com/product/ecs
相关文章
|
3月前
|
SQL DataWorks 关系型数据库
DataWorks操作报错合集之如何处理数据同步时(mysql->hive)报:Render instance failed
DataWorks是阿里云提供的一站式大数据开发与治理平台,支持数据集成、数据开发、数据服务、数据质量管理、数据安全管理等全流程数据处理。在使用DataWorks过程中,可能会遇到各种操作报错。以下是一些常见的报错情况及其可能的原因和解决方法。
|
15天前
|
存储 SQL 关系型数据库
Mysql学习笔记(二):数据库命令行代码总结
这篇文章是关于MySQL数据库命令行操作的总结,包括登录、退出、查看时间与版本、数据库和数据表的基本操作(如创建、删除、查看)、数据的增删改查等。它还涉及了如何通过SQL语句进行条件查询、模糊查询、范围查询和限制查询,以及如何进行表结构的修改。这些内容对于初学者来说非常实用,是学习MySQL数据库管理的基础。
64 6
|
15天前
|
SQL 关系型数据库 MySQL
Mysql学习笔记(三):fetchone(), fetchmany(), fetchall()详细总结
MySQL中用于数据检索的`fetchone()`, `fetchmany()`, `fetchall()`函数的功能、SQL语句示例和应用场景。
33 3
Mysql学习笔记(三):fetchone(), fetchmany(), fetchall()详细总结
|
15天前
|
SQL Ubuntu 关系型数据库
Mysql学习笔记(一):数据库详细介绍以及Navicat简单使用
本文为MySQL学习笔记,介绍了数据库的基本概念,包括行、列、主键等,并解释了C/S和B/S架构以及SQL语言的分类。接着,指导如何在Windows和Ubuntu系统上安装MySQL,并提供了启动、停止和重启服务的命令。文章还涵盖了Navicat的使用,包括安装、登录和新建表格等步骤。最后,介绍了MySQL中的数据类型和字段约束,如主键、外键、非空和唯一等。
49 3
Mysql学习笔记(一):数据库详细介绍以及Navicat简单使用
|
15天前
|
关系型数据库 MySQL 数据库
Mysql学习笔记(四):Python与Mysql交互--实现增删改查
如何使用Python与MySQL数据库进行交互,实现增删改查等基本操作的教程。
41 1
|
20天前
|
SQL 分布式计算 关系型数据库
Hadoop-21 Sqoop 数据迁移工具 简介与环境配置 云服务器 ETL工具 MySQL与Hive数据互相迁移 导入导出
Hadoop-21 Sqoop 数据迁移工具 简介与环境配置 云服务器 ETL工具 MySQL与Hive数据互相迁移 导入导出
40 3
|
20天前
|
SQL 分布式计算 关系型数据库
Hadoop-24 Sqoop迁移 MySQL到Hive 与 Hive到MySQL SQL生成数据 HDFS集群 Sqoop import jdbc ETL MapReduce
Hadoop-24 Sqoop迁移 MySQL到Hive 与 Hive到MySQL SQL生成数据 HDFS集群 Sqoop import jdbc ETL MapReduce
58 0
|
24天前
|
消息中间件 监控 关系型数据库
MySQL数据实时同步到Elasticsearch:技术深度解析与实践分享
在当今的数据驱动时代,实时数据同步成为许多应用系统的核心需求之一。MySQL作为关系型数据库的代表,以其强大的事务处理能力和数据完整性保障,广泛应用于各种业务场景中。然而,随着数据量的增长和查询复杂度的提升,单一依赖MySQL进行高效的数据检索和分析变得日益困难。这时,Elasticsearch(简称ES)以其卓越的搜索性能、灵活的数据模式以及强大的可扩展性,成为处理复杂查询需求的理想选择。本文将深入探讨MySQL数据实时同步到Elasticsearch的技术实现与最佳实践。
58 0
|
3月前
|
SQL 关系型数据库 MySQL
实时计算 Flink版产品使用问题之MySQL到MySOL的批量实时同步该如何操作
实时计算Flink版作为一种强大的流处理和批处理统一的计算框架,广泛应用于各种需要实时数据处理和分析的场景。实时计算Flink版通常结合SQL接口、DataStream API、以及与上下游数据源和存储系统的丰富连接器,提供了一套全面的解决方案,以应对各种实时计算需求。其低延迟、高吞吐、容错性强的特点,使其成为众多企业和组织实时数据处理首选的技术平台。以下是实时计算Flink版的一些典型使用合集。
|
13天前
|
存储 关系型数据库 MySQL
Mysql(4)—数据库索引
数据库索引是用于提高数据检索效率的数据结构,类似于书籍中的索引。它允许用户快速找到数据,而无需扫描整个表。MySQL中的索引可以显著提升查询速度,使数据库操作更加高效。索引的发展经历了从无索引、简单索引到B-树、哈希索引、位图索引、全文索引等多个阶段。
48 3
Mysql(4)—数据库索引