Flink报错问题之mysql timestamp字段报错如何解决

本文涉及的产品
实时计算 Flink 版,1000CU*H 3个月
简介: Apache Flink是由Apache软件基金会开发的开源流处理框架,其核心是用Java和Scala编写的分布式流数据流引擎。本合集提供有关Apache Flink相关技术、使用技巧和最佳实践的资源。

问题一:Flink SQL GROUP BY后写入postgresql数据库主键如何实现业务场景需求呢?

SQL中对时间窗口和PRODUCT_ID进行了Group By聚合操作,PG数据表中的主键须设置为WINDOW_START /WINDOW_END和PRODUCT_ID,否则无法以upinsert方式写出数据,但是这样却无法满足业务场景的需求,业务上应以RANK_ID +WINDOW_START /WINDOW_END为主键标识,请问Flink 中该如何实现这个需求?*来自志愿者整理的flink邮件归档



参考答案:

这个当前确实支持不了,不过这个问题将在 FLIP-95 提供的新 TableSink 接口后解决,有望在 1.11 中解决。来自志愿者整理的flink邮件归档



关于本问题的更多回答可点击进行查看:

https://developer.aliyun.com/ask/370046?spm=a2c6h.13066369.question.58.33bf585frGVY3q



问题二:修改topic名称后从Savepoint重启会怎么消费Kafka?

Hi,大佬们 突然有一个疑问点,Flink消费kafka的Job保存了SP之后停止,修改代码中的topic名称之后重启。 会从新topic的哪里开始消费?与startup-mode一致,还是从最新呢? 可以手动控制么? *来自志愿者整理的flink邮件归档



参考答案:

可以手动控制的,如果不设置默认是从最新位置开始消费,否则按照你的startup-mode来 *来自志愿者整理的flink邮件归档



关于本问题的更多回答可点击进行查看:

https://developer.aliyun.com/ask/370093?spm=a2c6h.13066369.question.57.33bf585fknvNHu



问题三:大家有用Flink SQL中的collect函数执行的结果用DataStream后,用什么数据匹配

大家有用Flink SQL中的collect函数执行的结果用DataStream后,用什么数据类型匹配该字段的结果,数据类型的类名和需要的依赖是什么? *来自志愿者整理的flink邮件归档



参考答案:

Collect 函数返回 Multiset 类型 ,可以使用 Map<T, Integer> 试试 *来自志愿者整理的flink邮件归档



关于本问题的更多回答可点击进行查看:

https://developer.aliyun.com/ask/370094?spm=a2c6h.13066369.question.58.33bf585flp3p0Y



问题四:Flink SQL】无法启动 env.yaml怎么处理?

在服务器上试用sql-client时,启动指令如下:

./sql-client.sh embedded -l /root/flink-sql-client/libs/ -d /data_gas/flink/flink-1.11.2/conf/sql-client-defaults.yaml -e /root/flink-sql-client/sql-client-demo.yml

配置如下:

定义表

tables: - name: SourceTable type: source-table update-mode: append connector: type: datagen rows-per-second: 5 fields: f_sequence: kind: sequence start: 1 end: 1000 f_random: min: 1 max: 1000 f_random_str: length: 10 schema: - name: f_sequence data-type: INT - name: f_random data-type: INT - name: f_random_str data-type: STRING

遇到了如下报错:

Reading default environment from: file:/data_gas/flink/flink-1.11.2/conf/sql-client-defaults.yaml Reading session environment from: file:/root/flink-sql-client/sql-client-demo.yml

Exception in thread "main" org.apache.flink.table.client.SqlClientException: Unexpected exception. This is a bug. Please consider filing an issue. at org.apache.flink.table.client.SqlClient.main(SqlClient.java:213) Caused by: org.apache.flink.table.client.gateway.SqlExecutionException: Could not create execution context. at org.apache.flink.table.client.gateway.local.ExecutionContext$Builder.build(ExecutionContext.java:870) at org.apache.flink.table.client.gateway.local.LocalExecutor.openSession(LocalExecutor.java:227) at org.apache.flink.table.client.SqlClient.start(SqlClient.java:108) at org.apache.flink.table.client.SqlClient.main(SqlClient.java:201) Caused by: org.apache.flink.table.api.NoMatchingTableFactoryException: Could not find a suitable table factory for 'org.apache.flink.table.factories.TableSourceFactory' in the classpath.

Reason: Required context properties mismatch.

The matching candidates: org.apache.flink.table.sources.CsvAppendTableSourceFactory Missing properties: format.type=csv Mismatched properties: 'connector.type' expects 'filesystem', but is 'datagen'

