Flink JDBC Connector:Flink 与数据库集成最佳实践

本文涉及的产品
实时计算 Flink 版,5000CU*H 3个月
RDS MySQL Serverless 基础系列,0.5-2RCU 50GB
云原生数据库 PolarDB MySQL 版,通用型 2核8GB 50GB
简介: Flink 1.11 引入了 CDC,在此基础上, JDBC Connector 也发生比较大的变化,本文由 Apache Flink Contributor,阿里巴巴高级开发工程师徐榜江(雪尽)分享,主要介绍 Flink 1.11 JDBC Connector 的最佳实践。

简介:Flink 1.11 引入了 CDC,在此基础上, JDBC Connector 也发生比较大的变化,本文由 Apache Flink Contributor,阿里巴巴高级开发工程师徐榜江(雪尽)分享,主要介绍 Flink 1.11 JDBC Connector 的最佳实践。


作者:徐榜江(雪尽)

整理:陈政羽(Flink 社区志愿者)

摘要:Flink 1.11 引入了 CDC,在此基础上, JDBC Connector 也发生比较大的变化,本文由 Apache Flink Contributor,阿里巴巴高级开发工程师徐榜江(雪尽)分享,主要介绍 Flink 1.11 JDBC Connector 的最佳实践。大纲如下:

  1. JDBC connector
  2. JDBC Catalog
  3. JDBC Dialect
  4. Demo

Tips:点击下方链接可查看作者原版 PPT 及分享视频:

https://flink-learning.org.cn/developers/flink-training-course3/

JDBC-Connector 的重构

JDBC Connector 在 Flink 1.11 版本发生了比较大的变化,我们先从以下几个 Feature 来具体了解一下 Flink 社区在这个版本上对 JDBC 所做的改进。

这个 issue 主要为 DataStream API 新增了 JdbcSink,对于使用 DataStream 编程的用户会更加方便地把数据写入到 JDBC;并且规范了一些命名规则,以前命名使用的是 JDBC 加上连接器名称,目前命名规范为 Jdbc+ 连接器名称

这个 issue 将 flink-jdbc 包名重命名为 flink-connector-jdbc,与 Flink 的其他 connector 统一,将所有接口和类从 org.apache.flink.java.io.jdbc(旧包)规范为新包路径 org.apache.flink.connector.jdbc(新包),通过这种重命名用户在对底层源代码的阅读上面会更加容易的理解和统一。

由于早期数据类型系统并不是很完善,导致了比较多的 Connector 在使用上会经常报数据类型相关的异常,例如 DECIMAL 精度类型,在以往的 Flink 1.10 版本中有可能出现下图问题:

12FL%[}%1WAU[9U}MB@ZPL5.png

基于 FLIP-95 新的 TableSource 和 TableSink 在精度支持方面做了重构,目前数据精度方面的支持已经很完善了。

在 Flink 1.11 版本中,我们对 DDL 的 WITH 参数相对于 1.10 版本做了简化,从用户视角看上就是简化和规范了参数,如表格所示:

Old Key (Flink 1.10) New Key (Flink1.11)
connector.type connector.type
connector.url url
connector.table table-name
connector.driver driver
connector.username username
connector.password password
connector.read.partition.column scan.partition.column
connector.read.partition.num scan.partition.num
connector.read.partition.lower-bound scan.partition.lower-bound
connector.read.partition.upper-bound scan.partition.upper-bound
connector.read.fetch-size scan.fetch-size
connector.lookup.cache.max-rows lookup.cache.max-rows
connector.lookup.cache.ttl lookup.cache.ttl
connector.lookup.max-retries lookup.max-retries
connector.write.flush.max-rows sink.buffer-flush.max-rows
connector.write.flush.interval sink.buffer-flush.interval
connector.write.max-retries sink.max-retries

