Flink运行时之网络通信NetworkEnvironment分析

本文涉及的产品
实时计算 Flink 版,5000CU*H 3个月
简介: 网络环境(NetworkEnvironment)是TaskManager进行网络通信的主对象,主要用于跟踪中间结果并负责所有的数据交换。每个TaskManager的实例都包含一个网络环境对象,在TaskManager启动时创建。

网络环境(NetworkEnvironment)是TaskManager进行网络通信的主对象,主要用于跟踪中间结果并负责所有的数据交换。每个TaskManager的实例都包含一个网络环境对象,在TaskManager启动时创建。NetworkEnvironment管理着多个协助通信的关键部件,它们是:

  • NetworkBufferPool:网络缓冲池,负责申请一个TaskManager的所有的内存段用作缓冲池;
  • ConnectionManager:连接管理器,用于管理本地(远程)通信连接;
  • ResultPartitionManager:结果分区管理器,用于跟踪一个TaskManager上所有生产/消费相关的ResultPartition;
  • TaskEventDispatcher:任务事件分发器,从消费者任务分发事件给生产者任务;
  • ResultPartitionConsumableNotifier:结果分区可消费通知器,用于通知消费者生产者生产的结果分区可消费;
  • PartitionStateChecker:分区状态检查器,用于检查分区状态;

当NetworkEnvironment被初始化时,它首先根据配置创建网络缓冲池(NetworkBufferPool)。创建NetworkBufferPool时需要指定Buffer数目、单个Buffer的大小以及Buffer所基于的内存类型,这些信息都是可配置的并封装在配置对象NetworkEnvironmentConfiguration中。

NetworkEnvironment对象包含了上面列举的网络I/O相关的各种部件,这些对象并不随着NetworkEnvironment对象实例化而被立即实例化,它们的实例化会被延后到NetworkEnvironment对象跟TaskManager以及JobManager**关联**(associate)上之后。TaskManager在启动后会向JobManager注册,随后NetworkEnvironment的associateWithTaskManagerAndJobManager方法会得到调用,在其中所有的辅助部件都会得到实例化:

this.partitionManager = new ResultPartitionManager();
this.taskEventDispatcher = new TaskEventDispatcher();
this.partitionConsumableNotifier = new JobManagerResultPartitionConsumableNotifier(
    executionContext,jobManagerGateway,taskManagerGateway,jobManagerTimeout);

this.partitionStateChecker = new JobManagerPartitionStateChecker(jobManagerGateway, taskManagerGateway);

final Option<NettyConfig> nettyConfig = configuration.nettyConfig();
connectionManager = nettyConfig.isDefined() ? new NettyConnectionManager(nettyConfig.get())
    : new LocalConnectionManager();

try {
    //启动网络连接管理器
    connectionManager.start(partitionManager, taskEventDispatcher, networkBufferPool);
} catch (Throwable t) {
    throw new IOException("Failed to instantiate network connection manager: " + t.getMessage(), t);
}

当然在TaskManager触发stop动作之后,在其postStop逻辑中,也会跟JobManager进行解关联操作。从而触发NetworkEnvironment的disassociate方法。在disassociate方法中,上述所有的辅助通信部件也将会被释放或回收资源。

在任务执行的核心逻辑中,有一个步骤是需要将自身(Task)注册到网络栈(也就是这里的NetworkEnvironment)。该步骤会调用NetworkEnvironment的实例方法registerTask进行注册,注册之后NetworkEnvironment会对任务的通信进行管理:

