spark2.1.0之源码分析——RPC传输管道处理器详解

简介: 版权声明:本文为博主原创文章,未经博主允许不得转载。 https://blog.csdn.net/beliefer/article/details/81326016 提示:阅读本文前最好先阅读:《Spark2.
版权声明:本文为博主原创文章,未经博主允许不得转载。 https://blog.csdn.net/beliefer/article/details/81326016

提示:阅读本文前最好先阅读:

  1. 《Spark2.1.0之内置RPC框架》
  2. 《spark2.1.0之源码分析——RPC配置TransportConf》
  3. 《spark2.1.0之源码分析——RPC客户端工厂TransportClientFactory》
  4. spark2.1.0之源码分析——RPC服务器TransportServer》
  5. 《spark2.1.0之源码分析——RPC管道初始化》

         TransportChannelHandler实现了Netty的ChannelInboundHandler[1],以便对Netty管道中的消息进行处理。《spark2.1.0之源码分析——RPC管道初始化》一文所展示的图1中的Handler(除了MessageEncoder)由于都实现了ChannelInboundHandler接口,作为自定义的ChannelInboundHandler,因而都要重写channelRead方法。Netty框架使用工作链模式来对每个ChannelInboundHandler的实现类的channelRead方法进行链式调用。TransportChannelHandler实现的channelRead方法见代码清单1。

代码清单1         TransportChannelHandler的channelRead实现

  @Override
  public void channelRead(ChannelHandlerContext ctx, Object request) throws Exception {
    if (request instanceof RequestMessage) {
      requestHandler.handle((RequestMessage) request);
    } else if (request instanceof ResponseMessage) {
      responseHandler.handle((ResponseMessage) request);
    } else {
      ctx.fireChannelRead(request);
    }
  }

从代码清单1看到,当TransportChannelHandler读取到的request是RequestMessage类型时,则将此消息的处理进一步交给TransportRequestHandler,当request是ResponseMessage时,则将此消息的处理进一步交给TransportResponseHandler。

MessageHandler的继承体系

         TransportRequestHandler与TransportResponseHandler都继承自抽象类MessageHandler,MessageHandler定义了子类的规范,详细定义见代码清单2。

代码清单2         MessageHandler规范

public abstract class MessageHandler<T extends Message> {
  public abstract void handle(T message) throws Exception;
  public abstract void channelActive();
  public abstract void exceptionCaught(Throwable cause);
  public abstract void channelInactive();
}

MessageHandler中定义的各个方法的作用分别为:

  • handle:用于对接收到的单个消息进行处理;
  • channelActive:当channel激活时调用;
  • exceptionCaught:当捕获到channel发生异常时调用;
  • channelInactive:当channel非激活时调用;

Spark中MessageHandler类的继承体系如图1所示。

MessageHandler类的继承体系
图1       MessageHandler类的继承体系

Message的继承体系

根据代码清单2,我们知道MessageHandler同时也是一个Java泛型类,其子类能处理的消息都派生自接口Message。Message的定义见代码清单3。

代码清单3         Message的定义

public interface Message extends Encodable {
  Type type();
  ManagedBuffer body();
  boolean isBodyInFrame();

Message中定义的三个接口方法的作用分别为:

  • type:返回消息的类型;
  • body:返回消息中可选的内容体;
  • isBodyInFrame:用于判断消息的主体是否包含在消息的同一帧中。

Message接口继承了Encodable接口,Encodable的定义见代码清单4。

代码清单4         Encodable的定义

public interface Encodable {
  int encodedLength();
  void encode(ByteBuf buf);
}

实现Encodable接口的类将可以转换到一个ByteBuf中,多个对象将被存储到预先分配的单个ByteBuf,所以这里的encodedLength用于返回转换的对象数量。下面一起来看看Message的类继承体系,如图2所示。

Message的类继承体系
图2       Message的类继承体系

从图2看到最终的消息实现类都直接或间接的实现了RequestMessage或ResponseMessage接口,其中RequestMessage的具体实现有四种,分别是:

  • ChunkFetchRequest:请求获取流的单个块的序列。
  • RpcRequest:此消息类型由远程的RPC服务端进行处理,是一种需要服务端向客户端回复的RPC请求信息类型。
  • OneWayMessage:此消息也需要由远程的RPC服务端进行处理,与RpcRequest不同的是不需要服务端向客户端回复。
  • StreamRequest:此消息表示向远程的服务发起请求,以获取流式数据。

由于OneWayMessage 不需要响应,所以ResponseMessage的对于成功或失败状态的实现各有三种,分别是:

