Flink运行时之基于Netty的网络通信中

本文涉及的产品
实时计算 Flink 版,5000CU*H 3个月
简介: PartitionRequestClient 分区请求客户端(PartitionRequestClient)用于发起远程PartitionRequest请求,它也是RemoteChannel跟Netty通信层之间进行衔接的对象。

PartitionRequestClient

分区请求客户端(PartitionRequestClient)用于发起远程PartitionRequest请求,它也是RemoteChannel跟Netty通信层之间进行衔接的对象。

对单一的TaskManager而言只存在一个NettyClient实例。但处于同一TaskManager中不同的任务实例可能会跟不同的远程TaskManager上的任务之间交换数据,不同的TaskManager实例会有不同的ConnectionID(用于标识不同的IP地址)。因此,Flink采用PartitionRequestClient来对应ConnectionID,并提供了分区请求客户端工厂(PartitionRequestClientFactory)来创建PartitionRequestClient并保存ConnectionID与之的对应关系。

接下来,我们重点分析一下其请求ResultPartition的requestSubpartition方法:

public ChannelFuture requestSubpartition(
        final ResultPartitionID partitionId,
        final int subpartitionIndex,
        final RemoteInputChannel inputChannel,
        int delayMs) throws IOException {
    checkNotClosed();

    //将当前请求数据的RemoteInputChannel的实例注入到NettyClient的ChannelHandler管道的
    //PartitionRequestClientHandler实例中
    partitionRequestHandler.addInputChannel(inputChannel);

    //构建PartitionRequest请求对象
    final PartitionRequest request = new PartitionRequest(
            partitionId, subpartitionIndex, inputChannel.getInputChannelId());

    //构建一个ChannelFutureListener的实例,当I/O操作执行失败后,会触发相关的错误处理逻辑
    final ChannelFutureListener listener = new ChannelFutureListener() {
        @Override
        public void operationComplete(ChannelFuture future) throws Exception {
            if (!future.isSuccess()) {
                partitionRequestHandler.removeInputChannel(inputChannel);
                inputChannel.onError(
                        new LocalTransportException(
                                "Sending the partition request failed.",
                                future.channel().localAddress(), future.cause()
                        ));
            }
        }
    };

    //立即发送该请求,并注册listener
    if (delayMs == 0) {
        ChannelFuture f = tcpChannel.writeAndFlush(request);
        f.addListener(listener);
        return f;
    }
    //如果请求需要延迟一定的时间,则延迟发送请求
    else {
        final ChannelFuture[] f = new ChannelFuture[1];
        tcpChannel.eventLoop().schedule(new Runnable() {
            @Override
            public void run() {
                f[0] = tcpChannel.writeAndFlush(request);
                f[0].addListener(listener);
            }
        }, delayMs, TimeUnit.MILLISECONDS);

        return f[0];
    }
}

NettyMessage

Flink为基于Netty的通信框架定义了自己的通信消息格式,以及相应的解编码器。NettyMessage由固定大小的消息头和自定义的消息体组成:

NettyMessage-structure

NettyMessage消息头的结构如下图:

NettyMessage-header-structure

由上图可见,NettyMessage消息头所占的空间是固定的9个字节。其中,Frame占用一个整型值的空间其存储的值为整个消息(消息头和自定义的消息体)的大小。Magic Number同样占用一个整形数值的空间且值是固定不变的;而Id表示消息的类型,占用一个字节空间。

自定义的消息体部分依不同的消息类型各有不同。

服务端的消息有:

  • BufferResponse:服务端给出的Buffer响应消息,编号为0;
  • ErrorResponse:服务端的错误响应消息,编号为1;

客户端的消息有:

  • PartitionRequest:客户端发起的分区请求,编号为2;
  • TaskEventRequest:客户端发起的任务事件请求,编号为3;
  • CancelPartitionRequest:客户端发起的取消分区请求,编号为4;
  • CloseRequest:客户端发起的关闭请求,编号为5;

另外,它定义了读写接口,面向的对象是Netty的字节缓冲(ByteBuf)。解编码器NettyMessageEncoder和NettyMessageDecoder以静态内部类实现,分别用来在消息的两种表示(NettyMessage和ByteBuf)之间进行转换。

NettyProtocol

NettyProtocol定义了基于Netty进行网络通信时客户端和服务端对事件的处理逻辑与顺序。由于Netty中所有事件处理逻辑的代码都处于扩展自ChannelHandler接口的类中,所以,NettyProtocol约定了所有的协议实现者,必须提供服务端和客户端处理逻辑的ChannelHandler数组。

最终这些ChannelHandler将依据它们在数组中的顺序进行链接以形成ChannelPipeline。

PartitionRequestProtocol作为NettyProtocol唯一的实现,负责实例化并编排客户端和服务端的ChannelHandler。按照顺序链接的这些ChannelHandler可被视为“协议栈”。接下来,我们分别就客户端和服务端的协议栈给出了图示。

