Flink数据问题之数据写入mysql失败如何解决

本文涉及的产品
实时计算 Flink 版,5000CU*H 3个月
简介: Apache Flink是由Apache软件基金会开发的开源流处理框架,其核心是用Java和Scala编写的分布式流数据流引擎。本合集提供有关Apache Flink相关技术、使用技巧和最佳实践的资源。

问题一:动态表 Change Log 格式

Flink 动态表的 Change Log 会有四种消息(INSERT, UPDATE_BEFORE, UPDATE_AFTER, DELETE). 其中 UPDATE_BEFORE 对应的是更新之前的完整记录,UPDATE_AFTER 对应的是更新之后的完整记录吗? 我的问题特意强调完整记录,是为了确认 Change Log 的内容是更新后的完整数据,而不是 Set A = 'a' 这样的操作日志/更新语句。 此外,Delete语句对应的数据是完整记录还是操作日志呢?

这意味着Table Sink的时候,只需要获得INSERT, UPDATE_AFTER的数据,写入不支持UPSERT的存储。 并通过额外的逻辑判断来获得最新的数据是可行的。

-- *来自志愿者整理的flink邮件归档



参考答案:

是完整的记录。

upsert kafka 就是这样子实现的,只存储最新镜像。 但是有的 query 是会产生 delete 消息的,所以有时候还是需要存下 delete,像 upsert kafka 里就存成了kafka 的 tombstone 消息。*来自志愿者整理的flink邮件归档



关于本问题的更多回答可点击进行查看:

https://developer.aliyun.com/ask/370082?spm=a2c6h.13066369.question.46.33bf585f3ZrV23



问题二:SQL解析复杂JSON问题

麻烦问下我已经在字段上面定了结构,还需要再写format.json-schema吗?CREATE TABLE user_log( id VARCHAR, timestam VARCHAR, user_info ROW(user_id string, name string ), jsonArray ARRAY<ROW(user_id222 STRING, name222 STRING)> ) WITH ( 'connector.type' = 'kafka', 'connector.version' = 'universal', 'connector.topic' = 'complex_string', 'connector.properties.zookeeper.connect' = 'venn:2181', 'connector.properties.bootstrap.servers' = 'venn:9092', 'connector.startup-mode' = 'earliest-offset', 'format.type' = 'json', 'format.json-schema' = '{ "type": "object", "properties": { "id": {type: "string"}, "timestam": {type: "string"}, "user_info":{type: "object", "properties" : { "user_id" : {type:"string"}, "name":{type:"string"} } }, "jsonArray":{"type": "array", "items": { "type": "object", "properties" : { "user_id222" : {type:"string"}, "name222" : {type:"string"} } } } } }' );

*来自志愿者整理的flink邮件归档



参考答案:

Hi 你好,

这个取决于你使用的flink版本,1.11版本会自动从table schema中解析,而1.10版本如果table schema和json schema不是完全相同的话,需要手动写json-schema: https://ci.apache.org/projects/flink/flink-docs-release-1.11/dev/table/connectors/formats/json.html#data-type-mapping https://ci.apache.org/projects/flink/flink-docs-release-1.11/dev/table/connectors/formats/json.html#data-type-mapping https://ci.apache.org/projects/flink/flink-docs-release-1.10/dev/table/connect.html#json-format https://ci.apache.org/projects/flink/flink-docs-release-1.10/dev/table/connect.html#json-format*来自志愿者整理的flink邮件归档



关于本问题的更多回答可点击进行查看:

https://developer.aliyun.com/ask/370083?spm=a2c6h.13066369.question.47.33bf585fQemfHB



问题三:一个关于实时合并数据的问题

想请教各位一个问题:目前有一个这样的需求:

数据流40W/s,数据有id,time,type....等字段,id有10亿个,现在想30分钟内,同一个id的信息只保存一条,时间的话要用事件的事件,不能用处理的时间 本人现在的思路是:根据id分组,然后做增量ck,状态信息存储每个id的最后的时间,然后每来一条数据会读取状态信息,然后做时间判断。但是发现这样做背压很高,数据消费很慢 请问各位,我这种思路是否可行?根据id分组会产生10亿个分组,这样会影响什么?还有其他更好的方法么?

谢谢各位解答疑惑!*来自志愿者整理的flink邮件归档



参考答案:

根据 id 去重并不意味着根据 id 做 keyby 分组,比如可以将 id 做个 mod 分成若干有限组,然后在 processFunction 中 MapState 存储 <id, 1> 进行去重处理

*来自志愿者整理的flink邮件归档



关于本问题的更多回答可点击进行查看:

https://developer.aliyun.com/ask/370084?spm=a2c6h.13066369.question.48.33bf585fI2D86j



问题四:flinksql1.11长时间没有数据写入mysql,出问题

使用flinksql 1.11版本,运行计算好的指标落入mysql,长时间没有数据会报错,导致任务会失败。*来自志愿者整理的flink邮件归档



参考答案:

JdbcBatchingOutputFormat: for (int i = 0; i <= executionOptions.getMaxRetries(); i++) { try { attemptFlush(); batchCount = 0; break; } catch (SQLException e) { LOG.error("JDBC executeBatch error, retry times = {}", i, e); if (i >= executionOptions.getMaxRetries()) { throw new IOException(e); } try { if (!connection.isValid(CONNECTION_CHECK_TIMEOUT_SECONDS)) { connection = connectionProvider.reestablishConnection(); jdbcStatementExecutor.closeStatements(); jdbcStatementExecutor.prepareStatements(connection); } } catch (Exception excpetion) { LOG.error("JDBC connection is not valid, and reestablish connection failed.", excpetion); throw new IOException("Reestablish JDBC connection failed", excpetion); } try { Thread.sleep(1000 * i); } catch (InterruptedException ex) { Thread.currentThread().interrupt(); throw new IOException("unable to flush; interrupted while doing another attempt", e); } } }

嗯,看起来是这样的。

if (i >= executionOptions.getMaxRetries()) { throw new IOException(e); }这个判断重试的代码应该放在sleep 后面。不然,Caused by: java.io.IOException: java.sql.SQLException: No operations allowed after statement closed. 就没机会重建连接了。*来自志愿者整理的flink邮件归档



关于本问题的更多回答可点击进行查看:

https://developer.aliyun.com/ask/370085?spm=a2c6h.13066369.question.51.33bf585fijZlWy



问题五:请问flink sql实时计算分位数如何实现哈?

你好:

   目前flink sql实时计算中没有percentile函数吗?如果没有,如何实现这一功能。     期待你的答复,谢谢!*来自志愿者整理的flink邮件归档



参考答案:

可以看下UDAF的文档: https://ci.apache.org/projects/flink/flink-docs-master/dev/table/functions/udfs.html#aggregate-functions *来自志愿者整理的flink邮件归档



关于本问题的更多回答可点击进行查看:

https://developer.aliyun.com/ask/370087?spm=a2c6h.13066369.question.52.33bf585fnDag6n

相关实践学习
基于Hologres轻松玩转一站式实时仓库
本场景介绍如何利用阿里云MaxCompute、实时计算Flink和交互式分析服务Hologres开发离线、实时数据融合分析的数据大屏应用。
Linux入门到精通
本套课程是从入门开始的Linux学习课程,适合初学者阅读。由浅入深案例丰富,通俗易懂。主要涉及基础的系统操作以及工作中常用的各种服务软件的应用、部署和优化。即使是零基础的学员,只要能够坚持把所有章节都学完,也一定会受益匪浅。
相关文章
|
3天前
|
缓存 NoSQL 关系型数据库
13- Redis和Mysql如何保证数据⼀致?
该内容讨论了保证Redis和MySQL数据一致性的几种策略。首先提到的两种方法存在不一致风险:先更新MySQL再更新Redis,或先删Redis再更新MySQL。第三种方案是通过MQ异步同步以达到最终一致性,适用于一致性要求较高的场景。项目中根据不同业务需求选择不同方案,如对一致性要求不高的情况不做处理,时效性数据设置过期时间,高一致性需求则使用MQ确保同步,最严格的情况可能涉及分布式事务(如Seata的TCC模式)。
23 6
|
10天前
|
SQL 关系型数据库 MySQL
轻松入门MySQL:保障数据完整性,MySQL事务在进销存管理系统中的应用(12)
轻松入门MySQL:保障数据完整性,MySQL事务在进销存管理系统中的应用(12)
|
17天前
|
关系型数据库 MySQL
elasticsearch对比mysql以及使用工具同步mysql数据全量增量
elasticsearch对比mysql以及使用工具同步mysql数据全量增量
19 0
|
20天前
Mybatis+mysql动态分页查询数据案例——测试类HouseDaoMybatisImplTest)
Mybatis+mysql动态分页查询数据案例——测试类HouseDaoMybatisImplTest)
19 1
|
20天前
Mybatis+mysql动态分页查询数据案例——工具类(MybatisUtil.java)
Mybatis+mysql动态分页查询数据案例——工具类(MybatisUtil.java)
14 1
|
16天前
|
关系型数据库 MySQL
MySQL查询当天昨天明天本月上月今年等数据
MySQL查询当天昨天明天本月上月今年等数据
17 2
|
20天前
|
Java 数据库连接 mybatis
Mybatis+mysql动态分页查询数据案例——Mybatis的配置文件(mybatis-config.xml)
Mybatis+mysql动态分页查询数据案例——Mybatis的配置文件(mybatis-config.xml)
14 1
|
6天前
|
关系型数据库 MySQL 数据库
mysql卸载、下载、安装(window版本)
mysql卸载、下载、安装(window版本)
|
1月前
|
关系型数据库 MySQL 数据库连接
关于MySQL-ODBC的zip包安装方法
关于MySQL-ODBC的zip包安装方法
|
24天前
|
关系型数据库 MySQL 数据库
rds安装数据库客户端工具
安装阿里云RDS的数据库客户端涉及在本地安装对应类型(如MySQL、PostgreSQL)的客户端工具。对于MySQL,可选择MySQL Command-Line Client或图形化工具如Navicat,安装后输入RDS实例的连接参数进行连接。对于PostgreSQL,可以使用`psql`命令行工具或图形化客户端如PgAdmin。首先从阿里云控制台获取连接信息,然后按照官方文档安装客户端,最后配置客户端连接以确保遵循安全指引。
75 1

相关产品

  • 实时计算 Flink版