大家可以看到表格中有 3 个标红的地方,这个是相对于 1.10 有发生变化比较多的地方。这次 FLIP 希望进一步简化连接器属性,以便使属性更加简洁和可读,并更好地与 FLIP-107 协作。如果需要了解更多的 Connector 参数可以进一步参考官方文档和 FLIP-122 中提到的改变,这样有助于从旧版本迁移到新版本并了解参数的变化。

Flink 1.10 存在某些 Query 无法推断出主键导致无法进行 Upsert 更新操作(如下图所示错误)。所以在 FLIP-87 中为 Flink SQL 引入的 Primary Key 约束。Flink 的主键约束遵循 SQL 标准,主键约束分为 PRIMARY KEY NOT ENFORCED 和 PRIMARY KEY ENFORCED, ENFORCED 表示是否对数据进行校验。我们常见数据库的主键约束属于 PRIMARY KEY ENFORCED,会对数据进行校验。因为 Flink 并不持有数据,因此 Flink 支持的主键模式是 PRIMARY KEY NOT ENFORCED, 这意味着 Flink 不会校验数据,而是由用户确保主键的完整性。例如 HBase 里面对应的主键应该是 RowKey,在 MySQL 中对应的主键是在用户数据库的表中对应的主键。

(C8~4[YFI[Z_69(_3_[IX)I.png

JDBC Catalog

目前 Flink 支持 Catalog 主要有 JDBC Catalog 和 Hive Catalog 。在关系数据库中的表,如果要在 Flink 中使用,用户需要手动写表的 DDL,一旦表的 Schema 发生改变,用户需要手动修改, 这是比较繁琐的事情。JDBC Catalog 提供了接口用于连接到各种关系型数据库,使得 Flink 能够自动检索表,不用用户手动输入和修改。目前 JDBC Catalog 内置目前实现了 Postgres Catalog。Postgres catalog 是一个 read-only (只读)的 Catalog,只支持读取 Postgres 表,支持的功能比较有限。下面代码展示了目前 Postgres catalog 支持的 6 个功能:数据库是否存在、数据库列表、获取数据库、根据数据库名获取表列表、获得表、表是否存在。

// The supported methods by Postgres Catalog.
PostgresCatalog.databaseExists(String databaseName)
PostgresCatalog.listDatabases()
PostgresCatalog.getDatabase(String databaseName)
PostgresCatalog.listTables(String databaseName)
PostgresCatalog.getTable(ObjectPath tablePath)
PostgresCatalog.tableExists(ObjectPath tablePath)

如果需要支持其他 DB (如 MySQL),需要用户根据 FLIP-93 的 JdbcCatalog 接口实现对应不同的 JDBC Catalog。

JDBC Dialect

什么是 Dialect?

Dialect (方言)对各个数据库来说,Dialect 体现各个数据库的特性,比如语法、数据类型等。如果需要查看详细的差异,可以点击这里[6]查看详细差异。下面通过对比 MySQL 和 Postgres 的一些常见场景举例:

Dialect MySQL Postgres 场景描述
Grammar(语法) LIMIT 0,30 WITH LIMIT 30 OFFSET 0 分页
Data Type (数据类型) BINARY BYTEA,ARRAY 字段类型
Command (命令) show tables dt 查看所有表

在数据类型上面,Flink SQL 的数据类型目前映射规则如下:

MySQL type PostgreSQL type Flink SQL type
TINYINT TINYINT
SMALLINT
TINYINT UNSIGNED
SMALLINT
INT2
SMALLSERIAL
SERIAL2
SMALLINT
INT
MEDIUMINT
SMALLINT
UNSIGNED
INTEGER
SERIAL
INT
BIGINT
INT
UNSIGNED
BIGINT
BIGSERIAL
BIGINT
BIGINT
UNSIGNED
DECIMAL(20, 0)

Flink 目前支持三种 Dialect: Derby、MySQL、PostgreSQL,Derby 主要用于测试,更多的类型映射可以点击下方链接前往官方文档查看。

https://ci.apache.org/projects/flink/flink-docs-release-1.11/zh/dev/table/connectors/jdbc.html#data-type-mapping

如何保证 Dialect Upsert 的幂等性?

如果定义了主键,JDBC 写入时是能够保证 Upsert 语义的, 如果 DB 不支持 Upsert 语法,则会退化成 DELETE + INSERT 语义。Upsert query 是原子执行的,可以保证幂等性。这个在官方文档中也详细描述了更新失败或者存在故障时候如何做出的处理,下面的表格是不同的 DB 对应不同的 Upsert 语法:

Database Upsert Grammar
MySQL INSERT .. ON DUPLICATE KEY UPDATE ..
PostgreSQL INSERT .. ON CONFLICT .. DO UPDATE SET ..

如何自定义 Dialect?

目前如果要实现自定义 Dialect (比如 SQL Server、Oracle 等), 需要用户自己实现对应 Dialect 修改源码并重新打包 flink-connector-jdbc。社区正在讨论提供一种插件化 dialect 的机制, 让用户可以不用修改源码打包就能实现自定义 Dialect,这个机制需要把 Dialect 接口暴露给用户。目前的 Dialect 接口不够清晰,没有考虑 DataStream API 的使用场景,也没有考虑到 一些复杂的 SQL 场景,所以这个接口目前不太稳定(后续版本会修改) 。

社区目前之所以没有把这个 API 开放给用户,是从用户使用的体验角度考虑,希望把这种顶级 API 设计得尽量稳定、简洁后再开放出来给用户使用,避免用户在后续 Flink 版本的迭代中多次修改代码。目前社区已经有相应的计划去做了,大家可以留意 FLINK-16833[7] 提出的 JDBCDialect 插件化设计。

实践 Demo

大家看完上述 Flink 1.11 在 JDBC 所做的改动后,大家可以尝试下面这个关于商品表 CDC 同步和 ETL 的小案例,有助于理解 JDBC Catalog 和 CDC 的同步机制。

环境与版本:Flink 1.11.1、Docker、Kafka 1.11.1、MySQL Driver 5.1.48、PostgreSQL Driver 42.2.14

流程如下:

  1. Flink standalone 环境准备并在提供的地址下载好对应的安装包和 connector jar。
  2. 测试数据准备,通过拉起容器运行已经打包好的镜像。其中 Kafka 中的 changelog 数据是通过 debezium connector 抓取的 MySQL orders表 的 binlog。
  3. 通过 SQL Client 编写 SQL 作业,分别创建 Flink 订单表,维表,用户表,产品表,并创建 Function UDF。从 PG Catalog 获取结果表信息之后,把作业提交至集群执行运行。
  4. 测试 CDC 数据同步和维表 join,通过新增订单、修改订单、删除订单、维表数据更新等一系列操作验证 CDC 在 Flink 上如何运行以及写入结果表。

X62NAT)SL$DY0X@_[L9H~X3.png

上图为业务流程整体图,项目 Demo 地址:

https://github.com/leonardBang/flink-sql-etl

问答环节

**1.Flink SQL Client 上面执行的 use default,是使用哪个 catlog 呢?

**

答:Flink 内部有一个内置 Catlog,它把 meta 数据存于内存中。在 SQL Client 上没有显式指定 Hive catlog 或者 jdbc catlog 时会使用内置的 Catalog,刚刚的案例给大家演示的是 Postgres Catalog,里面有结果表。在内置 Catlog 可以看到我们刚刚创建 Kafka 的表,MySQL 的维度表。

2.Flink MySQL DDL 连接 8 小时后就会自动断开的问题是否已经解决?

这个问题会在 1.12 版本解决此问题,目前 master 分支已经合并,具体可以参考以下地址,描述了相关问题的讨论和解决办法。

https://issues.apache.org/jira/browse/FLINK-16681

3.什么是 CDC?能大概讲下目前 Flink 支持的 CDC 吗?

通过 Change Data Capture 机制(CDC)来将外部系统的动态数据(如 Mysql BinLog、Kafka Compacted Topic)导入 Flink,以及将 Flink 的 Update/Retract 流写出到外部系统中是用户一直希望的功能。

Flink 1.11 实现了对 CDC 数据读取和写出的支持。目前 Flink 可以支持 Debezium(Demo 中所用的工具) 和 Canal(阿里巴巴开源同步工具) 两种 CDC 格式。Debezium 在国外用得比较多,Canal 在国内用得比较多,两者格式会有所区别,详细可以参考官方使用文档。

总结

本文从 JDBC Connector 的重构、数据精度、主键约束、命名规范等方面详细介绍,分享了社区目前实现的 Postgres Catalog 功能点;介绍了 Flink 如何实现 JDBC Dialect 的统一以及目前社区对 Dialect 做的一些计划;最后的实践 Demo 环节演示了通过 SQL Client 进行维表 JOIN 和 ETL 操作以及解答了大家在实际生产中所遇到的问题,希望对大家进一步了解 Flink CDC 新功能有所帮助。

参考链接:

[1]https://issues.apache.org/jira/browse/FLINK-15782

[2]https://issues.apache.org/jira/browse/FLINK-17537

[3]https://cwiki.apache.org/confluence/display/FLINK/FLIP-95%3A+New+TableSource+and+TableSink+interfaces

[4]https://cwiki.apache.org/confluence/display/FLINK/FLIP-122%3A+New+Connector+Property+Keys+for+New+Factory

[5]https://cwiki.apache.org/confluence/display/FLINK/FLIP+87%3A+Primary+key+constraints+in+Table+API

[6]https://www.postgresqltutorial.com/postgresql-vs-mysql/

[7]https://issues.apache.org/jira/browse/FLINK-16833

相关实践学习
基于Hologres+Flink搭建GitHub实时数据大屏
通过使用Flink、Hologres构建实时数仓,并通过Hologres对接BI分析工具(以DataV为例),实现海量数据实时分析.
实时计算 Flink 实战课程
如何使用实时计算 Flink 搞定数据处理难题?实时计算 Flink 极客训练营产品、技术专家齐上阵,从开源 Flink功能介绍到实时计算 Flink 优势详解,现场实操,5天即可上手! 欢迎开通实时计算 Flink 版: https://cn.aliyun.com/product/bigdata/sc Flink Forward Asia 介绍: Flink Forward 是由 Apache 官方授权,Apache Flink Community China 支持的会议,通过参会不仅可以了解到 Flink 社区的最新动态和发展计划,还可以了解到国内外一线大厂围绕 Flink 生态的生产实践经验,是 Flink 开发者和使用者不可错过的盛会。 去年经过品牌升级后的 Flink Forward Asia 吸引了超过2000人线下参与,一举成为国内最大的 Apache 顶级项目会议。结合2020年的特殊情况,Flink Forward Asia 2020 将在12月26日以线上峰会的形式与大家见面。
相关文章
|
6月前
|
消息中间件 关系型数据库 MySQL
基于 Flink CDC YAML 的 MySQL 到 Kafka 流式数据集成
基于 Flink CDC YAML 的 MySQL 到 Kafka 流式数据集成
554 0
|
7月前
|
消息中间件 关系型数据库 MySQL
基于 Flink CDC YAML 的 MySQL 到 Kafka 流式数据集成
本教程展示如何使用Flink CDC YAML快速构建从MySQL到Kafka的流式数据集成作业,涵盖整库同步和表结构变更同步。无需编写Java/Scala代码或安装IDE,所有操作在Flink CDC CLI中完成。首先准备Flink Standalone集群和Docker环境(包括MySQL、Kafka和Zookeeper),然后通过配置YAML文件提交任务,实现数据同步。教程还介绍了路由变更、写入多个分区、输出格式设置及上游表名到下游Topic的映射等功能,并提供详细的命令和示例。最后,包含环境清理步骤以确保资源释放。
561 2
基于 Flink CDC YAML 的 MySQL 到 Kafka 流式数据集成
|
7月前
|
Java 关系型数据库 MySQL
SpringBoot 通过集成 Flink CDC 来实时追踪 MySql 数据变动
通过详细的步骤和示例代码,您可以在 SpringBoot 项目中成功集成 Flink CDC,并实时追踪 MySQL 数据库的变动。
1596 45
|
7月前
|
SQL 人工智能 关系型数据库
Flink CDC YAML:面向数据集成的 API 设计
本文整理自阿里云智能集团 Flink PMC Member & Committer 徐榜江(雪尽)在 FFA 2024 分论坛的分享,涵盖四大主题:Flink CDC、YAML API、Transform + AI 和 Community。文章详细介绍了 Flink CDC 的发展历程及其优势,特别是 YAML API 的设计与实现,以及如何通过 Transform 和 AI 模型集成提升数据处理能力。最后,分享了社区动态和未来规划,欢迎更多开发者加入开源社区,共同推动 Flink CDC 的发展。
626 12
Flink CDC YAML:面向数据集成的 API 设计
|
6月前
|
SQL 弹性计算 DataWorks
Flink CDC 在阿里云 DataWorks 数据集成入湖场景的应用实践
Flink CDC 在阿里云 DataWorks 数据集成入湖场景的应用实践
237 6
|
6月前
|
SQL 人工智能 关系型数据库
Flink CDC YAML:面向数据集成的 API 设计
Flink CDC YAML:面向数据集成的 API 设计
170 5
|
10月前
|
存储 监控 数据处理
flink 向doris 数据库写入数据时出现背压如何排查?
本文介绍了如何确定和解决Flink任务向Doris数据库写入数据时遇到的背压问题。首先通过Flink Web UI和性能指标监控识别背压,然后从Doris数据库性能、网络连接稳定性、Flink任务数据处理逻辑及资源配置等方面排查原因,并通过分析相关日志进一步定位问题。
693 61
|
10月前
|
消息中间件 资源调度 关系型数据库
如何在Flink on YARN环境中配置Debezium CDC 3.0,以实现实时捕获数据库变更事件并将其传输到Flink进行处理
本文介绍了如何在Flink on YARN环境中配置Debezium CDC 3.0,以实现实时捕获数据库变更事件并将其传输到Flink进行处理。主要内容包括安装Debezium、配置Kafka Connect、创建Flink任务以及启动任务的具体步骤,为构建实时数据管道提供了详细指导。
483 9
|
3月前
|
人工智能 运维 关系型数据库
数据库运维:mysql 数据库迁移方法-mysqldump
本文介绍了MySQL数据库迁移的方法与技巧,重点探讨了数据量大小对迁移方式的影响。对于10GB以下的小型数据库,推荐使用mysqldump进行逻辑导出和source导入;10GB以上可考虑mydumper与myloader工具;100GB以上则建议物理迁移。文中还提供了统计数据库及表空间大小的SQL语句,并讲解了如何使用mysqldump导出存储过程、函数和数据结构。通过结合实际应用场景选择合适的工具与方法,可实现高效的数据迁移。
560 1
|
4月前
|
负载均衡 算法 关系型数据库
大数据大厂之MySQL数据库课程设计:揭秘MySQL集群架构负载均衡核心算法:从理论到Java代码实战,让你的数据库性能飙升!
本文聚焦 MySQL 集群架构中的负载均衡算法,阐述其重要性。详细介绍轮询、加权轮询、最少连接、加权最少连接、随机、源地址哈希等常用算法,分析各自优缺点及适用场景。并提供 Java 语言代码实现示例,助力直观理解。文章结构清晰,语言通俗易懂,对理解和应用负载均衡算法具有实用价值和参考价值。
大数据大厂之MySQL数据库课程设计:揭秘MySQL集群架构负载均衡核心算法:从理论到Java代码实战,让你的数据库性能飙升!

热门文章

最新文章