问题一:flink 1.11 rest api saveppoint接口 异常
在升级了 flink 1.11之后,我在使用的时候发现 rest api 的 /jobs/:jobid/savepoints 接口表现有点异常: 在 flink 1.10 时:当请求该接口后,在 flink ui 可以看到 savepoint 被触发,/jobs/:jobid/savepoints/:triggerid 返回IN_PROGRESS,等 savepoint 成功之后jobs/:jobid/savepoints/:triggerid返回COMPLETED。 但是在flink 1.11中:经常出现(不是必现,但是概率也不低) /jobs/:jobid/savepoints 接口正常返回,/jobs/:jobid/savepoints/:triggerid 也返回IN_PROGRESS,但是在flink ui 中看不到 savepoint 被触发,而且/jobs/:jobid/savepoints/:triggerid 一直返回IN_PROGRESS。 我怀疑这个是不是和我开了 unaligned checkpoint 有关,但是我在 flink-config.yaml中把execution.checkpointing.unaligned设置为false还是会出现这种问题,请问大家有什么了解吗?
rest api flink docs 链接:https://ci.apache.org/projects/flink/flink-docs-release-1.11/monitoring/rest_api.html#jobs-jobid-savepoints*来自志愿者整理的flink邮件归档
参考答案:
开启 unalign checkpoint 的情况下,如果有 checkpoint 正在做的话,那么 savepoint 会等待的[1],但是把
unaligned checkpoint 关闭之后,还有这个现象看上去不太符合预期。关闭之后这种现象出现的时候,也有 checkpoint 正在做吗?
[1] https://issues.apache.org/jira/browse/FLINK-17342*来自志愿者整理的flink邮件归档
关于本问题的更多回答可点击进行查看:
https://developer.aliyun.com/ask/371555?spm=a2c6h.12873639.article-detail.7.29d04378ApxdqJ
问题二:flink消费kafka提交偏移量
各位大佬,我自己使用flink1.11时,消费kafka数据源时候,使用group offset,在外部的kafka tool发现offset没有及时提交,请问下这个在flink中怎么保证呢*来自志愿者整理的flink邮件归档
参考答案:
可以参考下这个:
https://ci.apache.org/projects/flink/flink-docs-release-1.11/dev/connectors/
kafka.html#kafka-consumers-offset-committing-behaviour-configuration*来自志愿者整理的flink邮件归档
关于本问题的更多回答可点击进行查看:
https://developer.aliyun.com/ask/371553?spm=a2c6h.12873639.article-detail.8.29d04378ApxdqJ
问题三:StatementSet 里添加多个insertsql执行
大家好,我发现StatementSet 里添加2个insertsql执行的时候会启动两个application,
这两个任务除了sink都是一样的吗?这样是不是会重复计算和浪费资源,而且两边的数据可能不同步,
有什么办法能解决?
谢谢*来自志愿者整理的flink邮件归档
参考答案:
StatementSet 中的多个insert会被编译成一个job提交。
你能提供一下对应的代码样例吗?*来自志愿者整理的flink邮件归档
关于本问题的更多回答可点击进行查看:
https://developer.aliyun.com/ask/371536?spm=a2c6h.12873639.article-detail.9.29d04378ApxdqJ
问题四:关于window过程中barrier的问题
大家好,想请教一个关于barrier的问题
如果我有如下算子
.window()
.reduce()
假设barrier和元素顺序是
tuple 和 barrier
当tuple流经窗口期,barrier是会阻塞等待tuple从窗口流出之后再达到下一个算子,还是不在窗口阻塞,直接流到下一个算子呢?*来自志愿者整理的flink邮件归档
参考答案:
barrier 是和 checkpoint 相关的逻辑,用来触发 checkpoint 的,你可以认为 barrier
和数据的顺序必须是严格保证的,不然没法保证 exactlyonce 的语义。
假设某个算子 B 有两个上游 A1 和 A2,那么 A1 和 A2 的 barrier 都发送的 B 之后,B 才会开始做
checkpoint,假设 A1 的 barrier 在 10:00 到了,A2 的 barrier 在 10:01 才到,那么 10:00 -
10:01 这段时间内,A1 发送到 B 的数据是否会被处理取决于是 exactlyonce,还是 at least once。如果是 exactly
once 语义,则不会处理(堆积在 B 这里),如果是 at least once 语义则会处理并且发送到下游。
另外也可以阅读一下社区相关的文档[1]
[1]
https://ci.apache.org/projects/flink/flink-docs-release-1.11/learn-flink/fault_tolerance.html#how-does-state-snapshotting-work*来自志愿者整理的flink邮件归档
关于本问题的更多回答可点击进行查看:
https://developer.aliyun.com/ask/371534?spm=a2c6h.12873639.article-detail.10.29d04378ApxdqJ
问题五:flink1.11 可以使用processtime开窗,但是无法使用eventtime开窗
您好,请教一个问题,谢谢: 很简单的json, {"num":100,"ts":1595949526874,"vin":"DDDD"} {"num":200,"ts":1595949528874,"vin":"AAAA"} {"num":200,"ts":1595949530880,"vin":"CCCC"} {"num":300,"ts":1595949532883,"vin":"CCCC"} {"num":100,"ts":1595949534888,"vin":"AAAA"} {"num":300,"ts":1595949536892,"vin":"DDDD"} 我就想使用eventtime开窗,但是亲测使用procetime可以,但是eventtime死活都不行,郁闷,望指教。 public class FlinkKafka { public static void main(String[] args) throws Exception{ final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(); final EnvironmentSettings settings = EnvironmentSettings.newInstance().useBlinkPlanner().inStreamingMode().build(); final StreamTableEnvironment tableEnv = StreamTableEnvironment.create(env, settings);
String kafkaSourceTable = "CREATE TABLE kafkaSourceTable (\n" + " ts BIGINT,\n" + " num INT ,\n" + " vin STRING ,\n" + " pts AS PROCTIME() , \n" + //处理时间 " rowtime AS TO_TIMESTAMP(FROM_UNIXTIME(ts / 1000, 'yyyy-MM-dd HH:mm:ss')), \n " + " WATERMARK FOR rowtime AS rowtime - INTERVAL '1' SECOND \n" + ") WITH (\n" + " 'connector' = 'kafka',\n" + " 'topic' = 'kkb',\n" + " 'properties.bootstrap.servers' = 'node01:9092,node02:9092,node03:9092',\n" + " 'properties.group.id' = 'mm',\n" + " 'format' = 'json',\n" + " 'scan.startup.mode' = 'latest-offset' \n" + ")"; tableEnv.executeSql(kafkaSourceTable);
String queryWindowAllDataSql = "SELECT * from kafkaSourceTable group by ts,num,vin,pts,rowtime, TUMBLE(pts, INTERVAL '5' SECOND)"; final Table windowAllTable = tableEnv.sqlQuery(queryWindowAllDataSql);
windowAllTable.printSchema(); tableEnv.toAppendStream(windowAllTable, Row.class).print(); System.out.println("------------------------------------------------------"); env.execute("job");
}
}
请看,我这里String queryWindowAllDataSql = "SELECT * from kafkaSourceTable group by ts,num,vin,pts,rowtime, TUMBLE(pts, INTERVAL '5' SECOND)" 如果使用TUMBLE(pts, INTERVAL '5' SECOND)",即使用processtime就没有任何问题,可以每格几秒输出所有的内容。 打印结果: root |-- ts: BIGINT |-- num: INT |-- vin: STRING |-- pts: TIMESTAMP(3) NOT NULL PROCTIME |-- rowtime: TIMESTAMP(3) ROWTIME
11> 1595949629063,500,AAAA,2020-07-28T15:20:29.066,2020-07-28T23:20:29 7> 1595949627062,500,BBBB,2020-07-28T15:20:27.101,2020-07-28T23:20:27 7> 1595949631067,100,EEEE,2020-07-28T15:20:31.071,2020-07-28T23:20:31 12> 1595949633072,500,BBBB,2020-07-28T15:20:33.077,2020-07-28T23:20:33 11> 1595949637081,400,EEEE,2020-07-28T15:20:37.085,2020-07-28T23:20:37 2> 1595949635077,400,BBBB,2020-07-28T15:20:35.082,2020-07-28T23:20:35 11> 1595949639085,100,EEEE,2020-07-28T15:20:39.089,2020-07-28T23:20:39 1> 1595949643093,200,CCCC,2020-07-28T15:20:43.096,2020-07-28T23:20:43
但是如果我使用TUMBLE(rowtime, INTERVAL '5' SECOND),也就是想使用eventtime开窗,就没有任何的结果输出,一直在等待。 版本是flink1.11.0
望指教,谢谢!*来自志愿者整理的flink邮件归档
参考答案:
你指定时间语义是EventTime了吗 env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);*来自志愿者整理的flink邮件归档
关于本问题的更多回答可点击进行查看:
https://developer.aliyun.com/ask/371533?spm=a2c6h.12873639.article-detail.11.29d04378ApxdqJ