为什么 Flink 无法实时写入 MySQL?

本文涉及的产品
实时计算 Flink 版,5000CU*H 3个月
简介: Flink 1.10 使用 flink-jdbc 连接器的方式与 MySQL 交互,读数据和写数据都能完成,但是在写数据时,发现 Flink 程序执行完毕之后,才能在 MySQL 中查询到插入的数据。即,虽然是流计算,但却不能实时的输出计算结果?

作者:孙金城

摘要:本文为 Flink 生产环境应用中的疑问剖析,Flink 无法实时写入 MySQL 是初学者常见问题之一,由社区同学罗鹏程提出,Apache Flink PMC 孙金城(金竹)老师分享该问题的解决方案及分析思路。主要分为以下四部分:

  1. 问题描述
  2. 解决思路
  3. 原因剖析
  4. 举一反三

Tips:更多生产环境问题交流及反馈请订阅 Flink 中文邮件列表~

问题描述

Flink 1.10 使用 flink-jdbc 连接器的方式与 MySQL 交互,读数据和写数据都能完成,但是在写数据时,发现 Flink 程序执行完毕之后,才能在 MySQL 中查询到插入的数据。即,虽然是流计算,但却不能实时的输出计算结果?

1.jpg

相关代码片段:

JDBCAppendTableSink.builder()
    .setDrivername("com.mysql.jdbc.Driver")
    .setDBUrl("jdbc:mysql://localhost/flink")
    .setUsername("root")
    .setPassword("123456")
    .setParameterTypes(
    BasicTypeInfo.INT_TYPE_INFO,
    BasicTypeInfo.STRING_TYPE_INFO)
    .setQuery("insert into batch_size values(?,?)")
   .build()

如何解决?

Flink 1.10 这个问题是知道一秒钟,不知磨洋工的 Case,在初学时候非常容易遇上,那么真的是 Flink 不能实时写入 MySQL 吗?当然不是,上面代码基础之上简单的加上一行,就解决问题了:

...
.setBatchSize(1) //将写入MySQL的buffer大小为1。
..

原因剖析

那么问题虽然解决了,根本原因是个啥呢?也许你看到这里会说,这问题很明显,就是 Flink 设计 JDBC Sink 的时候出于性能因素考虑,对写入 buffer 做了默认值设置。

没错,这一点你说的很对,在 Flink 1.10 中 JDBC OutputFormat 的基类 AbstractJDBCOutputFormat 里面和这相关的变量 DEFAULT_FLUSH_MAX_SIZE 默认值是 5000,所以在你学习测试时候由于测试数据少(少于 5000),数据一直在 buffer 中,直到数据源数据结束,作业也结束了,才将计算结果刷入 MySQL,所以没有实时的(每条)写入 MySQL。如下:

3-1.jpg

但这里还有个因素需要注意,那就是时间因素,上面 DEFAULT_FLUSH_INTERVAL_MILLS 默认值是 0,这个相当于没有时间限制,一直等到 buffer 满了或者作业结束才能触发写出动作。

也就是有些初学者,发现问题,即使故意 debug 时候打上断点,不让作业结束,但是等到花儿都谢了,数据也没有写入到 MySQL。

在 Flink 1.10 中 AbstractJDBCOutputFormat 有两个实现类:

3.jpg

分别对应了如下两类 Sink:

4.jpg

所以在 Flink 1.10 中不论是 AppendTableSink 和 UpsertTableSink 都会有同样的问题。不过 UpsertTableSink 时用户可以设置时间,而 AppendTableSink 是连时间设置的入口都木有。

那么,是 Flink 的锅?

就这个问题而言,我个人认为不是用户的问题,是 Flink 1.10 代码设计有进一步改进的空间。在 Flink 1.11 中社区的确重构了,对 JDBCOutputFormat 打了 @Deprecated。感兴趣可以查阅 FLINK-17537 了解变化过程。但是在这个改进中,并没有对 DEFAULT_FLUSH_MAX_SIZE 默认值和 DEFAULT_FLUSH_INTERVAL_MILLS 默认值做变化,社区也在积极的讨论改进方案,想参与社区贡献或者了解最终讨论结果的可以查阅 FLINK-16497。

