Flink运行时之TaskManager执行Task

本文涉及的产品
实时计算 Flink 版,5000CU*H 3个月
简介: TaskManager执行任务 当一个任务被JobManager部署到TaskManager之后,它将会被执行。本篇我们将分析任务的执行细节。 submitTask方法分析 一个任务实例被部署所产生的实际影响就是JobManager会将一个TaskDeploymentDescriptor对象封装在SubmitTask消息中发送给TaskManager。

TaskManager执行任务

当一个任务被JobManager部署到TaskManager之后,它将会被执行。本篇我们将分析任务的执行细节。

submitTask方法分析

一个任务实例被部署所产生的实际影响就是JobManager会将一个TaskDeploymentDescriptor对象封装在SubmitTask消息中发送给TaskManager。而处理该消息的入口方法是submitTask方法,它是TaskManager接收任务部署并启动任务执行的入口方法,值得我们关注一下它的实现细节。

submitTask方法中的第一个关键点是它先构建一个Task对象:

val task = new Task(
    tdd,
    memoryManager,
    ioManager,
    network,
    bcVarManager,
    selfGateway,
    jobManagerGateway,
    config.timeout,
    libCache,
    fileCache,
    runtimeInfo,
    taskMetricGroup)

该Task封装了其在TaskManager中执行时需要的一些关键对象。task对象将会被加入TaskManager中的一个ExecutionAttemptID与Task的Map中,如果发现该ExecutionAttemptID所对应的Task对象已存在于Map中,则将原先的Task实例重新放回到Map中,同时抛出异常:

val execId = tdd.getExecutionId
val prevTask = runningTasks.put(execId, task)
if (prevTask != null) {
    runningTasks.put(execId, prevTask)
    throw new IllegalStateException("TaskManager already contains a task for id " + execId)
}

如果一切正常,接下来就启动线程并执行任务,接着发送应答消息进行回复:

task.startTaskThread()
sender ! decorateMessage(Acknowledge)

submitTask方法比起JobManager的submitJob方法,逻辑和代码量都相对简单。我们会进一步分析两个过程:

  1. Task对象的构造方法
  2. Task作为一个线程,其run方法的实现

首先关注的是Task的构造方法,Task作为TaskManager的启动对象,其需要的参数基本都跟其执行有关,参数如下:

public Task(TaskDeploymentDescriptor tdd,            //任务描述符        
    MemoryManager memManager,                        //内存管理器
    IOManager ioManager,                             //IO管理器
    NetworkEnvironment networkEnvironment,           //网络环境对象,处理网络请求
    BroadcastVariableManager bcVarManager,           //广播变量管理器
    ActorGateway taskManagerActor,                   //TaskManager对应的actor通信网关
    ActorGateway jobManagerActor,                    //JobManager对应的actor通信网关
    FiniteDuration actorAskTimeout,                  //actor响应超时时间
    LibraryCacheManager libraryCache,                //用户程序的Jar、类库缓存         
    FileCache fileCache,                             //用户定义的文件缓存,执行时需要
    TaskManagerRuntimeInfo taskManagerConfig         //TaskManager运行时配置
)

构造方法的第一段代码是将TaskDeploymentDescriptor封装的大量信息“转交”给Task对象。

接下来会根据结果分区部署描述符ResultPartitionDeploymentDescriptor和输入网关部署描述符InputGateDeploymentDescriptor来初始化结果分区以及输入网关,其中结果分区是当前的task实例产生的,而输入网关是用来从网络上消费前一个任务的结果分区。首先看一下结果分区的初始化:

this.producedPartitions = new ResultPartition[partitions.size()];
this.writers = new ResultPartitionWriter[partitions.size()];
for (int i = 0; i < this.producedPartitions.length; i++) {   
    ResultPartitionDeploymentDescriptor desc = partitions.get(i);   
    ResultPartitionID partitionId = new ResultPartitionID(desc.getPartitionId(), executionId);   
    this.producedPartitions[i] = new ResultPartition(         
        taskNameWithSubtaskAndId,         
        jobId,         
        partitionId,         
        desc.getPartitionType(),         
        desc.getEagerlyDeployConsumers(),         
        desc.getNumberOfSubpartitions(),         
        networkEnvironment.getPartitionManager(),         
        networkEnvironment.getPartitionConsumableNotifier(),         
        ioManager,         
        networkEnvironment.getDefaultIOMode());   

    this.writers[i] = new ResultPartitionWriter(this.producedPartitions[i]);
}

以上代码主要的逻辑是循环初始化结果分区对象数组producedPartitions以及结果分区写入器数组writers。结果分区对象初始化时,会根据ResultPartitionType的类型来判断是创建阻塞式的子分区还是创建管道式的子分区,这涉及到数据传输的方式。ResultPartitionWriter是面向结果分区的运行时结果写入器对象。