  • ChunkFetchSuccess:处理ChunkFetchRequest成功后返回的消息;
  • ChunkFetchFailure:处理ChunkFetchRequest失败后返回的消息;
  • RpcResponse:处理RpcRequest成功后返回的消息;
  • RpcFailure:处理RpcRequest失败后返回的消息;
  • StreamResponse:处理StreamRequest成功后返回的消息;
  • StreamFailure:处理StreamRequest失败后返回的消息;

ManagedBuffer的继承体系

回头再看看代码清单3中对body接口的定义,可以看到其返回内容体的类型为ManagedBuffer。ManagedBuffer提供了由字节构成数据的不可变视图(也就是说ManagedBuffer并不存储数据,也不是数据的实际来源,这同关系型数据库的视图类似)。我们先来看看抽象类ManagedBuffer中对行为的定义,见代码清单5。

代码清单5         ManagedBuffer的定义

public abstract class ManagedBuffer {
  public abstract long size();
  public abstract ByteBuffer nioByteBuffer() throws IOException;
  public abstract InputStream createInputStream() throws IOException;
  public abstract ManagedBuffer retain();
  public abstract ManagedBuffer release();
  public abstract Object convertToNetty() throws IOException;
}

ManagedBuffer中定义了六个方法,分别为:

  • size:返回数据的字节数。
  • nioByteBuffer:将数据按照NioByteBuffer类型返回。
  • createInputStream:将数据按照InputStream返回。
  • retain:当有新的使用者使用此视图时,增加引用此视图的引用数。
  • release:当有使用者不再使用此视图时,减少引用此视图的引用数。当引用数为0时释放缓冲区。
  • convertToNetty:将缓冲区的数据转换为Netty的对象,用来将数据写到外部。此方法返回的数据类型要么是io.netty.buffer.ByteBuf,要么是io.netty.channel.FileRegion

ManagedBuffer的具体实现有很多,我们可以通过图3来了解。

ManagedBuffer的继承体系
图3       ManagedBuffer的继承体系

图3中列出了ManagedBuffer的五个实现类,其中TestManagedBuffer和RecordingManagedBuffer用于测试。NettyManagedBuffer中的缓冲为io.netty.buffer.ByteBufNioManagedBuffer中的缓冲为java.nio.ByteBufferNettyManagedBuffer和NioManagedBuffer的实现都非常简单,留给读者自行阅读。本节挑选FileSegmentManagedBuffer作为ManagedBuffer具体实现的例子进行介绍。

     FileSegmentManagedBuffer的作用为获取一个文件中的一段,它一共有四个由final修饰的属性,全部都通过FileSegmentManagedBuffer的构造器传入属性值,这四个属性为:

  • conf:即TransportConf。
  • file:所要读取的文件。
  • offset:所要读取文件的偏移量。
  • length:所要读取的长度。

下面将逐个介绍FileSegmentManagedBuffer对于ManagedBuffer的实现。

NIO方式读取文件

FileSegmentManagedBuffer实现的nioByteBuffer方法见代码清单6。

代码清单6         nioByteBuffer方法的实现

  @Override
  public ByteBuffer nioByteBuffer() throws IOException {
    FileChannel channel = null;
    try {
      channel = new RandomAccessFile(file, "r").getChannel();
      if (length < conf.memoryMapBytes()) {
        ByteBuffer buf = ByteBuffer.allocate((int) length);
        channel.position(offset);
        while (buf.remaining() != 0) {
          if (channel.read(buf) == -1) {
            throw new IOException(String.format("Reached EOF before filling buffer\n" +
              "offset=%s\nfile=%s\nbuf.remaining=%s",
              offset, file.getAbsoluteFile(), buf.remaining()));
          }
        }
        buf.flip();
        return buf;
      } else {
        return channel.map(FileChannel.MapMode.READ_ONLY, offset, length);
      }
    } catch (IOException e) {
      try {
        if (channel != null) {
          long size = channel.size();
          throw new IOException("Error in reading " + this + " (actual file length " + size + ")",
            e);
        }
      } catch (IOException ignored) {
        // ignore
      }
      throw new IOException("Error in opening " + this, e);
    } finally {
      JavaUtils.closeQuietly(channel);
    }
  }

nioByteBuffer的实现还是很简单的,主要利用RandomAccessFile获取FileChannel,然后使用java.nio.ByteBufferFileChannel的API将数据写入缓冲区java.nio.ByteBuffer中。

文件流方式读取文件

FileSegmentManagedBuffer实现的createInputStream方法见代码清单7。

代码清单7         createInputStream的实现

  @Override
  public InputStream createInputStream() throws IOException {
    FileInputStream is = null;
    try {
      is = new FileInputStream(file);
      ByteStreams.skipFully(is, offset);
      return new LimitedInputStream(is, length);
    } catch (IOException e) {
      try {
        if (is != null) {
          long size = file.length();
          throw new IOException("Error in reading " + this + " (actual file length " + size + ")",
              e);
        }
      } catch (IOException ignored) {
        // ignore
      } finally {
        JavaUtils.closeQuietly(is);
      }
      throw new IOException("Error in opening " + this, e);
    } catch (RuntimeException e) {
      JavaUtils.closeQuietly(is);
      throw e;
    }
  }

createInputStream的实现还是很简单的,这里不多作介绍。

将数据转换为Netty对象

FileSegmentManagedBuffer实现的convertToNetty方法见代码清单8。

代码清单8         convertToNetty的实现

