概述
Flink的exactly-once语义实现是需要依赖checkpoint的,对于一个有状态的Flink任务来说如果想要在任务发生failover,或者手动重启任务的时候任务的状态不丢失是必须要开启checkpoint的,今天这篇文章主要分享一下Flink on zeppelin里面怎么设置checkpoint以及怎么从指定的checkpoint恢复任务.
checkpoint配置
%flink.conf // 设置任务使用的时间属性是eventtime pipeline.time-characteristic EventTime // 设置checkpoint的时间间隔 execution.checkpointing.interval 10000 // 确保检查点之间的间隔 execution.checkpointing.min-pause 60000 // 设置checkpoint的超时时间 execution.checkpointing.timeout 60000 // 设置任务取消后保留hdfs上的checkpoint文件 execution.checkpointing.externalized-checkpoint-retention RETAIN_ON_CANCELLATION
只需要像上面这样就可以配置任务的checkpoint了,当然除了配置这些参数外也可以设置任务的jm,tm等参数,Interpreters 设置里面的很多参数(比如下图的这些)都是可以在note里面直接执行的,然后在这个note里面的任务都会使用刚才的配置.
在执行这些配置的时候需要先把Interpreters 重启一下,因为当Interpreters 进程已启动时就无法更改Interpreters 的属性了,否则会遇到下面的报错
java.io.IOException: Can not change interpreter properties when interpreter process has already been launched at org.apache.zeppelin.interpreter.InterpreterSetting.setInterpreterGroupProperties(InterpreterSetting.java:957) at org.apache.zeppelin.interpreter.ConfInterpreter.interpret(ConfInterpreter.java:73) at org.apache.zeppelin.notebook.Paragraph.jobRun(Paragraph.java:458) at org.apache.zeppelin.notebook.Paragraph.jobRun(Paragraph.java:72) at org.apache.zeppelin.scheduler.Job.run(Job.java:172) at org.apache.zeppelin.scheduler.AbstractScheduler.runJob(AbstractScheduler.java:130) at org.apache.zeppelin.scheduler.FIFOScheduler.lambda$runJobInScheduler$0(FIFOScheduler.java:39) at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149) at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624) at java.lang.Thread.run(Thread.java:748)
创建kafak流表
然后来一个非常简单的聚合查询,为了体现任务是从状态中恢复的,所以我们这里根据name分组计算了一个count(age)的操作
这里我先写入了10条数据,jason和spark都是5条,然后先来看一下HDFS上的checkpoint文件是否生成了.
可以看到此时任务已经完成了4次checkpoint了,这时候把任务cancel掉,然后从上一次成功的checkpoint处恢复任务,也就是第4个checkpoint的地方.
%flink.conf execution.savepoint.path hdfs://master:9000//flink-rockdb/checkpoints/904d58cbd50821f42f33ffed91989ba0/chk-4
需要指定刚才任务的checkpoint路径,先执行上面的语句,然后再接着执行刚才的那条SQL就可以了.
从上面的图可以很清楚的看到,任务是从第4个checkpoint处恢复计算的,然后再来往kafka中写入10条数据看下结果是否正确.
可以看到结果是正确的,是从刚才的状态里面的值接着计算的,说明任务从checkpoint里面恢复成功了.整个过程是不需要写任何代码的,只需要在note里面执行几行配置就可以了,使用起来还是非常方便的.
这篇文章主要是介绍Flink on zeppelin如何设置任务的checkpoint,以及任务重启的时候怎么从指定的checkpoint位置恢复任务.