举一反三

当然在你学习过程中使用任何 Sink 的时候,只要没有实时写入,都可以找找是否有写出 buffer 和写出时间的限制设置。在这一点上,罗鹏程也提到了 Elasticsearch 也有类似问题,需要调用 setBulkFlushMaxActions 进行设置。

5.jpg

大家在学习、使用 Flink 的过程中遇到的问题都可以通过 Flink 中文邮件列表进行反馈,Flink 核心开发者及社区一线用户在线答疑交流!

2 分钟快速订阅 Flink 中文邮件列表

Apache Flink 中文邮件列表订阅流程:

  1. 发送任意邮件到 user-zh-subscribe@flink.apache.org
  2. 收到官方确认邮件
  3. 回复该邮件 confirm 即可订阅

订阅成功后将收到 Flink 官方的中文邮件列表的消息,您可以向 user-zh@flink.apache.org 发邮件提问也可以帮助别人解答问题,动动手测试一下!

以上是对该问题解决方案及思路的分享,希望能对你有所帮助,也期待大家遇到的典型问题能及时反馈至社区邮件列表。

相关实践学习
基于Hologres轻松玩转一站式实时仓库
本场景介绍如何利用阿里云MaxCompute、实时计算Flink和交互式分析服务Hologres开发离线、实时数据融合分析的数据大屏应用。
Linux入门到精通
本套课程是从入门开始的Linux学习课程,适合初学者阅读。由浅入深案例丰富,通俗易懂。主要涉及基础的系统操作以及工作中常用的各种服务软件的应用、部署和优化。即使是零基础的学员,只要能够坚持把所有章节都学完,也一定会受益匪浅。
相关文章
|
1月前
|
SQL 关系型数据库 MySQL
实时计算 Flink版产品使用问题之如何创建mysql临时表
实时计算Flink版作为一种强大的流处理和批处理统一的计算框架,广泛应用于各种需要实时数据处理和分析的场景。实时计算Flink版通常结合SQL接口、DataStream API、以及与上下游数据源和存储系统的丰富连接器,提供了一套全面的解决方案,以应对各种实时计算需求。其低延迟、高吞吐、容错性强的特点,使其成为众多企业和组织实时数据处理首选的技术平台。以下是实时计算Flink版的一些典型使用合集。
|
30天前
|
消息中间件 关系型数据库 MySQL
实时计算 Flink版产品使用问题之使用CTAS同步MySQL到Hologres时出现的时区差异,该如何解决
实时计算Flink版作为一种强大的流处理和批处理统一的计算框架,广泛应用于各种需要实时数据处理和分析的场景。实时计算Flink版通常结合SQL接口、DataStream API、以及与上下游数据源和存储系统的丰富连接器,提供了一套全面的解决方案,以应对各种实时计算需求。其低延迟、高吞吐、容错性强的特点,使其成为众多企业和组织实时数据处理首选的技术平台。以下是实时计算Flink版的一些典型使用合集。
|
1月前
|
SQL 关系型数据库 MySQL
实时数仓 Hologres操作报错合集之Flink CTAS Source(Mysql) 表字段从可空改为非空的原因是什么
实时数仓Hologres是阿里云推出的一款高性能、实时分析的数据库服务,专为大数据分析和复杂查询场景设计。使用Hologres,企业能够打破传统数据仓库的延迟瓶颈,实现数据到决策的无缝衔接,加速业务创新和响应速度。以下是Hologres产品的一些典型使用场景合集。
|
1月前
|
SQL 关系型数据库 MySQL
“震撼揭秘!Flink CDC如何轻松实现SQL Server到MySQL的实时数据同步?一招在手,数据无忧!”
【8月更文挑战第7天】随着大数据技术的发展,实时数据同步变得至关重要。Apache Flink作为高性能流处理框架,在实时数据处理领域扮演着核心角色。Flink CDC(Change Data Capture)组件的加入,使得数据同步更为高效。本文介绍如何使用Flink CDC实现从SQL Server到MySQL的实时数据同步,并提供示例代码。首先确保SQL Server启用了CDC功能,接着在Flink环境中引入相关连接器。通过定义源表与目标表,并执行简单的`INSERT INTO SELECT`语句,即可完成数据同步。
132 1
|
30天前
|
SQL 存储 关系型数据库
实时计算 Flink版产品使用问题之同步MySQL多张表的过程中,内存释放依赖于什么
实时计算Flink版作为一种强大的流处理和批处理统一的计算框架,广泛应用于各种需要实时数据处理和分析的场景。实时计算Flink版通常结合SQL接口、DataStream API、以及与上下游数据源和存储系统的丰富连接器,提供了一套全面的解决方案,以应对各种实时计算需求。其低延迟、高吞吐、容错性强的特点,使其成为众多企业和组织实时数据处理首选的技术平台。以下是实时计算Flink版的一些典型使用合集。
|
1月前
|
SQL Oracle 关系型数据库
实时计算 Flink版产品使用问题之连接到MySQL的从库时遇到其他服务也连接到了从库,该如何处理
实时计算Flink版作为一种强大的流处理和批处理统一的计算框架,广泛应用于各种需要实时数据处理和分析的场景。实时计算Flink版通常结合SQL接口、DataStream API、以及与上下游数据源和存储系统的丰富连接器,提供了一套全面的解决方案,以应对各种实时计算需求。其低延迟、高吞吐、容错性强的特点,使其成为众多企业和组织实时数据处理首选的技术平台。以下是实时计算Flink版的一些典型使用合集。
|
1月前
|
SQL 关系型数据库 MySQL
实时计算 Flink版产品使用问题之MySQL到MySOL的批量实时同步该如何操作
实时计算Flink版作为一种强大的流处理和批处理统一的计算框架,广泛应用于各种需要实时数据处理和分析的场景。实时计算Flink版通常结合SQL接口、DataStream API、以及与上下游数据源和存储系统的丰富连接器,提供了一套全面的解决方案,以应对各种实时计算需求。其低延迟、高吞吐、容错性强的特点,使其成为众多企业和组织实时数据处理首选的技术平台。以下是实时计算Flink版的一些典型使用合集。
|
11天前
|
NoSQL 关系型数据库 MySQL
微服务架构下的数据库选择:MySQL、PostgreSQL 还是 NoSQL?
在微服务架构中,数据库的选择至关重要。不同类型的数据库适用于不同的需求和场景。在本文章中,我们将深入探讨传统的关系型数据库(如 MySQL 和 PostgreSQL)与现代 NoSQL 数据库的优劣势,并分析在微服务架构下的最佳实践。
|
13天前
|
存储 SQL 关系型数据库
使用MySQL Workbench进行数据库备份
【9月更文挑战第13天】以下是使用MySQL Workbench进行数据库备份的步骤:启动软件后,通过“Database”菜单中的“管理连接”选项配置并选择要备份的数据库。随后,选择“数据导出”,确认导出的数据库及格式(推荐SQL格式),设置存储路径,点击“开始导出”。完成后,可在指定路径找到备份文件,建议定期备份并存储于安全位置。
138 11
|
1月前
|
弹性计算 关系型数据库 数据库
手把手带你从自建 MySQL 迁移到云数据库,一步就能脱胎换骨
阿里云瑶池数据库来开课啦!自建数据库迁移至云数据库 RDS原来只要一步操作就能搞定!点击阅读原文完成实验就可获得一本日历哦~

相关产品

  • 实时计算 Flink版