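Question 1: Flink 1.11.1 reports OutOfMemoryError: Metaspace
I am using Flink SQL and read MySQL table data through the JDBC source. After I submit the job several times, it fails with OutOfMemoryError: Metaspace. My analysis is that the MySQL driver contains a daemon thread that keeps checking for and cleaning up invalid connections. Because that thread never exits, the classes loaded for the job cannot be released. How should this be handled? *From the volunteer-compiled Flink mailing list archive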
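Reference answer:
Which Flink version are you using? For a Metaspace OOM, if usage stabilizes at some level and you genuinely need more memory, you can raise the limit with the option taskmanager.memory.jvm-metaspace.size [1].
As for the root cause, check why there are so many invalid connections. Perhaps no data is sent for a long time, so the connection becomes invalid and is re-established the next time it is used, which leaves invalid Connection objects in a process-level cache. If a thread keeps checking those objects, they cannot be garbage collected. Handling of invalid connections was addressed in an issue [2] that has already been resolved.
[1] https://ci.apache.org/projects/flink/flink-docs-release-1.11/ops/config.html#taskmanager-memory-jvm-metaspace-size
[2] https://issues.apache.org/jira/browse/FLINK-16681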
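To confirm that a driver cleanup thread really outlives the job, it can help to list the live threads of the TaskManager JVM together with their context class loaders (a thread dump taken with jstack shows similar information). The following is a minimal sketch that uses only the JDK. The class name LingeringThreadReport and the default name fragment "abandoned" are assumptions for illustration; the real thread name depends on the MySQL driver version and should be checked against a thread dump. The code only reports on the JVM it runs in, so it would have to be called from code executing inside the TaskManager.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Locale;

/** Hypothetical helper: reports live threads whose name contains a fragment. */
final class LingeringThreadReport {

    /** Returns one description per live thread whose name contains the fragment (case-insensitive). */
    static List<String> findThreads(String nameFragment) {
        String needle = nameFragment.toLowerCase(Locale.ROOT);
        List<String> hits = new ArrayList<>();
        // getAllStackTraces() returns a snapshot keyed by every live thread in this JVM.
        for (Thread t : Thread.getAllStackTraces().keySet()) {
            if (t.getName().toLowerCase(Locale.ROOT).contains(needle)) {
                hits.add(t.getName()
                        + " daemon=" + t.isDaemon()
                        + " contextClassLoader=" + t.getContextClassLoader());
            }
        }
        return hits;
    }

    public static void main(String[] args) {
        // "abandoned" is an assumed fragment; pass the real thread name as the first argument.
        String fragment = args.length > 0 ? args[0] : "abandoned";
        findThreads(fragment).forEach(System.out::println);
    }
}
```

If such a thread is still listed after the job has been cancelled, that supports the diagnosis in the question; raising taskmanager.memory.jvm-metaspace.size then only delays the error rather than removing its cause.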
More answers to this question are available at:
https://developer.aliyun.com/ask/371641?spm=a2c6h.13066369.question.87.6ad26382e6YZgi
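Question 2: Error when submitting a Flink SQL job
This job reads from MySQL CDC, joins the data, and writes the result to MySQL. Every submission reports an error, yet the job is still submitted to the cluster correctly and runs successfully. What is the cause?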
The program finished with the following exception:
org.apache.flink.client.program.ProgramInvocationException: The main method caused an error: No operators defined in streaming topology. Cannot execute.
    at org.apache.flink.client.program.PackagedProgram.callMainMethod(PackagedProgram.java:302)
    at org.apache.flink.client.program.PackagedProgram.invokeInteractiveModeForExecution(PackagedProgram.java:198)
    at org.apache.flink.client.ClientUtils.executeProgram(ClientUtils.java:149)
    at org.apache.flink.client.cli.CliFrontend.executeProgram(CliFrontend.java:699)
    at org.apache.flink.client.cli.CliFrontend.run(CliFrontend.java:232)
    at org.apache.flink.client.cli.CliFrontend.parseParameters(CliFrontend.java:916)
    at org.apache.flink.client.cli.CliFrontend.lambda$main$10(CliFrontend.java:992)
    at org.apache.flink.runtime.security.contexts.NoOpSecurityContext.runSecured(NoOpSecurityContext.java:30)
    at org.apache.flink.client.cli.CliFrontend.main(CliFrontend.java:992)
Caused by: java.lang.IllegalStateException: No operators defined in streaming topology. Cannot execute.
    at org.apache.flink.streaming.api.environment.StreamExecutionEnvironment.getStreamGraphGenerator(StreamExecutionEnvironment.java:1870)
    at org.apache.flink.streaming.api.environment.StreamExecutionEnvironment.getStreamGraph(StreamExecutionEnvironment.java:1861)
    at org.apache.flink.streaming.api.environment.StreamExecutionEnvironment.getStreamGraph(StreamExecutionEnvironment.java:1846)
    at org.apache.flink.streaming.api.environment.StreamExecutionEnvironment.execute(StreamExecutionEnvironment.java:1697)
    at com.gaotu.data.performance.flink.job.sql.CeresCanRenewalWide.main(CeresCanRenewalWide.java:150)
    at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
    at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
    at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
    at java.lang.reflect.Method.invoke(Method.java:498)
    at org.apache.flink.client.program.PackagedProgram.callMainMethod(PackagedProgram.java:288)
    ... 8 more
bjhldeMacBook-Pro:flink-1.11.2 dinghh$ bin/stop-cluster.sh
Stopping taskexecutor daemon (pid: 92273) on host bjhldeMacBook-Pro.local.
Stopping standalonesession daemon (pid: 92004) on host bjhldeMacBook-Pro.local.
bjhldeMacBook-Pro:flink-1.11.2 dinghh$ vim conf/flink-conf.yaml
bjhldeMacBook-Pro:flink-1.11.2 dinghh$ bin/start-x
-bash: bin/start-x: No such file or directory
bjhldeMacBook-Pro:flink-1.11.2 dinghh$ bin/start-cluster.sh
Starting cluster.
Starting standalonesession daemon on host bjhldeMacBook-Pro.local.
Starting taskexecutor daemon on host bjhldeMacBook-Pro.local.
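Reference answer:
Hi, are you on Flink 1.11, calling tableEnv.executeSql and then also calling TableEnvironment.execute or StreamExecutionEnvironment.execute at the end? In Flink 1.11, executeSql submits the INSERT job by itself, so a trailing execute call finds no operators left in the topology and throws this exception, while the job already submitted by executeSql keeps running. That matches the stack trace, which fails inside StreamExecutionEnvironment.execute. Removing the trailing execute call should make the error go away. See [1] for details.
[1] https://blog.csdn.net/weixin_41608066/article/details/107769826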
More answers to this question are available at:
https://developer.aliyun.com/ask/371640?spm=a2c6h.13066369.question.90.6ad263828l5PKX
Question 3: Event time with the Kafka table connector
Hello. When using the Kafka table connector with event time, how do I enable event time? I could not find a matching sample.
This is what I do:
1. Set the time characteristic on the stream environment: setStreamTimeCharacteristic(TimeCharacteristic.EventTime);
2. Declare the watermark in the connector DDL, where transTime is a field of the message:
" rowtime AS TO_TIMESTAMP(FROM_UNIXTIME(transTime / 1000, 'yyyy-MM-dd HH:mm:ss')), \n " +
" WATERMARK FOR rowtime AS rowtime - INTERVAL '10' SECOND \n" +
3. Then use a DataStream window directly: ds.keyBy(marketCode).timeWindow(Time.minutes(1L));
At runtime this throws the exception below. The watermark is already defined in the connector, so do I still need to assign it?
java.lang.RuntimeException: Record has Long.MIN_VALUE timestamp (= no timestamp marker). Is the time characteristic set to 'ProcessingTime', or did you forget to call 'DataStream.assignTimestampsAndWatermarks(...)'?
Even when I define a strategy on the DataStream,
ds.assignTimestampsAndWatermarks(WatermarkStrategy.forBoundedOutOfOrderness(Duration.ofSeconds(10)));
the same error is reported.
Reference answer:
Hi marble, if you develop with the DataStream API, see [1] for how to use the Kafka connector and [2] for how to use event time and watermarks. The corresponding Chinese documentation is at [3] and [4].
[1] https://ci.apache.org/projects/flink/flink-docs-release-1.11/dev/connectors/kafka.html
[2] https://ci.apache.org/projects/flink/flink-docs-release-1.11/dev/event_timestamps_watermarks.html
[3] https://ci.apache.org/projects/flink/flink-docs-master/zh/dev/connectors/kafka.html
[4] https://ci.apache.org/projects/flink/flink-docs-master/zh/dev/event_timestamps_watermarks.html
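The exception means that the records reaching the window operator carry no timestamp. One thing worth checking, as an assumption about the asker's code rather than something the thread confirms: WatermarkStrategy.forBoundedOutOfOrderness(...) on its own only generates watermarks from timestamps the records already carry, so a timestamp assigner is normally attached with withTimestampAssigner(...), and the stream returned by assignTimestampsAndWatermarks(...) is the one that has to feed the window. The plain-Java model below has no Flink dependency, and BoundedWatermarkModel and its methods are hypothetical names. It sketches the two pieces involved: deriving a second-granular rowtime from transTime as the DDL expression does, and keeping the watermark 10 seconds behind the largest rowtime seen. Time-zone handling in FROM_UNIXTIME is ignored here.

```java
import java.time.Duration;

/** Plain-Java model of a bounded-out-of-orderness watermark; this is not Flink API. */
final class BoundedWatermarkModel {
    private final long maxDelayMillis;
    private long maxRowtimeMillis = Long.MIN_VALUE; // no event seen yet

    BoundedWatermarkModel(Duration maxDelay) {
        this.maxDelayMillis = maxDelay.toMillis();
    }

    /**
     * Models the DDL expression on transTime: integer division by 1000
     * drops the millisecond part, so rowtime has whole-second granularity.
     */
    static long rowtimeMillis(long transTimeMillis) {
        return (transTimeMillis / 1000L) * 1000L;
    }

    /** Records the event time of one element; out-of-order events never move the maximum back. */
    void onEvent(long rowtimeMillis) {
        maxRowtimeMillis = Math.max(maxRowtimeMillis, rowtimeMillis);
    }

    /** Watermark = largest rowtime seen minus the allowed delay, as in "rowtime - INTERVAL '10' SECOND". */
    long currentWatermark() {
        if (maxRowtimeMillis == Long.MIN_VALUE) {
            return Long.MIN_VALUE; // avoid overflow before the first event
        }
        return maxRowtimeMillis - maxDelayMillis;
    }
}
```

The point of the model is that a watermark can only advance once a timestamp has been read from each record; without that step every record keeps the Long.MIN_VALUE marker named in the exception.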
More answers to this question are available at:
https://developer.aliyun.com/ask/371639?spm=a2c6h.13066369.question.89.6ad26382wxrmdc
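Question 4: Checkpoints stop working after using BroadcastStream
Problem: after the job is connected to a broadcast stream, checkpointing stops working.
Description: the broadcast stream's data comes from two places, MongoDB and Kafka. The two are unioned and a watermark is assigned that returns Watermark.MAX_WATERMARK (so that the window aggregation watermark can advance after connecting with the main source). After the job is deployed, the state of the MongoDB source becomes FINISHED. From what I found online, a subtask in the FINISHED state prevents checkpoints from being triggered. How can I keep this kind of custom source (which behaves more like a DataSet) and still have checkpoints triggered normally?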
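Reference answer:
As I understand it, your BroadcastStream would also re-read its data periodically and update the corresponding state. In that case your source can simply keep reading in its run function and never exit. If you only want to read the data once, would a dimension table also meet your needs?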
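The "keep the source running" suggestion can be sketched in plain Java. The class below models the run/cancel pattern only and is not Flink's SourceFunction interface; KeepAliveSourceModel, its constructor arguments, and the 100 ms idle interval are assumptions for illustration. It emits the one-off records (the MongoDB snapshot in the question) and then idles until it is cancelled, so the corresponding task would stay running instead of switching to FINISHED.

```java
import java.util.List;
import java.util.function.Consumer;

/** Plain-Java model of a source that emits a bounded snapshot and then stays alive. */
final class KeepAliveSourceModel<T> implements Runnable {
    private final List<T> initialRecords;
    private final Consumer<T> collector;
    // volatile so that cancel() called from another thread is seen by run()
    private volatile boolean running = true;

    KeepAliveSourceModel(List<T> initialRecords, Consumer<T> collector) {
        this.initialRecords = initialRecords;
        this.collector = collector;
    }

    @Override
    public void run() {
        // Phase 1: emit the one-off snapshot.
        for (T record : initialRecords) {
            if (!running) {
                return;
            }
            collector.accept(record);
        }
        // Phase 2: idle instead of returning, so the source never finishes on its own.
        while (running) {
            try {
                Thread.sleep(100L);
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
                return;
            }
        }
    }

    /** Asks run() to return; mirrors the cancel() method a long-running source usually has. */
    void cancel() {
        running = false;
    }
}
```

The idle loop is also the natural place to re-read the reference data periodically, which is what the answer assumes the broadcast side would do.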
More answers to this question are available at:
https://developer.aliyun.com/ask/371638?spm=a2c6h.13066369.question.90.6ad26382AABzF5
Question 5: Problem using LAST_VALUE + INTERVAL JOIN in a Flink SQL two-stream join
Flink version: 1.11.2. Problem: in a two-stream join that uses last_value together with an interval join, the following error is reported: Rowtime attributes must not be in the input rows of a regular join. As a workaround you can cast the time attributes of input tables to TIMESTAMP before.
Code:
-- stream 1
create table kafkaSource1 (
  id int,
  field_1 int,
  field_2 varchar,
  ts1 timestamp(3),
  watermark for ts1
) with (
  connector = kafka
)
-- stream 2
create table kafkaSource2 (
  id int,
  field_3
  ts2 timestamp(3),
  watermark for ts2
) with (
  connector = kafka
)
-- create view
create view kafkaSource1_view as
select
  field_1 as field_1,
  last_value(field_2) as field_2,
  last_value(ts1) as ts1
from kafkaSource1
group by field_1
-- query
insert into sinkTable
select a.field_1, b.field_3
from kafkaSource2 a
join kafkaSource1_view b
  on a.id = b.id
  and a.ts >= b.ts - INTERVAL '1' HOUR
  and a.ts < b.ts + INTERVAL '2' DAY
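Reference answer:
Judging from the error, this SQL was matched by StreamExecJoinRule, and a regular join must not have rowtime attributes in its input. The likely cause is that the rowtime of your kafkaSource1 table passes through group by with last_value, so it is no longer a time attribute type (TimeIndicatorRelDataType). When the rule then evaluates the join it finds no windowBounds, so the join is treated as a regular join and this error is reported.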
More answers to this question are available at:
https://developer.aliyun.com/ask/371637?spm=a2c6h.13066369.question.91.6ad263828BS9eI