Flink报错问题之mysql timestamp字段报错如何解决

本文涉及的产品
实时计算 Flink 版,5000CU*H 3个月
简介: Apache Flink是由Apache软件基金会开发的开源流处理框架,其核心是用Java和Scala编写的分布式流数据流引擎。本合集提供有关Apache Flink相关技术、使用技巧和最佳实践的资源。

问题一:Flink SQL GROUP BY后写入postgresql数据库主键如何实现业务场景需求呢?

SQL中对时间窗口和PRODUCT_ID进行了Group By聚合操作,PG数据表中的主键须设置为WINDOW_START /WINDOW_END和PRODUCT_ID,否则无法以upinsert方式写出数据,但是这样却无法满足业务场景的需求,业务上应以RANK_ID +WINDOW_START /WINDOW_END为主键标识,请问Flink 中该如何实现这个需求?*来自志愿者整理的flink邮件归档



参考答案:

这个当前确实支持不了,不过这个问题将在 FLIP-95 提供的新 TableSink 接口后解决,有望在 1.11 中解决。来自志愿者整理的flink邮件归档



关于本问题的更多回答可点击进行查看:

https://developer.aliyun.com/ask/370046?spm=a2c6h.13066369.question.58.33bf585frGVY3q



问题二:修改topic名称后从Savepoint重启会怎么消费Kafka?

Hi,大佬们 突然有一个疑问点,Flink消费kafka的Job保存了SP之后停止,修改代码中的topic名称之后重启。 会从新topic的哪里开始消费?与startup-mode一致,还是从最新呢? 可以手动控制么? *来自志愿者整理的flink邮件归档



参考答案:

可以手动控制的,如果不设置默认是从最新位置开始消费,否则按照你的startup-mode来 *来自志愿者整理的flink邮件归档



关于本问题的更多回答可点击进行查看:

https://developer.aliyun.com/ask/370093?spm=a2c6h.13066369.question.57.33bf585fknvNHu



问题三:大家有用Flink SQL中的collect函数执行的结果用DataStream后,用什么数据匹配

大家有用Flink SQL中的collect函数执行的结果用DataStream后,用什么数据类型匹配该字段的结果,数据类型的类名和需要的依赖是什么? *来自志愿者整理的flink邮件归档



参考答案:

Collect 函数返回 Multiset 类型 ,可以使用 Map<T, Integer> 试试 *来自志愿者整理的flink邮件归档



关于本问题的更多回答可点击进行查看:

https://developer.aliyun.com/ask/370094?spm=a2c6h.13066369.question.58.33bf585flp3p0Y



问题四:Flink SQL】无法启动 env.yaml怎么处理?

在服务器上试用sql-client时,启动指令如下:

./sql-client.sh embedded -l /root/flink-sql-client/libs/ -d /data_gas/flink/flink-1.11.2/conf/sql-client-defaults.yaml -e /root/flink-sql-client/sql-client-demo.yml

配置如下:

定义表

tables: - name: SourceTable type: source-table update-mode: append connector: type: datagen rows-per-second: 5 fields: f_sequence: kind: sequence start: 1 end: 1000 f_random: min: 1 max: 1000 f_random_str: length: 10 schema: - name: f_sequence data-type: INT - name: f_random data-type: INT - name: f_random_str data-type: STRING

遇到了如下报错:

Reading default environment from: file:/data_gas/flink/flink-1.11.2/conf/sql-client-defaults.yaml Reading session environment from: file:/root/flink-sql-client/sql-client-demo.yml

Exception in thread "main" org.apache.flink.table.client.SqlClientException: Unexpected exception. This is a bug. Please consider filing an issue. at org.apache.flink.table.client.SqlClient.main(SqlClient.java:213) Caused by: org.apache.flink.table.client.gateway.SqlExecutionException: Could not create execution context. at org.apache.flink.table.client.gateway.local.ExecutionContext$Builder.build(ExecutionContext.java:870) at org.apache.flink.table.client.gateway.local.LocalExecutor.openSession(LocalExecutor.java:227) at org.apache.flink.table.client.SqlClient.start(SqlClient.java:108) at org.apache.flink.table.client.SqlClient.main(SqlClient.java:201) Caused by: org.apache.flink.table.api.NoMatchingTableFactoryException: Could not find a suitable table factory for 'org.apache.flink.table.factories.TableSourceFactory' in the classpath.

