Flink在流处理上最大的特点,就是引入了全局snapshot(分布式快照)机制。
CheckpointCoordinator
做snapshot的核心组件为, CheckpointCoordinator
/**
 * The checkpoint coordinator coordinates the distributed snapshots of operators and state.
 * It triggers the checkpoint by sending the messages to the relevant tasks and collects the
 * checkpoint acknowledgements. It also collects and maintains the overview of the state handles
 * reported by the tasks that acknowledge the checkpoint.
 *
 * <p>Depending on the configured {@link RecoveryMode}, the behaviour of the {@link
 * CompletedCheckpointStore} and {@link CheckpointIDCounter} change. The default standalone
 * implementations don't support any recovery.
 */
public class CheckpointCoordinator {

    /** Tasks who need to be sent a message when a checkpoint is started */
    private final ExecutionVertex[] tasksToTrigger; // the tasks on which a checkpoint has to be triggered

    /** Tasks who need to acknowledge a checkpoint before it succeeds */
    private final ExecutionVertex[] tasksToWaitFor;

    /** Tasks who need to be sent a message when a checkpoint is confirmed */
    private final ExecutionVertex[] tasksToCommitTo;

    /** Map from checkpoint ID to the pending checkpoint */
    private final Map<Long, PendingCheckpoint> pendingCheckpoints;

    /** Completed checkpoints. Implementations can be blocking. Make sure calls to methods
     * accessing this don't block the job manager actor and run asynchronously. */
    private final CompletedCheckpointStore completedCheckpointStore; // records the checkpoints that have already completed

    /** A list of recent checkpoint IDs, to identify late messages (vs invalid ones) */
    private final ArrayDeque<Long> recentPendingCheckpoints;

    /** Checkpoint ID counter to ensure ascending IDs. In case of job manager failures, these
     * need to be ascending across job managers. */
    protected final CheckpointIDCounter checkpointIdCounter; // hands out ascending checkpoint IDs; they must stay ascending even across a job manager crash

    /** The base checkpoint interval. Actual trigger time may be affected by the
     * max concurrent checkpoints and minimum-pause values */
    private final long baseInterval; // interval at which checkpoints are triggered

    /** The max time (in ms) that a checkpoint may take */
    private final long checkpointTimeout; // a checkpoint that takes longer than this is considered timed out / failed

    /** The min time(in ms) to delay after a checkpoint could be triggered. Allows to
     * enforce minimum processing time between checkpoint attempts */
    private final long minPauseBetweenCheckpoints; // minimum pause between two checkpoints

    /** The maximum number of checkpoints that may be in progress at the same time */
    private final int maxConcurrentCheckpointAttempts; // upper bound on concurrently running checkpoints

    /** Actor that receives status updates from the execution graph this coordinator works for */
    private ActorGateway jobStatusListener;

    /** The number of consecutive failed trigger attempts */
    private int numUnsuccessfulCheckpointsTriggers;

    private ScheduledTrigger currentPeriodicTrigger;

    /** Flag whether a triggered checkpoint should immediately schedule the next checkpoint.
     * Non-volatile, because only accessed in synchronized scope */
    private boolean periodicScheduling;

    /** Flag whether a trigger request could not be handled immediately. Non-volatile, because only
     * accessed in synchronized scope */
    private boolean triggerRequestQueued;

    /** Flag marking the coordinator as shut down (not accepting any messages any more) */
    private volatile boolean shutdown; // note: volatile, so that writes are visible to other threads

    /** Shutdown hook thread to clean up state handles. */
    private final Thread shutdownHook;

    /** Helper for tracking checkpoint statistics */
    private final CheckpointStatsTracker statsTracker;

    // NOTE(review): this constructor is an abridged excerpt. It only shows the counter start-up,
    // the timer creation and the shutdown-hook registration; the assignments of the remaining
    // final fields are not part of the snippet.
    public CheckpointCoordinator(
            JobID job,
            long baseInterval,
            long checkpointTimeout,
            long minPauseBetweenCheckpoints,
            int maxConcurrentCheckpointAttempts,
            ExecutionVertex[] tasksToTrigger,
            ExecutionVertex[] tasksToWaitFor,
            ExecutionVertex[] tasksToCommitTo,
            ClassLoader userClassLoader,
            CheckpointIDCounter checkpointIDCounter,
            CompletedCheckpointStore completedCheckpointStore,
            RecoveryMode recoveryMode,
            CheckpointStatsTracker statsTracker) throws Exception {

        checkpointIDCounter.start(); // start the CheckpointIDCounter

        this.timer = new Timer("Checkpoint Timer", true);

        this.statsTracker = checkNotNull(statsTracker);

        if (recoveryMode == RecoveryMode.STANDALONE) { // standalone mode: register a shutdown hook that cleans up state
            // Add shutdown hook to clean up state handles when no checkpoint recovery is
            // possible. In case of another configured recovery mode, the checkpoints need to be
            // available for the standby job managers.
            this.shutdownHook = new Thread(new Runnable() {
                @Override
                public void run() {
                    try {
                        CheckpointCoordinator.this.shutdown(); // explicitly invoke shutdown
                    }
                    catch (Throwable t) {
                        LOG.error("Error during shutdown of checkpoint coordinator via " +
                                "JVM shutdown hook: " + t.getMessage(), t);
                    }
                }
            });

            try {
                // Add JVM shutdown hook to call shutdown of service
                Runtime.getRuntime().addShutdownHook(shutdownHook);
            }
            catch (IllegalStateException ignored) {
                // JVM is already shutting down. No need to do anything.
            }
            catch (Throwable t) {
                LOG.error("Cannot register checkpoint coordinator shutdown hook.", t);
            }
        }
        else {
            this.shutdownHook = null;
        }
    }
    // NOTE(review): the excerpt ends here; the closing brace of the class is not part of this snippet.
CheckpointIDCounter
有两种,
StandaloneCheckpointIDCounter
这种case下的counter,只是用AtomicLong来实现的。如果JobManager挂了,这个值就会丢失,重启后无法保证checkpoint id继续递增
但这里说,在standalone的情况下,不需要做recovery,所以这个是可以接受的
/**
 * In-memory {@link CheckpointIDCounter} for JobManagers running in
 * {@link RecoveryMode#STANDALONE}.
 *
 * <p>The count is held in a single {@link AtomicLong} that starts at 1. Nothing is persisted,
 * which is sufficient because job managers are not recoverable in this recovery mode.
 */
public class StandaloneCheckpointIDCounter implements CheckpointIDCounter {