  @Override
  public Object convertToNetty() throws IOException {
    if (conf.lazyFileDescriptor()) {
      return new DefaultFileRegion(file, offset, length);
    } else {
      FileChannel fileChannel = new FileInputStream(file).getChannel();
      return new DefaultFileRegion(fileChannel, offset, length);
    }
  }

其他方法的实现

其他方法由于实现非常简单,所以这里就不一一列出了,感兴趣的读者可以自行查阅。


[1] ChannelInboundHandler接口的实现及原理不属于本书要分析的内容,感兴趣的同学可以阅读Netty的官方文档或者研究Netty的源码。

 

关于《Spark内核设计的艺术 架构设计与实现》

经过近一年的准备,《Spark内核设计的艺术 架构设计与实现》一书现已出版发行,图书如图:

 

纸质版售卖链接如下:

京东:https://item.jd.com/12302500.html

相关文章
|
7月前
|
Java 应用服务中间件 API
干翻RPC系列之HesssionRPC:HesssionRPC的开发体验和源码分析
干翻RPC系列之HesssionRPC:HesssionRPC的开发体验和源码分析
|
机器学习/深度学习 分布式计算 自然语言处理
Spark机器学习管道 - Estimator
Spark机器学习管道 - Estimator
|
负载均衡 Dubbo Java
RPC框架-dubbo:架构及源码分析-初篇
在自学或面试dubbo时,相关的问题有很多,例如dubbo 的基本工作原理,这是使用过dubbo后应该知道的。包括dubbo的分层架构、长短链接选择、二进制协议支持;之后是使用方式(服务的注册、发现、调用方式),基础配置(超时时间、线程数),这些是最基本的。 在这些问题之后,就可以继续深入底层:关于连接方式,使用长连接还是短连接?为什么? dubbo的二进制协议支持哪些,之间有什么区别/优缺点等等,也可以考察在使用过程中遇到过哪些问题,是如何解决的。这些都需要深入理解,并且有真实、长时间使用经验。
233 0
|
机器学习/深度学习 数据采集 分布式计算
Spark机器学习管道 - Pipeline
Spark机器学习管道 - Pipeline
|
机器学习/深度学习 存储 分布式计算
Spark机器学习管道 - Transformer
Spark机器学习管道 - Transformer
|
消息中间件 Java RocketMQ
RocketMQ源码分析-Rpc通信模块(remoting)二
今天继续RocketMQ-Rpc通信模块(remoting)的源码分析。上一章提到了主要的start()方法执行流程,如果有不清楚的地方可以一起讨论哈,这篇文章会继续解读主要方法,按照惯例先看看NettyRemotingAbstract的类图,看类图知方法。和NettyEventExecutor以及MQ的交互流程。 按照惯例先看看NettyRemotingAbstract的类图,看类图知方法,文中会挑重要方法和主要流程解读。
450 0
RocketMQ源码分析-Rpc通信模块(remoting)二
|
消息中间件 编解码 网络协议
RocketMQ源码分析-Rpc通信模块(remoting)一
上篇文章分析了Rocketmq的nameServer的源码,在继续分析源码之前,先考虑一个问题,设计一个mq并且是高性能的mq最最核心的问题是什么,我个人认为主要是有俩个方面,1:消息的网络传输,2:消息的读写,这两个决定了mq的高性能。
534 0
RocketMQ源码分析-Rpc通信模块(remoting)一
|
编解码 JSON 网络协议
透视RPC协议:SOFA-BOLT协议源码分析
最近在看Netty相关的资料,刚好SOFA-BOLT是一个比较成熟的Netty自定义协议栈实现,于是决定研读SOFA-BOLT的源码,详细分析其协议的组成,简单分析其客户端和服务端的源码实现。当前阅读的源码是2021-08左右的SOFA-BOLT仓库的master分支源码。
366 0
透视RPC协议:SOFA-BOLT协议源码分析
|
分布式计算 大数据 Spark
【Spark Summit East 2017】基于Spark ML和GraphFrames的大规模文本分析管道
本讲义出自Alexey Svyatkovskiy在Spark Summit East 2017上的演讲,主要介绍了基于Spark ML和GraphFrames的大规模文本分析管道的实现,并介绍了用于的描绘直方图、计算描述性统计的跨平台的Scala数据聚合基元——Histogrammar package,并分享了非结构化数据处理、高效访问的数据存储格式以及大规模图处理等问题。
2136 0
|
28天前
|
分布式计算 大数据 Apache
ClickHouse与大数据生态集成:Spark & Flink 实战
【10月更文挑战第26天】在当今这个数据爆炸的时代,能够高效地处理和分析海量数据成为了企业和组织提升竞争力的关键。作为一款高性能的列式数据库系统,ClickHouse 在大数据分析领域展现出了卓越的能力。然而,为了充分利用ClickHouse的优势,将其与现有的大数据处理框架(如Apache Spark和Apache Flink)进行集成变得尤为重要。本文将从我个人的角度出发,探讨如何通过这些技术的结合,实现对大规模数据的实时处理和分析。
82 2
ClickHouse与大数据生态集成:Spark & Flink 实战