客户端的整个的协议栈如下图所示:

NettyClient-channel-handler-pipeline

PartitionRequestProtocol构建出的客户端协议栈将会被构建成ChannelPipeline,并注册到客户端引导对象Bootstrap中:

bootstrap.handler(newChannelInitializer<SocketChannel>(){
    @Override
    publicvoidinitChannel(SocketChannelchannel)throwsException{
        channel.pipeline().addLast(protocol.getClientChannelHandlers());
    }
});

服务端协议栈如下图所示:

NettyServer-channel-handler-pipeline

同客户端协议栈,服务端协议栈也会被构建成ChannelPipeline并注册到服务端引导对象ServerBootstrap中:

bootstrap.childHandler(newChannelInitializer<SocketChannel>(){
    @Override
    publicvoidinitChannel(SocketChannelchannel)throwsException{
        channel.pipeline().addLast(protocol.getServerChannelHandlers());
    }
});

需要注意的是,无论是客户端还是服务端,数据都存在流入(inbound)和流出(outbound)的过程。流入对应着处理器接口为ChannelInboundHandler,而流出对应的处理器接口为ChannelOutboundHandler。因此,两个协议方法所获取到的ChannelHandler数组并不是安装其元素的绝对顺序组成的管道。而是会区分其类型是流入还是流出(根据接口的类型判断),结合不同的类型并按照其在数组中的顺序将其链接成管道。


原文发布时间为:2017-01-11=2

本文作者:vinoYang

本文来自云栖社区合作伙伴CSDN博客,了解相关信息可以关注CSDN博客。

相关实践学习
基于Hologres轻松玩转一站式实时仓库
本场景介绍如何利用阿里云MaxCompute、实时计算Flink和交互式分析服务Hologres开发离线、实时数据融合分析的数据大屏应用。
Linux入门到精通
本套课程是从入门开始的Linux学习课程,适合初学者阅读。由浅入深案例丰富,通俗易懂。主要涉及基础的系统操作以及工作中常用的各种服务软件的应用、部署和优化。即使是零基础的学员,只要能够坚持把所有章节都学完,也一定会受益匪浅。
目录
相关文章
|
3月前
|
Java 调度
Netty运行原理问题之ChannelHandler在Netty中扮演什么角色
Netty运行原理问题之ChannelHandler在Netty中扮演什么角色
|
3月前
|
编解码 网络协议 API
Netty运行原理问题之Netty的主次Reactor多线程模型工作的问题如何解决
Netty运行原理问题之Netty的主次Reactor多线程模型工作的问题如何解决
|
3月前
|
编解码 网络协议 开发者
Netty运行原理问题之NettyTCP的粘包和拆包的问题如何解决
Netty运行原理问题之NettyTCP的粘包和拆包的问题如何解决
|
3月前
|
调度
Netty运行原理问题之事件调度工作的问题如何解决
Netty运行原理问题之事件调度工作的问题如何解决
|
4月前
|
关系型数据库 MySQL 数据库
实时计算 Flink版操作报错合集之网络缓冲池(NetworkBufferPool)中可用内存不足,该如何解决
在使用实时计算Flink版过程中,可能会遇到各种错误,了解这些错误的原因及解决方法对于高效排错至关重要。针对具体问题,查看Flink的日志是关键,它们通常会提供更详细的错误信息和堆栈跟踪,有助于定位问题。此外,Flink社区文档和官方论坛也是寻求帮助的好去处。以下是一些常见的操作报错及其可能的原因与解决策略。
|
1月前
|
消息中间件 监控 Java
大数据-109 Flink 体系结构 运行架构 ResourceManager JobManager 组件关系与原理剖析
大数据-109 Flink 体系结构 运行架构 ResourceManager JobManager 组件关系与原理剖析
65 1
|
3月前
|
数据采集 分布式计算 Kubernetes
Apache Flink 实践问题之ZooKeeper 网络瞬断时如何解决
Apache Flink 实践问题之ZooKeeper 网络瞬断时如何解决
91 4
|
3月前
|
存储 缓存 Java
实时计算 Flink版操作报错合集之怎么处理在运行作业时遇到报错::ClassCastException
在使用实时计算Flink版过程中,可能会遇到各种错误,了解这些错误的原因及解决方法对于高效排错至关重要。针对具体问题,查看Flink的日志是关键,它们通常会提供更详细的错误信息和堆栈跟踪,有助于定位问题。此外,Flink社区文档和官方论坛也是寻求帮助的好去处。以下是一些常见的操作报错及其可能的原因与解决策略。
|
3月前
|
开发者
Netty运行原理问题之Netty高性能实现的问题如何解决
Netty运行原理问题之Netty高性能实现的问题如何解决
|
3月前
|
API 开发者
Netty运行原理问题之Netty实现低开发门槛的问题如何解决
Netty运行原理问题之Netty实现低开发门槛的问题如何解决