下面的代码用于输入网关的初始化:

this.inputGates = new SingleInputGate[consumedPartitions.size()];
this.inputGatesById = new HashMap<IntermediateDataSetID, SingleInputGate>();
for (int i = 0; i < this.inputGates.length; i++) {   
    SingleInputGate gate = SingleInputGate.create(         
        taskNameWithSubtaskAndId, jobId, executionId, consumedPartitions.get(i), networkEnvironment);   
    this.inputGates[i] = gate;   inputGatesById.put(gate.getConsumedResultId(), gate);
}

输入网关的初始化则是根据上游task产生的结果分区来进行挨个初始化。

最终它会为该任务的执行创建一个线程:

executingThread = new Thread(TASK_THREADS_GROUP, this, taskNameWithSubtask);

其实Task类实现了Runnable接口,它的实例本身就可以被线程执行,然后它又在内部实例化了一个线程对象并保存了执行它自身的线程引用,进而获得了对该线程的完全控制。比如,用startTaskThread方法来启动执行Task的线程。Task线程的执行细节,我们将会在接下来进行分析。

从这里我们也能看到,每个任务的部署会产生一个Task对象,而一个Task对象恰好对应一个执行它的线程实例。

Task线程的执行

Task实现了Runnable接口,那么毫无疑问其run方法承载了Task被执行的核心逻辑。而之前,我们将会分析Task线程的执行流程。

首先,第一步先对Task的执行状态进行转换:

while (true) {
    ExecutionState current = this.executionState;
    //如果当前的执行状态为CREATED,则对其应用CAS操作,将其设置为DEPLOYING状态,如果设置成功,将退出while无限循环
    if (current == ExecutionState.CREATED) {
        if (STATE_UPDATER.compareAndSet(this, ExecutionState.CREATED, ExecutionState.DEPLOYING)) {
            // success, we can start our work
            break;
        }
    }
    //如果当前执行状态为FAILED,则发出最终状态的通知消息,并退出run方法的执行
    else if (current == ExecutionState.FAILED) {
        notifyFinalState();
        return;
    }
    //如果当前执行状态为CANCELING,则对其应用cas操作,并将其修改为CANCELED状态,如果修改成功则发出最终状态通知消息,
    //同时退出run方法的执行
    else if (current == ExecutionState.CANCELING) {
        if (STATE_UPDATER.compareAndSet(this, ExecutionState.CANCELING, ExecutionState.CANCELED)) {
            notifyFinalState();
            return;
        }
    }
    //如果当前的执行状态为其他状态,则抛出异常
    else {
        throw new IllegalStateException("Invalid state for beginning of task operation");
    }
}

接下来,是对用户代码所打成的jar包的加载并生成对应的类加载器,同时获取到程序的执行配置ExecutionConfig。根据类加载器以及用户的可执行体在Flink中所对应的具体的实现类名来加载该类:

invokable = loadAndInstantiateInvokable(userCodeClassLoader, nameOfInvokableClass);

Flink中所有类型的操作都有特定的可执行体,它们无一例外都是对AbstractInvokable类的扩展。每个的可执行体的名称在生产JobGraph时就已确定。

紧接着的一个关键操作就是向网络栈注册该任务对象:

network.registerTask(this);

这个操作是为了让Task之间可以基于网络互相进行数据交换,包含了分配网络缓冲、结果分区注册等一系列内部操作,并且有可能会由于系统无足够的内存而发生失败。

然后会把各种配置、管理对象都打包到Task在执行时的统一环境对象Environment中,并将该环境对象赋予可执行体:

invokable.setEnvironment(env);

在此之后,对于有状态的任务,如果它们的状态不为空,则会对这些有状态的任务进行状态初始化:

SerializedValue<StateHandle<?>> operatorState = this.operatorState;

if (operatorState != null) {
    if (invokable instanceof StatefulTask) {
    try {
        StateHandle<?> state = operatorState.deserializeValue(userCodeClassLoader);
        StatefulTask<?> op = (StatefulTask<?>) invokable;
        StateUtils.setOperatorState(op, state);
    }
    catch (Exception e) {
        throw new RuntimeException("Failed to deserialize state handle and " 
            + " setup initial operator state.", e);
    }
    }
    else {
        throw new IllegalStateException("Found operator state for a non-stateful task invokable");
    }
}

通常什么情况下任务会有初始状态呢?当任务并不是首次运行,比如之前发生过失败从某个检查点恢复时会从检查点中获取当前任务的状态,在执行之前先进行初始化。

接下来,会将任务的执行状态变更为RUNNING,并向观察者以及TaskManager发送通知:

if (!STATE_UPDATER.compareAndSet(this, ExecutionState.DEPLOYING, ExecutionState.RUNNING)) {
    throw new CancelTaskException();
}

