Flink接口问题之接口异常如何解决

本文涉及的产品
实时计算 Flink 版,5000CU*H 3个月
简介: Apache Flink是由Apache软件基金会开发的开源流处理框架,其核心是用Java和Scala编写的分布式流数据流引擎。本合集提供有关Apache Flink相关技术、使用技巧和最佳实践的资源。

问题一:flink 1.11 rest api saveppoint接口 异常

在升级了 flink 1.11之后,我在使用的时候发现 rest api 的 /jobs/:jobid/savepoints 接口表现有点异常: 在 flink 1.10 时:当请求该接口后,在 flink ui 可以看到 savepoint 被触发,/jobs/:jobid/savepoints/:triggerid 返回IN_PROGRESS,等 savepoint 成功之后jobs/:jobid/savepoints/:triggerid返回COMPLETED。 但是在flink 1.11中:经常出现(不是必现,但是概率也不低) /jobs/:jobid/savepoints 接口正常返回,/jobs/:jobid/savepoints/:triggerid 也返回IN_PROGRESS,但是在flink ui 中看不到 savepoint 被触发,而且/jobs/:jobid/savepoints/:triggerid 一直返回IN_PROGRESS。 我怀疑这个是不是和我开了 unaligned checkpoint 有关,但是我在 flink-config.yaml中把execution.checkpointing.unaligned设置为false还是会出现这种问题,请问大家有什么了解吗?

rest api flink docs 链接:https://ci.apache.org/projects/flink/flink-docs-release-1.11/monitoring/rest_api.html#jobs-jobid-savepoints*来自志愿者整理的flink邮件归档



参考答案:

开启 unalign checkpoint 的情况下,如果有 checkpoint 正在做的话,那么 savepoint 会等待的[1],但是把

unaligned checkpoint 关闭之后,还有这个现象看上去不太符合预期。关闭之后这种现象出现的时候,也有 checkpoint 正在做吗?

[1] https://issues.apache.org/jira/browse/FLINK-17342*来自志愿者整理的flink邮件归档



关于本问题的更多回答可点击进行查看:

https://developer.aliyun.com/ask/371555?spm=a2c6h.12873639.article-detail.7.29d04378ApxdqJ



问题二:flink消费kafka提交偏移量

各位大佬,我自己使用flink1.11时,消费kafka数据源时候,使用group offset,在外部的kafka tool发现offset没有及时提交,请问下这个在flink中怎么保证呢*来自志愿者整理的flink邮件归档



参考答案:

可以参考下这个:

https://ci.apache.org/projects/flink/flink-docs-release-1.11/dev/connectors/

kafka.html#kafka-consumers-offset-committing-behaviour-configuration*来自志愿者整理的flink邮件归档



关于本问题的更多回答可点击进行查看:

https://developer.aliyun.com/ask/371553?spm=a2c6h.12873639.article-detail.8.29d04378ApxdqJ



问题三:StatementSet 里添加多个insertsql执行

大家好,我发现StatementSet 里添加2个insertsql执行的时候会启动两个application,

这两个任务除了sink都是一样的吗?这样是不是会重复计算和浪费资源,而且两边的数据可能不同步,

有什么办法能解决?

谢谢*来自志愿者整理的flink邮件归档



参考答案:

StatementSet 中的多个insert会被编译成一个job提交。

你能提供一下对应的代码样例吗?*来自志愿者整理的flink邮件归档



关于本问题的更多回答可点击进行查看:

https://developer.aliyun.com/ask/371536?spm=a2c6h.12873639.article-detail.9.29d04378ApxdqJ



问题四:关于window过程中barrier的问题

大家好,想请教一个关于barrier的问题

如果我有如下算子

.window()

.reduce()

假设barrier和元素顺序是

tuple 和 barrier

当tuple流经窗口期,barrier是会阻塞等待tuple从窗口流出之后再达到下一个算子,还是不在窗口阻塞,直接流到下一个算子呢?*来自志愿者整理的flink邮件归档



