Flink流处理之迭代任务

本文涉及的产品
实时计算 Flink 版,5000CU*H 3个月
简介: 前面我们分析过Flink对迭代在流图中的特殊处理,使得迭代中的反馈环得以转化为普通的DAG模型。这一篇我们将剖析运行时的流处理迭代任务的执行机制。这里涉及到两个任务类: StreamIterationHead:迭代头任务,它借助于反馈阻塞队列从迭代尾部接收参与下一次迭代的反馈数据。

前面我们分析过Flink对迭代在流图中的特殊处理,使得迭代中的反馈环得以转化为普通的DAG模型。这一篇我们将剖析运行时的流处理迭代任务的执行机制。这里涉及到两个任务类:

  • StreamIterationHead:迭代头任务,它借助于反馈阻塞队列从迭代尾部接收参与下一次迭代的反馈数据。
  • StreamIterationTail:迭代尾任务,它借助于阻塞队列作为反馈信道将下一次需要迭代的数据反馈给迭代头。

对于迭代流处理而言,随着任务(task)最终被并行化执行,它们的子任务(sub task,这些任务在计算节点中的实例)的并行度要求一致,并且迭代头的子任务与迭代尾的子任务必须成对且处于同一个CoLocationGroup中执行,这些都反映了流处理的迭代任务的特殊性,可以认为是一种并发与并行兼具的模式。

StreamIterationHead和StreamIterationTail之间的交互依赖于阻塞队列代理(BlockingQueueBroker)。Flink设计了一种称之为Broker的数据结构,专门用于对迭代进行并发控制,而BlockingQueueBroker是Broker针对BlockingQueue类型的特定实现。因为Broker内部的核心数据结构就是ConcurrentMap<String, BlockingQueue<V>>类型,所以BlockingQueueBroker对应的类型即为:ConcurrentMap<String, BlockingQueue<BlockingQueue>>其图示如下:

BlockingQueueBroker-structure

迭代头任务跟迭代尾任务之间真正交互的数据结构是容量为1的阻塞队列:

final BlockingQueue<StreamRecord<OUT>> dataChannel = new ArrayBlockingQueue<StreamRecord<OUT>>(1);

它会被加入到BlockingQueueBroker内部的ConcurrentMap中去,对应的键是brokerID。brokerID生成规则如下:

public static String createBrokerIdString(JobID jid, String iterationID, int subtaskIndex) {   
    return jid + "-" + iterationID + "-" + subtaskIndex;
}

BlockingQueueBroker是单例的,因为对应的迭代头子任务和迭代尾子任务会生成相同的brokerID,所以两者在同一个JVM中会基于相同的dataChannel进行通信。dataChannel由迭代头创建并递交给BlockingQueueBroker:

BlockingQueueBroker.INSTANCE.handIn(brokerID, dataChannel);

由迭代尾获取:

BlockingQueue<StreamRecord<IN>> dataChannel =      
    (BlockingQueue<StreamRecord<IN>>) BlockingQueueBroker.INSTANCE.get(brokerID);

数据交换是基于dataChannel的,由迭代尾负责生产:

public void collect(StreamRecord<IN> record) {   
    try {      
        if (shouldWait) {         
            dataChannel.offer(record, iterationWaitTime, TimeUnit.MILLISECONDS);      
        } else {         
            dataChannel.put(record);      
        }   
    } catch (InterruptedException e) {      
        throw new RuntimeException(e);   
    }
}

由迭代头负责消费:

while (running) {   
    StreamRecord<OUT> nextRecord = shouldWait ?      
        dataChannel.poll(iterationWaitTime, TimeUnit.MILLISECONDS) :      
        dataChannel.take();

    if (nextRecord != null) {      
        for (RecordWriterOutput<OUT> output : outputs) {         
            output.collect(nextRecord);      
        }   
    } else {      
        // done      
        break;   
    }
}

因此,迭代头跟迭代尾的子任务之间通过反馈信道进行迭代数据处理的大致机制如下图:

Streaming-iteration-runtime

相信分析到这里,我们应该明白了为什么迭代头和迭代尾的并行度必须一致,且处于相同的CoLocationGroup中。Flink在执行拓扑中巧妙地化解了反馈环,从而使其适应于DAG计算模型,但却在同一个TaskManager的迭代头和迭代尾对应的子任务中借助于阻塞队列的生产者和消费者模型重塑了反馈“环”。



原文发布时间为:2016-12-12


本文作者:vinoYang


本文来自云栖社区合作伙伴CSDN博客,了解相关信息可以关注CSDN博客。

