Flink数据问题之数据写入mysql失败如何解决

本文涉及的产品
实时计算 Flink 版,5000CU*H 3个月
简介: Apache Flink是由Apache软件基金会开发的开源流处理框架,其核心是用Java和Scala编写的分布式流数据流引擎。本合集提供有关Apache Flink相关技术、使用技巧和最佳实践的资源。

问题一:动态表 Change Log 格式

Flink 动态表的 Change Log 会有四种消息(INSERT, UPDATE_BEFORE, UPDATE_AFTER, DELETE). 其中 UPDATE_BEFORE 对应的是更新之前的完整记录,UPDATE_AFTER 对应的是更新之后的完整记录吗? 我的问题特意强调完整记录,是为了确认 Change Log 的内容是更新后的完整数据,而不是 Set A = 'a' 这样的操作日志/更新语句。 此外,Delete语句对应的数据是完整记录还是操作日志呢?

这意味着Table Sink的时候,只需要获得INSERT, UPDATE_AFTER的数据,写入不支持UPSERT的存储。 并通过额外的逻辑判断来获得最新的数据是可行的。

-- *来自志愿者整理的flink邮件归档



参考答案:

是完整的记录。

upsert kafka 就是这样子实现的,只存储最新镜像。 但是有的 query 是会产生 delete 消息的,所以有时候还是需要存下 delete,像 upsert kafka 里就存成了kafka 的 tombstone 消息。*来自志愿者整理的flink邮件归档



关于本问题的更多回答可点击进行查看:

https://developer.aliyun.com/ask/370082?spm=a2c6h.13066369.question.46.33bf585f3ZrV23



问题二:SQL解析复杂JSON问题

麻烦问下我已经在字段上面定了结构,还需要再写format.json-schema吗?CREATE TABLE user_log( id VARCHAR, timestam VARCHAR, user_info ROW(user_id string, name string ), jsonArray ARRAY<ROW(user_id222 STRING, name222 STRING)> ) WITH ( 'connector.type' = 'kafka', 'connector.version' = 'universal', 'connector.topic' = 'complex_string', 'connector.properties.zookeeper.connect' = 'venn:2181', 'connector.properties.bootstrap.servers' = 'venn:9092', 'connector.startup-mode' = 'earliest-offset', 'format.type' = 'json', 'format.json-schema' = '{ "type": "object", "properties": { "id": {type: "string"}, "timestam": {type: "string"}, "user_info":{type: "object", "properties" : { "user_id" : {type:"string"}, "name":{type:"string"} } }, "jsonArray":{"type": "array", "items": { "type": "object", "properties" : { "user_id222" : {type:"string"}, "name222" : {type:"string"} } } } } }' );

*来自志愿者整理的flink邮件归档



参考答案:

Hi 你好,

这个取决于你使用的flink版本,1.11版本会自动从table schema中解析,而1.10版本如果table schema和json schema不是完全相同的话,需要手动写json-schema: https://ci.apache.org/projects/flink/flink-docs-release-1.11/dev/table/connectors/formats/json.html#data-type-mapping https://ci.apache.org/projects/flink/flink-docs-release-1.11/dev/table/connectors/formats/json.html#data-type-mapping https://ci.apache.org/projects/flink/flink-docs-release-1.10/dev/table/connect.html#json-format https://ci.apache.org/projects/flink/flink-docs-release-1.10/dev/table/connect.html#json-format*来自志愿者整理的flink邮件归档



关于本问题的更多回答可点击进行查看:

https://developer.aliyun.com/ask/370083?spm=a2c6h.13066369.question.47.33bf585fQemfHB



问题三:一个关于实时合并数据的问题

想请教各位一个问题:目前有一个这样的需求:

数据流40W/s,数据有id,time,type....等字段,id有10亿个,现在想30分钟内,同一个id的信息只保存一条,时间的话要用事件的事件,不能用处理的时间 本人现在的思路是:根据id分组,然后做增量ck,状态信息存储每个id的最后的时间,然后每来一条数据会读取状态信息,然后做时间判断。但是发现这样做背压很高,数据消费很慢 请问各位,我这种思路是否可行?根据id分组会产生10亿个分组,这样会影响什么?还有其他更好的方法么?

谢谢各位解答疑惑!*来自志愿者整理的flink邮件归档



参考答案:

根据 id 去重并不意味着根据 id 做 keyby 分组,比如可以将 id 做个 mod 分成若干有限组,然后在 processFunction 中 MapState 存储 <id, 1> 进行去重处理

*来自志愿者整理的flink邮件归档



关于本问题的更多回答可点击进行查看:

https://developer.aliyun.com/ask/370084?spm=a2c6h.13066369.question.48.33bf585fI2D86j



问题四:flinksql1.11长时间没有数据写入mysql,出问题

使用flinksql 1.11版本,运行计算好的指标落入mysql,长时间没有数据会报错,导致任务会失败。*来自志愿者整理的flink邮件归档



参考答案:

JdbcBatchingOutputFormat: for (int i = 0; i <= executionOptions.getMaxRetries(); i++) { try { attemptFlush(); batchCount = 0; break; } catch (SQLException e) { LOG.error("JDBC executeBatch error, retry times = {}", i, e); if (i >= executionOptions.getMaxRetries()) { throw new IOException(e); } try { if (!connection.isValid(CONNECTION_CHECK_TIMEOUT_SECONDS)) { connection = connectionProvider.reestablishConnection(); jdbcStatementExecutor.closeStatements(); jdbcStatementExecutor.prepareStatements(connection); } } catch (Exception excpetion) { LOG.error("JDBC connection is not valid, and reestablish connection failed.", excpetion); throw new IOException("Reestablish JDBC connection failed", excpetion); } try { Thread.sleep(1000 * i); } catch (InterruptedException ex) { Thread.currentThread().interrupt(); throw new IOException("unable to flush; interrupted while doing another attempt", e); } } }

嗯,看起来是这样的。

if (i >= executionOptions.getMaxRetries()) { throw new IOException(e); }这个判断重试的代码应该放在sleep 后面。不然,Caused by: java.io.IOException: java.sql.SQLException: No operations allowed after statement closed. 就没机会重建连接了。*来自志愿者整理的flink邮件归档



关于本问题的更多回答可点击进行查看:

https://developer.aliyun.com/ask/370085?spm=a2c6h.13066369.question.51.33bf585fijZlWy



问题五:请问flink sql实时计算分位数如何实现哈?

你好:

   目前flink sql实时计算中没有percentile函数吗?如果没有,如何实现这一功能。     期待你的答复,谢谢!*来自志愿者整理的flink邮件归档



参考答案:

可以看下UDAF的文档: https://ci.apache.org/projects/flink/flink-docs-master/dev/table/functions/udfs.html#aggregate-functions *来自志愿者整理的flink邮件归档



关于本问题的更多回答可点击进行查看:

https://developer.aliyun.com/ask/370087?spm=a2c6h.13066369.question.52.33bf585fnDag6n

相关实践学习
基于Hologres轻松玩转一站式实时仓库
本场景介绍如何利用阿里云MaxCompute、实时计算Flink和交互式分析服务Hologres开发离线、实时数据融合分析的数据大屏应用。
Linux入门到精通
本套课程是从入门开始的Linux学习课程,适合初学者阅读。由浅入深案例丰富,通俗易懂。主要涉及基础的系统操作以及工作中常用的各种服务软件的应用、部署和优化。即使是零基础的学员,只要能够坚持把所有章节都学完,也一定会受益匪浅。
相关文章
|
3天前
|
存储 关系型数据库 MySQL
MySQL是怎样存储数据的?
MySQL是怎样存储数据的?
|
4天前
|
SQL 关系型数据库 MySQL
这篇文章带你了解:如何一次性将Centos中Mysql的数据快速导出!!!
这篇文章带你了解:如何一次性将Centos中Mysql的数据快速导出!!!
|
5天前
|
存储 SQL 关系型数据库
MySQL的优化利器⭐️索引条件下推,千万数据下性能提升273%🚀
以小白的视角探究MySQL索引条件下推ICP的优化,其中包括server层与存储引擎层如何交互、索引、回表、ICP等内容
MySQL的优化利器⭐️索引条件下推,千万数据下性能提升273%🚀
|
6天前
|
存储 关系型数据库 MySQL
MySQL字段的字符类型该如何选择?千万数据下varchar和char性能竟然相差30%🚀
本篇文章来讨论MySQL字段的字符类型选择并深入实践char与varchar类型的区别以及在千万数据下的性能测试
MySQL字段的字符类型该如何选择?千万数据下varchar和char性能竟然相差30%🚀
|
6天前
|
关系型数据库 MySQL 数据管理
MySQL通过 bin-log 恢复从备份点到灾难点之间数据
MySQL通过 bin-log 恢复从备份点到灾难点之间数据
|
11天前
|
关系型数据库 MySQL 数据库
【MySQL-10】DCL-数据控制语言-【管理用户&权限控制】 (语法语句&案例演示&可cv案例代码)
【MySQL-10】DCL-数据控制语言-【管理用户&权限控制】 (语法语句&案例演示&可cv案例代码)
【MySQL-10】DCL-数据控制语言-【管理用户&权限控制】 (语法语句&案例演示&可cv案例代码)
|
11天前
|
SQL 关系型数据库 MySQL
【MySQL-7】DML的表操作详解:添加数据&修改数据&删除数据(可cv例题语句)
【MySQL-7】DML的表操作详解:添加数据&修改数据&删除数据(可cv例题语句)
|
3月前
|
消息中间件 Kafka Apache
Apache Flink 是一个开源的分布式流处理框架
Apache Flink 是一个开源的分布式流处理框架
598 5
|
2月前
|
SQL Java API
官宣|Apache Flink 1.19 发布公告
Apache Flink PMC(项目管理委员)很高兴地宣布发布 Apache Flink 1.19.0。
1630 2
官宣|Apache Flink 1.19 发布公告
|
2月前
|
SQL Apache 流计算
Apache Flink官方网站提供了关于如何使用Docker进行Flink CDC测试的文档
【2月更文挑战第25天】Apache Flink官方网站提供了关于如何使用Docker进行Flink CDC测试的文档
289 3

相关产品

  • 实时计算 Flink版
  • 推荐镜像

    更多