Flink执行问题之执行checkpoint失败如何解决

本文涉及的产品
实时计算 Flink 版,5000CU*H 3个月
简介: Apache Flink是由Apache软件基金会开发的开源流处理框架,其核心是用Java和Scala编写的分布式流数据流引擎。本合集提供有关Apache Flink相关技术、使用技巧和最佳实践的资源。

问题一:怎样实现超过一定时间没有收到消息就发出报警的功能?

有很多边缘机器人设备(我们称为 robot)往 Kafka 中发消息,如果超过一定时间没有收到消息我们就认为 robot 掉线了。

比如

robot1 2020-11-11 12:00:00 msginfo

之后 20 mins 一直没有收到 robot1 的消息,怎样才才能在 flink 中实现 2020-11-11 12:10:00 就发出报警呢?

flink 是消息驱动的,没有收到消息就不会触发操作,怎样在没有收到后续消息的条件下触发操作呢?

我试验了下 https://juejin.im/post/6844904193052901384 的例子,不满足我的应用场景。

这个例子相当于所用订单共用一个 timeService, 每一次遍历一下所有的订单。

我们必须 按 robotId 做 keyBy*来自志愿者整理的flink邮件归档



参考答案:

我理解这篇文章少介绍了 keyby 的逻辑。 可以keyby(robotId),然后在 processFunction 里面使用 ValueState 存储最近一次 robot 的到达时间, 同时注册一个 20min 的timer来触发检测,在检测时候,取出 ValueState 的值都是同一个 robotId的。

*来自志愿者整理的flink邮件归档



关于本问题的更多回答可点击进行查看:

https://developer.aliyun.com/ask/364546?spm=a2c6h.13066369.question.18.6ad26382WOwkLF



问题二:flink-1.11.2 执行checkpoint失败

执行checkpoint失败,报下面的错。

2020-11-12 21:04:56

org.apache.flink.util.FlinkRuntimeException: Exceeded checkpoint tolerable failure threshold.

at org.apache.flink.runtime.checkpoint.CheckpointFailureManager.handleJobLevelCheckpointException(CheckpointFailureManager.java:66)

at org.apache.flink.runtime.checkpoint.CheckpointCoordinator.abortPendingCheckpoint(CheckpointCoordinator.java:1673)

at org.apache.flink.runtime.checkpoint.CheckpointCoordinator.abortPendingCheckpoint(CheckpointCoordinator.java:1650)

at org.apache.flink.runtime.checkpoint.CheckpointCoordinator.access$600(CheckpointCoordinator.java:91)

at org.apache.flink.runtime.checkpoint.CheckpointCoordinator$CheckpointCanceller.run(CheckpointCoordinator.java:1783)

at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)

at java.util.concurrent.FutureTask.run(FutureTask.java:266)

at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.access$201(ScheduledThreadPoolExecutor.java:180)

at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(ScheduledThreadPoolExecutor.java:293)

at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)

at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)

at java.lang.Thread.run(Thread.java:745)*来自志愿者整理的flink邮件归档



参考答案:

org.apache.flink.util.FlinkRuntimeException: Exceeded checkpoint tolerable failure threshold. 顺着这个问个问题。 检查点“超时”是否计算进入checkpoint failure呢? *来自志愿者整理的flink邮件归档



关于本问题的更多回答可点击进行查看:

https://developer.aliyun.com/ask/364545?spm=a2c6h.13066369.question.19.6ad26382Wdbgny



问题三:flink sql 函数FIRST_VALUE调用报错

HOP window 上使用的 UDAF 需要实现 merge 方法,因为 HOP window 在触发计算时会将多个 pane 进行合并,故也需要调用 UDAF 的 merge 方法将多个 accumulator 合并成一个。而 first_value 和 last_value 不支持 merge。*来自志愿者整理的flink邮件归档



参考答案:

HOP window 上使用的 UDAF 需要实现 merge 方法,因为 HOP window 在触发计算时会将多个 pane 进行合并,故也需要调用 UDAF 的 merge 方法将多个 accumulator 合并成一个。而 first_value 和 last_value 不支持 merge。 *来自志愿者整理的flink邮件归档



关于本问题的更多回答可点击进行查看:

https://developer.aliyun.com/ask/364544?spm=a2c6h.13066369.question.20.6ad26382WeYCpQ



问题四:ElasticsearchApiCallBridge相关类构造函数问题

为啥ElasticsearchApiCallBridge接口实现类的构造函数都不是Public。

我还想继承Elasticsearch6ApiCallBridge类。在new

RestHightLevelClient之前添加账号密码认证功能,即实现一个支持账号密码的子类。

不加Public 子类就必须得和父类一个包名了。ElasticsearchApiCallBridge的实现类为什么这么设计呢?*来自志愿者整理的flink邮件归档



参考答案:

