Flink流处理迭代之化解反馈环

本文涉及的产品
实时计算 Flink 版,5000CU*H 3个月
简介: 我们都知道Flink在可迭代的流处理中引入了反馈边来将本次迭代的结果反馈给迭代头以进行下一次迭代,这在执行拓扑中引入了环(反馈环)。Flink主要应对的执行拓扑还是有向无环图(DAG),最终它选择了将反馈环进行化解使其能够适配有向无环图的结构,而如何对反馈环进行化解是我们这一篇主要探讨的话题。

我们都知道Flink在可迭代的流处理中引入了反馈边来将本次迭代的结果反馈给迭代头以进行下一次迭代,这在执行拓扑中引入了环(反馈环)。Flink主要应对的执行拓扑还是有向无环图(DAG),最终它选择了将反馈环进行化解使其能够适配有向无环图的结构,而如何对反馈环进行化解是我们这一篇主要探讨的话题。

任何提交给Flink执行的程序在提交之前都必须先生成作业图,对于用DataStream API编写的流处理程序在生成作业图之前,还会先生成流图。因此,如果想化解迭代产生的反馈环其时机只能是在部署执行之前的流图和作业图中,而不可能是在最终的执行图中,事实上经过分析,我们发现它发生在流图中。

Flink生成流图的部件主要是流图生成器(StreamGraphGenerator)。它会对流处理程序进行遍历、转换。跟迭代有关的转换是我们上一篇所分析的FeedbackTransformation和CoFeedbackTransformation。

首先,我们来看对FeedbackTransformation的转换:

private <T> Collection<Integer> transformFeedback(FeedbackTransformation<T> iterate) {
    //检查迭代的反馈边,如果没有反馈边,则无法形成迭代的“环”,这时就抛出异常   
    if (iterate.getFeedbackEdges().size() <= 0) {      
        throw new IllegalStateException("Iteration " + iterate + " does not have any feedback edges.");   
    }
    //获得迭代的上游输入端对应的转换
    StreamTransformation<T> input = iterate.getInput();   
    List<Integer> resultIds = new ArrayList<>();   
    //对上游输入进行(递归)转换以获得转换编号集合   
    Collection<Integer> inputIds = transform(input);   
    //将转换编号集合加入结果集合中
    resultIds.addAll(inputIds);   
    //因为转换是递归进行的,所以为防止重复转换,会将已转换过的对象将入alreadyTransformed集合中
    //在对当前转换对象进行转换之前会预先检查该集合,如果当前转换对象已处于该集合中,则直接返回对应的编号集合,防止重复转换
    if (alreadyTransformed.containsKey(iterate)) {      
        return alreadyTransformed.get(iterate);   
    }   
    //这里将迭代这一在执行图中的环看作一个闭合的整体,认为它也有source和sink,为其创建source和sink的二元组   
    Tuple2<StreamNode, StreamNode> itSourceAndSink = streamGraph.createIterationSourceAndSink(         
        iterate.getId(),         
        getNewIterationNodeId(),         
        getNewIterationNodeId(),         
        iterate.getWaitTime(),         
        iterate.getParallelism());
    //获得迭代source和sink   
    StreamNode itSource = itSourceAndSink.f0;   
    StreamNode itSink = itSourceAndSink.f1;   
    //在StreamGraph中为这两个顶点设置序列化器   
    streamGraph.setSerializers(itSource.getId(), null, null, iterate.getOutputType()
        .createSerializer(env.getConfig()));   
    streamGraph.setSerializers(itSink.getId(), iterate.getOutputType()
        .createSerializer(env.getConfig()), null, null);   
    //将迭代的source顶点的编号也作为结果集合的一部分,这是为了让下游的算子将其视为输入   
    resultIds.add(itSource.getId());   
    //将反馈转换对象以及其对应的结果集合的映射关系加入已遍历的Map中,这样在进行反馈边转换时,当它们向上递归转换时
    //遇到当前的反馈转换对象将停止递归转换   
    alreadyTransformed.put(iterate, resultIds);   
    //遍历迭代的所有反馈边,并将所有反馈边对应的转换对象编号加入allFeedbackIds中   
    List<Integer> allFeedbackIds = new ArrayList<>();   
    for (StreamTransformation<T> feedbackEdge : iterate.getFeedbackEdges()) {
        //对反馈边转换对象执行递归转换      
        Collection<Integer> feedbackIds = transform(feedbackEdge);      
        //将获取到的反馈边转换对象编号集合加入allFeedbackIds
        allFeedbackIds.addAll(feedbackIds);      
        //遍历所有的反馈转换对象的编号,并在StreamGraph中构建从反馈转换对象到迭代sink之间的边
        for (Integer feedbackId: feedbackIds) {         
            streamGraph.addEdge(feedbackId, itSink.getId(), 0);      
        }   
    }

    //决定所有的反馈对象的”槽共享组“名   
    String slotSharingGroup = determineSlotSharingGroup(null, allFeedbackIds);
    //为迭代sink设置槽共享组名称   
    itSink.setSlotSharingGroup(slotSharingGroup);   
    //为迭代source设置槽共享组名称
    itSource.setSlotSharingGroup(slotSharingGroup);   
    //返回该转换对象对应的编号结果集
    return resultIds;
}

