Flink接口问题之接口异常如何解决

本文涉及的产品
实时计算 Flink 版,5000CU*H 3个月
简介: Apache Flink是由Apache软件基金会开发的开源流处理框架,其核心是用Java和Scala编写的分布式流数据流引擎。本合集提供有关Apache Flink相关技术、使用技巧和最佳实践的资源。

问题一:flink 1.11 rest api saveppoint接口 异常

在升级了 flink 1.11之后,我在使用的时候发现 rest api 的 /jobs/:jobid/savepoints 接口表现有点异常: 在 flink 1.10 时:当请求该接口后,在 flink ui 可以看到 savepoint 被触发,/jobs/:jobid/savepoints/:triggerid 返回IN_PROGRESS,等 savepoint 成功之后jobs/:jobid/savepoints/:triggerid返回COMPLETED。 但是在flink 1.11中:经常出现(不是必现,但是概率也不低) /jobs/:jobid/savepoints 接口正常返回,/jobs/:jobid/savepoints/:triggerid 也返回IN_PROGRESS,但是在flink ui 中看不到 savepoint 被触发,而且/jobs/:jobid/savepoints/:triggerid 一直返回IN_PROGRESS。 我怀疑这个是不是和我开了 unaligned checkpoint 有关,但是我在 flink-config.yaml中把execution.checkpointing.unaligned设置为false还是会出现这种问题,请问大家有什么了解吗?

rest api flink docs 链接:https://ci.apache.org/projects/flink/flink-docs-release-1.11/monitoring/rest_api.html#jobs-jobid-savepoints*来自志愿者整理的flink邮件归档



参考答案:

开启 unalign checkpoint 的情况下,如果有 checkpoint 正在做的话,那么 savepoint 会等待的[1],但是把

unaligned checkpoint 关闭之后,还有这个现象看上去不太符合预期。关闭之后这种现象出现的时候,也有 checkpoint 正在做吗?

[1] https://issues.apache.org/jira/browse/FLINK-17342*来自志愿者整理的flink邮件归档



关于本问题的更多回答可点击进行查看:

https://developer.aliyun.com/ask/371555?spm=a2c6h.12873639.article-detail.7.29d04378ApxdqJ



问题二:flink消费kafka提交偏移量

各位大佬,我自己使用flink1.11时,消费kafka数据源时候,使用group offset,在外部的kafka tool发现offset没有及时提交,请问下这个在flink中怎么保证呢*来自志愿者整理的flink邮件归档



参考答案:

可以参考下这个:

https://ci.apache.org/projects/flink/flink-docs-release-1.11/dev/connectors/

kafka.html#kafka-consumers-offset-committing-behaviour-configuration*来自志愿者整理的flink邮件归档



关于本问题的更多回答可点击进行查看:

https://developer.aliyun.com/ask/371553?spm=a2c6h.12873639.article-detail.8.29d04378ApxdqJ



问题三:StatementSet 里添加多个insertsql执行

大家好,我发现StatementSet 里添加2个insertsql执行的时候会启动两个application,

这两个任务除了sink都是一样的吗?这样是不是会重复计算和浪费资源,而且两边的数据可能不同步,

有什么办法能解决?

谢谢*来自志愿者整理的flink邮件归档



参考答案:

StatementSet 中的多个insert会被编译成一个job提交。

你能提供一下对应的代码样例吗?*来自志愿者整理的flink邮件归档



关于本问题的更多回答可点击进行查看:

https://developer.aliyun.com/ask/371536?spm=a2c6h.12873639.article-detail.9.29d04378ApxdqJ



问题四:关于window过程中barrier的问题

大家好,想请教一个关于barrier的问题

如果我有如下算子

.window()

.reduce()

假设barrier和元素顺序是

tuple 和 barrier

当tuple流经窗口期,barrier是会阻塞等待tuple从窗口流出之后再达到下一个算子,还是不在窗口阻塞,直接流到下一个算子呢?*来自志愿者整理的flink邮件归档



参考答案:

barrier 是和 checkpoint 相关的逻辑,用来触发 checkpoint 的,你可以认为 barrier

和数据的顺序必须是严格保证的,不然没法保证 exactlyonce 的语义。

假设某个算子 B 有两个上游 A1 和 A2,那么 A1 和 A2 的 barrier 都发送的 B 之后,B 才会开始做

checkpoint,假设 A1 的 barrier 在 10:00 到了,A2 的 barrier 在 10:01 才到,那么 10:00 -