Reason: Required context properties mismatch.

The matching candidates: org.apache.flink.table.sources.CsvAppendTableSourceFactory Missing properties: format.type=csv Mismatched properties: 'connector.type' expects 'filesystem', but is 'datagen'

The following properties are requested: connector.fields.f_random.max=1000 connector.fields.f_random.min=1 connector.fields.f_random_str.length=10 connector.fields.f_sequence.end=1000 connector.fields.f_sequence.kind=sequence connector.fields.f_sequence.start=1 connector.rows-per-second=5 connector.type=datagen schema.0.data-type=INT schema.0.name=f_sequence schema.1.data-type=INT schema.1.name=f_random schema.2.data-type=STRING schema.2.name=f_random_str update-mode=append

The following factories have been considered: org.apache.flink.streaming.connectors.kafka.KafkaTableSourceSinkFactory org.apache.flink.connector.jdbc.table.JdbcTableSourceSinkFactory org.apache.flink.table.sources.CsvBatchTableSourceFactory org.apache.flink.table.sources.CsvAppendTableSourceFactory org.apache.flink.table.filesystem.FileSystemTableFactory at org.apache.flink.table.factories.TableFactoryService.filterByContext(TableFactoryService.java:322) at org.apache.flink.table.factories.TableFactoryService.filter(TableFactoryService.java:190) at org.apache.flink.table.factories.TableFactoryService.findSingleInternal(TableFactoryService.java:143) at org.apache.flink.table.factories.TableFactoryService.find(TableFactoryService.java:113) at org.apache.flink.table.client.gateway.local.ExecutionContext.createTableSource(ExecutionContext.java:384) at org.apache.flink.table.client.gateway.local.ExecutionContext.lambda$initializeCatalogs$7(ExecutionContext.java:638) at java.util.LinkedHashMap.forEach(LinkedHashMap.java:684) at org.apache.flink.table.client.gateway.local.ExecutionContext.initializeCatalogs(ExecutionContext.java:636) at org.apache.flink.table.client.gateway.local.ExecutionContext.initializeTableEnvironment(ExecutionContext.java:523) at org.apache.flink.table.client.gateway.local.ExecutionContext. (ExecutionContext.java:183) at org.apache.flink.table.client.gateway.local.ExecutionContext. (ExecutionContext.java:136) at org.apache.flink.table.client.gateway.local.ExecutionContext$Builder.build(ExecutionContext.java:859) ... 3 more

看描述是有包找不到,到我看官网上说 json 的解析 jar 在 sql-client 中包含啊,试用 sql-client 也需要自己导包的么?哪里有更详细的资料,求指点,谢谢

*来自志愿者整理的flink邮件归档



关于本问题的更多回答可点击进行查看:

https://developer.aliyun.com/ask/370095?spm=a2c6h.13066369.question.59.33bf585fTzCWiM



问题五:使用flink-sql解析debezium采集的mysql timestamp字段报错

flink-sql-client执行建表:

CREATE TABLE source_xxx ( id INT, ctime TIMESTAMP ) WITH ( 'connector' = 'kafka', 'topic' = 'xxx', 'properties.bootstrap.servers' = 'localhost:9092', 'format' = 'debezium-json', 'scan.startup.mode' = 'earliest-offset', 'debezium-json.schema-include' = 'false', 'debezium-json.ignore-parse-errors' = 'false' );

查询: select * from source_xxx; [ERROR] Could not execute SQL statement. Reason: java.time.format.DateTimeParseException: Text '2018-07-10T23:47:35Z' could not be parsed at index 10

