如何做倒flink服务器重启后,所有之前进行中的job自动恢复,且checkpoint读取数据继续执行。
我目前flink部署在单台服务器上
版权声明:本文内容由阿里云实名注册用户自发贡献,版权归原作者所有,阿里云开发者社区不拥有其著作权,亦不承担相应法律责任。具体规则请查看《阿里云开发者社区用户服务协议》和《阿里云开发者社区知识产权保护指引》。如果您发现本社区中有涉嫌抄袭的内容,填写侵权投诉表单进行举报,一经查实,本社区将立刻删除涉嫌侵权内容。
为了确保Flink服务器重启后,所有之前进行中的Job能够自动恢复并从最近的Checkpoint继续执行,您需要采取以下措施:
配置Checkpoint:
flink-conf.yaml
)中设置相关参数实现,例如:
state.checkpoints.dir: hdfs://namenode-host:port/flink-checkpoints
execution.checkpointing.interval: 5000 # 每5秒做一次Checkpoint,根据实际需求调整
使用High Availability (HA)模式部署Flink:
配置JobManager的高可用性:
flink-conf.yaml
中配置ZooKeeper或其它协调服务,用于选举和管理JobManager的状态。
high-availability: zookeeper
high-availability.zookeeper.quorum: zk-host1:2181,zk-host2:2181,zk-host3:2181
自定义Linux服务以自动重启Flink JobManager和TaskManager:
ecs-tool-servicekeepalive
来创建一个systemd service,确保Flink的JobManager和TaskManager进程在服务器重启后能自动启动。配置类似于:
sudo acs-plugin-manager -exec -plugin ecs-tool-servicekeepalive -params "start,'./bin/start-cluster.sh'"
start-cluster.sh
)正确地初始化和启动了Flink集群。通过上述步骤,当您的Flink服务器因任何原因重启后,JobManager会自动从高可用存储中恢复最新的Checkpoint,并重新启动所有TaskManager,从而使得所有之前进行中的Job能够自动恢复并继续执行。请注意,实施这些更改前,请充分测试以避免生产环境中的意外中断。