10:01 这段时间内,A1 发送到 B 的数据是否会被处理取决于是 exactlyonce,还是 at least once。如果是 exactly

once 语义,则不会处理(堆积在 B 这里),如果是 at least once 语义则会处理并且发送到下游。

另外也可以阅读一下社区相关的文档[1]

[1]

https://ci.apache.org/projects/flink/flink-docs-release-1.11/learn-flink/fault_tolerance.html#how-does-state-snapshotting-work*来自志愿者整理的flink邮件归档



关于本问题的更多回答可点击进行查看:

https://developer.aliyun.com/ask/371534?spm=a2c6h.12873639.article-detail.10.29d04378ApxdqJ



问题五:flink1.11 可以使用processtime开窗,但是无法使用eventtime开窗

您好,请教一个问题,谢谢: 很简单的json, {"num":100,"ts":1595949526874,"vin":"DDDD"} {"num":200,"ts":1595949528874,"vin":"AAAA"} {"num":200,"ts":1595949530880,"vin":"CCCC"} {"num":300,"ts":1595949532883,"vin":"CCCC"} {"num":100,"ts":1595949534888,"vin":"AAAA"} {"num":300,"ts":1595949536892,"vin":"DDDD"} 我就想使用eventtime开窗,但是亲测使用procetime可以,但是eventtime死活都不行,郁闷,望指教。 public class FlinkKafka { public static void main(String[] args) throws Exception{ final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(); final EnvironmentSettings settings = EnvironmentSettings.newInstance().useBlinkPlanner().inStreamingMode().build(); final StreamTableEnvironment tableEnv = StreamTableEnvironment.create(env, settings);

String kafkaSourceTable = "CREATE TABLE kafkaSourceTable (\n" + " ts BIGINT,\n" + " num INT ,\n" + " vin STRING ,\n" + " pts AS PROCTIME() , \n" + //处理时间 " rowtime AS TO_TIMESTAMP(FROM_UNIXTIME(ts / 1000, 'yyyy-MM-dd HH:mm:ss')), \n " + " WATERMARK FOR rowtime AS rowtime - INTERVAL '1' SECOND \n" + ") WITH (\n" + " 'connector' = 'kafka',\n" + " 'topic' = 'kkb',\n" + " 'properties.bootstrap.servers' = 'node01:9092,node02:9092,node03:9092',\n" + " 'properties.group.id' = 'mm',\n" + " 'format' = 'json',\n" + " 'scan.startup.mode' = 'latest-offset' \n" + ")"; tableEnv.executeSql(kafkaSourceTable);

String queryWindowAllDataSql = "SELECT * from kafkaSourceTable group by ts,num,vin,pts,rowtime, TUMBLE(pts, INTERVAL '5' SECOND)"; final Table windowAllTable = tableEnv.sqlQuery(queryWindowAllDataSql);

windowAllTable.printSchema(); tableEnv.toAppendStream(windowAllTable, Row.class).print(); System.out.println("------------------------------------------------------"); env.execute("job");

}

}


请看,我这里String queryWindowAllDataSql = "SELECT * from kafkaSourceTable group by ts,num,vin,pts,rowtime, TUMBLE(pts, INTERVAL '5' SECOND)" 如果使用TUMBLE(pts, INTERVAL '5' SECOND)",即使用processtime就没有任何问题,可以每格几秒输出所有的内容。 打印结果: root |-- ts: BIGINT |-- num: INT |-- vin: STRING |-- pts: TIMESTAMP(3) NOT NULL PROCTIME |-- rowtime: TIMESTAMP(3) ROWTIME


11> 1595949629063,500,AAAA,2020-07-28T15:20:29.066,2020-07-28T23:20:29 7> 1595949627062,500,BBBB,2020-07-28T15:20:27.101,2020-07-28T23:20:27 7> 1595949631067,100,EEEE,2020-07-28T15:20:31.071,2020-07-28T23:20:31 12> 1595949633072,500,BBBB,2020-07-28T15:20:33.077,2020-07-28T23:20:33 11> 1595949637081,400,EEEE,2020-07-28T15:20:37.085,2020-07-28T23:20:37 2> 1595949635077,400,BBBB,2020-07-28T15:20:35.082,2020-07-28T23:20:35 11> 1595949639085,100,EEEE,2020-07-28T15:20:39.089,2020-07-28T23:20:39 1> 1595949643093,200,CCCC,2020-07-28T15:20:43.096,2020-07-28T23:20:43

