spark2.1.0之源码分析——RPC传输管道处理器详解

简介: 版权声明:本文为博主原创文章,未经博主允许不得转载。 https://blog.csdn.net/beliefer/article/details/81326016 提示:阅读本文前最好先阅读:《Spark2.
版权声明:本文为博主原创文章,未经博主允许不得转载。 https://blog.csdn.net/beliefer/article/details/81326016

提示:阅读本文前最好先阅读:

  1. 《Spark2.1.0之内置RPC框架》
  2. 《spark2.1.0之源码分析——RPC配置TransportConf》
  3. 《spark2.1.0之源码分析——RPC客户端工厂TransportClientFactory》
  4. spark2.1.0之源码分析——RPC服务器TransportServer》
  5. 《spark2.1.0之源码分析——RPC管道初始化》

         TransportChannelHandler实现了Netty的ChannelInboundHandler[1],以便对Netty管道中的消息进行处理。《spark2.1.0之源码分析——RPC管道初始化》一文所展示的图1中的Handler(除了MessageEncoder)由于都实现了ChannelInboundHandler接口,作为自定义的ChannelInboundHandler,因而都要重写channelRead方法。Netty框架使用工作链模式来对每个ChannelInboundHandler的实现类的channelRead方法进行链式调用。TransportChannelHandler实现的channelRead方法见代码清单1。

代码清单1         TransportChannelHandler的channelRead实现

  @Override
  public void channelRead(ChannelHandlerContext ctx, Object request) throws Exception {
    if (request instanceof RequestMessage) {
      requestHandler.handle((RequestMessage) request);
    } else if (request instanceof ResponseMessage) {
      responseHandler.handle((ResponseMessage) request);
    } else {
      ctx.fireChannelRead(request);
    }
  }

从代码清单1看到,当TransportChannelHandler读取到的request是RequestMessage类型时,则将此消息的处理进一步交给TransportRequestHandler,当request是ResponseMessage时,则将此消息的处理进一步交给TransportResponseHandler。

MessageHandler的继承体系

         TransportRequestHandler与TransportResponseHandler都继承自抽象类MessageHandler,MessageHandler定义了子类的规范,详细定义见代码清单2。

代码清单2         MessageHandler规范

public abstract class MessageHandler<T extends Message> {
  public abstract void handle(T message) throws Exception;
  public abstract void channelActive();
  public abstract void exceptionCaught(Throwable cause);
  public abstract void channelInactive();
}

MessageHandler中定义的各个方法的作用分别为:

  • handle:用于对接收到的单个消息进行处理;
  • channelActive:当channel激活时调用;
  • exceptionCaught:当捕获到channel发生异常时调用;
  • channelInactive:当channel非激活时调用;

Spark中MessageHandler类的继承体系如图1所示。

MessageHandler类的继承体系
图1       MessageHandler类的继承体系

Message的继承体系

根据代码清单2,我们知道MessageHandler同时也是一个Java泛型类,其子类能处理的消息都派生自接口Message。Message的定义见代码清单3。

代码清单3         Message的定义

public interface Message extends Encodable {
  Type type();
  ManagedBuffer body();
  boolean isBodyInFrame();

Message中定义的三个接口方法的作用分别为:

  • type:返回消息的类型;
  • body:返回消息中可选的内容体;
  • isBodyInFrame:用于判断消息的主体是否包含在消息的同一帧中。

Message接口继承了Encodable接口,Encodable的定义见代码清单4。

代码清单4         Encodable的定义

public interface Encodable {
  int encodedLength();
  void encode(ByteBuf buf);
}

实现Encodable接口的类将可以转换到一个ByteBuf中,多个对象将被存储到预先分配的单个ByteBuf,所以这里的encodedLength用于返回转换的对象数量。下面一起来看看Message的类继承体系,如图2所示。

Message的类继承体系
图2       Message的类继承体系

从图2看到最终的消息实现类都直接或间接的实现了RequestMessage或ResponseMessage接口,其中RequestMessage的具体实现有四种,分别是:

  • ChunkFetchRequest:请求获取流的单个块的序列。
  • RpcRequest:此消息类型由远程的RPC服务端进行处理,是一种需要服务端向客户端回复的RPC请求信息类型。
  • OneWayMessage:此消息也需要由远程的RPC服务端进行处理,与RpcRequest不同的是不需要服务端向客户端回复。
  • StreamRequest:此消息表示向远程的服务发起请求,以获取流式数据。

由于OneWayMessage 不需要响应,所以ResponseMessage的对于成功或失败状态的实现各有三种,分别是:

  • ChunkFetchSuccess:处理ChunkFetchRequest成功后返回的消息;
  • ChunkFetchFailure:处理ChunkFetchRequest失败后返回的消息;
  • RpcResponse:处理RpcRequest成功后返回的消息;
  • RpcFailure:处理RpcRequest失败后返回的消息;
  • StreamResponse:处理StreamRequest成功后返回的消息;
  • StreamFailure:处理StreamRequest失败后返回的消息;

ManagedBuffer的继承体系

回头再看看代码清单3中对body接口的定义,可以看到其返回内容体的类型为ManagedBuffer。ManagedBuffer提供了由字节构成数据的不可变视图(也就是说ManagedBuffer并不存储数据,也不是数据的实际来源,这同关系型数据库的视图类似)。我们先来看看抽象类ManagedBuffer中对行为的定义,见代码清单5。

代码清单5         ManagedBuffer的定义

public abstract class ManagedBuffer {
  public abstract long size();
  public abstract ByteBuffer nioByteBuffer() throws IOException;
  public abstract InputStream createInputStream() throws IOException;
  public abstract ManagedBuffer retain();
  public abstract ManagedBuffer release();
  public abstract Object convertToNetty() throws IOException;
}

ManagedBuffer中定义了六个方法,分别为:

  • size:返回数据的字节数。
  • nioByteBuffer:将数据按照NioByteBuffer类型返回。
  • createInputStream:将数据按照InputStream返回。
  • retain:当有新的使用者使用此视图时,增加引用此视图的引用数。
  • release:当有使用者不再使用此视图时,减少引用此视图的引用数。当引用数为0时释放缓冲区。
  • convertToNetty:将缓冲区的数据转换为Netty的对象,用来将数据写到外部。此方法返回的数据类型要么是io.netty.buffer.ByteBuf,要么是io.netty.channel.FileRegion

ManagedBuffer的具体实现有很多,我们可以通过图3来了解。

ManagedBuffer的继承体系
图3       ManagedBuffer的继承体系

图3中列出了ManagedBuffer的五个实现类,其中TestManagedBuffer和RecordingManagedBuffer用于测试。NettyManagedBuffer中的缓冲为io.netty.buffer.ByteBufNioManagedBuffer中的缓冲为java.nio.ByteBufferNettyManagedBuffer和NioManagedBuffer的实现都非常简单,留给读者自行阅读。本节挑选FileSegmentManagedBuffer作为ManagedBuffer具体实现的例子进行介绍。

     FileSegmentManagedBuffer的作用为获取一个文件中的一段,它一共有四个由final修饰的属性,全部都通过FileSegmentManagedBuffer的构造器传入属性值,这四个属性为:

  • conf:即TransportConf。
  • file:所要读取的文件。
  • offset:所要读取文件的偏移量。
  • length:所要读取的长度。

下面将逐个介绍FileSegmentManagedBuffer对于ManagedBuffer的实现。

NIO方式读取文件

FileSegmentManagedBuffer实现的nioByteBuffer方法见代码清单6。

代码清单6         nioByteBuffer方法的实现

  @Override
  public ByteBuffer nioByteBuffer() throws IOException {
    FileChannel channel = null;
    try {
      channel = new RandomAccessFile(file, "r").getChannel();
      if (length < conf.memoryMapBytes()) {
        ByteBuffer buf = ByteBuffer.allocate((int) length);
        channel.position(offset);
        while (buf.remaining() != 0) {
          if (channel.read(buf) == -1) {
            throw new IOException(String.format("Reached EOF before filling buffer\n" +
              "offset=%s\nfile=%s\nbuf.remaining=%s",
              offset, file.getAbsoluteFile(), buf.remaining()));
          }
        }
        buf.flip();
        return buf;
      } else {
        return channel.map(FileChannel.MapMode.READ_ONLY, offset, length);
      }
    } catch (IOException e) {
      try {
        if (channel != null) {
          long size = channel.size();
          throw new IOException("Error in reading " + this + " (actual file length " + size + ")",
            e);
        }
      } catch (IOException ignored) {
        // ignore
      }
      throw new IOException("Error in opening " + this, e);
    } finally {
      JavaUtils.closeQuietly(channel);
    }
  }

nioByteBuffer的实现还是很简单的,主要利用RandomAccessFile获取FileChannel,然后使用java.nio.ByteBufferFileChannel的API将数据写入缓冲区java.nio.ByteBuffer中。

文件流方式读取文件

FileSegmentManagedBuffer实现的createInputStream方法见代码清单7。

代码清单7         createInputStream的实现

