Flink报错问题之提交flink sql任务报错如何解决

本文涉及的产品
实时计算 Flink 版,5000CU*H 3个月
简介: Apache Flink是由Apache软件基金会开发的开源流处理框架,其核心是用Java和Scala编写的分布式流数据流引擎。本合集提供有关Apache Flink相关技术、使用技巧和最佳实践的资源。

问题一:1.11.1 报OutOfMemoryError: Metaspace. 错误

     我是用flink sql,通过JDBC源读取mysql表数据,多次提交后,报OutOfMemoryError: Metaspace.错误,最后分析是mysql驱动包中有个守护线程一直在检测清理失效的connection,导致有线程一直存在,class资源无法释放导致的,请问这个如何处理?*来自志愿者整理的flink邮件归档



参考答案:

你用的 Flink 版本是哪个呢? 对于 Metaspace OOM 的话,如果是稳定在某些值,确实需要加大这个内存大小的话,可以设置 参数 taskmanager-memory-jvm-metaspace-size [1]。 对于根本原因的话,需要看下为什么有那么多的无效 Conection,是不是长时间没有数据发送导致 connection invalid,然后再次使用时候就会重启,导致进程级别的缓存无效 Connection对象,这样的话,有线程一直在检测这个对象,就无法 GC。对于 connection 是否 invalid 的处理目前有个 issue[2] 已经解决了。

[1] https://ci.apache.org/projects/flink/flink-docs-release-1.11/ops/config.html#taskmanager-memory-jvm-metaspace-size [2] https://issues.apache.org/jira/browse/FLINK-16681*来自志愿者整理的flink邮件归档



关于本问题的更多回答可点击进行查看:

https://developer.aliyun.com/ask/371641?spm=a2c6h.13066369.question.87.6ad26382e6YZgi



问题二:提交flink sql任务报错

这个任务通过读取mysql CDC 然后关联之后写入到mysql中,每次提交任务都会报错,但是会正确提交的集群上去,并且成功执行。 我想问问是什么原因?

The program finished with the following exception:

org.apache.flink.client.program.ProgramInvocationException: The main method caused an error: No operators defined in streaming topology. Cannot execute. at org.apache.flink.client.program.PackagedProgram.callMainMethod(PackagedProgram.java:302) at org.apache.flink.client.program.PackagedProgram.invokeInteractiveModeForExecution(PackagedProgram.java:198) at org.apache.flink.client.ClientUtils.executeProgram(ClientUtils.java:149) at org.apache.flink.client.cli.CliFrontend.executeProgram(CliFrontend.java:699) at org.apache.flink.client.cli.CliFrontend.run(CliFrontend.java:232) at org.apache.flink.client.cli.CliFrontend.parseParameters(CliFrontend.java:916) at org.apache.flink.client.cli.CliFrontend.lambda$main$10(CliFrontend.java:992) at org.apache.flink.runtime.security.contexts.NoOpSecurityContext.runSecured(NoOpSecurityContext.java:30) at org.apache.flink.client.cli.CliFrontend.main(CliFrontend.java:992) Caused by: java.lang.IllegalStateException: No operators defined in streaming topology. Cannot execute. at org.apache.flink.streaming.api.environment.StreamExecutionEnvironment.getStreamGraphGenerator(StreamExecutionEnvironment.java:1870) at org.apache.flink.streaming.api.environment.StreamExecutionEnvironment.getStreamGraph(StreamExecutionEnvironment.java:1861) at org.apache.flink.streaming.api.environment.StreamExecutionEnvironment.getStreamGraph(StreamExecutionEnvironment.java:1846) at org.apache.flink.streaming.api.environment.StreamExecutionEnvironment.execute(StreamExecutionEnvironment.java:1697) at com.gaotu.data.performance.flink.job.sql.CeresCanRenewalWide.main(CeresCanRenewalWide.java:150) at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method) at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62) at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43) at java.lang.reflect.Method.invoke(Method.java:498) at org.apache.flink.client.program.PackagedProgram.callMainMethod(PackagedProgram.java:288) ... 8 more bjhldeMacBook-Pro:flink-1.11.2 dinghh$ bin/stop-cluster.sh Stopping taskexecutor daemon (pid: 92273) on host bjhldeMacBook-Pro.local. Stopping standalonesession daemon (pid: 92004) on host bjhldeMacBook-Pro.local. bjhldeMacBook-Pro:flink-1.11.2 dinghh$ vim conf/flink-conf.yaml bjhldeMacBook-Pro:flink-1.11.2 dinghh$ bin/start-x -bash: bin/start-x: No such file or directory bjhldeMacBook-Pro:flink-1.11.2 dinghh$ bin/start-cluster.sh Starting cluster. Starting standalonesession daemon on host bjhldeMacBook-Pro.local. Starting taskexecutor daemon on host bjhldeMacBook-Pro.local.*来自志愿者整理的flink邮件归档



