Flink报错问题之提交flink sql任务报错如何解决

本文涉及的产品
实时计算 Flink 版,5000CU*H 3个月
简介: Apache Flink是由Apache软件基金会开发的开源流处理框架,其核心是用Java和Scala编写的分布式流数据流引擎。本合集提供有关Apache Flink相关技术、使用技巧和最佳实践的资源。

问题一:1.11.1 报OutOfMemoryError: Metaspace. 错误

     我是用flink sql,通过JDBC源读取mysql表数据,多次提交后,报OutOfMemoryError: Metaspace.错误,最后分析是mysql驱动包中有个守护线程一直在检测清理失效的connection,导致有线程一直存在,class资源无法释放导致的,请问这个如何处理?*来自志愿者整理的flink邮件归档



参考答案:

你用的 Flink 版本是哪个呢? 对于 Metaspace OOM 的话,如果是稳定在某些值,确实需要加大这个内存大小的话,可以设置 参数 taskmanager-memory-jvm-metaspace-size [1]。 对于根本原因的话,需要看下为什么有那么多的无效 Conection,是不是长时间没有数据发送导致 connection invalid,然后再次使用时候就会重启,导致进程级别的缓存无效 Connection对象,这样的话,有线程一直在检测这个对象,就无法 GC。对于 connection 是否 invalid 的处理目前有个 issue[2] 已经解决了。

[1] https://ci.apache.org/projects/flink/flink-docs-release-1.11/ops/config.html#taskmanager-memory-jvm-metaspace-size [2] https://issues.apache.org/jira/browse/FLINK-16681*来自志愿者整理的flink邮件归档



关于本问题的更多回答可点击进行查看:

https://developer.aliyun.com/ask/371641?spm=a2c6h.13066369.question.87.6ad26382e6YZgi



问题二:提交flink sql任务报错

这个任务通过读取mysql CDC 然后关联之后写入到mysql中,每次提交任务都会报错,但是会正确提交的集群上去,并且成功执行。 我想问问是什么原因?

The program finished with the following exception:

org.apache.flink.client.program.ProgramInvocationException: The main method caused an error: No operators defined in streaming topology. Cannot execute. at org.apache.flink.client.program.PackagedProgram.callMainMethod(PackagedProgram.java:302) at org.apache.flink.client.program.PackagedProgram.invokeInteractiveModeForExecution(PackagedProgram.java:198) at org.apache.flink.client.ClientUtils.executeProgram(ClientUtils.java:149) at org.apache.flink.client.cli.CliFrontend.executeProgram(CliFrontend.java:699) at org.apache.flink.client.cli.CliFrontend.run(CliFrontend.java:232) at org.apache.flink.client.cli.CliFrontend.parseParameters(CliFrontend.java:916) at org.apache.flink.client.cli.CliFrontend.lambda$main$10(CliFrontend.java:992) at org.apache.flink.runtime.security.contexts.NoOpSecurityContext.runSecured(NoOpSecurityContext.java:30) at org.apache.flink.client.cli.CliFrontend.main(CliFrontend.java:992) Caused by: java.lang.IllegalStateException: No operators defined in streaming topology. Cannot execute. at org.apache.flink.streaming.api.environment.StreamExecutionEnvironment.getStreamGraphGenerator(StreamExecutionEnvironment.java:1870) at org.apache.flink.streaming.api.environment.StreamExecutionEnvironment.getStreamGraph(StreamExecutionEnvironment.java:1861) at org.apache.flink.streaming.api.environment.StreamExecutionEnvironment.getStreamGraph(StreamExecutionEnvironment.java:1846) at org.apache.flink.streaming.api.environment.StreamExecutionEnvironment.execute(StreamExecutionEnvironment.java:1697) at com.gaotu.data.performance.flink.job.sql.CeresCanRenewalWide.main(CeresCanRenewalWide.java:150) at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method) at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62) at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43) at java.lang.reflect.Method.invoke(Method.java:498) at org.apache.flink.client.program.PackagedProgram.callMainMethod(PackagedProgram.java:288) ... 8 more bjhldeMacBook-Pro:flink-1.11.2 dinghh$ bin/stop-cluster.sh Stopping taskexecutor daemon (pid: 92273) on host bjhldeMacBook-Pro.local. Stopping standalonesession daemon (pid: 92004) on host bjhldeMacBook-Pro.local. bjhldeMacBook-Pro:flink-1.11.2 dinghh$ vim conf/flink-conf.yaml bjhldeMacBook-Pro:flink-1.11.2 dinghh$ bin/start-x -bash: bin/start-x: No such file or directory bjhldeMacBook-Pro:flink-1.11.2 dinghh$ bin/start-cluster.sh Starting cluster. Starting standalonesession daemon on host bjhldeMacBook-Pro.local. Starting taskexecutor daemon on host bjhldeMacBook-Pro.local.*来自志愿者整理的flink邮件归档