public void registerTask(Task task) throws IOException {
    //获得当前任务对象所生产的结果分区集合
    final ResultPartition[] producedPartitions = task.getProducedPartitions();
    //同时获得所有的结果分区写入器
    final ResultPartitionWriter[] writers = task.getAllWriters();

    //正常情况下结果分区数与写入器的数目应该是相等的
    if (writers.length != producedPartitions.length) {
        throw new IllegalStateException("Unequal number of writers and partitions.");
    }

    ResultPartitionConsumableNotifier jobManagerNotifier;

    synchronized (lock) {
        if (isShutdown) {
            throw new IllegalStateException("NetworkEnvironment is shut down");
        }

        //如果当前网络环境对象还没有跟TaskManager进行关联,那么说明调用的时机出现问题,直接抛出异常
        if (!isAssociated()) {
            throw new IllegalStateException("NetworkEnvironment is not associated with a TaskManager");
        }

        //遍历任务的每个结果分区,依次进行初始化
        for (int i = 0; i < producedPartitions.length; i++) {
            final ResultPartition partition = producedPartitions[i];
            final ResultPartitionWriter writer = writers[i];

            BufferPool bufferPool = null;

            try {
                //用网络缓冲池创建本地缓冲池,该缓冲池是非固定大小的且请求的缓冲个数是结果分区的子分区个数
                bufferPool = networkBufferPool.createBufferPool(
                    partition.getNumberOfSubpartitions(), false);
                //将本地缓冲池注册到结果分区
                partition.registerBufferPool(bufferPool);
                //结果分区会被注册到结果分区管理器
                partitionManager.registerResultPartition(partition);
            } catch (Throwable t) {
                if (bufferPool != null) {
                    bufferPool.lazyDestroy();
                }

                if (t instanceof IOException) {
                    throw (IOException) t;
                } else {
                    throw new IOException(t.getMessage(), t);
                }
            }

            //向任务事件分发器注册结果分区写入器
            taskEventDispatcher.registerWriterForIncomingTaskEvents(writer.getPartitionId(), writer);
        }

        //获得任务的所有输入闸门
        final SingleInputGate[] inputGates = task.getAllInputGates();

        //遍历输入闸门,为它们设置缓冲池
        for (SingleInputGate gate : inputGates) {
            BufferPool bufferPool = null;

            try {
                //为每个输入闸门设置本地缓冲池,这里创建的本地缓冲池也非固定大小的,且初始化的缓冲数为其包含的输入信道数
                bufferPool = networkBufferPool.createBufferPool(gate.getNumberOfInputChannels(), false);
                gate.setBufferPool(bufferPool);
            }
            catch (Throwable t) {
                if (bufferPool != null) {
                    bufferPool.lazyDestroy();
                }

                if (t instanceof IOException) {
                    throw (IOException) t;
                } else {
                    throw new IOException(t.getMessage(), t);
                }
            }
        }

        jobManagerNotifier = partitionConsumableNotifier;
    }

    //遍历所有的结果分区
    for (ResultPartition partition : producedPartitions) {
        //如果某个结果分区的消费者是主动部署的
        if (partition.getEagerlyDeployConsumers()) {
            //则直接通知JobManager,让其告知消费者任务,当前结果分区可被消费
            jobManagerNotifier.notifyPartitionConsumable(
                        partition.getJobId(), partition.getPartitionId());
        }
    }
}

从任务被注册到NetworkEnvironment对象的代码段中,我们能够得到一些信息。NetworkEnvironment对象会为当前任务生产端的每个ResultPartition都创建本地缓冲池,缓冲池中的Buffer数为结果分区的子分区数,同时为当前任务消费端的InputGate创建本地缓冲池,缓冲池的Buffer数为InputGate所包含的输入信道数。这些缓冲池都是非固定大小的,也就是说他们会按照网络缓冲池内存段的使用情况进行重平衡。



原文发布时间为:2016-12-14


本文作者:vinoYang


本文来自云栖社区合作伙伴CSDN博客,了解相关信息可以关注CSDN博客。