notifyObservers(ExecutionState.RUNNING, null);
taskManager.tell(new UpdateTaskExecutionState(
    new TaskExecutionState(jobId, executionId, ExecutionState.RUNNING)));

然后将执行线程的类加载器设置为用户代码的类加载器,然后调用可执行体的invoke方法,invoke方法实现了每个可执行体所要执行的核心逻辑。

executingThread.setContextClassLoader(userCodeClassLoader);
invokable.invoke();

invoke方法的执行是个分界点,在执行之前用户逻辑还没有被触发执行;而该方法被执行之后,说明用户逻辑已被执行完成。

然后对当前任务所生产的所有结果分区调用finish方法进行资源释放:

for (ResultPartition partition : producedPartitions) {
    if (partition != null) {
        partition.finish();
    }
}

最后将任务的执行状态修改为FINISHED,并发出通知:

if (STATE_UPDATER.compareAndSet(this, ExecutionState.RUNNING, ExecutionState.FINISHED)) {
    notifyObservers(ExecutionState.FINISHED, null);
}
else {
    throw new CancelTaskException();
}

接下来在finally块里进行一系列资源释放操作。

最终的可执行体

Task是在TaskManager中执行任务的统一抽象,它的核心仍然是如何执行,而不是如何表述。比如,批处理任务和流处理任务,它们有很大的差别,但我们需要一种表述层面上的抽象,使得它们最终都能被Task所接收,然后得到执行。而该表述层面上的抽象即为AbstractInvokable。它是所有在TaskManager中真正被执行的主体。其类图如下:

AbstructInvokable

AbstractInvokable定义了一系列的“上下文”对象,同时提供了核心两个方法:

  • invoke:该抽象方法是描述用户逻辑的核心方法,最终在Task线程中被执行的就是该方法;
  • cancel:取消执行用户逻辑的方法,提供了默认为空的实现,用户取消执行或者执行失败会触发该方法的调用;

跟Flink提供了流处理和批处理的API一致,AbstractInvokable也相应的具有两个派生类:

  • StreamTask:所有流处理任务的基类,实现位于flink-streaming-java模块中;
  • BatchTask:所有批处理任务的基类,实现位于runtime模块中;

无论是哪种形式的任务,在生成JobGraph阶段就已经被确定并加入到JobVertex中:

public void setInvokableClass(Class<? extends AbstractInvokable> invokable) {   
    Preconditions.checkNotNull(invokable);   
    this.invokableClassName = invokable.getName();   
    this.isStoppable = StoppableTask.class.isAssignableFrom(invokable);
}

随后被一直携带到Task类中,并通过反射的机制从特定的类加载器中创建其实例,最终调用其invoke方法执行:

private AbstractInvokable loadAndInstantiateInvokable(ClassLoader classLoader, String className) 
    throws Exception {   
    Class<? extends AbstractInvokable> invokableClass;   
    try {      
        invokableClass = Class.forName(className, true, classLoader)            
            .asSubclass(AbstractInvokable.class);   
    }   catch (Throwable t) {      
        throw new Exception("Could not load the task's invokable class.", t);   
    }   
    try {      
        return invokableClass.newInstance();   
    }   catch (Throwable t) {      
        throw new Exception("Could not instantiate the task's invokable class.", t);   
    }
}

关于更多用户逻辑的执行细节,我们后续会进行分析。



原文发布时间为:2017-01-24

本文作者:vinoYang

本文来自云栖社区合作伙伴CSDN博客,了解相关信息可以关注CSDN博客。