The following properties are requested: connector.fields.f_random.max=1000 connector.fields.f_random.min=1 connector.fields.f_random_str.length=10 connector.fields.f_sequence.end=1000 connector.fields.f_sequence.kind=sequence connector.fields.f_sequence.start=1 connector.rows-per-second=5 connector.type=datagen schema.0.data-type=INT schema.0.name=f_sequence schema.1.data-type=INT schema.1.name=f_random schema.2.data-type=STRING schema.2.name=f_random_str update-mode=append

The following factories have been considered: org.apache.flink.streaming.connectors.kafka.KafkaTableSourceSinkFactory org.apache.flink.connector.jdbc.table.JdbcTableSourceSinkFactory org.apache.flink.table.sources.CsvBatchTableSourceFactory org.apache.flink.table.sources.CsvAppendTableSourceFactory org.apache.flink.table.filesystem.FileSystemTableFactory at org.apache.flink.table.factories.TableFactoryService.filterByContext(TableFactoryService.java:322) at org.apache.flink.table.factories.TableFactoryService.filter(TableFactoryService.java:190) at org.apache.flink.table.factories.TableFactoryService.findSingleInternal(TableFactoryService.java:143) at org.apache.flink.table.factories.TableFactoryService.find(TableFactoryService.java:113) at org.apache.flink.table.client.gateway.local.ExecutionContext.createTableSource(ExecutionContext.java:384) at org.apache.flink.table.client.gateway.local.ExecutionContext.lambda$initializeCatalogs$7(ExecutionContext.java:638) at java.util.LinkedHashMap.forEach(LinkedHashMap.java:684) at org.apache.flink.table.client.gateway.local.ExecutionContext.initializeCatalogs(ExecutionContext.java:636) at org.apache.flink.table.client.gateway.local.ExecutionContext.initializeTableEnvironment(ExecutionContext.java:523) at org.apache.flink.table.client.gateway.local.ExecutionContext. (ExecutionContext.java:183) at org.apache.flink.table.client.gateway.local.ExecutionContext. (ExecutionContext.java:136) at org.apache.flink.table.client.gateway.local.ExecutionContext$Builder.build(ExecutionContext.java:859) ... 3 more

看描述是有包找不到,到我看官网上说 json 的解析 jar 在 sql-client 中包含啊,试用 sql-client 也需要自己导包的么?哪里有更详细的资料,求指点,谢谢

*来自志愿者整理的flink邮件归档



关于本问题的更多回答可点击进行查看:

https://developer.aliyun.com/ask/370095?spm=a2c6h.13066369.question.59.33bf585fTzCWiM



问题五:使用flink-sql解析debezium采集的mysql timestamp字段报错

flink-sql-client执行建表:

CREATE TABLE source_xxx ( id INT, ctime TIMESTAMP ) WITH ( 'connector' = 'kafka', 'topic' = 'xxx', 'properties.bootstrap.servers' = 'localhost:9092', 'format' = 'debezium-json', 'scan.startup.mode' = 'earliest-offset', 'debezium-json.schema-include' = 'false', 'debezium-json.ignore-parse-errors' = 'false' );

查询: select * from source_xxx; [ERROR] Could not execute SQL statement. Reason: java.time.format.DateTimeParseException: Text '2018-07-10T23:47:35Z' could not be parsed at index 10

mysql源表中ctime字段为timestamp类型,增加'debezium-json.timestamp-format.standard' = 'ISO-8601'配置依然报错,结尾多了个Z。 想咨询一下,这块儿是flink-sql和debezium采集的timestamp格式不兼容么?还是我debezium的配置,或者使用的flink-sql类型有问题?*来自志愿者整理的flink邮件归档



参考答案:

引用 Jark 对邮件列表中另一个相关的问题的回答,详情可查看[1]。 希望对你有帮助。

[1] http://apache-flink.147419.n8.nabble.com/flink-sql-td8884.html#a8888*来自志愿者整理的flink邮件归档



关于本问题的更多回答可点击进行查看:

https://developer.aliyun.com/ask/370096?spm=a2c6h.13066369.question.62.33bf585f50r5Fo

