Flink报错问题之提交flink sql任务报错如何解决

本文涉及的产品
实时计算 Flink 版,5000CU*H 3个月
简介: Apache Flink是由Apache软件基金会开发的开源流处理框架,其核心是用Java和Scala编写的分布式流数据流引擎。本合集提供有关Apache Flink相关技术、使用技巧和最佳实践的资源。

问题一:1.11.1 报OutOfMemoryError: Metaspace. 错误

     我是用flink sql,通过JDBC源读取mysql表数据,多次提交后,报OutOfMemoryError: Metaspace.错误,最后分析是mysql驱动包中有个守护线程一直在检测清理失效的connection,导致有线程一直存在,class资源无法释放导致的,请问这个如何处理?*来自志愿者整理的flink邮件归档



参考答案:

你用的 Flink 版本是哪个呢? 对于 Metaspace OOM 的话,如果是稳定在某些值,确实需要加大这个内存大小的话,可以设置 参数 taskmanager-memory-jvm-metaspace-size [1]。 对于根本原因的话,需要看下为什么有那么多的无效 Conection,是不是长时间没有数据发送导致 connection invalid,然后再次使用时候就会重启,导致进程级别的缓存无效 Connection对象,这样的话,有线程一直在检测这个对象,就无法 GC。对于 connection 是否 invalid 的处理目前有个 issue[2] 已经解决了。

[1] https://ci.apache.org/projects/flink/flink-docs-release-1.11/ops/config.html#taskmanager-memory-jvm-metaspace-size [2] https://issues.apache.org/jira/browse/FLINK-16681*来自志愿者整理的flink邮件归档



关于本问题的更多回答可点击进行查看:

https://developer.aliyun.com/ask/371641?spm=a2c6h.13066369.question.87.6ad26382e6YZgi



问题二:提交flink sql任务报错

这个任务通过读取mysql CDC 然后关联之后写入到mysql中,每次提交任务都会报错,但是会正确提交的集群上去,并且成功执行。 我想问问是什么原因?

The program finished with the following exception:

org.apache.flink.client.program.ProgramInvocationException: The main method caused an error: No operators defined in streaming topology. Cannot execute. at org.apache.flink.client.program.PackagedProgram.callMainMethod(PackagedProgram.java:302) at org.apache.flink.client.program.PackagedProgram.invokeInteractiveModeForExecution(PackagedProgram.java:198) at org.apache.flink.client.ClientUtils.executeProgram(ClientUtils.java:149) at org.apache.flink.client.cli.CliFrontend.executeProgram(CliFrontend.java:699) at org.apache.flink.client.cli.CliFrontend.run(CliFrontend.java:232) at org.apache.flink.client.cli.CliFrontend.parseParameters(CliFrontend.java:916) at org.apache.flink.client.cli.CliFrontend.lambda$main$10(CliFrontend.java:992) at org.apache.flink.runtime.security.contexts.NoOpSecurityContext.runSecured(NoOpSecurityContext.java:30) at org.apache.flink.client.cli.CliFrontend.main(CliFrontend.java:992) Caused by: java.lang.IllegalStateException: No operators defined in streaming topology. Cannot execute. at org.apache.flink.streaming.api.environment.StreamExecutionEnvironment.getStreamGraphGenerator(StreamExecutionEnvironment.java:1870) at org.apache.flink.streaming.api.environment.StreamExecutionEnvironment.getStreamGraph(StreamExecutionEnvironment.java:1861) at org.apache.flink.streaming.api.environment.StreamExecutionEnvironment.getStreamGraph(StreamExecutionEnvironment.java:1846) at org.apache.flink.streaming.api.environment.StreamExecutionEnvironment.execute(StreamExecutionEnvironment.java:1697) at com.gaotu.data.performance.flink.job.sql.CeresCanRenewalWide.main(CeresCanRenewalWide.java:150) at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method) at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62) at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43) at java.lang.reflect.Method.invoke(Method.java:498) at org.apache.flink.client.program.PackagedProgram.callMainMethod(PackagedProgram.java:288) ... 8 more bjhldeMacBook-Pro:flink-1.11.2 dinghh$ bin/stop-cluster.sh Stopping taskexecutor daemon (pid: 92273) on host bjhldeMacBook-Pro.local. Stopping standalonesession daemon (pid: 92004) on host bjhldeMacBook-Pro.local. bjhldeMacBook-Pro:flink-1.11.2 dinghh$ vim conf/flink-conf.yaml bjhldeMacBook-Pro:flink-1.11.2 dinghh$ bin/start-x -bash: bin/start-x: No such file or directory bjhldeMacBook-Pro:flink-1.11.2 dinghh$ bin/start-cluster.sh Starting cluster. Starting standalonesession daemon on host bjhldeMacBook-Pro.local. Starting taskexecutor daemon on host bjhldeMacBook-Pro.local.*来自志愿者整理的flink邮件归档



参考答案:

Hi, 你是不是使用的flink 1.11版本,在调用了tableEnv.executeSql,最后又调用了TableEnvironment.execute或StreamExecutionEnvironment.execute方法。 可以参考[1]