相关实践学习
基于Hologres轻松玩转一站式实时仓库
本场景介绍如何利用阿里云MaxCompute、实时计算Flink和交互式分析服务Hologres开发离线、实时数据融合分析的数据大屏应用。
Linux入门到精通
本套课程是从入门开始的Linux学习课程,适合初学者阅读。由浅入深案例丰富,通俗易懂。主要涉及基础的系统操作以及工作中常用的各种服务软件的应用、部署和优化。即使是零基础的学员,只要能够坚持把所有章节都学完,也一定会受益匪浅。
目录
相关文章
|
3月前
|
人工智能 边缘计算 物联网
蜂窝网络未来发展趋势的分析
蜂窝网络未来发展趋势的分析
134 2
|
3月前
|
数据采集 缓存 定位技术
网络延迟对Python爬虫速度的影响分析
网络延迟对Python爬虫速度的影响分析
|
4月前
|
SQL 消息中间件 分布式计算
大数据-124 - Flink State 01篇 状态原理和原理剖析:状态类型 执行分析
大数据-124 - Flink State 01篇 状态原理和原理剖析:状态类型 执行分析
112 5
|
4月前
|
机器学习/深度学习 数据采集 存储
时间序列预测新突破:深入解析循环神经网络(RNN)在金融数据分析中的应用
【10月更文挑战第7天】时间序列预测是数据科学领域的一个重要课题,特别是在金融行业中。准确的时间序列预测能够帮助投资者做出更明智的决策,比如股票价格预测、汇率变动预测等。近年来,随着深度学习技术的发展,尤其是循环神经网络(Recurrent Neural Networks, RNNs)及其变体如长短期记忆网络(LSTM)和门控循环单元(GRU),在处理时间序列数据方面展现出了巨大的潜力。本文将探讨RNN的基本概念,并通过具体的代码示例展示如何使用这些模型来进行金融数据分析。
627 2
|
2月前
|
存储 安全 物联网
浅析Kismet:无线网络监测与分析工具
Kismet是一款开源的无线网络监测和入侵检测系统(IDS),支持Wi-Fi、Bluetooth、ZigBee等协议,具备被动监听、实时数据分析、地理定位等功能。广泛应用于安全审计、网络优化和频谱管理。本文介绍其安装配置、基本操作及高级应用技巧,帮助用户掌握这一强大的无线网络安全工具。
106 9
浅析Kismet:无线网络监测与分析工具
|
2月前
|
数据采集 机器学习/深度学习 人工智能
基于AI的网络流量分析:构建智能化运维体系
基于AI的网络流量分析:构建智能化运维体系
270 13
|
3月前
|
SQL 流计算 关系型数据库
基于OpenLake的Flink+Paimon+EMR StarRocks流式湖仓分析
阿里云OpenLake解决方案建立在开放可控的OpenLake湖仓之上,提供大数据搜索与AI一体化服务。通过元数据管理平台DLF管理结构化、半结构化和非结构化数据,提供湖仓数据表和文件的安全访问及IO加速,并支持大数据、搜索和AI多引擎对接。本文为您介绍以Flink作为Openlake方案的核心计算引擎,通过流式数据湖仓Paimon(使用DLF 2.0存储)和EMR StarRocks搭建流式湖仓。
661 5
基于OpenLake的Flink+Paimon+EMR StarRocks流式湖仓分析
|
3月前
|
安全 网络协议 网络安全
【Azure 环境】从网络包中分析出TLS加密套件信息
An TLS 1.2 connection request was received from a remote client application, but non of the cipher suites supported by the client application are supported by the server. The connection request has failed. 从远程客户端应用程序收到 TLS 1.2 连接请求,但服务器不支持客户端应用程序支持的任何密码套件。连接请求失败。
105 2
|
2月前
|
安全 网络协议 网络安全
网络不稳定导致HTTP代理频繁掉线的分析
随着数字化时代的加速发展,网络安全、隐私保护及内容访问自由成为用户核心需求。HTTP代理服务器因其独特技术优势受到青睐,但其掉线问题频发。本文分析了HTTP代理服务器不稳定导致掉线的主要原因,包括网络问题、服务器质量、用户配置错误及IP资源问题等方面。
166 0
|
3月前
|
存储 安全 网络安全
网络安全法律框架:全球视角下的合规性分析
网络安全法律框架:全球视角下的合规性分析
78 1

热门文章

最新文章