参考答案:

Hi, 你是不是使用的flink 1.11版本,在调用了tableEnv.executeSql,最后又调用了TableEnvironment.execute或StreamExecutionEnvironment.execute方法。 可以参考[1]

[1]https://blog.csdn.net/weixin_41608066/article/details/107769826 https://blog.csdn.net/weixin_41608066/article/details/107769826*来自志愿者整理的flink邮件归档



关于本问题的更多回答可点击进行查看:

https://developer.aliyun.com/ask/371640?spm=a2c6h.13066369.question.90.6ad263828l5PKX



问题三:kafka table connector eventTime的问题

你好。 在用kafka table connector时如果使用eventTime,需要怎么启用这个eventTime, 没有找到一些相应的sample,

我是这样用的, 1. 设置Stream环境setStreamTimeCharacteristic(TimeCharacteristic.EventTime); 2. 在connector里指定watermark,其中transTime是消息里的字段 " rowtime AS TO_TIMESTAMP(FROM_UNIXTIME(transTime / 1000, 'yyyy-MM-dd HH:mm:ss')), \n " + " WATERMARK FOR rowtime AS rowtime - INTERVAL '10' SECOND \n" +

  1. 然后直接用datastream的window ds.keyBy(marketCode).timeWindow(Time.minutes(1L));

但在运行时会报以下exception, 已经 在connector里定义了,还需要assign吗? java.lang.RuntimeException: Record has Long.MIN_VALUE timestamp (= no timestamp marker). Is the time characteristic set to 'ProcessingTime', or did you forget to call 'DataStream.assignTimestampsAndWatermarks(...)'?

即使我在datastream里定义了strategy , ds.assignTimestampsAndWatermarks(WatermarkStrategy.forBoundedOutOfOrderness(Duration.ofSeconds(10)));

也还是报上面一样的错。*来自志愿者整理的flink邮件归档



参考答案:

Hi marble, 使用 Datastream 开发的话,Kafka connector 的使用可参考文献1[1];EventTime以及WaterMark的使用可以参考文献2[2]。 对应的中文文档对应在文献3和4. [1] https://ci.apache.org/projects/flink/flink-docs-release-1.11/dev/connectors/kafka.html [2] https://ci.apache.org/projects/flink/flink-docs-release-1.11/dev/event_timestamps_watermarks.html [3] https://ci.apache.org/projects/flink/flink-docs-master/zh/dev/connectors/kafka.html [4] https://ci.apache.org/projects/flink/flink-docs-master/zh/dev/event_timestamps_watermarks.html*来自志愿者整理的flink邮件归档



关于本问题的更多回答可点击进行查看:

https://developer.aliyun.com/ask/371639?spm=a2c6h.13066369.question.89.6ad26382wxrmdc



问题四:使用BroadcastStream后checkpoint失效

问题:job在接入广播流后,checkpint失效。

描述:广播流的数据来源两个地方,一个是从mongo,一个是从kafka,数据进行union,同时指定Watermark,返回Watermark.MAX_WATERMARK(用于与主数据源connect后,窗口聚合水印更新),job部署后,来源mongo的数据源状态会变为FINISHED。网上有查过,说subtask 状态finished会导致checkpoint不触发,那如何既能满足数据源自定义(更像是DataSet),同时保证checkpoint正常触发呢*来自志愿者整理的flink邮件归档



参考答案:

我理解你的 BroadcastStream 也会定期的再次读取,然后更新对应的状态,这样的话,你的 source 可以一直在读取数据(run函数),不退出即可。如果只希望读取一次话,是不是维表也可以满足你的需求呢?

*来自志愿者整理的flink邮件归档



关于本问题的更多回答可点击进行查看:

https://developer.aliyun.com/ask/371638?spm=a2c6h.13066369.question.90.6ad26382AABzF5



问题五:FlinkSLQ双流join使用LAST_VALUE + INTERVAL JOIN时遇到问题