Flink在流图中如何处理迭代产生的”环“呢?从上面的转换方法的实现中可以看出一些端倪。我们看到它在内部调用了createIterationSourceAndSink这一创建“伪”source和sink的方法。通过这种方式,将迭代产生的“环”转化成了虚拟的输入输出。我们以之前的迭代案例的实现代码为示例,来看一下其通过计划可视化器所产生的流图:

streaming-iteration-StreamGraph

注意,通过计划可视化器展示的流图并不能完整地展示程序的执行拓扑,它会丢失一些信息,比如虚拟节点等。

上面的流图中并没有因为迭代而产生环,而是多了IterationSource-3以及IterationSink-3这两个节点。代码段中创建这两个节点的createIterationSourceAndSink方法实现如下:

public Tuple2<StreamNode, StreamNode> createIterationSourceAndSink(int loopId, int sourceId, 
    int sinkId, long timeout, int parallelism) {
    //创建一个source节点实例,这里我们尤其关注第三个参数,它对应的执行时的任务类是StreamIterationHead
    StreamNode source = this.addNode(sourceId,
            null,
            StreamIterationHead.class,
            null,
            "IterationSource-" + loopId);
    sources.add(source.getId());
    setParallelism(source.getId(), parallelism);

    //创建一个sink节点实例,它对应的执行时的任务类是StreamIterationTail
    StreamNode sink = this.addNode(sinkId,
            null,
            StreamIterationTail.class,
            null,
            "IterationSink-" + loopId);
    sinks.add(sink.getId());
    setParallelism(sink.getId(), parallelism);

    iterationSourceSinkPairs.add(new Tuple2<>(source, sink));

    //建立节点编号与代理的关系
    this.vertexIDtoBrokerID.put(source.getId(), "broker-" + loopId);
    this.vertexIDtoBrokerID.put(sink.getId(), "broker-" + loopId);
    this.vertexIDtoLoopTimeout.put(source.getId(), timeout);
    this.vertexIDtoLoopTimeout.put(sink.getId(), timeout);

    return new Tuple2<>(source, sink);
}

上面代码段中创建source和sink时,分别指定了这两个节点在最终被部署执行时所对应的任务类(StreamIterationHead和StreamIterationTail)。这两个类如何协作是了解可迭代的流处理程序执行机制的关键,我们将会在下一篇对其进行详细分析。在创建完source和sink节点之后,我们还看到了建立节点编号跟代理(broker)编号映射关系的代码。这里的代理主要用于为迭代头和迭代尾交换数据。

在流图中生成的source和sink二元组有什么作用呢?它将会在流处理程序生成作业图的过程中获得迭代source、sink并将它们相应的作业顶点加入到同一个CoLocationGroup中:

for (Tuple2<StreamNode, StreamNode> pair : streamGraph.getIterationSourceSinkPairs()) {
    CoLocationGroup ccg = new CoLocationGroup();

    JobVertex source = jobVertices.get(pair.f0.getId());
    JobVertex sink = jobVertices.get(pair.f1.getId());

    ccg.addVertex(source);
    ccg.addVertex(sink);
    source.updateCoLocationGroup(ccg);
    sink.updateCoLocationGroup(ccg);
}

Flink会确保伪source/sink对被放置到同一个TaskManager上且分区完全对应得执行。

对CoFeedbackTransformation的转换,跟FeedbackTransformation有所不同。它不需要去转换输入端对应的转换对象然后再通过返回输入转换的编号以将输入连接到迭代头,因为输入将对接到可连接的迭代流的左侧输入端,所以仅需要返回反馈边对应的转换对象的编号,反馈边会被用来对接到可连接的迭代流的右侧输入端。




原文发布时间为:2016-12-06


本文作者:vinoYang


本文来自云栖社区合作伙伴CSDN博客,了解相关信息可以关注CSDN博客。