  @Override
  public InputStream createInputStream() throws IOException {
    FileInputStream is = null;
    try {
      is = new FileInputStream(file);
      ByteStreams.skipFully(is, offset);
      return new LimitedInputStream(is, length);
    } catch (IOException e) {
      try {
        if (is != null) {
          long size = file.length();
          throw new IOException("Error in reading " + this + " (actual file length " + size + ")",
              e);
        }
      } catch (IOException ignored) {
        // ignore
      } finally {
        JavaUtils.closeQuietly(is);
      }
      throw new IOException("Error in opening " + this, e);
    } catch (RuntimeException e) {
      JavaUtils.closeQuietly(is);
      throw e;
    }
  }

createInputStream的实现还是很简单的,这里不多作介绍。

将数据转换为Netty对象

FileSegmentManagedBuffer实现的convertToNetty方法见代码清单8。

代码清单8         convertToNetty的实现

  @Override
  public Object convertToNetty() throws IOException {
    if (conf.lazyFileDescriptor()) {
      return new DefaultFileRegion(file, offset, length);
    } else {
      FileChannel fileChannel = new FileInputStream(file).getChannel();
      return new DefaultFileRegion(fileChannel, offset, length);
    }
  }

其他方法的实现

其他方法由于实现非常简单,所以这里就不一一列出了,感兴趣的读者可以自行查阅。


[1] ChannelInboundHandler接口的实现及原理不属于本书要分析的内容,感兴趣的同学可以阅读Netty的官方文档或者研究Netty的源码。

 

关于《Spark内核设计的艺术 架构设计与实现》

经过近一年的准备,《Spark内核设计的艺术 架构设计与实现》一书现已出版发行,图书如图:

 

纸质版售卖链接如下:

京东:https://item.jd.com/12302500.html

相关文章
|
7月前
|
Java 应用服务中间件 API
干翻RPC系列之HesssionRPC:HesssionRPC的开发体验和源码分析
干翻RPC系列之HesssionRPC:HesssionRPC的开发体验和源码分析
|
机器学习/深度学习 分布式计算 自然语言处理
Spark机器学习管道 - Estimator
Spark机器学习管道 - Estimator
|
负载均衡 Dubbo Java
RPC框架-dubbo:架构及源码分析-初篇
在自学或面试dubbo时,相关的问题有很多,例如dubbo 的基本工作原理,这是使用过dubbo后应该知道的。包括dubbo的分层架构、长短链接选择、二进制协议支持;之后是使用方式(服务的注册、发现、调用方式),基础配置(超时时间、线程数),这些是最基本的。 在这些问题之后,就可以继续深入底层:关于连接方式,使用长连接还是短连接?为什么? dubbo的二进制协议支持哪些,之间有什么区别/优缺点等等,也可以考察在使用过程中遇到过哪些问题,是如何解决的。这些都需要深入理解,并且有真实、长时间使用经验。
246 0
|
机器学习/深度学习 数据采集 分布式计算
Spark机器学习管道 - Pipeline
Spark机器学习管道 - Pipeline
|
机器学习/深度学习 存储 分布式计算
Spark机器学习管道 - Transformer
Spark机器学习管道 - Transformer
|
消息中间件 Java RocketMQ
RocketMQ源码分析-Rpc通信模块(remoting)二
今天继续RocketMQ-Rpc通信模块(remoting)的源码分析。上一章提到了主要的start()方法执行流程,如果有不清楚的地方可以一起讨论哈,这篇文章会继续解读主要方法,按照惯例先看看NettyRemotingAbstract的类图,看类图知方法。和NettyEventExecutor以及MQ的交互流程。 按照惯例先看看NettyRemotingAbstract的类图,看类图知方法,文中会挑重要方法和主要流程解读。
465 0
RocketMQ源码分析-Rpc通信模块(remoting)二
|
消息中间件 编解码 网络协议
RocketMQ源码分析-Rpc通信模块(remoting)一
上篇文章分析了Rocketmq的nameServer的源码,在继续分析源码之前,先考虑一个问题,设计一个mq并且是高性能的mq最最核心的问题是什么,我个人认为主要是有俩个方面,1:消息的网络传输,2:消息的读写,这两个决定了mq的高性能。
538 0
RocketMQ源码分析-Rpc通信模块(remoting)一
|
编解码 JSON 网络协议
透视RPC协议:SOFA-BOLT协议源码分析
最近在看Netty相关的资料,刚好SOFA-BOLT是一个比较成熟的Netty自定义协议栈实现,于是决定研读SOFA-BOLT的源码,详细分析其协议的组成,简单分析其客户端和服务端的源码实现。当前阅读的源码是2021-08左右的SOFA-BOLT仓库的master分支源码。
375 0
透视RPC协议:SOFA-BOLT协议源码分析