Question 1: In Flink 1.10, a JSON array declared as STRING in the table definition comes back empty when queried
The failure can be reproduced with the following setup:
kafka topic: test_action
kafka message: {"action": [{"actionID": "id001", "actionName": "aaa"}, {"actionID": "id002", "actionName": "bbb"} ] }
Code, Problem2.java:

package com.flink;

import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.table.api.EnvironmentSettings;
import org.apache.flink.table.api.Table;
import org.apache.flink.table.api.java.StreamTableEnvironment;
import org.apache.flink.types.Row;

/**
 * When writing a custom TableFunction, if the SQL passes in an ARRAY,
 * the eval method receives a Row[].
 * The problem is that the data in the Row[] cannot be read: every element is NULL.
 *
 * Current idea: treat the ARRAY as a STRING when defining the table.
 * The problem now is that everything the query returns is empty.
 *
 * kafka topic: test_action
 *
 * kafka message:
 * {"action": [{"actionID": "id001", "actionName": "aaa"}, {"actionID": "id002", "actionName": "bbb"} ] }
 */
public class Problem2 {

    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        EnvironmentSettings envSettings = EnvironmentSettings.newInstance()
                .useBlinkPlanner()
                .inStreamingMode()
                .build();
        StreamTableEnvironment bsEnv = StreamTableEnvironment.create(env, envSettings);
        bsEnv.registerFunction("explode3", new ExplodeFunction());

        // The double quotes inside the JSON schema must be escaped in the Java string literal.
        String ddlSource = "CREATE TABLE actionTable3 (\n" +
                " action STRING\n" +
                ") WITH (\n" +
                " 'connector.type' = 'kafka',\n" +
                " 'connector.version' = '0.11',\n" +
                " 'connector.topic' = 'test_action',\n" +
                " 'connector.startup-mode' = 'earliest-offset',\n" +
                " 'connector.properties.zookeeper.connect' = 'localhost:2181',\n" +
                " 'connector.properties.bootstrap.servers' = 'localhost:9092',\n" +
                " 'update-mode' = 'append',\n" +
                " 'format.type' = 'json',\n" +
                " 'format.derive-schema' = 'false',\n" +
                " 'format.json-schema' = '{\"type\": \"object\", \"properties\": {\"action\": {\"type\": \"string\"} } }'" +
                ")";
        System.out.println(ddlSource);
        bsEnv.sqlUpdate(ddlSource);

        Table table = bsEnv.sqlQuery("select * from actionTable3");
        // Table table = bsEnv.sqlQuery("select * from actionTable2, LATERAL TABLE(explode3(`action`)) as T(`word`)");
        table.printSchema();
        bsEnv.toAppendStream(table, Row.class)
                .print(); // everything printed is empty

        bsEnv.execute("ARRAY tableFunction Problem");
    }
}
Reference answer:
This is a known issue [1]. Take a look at that issue and see whether it resolves your problem.
[1] https://issues.apache.org/jira/browse/FLINK-18002
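Until the JSON format can hand back an array as a STRING, one possible stopgap is to read the Kafka record as raw text and slice the array out by hand. The following is only a dependency-free sketch of that slicing step. The class `RawJsonField` and the method `extractRaw` are hypothetical names, and getting the record into the job as a plain string (for example with a string deserialization schema on the DataStream side) is an assumption that is not shown here.

```java
public class RawJsonField {

    /**
     * Returns the raw text of the array or object stored under {@code key},
     * or null if the key is missing or its value is not an array/object.
     * Simplification: the first occurrence of "key" followed by ':' is used,
     * so this is only meant for flat messages like the one in the question.
     */
    public static String extractRaw(String json, String key) {
        int keyPos = json.indexOf("\"" + key + "\"");
        if (keyPos < 0) {
            return null;
        }
        int colon = json.indexOf(':', keyPos + key.length() + 2);
        if (colon < 0) {
            return null;
        }
        int start = colon + 1;
        while (start < json.length() && Character.isWhitespace(json.charAt(start))) {
            start++;
        }
        if (start >= json.length()) {
            return null;
        }
        char open = json.charAt(start);
        if (open != '[' && open != '{') {
            return null;
        }

        int depth = 0;
        boolean inString = false;
        for (int i = start; i < json.length(); i++) {
            char c = json.charAt(i);
            if (inString) {
                if (c == '\\') {
                    i++; // skip the escaped character
                } else if (c == '"') {
                    inString = false;
                }
            } else if (c == '"') {
                inString = true;
            } else if (c == '[' || c == '{') {
                depth++;
            } else if (c == ']' || c == '}') {
                depth--;
                if (depth == 0) {
                    return json.substring(start, i + 1);
                }
            }
        }
        return null; // brackets never balanced
    }

    public static void main(String[] args) {
        String message = "{\"action\": [{\"actionID\": \"id001\", \"actionName\": \"aaa\"}, "
                + "{\"actionID\": \"id002\", \"actionName\": \"bbb\"} ] }";
        // Prints the array text exactly as it appears in the message.
        System.out.println(extractRaw(message, "action"));
    }
}
```

Brackets inside string values are ignored by tracking whether the scanner is inside a quoted string, so a value such as "a]b" does not end the slice early. For nested or repeated keys a real JSON parser would be the safer choice.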
More answers to this question are available in the original post: https://developer.aliyun.com/ask/372052
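Question 2: In Flink 1.10, when pivoting rows into columns with a TableFunction, the Row is always empty
In my use case I want to turn rows into columns, and I implement this with a custom TableFunction. When the SQL passes in an ARRAY, the eval method receives a Row[]. The problem is that the data in the Row[] cannot be read: every element is NULL.
The failure can be reproduced with the following setup and code:
kafka topic: test_action
kafka message: {"action": [{"actionID": "id001", "actionName": "aaa"}, {"actionID": "id002", "actionName": "bbb"} ] }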
Code 1: Problem.java

package com.flink;

import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.table.api.EnvironmentSettings;
import org.apache.flink.table.api.Table;
import org.apache.flink.table.api.java.StreamTableEnvironment;
import org.apache.flink.types.Row;

/**
 * When writing a custom TableFunction, if the SQL passes in an ARRAY,
 * the eval method receives a Row[].
 * The problem is that the data in the Row[] cannot be read: every element is NULL.
 *
 * kafka topic: test_action
 *
 * kafka message:
 * {"action": [{"actionID": "id001", "actionName": "aaa"}, {"actionID": "id002", "actionName": "bbb"} ] }
 */
public class Problem {

    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        EnvironmentSettings envSettings = EnvironmentSettings.newInstance()
                .useBlinkPlanner()
                .inStreamingMode()
                .build();
        StreamTableEnvironment bsEnv = StreamTableEnvironment.create(env, envSettings);
        bsEnv.registerFunction("explode2", new ExplodeFunction());

        String ddlSource = "CREATE TABLE actionTable (\n" +
                " action ARRAY<\n" +
                " ROW<" +
                " actionID STRING,\n" +
                " actionName STRING\n" +
                " >\n" +
                " >\n" +
                ") WITH (\n" +
                " 'connector.type' = 'kafka',\n" +
                " 'connector.version' = '0.11',\n" +
                " 'connector.topic' = 'test_action',\n" +
                " 'connector.startup-mode' = 'earliest-offset',\n" +
                " 'connector.properties.zookeeper.connect' = 'localhost:2181',\n" +
                " 'connector.properties.bootstrap.servers' = 'localhost:9092',\n" +
                " 'update-mode' = 'append',\n" +
                " 'format.type' = 'json'\n" +
                ")";
        bsEnv.sqlUpdate(ddlSource);

        // Table table = bsEnv.sqlQuery("select `action` from actionTable");
        Table table = bsEnv.sqlQuery("select * from actionTable, LATERAL TABLE(explode2(`action`)) as T(`word`)");
        table.printSchema();
        bsEnv.toAppendStream(table, Row.class)
                .print("==tb==");

        bsEnv.execute("ARRAY tableFunction Problem");
    }
}
Code 2: ExplodeFunction.java

package com.flink;

import org.apache.flink.table.functions.TableFunction;
import org.apache.flink.types.Row;

import java.util.ArrayList;
import java.util.Arrays;

public class ExplodeFunction extends TableFunction<Row> {

    public void eval(Row[] values) {
        System.out.println(values.length);
        if (values.length > 0) {
            for (Row row : values) {
                if (row != null) { // in the debugger, row is always null at this point
                    ArrayList<Object> list = new ArrayList<>();
                    for (int i = 0; i < row.getArity(); i++) {
                        Object field = row.getField(i);
                        list.add(field);
                    }

                    // Emit one output row per array element, e.g. "[id001, aaa]".
                    collect(Row.of(Arrays.toString(list.toArray())));
                }
            }
        }
    }
}
Reference answer:
Row[] is not yet supported as a parameter. There is an open issue that addresses this, which you can follow: https://issues.apache.org/jira/browse/FLINK-17855
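To make the intended result concrete, here is the same flattening logic on plain arrays, without Flink. This is only an illustrative sketch: `ExplodeSketch` is a hypothetical class, and `Object[]` stands in for Flink's `Row` purely so that the snippet runs on its own. It shows what `ExplodeFunction` would emit once the array elements arrive populated.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

public class ExplodeSketch {

    /** Turns each non-null record into one output string, skipping null records. */
    public static List<String> explode(Object[][] records) {
        List<String> out = new ArrayList<>();
        if (records == null) {
            return out;
        }
        for (Object[] record : records) {
            if (record != null) {
                // Same formatting as Arrays.toString(list.toArray()) in ExplodeFunction.
                out.add(Arrays.toString(record));
            }
        }
        return out;
    }

    public static void main(String[] args) {
        // Mirrors the "action" array from the Kafka message in the question.
        Object[][] action = {
            {"id001", "aaa"},
            {"id002", "bbb"}
        };
        for (String line : explode(action)) {
            System.out.println(line);
        }
        // prints:
        // [id001, aaa]
        // [id002, bbb]
    }
}
```

With the sample message, the expected output of the real function is therefore two rows, one per array element. What the question observes instead is that every element is null, so the inner branch is never reached.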
More answers to this question are available in the original post: https://developer.aliyun.com/ask/372038
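Question 3: Frequent OOM after a job restarts because of an exception
The problem we are facing: when a pod is lost because of network jitter or a hardware failure, the job fails. Once the pod is recreated the job restarts automatically, but after running for a while (anywhere from half an hour to an hour) other pods are very likely to be killed by the OS because of OOM. This then repeats in a loop, and pods get killed more and more frequently. Our current workaround is to tear down the cluster by hand, build a new one, and restart the job, after which everything returns to normal.
With a purely heap-based state backend, restarting the job does not cause this problem.
I have an unproven guess: after the job fails, native memory is not fully released. Suppose the pod limit is 10G. After the job restarts only 8G is actually free, but the TM still runs as if it had 10G, so the pod's memory use exceeds 10G and the OS kills it (pure speculation).
Does anyone have good suggestions or a solution?
The kernel log from one occurrence is as follows: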
Jun 30 21:59:15 flink-tm-1 kernel: memory: usage 28672000kB, limit 28672000kB, failcnt 11225
Jun 30 21:59:15 flink-tm-1 kernel: memory+swap: usage 28672000kB, limit 9007199254740988kB, failcnt 0
Jun 30 21:59:15 flink-tm-1 kernel: kmem: usage 0kB, limit 9007199254740988kB, failcnt 0
Jun 30 21:59:15 flink-tm-1 kernel: Memory cgroup stats for /kubepods.slice/kubepods-pod5ad5d2ea_5faa_4a11_96b4_39271ab76e99.slice: cache:0KB rss:0KB rss_huge:0KB mapped_file:0KB swap:0KB inactive_anon:0KB active_anon:0KB inactive_file:0KB active_file:0KB unevictable:0KB
Jun 30 21:59:15 flink-tm-1 kernel: Memory cgroup stats for /kubepods.slice/kubepods-pod5ad5d2ea_5faa_4a11_96b4_39271ab76e99.slice/docker-fe101418a3b2a7c534e89b4ac73d29b04070eb923220a5b17338850bbdb3817a.scope: cache:0KB rss:44KB rss_huge:0KB mapped_file:0KB swap:0KB inactive_anon:0KB active_anon:44KB inactive_file:0KB active_file:0KB unevictable:0KB
Jun 30 21:59:15 flink-tm-1 kernel: Memory cgroup stats for /kubepods.slice/kubepods-pod5ad5d2ea_5faa_4a11_96b4_39271ab76e99.slice/docker-a2295e812a828738810a8f1ae69cd48e99ef98b9e1038158a6e33f81524cc02a.scope: cache:180KB rss:28671776KB rss_huge:26437632KB mapped_file:144KB swap:0KB inactive_anon:0KB active_anon:28671760KB inactive_file:4KB active_file:4KB unevictable:0KB
Jun 30 21:59:15 flink-tm-1 kernel: [ pid ] uid tgid total_vm rss nr_ptes swapents oom_score_adj name
Jun 30 21:59:15 flink-tm-1 kernel: [16875] 0 16875 253 1 4 0 -998 pause
Jun 30 21:59:15 flink-tm-1 kernel: [17274] 0 17274 1369 421 7 0 -998 bash
Jun 30 21:59:15 flink-tm-1 kernel: [18089] 0 18089 10824832 7174316 14500 0 -998 java
Jun 30 21:59:15 flink-tm-1 kernel: [18348] 0 18348 1017 196 6 0 -998 tail
Jun 30 21:59:15 flink-tm-1 kernel: Memory cgroup out of memory: Kill process 26824 (Window(Tumbling) score 4 or sacrifice child
Jun 30 21:59:15 flink-tm-1 kernel: Killed process 18089 (java) total-vm:43299328kB, anon-rss:28669084kB, file-rss:28180kB, shmem-rss:0kB
Looking forward to your reply and help.
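Reference answer:
I ran into this problem a long time ago: in standalone mode, metaspace cannot be released. It looks like a fairly serious bug.
There has been some discussion at https://issues.apache.org/jira/browse/FLINK-11205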
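The numbers in the kernel log from the question can be checked with a little arithmetic. The sketch below parses the `memory: usage ..., limit ...` line and converts the page counts from the task table into kB. The class `CgroupOomMath` and its method names are hypothetical, and the 4 KiB page size is an assumption. That assumption is consistent with the log itself, where 10824832 pages of `total_vm` correspond to the reported `total-vm:43299328kB`.

```java
import java.util.regex.Matcher;
import java.util.regex.Pattern;

public class CgroupOomMath {

    // Matches the cgroup summary line "memory: usage <n>kB, limit <n>kB".
    private static final Pattern USAGE_LIMIT =
            Pattern.compile("memory: usage (\\d+)kB, limit (\\d+)kB");

    /** Returns {usageKb, limitKb} parsed from the kernel's cgroup memory line. */
    public static long[] parseUsageAndLimit(String line) {
        Matcher m = USAGE_LIMIT.matcher(line);
        if (!m.find()) {
            throw new IllegalArgumentException("not a cgroup memory line: " + line);
        }
        return new long[] {Long.parseLong(m.group(1)), Long.parseLong(m.group(2))};
    }

    /** Converts a page count from the OOM task table to kB, assuming 4 KiB pages. */
    public static long pagesToKb(long pages) {
        return pages * 4;
    }

    /** Converts kB to whole MiB (integer division). */
    public static long kbToMib(long kb) {
        return kb / 1024;
    }

    public static void main(String[] args) {
        String line = "Jun 30 21:59:15 flink-tm-1 kernel: memory: usage 28672000kB, "
                + "limit 28672000kB, failcnt 11225";
        long[] usageAndLimit = parseUsageAndLimit(line);

        System.out.println("limit MiB   = " + kbToMib(usageAndLimit[1]));               // 28000
        System.out.println("at limit    = " + (usageAndLimit[0] == usageAndLimit[1]));  // true
        // rss column of the java process (pid 18089) in the task table, in pages:
        System.out.println("java rss kB = " + pagesToKb(7174316L));                     // 28697264
    }
}
```

Read this way, the container limit is 28000 MiB, usage sits exactly at that limit, and the single `java` process accounts for essentially all of it. Its 28697264 kB of resident memory equals the `anon-rss` plus `file-rss` reported on the kill line.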
More answers to this question are available in the original post: https://developer.aliyun.com/ask/371862
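Question 4: Flink job restarts at irregular intervals, version 1.9
The Flink job frequently restarts at irregular intervals. The exception logs are basically all like the one below. Could someone help explain the cause?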
2020-07-01 20:20:43.875 [flink-akka.actor.default-dispatcher-27] INFO akka.remote.RemoteActorRefProvider$RemotingTerminator flink-akka.remote.default-remote-dispatcher-22 - Remoting shut down.
2020-07-01 20:20:43.875 [flink-akka.actor.default-dispatcher-27] INFO akka.remote.RemoteActorRefProvider$RemotingTerminator flink-akka.remote.default-remote-dispatcher-22 - Remoting shut down.
2020-07-01 20:20:43.875 [flink-metrics-16] INFO akka.remote.RemoteActorRefProvider$RemotingTerminator flink-metrics-akka.remote.default-remote-dispatcher-14 - Remoting shut down.
2020-07-01 20:20:43.875 [flink-metrics-16] INFO akka.remote.RemoteActorRefProvider$RemotingTerminator flink-metrics-akka.remote.default-remote-dispatcher-14 - Remoting shut down.
2020-07-01 20:20:43.891 [flink-metrics-16] INFO org.apache.flink.runtime.rpc.akka.AkkaRpcService - Stopped Akka RPC service.
2020-07-01 20:20:43.895 [flink-akka.actor.default-dispatcher-15] INFO org.apache.flink.runtime.entrypoint.ClusterEntrypoint - Terminating cluster entrypoint process YarnJobClusterEntrypoint with exit code 2.
java.util.concurrent.CompletionException: akka.pattern.AskTimeoutException: Ask timed out on [Actor[akka://flink/user/resourcemanager#-781959047]] after [10000 ms]. Message of type [org.apache.flink.runtime.rpc.messages.LocalFencedMessage]. A typical reason for `AskTimeoutException` is that the recipient actor didn't send a reply.
    at java.util.concurrent.CompletableFuture.encodeThrowable(CompletableFuture.java:292)
    at java.util.concurrent.CompletableFuture.completeThrowable(CompletableFuture.java:308)
    at java.util.concurrent.CompletableFuture.uniApply(CompletableFuture.java:593)
    at java.util.concurrent.CompletableFuture$UniApply.tryFire(CompletableFuture.java:577)
    at java.util.concurrent.CompletableFuture.postComplete(CompletableFuture.java:474)
    at java.util.concurrent.CompletableFuture.completeExceptionally(CompletableFuture.java:1977)
    at org.apache.flink.runtime.concurrent.FutureUtils$1.onComplete(FutureUtils.java:871)
    at akka.dispatch.OnComplete.internal(Future.scala:263)
    at akka.dispatch.OnComplete.internal(Future.scala:261)
    at akka.dispatch.japi$CallbackBridge.apply(Future.scala:191)
    at akka.dispatch.japi$CallbackBridge.apply(Future.scala:188)
    at scala.concurrent.impl.CallbackRunnable.run(Promise.scala:32)
    at org.apache.flink.runtime.concurrent.Executors$DirectExecutionContext.execute(Executors.java:74)
    at scala.concurrent.impl.CallbackRunnable.executeWithValue(Promise.scala:40)
    at scala.concurrent.impl.Promise$DefaultPromise.tryComplete(Promise.scala:248)
    at akka.pattern.PromiseActorRef$$anonfun$1.apply$mcV$sp(AskSupport.scala:644)
    at akka.actor.Scheduler$$anon$4.run(Scheduler.scala:205)
    at scala.concurrent.Future$InternalCallbackExecutor$.unbatchedExecute(Future.scala:599)
    at scala.concurrent.BatchingExecutor$class.execute(BatchingExecutor.scala:109)
    at scala.concurrent.Future$InternalCallbackExecutor$.execute(Future.scala:597)
    at akka.actor.LightArrayRevolverScheduler$TaskHolder.executeTask(LightArrayRevolverScheduler.scala:328)
    at akka.actor.LightArrayRevolverScheduler$$anon$4.executeBucket$1(LightArrayRevolverScheduler.scala:279)
    at akka.actor.LightArrayRevolverScheduler$$anon$4.nextTick(LightArrayRevolverScheduler.scala:283)
    at akka.actor.LightArrayRevolverScheduler$$anon$4.run(LightArrayRevolverScheduler.scala:235)
    at java.lang.Thread.run(Thread.java:745)
Caused by: akka.pattern.AskTimeoutException: Ask timed out on [Actor[akka://flink/user/resourcemanager#-781959047]] after [10000 ms]. Message of type [org.apache.flink.runtime.rpc.messages.LocalFencedMessage]. A typical reason for `AskTimeoutException` is that the recipient actor didn't send a reply.
    at akka.pattern.PromiseActorRef$$anonfun$2.apply(AskSupport.scala:635)
    at akka.pattern.PromiseActorRef$$anonfun$2.apply(AskSupport.scala:635)
    at akka.pattern.PromiseActorRef$$anonfun$1.apply$mcV$sp(AskSupport.scala:648)
    ... 9 common frames omitted
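Reference answer:
Judging from the error, an Akka RPC call timed out. Because the message is a LocalFencedMessage, network problems can basically be ruled out. I suggest checking the GC pressure and the thread count of the JM process, to see whether it is under so much load that it cannot respond to RPCs in time.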
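As a rough illustration of the two signals mentioned above, the standard `java.lang.management` MXBeans expose cumulative GC time and the live thread count of a JVM. The sketch below reads them for the JVM it runs in. The class `JvmPressureProbe` is a hypothetical name, and running such a probe inside the JM (or reading the same beans remotely over JMX) is an assumption about your setup. The JDK tools `jstat -gcutil <pid>` and `jstack <pid>` give comparable information from outside the process.

```java
import java.lang.management.GarbageCollectorMXBean;
import java.lang.management.ManagementFactory;

public class JvmPressureProbe {

    /** Total milliseconds spent in GC since JVM start, summed over all collectors. */
    public static long totalGcMillis() {
        long total = 0;
        for (GarbageCollectorMXBean gc : ManagementFactory.getGarbageCollectorMXBeans()) {
            long millis = gc.getCollectionTime(); // -1 when the collector does not report it
            if (millis > 0) {
                total += millis;
            }
        }
        return total;
    }

    /** Current number of live threads, daemon and non-daemon. */
    public static int liveThreadCount() {
        return ManagementFactory.getThreadMXBean().getThreadCount();
    }

    public static void main(String[] args) {
        long uptimeMillis = ManagementFactory.getRuntimeMXBean().getUptime();
        System.out.println("uptime ms    = " + uptimeMillis);
        System.out.println("GC time ms   = " + totalGcMillis());
        System.out.println("live threads = " + liveThreadCount());
    }
}
```

A GC time that grows nearly as fast as the uptime, or a thread count that keeps climbing between samples, would support the explanation that the JM is too busy to answer within the 10000 ms ask timeout shown in the log.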
More answers to this question are available in the original post: https://developer.aliyun.com/ask/371674
Question 5: Reading Alibaba MQ from Flink 1.9
When reading from Alibaba RocketMQ with Flink 1.9, how do I set the AccessKey and SecretKey parameters?
final RMQConnectionConfig connectionConfig = new RMQConnectionConfig.Builder().setHost("localhost").setPort(5000)....build();
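Reference answer:
The community version of Flink does not appear to ship a connector for RocketMQ by default. The RocketMQ community project has a module that integrates with Flink:
https://github.com/apache/rocketmq-externals/tree/master/rocketmq-flink
The AccessKey and SecretKey parameters you mention are presumably for ACL permission checks. From reading the code, they do not seem to be supported, but you can extend the module yourself.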