mysql源表中ctime字段为timestamp类型,增加'debezium-json.timestamp-format.standard' = 'ISO-8601'配置依然报错,结尾多了个Z。 想咨询一下,这块儿是flink-sql和debezium采集的timestamp格式不兼容么?还是我debezium的配置,或者使用的flink-sql类型有问题?*来自志愿者整理的flink邮件归档



参考答案:

引用 Jark 对邮件列表中另一个相关的问题的回答,详情可查看[1]。 希望对你有帮助。

[1] http://apache-flink.147419.n8.nabble.com/flink-sql-td8884.html#a8888*来自志愿者整理的flink邮件归档



关于本问题的更多回答可点击进行查看:

https://developer.aliyun.com/ask/370096?spm=a2c6h.13066369.question.62.33bf585f50r5Fo

相关实践学习
基于Hologres轻松玩转一站式实时仓库
本场景介绍如何利用阿里云MaxCompute、实时计算Flink和交互式分析服务Hologres开发离线、实时数据融合分析的数据大屏应用。
Linux入门到精通
本套课程是从入门开始的Linux学习课程,适合初学者阅读。由浅入深案例丰富,通俗易懂。主要涉及基础的系统操作以及工作中常用的各种服务软件的应用、部署和优化。即使是零基础的学员,只要能够坚持把所有章节都学完,也一定会受益匪浅。
相关文章
|
28天前
|
关系型数据库 MySQL
mysql增加修改删除字段
mysql增加修改删除字段
14 0
|
25天前
|
关系型数据库 MySQL 数据库
运行flyway报错, MySQL 5.6 is no longer supported by Flyway Community Edition
运行flyway报错, MySQL 5.6 is no longer supported by Flyway Community Edition
20 1
|
23天前
|
存储 关系型数据库 MySQL
MySQL数据库性能大揭秘:表设计优化的高效策略(优化数据类型、增加冗余字段、拆分表以及使用非空约束)
MySQL数据库性能大揭秘:表设计优化的高效策略(优化数据类型、增加冗余字段、拆分表以及使用非空约束)
|
4天前
|
存储 数据可视化 关系型数据库
MySQL字段的时间类型该如何选择?千万数据下性能提升10%~30%🚀
本文探讨MySQL中时间类型的选择,阐述datetime、timestamp、整形时间戳等类型特点以及它们在千万级数据量下的查询性能
MySQL字段的时间类型该如何选择?千万数据下性能提升10%~30%🚀
|
8天前
|
SQL 关系型数据库 MySQL
mysql 数据库查询 查询字段用逗号隔开 关联另一个表并显示
mysql 数据库查询 查询字段用逗号隔开 关联另一个表并显示
18 2
|
12天前
|
关系型数据库 MySQL
MySQL全局库表查询准确定位字段
information_schema.COLUMNS 详细信息查询
201 4
|
18天前
|
关系型数据库 MySQL
如何解决cmd命令窗口无法运行mysql命令的问题
如何解决cmd命令窗口无法运行mysql命令的问题
10 0
|
19天前
|
SQL 存储 关系型数据库
【mysql】将逗号分割的字段内容转换为多行并group by
【mysql】将逗号分割的字段内容转换为多行并group by
|
30天前
|
SQL 关系型数据库 MySQL
Mysql数据库一个表字段中存了id,并以逗号分隔,id对应的详细信息在另一个表中
Mysql数据库一个表字段中存了id,并以逗号分隔,id对应的详细信息在另一个表中
10 0
|
1月前
|
Java 关系型数据库 MySQL
Flink1.18.1和CDC2.4.1 本地没问题 提交任务到服务器 报错java.lang.NoClassDefFoundError: Could not initialize class io.debezium.connector.mysql.MySqlConnectorConfig
【2月更文挑战第33天】Flink1.18.1和CDC2.4.1 本地没问题 提交任务到服务器 报错java.lang.NoClassDefFoundError: Could not initialize class io.debezium.connector.mysql.MySqlConnectorConfig
52 2

相关产品

  • 实时计算 Flink版