[1]https://blog.csdn.net/weixin_41608066/article/details/107769826 https://blog.csdn.net/weixin_41608066/article/details/107769826*来自志愿者整理的flink邮件归档



关于本问题的更多回答可点击进行查看:

https://developer.aliyun.com/ask/371640?spm=a2c6h.13066369.question.90.6ad263828l5PKX



问题三:kafka table connector eventTime的问题

你好。 在用kafka table connector时如果使用eventTime,需要怎么启用这个eventTime, 没有找到一些相应的sample,

我是这样用的, 1. 设置Stream环境setStreamTimeCharacteristic(TimeCharacteristic.EventTime); 2. 在connector里指定watermark,其中transTime是消息里的字段 " rowtime AS TO_TIMESTAMP(FROM_UNIXTIME(transTime / 1000, 'yyyy-MM-dd HH:mm:ss')), \n " + " WATERMARK FOR rowtime AS rowtime - INTERVAL '10' SECOND \n" +

  1. 然后直接用datastream的window ds.keyBy(marketCode).timeWindow(Time.minutes(1L));

但在运行时会报以下exception, 已经 在connector里定义了,还需要assign吗? java.lang.RuntimeException: Record has Long.MIN_VALUE timestamp (= no timestamp marker). Is the time characteristic set to 'ProcessingTime', or did you forget to call 'DataStream.assignTimestampsAndWatermarks(...)'?

即使我在datastream里定义了strategy , ds.assignTimestampsAndWatermarks(WatermarkStrategy.forBoundedOutOfOrderness(Duration.ofSeconds(10)));

也还是报上面一样的错。*来自志愿者整理的flink邮件归档



参考答案:

Hi marble, 使用 Datastream 开发的话,Kafka connector 的使用可参考文献1[1];EventTime以及WaterMark的使用可以参考文献2[2]。 对应的中文文档对应在文献3和4. [1] https://ci.apache.org/projects/flink/flink-docs-release-1.11/dev/connectors/kafka.html [2] https://ci.apache.org/projects/flink/flink-docs-release-1.11/dev/event_timestamps_watermarks.html [3] https://ci.apache.org/projects/flink/flink-docs-master/zh/dev/connectors/kafka.html [4] https://ci.apache.org/projects/flink/flink-docs-master/zh/dev/event_timestamps_watermarks.html*来自志愿者整理的flink邮件归档



关于本问题的更多回答可点击进行查看:

https://developer.aliyun.com/ask/371639?spm=a2c6h.13066369.question.89.6ad26382wxrmdc



问题四:使用BroadcastStream后checkpoint失效

问题:job在接入广播流后,checkpint失效。

描述:广播流的数据来源两个地方,一个是从mongo,一个是从kafka,数据进行union,同时指定Watermark,返回Watermark.MAX_WATERMARK(用于与主数据源connect后,窗口聚合水印更新),job部署后,来源mongo的数据源状态会变为FINISHED。网上有查过,说subtask 状态finished会导致checkpoint不触发,那如何既能满足数据源自定义(更像是DataSet),同时保证checkpoint正常触发呢*来自志愿者整理的flink邮件归档



参考答案:

我理解你的 BroadcastStream 也会定期的再次读取,然后更新对应的状态,这样的话,你的 source 可以一直在读取数据(run函数),不退出即可。如果只希望读取一次话,是不是维表也可以满足你的需求呢?

*来自志愿者整理的flink邮件归档



关于本问题的更多回答可点击进行查看:

https://developer.aliyun.com/ask/371638?spm=a2c6h.13066369.question.90.6ad26382AABzF5



问题五:FlinkSLQ双流join使用LAST_VALUE + INTERVAL JOIN时遇到问题

flink 版本 1.11.2 问题:双流Join时,使用last_value + interval join,报错:Rowtime attributes must not be in the input rows of a regular join. As a workaround you can cast the time attributes of input tables to TIMESTAMP before.

代码: // stream 1 create table kafkaSource1 ( id int, field_1 int, field_2 varchar, ts1 timestamp(3), watermark for ts1 ) with ( connector = kafka ) // stream 2 create table kafkaSource2 ( id int, field_3 ts2 timestamp(3), watermark for ts2 ) with ( connector = kafka )

//create view create view kafkaSource1_view as select field_1 as field_1, last_value(field_2) as field_2, last_value(ts1) as ts1 from kafkaSouce1 group by field_1

// query insert into sinkTable select a.field_1, b.field_3 from kafkaSource2 a join kafkaSource1_view b on a.id = b.id and a.ts >= b.ts - INTERVAL ‘1’ HOUR and a.ts < b.ts + INTERVAL ‘2' DAY*来自志愿者整理的flink邮件归档



参考答案:

从报错来看,这个 SQL 应该是 match 了 StreamExecJoinRule,而 regular join 不能有 rowtime 属性。 应该是因为你的 kafkaSouce1 table 的 rowtime 经过 group by 后使用了 last_value 导致不是时间属性类型->TimeIndicatorRelDataType,而在 rule 进行判断后没有 windowBounds,所以就报了现在这个错误了。*来自志愿者整理的flink邮件归档