目前在 master 分支已经支持了,可以去看看 flink-connector-es7 的源码*来自志愿者整理的flink邮件归档



关于本问题的更多回答可点击进行查看:

https://developer.aliyun.com/ask/364543?spm=a2c6h.13066369.question.21.6ad26382yMeoE8



问题五:slot数量与并行度的大小关系

我在flink web面板上提交了1个job,job的并行度为15,flink集群slot总数为12,发现任务一直在created阶段等待,一段时间后报错:

Caused by: org.apache.flink.runtime.jobmanager.scheduler.NoResourceAvailableException: Could not allocate the required slot within slot request timeout.

Please make sure that the cluster has enough resources.

是因为slot数量必须要大于并行度吗?有没有参数可以让flink自动选择可用slot数作为job的并行度?*来自志愿者整理的flink邮件归档



参考答案:

Flink 的调度策略会保证一个job需要的slot数恰好等于该job所有算子的最大并行度。

如果slot数量小于算子的最大并行度,则该job无法执行。可以参考[1][2]中的文档描述。

目前没有方法让flink自动选择可用slot数量作为并行度,但可以通过[3]中的几种方法来设置。

[1]

https://ci.apache.org/projects/flink/flink-docs-release-1.11/zh/internals/job_scheduling.html

[2]

https://ci.apache.org/projects/flink/flink-docs-release-1.11/zh/concepts/flink-architecture.html#task-slots-and-resources

[3]

https://ci.apache.org/projects/flink/flink-docs-release-1.11/zh/dev/parallel.html

*来自志愿者整理的flink邮件归档



关于本问题的更多回答可点击进行查看:

https://developer.aliyun.com/ask/364542?spm=a2c6h.13066369.question.22.6ad26382E9qwRt

相关实践学习
基于Hologres轻松玩转一站式实时仓库
本场景介绍如何利用阿里云MaxCompute、实时计算Flink和交互式分析服务Hologres开发离线、实时数据融合分析的数据大屏应用。
Linux入门到精通
本套课程是从入门开始的Linux学习课程,适合初学者阅读。由浅入深案例丰富,通俗易懂。主要涉及基础的系统操作以及工作中常用的各种服务软件的应用、部署和优化。即使是零基础的学员,只要能够坚持把所有章节都学完,也一定会受益匪浅。
相关文章
|
1天前
|
存储 分布式数据库 Apache
【Flink】Flink的Checkpoint 存在哪里?
【4月更文挑战第19天】【Flink】Flink的Checkpoint 存在哪里?
|
1天前
|
JSON Java API
Flink CDC 2.0 支持全量故障恢复,可以从 checkpoint 点恢复。
【2月更文挑战第17天】Flink CDC 2.0 支持全量故障恢复,可以从 checkpoint 点恢复。
60 3
|
1天前
|
SQL JSON Java
Flink数据问题之checkpoint数据删除失败如何解决
Apache Flink是由Apache软件基金会开发的开源流处理框架,其核心是用Java和Scala编写的分布式流数据流引擎。本合集提供有关Apache Flink相关技术、使用技巧和最佳实践的资源。
|
1天前
|
消息中间件 Java Kafka
Flink背压问题之checkpoint超时如何解决
Apache Flink是由Apache软件基金会开发的开源流处理框架,其核心是用Java和Scala编写的分布式流数据流引擎。本合集提供有关Apache Flink相关技术、使用技巧和最佳实践的资源。
|
1天前
|
存储 SQL canal
Flink CDC数据同步问题之同步数据到checkpoint失败如何解决
Flink CDC数据同步是指利用Flink CDC实现不同数据源之间的实时数据同步任务;本合集旨在提供Flink CDC数据同步的操作指南、性能优化建议和常见问题处理,助力用户高效实施数据同步。
|
1天前
|
存储 数据处理 数据库
Flink CDC里不开启checkPoint有什么影响吗?
【1月更文挑战第23天】【1月更文挑战第112篇】Flink CDC里不开启checkPoint有什么影响吗?
45 6
|
1天前
|
存储 流计算
在Flink CDC中,Checkpoint的清理策略通常有两种设置方式
在Flink CDC中,Checkpoint的清理策略通常有两种设置方式
83 5
|
6月前
|
存储 流计算
Flink Checkpoint所有配置解读
Flink Checkpoint所有配置解读
82 0
|
6月前
|
流计算
解决Flink Checkpoint snapshotState方法无法回调的问题
解决Flink Checkpoint snapshotState方法无法回调的问题
32 0
|
10月前
|
Web App开发 消息中间件 固态存储
Flink Unaligned Checkpoint 在 Shopee 的优化和实践
Tech Lead of Shopee Flink Runtime Team 范瑞,在 Flink Forward Asia 2022 核心技术的分享。
537 0
Flink Unaligned Checkpoint 在 Shopee 的优化和实践

热门文章

最新文章

相关产品

  • 实时计算 Flink版