相关实践学习
基于Hologres轻松玩转一站式实时仓库
本场景介绍如何利用阿里云MaxCompute、实时计算Flink和交互式分析服务Hologres开发离线、实时数据融合分析的数据大屏应用。
Linux入门到精通
本套课程是从入门开始的Linux学习课程,适合初学者阅读。由浅入深案例丰富,通俗易懂。主要涉及基础的系统操作以及工作中常用的各种服务软件的应用、部署和优化。即使是零基础的学员,只要能够坚持把所有章节都学完,也一定会受益匪浅。
目录
相关文章
|
1月前
|
Java Shell Maven
Flink-11 Flink Java 3分钟上手 打包Flink 提交任务至服务器执行 JobSubmit Maven打包Ja配置 maven-shade-plugin
Flink-11 Flink Java 3分钟上手 打包Flink 提交任务至服务器执行 JobSubmit Maven打包Ja配置 maven-shade-plugin
93 4
|
30天前
|
资源调度 分布式计算 大数据
大数据-111 Flink 安装部署 YARN部署模式 FlinkYARN模式申请资源、提交任务
大数据-111 Flink 安装部署 YARN部署模式 FlinkYARN模式申请资源、提交任务
81 0
|
3月前
|
SQL Shell API
实时计算 Flink版操作报错合集之任务提交后出现 "cannot run program "/bin/bash": error=1, 不允许操作" ,是什么原因
在使用实时计算Flink版过程中,可能会遇到各种错误,了解这些错误的原因及解决方法对于高效排错至关重要。针对具体问题,查看Flink的日志是关键,它们通常会提供更详细的错误信息和堆栈跟踪,有助于定位问题。此外,Flink社区文档和官方论坛也是寻求帮助的好去处。以下是一些常见的操作报错及其可能的原因与解决策略。
实时计算 Flink版操作报错合集之任务提交后出现 "cannot run program "/bin/bash": error=1, 不允许操作" ,是什么原因
|
3月前
|
监控 Cloud Native 流计算
实时计算 Flink版产品使用问题之如何查看和管理任务
实时计算Flink版作为一种强大的流处理和批处理统一的计算框架,广泛应用于各种需要实时数据处理和分析的场景。实时计算Flink版通常结合SQL接口、DataStream API、以及与上下游数据源和存储系统的丰富连接器,提供了一套全面的解决方案,以应对各种实时计算需求。其低延迟、高吞吐、容错性强的特点,使其成为众多企业和组织实时数据处理首选的技术平台。以下是实时计算Flink版的一些典型使用合集。
|
3月前
|
消息中间件 资源调度 Kafka
实时计算 Flink版操作报错合集之提交任务后,如何解决报错:UnavailableDispatcherOperationException
在使用实时计算Flink版过程中,可能会遇到各种错误,了解这些错误的原因及解决方法对于高效排错至关重要。针对具体问题,查看Flink的日志是关键,它们通常会提供更详细的错误信息和堆栈跟踪,有助于定位问题。此外,Flink社区文档和官方论坛也是寻求帮助的好去处。以下是一些常见的操作报错及其可能的原因与解决策略。
|
3月前
|
Java Spring 安全
Spring 框架邂逅 OAuth2:解锁现代应用安全认证的秘密武器,你准备好迎接变革了吗?
【8月更文挑战第31天】现代化应用的安全性至关重要,OAuth2 作为实现认证和授权的标准协议之一,被广泛采用。Spring 框架通过 Spring Security 提供了强大的 OAuth2 支持,简化了集成过程。本文将通过问答形式详细介绍如何在 Spring 应用中集成 OAuth2,包括 OAuth2 的基本概念、集成步骤及资源服务器保护方法。首先,需要在项目中添加 `spring-security-oauth2-client` 和 `spring-security-oauth2-resource-server` 依赖。
50 0
|
3月前
|
资源调度 Java Scala
实时计算 Flink版产品使用问题之如何实现ZooKeeper抖动导致任务失败时,能从最近的检查点重新启动任务
实时计算Flink版作为一种强大的流处理和批处理统一的计算框架,广泛应用于各种需要实时数据处理和分析的场景。实时计算Flink版通常结合SQL接口、DataStream API、以及与上下游数据源和存储系统的丰富连接器,提供了一套全面的解决方案,以应对各种实时计算需求。其低延迟、高吞吐、容错性强的特点,使其成为众多企业和组织实时数据处理首选的技术平台。以下是实时计算Flink版的一些典型使用合集。
|
3月前
|
消息中间件 数据挖掘 Kafka
揭秘大数据时代的极速王者!Flink:颠覆性流处理引擎,让实时数据分析燃爆你的想象力!
【8月更文挑战第29天】Apache Flink 是一个高性能的分布式流处理框架,适用于高吞吐量和低延迟的实时数据处理。它采用统一执行引擎处理有界和无界数据流,具备精确状态管理和灵活窗口操作等特性。Flink 支持毫秒级处理和广泛生态集成,但学习曲线较陡峭,社区相对较小。通过实时日志分析示例,我们展示了如何利用 Flink 从 Kafka 中读取数据并进行词频统计,体现了其强大功能和灵活性。
68 0
|
3月前
|
监控 搜索推荐 数据挖掘
Flink流处理与批处理大揭秘:实时与离线,一文让你彻底解锁!
【8月更文挑战第24天】Apache Flink 是一款开源框架,擅长流处理与批处理。流处理专攻实时数据流,支持无限数据流及事件驱动应用,实现数据的连续输入与实时处理。批处理则聚焦于静态数据集,进行一次性处理。两者差异体现在处理方式与应用场景:流处理适合实时性要求高的场景(例如实时监控),而批处理更适用于离线数据分析任务(如数据挖掘)。通过提供的示例代码,读者可以直观理解两种模式的不同之处及其实际应用。
177 0
|
3月前
|
Kubernetes Java 数据库连接
实时计算 Flink版产品使用问题之部署到 Kubernetes 集群时,任务过一会儿自动被取消,该如何排查
实时计算Flink版作为一种强大的流处理和批处理统一的计算框架,广泛应用于各种需要实时数据处理和分析的场景。实时计算Flink版通常结合SQL接口、DataStream API、以及与上下游数据源和存储系统的丰富连接器,提供了一套全面的解决方案,以应对各种实时计算需求。其低延迟、高吞吐、容错性强的特点,使其成为众多企业和组织实时数据处理首选的技术平台。以下是实时计算Flink版的一些典型使用合集。