Flink运行时之合久必分的特定任务

本文涉及的产品
实时计算 Flink 版,5000CU*H 3个月
简介: 合久必分的特定任务 前面我们谈到了TaskManager对每个Task实例会启动一个独立的线程来执行。在分析线程执行的核心代码时,我们看到最终执行的是AbstractInvokable这样执行体的invoke方法。

合久必分的特定任务

前面我们谈到了TaskManager对每个Task实例会启动一个独立的线程来执行。在分析线程执行的核心代码时,我们看到最终执行的是AbstractInvokable这样执行体的invoke方法。所谓合久必分,鉴于流处理任务跟批处理任务执行模式上存在巨大的差异,在对AbstractInvokable的实现时,它们将会走向两个不同的分支。

流处理相关的任务

流处理所对应的任务的继承关系图如下:

AbstractInvokable-for-streaming

从上面的继承关系图可见,StreamTask是流处理任务的抽象。因为在DataStream API中允许算子链接(operator-chain),算子链中的第一个算子称之为“head”,根据“head”算子输入端个数以及角色的差别派生出三种不同类型的任务:

  • SourceStreamTask:输入源对应的流任务;
  • OneInputStreamTask:单输入端流任务;
  • TwoInputStreamTask:双输入端流任务;

在这三大类流任务之下又会有其他一些特殊的流任务被派生。但StreamTask为所有流任务的实现提供了基础。它在AbstracInvokable的invoke核心方法中规定了一套统一的执行流程,这个流程会被任何一个流处理任务在执行时所遵循。这套流程步骤如下:

  1. 创建基本的辅助部件并加载算子链;
  2. 启动算子;
  3. 特定任务相关的初始化;
  4. 打开算子;
  5. 执行run方法;
  6. 关闭算子;
  7. 销毁算子;
  8. 公共的清理操作;
  9. 特定任务相关的清理操作;

以上步骤中被标记为斜体字的步骤(3,5,9)都被定义为抽象方法让实现类填充逻辑。

除去SourceStreamTask之外,为什么要以输入端的个数来区分任务呢?这一点跟Flink的设计是分不开的,它从API层到算子再到任务的设计都以输入端的个数为中心。我们可以看一下流处理中的所有算子的继承关系图:

all-StreamOperator-class-diagram

可以看到除了StreamSource没有扩展OneInputStreamOperator和TwoInputStreamOperator。其他所有算子都无一例外的扩展自这两个接口。这一点跟在Task层面的继承关系是一致的。

批处理相关的任务

批处理对应的任务继承关系如下图:

AbstractInvokable-for-batch

在批处理的任务设计中,将source和sink这两个角色对应的任务独立开,而其他承载业务逻辑的算子所对应的任务都由BatchTask来表示,迭代相关的任务也直接继承自BatchTask。

如同流处理中的StreamTask一样,BatchTask主要也是提供一套执行的流程并提供了基于驱动(Driver)的用户逻辑代码的抽象。

所谓Driver,它其实是衔接用户逻辑代码跟Flink运行时的任务(BatchTask)一个载体,这一点跟类似于计算机中介于软硬件之间的驱动程序。

殊途同归的UDF

其实不管Flink如何执行你的代码,对终端用户而言,它编写Flink程序时做得最多的一件事是什么?是覆写Flink提供的一些函数并在其中实现自己的业务逻辑,这些包含业务逻辑的类,我们通常就称之为UDF(user-defined function,也即用户定义的函数)。接下来,我们将关注点放在封装了UDF的执行体上,看看在流处理和批处理中它们都是如何被实现的。我们选择了一个很常见的函数Map,它在流处理中的实现为StreamMap,在批处理中的实现为MapDriver。

StreamMap在其processElement中的逻辑为:

public void processElement(StreamRecord<IN> element) throws Exception {
    output.collect(element.replace(userFunction.map(element.getValue())));
}

流处理算子的必须实现逐元素处理的processElement方法。

代码段中的userFunction是MapFunction接口的实例。

接下来再看批处理中的MapDriver中的run方法:

public void run() throws Exception {
    final MutableObjectIterator<IT> input = this.taskContext.getInput(0);
    final MapFunction<IT, OT> function = this.taskContext.getStub();
    final Collector<OT> output = new CountingCollector<>(
        this.taskContext.getOutputCollector(), numRecordsOut);

    if (objectReuseEnabled) {
        IT record = this.taskContext.<IT>getInputSerializer(0).getSerializer().createInstance();

    while (this.running && ((record = input.next(record)) != null)) {
        numRecordsIn.inc();
        output.collect(function.map(record));
    }
    }
    else {
        IT record = null;

        while (this.running && ((record = input.next()) != null)) {
            numRecordsIn.inc();
            output.collect(function.map(record));
        }
    }
}

