Flink JDBC Connector:Flink 与数据库集成最佳实践

本文涉及的产品
云数据库 RDS MySQL,集群系列 2核4GB
推荐场景:
搭建个人博客
实时计算 Flink 版,5000CU*H 3个月
RDS MySQL Serverless 基础系列,0.5-2RCU 50GB
简介: Flink 1.11 引入了 CDC,在此基础上, JDBC Connector 也发生比较大的变化,本文由 Apache Flink Contributor,阿里巴巴高级开发工程师徐榜江(雪尽)分享,主要介绍 Flink 1.11 JDBC Connector 的最佳实践。

简介:Flink 1.11 引入了 CDC,在此基础上, JDBC Connector 也发生比较大的变化,本文由 Apache Flink Contributor,阿里巴巴高级开发工程师徐榜江(雪尽)分享,主要介绍 Flink 1.11 JDBC Connector 的最佳实践。


作者:徐榜江(雪尽)

整理:陈政羽(Flink 社区志愿者)

摘要:Flink 1.11 引入了 CDC,在此基础上, JDBC Connector 也发生比较大的变化,本文由 Apache Flink Contributor,阿里巴巴高级开发工程师徐榜江(雪尽)分享,主要介绍 Flink 1.11 JDBC Connector 的最佳实践。大纲如下:

  1. JDBC connector
  2. JDBC Catalog
  3. JDBC Dialect
  4. Demo

Tips:点击下方链接可查看作者原版 PPT 及分享视频:

https://flink-learning.org.cn/developers/flink-training-course3/

JDBC-Connector 的重构

JDBC Connector 在 Flink 1.11 版本发生了比较大的变化,我们先从以下几个 Feature 来具体了解一下 Flink 社区在这个版本上对 JDBC 所做的改进。

这个 issue 主要为 DataStream API 新增了 JdbcSink,对于使用 DataStream 编程的用户会更加方便地把数据写入到 JDBC;并且规范了一些命名规则,以前命名使用的是 JDBC 加上连接器名称,目前命名规范为 Jdbc+ 连接器名称

这个 issue 将 flink-jdbc 包名重命名为 flink-connector-jdbc,与 Flink 的其他 connector 统一,将所有接口和类从 org.apache.flink.java.io.jdbc(旧包)规范为新包路径 org.apache.flink.connector.jdbc(新包),通过这种重命名用户在对底层源代码的阅读上面会更加容易的理解和统一。

由于早期数据类型系统并不是很完善,导致了比较多的 Connector 在使用上会经常报数据类型相关的异常,例如 DECIMAL 精度类型,在以往的 Flink 1.10 版本中有可能出现下图问题:

12FL%[}%1WAU[9U}MB@ZPL5.png

基于 FLIP-95 新的 TableSource 和 TableSink 在精度支持方面做了重构,目前数据精度方面的支持已经很完善了。

在 Flink 1.11 版本中,我们对 DDL 的 WITH 参数相对于 1.10 版本做了简化,从用户视角看上就是简化和规范了参数,如表格所示:

Old Key (Flink 1.10) New Key (Flink1.11)
connector.type connector.type
connector.url url
connector.table table-name
connector.driver driver
connector.username username
connector.password password
connector.read.partition.column scan.partition.column
connector.read.partition.num scan.partition.num
connector.read.partition.lower-bound scan.partition.lower-bound
connector.read.partition.upper-bound scan.partition.upper-bound
connector.read.fetch-size scan.fetch-size
connector.lookup.cache.max-rows lookup.cache.max-rows
connector.lookup.cache.ttl lookup.cache.ttl
connector.lookup.max-retries lookup.max-retries
connector.write.flush.max-rows sink.buffer-flush.max-rows
connector.write.flush.interval sink.buffer-flush.interval
connector.write.max-retries sink.max-retries

