问题一:怎样实现超过一定时间没有收到消息就发出报警的功能?
有很多边缘机器人设备(我们称为 robot)往 Kafka 中发消息,如果超过一定时间没有收到消息我们就认为 robot 掉线了。
比如
robot1 2020-11-11 12:00:00 msginfo
之后 20 mins 一直没有收到 robot1 的消息,怎样才才能在 flink 中实现 2020-11-11 12:10:00 就发出报警呢?
flink 是消息驱动的,没有收到消息就不会触发操作,怎样在没有收到后续消息的条件下触发操作呢?
我试验了下 https://juejin.im/post/6844904193052901384 的例子,不满足我的应用场景。
这个例子相当于所用订单共用一个 timeService, 每一次遍历一下所有的订单。
我们必须 按 robotId 做 keyBy*来自志愿者整理的flink邮件归档
参考答案:
我理解这篇文章少介绍了 keyby 的逻辑。 可以keyby(robotId),然后在 processFunction 里面使用 ValueState 存储最近一次 robot 的到达时间, 同时注册一个 20min 的timer来触发检测,在检测时候,取出 ValueState 的值都是同一个 robotId的。
*来自志愿者整理的flink邮件归档
关于本问题的更多回答可点击进行查看:
https://developer.aliyun.com/ask/364546?spm=a2c6h.13066369.question.18.6ad26382WOwkLF
问题二:flink-1.11.2 执行checkpoint失败
执行checkpoint失败,报下面的错。
2020-11-12 21:04:56
org.apache.flink.util.FlinkRuntimeException: Exceeded checkpoint tolerable failure threshold.
at org.apache.flink.runtime.checkpoint.CheckpointFailureManager.handleJobLevelCheckpointException(CheckpointFailureManager.java:66)
at org.apache.flink.runtime.checkpoint.CheckpointCoordinator.abortPendingCheckpoint(CheckpointCoordinator.java:1673)
at org.apache.flink.runtime.checkpoint.CheckpointCoordinator.abortPendingCheckpoint(CheckpointCoordinator.java:1650)
at org.apache.flink.runtime.checkpoint.CheckpointCoordinator.access$600(CheckpointCoordinator.java:91)
at org.apache.flink.runtime.checkpoint.CheckpointCoordinator$CheckpointCanceller.run(CheckpointCoordinator.java:1783)
at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
at java.util.concurrent.FutureTask.run(FutureTask.java:266)
at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.access$201(ScheduledThreadPoolExecutor.java:180)
at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(ScheduledThreadPoolExecutor.java:293)
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
at java.lang.Thread.run(Thread.java:745)*来自志愿者整理的flink邮件归档
参考答案:
org.apache.flink.util.FlinkRuntimeException: Exceeded checkpoint tolerable failure threshold. 顺着这个问个问题。 检查点“超时”是否计算进入checkpoint failure呢? *来自志愿者整理的flink邮件归档
关于本问题的更多回答可点击进行查看:
https://developer.aliyun.com/ask/364545?spm=a2c6h.13066369.question.19.6ad26382Wdbgny
问题三:flink sql 函数FIRST_VALUE调用报错
HOP window 上使用的 UDAF 需要实现 merge 方法,因为 HOP window 在触发计算时会将多个 pane 进行合并,故也需要调用 UDAF 的 merge 方法将多个 accumulator 合并成一个。而 first_value 和 last_value 不支持 merge。*来自志愿者整理的flink邮件归档
参考答案:
HOP window 上使用的 UDAF 需要实现 merge 方法,因为 HOP window 在触发计算时会将多个 pane 进行合并,故也需要调用 UDAF 的 merge 方法将多个 accumulator 合并成一个。而 first_value 和 last_value 不支持 merge。 *来自志愿者整理的flink邮件归档
关于本问题的更多回答可点击进行查看:
https://developer.aliyun.com/ask/364544?spm=a2c6h.13066369.question.20.6ad26382WeYCpQ
问题四:ElasticsearchApiCallBridge相关类构造函数问题
为啥ElasticsearchApiCallBridge接口实现类的构造函数都不是Public。
我还想继承Elasticsearch6ApiCallBridge类。在new
RestHightLevelClient之前添加账号密码认证功能,即实现一个支持账号密码的子类。
不加Public 子类就必须得和父类一个包名了。ElasticsearchApiCallBridge的实现类为什么这么设计呢?*来自志愿者整理的flink邮件归档
参考答案:
目前在 master 分支已经支持了,可以去看看 flink-connector-es7 的源码*来自志愿者整理的flink邮件归档
关于本问题的更多回答可点击进行查看:
https://developer.aliyun.com/ask/364543?spm=a2c6h.13066369.question.21.6ad26382yMeoE8
问题五:slot数量与并行度的大小关系
我在flink web面板上提交了1个job,job的并行度为15,flink集群slot总数为12,发现任务一直在created阶段等待,一段时间后报错:
Caused by: org.apache.flink.runtime.jobmanager.scheduler.NoResourceAvailableException: Could not allocate the required slot within slot request timeout.
Please make sure that the cluster has enough resources.
是因为slot数量必须要大于并行度吗?有没有参数可以让flink自动选择可用slot数作为job的并行度?*来自志愿者整理的flink邮件归档
参考答案:
Flink 的调度策略会保证一个job需要的slot数恰好等于该job所有算子的最大并行度。
如果slot数量小于算子的最大并行度,则该job无法执行。可以参考[1][2]中的文档描述。
目前没有方法让flink自动选择可用slot数量作为并行度,但可以通过[3]中的几种方法来设置。
[1]
https://ci.apache.org/projects/flink/flink-docs-release-1.11/zh/internals/job_scheduling.html
[2]
[3]
https://ci.apache.org/projects/flink/flink-docs-release-1.11/zh/dev/parallel.html
*来自志愿者整理的flink邮件归档
关于本问题的更多回答可点击进行查看:
https://developer.aliyun.com/ask/364542?spm=a2c6h.13066369.question.22.6ad26382E9qwRt