flink 版本 1.11.2 问题:双流Join时,使用last_value + interval join,报错:Rowtime attributes must not be in the input rows of a regular join. As a workaround you can cast the time attributes of input tables to TIMESTAMP before.

代码: // stream 1 create table kafkaSource1 ( id int, field_1 int, field_2 varchar, ts1 timestamp(3), watermark for ts1 ) with ( connector = kafka ) // stream 2 create table kafkaSource2 ( id int, field_3 ts2 timestamp(3), watermark for ts2 ) with ( connector = kafka )

//create view create view kafkaSource1_view as select field_1 as field_1, last_value(field_2) as field_2, last_value(ts1) as ts1 from kafkaSouce1 group by field_1

// query insert into sinkTable select a.field_1, b.field_3 from kafkaSource2 a join kafkaSource1_view b on a.id = b.id and a.ts >= b.ts - INTERVAL ‘1’ HOUR and a.ts < b.ts + INTERVAL ‘2' DAY*来自志愿者整理的flink邮件归档



参考答案:

从报错来看,这个 SQL 应该是 match 了 StreamExecJoinRule,而 regular join 不能有 rowtime 属性。 应该是因为你的 kafkaSouce1 table 的 rowtime 经过 group by 后使用了 last_value 导致不是时间属性类型->TimeIndicatorRelDataType,而在 rule 进行判断后没有 windowBounds,所以就报了现在这个错误了。*来自志愿者整理的flink邮件归档



关于本问题的更多回答可点击进行查看:

https://developer.aliyun.com/ask/371637?spm=a2c6h.13066369.question.91.6ad263828BS9eI