    /** Next checkpoint ID to hand out. */
    private final AtomicLong nextId = new AtomicLong(1);

    /** No-op: there is nothing to start for the in-memory counter. */
    @Override
    public void start() throws Exception {
    }

    /** No-op: there is nothing to release for the in-memory counter. */
    @Override
    public void stop() throws Exception {
    }

    /**
     * Atomically returns the current count and advances it by one.
     *
     * @return the value of the counter before the increment
     */
    @Override
    public long getAndIncrement() throws Exception {
        return nextId.getAndAdd(1L);
    }

    /**
     * Overwrites the counter.
     *
     * @param newCount the value the next call to {@link #getAndIncrement()} will return
     */
    @Override
    public void setCount(long newCount) {
        nextId.set(newCount);
    }
}
ZooKeeperCheckpointIDCounter
这种counter用zk的persistent node来保存当前的计数,以保证计数的递增
/** * {@link CheckpointIDCounter} instances for JobManagers running in {@link RecoveryMode#ZOOKEEPER}. * * <p>Each counter creates a ZNode: * <pre> * +----O /flink/checkpoint-counter/<job-id> 1 [persistent] * . * . * . * +----O /flink/checkpoint-counter/<job-id> N [persistent] * </pre> * * <p>The checkpoints IDs are required to be ascending (per job). In order to guarantee this in case * of job manager failures we use ZooKeeper to have a shared counter across job manager instances. */ public class ZooKeeperCheckpointIDCounter implements CheckpointIDCounter
CompletedCheckpointStore
接口,用于记录有哪些已经完成的checkpoint
/**
 * A bounded LIFO-queue of {@link CompletedCheckpoint} instances.
 *
 * <p>Implementations keep track of the checkpoints that have completed.
 */
public interface CompletedCheckpointStore {

    /**
     * Recover available {@link CompletedCheckpoint} instances.
     *
     * <p>After a call to this method, {@link #getLatestCheckpoint()} returns the latest
     * available checkpoint.
     */
    void recover() throws Exception;

    /**
     * Adds a {@link CompletedCheckpoint} instance to the list of completed checkpoints.
     *
     * <p>Only a bounded number of checkpoints is kept. When exceeding the maximum number of
     * retained checkpoints, the oldest one will be discarded via {@link
     * CompletedCheckpoint#discard(ClassLoader)}.
     *
     * @param checkpoint the completed checkpoint to add
     */
    void addCheckpoint(CompletedCheckpoint checkpoint) throws Exception;

    /**
     * Returns the latest {@link CompletedCheckpoint} instance or <code>null</code> if none was
     * added.
     *
     * <p>Callers must handle the <code>null</code> case before dereferencing the result.
     */
    CompletedCheckpoint getLatestCheckpoint() throws Exception;

    /**
     * Discards all added {@link CompletedCheckpoint} instances via {@link
     * CompletedCheckpoint#discard(ClassLoader)}.
     */
    void discardAllCheckpoints() throws Exception;

    /**
     * Returns all {@link CompletedCheckpoint} instances.
     *
     * <p>Returns an empty list if no checkpoint has been added yet.
     */
    List<CompletedCheckpoint> getAllCheckpoints() throws Exception;

    /**
     * Returns the current number of retained checkpoints.
     */
    int getNumberOfRetainedCheckpoints();
}
看下StandaloneCompletedCheckpointStore,其实就是一个用于记录CompletedCheckpoint的ArrayDeque
/**
 * {@link CompletedCheckpointStore} that tracks completed checkpoints in an in-memory
 * {@link ArrayDeque}.
 *
 * <p>NOTE(review): abridged excerpt - only the backing field is shown; the interface methods
 * are not part of this snippet.
 */
class StandaloneCompletedCheckpointStore implements CompletedCheckpointStore {

