Flink -- Failover


JobManager failover

 

LeaderLatch

JobManager high availability is built on ZooKeeper leader election via Curator's LeaderLatch. Whenever leadership changes, setLeadership notifies all registered LeaderLatchListeners:

private synchronized void setLeadership(boolean newValue)
{
    boolean oldValue = hasLeadership.getAndSet(newValue);

    if ( oldValue && !newValue )
    { // Lost leadership: was leader before, not leader now
        listeners.forEach(new Function<LeaderLatchListener, Void>()
        {
            @Override
            public Void apply(LeaderLatchListener listener)
            {
                listener.notLeader();
                return null;
            }
        });
    }
    else if ( !oldValue && newValue )
    { // Gained leadership: was not leader before, is leader now
        listeners.forEach(new Function<LeaderLatchListener, Void>()
        {
            @Override
            public Void apply(LeaderLatchListener input)
            {
                input.isLeader();
                return null;
            }
        });
    }

    notifyAll();
}
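
To make this concrete, below is a minimal, self-contained usage sketch of Curator's LeaderLatch (the connection string, latch path, and demo class are placeholders invented for illustration, not Flink's actual configuration). Flink's ZooKeeperLeaderElectionService essentially registers itself as such a LeaderLatchListener, so the isLeader()/notLeader() callbacks shown next are driven by the setLeadership logic above.

import org.apache.curator.framework.CuratorFramework;
import org.apache.curator.framework.CuratorFrameworkFactory;
import org.apache.curator.framework.recipes.leader.LeaderLatch;
import org.apache.curator.framework.recipes.leader.LeaderLatchListener;
import org.apache.curator.retry.ExponentialBackoffRetry;

public class LeaderLatchDemo {
    public static void main(String[] args) throws Exception {
        // Placeholder ZooKeeper address and latch path, for illustration only.
        CuratorFramework client = CuratorFrameworkFactory.newClient(
                "localhost:2181", new ExponentialBackoffRetry(1000, 3));
        client.start();

        LeaderLatch latch = new LeaderLatch(client, "/demo/leader-latch");

        // setLeadership(true/false) above ends up invoking exactly these two callbacks.
        latch.addListener(new LeaderLatchListener() {
            @Override
            public void isLeader() {
                System.out.println("Gained leadership");
            }

            @Override
            public void notLeader() {
                System.out.println("Lost leadership");
            }
        });

        latch.start();        // join the election
        Thread.sleep(60_000); // keep the process alive for the demo
        latch.close();        // leave the election; another participant takes over
        client.close();
    }
}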
 

ZooKeeperLeaderElectionService

@Override
public void isLeader() {
    synchronized (lock) {
        issuedLeaderSessionID = UUID.randomUUID();

        leaderContender.grantLeadership(issuedLeaderSessionID);
    }
}

@Override
public void notLeader() {
    synchronized (lock) {
        issuedLeaderSessionID = null;
        confirmedLeaderSessionID = null;

        leaderContender.revokeLeadership();
    }
}

As you can see, these callbacks simply delegate to leaderContender.grantLeadership and leaderContender.revokeLeadership respectively.

 

The JobManager implements the LeaderContender interface, so it is the one receiving these two callbacks.
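
For reference, the LeaderContender contract looks roughly like this (a simplified sketch, not the exact Flink interface, which also declares methods such as getAddress and handleError):

// Simplified sketch of the LeaderContender contract.
public interface LeaderContender {

    // Called by the election service once this contender has been elected leader.
    void grantLeadership(java.util.UUID leaderSessionID);

    // Called when leadership is revoked, e.g. after losing the ZooKeeper leader latch.
    void revokeLeadership();
}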

revokeLeadership

val newFuturesToComplete = cancelAndClearEverything(
  new Exception("JobManager is no longer the leader."))

 

The key step in cancelAndClearEverything is suspending the ExecutionGraph: execution stops, but the job is not removed from the HA job store, so another JobManager can recover and resubmit it.

/**
 * The SUSPENDED state is a local terminal state which stops the execution of the job but does
 * not remove the job from the HA job store so that it can be recovered by another JobManager.
 */
private def cancelAndClearEverything(cause: Throwable)
  : Seq[Future[Unit]] = {
  val futures = for ((jobID, (eg, jobInfo)) <- currentJobs) yield {
    future {
      eg.suspend(cause) // suspend the ExecutionGraph

      if (jobInfo.listeningBehaviour != ListeningBehaviour.DETACHED) {
        jobInfo.client ! decorateMessage(
          Failure(new JobExecutionException(jobID, "All jobs are cancelled and cleared.", cause)))
      }
    }(context.dispatcher)
  }

  currentJobs.clear()

  futures.toSeq
}

 

grantLeadership

context.system.scheduler.scheduleOnce(
  jobRecoveryTimeout,
  self,
  decorateMessage(RecoverAllJobs))(
  context.dispatcher)

The main work is recovering all jobs, handled by the RecoverAllJobs message:

case RecoverAllJobs =>
  future {
    try {
      // The ActorRef, which is part of the submitted job graph can only be
      // de-serialized in the scope of an actor system.
      akka.serialization.JavaSerializer.currentSystem.withValue(
        context.system.asInstanceOf[ExtendedActorSystem]) {

        log.info(s"Attempting to recover all jobs.")

        // read all submitted jobs from the submittedJobGraphs store, which is also backed by ZooKeeper
        val jobGraphs = submittedJobGraphs.recoverJobGraphs().asScala

        if (!leaderElectionService.hasLeadership()) {
          // we've lost leadership. mission: abort.
          log.warn(s"Lost leadership during recovery. Aborting recovery of ${jobGraphs.size} " +
            s"jobs.")
        } else {
          log.info(s"Re-submitting ${jobGraphs.size} job graphs.")

          jobGraphs.foreach{
            submittedJobGraph =>
              self ! decorateMessage(RecoverSubmittedJob(submittedJobGraph)) // recover each job
          }
        }
      }
    } catch {
      case t: Throwable => log.error("Fatal error: Failed to recover jobs.", t)
    }
  }(context.dispatcher)

 

A single job is recovered as follows:

case RecoverSubmittedJob(submittedJobGraph) =>
  if (!currentJobs.contains(submittedJobGraph.getJobId)) {
    submitJob(
      submittedJobGraph.getJobGraph(),
      submittedJobGraph.getJobInfo(),
      isRecovery = true)
  }
  else {
    log.info(s"Ignoring job recovery for ${submittedJobGraph.getJobId}, " +
      s"because it is already submitted.")
  }

This is essentially a re-submission of the job; note the isRecovery = true flag.

When submitting a job with isRecovery = true, the following step is performed; the details of what happens afterwards are covered in the Checkpoint article:

if (isRecovery) {
  executionGraph.restoreLatestCheckpointedState()
}

 

TaskManager Failover

Inside the JobManager, a dead TaskManager is detected via Akka death watch (or a notification from the ResourceManager):

/**
    * Handler to be executed when a task manager terminates.
    * (Akka Deathwatch or notification from ResourceManager)
    *
    * @param taskManager The ActorRef of the taskManager
    */
  private def handleTaskManagerTerminated(taskManager: ActorRef): Unit = {
    if (instanceManager.isRegistered(taskManager)) {
      log.info(s"Task manager ${taskManager.path} terminated.")

      instanceManager.unregisterTaskManager(taskManager, true)
      context.unwatch(taskManager)
    }
  }

instanceManager.unregisterTaskManager:

/**
* Unregisters the TaskManager with the given {@link ActorRef}. Unregistering means to mark
* the given instance as dead and notify {@link InstanceListener} about the dead instance.
*
* @param instanceID TaskManager which is about to be marked dead.
*/
public void unregisterTaskManager(ActorRef instanceID, boolean terminated){
    Instance instance = registeredHostsByConnection.get(instanceID);
    
    if (instance != null){
        ActorRef host = instance.getActorGateway().actor();
        
        registeredHostsByConnection.remove(host);
        registeredHostsById.remove(instance.getId());
        registeredHostsByResource.remove(instance.getResourceId());
        
        if (terminated) {
            deadHosts.add(instance.getActorGateway().actor());
        }
        
        instance.markDead();
        
        totalNumberOfAliveTaskSlots -= instance.getTotalNumberOfSlots();
        
        notifyDeadInstance(instance);
    }
}

 

instance.markDead()

public void markDead() {

    // create a copy of the slots to avoid concurrent modification exceptions
    List<Slot> slots;

    synchronized (instanceLock) {
        if (isDead) {
            return;
        }
        isDead = true;

        // no more notifications for the slot releasing
        this.slotAvailabilityListener = null;

        slots = new ArrayList<Slot>(allocatedSlots);

        allocatedSlots.clear();
        availableSlots.clear();
    }

    /*
     * releaseSlot must not own the instanceLock in order to avoid dead locks where a slot
     * owning the assignment group lock wants to give itself back to the instance which requires
     * the instance lock
     */
    for (Slot slot : slots) {
        slot.releaseSlot();
    }
}

 

SimpleSlot.releaseSlot

@Override
public void releaseSlot() {

    if (!isCanceled()) {

        // kill all tasks currently running in this slot
        Execution exec = this.executedTask;
        if (exec != null && !exec.isFinished()) {
            exec.fail(new Exception(
                    "The slot in which the task was executed has been released. Probably loss of TaskManager "
                            + getInstance()));
        }

        // release directly (if we are directly allocated),
        // otherwise release through the parent shared slot
        if (getParent() == null) {
            // we have to give back the slot to the owning instance
            if (markCancelled()) {
                getInstance().returnAllocatedSlot(this);
            }
        } else {
            // we have to ask our parent to dispose us
            getParent().releaseChild(this);
        }
    }
}

 

Execution.fail

public void fail(Throwable t) {
    processFail(t, false);
}

 

Execution.processFail

processFail first sets the Execution's state to FAILED:

transitionState(current, FAILED, t)
private boolean transitionState(ExecutionState currentState, ExecutionState targetState, Throwable error) { 

    if (STATE_UPDATER.compareAndSet(this, currentState, targetState)) {
        markTimestamp(targetState); 

        try {
            vertex.notifyStateTransition(attemptId, targetState, error);
        }
        catch (Throwable t) {
            LOG.error("Error while notifying execution graph of execution state transition.", t);
        }
        return true;
    } else {
        return false;
    }
}

After the state is set, vertex.notifyStateTransition is invoked, which forwards the change to the ExecutionGraph:

getExecutionGraph().notifyExecutionChange(getJobvertexId(), subTaskIndex, executionId, newState, error);
void notifyExecutionChange(JobVertexID vertexId, int subtask, ExecutionAttemptID executionID,
                           ExecutionState newExecutionState, Throwable error)
{
    ExecutionJobVertex vertex = getJobVertex(vertexId);

    if (executionListenerActors.size() > 0) {
        String message = error == null ? null : ExceptionUtils.stringifyException(error);
        ExecutionGraphMessages.ExecutionStateChanged actorMessage =
                new ExecutionGraphMessages.ExecutionStateChanged(jobID, vertexId,  vertex.getJobVertex().getName(),
                                                                vertex.getParallelism(), subtask,
                                                                executionID, newExecutionState,
                                                                System.currentTimeMillis(), message);

        for (ActorGateway listener : executionListenerActors) {
            listener.tell(actorMessage);
        }
    }

    // see what this means for us. currently, the first FAILED state means -> FAILED
    if (newExecutionState == ExecutionState.FAILED) {
        fail(error);
    }
}

The main effect is to send an ExecutionGraphMessages.ExecutionStateChanged message to all registered listeners.

The listeners are registered by the JobManager when the job is submitted:

if (jobInfo.listeningBehaviour == ListeningBehaviour.EXECUTION_RESULT_AND_STATE_CHANGES) {
  // the sender wants to be notified about state changes
  val gateway = new AkkaActorGateway(jobInfo.client, leaderSessionID.orNull)

  executionGraph.registerExecutionListener(gateway)
  executionGraph.registerJobStatusListener(gateway)
}

On the client side, the JobClientActor merely logs and prints these messages:
if (message instanceof ExecutionGraphMessages.ExecutionStateChanged) {
    logAndPrintMessage((ExecutionGraphMessages.ExecutionStateChanged) message);
} else if (message instanceof ExecutionGraphMessages.JobStatusChanged) {
    logAndPrintMessage((ExecutionGraphMessages.JobStatusChanged) message);
}

 

Note that when newExecutionState == ExecutionState.FAILED, ExecutionGraph.fail is called.
As the comment in the code says, the first FAILED execution means the whole job is considered failed.

public void fail(Throwable t) {
    while (true) {
        JobStatus current = state;
        // stay in these states
        if (current == JobStatus.FAILING ||
            current == JobStatus.SUSPENDED ||
            current.isGloballyTerminalState()) {
            return;
        } else if (current == JobStatus.RESTARTING && transitionState(current, JobStatus.FAILED, t)) {
            synchronized (progressLock) {
                postRunCleanup();
                progressLock.notifyAll();
                return;
            }
        } else if (transitionState(current, JobStatus.FAILING, t)) { // transition the job state to JobStatus.FAILING
            this.failureCause = t;

            if (!verticesInCreationOrder.isEmpty()) {
                // cancel all. what is failed will not cancel but stay failed
                for (ExecutionJobVertex ejv : verticesInCreationOrder) {
                    ejv.cancel();
                }
            } else {
                // set the state of the job to failed
                transitionState(JobStatus.FAILING, JobStatus.FAILED, t);
            }

            return;
        }

    }
}

As you can see, the job state is set directly to FAILING, and cancel is called on every ExecutionJobVertex.

 

Next, this execution is deregistered from the ExecutionGraph:

vertex.getExecutionGraph().deregisterExecution(this);
Execution contained = currentExecutions.remove(exec.getAttemptId());

 

Finally, it calls:

vertex.executionFailed(t);
void executionFailed(Throwable t) {
    jobVertex.vertexFailed(subTaskIndex, t);
}

 

ExecutionJobVertex

void vertexFailed(int subtask, Throwable error) {
    subtaskInFinalState(subtask);
}

private void subtaskInFinalState(int subtask) {
    synchronized (stateMonitor) {
        if (!finishedSubtasks[subtask]) {
            finishedSubtasks[subtask] = true;
            
            if (numSubtasksInFinalState+1 == parallelism) { // check whether all subtasks of this vertex have now reached a final state
                
                // call finalizeOnMaster hook
                try {
                    getJobVertex().finalizeOnMaster(getGraph().getUserClassLoader());
                }
                catch (Throwable t) {
                    getGraph().fail(t);
                }

                numSubtasksInFinalState++;
                
                // we are in our final state
                stateMonitor.notifyAll();
                
                // tell the graph
                graph.jobVertexInFinalState();
            } else {
                numSubtasksInFinalState++;
            }
        }
    }
}

graph.jobVertexInFinalState()

void jobVertexInFinalState() {
    synchronized (progressLock) {
        numFinishedJobVertices++;

        if (numFinishedJobVertices == verticesInCreationOrder.size()) { // have all job vertices reached their final state?

            // we are done, transition to the final state
            JobStatus current;
            while (true) {
                current = this.state;

                if (current == JobStatus.RUNNING) {
                    if (transitionState(current, JobStatus.FINISHED)) {
                        postRunCleanup();
                        break;
                    }
                }
                else if (current == JobStatus.CANCELLING) {
                    if (transitionState(current, JobStatus.CANCELED)) {
                        postRunCleanup();
                        break;
                    }
                }
                else if (current == JobStatus.FAILING) {
                    boolean allowRestart = !(failureCause instanceof SuppressRestartsException);

                    if (allowRestart && restartStrategy.canRestart() && transitionState(current, JobStatus.RESTARTING)) {
                        restartStrategy.restart(this);
                        break;
                    } else if ((!allowRestart || !restartStrategy.canRestart()) && transitionState(current, JobStatus.FAILED, failureCause)) {
                        postRunCleanup();
                        break;
                    }
                }
                else if (current == JobStatus.SUSPENDED) {
                    // we've already cleaned up when entering the SUSPENDED state
                    break;
                }
                else if (current.isGloballyTerminalState()) {
                    LOG.warn("Job has entered globally terminal state without waiting for all " +
                        "job vertices to reach final state.");
                    break;
                }
                else {
                    fail(new Exception("ExecutionGraph went into final state from state " + current));
                    break;
                }
            }
            // done transitioning the state

            // also, notify waiters
            progressLock.notifyAll();
        }
    }
}

If the job state is JobStatus.FAILING and the restart conditions are met, transitionState(current, JobStatus.RESTARTING) succeeds and then:

restartStrategy.restart(this);

The restart strategy is configurable, but whichever strategy is used, it eventually calls:

executionGraph.restart();
public void restart() {
    try {
        synchronized (progressLock) {
            JobStatus current = state;

            if (current == JobStatus.CANCELED) {
                LOG.info("Canceled job during restart. Aborting restart.");
                return;
            } else if (current == JobStatus.FAILED) {
                LOG.info("Failed job during restart. Aborting restart.");
                return;
            } else if (current == JobStatus.SUSPENDED) {
                LOG.info("Suspended job during restart. Aborting restart.");
                return;
            } else if (current != JobStatus.RESTARTING) {
                throw new IllegalStateException("Can only restart job from state restarting.");
            }

            if (scheduler == null) {
                throw new IllegalStateException("The execution graph has not been scheduled before - scheduler is null.");
            }

            this.currentExecutions.clear();

            Collection<CoLocationGroup> colGroups = new HashSet<>();

            for (ExecutionJobVertex jv : this.verticesInCreationOrder) {

                CoLocationGroup cgroup = jv.getCoLocationGroup();
                if(cgroup != null && !colGroups.contains(cgroup)){
                    cgroup.resetConstraints();
                    colGroups.add(cgroup);
                }

                jv.resetForNewExecution();
            }

            for (int i = 0; i < stateTimestamps.length; i++) {
                if (i != JobStatus.RESTARTING.ordinal()) {
                    // Only clear the non restarting state in order to preserve when the job was
                    // restarted. This is needed for the restarting time gauge
                    stateTimestamps[i] = 0;
                }
            }
            numFinishedJobVertices = 0;
            transitionState(JobStatus.RESTARTING, JobStatus.CREATED);

            // if we have checkpointed state, reload it into the executions
            if (checkpointCoordinator != null) {
                boolean restored = checkpointCoordinator
                        .restoreLatestCheckpointedState(getAllVertices(), false, false); // reload the latest checkpoint and restore state

                // TODO(uce) Temporary work around to restore initial state on
                // failure during recovery. Will be superseded by FLINK-3397.
                if (!restored && savepointCoordinator != null) {
                    String savepointPath = savepointCoordinator.getSavepointRestorePath();
                    if (savepointPath != null) {
                        savepointCoordinator.restoreSavepoint(getAllVertices(), savepointPath);
                    }
                }
            }
        }

        scheduleForExecution(scheduler); // schedule the ExecutionGraph again, effectively re-submitting the job
    }
    catch (Throwable t) {
        fail(t);
    }
}
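
Flink ships several restart strategies (for example fixed-delay and no-restart). As a rough sketch of the mechanism, not Flink's actual implementation, a fixed-delay strategy boils down to counting attempts and scheduling executionGraph.restart() after a delay (the class and interface names below are invented for illustration):

import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;

// Simplified fixed-delay restart strategy, for illustration only.
public class FixedDelayRestartSketch {

    /** Minimal stand-in for the ExecutionGraph, exposing only restart(). */
    public interface RestartableGraph {
        void restart();
    }

    private final int maxAttempts;
    private final long delayMillis;
    private int attempts;

    // A real implementation would reuse an existing scheduler instead of owning one.
    private final ScheduledExecutorService scheduler =
            Executors.newSingleThreadScheduledExecutor();

    public FixedDelayRestartSketch(int maxAttempts, long delayMillis) {
        this.maxAttempts = maxAttempts;
        this.delayMillis = delayMillis;
    }

    /** Mirrors restartStrategy.canRestart() in the walkthrough above. */
    public synchronized boolean canRestart() {
        return attempts < maxAttempts;
    }

    /** Mirrors restartStrategy.restart(this): schedule graph.restart() after the delay. */
    public synchronized void restart(RestartableGraph graph) {
        attempts++;
        scheduler.schedule(graph::restart, delayMillis, TimeUnit.MILLISECONDS);
    }
}

Whichever strategy is configured, the path always ends in executionGraph.restart() as shown above, which resets the vertices, restores the latest checkpointed state, and schedules the graph for execution again.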