参考答案:

barrier 是和 checkpoint 相关的逻辑,用来触发 checkpoint 的,你可以认为 barrier

和数据的顺序必须是严格保证的,不然没法保证 exactlyonce 的语义。

假设某个算子 B 有两个上游 A1 和 A2,那么 A1 和 A2 的 barrier 都发送的 B 之后,B 才会开始做

checkpoint,假设 A1 的 barrier 在 10:00 到了,A2 的 barrier 在 10:01 才到,那么 10:00 -

10:01 这段时间内,A1 发送到 B 的数据是否会被处理取决于是 exactlyonce,还是 at least once。如果是 exactly

once 语义,则不会处理(堆积在 B 这里),如果是 at least once 语义则会处理并且发送到下游。

另外也可以阅读一下社区相关的文档[1]

[1]

https://ci.apache.org/projects/flink/flink-docs-release-1.11/learn-flink/fault_tolerance.html#how-does-state-snapshotting-work*来自志愿者整理的flink邮件归档



关于本问题的更多回答可点击进行查看:

https://developer.aliyun.com/ask/371534?spm=a2c6h.12873639.article-detail.10.29d04378ApxdqJ



问题五:flink1.11 可以使用processtime开窗,但是无法使用eventtime开窗

您好,请教一个问题,谢谢: 很简单的json, {"num":100,"ts":1595949526874,"vin":"DDDD"} {"num":200,"ts":1595949528874,"vin":"AAAA"} {"num":200,"ts":1595949530880,"vin":"CCCC"} {"num":300,"ts":1595949532883,"vin":"CCCC"} {"num":100,"ts":1595949534888,"vin":"AAAA"} {"num":300,"ts":1595949536892,"vin":"DDDD"} 我就想使用eventtime开窗,但是亲测使用procetime可以,但是eventtime死活都不行,郁闷,望指教。 public class FlinkKafka { public static void main(String[] args) throws Exception{ final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(); final EnvironmentSettings settings = EnvironmentSettings.newInstance().useBlinkPlanner().inStreamingMode().build(); final StreamTableEnvironment tableEnv = StreamTableEnvironment.create(env, settings);

String kafkaSourceTable = "CREATE TABLE kafkaSourceTable (\n" + " ts BIGINT,\n" + " num INT ,\n" + " vin STRING ,\n" + " pts AS PROCTIME() , \n" + //处理时间 " rowtime AS TO_TIMESTAMP(FROM_UNIXTIME(ts / 1000, 'yyyy-MM-dd HH:mm:ss')), \n " + " WATERMARK FOR rowtime AS rowtime - INTERVAL '1' SECOND \n" + ") WITH (\n" + " 'connector' = 'kafka',\n" + " 'topic' = 'kkb',\n" + " 'properties.bootstrap.servers' = 'node01:9092,node02:9092,node03:9092',\n" + " 'properties.group.id' = 'mm',\n" + " 'format' = 'json',\n" + " 'scan.startup.mode' = 'latest-offset' \n" + ")"; tableEnv.executeSql(kafkaSourceTable);

String queryWindowAllDataSql = "SELECT * from kafkaSourceTable group by ts,num,vin,pts,rowtime, TUMBLE(pts, INTERVAL '5' SECOND)"; final Table windowAllTable = tableEnv.sqlQuery(queryWindowAllDataSql);

windowAllTable.printSchema(); tableEnv.toAppendStream(windowAllTable, Row.class).print(); System.out.println("------------------------------------------------------"); env.execute("job");

}

}


请看,我这里String queryWindowAllDataSql = "SELECT * from kafkaSourceTable group by ts,num,vin,pts,rowtime, TUMBLE(pts, INTERVAL '5' SECOND)" 如果使用TUMBLE(pts, INTERVAL '5' SECOND)",即使用processtime就没有任何问题,可以每格几秒输出所有的内容。 打印结果: root |-- ts: BIGINT |-- num: INT |-- vin: STRING |-- pts: TIMESTAMP(3) NOT NULL PROCTIME |-- rowtime: TIMESTAMP(3) ROWTIME