从Collector的collect方法中可以看到同样会执行MapFunction的map方法。

两者的MapFunction是同一个接口,来自org.apache.flink.api.common.functions。

所以归根结底,不论流处理和批处理中它们的执行模式有何不同,对于相同语义的函数,UDF中用户逻辑的执行是殊途同归的。

关于更多用户逻辑的执行细节,我们后续会进行分析。


原文发布时间为:2017-01-26

本文作者:vinoYang

本文来自云栖社区合作伙伴CSDN博客,了解相关信息可以关注CSDN博客。

相关实践学习
基于Hologres轻松玩转一站式实时仓库
本场景介绍如何利用阿里云MaxCompute、实时计算Flink和交互式分析服务Hologres开发离线、实时数据融合分析的数据大屏应用。
Linux入门到精通
本套课程是从入门开始的Linux学习课程,适合初学者阅读。由浅入深案例丰富,通俗易懂。主要涉及基础的系统操作以及工作中常用的各种服务软件的应用、部署和优化。即使是零基础的学员,只要能够坚持把所有章节都学完,也一定会受益匪浅。
目录
相关文章
|
2月前
|
关系型数据库 MySQL API
Flink CDC产品常见问题之mysql整库同步到starrock时任务挂掉如何解决
Flink CDC(Change Data Capture)是一个基于Apache Flink的实时数据变更捕获库,用于实现数据库的实时同步和变更流的处理;在本汇总中,我们组织了关于Flink CDC产品在实践中用户经常提出的问题及其解答,目的是辅助用户更好地理解和应用这一技术,优化实时数据处理流程。
|
15天前
|
机器学习/深度学习 人工智能 流计算
人工智能平台PAI 操作报错合集之在集群上提交了包含alink相关功能的flink任务,但是却报错如何解决
阿里云人工智能平台PAI (Platform for Artificial Intelligence) 是阿里云推出的一套全面、易用的机器学习和深度学习平台,旨在帮助企业、开发者和数据科学家快速构建、训练、部署和管理人工智能模型。在使用阿里云人工智能平台PAI进行操作时,可能会遇到各种类型的错误。以下列举了一些常见的报错情况及其可能的原因和解决方法。
|
19天前
|
资源调度 Java API
[flink 实时流基础] flink组件栈以及任务执行与资源划分
[flink 实时流基础] flink组件栈以及任务执行与资源划分
|
24天前
|
存储 监控 调度
【Flink】怎么提交的实时任务,有多少Job Manager?
【4月更文挑战第18天】【Flink】怎么提交的实时任务,有多少Job Manager?
|
2月前
|
Java 关系型数据库 MySQL
Flink1.18.1和CDC2.4.1 本地没问题 提交任务到服务器 报错java.lang.NoClassDefFoundError: Could not initialize class io.debezium.connector.mysql.MySqlConnectorConfig
【2月更文挑战第33天】Flink1.18.1和CDC2.4.1 本地没问题 提交任务到服务器 报错java.lang.NoClassDefFoundError: Could not initialize class io.debezium.connector.mysql.MySqlConnectorConfig
70 2
|
2月前
|
消息中间件 Kafka 流计算
如果有多个版本的Flink CDC在同一环境中运行,可能会导致Debezium版本冲突
【2月更文挑战第30天】如果有多个版本的Flink CDC在同一环境中运行,可能会导致Debezium版本冲突
23 2
|
2月前
|
Kubernetes 网络协议 Java
在Kubernetes上运行Flink应用程序时
【2月更文挑战第27天】在Kubernetes上运行Flink应用程序时
39 10
|
2月前
|
关系型数据库 MySQL API
Flink CDC产品常见问题之mysql整库同步到starrock时任务挂掉如何解决
Flink CDC产品常见问题之mysql整库同步到starrock时任务挂掉如何解决
|
3月前
|
消息中间件 Kafka Apache
Apache Flink 是一个开源的分布式流处理框架
Apache Flink 是一个开源的分布式流处理框架
598 5
|
2月前
|
SQL Java API
官宣|Apache Flink 1.19 发布公告
Apache Flink PMC(项目管理委员)很高兴地宣布发布 Apache Flink 1.19.0。
1634 2
官宣|Apache Flink 1.19 发布公告