Flink问题之应用程序重启如何解决

本文涉及的产品
实时计算 Flink 版,5000CU*H 3个月
简介: Apache Flink是由Apache软件基金会开发的开源流处理框架,其核心是用Java和Scala编写的分布式流数据流引擎。本合集提供有关Apache Flink相关技术、使用技巧和最佳实践的资源。

问题一:flink on yarn 模式下,yarn集群的resource-manager切换导致flink应用程序重启,并且未从最后一次checkpoint恢复怎么办?


我遇到的问题现象是这样的

1、flink版本flink-1.12.2,启动命令如下,指定-s是因为job有做过cancel,这里重启。 flink run -d -s hdfs:///user/flink/checkpoints/default/f9b85edbc6ca779b6e60414f3e3964f2/chk-100 -t yarn-per-job -m yarn-cluser -D yarn.application.name=xxxx /tmp/flink-1.0-SNAPSHOT.jar -c com.test.myStream --profile prod 2、flink-conf.xml state.checkpoints.dir: hdfs:///user/flink/checkpoints/default 3、代码checkpoint设置 StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(); env.setRestartStrategy(RestartStrategies.fixedDelayRestart(100, 10)); CheckpointConfig checkpointConfig = env.getCheckpointConfig(); checkpointConfig.enableExternalizedCheckpoints(CheckpointConfig.ExternalizedCheckpointCleanup.RETAIN_ON_CANCELLATION); env.enableCheckpointing(1 * 60 * 1000); checkpointConfig.setCheckpointingMode(CheckpointingMode.EXACTLY_ONCE); checkpointConfig.setTolerableCheckpointFailureNumber(100); checkpointConfig.setCheckpointTimeout(60 * 1000); checkpointConfig.setMaxConcurrentCheckpoints(1); 4、问题现象 a)运维同事切换yarn resourc-manager,我的flink任务也会重启(重启后application-id和job-id并没有改变),但是jobmanager和taskmanager更换了机器 b)我的flink任务每隔1分钟做一个checkpoint,假如任务在上次重启后又运行了一段时间,checkpoint已经到了chk-200 c)集群的同事再次切换resource-manager,这个时候预期是flink任务自动从chk-200 restore,从日志中看还是从chk-100 restore的。 d)任务大致逻辑是通过flink-mysql-cdc消费binlog,DebeziumSourceFunction sourceMilApplysLogStream = MySQLSource. builder() 重启导致了再次从chk-100的binlog pos开始消费,即了chk-100~chk-200之间的binlog重复消费 e)日志中有打印如下,难道是因为上次我启动flink任务的时候用了-s,所以不会去自动找最新的checkpoint重启吗? 2021-05-24 16:49:50,398 INFO org.apache.flink.configuration.GlobalConfiguration [] - Loading configuration property: execution.savepoint.path, hdfs:///user/flink/checkpoints/default/f9b85edbc6ca779b6e60414f3e3964f2/chk-100 预期是任务重启后自动从最新的checkpoint开始继续消费,即从chk-200开始消费 现在的问题是任务重启后总是从flink -s指定的checkpoint恢复的。*来自志愿者整理的flink邮件归档


参考回答:

这种情况是不符合预期的。请问通过以下步骤可以稳定复现吗?

1、从savepoint恢复;

2、作业开始定期做savepoint;

3、作业failover。

如果是的话,可能需要排查下checkpoint 文件是否存在,zookeeper上是否更新。

如果还是有问题,需要通过日志来排查了。


关于本问题的更多回答可点击原文查看:https://developer.aliyun.com/ask/359741


问题二:flink 1.13 k8s native 启动找不到 KubernetesSessionClusterEntrypoint怎么办


最近在生产中,不同项目组的两个flink程序在同一时间段都报下面异常
ERROR org.apache.flink.runtime.blob.BlobServerConnection -Error while
excuting Blob connection
.
.
.
org.apache.flink.shaded.akka.org.jboss.netty.handler.codec.frame.TooLongFrameException
:Adjusted frame length exceeds 10485760: 1347375960 -discarded
上面的报错信息,刚开始知道其中一个报错的时候,怀疑是状态过大导致的。但是后来又有另外一个程序报同样的错误,现在不确定是什么问题导致,请问大佬这个可能是什么原因*来自志愿者整理的flink邮件归档


参考回答:

有些程序挂掉了,有些没有。


关于本问题的更多回答可点击原文查看:https://developer.aliyun.com/ask/359748


问题三:关于 flinksql 维表的问题?


我想实现将MySQL中的 A 表数据预先查询出来进行缓存,用于给流表 B 进行 join关联。接下来定时查询并更新 A 表内的缓存数据,请问目前 FlinkSQL 可以实现吗?*来自志愿者整理的flink邮件归档


参考回答:

实现肯定可以实现,不过这个周期性加载 Mysql 并更新的 Cache 的功能,可能需要你自己定制化开发下。


关于本问题的更多回答可点击原文查看:https://developer.aliyun.com/ask/359785?spm=a2c6h.13148508.0.0.5a844f0erUZ87h


问题四:flink sql cdc并行度怎么做?


flink sql作业:消费mysql binlog将数据同步到 mongodb 问题: 1. mysql-cdc connector只能设置成一个并行度吗? 2. 可以增大mongodb的sink并行度吗?可以的话,要如何设置?它保证主键相同的记录会发到同一个分区sink吗?*来自志愿者整理的flink邮件归档