但是如果我使用TUMBLE(rowtime, INTERVAL '5' SECOND),也就是想使用eventtime开窗,就没有任何的结果输出,一直在等待。 版本是flink1.11.0

望指教,谢谢!*来自志愿者整理的flink邮件归档



参考答案:

你指定时间语义是EventTime了吗 env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);*来自志愿者整理的flink邮件归档



关于本问题的更多回答可点击进行查看:

https://developer.aliyun.com/ask/371533?spm=a2c6h.12873639.article-detail.11.29d04378ApxdqJ

相关实践学习
基于Hologres轻松玩转一站式实时仓库
本场景介绍如何利用阿里云MaxCompute、实时计算Flink和交互式分析服务Hologres开发离线、实时数据融合分析的数据大屏应用。
Linux入门到精通
本套课程是从入门开始的Linux学习课程,适合初学者阅读。由浅入深案例丰富,通俗易懂。主要涉及基础的系统操作以及工作中常用的各种服务软件的应用、部署和优化。即使是零基础的学员,只要能够坚持把所有章节都学完,也一定会受益匪浅。
相关文章
|
3月前
|
消息中间件 关系型数据库 MySQL
Flink数据源问题之转换异常如何解决
Flink数据源是指Apache Flink用于读取外部系统数据的接口或组件;本合集将探讨Flink数据源的类型、配置方法和最佳实践,以及在使用数据源时可能遇到的错误和解决方案。
40 2
|
2月前
|
canal SQL 关系型数据库
flink cdc 提交问题之提交任务异常如何解决
Flink CDC(Change Data Capture)是一个基于Apache Flink的实时数据变更捕获库,用于实现数据库的实时同步和变更流的处理;在本汇总中,我们组织了关于Flink CDC产品在实践中用户经常提出的问题及其解答,目的是辅助用户更好地理解和应用这一技术,优化实时数据处理流程。
|
2月前
|
Oracle 关系型数据库 Java
Flink CDC产品常见问题之Flink CDC 使用jar包启动异常如何解决
Flink CDC(Change Data Capture)是一个基于Apache Flink的实时数据变更捕获库,用于实现数据库的实时同步和变更流的处理;在本汇总中,我们组织了关于Flink CDC产品在实践中用户经常提出的问题及其解答,目的是辅助用户更好地理解和应用这一技术,优化实时数据处理流程。
|
3月前
|
SQL JSON Apache
Flink问题之嵌套 json 中string 数组的解析异常如何解决
Apache Flink是由Apache软件基金会开发的开源流处理框架,其核心是用Java和Scala编写的分布式流数据流引擎。本合集提供有关Apache Flink相关技术、使用技巧和最佳实践的资源。
225 1
|
3月前
|
SQL 消息中间件 Kafka
flink问题之作业执行异常如何解决
Apache Flink是由Apache软件基金会开发的开源流处理框架,其核心是用Java和Scala编写的分布式流数据流引擎。本合集提供有关Apache Flink相关技术、使用技巧和最佳实践的资源。
85 2
|
4月前
|
关系型数据库 MySQL 数据库
使用Flink同步MySQL到doris数据库中,时间格式异常
当我完成同步时,我发现Doris数据库中的所有时间格式都变为了如此:1970-02-01 11:00:00.000000,在所有时间格式后面追加了一个点和6个零,这是怎样造成的,该如何解决这个问题
|
9月前
|
SQL 消息中间件 关系型数据库
使用Flink SQL实现对连续的异常告警进行合并
使用Flink SQL实现对连续的异常告警进行合并
278 40
|
流计算
Flink - 新增 BroadcastStream 无 watermark 导致数据流异常
Flink 任务新增 BroadcastStream 无 watermark 导致数据流异常,修复问题并熟悉单流,双流 Watermark 机制。
492 0
Flink - 新增 BroadcastStream 无 watermark 导致数据流异常
|
监控 Java 分布式数据库
Flink/Hbase - Sink 背压100% 与 hbase.util.RetryCounter.sleepUntilNextRetry 异常分析与排查
Flink-hbase 任务 hbase.util.RetryCounter.sleepUntilNextRetry 堆栈问题分析与排查。
391 0
Flink/Hbase - Sink 背压100% 与 hbase.util.RetryCounter.sleepUntilNextRetry 异常分析与排查
|
3月前
|
消息中间件 Kafka Apache
Apache Flink 是一个开源的分布式流处理框架
Apache Flink 是一个开源的分布式流处理框架
597 5

相关产品

  • 实时计算 Flink版