参考答案:

Hi, 你是不是使用的flink 1.11版本,在调用了tableEnv.executeSql,最后又调用了TableEnvironment.execute或StreamExecutionEnvironment.execute方法。 可以参考[1]

[1]https://blog.csdn.net/weixin_41608066/article/details/107769826 https://blog.csdn.net/weixin_41608066/article/details/107769826*来自志愿者整理的flink邮件归档



关于本问题的更多回答可点击进行查看:

https://developer.aliyun.com/ask/371640?spm=a2c6h.13066369.question.90.6ad263828l5PKX



问题三:kafka table connector eventTime的问题

你好。 在用kafka table connector时如果使用eventTime,需要怎么启用这个eventTime, 没有找到一些相应的sample,

我是这样用的, 1. 设置Stream环境setStreamTimeCharacteristic(TimeCharacteristic.EventTime); 2. 在connector里指定watermark,其中transTime是消息里的字段 " rowtime AS TO_TIMESTAMP(FROM_UNIXTIME(transTime / 1000, 'yyyy-MM-dd HH:mm:ss')), \n " + " WATERMARK FOR rowtime AS rowtime - INTERVAL '10' SECOND \n" +

  1. 然后直接用datastream的window ds.keyBy(marketCode).timeWindow(Time.minutes(1L));

但在运行时会报以下exception, 已经 在connector里定义了,还需要assign吗? java.lang.RuntimeException: Record has Long.MIN_VALUE timestamp (= no timestamp marker). Is the time characteristic set to 'ProcessingTime', or did you forget to call 'DataStream.assignTimestampsAndWatermarks(...)'?

即使我在datastream里定义了strategy , ds.assignTimestampsAndWatermarks(WatermarkStrategy.forBoundedOutOfOrderness(Duration.ofSeconds(10)));

也还是报上面一样的错。*来自志愿者整理的flink邮件归档



参考答案:

Hi marble, 使用 Datastream 开发的话,Kafka connector 的使用可参考文献1[1];EventTime以及WaterMark的使用可以参考文献2[2]。 对应的中文文档对应在文献3和4. [1] https://ci.apache.org/projects/flink/flink-docs-release-1.11/dev/connectors/kafka.html [2] https://ci.apache.org/projects/flink/flink-docs-release-1.11/dev/event_timestamps_watermarks.html [3] https://ci.apache.org/projects/flink/flink-docs-master/zh/dev/connectors/kafka.html [4] https://ci.apache.org/projects/flink/flink-docs-master/zh/dev/event_timestamps_watermarks.html*来自志愿者整理的flink邮件归档



关于本问题的更多回答可点击进行查看:

https://developer.aliyun.com/ask/371639?spm=a2c6h.13066369.question.89.6ad26382wxrmdc



问题四:使用BroadcastStream后checkpoint失效

问题:job在接入广播流后,checkpint失效。

描述:广播流的数据来源两个地方,一个是从mongo,一个是从kafka,数据进行union,同时指定Watermark,返回Watermark.MAX_WATERMARK(用于与主数据源connect后,窗口聚合水印更新),job部署后,来源mongo的数据源状态会变为FINISHED。网上有查过,说subtask 状态finished会导致checkpoint不触发,那如何既能满足数据源自定义(更像是DataSet),同时保证checkpoint正常触发呢*来自志愿者整理的flink邮件归档



参考答案:

我理解你的 BroadcastStream 也会定期的再次读取,然后更新对应的状态,这样的话,你的 source 可以一直在读取数据(run函数),不退出即可。如果只希望读取一次话,是不是维表也可以满足你的需求呢?

*来自志愿者整理的flink邮件归档



关于本问题的更多回答可点击进行查看:

https://developer.aliyun.com/ask/371638?spm=a2c6h.13066369.question.90.6ad26382AABzF5



问题五:FlinkSLQ双流join使用LAST_VALUE + INTERVAL JOIN时遇到问题

flink 版本 1.11.2 问题:双流Join时,使用last_value + interval join,报错:Rowtime attributes must not be in the input rows of a regular join. As a workaround you can cast the time attributes of input tables to TIMESTAMP before.

代码: // stream 1 create table kafkaSource1 ( id int, field_1 int, field_2 varchar, ts1 timestamp(3), watermark for ts1 ) with ( connector = kafka ) // stream 2 create table kafkaSource2 ( id int, field_3 ts2 timestamp(3), watermark for ts2 ) with ( connector = kafka )