关于本问题的更多回答可点击进行查看:

https://developer.aliyun.com/ask/371637?spm=a2c6h.13066369.question.91.6ad263828BS9eI

相关实践学习
基于Hologres轻松玩转一站式实时仓库
本场景介绍如何利用阿里云MaxCompute、实时计算Flink和交互式分析服务Hologres开发离线、实时数据融合分析的数据大屏应用。
Linux入门到精通
本套课程是从入门开始的Linux学习课程,适合初学者阅读。由浅入深案例丰富,通俗易懂。主要涉及基础的系统操作以及工作中常用的各种服务软件的应用、部署和优化。即使是零基础的学员,只要能够坚持把所有章节都学完,也一定会受益匪浅。
相关文章
|
11天前
|
SQL 存储 API
Flink实践:通过Flink SQL进行SFTP文件的读写操作
虽然 Apache Flink 与 SFTP 之间的直接交互存在一定的限制,但通过一些创造性的方法和技术,我们仍然可以有效地实现对 SFTP 文件的读写操作。这既展现了 Flink 在处理复杂数据场景中的强大能力,也体现了软件工程中常见的问题解决思路——即通过现有工具和一定的间接方法来克服技术障碍。通过这种方式,Flink SQL 成为了处理各种数据源,包括 SFTP 文件,在内的强大工具。
49 15
|
13天前
|
SQL 安全 数据处理
揭秘数据脱敏神器:Flink SQL的神秘力量,守护你的数据宝藏!
【9月更文挑战第7天】在大数据时代,数据管理和处理尤为重要,尤其在保障数据安全与隐私方面。本文探讨如何利用Flink SQL实现数据脱敏,为实时数据处理提供有效的隐私保护方案。数据脱敏涉及在处理、存储或传输前对敏感数据进行加密、遮蔽或替换,以遵守数据保护法规(如GDPR)。Flink SQL通过内置函数和表达式支持这一过程。
38 2
|
13天前
|
SQL 大数据 数据处理
奇迹降临!解锁 Flink SQL 简单高效的终极秘籍,开启数据处理的传奇之旅!
【9月更文挑战第7天】在大数据处理领域,Flink SQL 因其强大功能与简洁语法成为开发者首选。本文分享了编写高效 Flink SQL 的实用技巧:理解数据特征及业务需求;灵活运用窗口函数(如 TUMBLE 和 HOP);优化连接操作,优先采用等值连接;合理选择数据类型以减少计算资源消耗。结合实际案例(如实时电商数据分析),并通过定期性能测试与调优,助力开发者在大数据处理中更得心应手,挖掘更多价值信息。
29 1
|
25天前
|
SQL 资源调度 流计算
慢sql治理问题之在 Flink 中, userjar 分发问题如何优化
慢sql治理问题之在 Flink 中, userjar 分发问题如何优化
|
Java 对象存储 流计算
如何迁移 Flink 任务到实时计算
本文由阿里巴巴技术专家景丽宁(砚田)分享,主要介绍如何迁移Flink任务到实时计算 Flink 中来。
如何迁移 Flink 任务到实时计算
|
2月前
|
存储 监控 大数据
阿里云实时计算Flink在多行业的应用和实践
本文整理自 Flink Forward Asia 2023 中闭门会的分享。主要分享实时计算在各行业的应用实践,对回归实时计算的重点场景进行介绍以及企业如何使用实时计算技术,并且提供一些在技术架构上的参考建议。
705 7
阿里云实时计算Flink在多行业的应用和实践
|
1月前
|
SQL 消息中间件 Kafka
实时计算 Flink版产品使用问题之如何在EMR-Flink的Flink SOL中针对source表单独设置并行度
实时计算Flink版作为一种强大的流处理和批处理统一的计算框架,广泛应用于各种需要实时数据处理和分析的场景。实时计算Flink版通常结合SQL接口、DataStream API、以及与上下游数据源和存储系统的丰富连接器,提供了一套全面的解决方案,以应对各种实时计算需求。其低延迟、高吞吐、容错性强的特点,使其成为众多企业和组织实时数据处理首选的技术平台。以下是实时计算Flink版的一些典型使用合集。
|
25天前
|
消息中间件 监控 Kafka
联通实时计算平台问题之Flink状态后端数据量较大时,问题排查要如何进行
联通实时计算平台问题之Flink状态后端数据量较大时,问题排查要如何进行
|
1月前
|
消息中间件 监控 Kafka
实时计算 Flink版产品使用问题之怎么调整Flink Web U显示的日志行数
实时计算Flink版作为一种强大的流处理和批处理统一的计算框架,广泛应用于各种需要实时数据处理和分析的场景。实时计算Flink版通常结合SQL接口、DataStream API、以及与上下游数据源和存储系统的丰富连接器,提供了一套全面的解决方案,以应对各种实时计算需求。其低延迟、高吞吐、容错性强的特点,使其成为众多企业和组织实时数据处理首选的技术平台。以下是实时计算Flink版的一些典型使用合集。

相关产品

  • 实时计算 Flink版