问题一:Flink sql 主动使数据延时一段时间有什么方案
我们有这样一个场景,双流join,一个快流,一个慢流,想让快流等一段时间,目的是能提高join的命中率。
FLink sql有什么方案实现吗?
参考回答:
我们也遇到过类似场景。
如果你的数据里面有事件时间,可以写个udf来判断下,如果事件时间-当前时间 小于某个阈值,可以sleep一下。
如果没有事件时间,那就不太好直接搞了,我们是自己搞了一个延迟维表,就是保证每条数据进到维表join算子后等固定时间后再去join。
关于本问题的更多回答可点击原文查看:https://developer.aliyun.com/ask/370136
问题二:Flink SQL 关键字 user?
我发现我创建表 字段名为 user会报错。 user是关键字吗,还是其他原因
"CREATE TABLE ods_foo (\n" +
" id INT,\n" +
" user ARRAY<ROW >\n" +
") WITH (
Exception in thread "main" org.apache.flink.table.api.SqlParserException: SQL parse failed. Encountered "user" at line 3, column 5.
Was expecting one of:
参考回答:
是的,user是关键字,关键字列表可以参考[1].
如果遇到关键字,可以使用 ` 来处理,比如:
CREATE TABLE user
(...) WITH (...);
[1]
https://ci.apache.org/projects/flink/flink-docs-master/dev/table/sql/#reserved-keywords
关于本问题的更多回答可点击原文查看:https://developer.aliyun.com/ask/370135
问题三:flink1.10 定义表时,把json数组声明成STRING类型的,查询出来是空
可以通过以下步骤还原车祸现场: kafka topic: test_action kafka message: {"action": [{"actionID": "id001", "actionName": "aaa"}, {"actionID": "id002", "actionName": "bbb"} ] }
代码Problem2.java: package com.flink;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment; import org.apache.flink.table.api.EnvironmentSettings; import org.apache.flink.table.api.Table; import org.apache.flink.table.api.java.StreamTableEnvironment; import org.apache.flink.types.Row;
/** * * 在自定义tableFunction的时候,如果SQL中传入的是ARRAY类型, * 那么在eval方法接收到的就是Row[], * 问题出在,Row[]中的数据获取不到,里面的元素都是NULL * * 现在思路:就是在定义表的时候,把ARRYA看成STRING, * 现在的问题,就是查询出来,都是空 * * kafka topic: test_action * * kafka message: * {"action": [{"actionID": "id001", "actionName": "aaa"}, {"actionID": "id002", "actionName": "bbb"} ] } */ public class Problem2 {
public static void main(String[] args) throws Exception { StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(); EnvironmentSettings envSettings = EnvironmentSettings.newInstance() .useBlinkPlanner() .inStreamingMode() .build(); StreamTableEnvironment bsEnv = StreamTableEnvironment.create(env, envSettings); bsEnv.registerFunction("explode3", new ExplodeFunction());
String ddlSource = "CREATE TABLE actionTable3 (\n" + " action STRING\n" + ") WITH (\n" + " 'connector.type' = 'kafka',\n" + " 'connector.version' = '0.11',\n" + " 'connector.topic' = 'test_action',\n" + " 'connector.startup-mode' = 'earliest-offset',\n" + " 'connector.properties.zookeeper.connect' = 'localhost:2181',\n" + " 'connector.properties.bootstrap.servers' = 'localhost:9092',\n" + " 'update-mode' = 'append',\n" + " 'format.type' = 'json',\n" + " 'format.derive-schema' = 'false',\n" + " 'format.json-schema' = '{"type": "object", "properties": {"action": {"type": "string"} } }'" + ")"; System.out.println(ddlSource); bsEnv.sqlUpdate(ddlSource);
Table table = bsEnv.sqlQuery("select * from actionTable3"); // Table table = bsEnv.sqlQuery("select * from actionTable2, LATERAL TABLE(explode3(action
)) as T(word
)"); table.printSchema(); bsEnv.toAppendStream(table, Row.class) .print();// 输出都是空
bsEnv.execute("ARRAY tableFunction Problem"); } }
参考回答:
这是一个已知问题[1],你可以看下这个issue,是否可以解决你的问题?
[1] https://issues.apache.org/jira/browse/FLINK-18002
关于本问题的更多回答可点击原文查看:https://developer.aliyun.com/ask/370134
问题四:【Flink的shuffle mode】
看Flink源码时候,在应用中使用keyBy后,源码的transformations会有shuffle mode方法,这个shuffle mode看是UNDEFINED的。
那么,shuffle mode有哪些方式?在应用里面可以设置么?
参考回答:
现在就两种:pipeline和batch
batch的话是block住,直到执行完毕才发给下游的,所以这个shuffle mode一般只对批作业有用。
理论上可以per transformation的来设置,see PartitionTransformation.
关于本问题的更多回答可点击原文查看:https://developer.aliyun.com/ask/370133
问题五:Flink job不定期就会重启,版本是1.9
Flink job经常不定期重启,看了异常日志基本都是下面这种,可以帮忙解释下什么原因吗?
2020-07-01 20:20:43.875 [flink-akka.actor.default-dispatcher-27] INFO akka.remote.RemoteActorRefProvider$RemotingTerminator flink-akka.remote.default-remote-dispatcher-22 - Remoting shut down. 2020-07-01 20:20:43.875 [flink-akka.actor.default-dispatcher-27] INFO akka.remote.RemoteActorRefProvider$RemotingTerminator flink-akka.remote.default-remote-dispatcher-22 - Remoting shut down. 2020-07-01 20:20:43.875 [flink-metrics-16] INFO akka.remote.RemoteActorRefProvider$RemotingTerminator flink-metrics-akka.remote.default-remote-dispatcher-14 - Remoting shut down. 2020-07-01 20:20:43.875 [flink-metrics-16] INFO akka.remote.RemoteActorRefProvider$RemotingTerminator flink-metrics-akka.remote.default-remote-dispatcher-14 - Remoting shut down. 2020-07-01 20:20:43.891 [flink-metrics-16] INFO org.apache.flink.runtime.rpc.akka.AkkaRpcService - Stopped Akka RPC service. 2020-07-01 20:20:43.895 [flink-akka.actor.default-dispatcher-15] INFO org.apache.flink.runtime.entrypoint.ClusterEntrypoint - Terminating cluster entrypoint process YarnJobClusterEntrypoint with exit code 2. java.util.concurrent.CompletionException: akka.pattern.AskTimeoutException: Ask timed out on [Actor[akka://flink/user/resourcemanager#-781959047]] after [10000 ms]. Message of type [org.apache.flink.runtime.rpc.messages.LocalFencedMessage]. A typical reason for AskTimeoutException
is that the recipient actor didn't send a reply. at java.util.concurrent.CompletableFuture.encodeThrowable(CompletableFuture.java:292) at java.util.concurrent.CompletableFuture.completeThrowable(CompletableFuture.java:308) at java.util.concurrent.CompletableFuture.uniApply(CompletableFuture.java:593) at java.util.concurrent.CompletableFuture$UniApply.tryFire(CompletableFuture.java:577) at java.util.concurrent.CompletableFuture.postComplete(CompletableFuture.java:474) at java.util.concurrent.CompletableFuture.completeExceptionally(CompletableFuture.java:1977) at org.apache.flink.runtime.concurrent.FutureUtils$1.onComplete(FutureUtils.java:871) at akka.dispatch.OnComplete.internal(Future.scala:263) at akka.dispatch.OnComplete.internal(Future.scala:261) at akka.dispatch.japi$CallbackBridge.apply(Future.scala:191) at akka.dispatch.japi$CallbackBridge.apply(Future.scala:188) at scala.concurrent.impl.CallbackRunnable.run(Promise.scala:32) at org.apache.flink.runtime.concurrent.Executors$DirectExecutionContext.execute(Executors.java:74) at scala.concurrent.impl.CallbackRunnable.executeWithValue(Promise.scala:40) at scala.concurrent.impl.Promise$DefaultPromise.tryComplete(Promise.scala:248) at akka.pattern.PromiseActorRef$$anonfun$1.apply$mcV$sp(AskSupport.scala:644) at akka.actor.Scheduler$$anon$4.run(Scheduler.scala:205) at scala.concurrent.Future$InternalCallbackExecutor$.unbatchedExecute(Future.scala:599) at scala.concurrent.BatchingExecutor$class.execute(BatchingExecutor.scala:109) at scala.concurrent.Future$InternalCallbackExecutor$.execute(Future.scala:597) at akka.actor.LightArrayRevolverScheduler$TaskHolder.executeTask(LightArrayRevolverScheduler.scala:328) at akka.actor.LightArrayRevolverScheduler$$anon$4.executeBucket$1(LightArrayRevolverScheduler.scala:279) at akka.actor.LightArrayRevolverScheduler$$anon$4.nextTick(LightArrayRevolverScheduler.scala:283) at akka.actor.LightArrayRevolverScheduler$$anon$4.run(LightArrayRevolverScheduler.scala:235) at java.lang.Thread.run(Thread.java:745) Caused by: akka.pattern.AskTimeoutException: Ask timed out on [Actor[akka://flink/user/resourcemanager#-781959047]] after [10000 ms]. Message of type [org.apache.flink.runtime.rpc.messages.LocalFencedMessage]. A typical reason for AskTimeoutException
is that the recipient actor didn't send a reply. at akka.pattern.PromiseActorRef$$anonfun$2.apply(AskSupport.scala:635) at akka.pattern.PromiseActorRef$$anonfun$2.apply(AskSupport.scala:635) at akka.pattern.PromiseActorRef$$anonfun$1.apply$mcV$sp(AskSupport.scala:648) ... 9 common frames omitted
参考回答:
从报错信息看是 Akka 的 RPC 调用超时,因为是 LocalFencedMessage 所以基本上可以排除网络问题。 建议看一下 JM 进程的 GC 压力以及线程数量,是否存在压力过大 RPC 来不及响应的情况。
关于本问题的更多回答可点击原文查看:https://developer.aliyun.com/ask/370132