    /** The completed checkpoints. */
    private final ArrayDeque<CompletedCheckpoint> checkpoints;
}
ZooKeeperCompletedCheckpointStore,这个就是用zk来记录
/** * {@link CompletedCheckpointStore} for JobManagers running in {@link RecoveryMode#ZOOKEEPER}. * * <p>Checkpoints are added under a ZNode per job: * <pre> * +----O /flink/checkpoints/<job-id> [persistent] * . | * . +----O /flink/checkpoints/<job-id>/1 [persistent] * . . . * . . . * . . . * . +----O /flink/checkpoints/<job-id>/N [persistent] * </pre> * * <p>During recovery, the latest checkpoint is read from ZooKeeper. If there is more than one, * only the latest one is used and older ones are discarded (even if the maximum number * of retained checkpoints is greater than one). * * <p>If there is a network partition and multiple JobManagers run concurrent checkpoints for the * same program, it is OK to take any valid successful checkpoint as long as the "history" of * checkpoints is consistent. Currently, after recovery we start out with only a single * checkpoint to circumvent those situations. */ public class ZooKeeperCompletedCheckpointStore implements CompletedCheckpointStore {
做snapshot流程
StreamingJobGraphGenerator
配置checkpoint
private void configureCheckpointing() { CheckpointConfig cfg = streamGraph.getCheckpointConfig(); //取出Checkpoint的配置 if (cfg.isCheckpointingEnabled()) { long interval = cfg.getCheckpointInterval(); //Checkpoint的时间间隔 // collect the vertices that receive "trigger checkpoint" messages. // currently, these are all the sources List<JobVertexID> triggerVertices = new ArrayList<JobVertexID>(); // collect the vertices that need to acknowledge the checkpoint // currently, these are all vertices List<JobVertexID> ackVertices = new ArrayList<JobVertexID>(jobVertices.size()); // collect the vertices that receive "commit checkpoint" messages // currently, these are all vertices List<JobVertexID> commitVertices = new ArrayList<JobVertexID>(); for (JobVertex vertex : jobVertices.values()) { if (vertex.isInputVertex()) { //只有对source vertex,才加入triggerVertices,因为只需要在源头触发checkpoint triggerVertices.add(vertex.getID()); } // TODO: add check whether the user function implements the checkpointing interface commitVertices.add(vertex.getID()); //当前所有节点都会加入commitVertices和ackVertices ackVertices.add(vertex.getID()); } JobSnapshottingSettings settings = new JobSnapshottingSettings( //生成JobSnapshottingSettings triggerVertices, ackVertices, commitVertices, interval, cfg.getCheckpointTimeout(), cfg.getMinPauseBetweenCheckpoints(), cfg.getMaxConcurrentCheckpoints()); jobGraph.setSnapshotSettings(settings); //调用setSnapshotSettings // if the user enabled checkpointing, the default number of exec retries is infinitive. int executionRetries = streamGraph.getExecutionConfig().getNumberOfExecutionRetries(); if(executionRetries == -1) { streamGraph.getExecutionConfig().setNumberOfExecutionRetries(Integer.MAX_VALUE); } } }
JobManager
submitJob的时候,将JobGraph中的配置,放到ExecutionGraph中去
/**
 * Excerpt of job submission: copies the snapshot (checkpoint) settings of the [[JobGraph]]
 * into the execution graph by enabling snapshot checkpointing on it.
 *
 * NOTE(review): abridged - `executionGraph` and `userCodeLoader` are created in parts of the
 * method that are not shown in this snippet.
 *
 * @param jobGraph   job graph whose snapshot settings are applied
 * @param jobInfo    submission info (not used in the shown part)
 * @param isRecovery whether the job is being recovered (not used in the shown part)
 */
private def submitJob(jobGraph: JobGraph, jobInfo: JobInfo, isRecovery: Boolean = false): Unit = {
  // configure the state checkpointing
  val snapshotSettings = jobGraph.getSnapshotSettings
  if (snapshotSettings != null) {
    val jobId = jobGraph.getJobID()

    // Resolves a vertex ID to its ExecutionJobVertex; an unknown ID fails the submission.
    val idToVertex: JobVertexID => ExecutionJobVertex = id => {
      val vertex = executionGraph.getJobVertex(id)
      if (vertex == null) {
        throw new JobSubmissionException(jobId,
          "The snapshot checkpointing settings refer to non-existent vertex " + id)
      }
      vertex
    }

    val triggerVertices: java.util.List[ExecutionJobVertex] =
      snapshotSettings.getVerticesToTrigger().asScala.map(idToVertex).asJava

    val ackVertices: java.util.List[ExecutionJobVertex] =
      snapshotSettings.getVerticesToAcknowledge().asScala.map(idToVertex).asJava

    val confirmVertices: java.util.List[ExecutionJobVertex] =
      snapshotSettings.getVerticesToConfirm().asScala.map(idToVertex).asJava

    // Store and ID counter both come from the checkpoint recovery factory.
    val completedCheckpoints = checkpointRecoveryFactory
      .createCompletedCheckpoints(jobId, userCodeLoader)

    val checkpointIdCounter = checkpointRecoveryFactory.createCheckpointIDCounter(jobId)

    executionGraph.enableSnapshotCheckpointing(
      snapshotSettings.getCheckpointInterval,
      snapshotSettings.getCheckpointTimeout,
      snapshotSettings.getMinPauseBetweenCheckpoints,
      snapshotSettings.getMaxConcurrentCheckpoints,
      triggerVertices,
      ackVertices,
      confirmVertices,
      context.system,
      leaderSessionID.orNull,
      checkpointIdCounter,
      completedCheckpoints,
      recoveryMode,
      savepointStore)
  }
}
ExecutionGraph
创建checkpointCoordinator对象
public void enableSnapshotCheckpointing( long interval, long checkpointTimeout, long minPauseBetweenCheckpoints, int maxConcurrentCheckpoints, List<ExecutionJobVertex> verticesToTrigger, List<ExecutionJobVertex> verticesToWaitFor, List<ExecutionJobVertex> verticesToCommitTo, ActorSystem actorSystem, UUID leaderSessionID, CheckpointIDCounter checkpointIDCounter, CompletedCheckpointStore completedCheckpointStore, RecoveryMode recoveryMode, StateStore<Savepoint> savepointStore) throws Exception { ExecutionVertex[] tasksToTrigger = collectExecutionVertices(verticesToTrigger); ExecutionVertex[] tasksToWaitFor = collectExecutionVertices(verticesToWaitFor); ExecutionVertex[] tasksToCommitTo = collectExecutionVertices(verticesToCommitTo); // disable to make sure existing checkpoint coordinators are cleared disableSnaphotCheckpointing(); if (isStatsDisabled) { checkpointStatsTracker = new DisabledCheckpointStatsTracker(); } else { int historySize = jobConfiguration.getInteger( ConfigConstants.JOB_MANAGER_WEB_CHECKPOINTS_HISTORY_SIZE, ConfigConstants.DEFAULT_JOB_MANAGER_WEB_CHECKPOINTS_HISTORY_SIZE); checkpointStatsTracker = new SimpleCheckpointStatsTracker(historySize, tasksToWaitFor); } // create the coordinator that triggers and commits checkpoints and holds the state checkpointCoordinator = new CheckpointCoordinator( jobID, interval, checkpointTimeout, minPauseBetweenCheckpoints, maxConcurrentCheckpoints, tasksToTrigger, tasksToWaitFor, tasksToCommitTo, userClassLoader, checkpointIDCounter, completedCheckpointStore, recoveryMode, checkpointStatsTracker); // the periodic checkpoint scheduler is activated and deactivated as a result of // job status changes (running -> on, all other states -> off) registerJobStatusListener( //将checkpointCoordinator的actor注册到jobStatusListenerActors,这样当job状态变化时,可以通知checkpointCoordinator checkpointCoordinator.createActivatorDeactivator(actorSystem, leaderSessionID));
这里看到checkpointCoordinator 作为ExecutionGraph的成员,
接着会异步的提交ExecutionGraph,
// execute the recovery/writing the jobGraph into the SubmittedJobGraphStore asynchronously // because it is a blocking operation future { try { if (isRecovery) { executionGraph.restoreLatestCheckpointedState() //恢复CheckpointedState } else { //...... } submittedJobGraphs.putJobGraph(new SubmittedJobGraph(jobGraph, jobInfo)) //把jobGraph放到submittedJobGraphs中track } jobInfo.client ! decorateMessage(JobSubmitSuccess(jobGraph.getJobID)) //告诉client,job提交成功 if (leaderElectionService.hasLeadership) { executionGraph.scheduleForExecution(scheduler) //真正的调度executionGraph } else { //...... } } catch { //....... } }(context.dispatcher)
CheckpointCoordinatorDeActivator
/** * This actor listens to changes in the JobStatus and activates or deactivates the periodic * checkpoint scheduler. */ public class CheckpointCoordinatorDeActivator extends FlinkUntypedActor { private final CheckpointCoordinator coordinator; private final UUID leaderSessionID; @Override public void handleMessage(Object message) { if (message instanceof ExecutionGraphMessages.JobStatusChanged) { JobStatus status = ((ExecutionGraphMessages.JobStatusChanged) message).newJobStatus(); if (status == JobStatus.RUNNING) { // start the checkpoint scheduler coordinator.startCheckpointScheduler(); } else { // anything else should stop the trigger for now coordinator.stopCheckpointScheduler(); } } // we ignore all other messages } @Override public UUID getLeaderSessionID() { return leaderSessionID; } }
在job状态发生变化时,需要打开或关闭Checkpoint scheduler
CheckpointCoordinator
开启定时startCheckpointScheduler
public void startCheckpointScheduler() { synchronized (lock) { // make sure all prior timers are cancelled stopCheckpointScheduler(); periodicScheduling = true; currentPeriodicTrigger = new ScheduledTrigger(); timer.scheduleAtFixedRate(currentPeriodicTrigger, baseInterval, baseInterval); } } private class ScheduledTrigger extends TimerTask { @Override public void run() { try { triggerCheckpoint(System.currentTimeMillis()); } catch (Exception e) { LOG.error("Exception while triggering checkpoint", e); } } }
triggerCheckpoint,用于触发一次checkpoint
/** * Triggers a new checkpoint and uses the given timestamp as the checkpoint * timestamp. * * @param timestamp The timestamp for the checkpoint. * @param nextCheckpointId The checkpoint ID to use for this checkpoint or <code>-1</code> if * the checkpoint ID counter should be queried. */ public boolean triggerCheckpoint(long timestamp, long nextCheckpointId) throws Exception { // we will actually trigger this checkpoint! final long checkpointID; if (nextCheckpointId < 0) { try { // this must happen outside the locked scope, because it communicates // with external services (in HA mode) and may block for a while. checkpointID = checkpointIdCounter.getAndIncrement(); } catch (Throwable t) { } } else { checkpointID = nextCheckpointId; } //对于没有开始的Checkpoint,称为PendingCheckpoint,传入所有需要ack checkpoint的ackTasks //后续会一个个ack这些tasks,当所有的ackTasks都被acked,PendingCheckpoint就变成CompletedCheckpoint final PendingCheckpoint checkpoint = new PendingCheckpoint(job, checkpointID, timestamp, ackTasks); // schedule the timer that will clean up the expired checkpoints,定期去清理过期的checkpoint TimerTask canceller = new TimerTask() { @Override public void run() { try { synchronized (lock) { // only do the work if the checkpoint is not discarded anyways // note that checkpoint completion discards the pending checkpoint object if (!checkpoint.isDiscarded()) { LOG.info("Checkpoint " + checkpointID + " expired before completing."); checkpoint.discard(userClassLoader); pendingCheckpoints.remove(checkpointID); rememberRecentCheckpointId(checkpointID); onCancelCheckpoint(checkpointID); triggerQueuedRequests(); } } } catch (Throwable t) { LOG.error("Exception while handling checkpoint timeout", t); } } }; try { // re-acquire the lock synchronized (lock) { pendingCheckpoints.put(checkpointID, checkpoint); //将该PendingCheckpoint加入列表track timer.schedule(canceller, checkpointTimeout); //并且启动canceller } // end of lock scope // send the messages to the tasks that trigger their checkpoint for (int i = 0; i < 
tasksToTrigger.length; i++) { ExecutionAttemptID id = triggerIDs[i]; TriggerCheckpoint message = new TriggerCheckpoint(job, id, checkpointID, timestamp); tasksToTrigger[i].sendMessageToCurrentExecution(message, id); //给所有的需要触发checkpoint的task发送checkpoint message,这里只是source tasks } numUnsuccessfulCheckpointsTriggers = 0; return true; } catch (Throwable t) { } }
---------上面只会给所有的source发checkpoint message,所以下面的流程只有source会走到-----------
TaskManager
sendMessageToCurrentExecution,发送的message最终会被TaskManager收到,
/**
 * Handler for messages related to checkpoints.
 *
 * A [[TriggerCheckpoint]] is forwarded to the addressed task's `triggerCheckpointBarrier`,
 * a [[NotifyCheckpointComplete]] to its `notifyCheckpointComplete`. In both cases a message
 * for a task that is not (or no longer) in `runningTasks` is logged at debug level and
 * dropped. Any other checkpoint message is reported as unhandled.
 *
 * @param actorMessage The checkpoint message.
 */
private def handleCheckpointingMessage(actorMessage: AbstractCheckpointMessage): Unit = {
  actorMessage match {
    case message: TriggerCheckpoint =>
      val taskExecutionId = message.getTaskExecutionId
      val checkpointId = message.getCheckpointId
      val timestamp = message.getTimestamp

      // look up the task that is actually executing
      val task = runningTasks.get(taskExecutionId)
      if (task != null) {
        task.triggerCheckpointBarrier(checkpointId, timestamp)
      } else {
        // same treatment as the confirmation branch below: log instead of dropping silently
        log.debug(
          s"TaskManager received a checkpoint request for unknown task $taskExecutionId.")
      }

    case message: NotifyCheckpointComplete =>
      val taskExecutionId = message.getTaskExecutionId
      val checkpointId = message.getCheckpointId

      val task = runningTasks.get(taskExecutionId)
      if (task != null) {
        task.notifyCheckpointComplete(checkpointId)
      } else {
        log.debug(
          s"TaskManager received a checkpoint confirmation for unknown task $taskExecutionId.")
      }

    // unknown checkpoint message
    case _ => unhandled(actorMessage)
  }
}
Task
public void triggerCheckpointBarrier(final long checkpointID, final long checkpointTimestamp) { AbstractInvokable invokable = this.invokable; if (executionState == ExecutionState.RUNNING && invokable != null) { if (invokable instanceof StatefulTask) { // build a local closure final StatefulTask<?> statefulTask = (StatefulTask<?>) invokable; final String taskName = taskNameWithSubtask; Runnable runnable = new Runnable() { @Override public void run() { try { statefulTask.triggerCheckpoint(checkpointID, checkpointTimestamp); //关键就是调用statefulTask的triggerCheckpoint,这个时候task正在执行,所以checkpoint是并行做的 } catch (Throwable t) { failExternally(new RuntimeException("Error while triggering checkpoint for " + taskName, t)); } } }; executeAsyncCallRunnable(runnable, "Checkpoint Trigger for " + taskName); } } }
StreamTask
StreamTask就是实现了StatefulTask
所以最终调用到,
StreamTask.triggerCheckpoint,这里面会实际去做checkpoint工作
调用performCheckpoint(checkpointId, timestamp)
protected boolean performCheckpoint(final long checkpointId, final long timestamp) throws Exception { synchronized (lock) { //加锁,checkpoint需要stop world if (isRunning) { // Since both state checkpointing and downstream barrier emission occurs in this // lock scope, they are an atomic operation regardless of the order in which they occur. // Given this, we immediately emit the checkpoint barriers, so the downstream operators // can start their checkpoint work as soon as possible operatorChain.broadcastCheckpointBarrier(checkpointId, timestamp); //立即发出barrier,理由如上注释 // now draw the state snapshot final StreamOperator<?>[] allOperators = operatorChain.getAllOperators(); final StreamTaskState[] states = new StreamTaskState[allOperators.length]; boolean hasAsyncStates = false; for (int i = 0; i < states.length; i++) { //根据各个state的类型,判断是否需要异步 StreamOperator<?> operator = allOperators[i]; if (operator != null) { StreamTaskState state = operator.snapshotOperatorState(checkpointId, timestamp); if (state.getOperatorState() instanceof AsynchronousStateHandle) { hasAsyncStates = true; } if (state.getFunctionState() instanceof AsynchronousStateHandle) { hasAsyncStates = true; } if (state.getKvStates() != null) { for (KvStateSnapshot<?, ?, ?, ?, ?> kvSnapshot: state.getKvStates().values()) { if (kvSnapshot instanceof AsynchronousKvStateSnapshot) { hasAsyncStates = true; } } } states[i] = state.isEmpty() ? null : state; } } for (int i = 0; i < states.length; i++) { //为所有的Operator生成snapshot的StreamTaskState StreamOperator<?> operator = allOperators[i]; if (operator != null) { StreamTaskState state = operator.snapshotOperatorState(checkpointId, timestamp); //通过operator.snapshotOperatorState生成StreamTaskState states[i] = state.isEmpty() ? 
null : state; } } StreamTaskStateList allStates = new StreamTaskStateList(states); //异步或同步的进行checkpoint if (allStates.isEmpty()) { getEnvironment().acknowledgeCheckpoint(checkpointId); } else if (!hasAsyncStates) { //sync方式 this.lastCheckpointSize = allStates.getStateSize(); getEnvironment().acknowledgeCheckpoint(checkpointId, allStates); } else { //async方式 // start a Thread that does the asynchronous materialization and // then sends the checkpoint acknowledge String threadName = "Materialize checkpoint state " + checkpointId + " - " + getName(); AsyncCheckpointThread checkpointThread = new AsyncCheckpointThread( threadName, this, cancelables, states, checkpointId); synchronized (cancelables) { cancelables.add(checkpointThread); } checkpointThread.start(); } return true; } else { return false; } } }
这里是对于source而言的checkpoint的调用逻辑,对于中间节点或sink,是要根据barrier情况,通过onEvent来触发triggerCheckpoint的
StreamTask.triggerCheckpoint最关键的步骤是,会对task中每个operator完成state snapshot
最终生成StreamTaskStateList allStates,保存所有的state的list
最终同步或异步的调用
getEnvironment().acknowledgeCheckpoint(checkpointId, allStates);
把state snapshot发送到Jobmanager去,后面就看看JobManager怎么处理的
同步的方式比较简单,但是一般都是需要异步的做snapshot的,
看看异步的AsyncCheckpointThread
AsyncCheckpointThread
@Override public void run() { try { for (StreamTaskState state : states) { if (state != null) { if (state.getFunctionState() instanceof AsynchronousStateHandle) { AsynchronousStateHandle<Serializable> asyncState = (AsynchronousStateHandle<Serializable>) state.getFunctionState(); state.setFunctionState(asyncState.materialize()); } if (state.getOperatorState() instanceof AsynchronousStateHandle) { AsynchronousStateHandle<?> asyncState = (AsynchronousStateHandle<?>) state.getOperatorState(); state.setOperatorState(asyncState.materialize()); } if (state.getKvStates() != null) { Set<String> keys = state.getKvStates().keySet(); HashMap<String, KvStateSnapshot<?, ?, ?, ?, ?>> kvStates = state.getKvStates(); for (String key: keys) { if (kvStates.get(key) instanceof AsynchronousKvStateSnapshot) { AsynchronousKvStateSnapshot<?, ?, ?, ?, ?> asyncHandle = (AsynchronousKvStateSnapshot<?, ?, ?, ?, ?>) kvStates.get(key); kvStates.put(key, asyncHandle.materialize()); //可以看到把真正的存储,delay到这里的materialize去做 } } } } } StreamTaskStateList allStates = new StreamTaskStateList(states); owner.lastCheckpointSize = allStates.getStateSize(); owner.getEnvironment().acknowledgeCheckpoint(checkpointId, allStates); LOG.debug("Finished asynchronous checkpoints for checkpoint {} on task {}", checkpointId, getName()); }
RuntimeEnvironment
package org.apache.flink.runtime.taskmanager;
/** * In implementation of the {@link Environment}. */ public class RuntimeEnvironment implements Environment { @Override public void acknowledgeCheckpoint(long checkpointId, StateHandle<?> state) { // try and create a serialized version of the state handle SerializedValue<StateHandle<?>> serializedState; long stateSize; if (state == null) { serializedState = null; stateSize = 0; } else { try { serializedState = new SerializedValue<StateHandle<?>>(state); } catch (Exception e) { throw new RuntimeException("Failed to serialize state handle during checkpoint confirmation", e); } try { stateSize = state.getStateSize(); } catch (Exception e) { throw new RuntimeException("Failed to fetch state handle size", e); } } AcknowledgeCheckpoint message = new AcknowledgeCheckpoint( jobId, executionId, checkpointId, serializedState, stateSize); jobManager.tell(message); } }
所以可以看到,是把这个ack发送到job manager的,
JobManager
handleCheckpointMessage
/** * Dedicated handler for checkpoint messages. * * @param actorMessage The checkpoint actor message. */ private def handleCheckpointMessage(actorMessage: AbstractCheckpointMessage): Unit = { actorMessage match { case ackMessage: AcknowledgeCheckpoint => val jid = ackMessage.getJob() currentJobs.get(jid) match { case Some((graph, _)) => val checkpointCoordinator = graph.getCheckpointCoordinator() val savepointCoordinator = graph.getSavepointCoordinator() if (checkpointCoordinator != null && savepointCoordinator != null) { future { //future等待异步的ack消息 try { if (checkpointCoordinator.receiveAcknowledgeMessage(ackMessage)) { //JobManager收到checkpoint的ack message // OK, this is the common case } else { // Try the savepoint coordinator if the message was not addressed // to the periodic checkpoint coordinator. if (!savepointCoordinator.receiveAcknowledgeMessage(ackMessage)) { log.info("Received message for non-existing checkpoint " + ackMessage.getCheckpointId) } } } catch { case t: Throwable => log.error(s"Error in CheckpointCoordinator while processing $ackMessage", t) } }(context.dispatcher) }
CheckpointCoordinator
receiveAcknowledgeMessage
/** * Receives an AcknowledgeCheckpoint message and returns whether the * message was associated with a pending checkpoint. * * @param message Checkpoint ack from the task manager * * @return Flag indicating whether the ack'd checkpoint was associated * with a pending checkpoint. * * @throws Exception If the checkpoint cannot be added to the completed checkpoint store. */ public boolean receiveAcknowledgeMessage(AcknowledgeCheckpoint message) throws Exception { final long checkpointId = message.getCheckpointId(); CompletedCheckpoint completed = null; PendingCheckpoint checkpoint; // Flag indicating whether the ack message was for a known pending // checkpoint. boolean isPendingCheckpoint; synchronized (lock) { checkpoint = pendingCheckpoints.get(checkpointId); //取出相应的pendingCheckpoint if (checkpoint != null && !checkpoint.isDiscarded()) { isPendingCheckpoint = true; if (checkpoint.acknowledgeTask(message.getTaskExecutionId(), message.getState(), message.getStateSize())) { //根据这个ack message,对pendingCheckpoint进行ack if (checkpoint.isFullyAcknowledged()) { //如果所有需要ack的tasks都完成ack completed = checkpoint.toCompletedCheckpoint(); //将状态置为Completed completedCheckpointStore.addCheckpoint(completed); //将checkpoint track到completedCheckpointStore,表示完成一次完整的checkpoint pendingCheckpoints.remove(checkpointId); //从pending里面去除相应的checkpoint rememberRecentCheckpointId(checkpointId); dropSubsumedCheckpoints(completed.getTimestamp()); onFullyAcknowledgedCheckpoint(completed); triggerQueuedRequests(); } } } } // send the confirmation messages to the necessary targets. 
we do this here // to be outside the lock scope if (completed != null) { final long timestamp = completed.getTimestamp(); for (ExecutionVertex ev : tasksToCommitTo) { Execution ee = ev.getCurrentExecutionAttempt(); if (ee != null) { ExecutionAttemptID attemptId = ee.getAttemptId(); NotifyCheckpointComplete notifyMessage = new NotifyCheckpointComplete(job, attemptId, checkpointId, timestamp); ev.sendMessageToCurrentExecution(notifyMessage, ee.getAttemptId()); //通知每个ExecutionVertex,checkpoint完成 } } statsTracker.onCompletedCheckpoint(completed); } return isPendingCheckpoint; }
PendingCheckpoint
在acknowledgeTask中,
只是把state,cache在collectedStates中,
/**
 * Records the acknowledgement of one task for this pending checkpoint.
 *
 * <p>If the task was still outstanding it is removed from the not-yet-acknowledged set, its
 * state (when non-null) is added to the collected states together with the time elapsed since
 * the checkpoint timestamp, and the acknowledged-task count is increased.
 *
 * @param attemptID execution attempt that acknowledges
 * @param state serialized state reported by the task, may be {@code null}
 * @param stateSize size of the reported state
 * @return {@code true} if the acknowledgement was accepted; {@code false} if the checkpoint
 *         is discarded or the attempt was not (or no longer) expected to acknowledge
 */
public boolean acknowledgeTask(
        ExecutionAttemptID attemptID,
        SerializedValue<StateHandle<?>> state,
        long stateSize) {

    synchronized (lock) {
        if (discarded) {
            return false;
        }

        final ExecutionVertex acknowledged = notYetAcknowledgedTasks.remove(attemptID);
        if (acknowledged == null) {
            // unknown attempt, or it acknowledged before
            return false;
        }

        if (state != null) {
            StateForTask taskState = new StateForTask(
                    state,
                    stateSize,
                    acknowledged.getJobvertexId(),
                    acknowledged.getParallelSubtaskIndex(),
                    System.currentTimeMillis() - checkpointTimestamp);
            collectedStates.add(taskState);
        }

        numAcknowledgedTasks++;
        return true;
    }
}
接着在收到所有的task的ack后,会调用toCompletedCheckpoint
/**
 * Converts this fully acknowledged pending checkpoint into a {@link CompletedCheckpoint}
 * holding a copy of the collected task states, then disposes the pending checkpoint via
 * {@code dispose(null, false)}.
 *
 * @return the completed checkpoint
 * @throws IllegalStateException if this pending checkpoint is already discarded or some tasks
 *         have not acknowledged yet
 */
public CompletedCheckpoint toCompletedCheckpoint() {
    synchronized (lock) {
        if (discarded) {
            throw new IllegalStateException("pending checkpoint is discarded");
        }
        if (!notYetAcknowledgedTasks.isEmpty()) {
            throw new IllegalStateException("Cannot complete checkpoint while not all tasks are acknowledged");
        }

        final CompletedCheckpoint result = new CompletedCheckpoint(
                jobId,
                checkpointId,
                checkpointTimestamp,
                System.currentTimeMillis(),
                new ArrayList<StateForTask>(collectedStates));

        dispose(null, false);
        return result;
    }
}
把collectedStates封装在CompletedCheckpoint中,返回
最后调用completedCheckpointStore.addCheckpoint,存储这个checkpoint,可以参考
ZooKeeperCompletedCheckpointStore
NotifyCheckpointComplete
同样,这个NotifyCheckpointComplete消息也会先发到TaskManager,再到Task,最终调到StreamTask.notifyCheckpointComplete
@Override public void notifyCheckpointComplete(long checkpointId) throws Exception { synchronized (lock) { if (isRunning) { LOG.debug("Notification of complete checkpoint for task {}", getName()); // We first notify the state backend if necessary if (stateBackend instanceof CheckpointNotifier) { ((CheckpointNotifier) stateBackend).notifyCheckpointComplete(checkpointId); } for (StreamOperator<?> operator : operatorChain.getAllOperators()) { if (operator != null) { operator.notifyOfCompletedCheckpoint(checkpointId); } } } else { LOG.debug("Ignoring notification of complete checkpoint for not-running task {}", getName()); } } }
这个就是checkpoint的完整的过程
再看看restore的过程
Restore过程
可以看到,在提交job的时候,会调用
executionGraph.restoreLatestCheckpointedState()
/**
 * Restores the latest checkpointed state by delegating to the checkpoint coordinator, if one
 * is configured; otherwise this is a no-op.
 *
 * <p>The recovery of checkpoints might block. Make sure that calls to this method don't
 * block the job manager actor and run asynchronously.
 *
 * @throws Exception if the coordinator fails to restore the state
 */
public void restoreLatestCheckpointedState() throws Exception {
    synchronized (progressLock) {
        if (checkpointCoordinator == null) {
            // checkpointing is not enabled for this graph
            return;
        }
        checkpointCoordinator.restoreLatestCheckpointedState(getAllVertices(), false, false);
    }
}
restoreLatestCheckpointedState
public void restoreLatestCheckpointedState( Map<JobVertexID, ExecutionJobVertex> tasks, boolean errorIfNoCheckpoint, boolean allOrNothingState) throws Exception { synchronized (lock) { // Recover the checkpoints //对于ZooKeeperCompletedCheckpointStore, //Gets the latest checkpoint from ZooKeeper and removes all others. completedCheckpointStore.recover(); // restore from the latest checkpoint CompletedCheckpoint latest = completedCheckpointStore.getLatestCheckpoint(); //从completedCheckpointStore中取出最新的CompletedCheckpoint long recoveryTimestamp = System.currentTimeMillis(); if (allOrNothingState) { //全部成功或Nothing Map<ExecutionJobVertex, Integer> stateCounts = new HashMap<ExecutionJobVertex, Integer>(); for (StateForTask state : latest.getStates()) { ExecutionJobVertex vertex = tasks.get(state.getOperatorId()); Execution exec = vertex.getTaskVertices()[state.getSubtask()].getCurrentExecutionAttempt(); exec.setInitialState(state.getState(), recoveryTimestamp); //恢复state Integer count = stateCounts.get(vertex); //计数 if (count != null) { stateCounts.put(vertex, count+1); } else { stateCounts.put(vertex, 1); } } // validate that either all task vertices have state, or none for (Map.Entry<ExecutionJobVertex, Integer> entry : stateCounts.entrySet()) { ExecutionJobVertex vertex = entry.getKey(); if (entry.getValue() != vertex.getParallelism()) { //如果vetex的恢复state次数不等于平行数,说明有些没有被恢复,抛异常 throw new IllegalStateException( "The checkpoint contained state only for a subset of tasks for vertex " + vertex); } } } else { for (StateForTask state : latest.getStates()) { ExecutionJobVertex vertex = tasks.get(state.getOperatorId()); Execution exec = vertex.getTaskVertices()[state.getSubtask()].getCurrentExecutionAttempt(); exec.setInitialState(state.getState(), recoveryTimestamp); } } } }
Execution
/**
 * Stores the serialized state and the recovery timestamp on this execution attempt.
 * Only the two fields are assigned here; nothing is deserialized.
 *
 * @param initialState      the serialized state handle to start from
 * @param recoveryTimestamp the timestamp to record for this recovery
 * @throws IllegalArgumentException if the execution attempt is not in the CREATED state
 */
public void setInitialState(SerializedValue<StateHandle<?>> initialState, long recoveryTimestamp) {
    final boolean inCreatedState = (state == ExecutionState.CREATED);
    if (!inCreatedState) {
        throw new IllegalArgumentException("Can only assign operator state when execution attempt is in CREATED");
    }

    this.recoveryTimestamp = recoveryTimestamp;
    this.operatorState = initialState;
}
可以看到这里的recovery,只是把我们从zk中获取的checkpoint中的状态赋值给operatorState
然后再deployToSlot,会把初始state,封装到deployment中去,提交给taskManager
/**
 * Builds the deployment descriptor for this execution attempt, including the initial
 * operator state and recovery timestamp, and sends it in a {@code SubmitTask} message
 * through the gateway.
 *
 * @param slot the slot to deploy into
 * @throws JobException declared by the signature; the visible body does not throw it
 */
public void deployToSlot(final SimpleSlot slot) throws JobException {
    final TaskDeploymentDescriptor descriptor = vertex.createDeploymentDescriptor(
            attemptId,
            slot,
            operatorState,
            recoveryTimestamp,
            attemptNumber);

    final SubmitTask submitMessage = new SubmitTask(descriptor);
    final Future<Object> deployAction = gateway.ask(submitMessage, timeout);
}
在TaskManager中的submitTask里面,会创建Task,并执行该task,
Task.run()
// the very last thing before the actual execution starts running is to inject // the state into the task. the state is non-empty if this is an execution // of a task that failed but had backuped state from a checkpoint // get our private reference onto the stack (be safe against concurrent changes) SerializedValue<StateHandle<?>> operatorState = this.operatorState; //恢复的state long recoveryTs = this.recoveryTs; if (operatorState != null) { if (invokable instanceof StatefulTask) { //如果是一个有状态的task try { StateHandle<?> state = operatorState.deserializeValue(userCodeClassLoader); //反序列化数据 StatefulTask<?> op = (StatefulTask<?>) invokable; StateUtils.setOperatorState(op, state, recoveryTs);//真正的恢复state } catch (Exception e) { throw new RuntimeException("Failed to deserialize state handle and setup initial operator state.", e); } } else { throw new IllegalStateException("Found operator state for a non-stateful task invokable"); } } // be memory and GC friendly - since the code stays in invoke() for a potentially long time, // we clear the reference to the state handle //noinspection UnusedAssignment operatorState = null; this.operatorState = null;
StateUtils
/**
 * Passes the given state handle to the given stateful task as its initial state,
 * bridging the wildcard types with unchecked casts.
 *
 * @param op                the task that receives the state
 * @param state             the state handle to set
 * @param recoveryTimestamp the recovery timestamp forwarded to the task
 * @param <T>               the state handle type the task is treated as accepting
 * @throws Exception propagated from {@code setInitialState}
 */
public static <T extends StateHandle<?>> void setOperatorState(
        StatefulTask<?> op,
        StateHandle<?> state,
        long recoveryTimestamp) throws Exception {

    // The compiler cannot verify that the task's handle type matches the handle.
    @SuppressWarnings("unchecked")
    final StatefulTask<T> castTask = (StatefulTask<T>) op;

    @SuppressWarnings("unchecked")
    final T castHandle = (T) state;

    castTask.setInitialState(castHandle, recoveryTimestamp);
}
StreamTask
@Override public void setInitialState(StreamTaskStateList initialState, long recoveryTimestamp) { lazyRestoreState = initialState; //将状态置到lazyRestoreState this.recoveryTimestamp = recoveryTimestamp; }
//在StreamTask的invoke中,会调用restoreStateLazy,真正地完成状态恢复
public void restoreStateLazy() throws Exception { if (lazyRestoreState != null) { try { final StreamOperator<?>[] allOperators = operatorChain.getAllOperators(); final StreamTaskState[] states = lazyRestoreState.getState(userClassLoader); //获取所有states // be GC friendly lazyRestoreState = null; for (int i = 0; i < states.length; i++) { StreamTaskState state = states[i]; StreamOperator<?> operator = allOperators[i]; if (state != null && operator != null) { operator.restoreState(state, recoveryTimestamp); //最终把state恢复到operator } else if (operator != null) { } } } catch (Exception e) { throw new Exception("Could not restore checkpointed state to operators and functions", e); } } }