//create view create view kafkaSource1_view as select field_1 as field_1, last_value(field_2) as field_2, last_value(ts1) as ts1 from kafkaSouce1 group by field_1

// query insert into sinkTable select a.field_1, b.field_3 from kafkaSource2 a join kafkaSource1_view b on a.id = b.id and a.ts >= b.ts - INTERVAL ‘1’ HOUR and a.ts < b.ts + INTERVAL ‘2' DAY*来自志愿者整理的flink邮件归档



参考答案:

从报错来看,这个 SQL 应该是 match 了 StreamExecJoinRule,而 regular join 不能有 rowtime 属性。 应该是因为你的 kafkaSouce1 table 的 rowtime 经过 group by 后使用了 last_value 导致不是时间属性类型->TimeIndicatorRelDataType,而在 rule 进行判断后没有 windowBounds,所以就报了现在这个错误了。*来自志愿者整理的flink邮件归档



关于本问题的更多回答可点击进行查看:

https://developer.aliyun.com/ask/371637?spm=a2c6h.13066369.question.91.6ad263828BS9eI

相关实践学习
基于Hologres轻松玩转一站式实时仓库
本场景介绍如何利用阿里云MaxCompute、实时计算Flink和交互式分析服务Hologres开发离线、实时数据融合分析的数据大屏应用。
Linux入门到精通
本套课程是从入门开始的Linux学习课程,适合初学者阅读。由浅入深案例丰富,通俗易懂。主要涉及基础的系统操作以及工作中常用的各种服务软件的应用、部署和优化。即使是零基础的学员,只要能够坚持把所有章节都学完,也一定会受益匪浅。
相关文章
|
2月前
|
SQL 关系型数据库 MySQL
这样的SQL执行为什么不会报错?optimizer_trace深度历险
【10月更文挑战第12天】本文探讨了一条看似错误但实际上能成功执行的SQL语句,通过开启MySQL的优化器追踪功能,详细分析了SQL的执行过程,揭示了子查询被优化器解析为连接操作的原因,最终解释了为何该SQL不会报错。文章不仅增进了对SQL优化机制的理解,也展示了如何利用优化器追踪解决实际问题。
|
2月前
|
SQL 数据采集 自然语言处理
NL2SQL之DB-GPT-Hub<详解篇>:text2sql任务的微调框架和基准对比
NL2SQL之DB-GPT-Hub<详解篇>:text2sql任务的微调框架和基准对比
|
8天前
|
SQL 存储 缓存
Flink SQL Deduplication 去重以及如何获取最新状态操作
Flink SQL Deduplication 是一种高效的数据去重功能,支持多种数据类型和灵活的配置选项。它通过哈希表、时间窗口和状态管理等技术实现去重,适用于流处理和批处理场景。本文介绍了其特性、原理、实际案例及源码分析,帮助读者更好地理解和应用这一功能。
72 14
|
1月前
|
SQL 关系型数据库 MySQL
体验使用DAS实现数据库SQL优化,完成任务可得羊羔绒加厚坐垫!
本实验介绍如何通过数据库自治服务DAS对RDS MySQL高可用实例进行SQL优化,包含购买RDS实例并创建数据库、数据导入、生成并优化慢SQL、执行优化后的SQL语句等实验步骤。完成任务,即可领取羊羔绒加厚坐垫,限量500个,先到先得。
150 12
|
2月前
|
SQL 关系型数据库 MySQL
|
2月前
|
SQL 运维
Doris同一个SQL任务,前一天执行成功,第二天执行失败
Doris 动态分区 插入数据 同样的代码隔天运行一个成功一个失败
|
3月前
|
关系型数据库 MySQL Nacos
nacos启动报错 load derby-schema.sql error
这篇文章描述了作者在使用Nacos时遇到的启动错误,错误提示为加载derby-schema.sql失败,作者通过将数据库从Derby更换为MySQL解决了问题。
nacos启动报错 load derby-schema.sql error
|
2月前
|
SQL 大数据 API
大数据-132 - Flink SQL 基本介绍 与 HelloWorld案例
大数据-132 - Flink SQL 基本介绍 与 HelloWorld案例
60 0
|
3月前
|
关系型数据库 MySQL 网络安全
5-10Can't connect to MySQL server on 'sh-cynosl-grp-fcs50xoa.sql.tencentcdb.com' (110)")
5-10Can't connect to MySQL server on 'sh-cynosl-grp-fcs50xoa.sql.tencentcdb.com' (110)")
|
5月前
|
SQL 存储 监控
SQL Server的并行实施如何优化?
【7月更文挑战第23天】SQL Server的并行实施如何优化?
135 13

相关产品

  • 实时计算 Flink版