参考回答:

mysql-cdc connector只能设置一个并行度,主要可能有这些原因:

  1. mysql binlog本质上是一个文件,多个并行度消费需要避免重复
  2. 多个并行度消费难以保证顺序

sink可以设为多个并行度,但是顺序不一定,如果需要主键相同的记录发到同一个sink线程可以先做一个keyby,并且保证keyby并行度与sink并行度相同,这样基本上能够保证数据forward传输,不过也不能100%保证有序。

如果需要保证有序还是建议sink并行度为1


关于本问题的更多回答可点击原文查看:https://developer.aliyun.com/ask/359786?spm=a2c6h.13148508.0.0.5a844f0erUZ87h


问题五:flink job exception有问题求教?


各位好: 我是flink的初学者。 今天在flink web UI 和后台的job 管理页面 发现很多

exception: ......

11:29:30.107 [flink-akka.actor.default-dispatcher-41] ERROR

org.apache.flink.runtime.rest.handler.job.JobExceptionsHandler -

Exception occurred in REST handler: Job 16c614ab0d6f5b28746c66f351fb67f8

not found

......

此时,登录flink web UI,  在"completed jobs"页面

找不到任何job的历史信息,但是当初提交job的时候 是能看到这些job 信息的。

环境信息:

flink: 1.12.4 for windows

启动flink和执行flink 作业使用的是 1.9.3版本的start-cluster.bat, flink.bat

我的疑问是:flink 是否有定期清理历史job的功能?

    如果有,在哪里(通过命令行或者配置文件)可以配置相关的参数 ?

    如果没有,这些错误信息是否正常 ?怎样解决这个问题 ?

Thanks,*来自志愿者整理的flink邮件归档


关于本问题的更多回答可点击原文查看:https://developer.aliyun.com/ask/359787

相关实践学习
基于Hologres轻松玩转一站式实时仓库
本场景介绍如何利用阿里云MaxCompute、实时计算Flink和交互式分析服务Hologres开发离线、实时数据融合分析的数据大屏应用。
Linux入门到精通
本套课程是从入门开始的Linux学习课程,适合初学者阅读。由浅入深案例丰富,通俗易懂。主要涉及基础的系统操作以及工作中常用的各种服务软件的应用、部署和优化。即使是零基础的学员,只要能够坚持把所有章节都学完,也一定会受益匪浅。
目录
相关文章
|
3月前
|
SQL Java 关系型数据库
Flink问题之隔断时间重启如何解决
Apache Flink是由Apache软件基金会开发的开源流处理框架,其核心是用Java和Scala编写的分布式流数据流引擎。本合集提供有关Apache Flink相关技术、使用技巧和最佳实践的资源。
34 3
|
2月前
|
API 数据库 流计算
有大佬知道在使用flink cdc实现数据同步,如何实现如果服务停止了对数据源表的某个数据进行删除操作,重启服务之后目标表能进行对源表删除的数据进行删除吗?
【2月更文挑战第27天】有大佬知道在使用flink cdc实现数据同步,如何实现如果服务停止了对数据源表的某个数据进行删除操作,重启服务之后目标表能进行对源表删除的数据进行删除吗?
55 3
|
4月前
|
Kubernetes 流计算 容器
在Rancher K8s上部署Flink时,TaskManager连接不上并不断重启可能是由多种原因导致的。
在Rancher K8s上部署Flink时,TaskManager连接不上并不断重启可能是由多种原因导致的。【1月更文挑战第22天】【1月更文挑战第106篇】
61 1
|
2月前
|
Kubernetes 流计算 Perl
在Rancher K8s上部署Flink时,TaskManager连接不上并不断重启可能是由多种原因导致的
在Rancher K8s上部署Flink时,TaskManager连接不上并不断重启可能是由多种原因导致的
37 7
|
2月前
|
Kubernetes 网络协议 Java
在Kubernetes上运行Flink应用程序时
【2月更文挑战第27天】在Kubernetes上运行Flink应用程序时
38 10
|
2月前
|
Java 流计算
Flink任务管理器(TaskManager)在凌晨重启,可能是由于以下几种原因
【2月更文挑战第16天】Flink任务管理器(TaskManager)在凌晨重启,可能是由于以下几种原因
48 2
|
3月前
|
关系型数据库 MySQL 数据库
Flink CDC数据同步问题之用savepoint重启任务报错如何解决
Flink CDC数据同步是指利用Flink CDC实现不同数据源之间的实时数据同步任务;本合集旨在提供Flink CDC数据同步的操作指南、性能优化建议和常见问题处理,助力用户高效实施数据同步。
|
3月前
|
SQL JSON Java
Flink SQL 问题之重启报错如何解决
Flink SQL报错通常指在使用Apache Flink的SQL接口执行数据处理任务时遇到的问题;本合集将收集常见的Flink SQL报错情况及其解决方法,帮助用户迅速恢复数据处理流程。
42 3
|
5月前
|
NoSQL Java 关系型数据库
Flink 动态更新配置,不需要重启作业
Flink 动态更新配置,不需要重启作业
91 1
|
5月前
|
Kubernetes Java 流计算
在Kubernetes上运行Flink应用程序时,你可以使用Flink Kubernetes Client提供的命令来提交作业
在Kubernetes上运行Flink应用程序时,你可以使用Flink Kubernetes Client提供的命令来提交作业
45 6

相关产品

  • 实时计算 Flink版