11> 1595949629063,500,AAAA,2020-07-28T15:20:29.066,2020-07-28T23:20:29 7> 1595949627062,500,BBBB,2020-07-28T15:20:27.101,2020-07-28T23:20:27 7> 1595949631067,100,EEEE,2020-07-28T15:20:31.071,2020-07-28T23:20:31 12> 1595949633072,500,BBBB,2020-07-28T15:20:33.077,2020-07-28T23:20:33 11> 1595949637081,400,EEEE,2020-07-28T15:20:37.085,2020-07-28T23:20:37 2> 1595949635077,400,BBBB,2020-07-28T15:20:35.082,2020-07-28T23:20:35 11> 1595949639085,100,EEEE,2020-07-28T15:20:39.089,2020-07-28T23:20:39 1> 1595949643093,200,CCCC,2020-07-28T15:20:43.096,2020-07-28T23:20:43

但是如果我使用TUMBLE(rowtime, INTERVAL '5' SECOND),也就是想使用eventtime开窗,就没有任何的结果输出,一直在等待。 版本是flink1.11.0

望指教,谢谢!*来自志愿者整理的flink邮件归档



参考答案:

你指定时间语义是EventTime了吗 env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);*来自志愿者整理的flink邮件归档



关于本问题的更多回答可点击进行查看:

https://developer.aliyun.com/ask/371533?spm=a2c6h.12873639.article-detail.11.29d04378ApxdqJ