相关实践学习
基于Hologres+Flink搭建GitHub实时数据大屏
通过使用Flink、Hologres构建实时数仓,并通过Hologres对接BI分析工具(以DataV为例),实现海量数据实时分析.
实时计算 Flink 实战课程
如何使用实时计算 Flink 搞定数据处理难题?实时计算 Flink 极客训练营产品、技术专家齐上阵,从开源 Flink功能介绍到实时计算 Flink 优势详解,现场实操,5天即可上手! 欢迎开通实时计算 Flink 版: https://cn.aliyun.com/product/bigdata/sc Flink Forward Asia 介绍: Flink Forward 是由 Apache 官方授权,Apache Flink Community China 支持的会议,通过参会不仅可以了解到 Flink 社区的最新动态和发展计划,还可以了解到国内外一线大厂围绕 Flink 生态的生产实践经验,是 Flink 开发者和使用者不可错过的盛会。 去年经过品牌升级后的 Flink Forward Asia 吸引了超过2000人线下参与,一举成为国内最大的 Apache 顶级项目会议。结合2020年的特殊情况,Flink Forward Asia 2020 将在12月26日以线上峰会的形式与大家见面。
相关文章
|
7月前
|
消息中间件 关系型数据库 MySQL
基于 Flink CDC YAML 的 MySQL 到 Kafka 流式数据集成
基于 Flink CDC YAML 的 MySQL 到 Kafka 流式数据集成
665 0
|
4月前
|
关系型数据库 MySQL
MySQL数据表添加字段(三种方式)
本文解析了数据表的基本概念及字段添加方法。在数据表中,字段是纵向列结构,记录为横向行数据。MySQL通过`ALTER TABLE`指令支持三种字段添加方式:1) 末尾追加字段,直接使用`ADD`语句;2) 首列插入字段,通过`FIRST`关键字实现;3) 指定位置插入字段,利用`AFTER`指定目标字段。文内结合`student`表实例详细演示了每种方法的操作步骤与结构验证,便于理解与实践。
|
8月前
|
Java 关系型数据库 MySQL
SpringBoot 通过集成 Flink CDC 来实时追踪 MySql 数据变动
通过详细的步骤和示例代码,您可以在 SpringBoot 项目中成功集成 Flink CDC,并实时追踪 MySQL 数据库的变动。
1824 45
|
8月前
|
消息中间件 关系型数据库 MySQL
基于 Flink CDC YAML 的 MySQL 到 Kafka 流式数据集成
本教程展示如何使用Flink CDC YAML快速构建从MySQL到Kafka的流式数据集成作业,涵盖整库同步和表结构变更同步。无需编写Java/Scala代码或安装IDE,所有操作在Flink CDC CLI中完成。首先准备Flink Standalone集群和Docker环境(包括MySQL、Kafka和Zookeeper),然后通过配置YAML文件提交任务,实现数据同步。教程还介绍了路由变更、写入多个分区、输出格式设置及上游表名到下游Topic的映射等功能,并提供详细的命令和示例。最后,包含环境清理步骤以确保资源释放。
594 2
基于 Flink CDC YAML 的 MySQL 到 Kafka 流式数据集成
|
1月前
|
安全 关系型数据库 MySQL
MySQL安全最佳实践:保护你的数据库
本文深入探讨了MySQL数据库的安全防护体系,涵盖认证安全、访问控制、网络安全、数据加密、审计监控、备份恢复、操作系统安全、应急响应等多个方面。通过具体配置示例,为企业提供了一套全面的安全实践方案,帮助强化数据库安全,防止数据泄露和未授权访问,保障企业数据资产安全。
|
16天前
|
缓存 关系型数据库 BI
使用MYSQL Report分析数据库性能(下)
使用MYSQL Report分析数据库性能
54 3
|
23天前
|
关系型数据库 MySQL 数据库
自建数据库如何迁移至RDS MySQL实例
数据库迁移是一项复杂且耗时的工程,需考虑数据安全、完整性及业务中断影响。使用阿里云数据传输服务DTS,可快速、平滑完成迁移任务,将应用停机时间降至分钟级。您还可通过全量备份自建数据库并恢复至RDS MySQL实例,实现间接迁移上云。
|
2月前
|
存储 运维 关系型数据库
从MySQL到云数据库,数据库迁移真的有必要吗?
本文探讨了企业在业务增长背景下,是否应从 MySQL 迁移至云数据库的决策问题。分析了 MySQL 的优势与瓶颈,对比了云数据库在存储计算分离、自动化运维、多负载支持等方面的优势,并提出判断迁移必要性的五个关键问题及实施路径,帮助企业理性决策并落地迁移方案。
|
10天前
|
关系型数据库 MySQL 分布式数据库
阿里云PolarDB云原生数据库收费价格:MySQL和PostgreSQL详细介绍
阿里云PolarDB兼容MySQL、PostgreSQL及Oracle语法,支持集中式与分布式架构。标准版2核4G年费1116元起,企业版最高性能达4核16G,支持HTAP与多级高可用,广泛应用于金融、政务、互联网等领域,TCO成本降低50%。
|
11天前
|
关系型数据库 MySQL 数据库
阿里云数据库RDS费用价格:MySQL、SQL Server、PostgreSQL和MariaDB引擎收费标准
阿里云RDS数据库支持MySQL、SQL Server、PostgreSQL、MariaDB,多种引擎优惠上线!MySQL倚天版88元/年,SQL Server 2核4G仅299元/年,PostgreSQL 227元/年起。高可用、可弹性伸缩,安全稳定。详情见官网活动页。

相关产品

  • 实时计算 Flink版
  • 推荐镜像

    更多