大家可以看到表格中有 3 个标红的地方,这个是相对于 1.10 有发生变化比较多的地方。这次 FLIP 希望进一步简化连接器属性,以便使属性更加简洁和可读,并更好地与 FLIP-107 协作。如果需要了解更多的 Connector 参数可以进一步参考官方文档和 FLIP-122 中提到的改变,这样有助于从旧版本迁移到新版本并了解参数的变化。

Flink 1.10 存在某些 Query 无法推断出主键导致无法进行 Upsert 更新操作(如下图所示错误)。所以在 FLIP-87 中为 Flink SQL 引入的 Primary Key 约束。Flink 的主键约束遵循 SQL 标准,主键约束分为 PRIMARY KEY NOT ENFORCED 和 PRIMARY KEY ENFORCED, ENFORCED 表示是否对数据进行校验。我们常见数据库的主键约束属于 PRIMARY KEY ENFORCED,会对数据进行校验。因为 Flink 并不持有数据,因此 Flink 支持的主键模式是 PRIMARY KEY NOT ENFORCED, 这意味着 Flink 不会校验数据,而是由用户确保主键的完整性。例如 HBase 里面对应的主键应该是 RowKey,在 MySQL 中对应的主键是在用户数据库的表中对应的主键。

(C8~4[YFI[Z_69(_3_[IX)I.png

JDBC Catalog

目前 Flink 支持 Catalog 主要有 JDBC Catalog 和 Hive Catalog 。在关系数据库中的表,如果要在 Flink 中使用,用户需要手动写表的 DDL,一旦表的 Schema 发生改变,用户需要手动修改, 这是比较繁琐的事情。JDBC Catalog 提供了接口用于连接到各种关系型数据库,使得 Flink 能够自动检索表,不用用户手动输入和修改。目前 JDBC Catalog 内置目前实现了 Postgres Catalog。Postgres catalog 是一个 read-only (只读)的 Catalog,只支持读取 Postgres 表,支持的功能比较有限。下面代码展示了目前 Postgres catalog 支持的 6 个功能:数据库是否存在、数据库列表、获取数据库、根据数据库名获取表列表、获得表、表是否存在。

// The supported methods by Postgres Catalog.
PostgresCatalog.databaseExists(String databaseName)
PostgresCatalog.listDatabases()
PostgresCatalog.getDatabase(String databaseName)
PostgresCatalog.listTables(String databaseName)
PostgresCatalog.getTable(ObjectPath tablePath)
PostgresCatalog.tableExists(ObjectPath tablePath)

如果需要支持其他 DB (如 MySQL),需要用户根据 FLIP-93 的 JdbcCatalog 接口实现对应不同的 JDBC Catalog。

JDBC Dialect

什么是 Dialect?

Dialect (方言)对各个数据库来说,Dialect 体现各个数据库的特性,比如语法、数据类型等。如果需要查看详细的差异,可以点击这里[6]查看详细差异。下面通过对比 MySQL 和 Postgres 的一些常见场景举例:

Dialect MySQL Postgres 场景描述
Grammar(语法) LIMIT 0,30 WITH LIMIT 30 OFFSET 0 分页
Data Type (数据类型) BINARY BYTEA,ARRAY 字段类型
Command (命令) show tables dt 查看所有表

在数据类型上面,Flink SQL 的数据类型目前映射规则如下:

MySQL type PostgreSQL type Flink SQL type
TINYINT TINYINT
SMALLINT
TINYINT UNSIGNED
SMALLINT
INT2
SMALLSERIAL
SERIAL2
SMALLINT
INT
MEDIUMINT
SMALLINT
UNSIGNED
INTEGER
SERIAL
INT
BIGINT
INT
UNSIGNED
BIGINT
BIGSERIAL
BIGINT
BIGINT
UNSIGNED
DECIMAL(20, 0)

Flink 目前支持三种 Dialect: Derby、MySQL、PostgreSQL,Derby 主要用于测试,更多的类型映射可以点击下方链接前往官方文档查看。

https://ci.apache.org/projects/flink/flink-docs-release-1.11/zh/dev/table/connectors/jdbc.html#data-type-mapping

如何保证 Dialect Upsert 的幂等性?

如果定义了主键,JDBC 写入时是能够保证 Upsert 语义的, 如果 DB 不支持 Upsert 语法,则会退化成 DELETE + INSERT 语义。Upsert query 是原子执行的,可以保证幂等性。这个在官方文档中也详细描述了更新失败或者存在故障时候如何做出的处理,下面的表格是不同的 DB 对应不同的 Upsert 语法:

Database Upsert Grammar
MySQL INSERT .. ON DUPLICATE KEY UPDATE ..
PostgreSQL INSERT .. ON CONFLICT .. DO UPDATE SET ..

如何自定义 Dialect?

目前如果要实现自定义 Dialect (比如 SQL Server、Oracle 等), 需要用户自己实现对应 Dialect 修改源码并重新打包 flink-connector-jdbc。社区正在讨论提供一种插件化 dialect 的机制, 让用户可以不用修改源码打包就能实现自定义 Dialect,这个机制需要把 Dialect 接口暴露给用户。目前的 Dialect 接口不够清晰,没有考虑 DataStream API 的使用场景,也没有考虑到 一些复杂的 SQL 场景,所以这个接口目前不太稳定(后续版本会修改) 。

社区目前之所以没有把这个 API 开放给用户,是从用户使用的体验角度考虑,希望把这种顶级 API 设计得尽量稳定、简洁后再开放出来给用户使用,避免用户在后续 Flink 版本的迭代中多次修改代码。目前社区已经有相应的计划去做了,大家可以留意 FLINK-16833[7] 提出的 JDBCDialect 插件化设计。

实践 Demo

大家看完上述 Flink 1.11 在 JDBC 所做的改动后,大家可以尝试下面这个关于商品表 CDC 同步和 ETL 的小案例,有助于理解 JDBC Catalog 和 CDC 的同步机制。

环境与版本:Flink 1.11.1、Docker、Kafka 1.11.1、MySQL Driver 5.1.48、PostgreSQL Driver 42.2.14

流程如下:

  1. Flink standalone 环境准备并在提供的地址下载好对应的安装包和 connector jar。
  2. 测试数据准备,通过拉起容器运行已经打包好的镜像。其中 Kafka 中的 changelog 数据是通过 debezium connector 抓取的 MySQL orders表 的 binlog。
  3. 通过 SQL Client 编写 SQL 作业,分别创建 Flink 订单表,维表,用户表,产品表,并创建 Function UDF。从 PG Catalog 获取结果表信息之后,把作业提交至集群执行运行。
  4. 测试 CDC 数据同步和维表 join,通过新增订单、修改订单、删除订单、维表数据更新等一系列操作验证 CDC 在 Flink 上如何运行以及写入结果表。

X62NAT)SL$DY0X@_[L9H~X3.png

上图为业务流程整体图,项目 Demo 地址:

https://github.com/leonardBang/flink-sql-etl

问答环节

**1.Flink SQL Client 上面执行的 use default,是使用哪个 catlog 呢?

**

答:Flink 内部有一个内置 Catlog,它把 meta 数据存于内存中。在 SQL Client 上没有显式指定 Hive catlog 或者 jdbc catlog 时会使用内置的 Catalog,刚刚的案例给大家演示的是 Postgres Catalog,里面有结果表。在内置 Catlog 可以看到我们刚刚创建 Kafka 的表,MySQL 的维度表。

2.Flink MySQL DDL 连接 8 小时后就会自动断开的问题是否已经解决?

这个问题会在 1.12 版本解决此问题,目前 master 分支已经合并,具体可以参考以下地址,描述了相关问题的讨论和解决办法。

https://issues.apache.org/jira/browse/FLINK-16681

3.什么是 CDC?能大概讲下目前 Flink 支持的 CDC 吗?

通过 Change Data Capture 机制(CDC)来将外部系统的动态数据(如 Mysql BinLog、Kafka Compacted Topic)导入 Flink,以及将 Flink 的 Update/Retract 流写出到外部系统中是用户一直希望的功能。

Flink 1.11 实现了对 CDC 数据读取和写出的支持。目前 Flink 可以支持 Debezium(Demo 中所用的工具) 和 Canal(阿里巴巴开源同步工具) 两种 CDC 格式。Debezium 在国外用得比较多,Canal 在国内用得比较多,两者格式会有所区别,详细可以参考官方使用文档。

总结

本文从 JDBC Connector 的重构、数据精度、主键约束、命名规范等方面详细介绍,分享了社区目前实现的 Postgres Catalog 功能点;介绍了 Flink 如何实现 JDBC Dialect 的统一以及目前社区对 Dialect 做的一些计划;最后的实践 Demo 环节演示了通过 SQL Client 进行维表 JOIN 和 ETL 操作以及解答了大家在实际生产中所遇到的问题,希望对大家进一步了解 Flink CDC 新功能有所帮助。

参考链接:

[1]https://issues.apache.org/jira/browse/FLINK-15782

[2]https://issues.apache.org/jira/browse/FLINK-17537

[3]https://cwiki.apache.org/confluence/display/FLINK/FLIP-95%3A+New+TableSource+and+TableSink+interfaces

[4]https://cwiki.apache.org/confluence/display/FLINK/FLIP-122%3A+New+Connector+Property+Keys+for+New+Factory

[5]https://cwiki.apache.org/confluence/display/FLINK/FLIP+87%3A+Primary+key+constraints+in+Table+API

[6]https://www.postgresqltutorial.com/postgresql-vs-mysql/

[7]https://issues.apache.org/jira/browse/FLINK-16833

相关实践学习
基于Hologres轻松玩转一站式实时仓库
本场景介绍如何利用阿里云MaxCompute、实时计算Flink和交互式分析服务Hologres开发离线、实时数据融合分析的数据大屏应用。
Linux入门到精通
本套课程是从入门开始的Linux学习课程,适合初学者阅读。由浅入深案例丰富,通俗易懂。主要涉及基础的系统操作以及工作中常用的各种服务软件的应用、部署和优化。即使是零基础的学员,只要能够坚持把所有章节都学完,也一定会受益匪浅。
相关文章
|
10天前
|
SQL 开发框架 .NET
ASP.NET连接SQL数据库:详细步骤与最佳实践指南ali01n.xinmi1009fan.com
随着Web开发技术的不断进步,ASP.NET已成为一种非常流行的Web应用程序开发框架。在ASP.NET项目中,我们经常需要与数据库进行交互,特别是SQL数据库。本文将详细介绍如何在ASP.NET项目中连接SQL数据库,并提供最佳实践指南以确保开发过程的稳定性和效率。一、准备工作在开始之前,请确保您
52 3
|
1月前
|
消息中间件 缓存 监控
优化微服务架构中的数据库访问:策略与最佳实践
在微服务架构中,数据库访问的效率直接影响到系统的性能和可扩展性。本文探讨了优化微服务架构中数据库访问的策略与最佳实践,包括数据分片、缓存策略、异步处理和服务间通信优化。通过具体的技术方案和实例分析,提供了一系列实用的建议,以帮助开发团队提升微服务系统的响应速度和稳定性。
|
3天前
|
存储 JSON Ubuntu
时序数据库 TDengine 支持集成开源的物联网平台 ThingsBoard
本文介绍了如何结合 Thingsboard 和 TDengine 实现设备管理和数据存储。Thingsboard 中的“设备配置”与 TDengine 中的超级表相对应,每个设备对应一个子表。通过创建设备配置和设备,实现数据的自动存储和管理。具体操作包括创建设备配置、添加设备、写入数据,并展示了车辆实时定位追踪和车队维护预警两个应用场景。
20 3
|
15天前
|
关系型数据库 MySQL 数据库
MySQL数据库:基础概念、应用与最佳实践
一、引言随着互联网技术的快速发展,数据库管理系统在现代信息系统中扮演着核心角色。在众多数据库管理系统中,MySQL以其开源、稳定、可靠以及跨平台的特性受到了广泛的关注和应用。本文将详细介绍MySQL数据库的基本概念、特性、应用领域以及最佳实践,帮助读者更好地理解和应用MySQL数据库。二、MySQL
37 5
|
14天前
|
SQL 数据管理 数据库
文章初学者指南:SQL新建数据库详细步骤与最佳实践
引言:在当今数字化的世界,数据库管理已经成为信息技术领域中不可或缺的一部分。作为广泛使用的数据库管理系统,SQL已经成为数据管理和信息检索的标准语言。本文将详细介绍如何使用SQL新建数据库,包括准备工作、具体步骤和最佳实践,帮助初学者快速上手。一、准备工作在开始新建数据库之前,你需要做好以下准备工作
74 3
|
18天前
|
安全 算法 Java
数据库信息/密码加盐加密 —— Java代码手写+集成两种方式,手把手教学!保证能用!
本文提供了在数据库中对密码等敏感信息进行加盐加密的详细教程,包括手写MD5加密算法和使用Spring Security的BCryptPasswordEncoder进行加密,并强调了使用BCryptPasswordEncoder时需要注意的Spring Security配置问题。
58 0
数据库信息/密码加盐加密 —— Java代码手写+集成两种方式,手把手教学!保证能用!
|
10天前
|
SQL 监控 测试技术
全面解析SQL数据库迁移:步骤、挑战与最佳实践a8u.0335pw.com
随着信息技术的快速发展,数据库迁移已成为企业和组织在IT领域经常面临的一项任务。数据库迁移涉及到数据的转移、转换和适应新环境的过程,特别是在使用SQL数据库时。本文将详细介绍SQL数据库迁移的过程,探讨其面临的挑战,并分享一些最佳实践。一、数据库迁移概述数据库迁移是指将数据库从一个环境迁移到另一个环
|
2月前
|
存储 C# 关系型数据库
“云端融合:WPF应用无缝对接Azure与AWS——从Blob存储到RDS数据库,全面解析跨平台云服务集成的最佳实践”
【8月更文挑战第31天】本文探讨了如何将Windows Presentation Foundation(WPF)应用与Microsoft Azure和Amazon Web Services(AWS)两大主流云平台无缝集成。通过具体示例代码展示了如何利用Azure Blob Storage存储非结构化数据、Azure Cosmos DB进行分布式数据库操作;同时介绍了如何借助Amazon S3实现大规模数据存储及通过Amazon RDS简化数据库管理。这不仅提升了WPF应用的可扩展性和可用性,还降低了基础设施成本。
70 0
|
2月前
|
Java 数据库连接 数据库
AI 时代风起云涌,Hibernate 实体映射引领数据库高效之路,最佳实践与陷阱全解析!
【8月更文挑战第31天】Hibernate 是一款强大的 Java 持久化框架,可将 Java 对象映射到关系数据库表中。本文通过代码示例详细介绍了 Hibernate 实体映射的最佳实践,包括合理使用关联映射(如 `@OneToMany` 和 `@ManyToOne`)以及正确处理继承关系(如单表继承)。此外,还探讨了常见陷阱,例如循环依赖可能导致的无限递归问题,并提供了使用 `@JsonIgnore` 等注解来避免此类问题的方法。通过遵循这些最佳实践,可以显著提升开发效率和数据库操作性能。
79 0
|
2月前
|
Java 开发者 前端开发
Struts 2、Spring MVC、Play Framework 上演巅峰之战,Web 开发的未来何去何从?
【8月更文挑战第31天】在Web应用开发中,Struts 2框架因强大功能和灵活配置备受青睐,但开发者常遇配置错误、类型转换失败、标签属性设置不当及异常处理等问题。本文通过实例解析常见难题与解决方案,如配置文件中遗漏`result`元素致页面跳转失败、日期格式不匹配需自定义转换器、`<s:checkbox>`标签缺少`label`属性致显示不全及Action中未捕获异常影响用户体验等,助您有效应对挑战。
80 0