相关实践学习
基于Hologres轻松玩转一站式实时仓库
本场景介绍如何利用阿里云MaxCompute、实时计算Flink和交互式分析服务Hologres开发离线、实时数据融合分析的数据大屏应用。
Linux入门到精通
本套课程是从入门开始的Linux学习课程,适合初学者阅读。由浅入深案例丰富,通俗易懂。主要涉及基础的系统操作以及工作中常用的各种服务软件的应用、部署和优化。即使是零基础的学员,只要能够坚持把所有章节都学完,也一定会受益匪浅。
相关文章
|
10天前
|
SQL 数据处理 流计算
实时计算 Flink版产品使用合集之sql真正的执行顺序是怎样的
实时计算Flink版作为一种强大的流处理和批处理统一的计算框架,广泛应用于各种需要实时数据处理和分析的场景。实时计算Flink版通常结合SQL接口、DataStreamAPI、以及与上下游数据源和存储系统的丰富连接器,提供了一套全面的解决方案,以应对各种实时计算需求。其低延迟、高吞吐、容错性强的特点,使其成为众多企业和组织实时数据处理首选的技术平台。以下是实时计算Flink版的一些典型使用合集。
|
7天前
|
SQL 数据采集 监控
14个Flink SQL性能优化实践分享
本文档详细列举了Apache Flink SQL的性能调优策略。主要关注点包括:增加数据源读取并行度、优化状态管理(如使用RocksDB状态后端并设置清理策略)、调整窗口操作以减少延迟、避免类型转换和不合理的JOIN操作、使用广播JOIN、注意SQL查询复杂度、控制并发度和资源调度、自定义源码实现、执行计划分析、异常检测与恢复、监控报警、数据预处理与清洗、利用高级特性(如容器化部署和UDF)以及数据压缩与序列化。此外,文档还强调了任务并行化、网络传输优化、系统配置调优、数据倾斜处理和任务调度策略。通过这些方法,可以有效解决性能问题,提升Flink SQL的运行效率。
|
10天前
|
SQL Java 数据库连接
实时计算 Flink版产品使用合集之怎么将MyBatis-Plus集成到SQL语法中
实时计算Flink版作为一种强大的流处理和批处理统一的计算框架,广泛应用于各种需要实时数据处理和分析的场景。实时计算Flink版通常结合SQL接口、DataStreamAPI、以及与上下游数据源和存储系统的丰富连接器,提供了一套全面的解决方案,以应对各种实时计算需求。其低延迟、高吞吐、容错性强的特点,使其成为众多企业和组织实时数据处理首选的技术平台。以下是实时计算Flink版的一些典型使用合集。
|
10天前
|
NoSQL Java MongoDB
实时计算 Flink版产品使用合集之在一个任务中创建了多个MySQLCDC源表,这些源表是否共享同一个数据库连接池
实时计算Flink版作为一种强大的流处理和批处理统一的计算框架,广泛应用于各种需要实时数据处理和分析的场景。实时计算Flink版通常结合SQL接口、DataStreamAPI、以及与上下游数据源和存储系统的丰富连接器,提供了一套全面的解决方案,以应对各种实时计算需求。其低延迟、高吞吐、容错性强的特点,使其成为众多企业和组织实时数据处理首选的技术平台。以下是实时计算Flink版的一些典型使用合集。
|
10天前
|
SQL 关系型数据库 MySQL
实时计算 Flink版产品使用合集之在Application模式下,如何在客户端中同步获取任务执行结果后再退出
实时计算Flink版作为一种强大的流处理和批处理统一的计算框架,广泛应用于各种需要实时数据处理和分析的场景。实时计算Flink版通常结合SQL接口、DataStreamAPI、以及与上下游数据源和存储系统的丰富连接器,提供了一套全面的解决方案,以应对各种实时计算需求。其低延迟、高吞吐、容错性强的特点,使其成为众多企业和组织实时数据处理首选的技术平台。以下是实时计算Flink版的一些典型使用合集。
|
10天前
|
SQL Java 数据库连接
实时计算 Flink版产品使用合集之同步数据到Doris的任务中,遇到在继续同步后再次点击下线无法下线,如何处理
实时计算Flink版作为一种强大的流处理和批处理统一的计算框架,广泛应用于各种需要实时数据处理和分析的场景。实时计算Flink版通常结合SQL接口、DataStreamAPI、以及与上下游数据源和存储系统的丰富连接器,提供了一套全面的解决方案,以应对各种实时计算需求。其低延迟、高吞吐、容错性强的特点,使其成为众多企业和组织实时数据处理首选的技术平台。以下是实时计算Flink版的一些典型使用合集。
|
10天前
|
SQL Oracle 关系型数据库
实时计算 Flink版产品使用合集之从Oracle数据库同步数据时,checkpoint恢复后无法捕获到任务暂停期间的变更日志,如何处理
实时计算Flink版作为一种强大的流处理和批处理统一的计算框架,广泛应用于各种需要实时数据处理和分析的场景。实时计算Flink版通常结合SQL接口、DataStreamAPI、以及与上下游数据源和存储系统的丰富连接器,提供了一套全面的解决方案,以应对各种实时计算需求。其低延迟、高吞吐、容错性强的特点,使其成为众多企业和组织实时数据处理首选的技术平台。以下是实时计算Flink版的一些典型使用合集。
|
10天前
|
SQL 关系型数据库 MySQL
实时计算 Flink版产品使用合集之sql读取mysql写入clickhouse,该如何操作
实时计算Flink版作为一种强大的流处理和批处理统一的计算框架,广泛应用于各种需要实时数据处理和分析的场景。实时计算Flink版通常结合SQL接口、DataStreamAPI、以及与上下游数据源和存储系统的丰富连接器,提供了一套全面的解决方案,以应对各种实时计算需求。其低延迟、高吞吐、容错性强的特点,使其成为众多企业和组织实时数据处理首选的技术平台。以下是实时计算Flink版的一些典型使用合集。
|
10天前
|
消息中间件 Kafka 分布式数据库
实时计算 Flink版产品使用合集之如何批量读取Kafka数据
实时计算Flink版作为一种强大的流处理和批处理统一的计算框架,广泛应用于各种需要实时数据处理和分析的场景。实时计算Flink版通常结合SQL接口、DataStreamAPI、以及与上下游数据源和存储系统的丰富连接器,提供了一套全面的解决方案,以应对各种实时计算需求。其低延迟、高吞吐、容错性强的特点,使其成为众多企业和组织实时数据处理首选的技术平台。以下是实时计算Flink版的一些典型使用合集。
|
10天前
|
SQL JSON 资源调度
实时计算 Flink版产品使用合集之如何指定FlinkYarnSession启动的properties文件存放位置
实时计算Flink版作为一种强大的流处理和批处理统一的计算框架,广泛应用于各种需要实时数据处理和分析的场景。实时计算Flink版通常结合SQL接口、DataStreamAPI、以及与上下游数据源和存储系统的丰富连接器,提供了一套全面的解决方案,以应对各种实时计算需求。其低延迟、高吞吐、容错性强的特点,使其成为众多企业和组织实时数据处理首选的技术平台。以下是实时计算Flink版的一些典型使用合集。

热门文章

最新文章

相关产品

  • 实时计算 Flink版