相关实践学习
基于Hologres轻松玩转一站式实时仓库
本场景介绍如何利用阿里云MaxCompute、实时计算Flink和交互式分析服务Hologres开发离线、实时数据融合分析的数据大屏应用。
Linux入门到精通
本套课程是从入门开始的Linux学习课程,适合初学者阅读。由浅入深案例丰富,通俗易懂。主要涉及基础的系统操作以及工作中常用的各种服务软件的应用、部署和优化。即使是零基础的学员,只要能够坚持把所有章节都学完,也一定会受益匪浅。
相关文章
|
7月前
|
SQL 存储 资源调度
实时计算 Flink版产品使用合集之遇到客户端启动异常,该怎么办
实时计算Flink版作为一种强大的流处理和批处理统一的计算框架,广泛应用于各种需要实时数据处理和分析的场景。实时计算Flink版通常结合SQL接口、DataStreamAPI、以及与上下游数据源和存储系统的丰富连接器,提供了一套全面的解决方案,以应对各种实时计算需求。其低延迟、高吞吐、容错性强的特点,使其成为众多企业和组织实时数据处理首选的技术平台。以下是实时计算Flink版的一些典型使用合集。
|
4月前
|
资源调度 监控 关系型数据库
实时计算 Flink版操作报错合集之处理大量Join时报错空指针异常,是什么原因
在使用实时计算Flink版过程中,可能会遇到各种错误,了解这些错误的原因及解决方法对于高效排错至关重要。针对具体问题,查看Flink的日志是关键,它们通常会提供更详细的错误信息和堆栈跟踪,有助于定位问题。此外,Flink社区文档和官方论坛也是寻求帮助的好去处。以下是一些常见的操作报错及其可能的原因与解决策略。
实时计算 Flink版操作报错合集之处理大量Join时报错空指针异常,是什么原因
|
5月前
|
消息中间件 关系型数据库 MySQL
实时计算 Flink版操作报错合集之整库同步mysql到starRock提交任务异常,该如何处理
在使用实时计算Flink版过程中,可能会遇到各种错误,了解这些错误的原因及解决方法对于高效排错至关重要。针对具体问题,查看Flink的日志是关键,它们通常会提供更详细的错误信息和堆栈跟踪,有助于定位问题。此外,Flink社区文档和官方论坛也是寻求帮助的好去处。以下是一些常见的操作报错及其可能的原因与解决策略。
|
5月前
|
SQL Java 数据库
实时计算 Flink版产品使用问题之Spring Boot集成Flink可以通过什么方式实现通过接口启动和关闭Flink程序
实时计算Flink版作为一种强大的流处理和批处理统一的计算框架,广泛应用于各种需要实时数据处理和分析的场景。实时计算Flink版通常结合SQL接口、DataStream API、以及与上下游数据源和存储系统的丰富连接器,提供了一套全面的解决方案,以应对各种实时计算需求。其低延迟、高吞吐、容错性强的特点,使其成为众多企业和组织实时数据处理首选的技术平台。以下是实时计算Flink版的一些典型使用合集。
|
5月前
|
Prometheus Cloud Native 关系型数据库
实时计算 Flink版操作报错合集之实时计算 Flink版操作报错合集之当从保存点恢复并添加新的表时,出现了org.apache.flink.util.FlinkRuntimeException异常,该怎么办
在使用实时计算Flink版过程中,可能会遇到各种错误,了解这些错误的原因及解决方法对于高效排错至关重要。针对具体问题,查看Flink的日志是关键,它们通常会提供更详细的错误信息和堆栈跟踪,有助于定位问题。此外,Flink社区文档和官方论坛也是寻求帮助的好去处。以下是一些常见的操作报错及其可能的原因与解决策略。
|
6月前
|
监控 关系型数据库 MySQL
Flink实现实时异常登陆监控(两秒内多次登陆失败进行异常行为标记)
Flink实现实时异常登陆监控(两秒内多次登陆失败进行异常行为标记)
107 1
|
7月前
|
消息中间件 Kafka 数据库
实时计算 Flink版操作报错之遇到UnsupportedOperationException异常,该如何处理
在使用实时计算Flink版过程中,可能会遇到各种错误,了解这些错误的原因及解决方法对于高效排错至关重要。针对具体问题,查看Flink的日志是关键,它们通常会提供更详细的错误信息和堆栈跟踪,有助于定位问题。此外,Flink社区文档和官方论坛也是寻求帮助的好去处。以下是一些常见的操作报错及其可能的原因与解决策略。
|
7月前
|
SQL Kubernetes Java
实时计算 Flink版产品使用合集之遇到清空表数据遇到异常,如何处理
实时计算Flink版作为一种强大的流处理和批处理统一的计算框架,广泛应用于各种需要实时数据处理和分析的场景。实时计算Flink版通常结合SQL接口、DataStreamAPI、以及与上下游数据源和存储系统的丰富连接器,提供了一套全面的解决方案,以应对各种实时计算需求。其低延迟、高吞吐、容错性强的特点,使其成为众多企业和组织实时数据处理首选的技术平台。以下是实时计算Flink版的一些典型使用合集。
|
6月前
|
SQL 关系型数据库 MySQL
实时计算 Flink版操作报错合集之CDC任务在异常后整个record sent从0初始化开始,是什么导致的
在使用实时计算Flink版过程中,可能会遇到各种错误,了解这些错误的原因及解决方法对于高效排错至关重要。针对具体问题,查看Flink的日志是关键,它们通常会提供更详细的错误信息和堆栈跟踪,有助于定位问题。此外,Flink社区文档和官方论坛也是寻求帮助的好去处。以下是一些常见的操作报错及其可能的原因与解决策略。
580 0
|
6月前
|
资源调度 Kubernetes Java
实时计算 Flink版产品使用问题之全量快照初始化时,如果任务异常自动从ck restore后,会从上次binlog断点续传吗
实时计算Flink版作为一种强大的流处理和批处理统一的计算框架,广泛应用于各种需要实时数据处理和分析的场景。实时计算Flink版通常结合SQL接口、DataStream API、以及与上下游数据源和存储系统的丰富连接器,提供了一套全面的解决方案,以应对各种实时计算需求。其低延迟、高吞吐、容错性强的特点,使其成为众多企业和组织实时数据处理首选的技术平台。以下是实时计算Flink版的一些典型使用合集。

相关产品

  • 实时计算 Flink版