相关实践学习
基于Hologres轻松玩转一站式实时仓库
本场景介绍如何利用阿里云MaxCompute、实时计算Flink和交互式分析服务Hologres开发离线、实时数据融合分析的数据大屏应用。
Linux入门到精通
本套课程是从入门开始的Linux学习课程,适合初学者阅读。由浅入深案例丰富,通俗易懂。主要涉及基础的系统操作以及工作中常用的各种服务软件的应用、部署和优化。即使是零基础的学员,只要能够坚持把所有章节都学完,也一定会受益匪浅。
目录
相关文章
|
7月前
|
消息中间件 Kafka Apache
Apache Flink 是一个开源的分布式流处理框架
Apache Flink 是一个开源的分布式流处理框架
780 5
|
Java Linux API
flink入门-流处理
flink入门-流处理
158 0
|
分布式计算 资源调度 监控
没有监控的流处理作业与茫茫大海中的裸泳无异 - 附 flink 与 spark 作业监控脚本实现
没有监控的流处理作业与茫茫大海中的裸泳无异 - 附 flink 与 spark 作业监控脚本实现
|
6月前
|
监控 大数据 Java
使用Apache Flink进行大数据实时流处理
Apache Flink是开源流处理框架,擅长低延迟、高吞吐量实时数据流处理。本文深入解析Flink的核心概念、架构(包括客户端、作业管理器、任务管理器和数据源/接收器)和事件时间、窗口、状态管理等特性。通过实战代码展示Flink在词频统计中的应用,讨论其实战挑战与优化。Flink作为大数据处理的关键组件,将持续影响实时处理领域。
894 5
|
4月前
|
Java Spring 安全
Spring 框架邂逅 OAuth2:解锁现代应用安全认证的秘密武器,你准备好迎接变革了吗?
【8月更文挑战第31天】现代化应用的安全性至关重要,OAuth2 作为实现认证和授权的标准协议之一,被广泛采用。Spring 框架通过 Spring Security 提供了强大的 OAuth2 支持,简化了集成过程。本文将通过问答形式详细介绍如何在 Spring 应用中集成 OAuth2,包括 OAuth2 的基本概念、集成步骤及资源服务器保护方法。首先,需要在项目中添加 `spring-security-oauth2-client` 和 `spring-security-oauth2-resource-server` 依赖。
55 0
|
4月前
|
消息中间件 数据挖掘 Kafka
揭秘大数据时代的极速王者!Flink:颠覆性流处理引擎,让实时数据分析燃爆你的想象力!
【8月更文挑战第29天】Apache Flink 是一个高性能的分布式流处理框架,适用于高吞吐量和低延迟的实时数据处理。它采用统一执行引擎处理有界和无界数据流,具备精确状态管理和灵活窗口操作等特性。Flink 支持毫秒级处理和广泛生态集成,但学习曲线较陡峭,社区相对较小。通过实时日志分析示例,我们展示了如何利用 Flink 从 Kafka 中读取数据并进行词频统计,体现了其强大功能和灵活性。
75 0
|
4月前
|
监控 搜索推荐 数据挖掘
Flink流处理与批处理大揭秘:实时与离线,一文让你彻底解锁!
【8月更文挑战第24天】Apache Flink 是一款开源框架,擅长流处理与批处理。流处理专攻实时数据流,支持无限数据流及事件驱动应用,实现数据的连续输入与实时处理。批处理则聚焦于静态数据集,进行一次性处理。两者差异体现在处理方式与应用场景:流处理适合实时性要求高的场景(例如实时监控),而批处理更适用于离线数据分析任务(如数据挖掘)。通过提供的示例代码,读者可以直观理解两种模式的不同之处及其实际应用。
240 0
|
4月前
|
消息中间件 大数据 Kafka
Apache Flink 大揭秘:征服大数据实时流处理的神奇魔法,等你来解锁!
【8月更文挑战第5天】Apache Flink 是一款强大的开源大数据处理框架,专长于实时流处理。本教程通过两个示例引导你入门:一是计算数据流中元素的平均值;二是从 Kafka 中读取数据并实时处理。首先确保已安装配置好 Flink 和 Kafka 环境。第一个 Java 示例展示了如何创建流执行环境,生成数据流,利用 `flatMap` 转换数据,并使用 `keyBy` 和 `sum` 计算平均值。第二个示例则演示了如何设置 Kafka 消费者属性,并从 Kafka 主题读取数据。这两个示例为你提供了使用 Flink 进行实时流处理的基础。随着进一步学习,你将能应对更复杂的实时数据挑战。
81 0
|
5月前
|
消息中间件 Java Kafka
Java中的流处理框架:Kafka Streams与Flink
Java中的流处理框架:Kafka Streams与Flink
|
7月前
|
SQL 大数据 数据处理
[AIGC大数据基础] Flink: 大数据流处理的未来
[AIGC大数据基础] Flink: 大数据流处理的未来
下一篇
无影云桌面