相关实践学习
基于Hologres轻松玩转一站式实时仓库
本场景介绍如何利用阿里云MaxCompute、实时计算Flink和交互式分析服务Hologres开发离线、实时数据融合分析的数据大屏应用。
Linux入门到精通
本套课程是从入门开始的Linux学习课程,适合初学者阅读。由浅入深案例丰富,通俗易懂。主要涉及基础的系统操作以及工作中常用的各种服务软件的应用、部署和优化。即使是零基础的学员,只要能够坚持把所有章节都学完,也一定会受益匪浅。
目录
相关文章
|
1月前
|
消息中间件 NoSQL Kafka
Flink-10 Flink Java 3分钟上手 Docker容器化部署 JobManager TaskManager Kafka Redis Dockerfile docker-compose
Flink-10 Flink Java 3分钟上手 Docker容器化部署 JobManager TaskManager Kafka Redis Dockerfile docker-compose
39 4
|
1月前
|
消息中间件 监控 Java
大数据-109 Flink 体系结构 运行架构 ResourceManager JobManager 组件关系与原理剖析
大数据-109 Flink 体系结构 运行架构 ResourceManager JobManager 组件关系与原理剖析
65 1
|
3月前
|
存储 缓存 Java
实时计算 Flink版操作报错合集之怎么处理在运行作业时遇到报错::ClassCastException
在使用实时计算Flink版过程中,可能会遇到各种错误,了解这些错误的原因及解决方法对于高效排错至关重要。针对具体问题,查看Flink的日志是关键,它们通常会提供更详细的错误信息和堆栈跟踪,有助于定位问题。此外,Flink社区文档和官方论坛也是寻求帮助的好去处。以下是一些常见的操作报错及其可能的原因与解决策略。
|
3月前
|
消息中间件 监控 关系型数据库
实时计算 Flink版产品使用问题之运行后,怎么进行监控和报警
实时计算Flink版作为一种强大的流处理和批处理统一的计算框架,广泛应用于各种需要实时数据处理和分析的场景。实时计算Flink版通常结合SQL接口、DataStream API、以及与上下游数据源和存储系统的丰富连接器,提供了一套全面的解决方案,以应对各种实时计算需求。其低延迟、高吞吐、容错性强的特点,使其成为众多企业和组织实时数据处理首选的技术平台。以下是实时计算Flink版的一些典型使用合集。
|
3月前
|
资源调度 Oracle Java
实时计算 Flink版产品使用问题之在YARN集群上运行时,如何查看每个并行度的详细处理数据情况
实时计算Flink版作为一种强大的流处理和批处理统一的计算框架,广泛应用于各种需要实时数据处理和分析的场景。实时计算Flink版通常结合SQL接口、DataStream API、以及与上下游数据源和存储系统的丰富连接器,提供了一套全面的解决方案,以应对各种实时计算需求。其低延迟、高吞吐、容错性强的特点,使其成为众多企业和组织实时数据处理首选的技术平台。以下是实时计算Flink版的一些典型使用合集。
|
3月前
|
资源调度 安全 数据处理
实时计算 Flink版产品使用问题之提交任务时如何设置TaskManager的数量
实时计算Flink版作为一种强大的流处理和批处理统一的计算框架,广泛应用于各种需要实时数据处理和分析的场景。实时计算Flink版通常结合SQL接口、DataStream API、以及与上下游数据源和存储系统的丰富连接器,提供了一套全面的解决方案,以应对各种实时计算需求。其低延迟、高吞吐、容错性强的特点,使其成为众多企业和组织实时数据处理首选的技术平台。以下是实时计算Flink版的一些典型使用合集。
|
4月前
|
消息中间件 关系型数据库 MySQL
实时计算 Flink版操作报错合集之运行mysql to doris pipeline时报错,该如何排查
在使用实时计算Flink版过程中,可能会遇到各种错误,了解这些错误的原因及解决方法对于高效排错至关重要。针对具体问题,查看Flink的日志是关键,它们通常会提供更详细的错误信息和堆栈跟踪,有助于定位问题。此外,Flink社区文档和官方论坛也是寻求帮助的好去处。以下是一些常见的操作报错及其可能的原因与解决策略。
|
4月前
|
消息中间件 Kubernetes Kafka
实时计算 Flink版操作报错合集之在Rancher K8s部署时,TaskManager无法正常连接到其他TaskManager,该如何处理
在使用实时计算Flink版过程中,可能会遇到各种错误,了解这些错误的原因及解决方法对于高效排错至关重要。针对具体问题,查看Flink的日志是关键,它们通常会提供更详细的错误信息和堆栈跟踪,有助于定位问题。此外,Flink社区文档和官方论坛也是寻求帮助的好去处。以下是一些常见的操作报错及其可能的原因与解决策略。
|
2月前
|
运维 数据处理 数据安全/隐私保护
阿里云实时计算Flink版测评报告
该测评报告详细介绍了阿里云实时计算Flink版在用户行为分析与标签画像中的应用实践,展示了其毫秒级的数据处理能力和高效的开发流程。报告还全面评测了该服务在稳定性、性能、开发运维及安全性方面的卓越表现,并对比自建Flink集群的优势。最后,报告评估了其成本效益,强调了其灵活扩展性和高投资回报率,适合各类实时数据处理需求。
|
18天前
|
存储 分布式计算 流计算
实时计算 Flash – 兼容 Flink 的新一代向量化流计算引擎
本文介绍了阿里云开源大数据团队在实时计算领域的最新成果——向量化流计算引擎Flash。文章主要内容包括:Apache Flink 成为业界流计算标准、Flash 核心技术解读、性能测试数据以及在阿里巴巴集团的落地效果。Flash 是一款完全兼容 Apache Flink 的新一代流计算引擎,通过向量化技术和 C++ 实现,大幅提升了性能和成本效益。
701 10
实时计算 Flash – 兼容 Flink 的新一代向量化流计算引擎