spark2.1.0之源码分析——RPC服务端引导程序TransportServerBootstrap

简介: 版权声明:本文为博主原创文章,未经博主允许不得转载。 https://blog.csdn.net/beliefer/article/details/81867045 提示:阅读本文前最好先阅读:《Spark2.
版权声明:本文为博主原创文章,未经博主允许不得转载。 https://blog.csdn.net/beliefer/article/details/81867045

提示:阅读本文前最好先阅读:

  1. 《Spark2.1.0之内置RPC框架》
  2. 《spark2.1.0之源码分析——RPC配置TransportConf》
  3. 《spark2.1.0之源码分析——RPC客户端工厂TransportClientFactory》
  4. spark2.1.0之源码分析——RPC服务器TransportServer》
  5. 《spark2.1.0之源码分析——RPC管道初始化》
  6. spark2.1.0之源码分析——RPC传输管道处理器详解
  7. spark2.1.0之源码分析——服务端RPC处理器RpcHandler详解

通过《spark2.1.0之源码分析——RPC服务器TransportServer》一文的介绍,我们知道TransportServer的构造器中的bootstraps是TransportServerBootstrap的列表。接口TransportServerBootstrap定义了服务端引导程序的规范,服务端引导程序旨在当客户端与服务端建立连接之后,在服务端持有的客户端管道上执行的引导程序。TransportServerBootstrap的定义见代码清单1。

代码清单1         TransportServerBootstrap的定义 

public interface TransportServerBootstrap {
  RpcHandler doBootstrap(Channel channel, RpcHandler rpcHandler);
}

TransportServerBootstrap的doBootstrap方法将对服务端的RpcHandler进行代理,接收客户端的请求。TransportServerBootstrap有SaslServerBootstrap和EncryptionCheckerBootstrap两个实现类。为了更清楚的说明TransportServerBootstrap的意义,我们以SaslServerBootstrap为例,来讲解其实现(见代码清单2)。

代码清单2         SaslServerBootstrap的doBootstrap实现

  public RpcHandler doBootstrap(Channel channel, RpcHandler rpcHandler) {
    return new SaslRpcHandler(conf, channel, rpcHandler, secretKeyHolder);
  }

根据代码清单2,我们知道SaslServerBootstrap的doBootstrap方法实际创建了SaslRpcHandler,SaslRpcHandler负责对管道进行SASL(Simple Authentication and Security Layer)加密。SaslRpcHandler本身也继承了RpcHandler,所以我们重点来看其receive方法的实现,见代码清单3。

代码清单3        SaslRpcHandler的receive方法

  @Override
  public void receive(TransportClient client, ByteBuffer message, RpcResponseCallback callback) {
    if (isComplete) {
      // 将消息传递给SaslRpcHandler所代理的下游RpcHandler并返回
      delegate.receive(client, message, callback);
      return;
    }

    ByteBuf nettyBuf = Unpooled.wrappedBuffer(message);
    SaslMessage saslMessage;
    try {
      saslMessage = SaslMessage.decode(nettyBuf);// 对客户端发送的消息进行SASL解密
    } finally {
      nettyBuf.release();
    }

    if (saslServer == null) {
      // 如果saslServer还未创建,则需要创建SparkSaslServer
      client.setClientId(saslMessage.appId);
      saslServer = new SparkSaslServer(saslMessage.appId, secretKeyHolder,
        conf.saslServerAlwaysEncrypt());
    }

    byte[] response;
    try {
      response = saslServer.response(JavaUtils.bufferToArray(// 使用saslServer处理已解密的消息
        saslMessage.body().nioByteBuffer()));
    } catch (IOException ioe) {
      throw new RuntimeException(ioe);
    }
    callback.onSuccess(ByteBuffer.wrap(response));

    if (saslServer.isComplete()) {
      logger.debug("SASL authentication successful for channel {}", client);
      isComplete = true;// SASL认证交换已经完成
      if (SparkSaslServer.QOP_AUTH_CONF.equals(saslServer.getNegotiatedProperty(Sasl.QOP))) {
        logger.debug("Enabling encryption for channel {}", client);
        // 对管道进行SASL加密
        SaslEncryption.addToChannel(channel, saslServer, conf.maxSaslEncryptedBlockSize());
        saslServer = null;
      } else {
        saslServer.dispose();
        saslServer = null;
      }
    }
  }

根据代码清单3,SaslRpcHandler处理客户端消息的步骤如下:

  1. 如果SASL认证交换已经完成(isComplete等于true),则将消息传递给SaslRpcHandler所代理的下游RpcHandler并返回。
  2. 如果SASL认证交换未完成(isComplete等于false),则对客户端发送的消息进行SASL解密。
  3. 如果saslServer还未创建,则需要创建SparkSaslServer。当SaslRpcHandler接收到客户端的第一条消息时会做此操作。
  4. 使用saslServer处理已解密的消息,并将处理结果通过RpcResponseCallback的回调方法返回给客户端。
  5. 如果SASL认证交换已经完成,则将isComplete置为true。
  6. 对管道进行SASL加密。

SaslServerBootstrap是通过SaslRpcHandler对下游RpcHandler进行代理的一种TransportServerBootstrap。EncryptionCheckerBootstrap是另一种TransportServerBootstrap的实现,它通过将自身加入Netty的管道中实现引导,EncryptionCheckerBootstrap的doBootstrap方法的实现见代码清单4。

代码清单4         EncryptionCheckerBootstrap的doBootstrap实现

    @Override
    public RpcHandler doBootstrap(Channel channel, RpcHandler rpcHandler) {
      channel.pipeline().addFirst("encryptionChecker", this);
      return rpcHandler;
    }

在详细介绍了TransportChannelHandler之后我们就可以对《spark2.1.0之源码分析——RPC管道初始化》文中的图1进行扩展,把TransportRequestHandler、TransportServerBootstrap及RpcHandler的处理流程增加进来,如下图所示。

RPC框架服务端处理请求、响应流程图

                                                                          RPC框架服务端处理请求、响应流程图

有读者可能会问,上图中并未见TransportServerBootstrap的身影。根据对TransportServerBootstrap的两种实现的举例,我们知道TransportServerBootstrap将可能存在于图中任何两个组件的箭头连线中间,起到引导、包装、代理的作用。

关于《Spark内核设计的艺术 架构设计与实现》

经过近一年的准备,《Spark内核设计的艺术 架构设计与实现》一书现已出版发行,图书如图:

 

纸质版售卖链接如下:

京东:https://item.jd.com/12302500.html

相关文章
|
5月前
|
分布式计算 Java Scala
181 Spark IDEA中编写WordCount程序
181 Spark IDEA中编写WordCount程序
31 0
|
5月前
|
分布式计算 算法 Shell
180 Spark程序执行
180 Spark程序执行
26 0
|
4月前
|
Java 应用服务中间件 API
干翻RPC系列之HesssionRPC:HesssionRPC的开发体验和源码分析
干翻RPC系列之HesssionRPC:HesssionRPC的开发体验和源码分析
|
4月前
|
Java Shell 分布式数据库
【大数据技术Hadoop+Spark】HBase数据模型、Shell操作、Java API示例程序讲解(附源码 超详细)
【大数据技术Hadoop+Spark】HBase数据模型、Shell操作、Java API示例程序讲解(附源码 超详细)
84 0
|
4月前
|
分布式计算 监控 Java
Note_Spark_Day02:Standalone集群模式和使用IDEA开发应用程序
Note_Spark_Day02:Standalone集群模式和使用IDEA开发应用程序
51 0
|
4月前
|
分布式计算 大数据 Java
大数据必知必会系列——面试官问能不能手写一个spark程序?[新星计划]
大数据必知必会系列——面试官问能不能手写一个spark程序?[新星计划]
48 0
|
3月前
|
分布式计算 资源调度 监控
Spark应用程序的结构与驱动程序
Spark应用程序的结构与驱动程序
|
4月前
|
分布式计算 大数据 Linux
Python大数据之PySpark(三)使用Python语言开发Spark程序代码
Python大数据之PySpark(三)使用Python语言开发Spark程序代码
108 0
|
5月前
|
SQL 分布式计算 Java
201 Spark SQL查询程序
201 Spark SQL查询程序
35 0
|
6月前
|
Go API 数据库
Go 微服务框架 go-micro 使用客户端 RPC 调用服务端方法返回 408 怎么解决?
Go 微服务框架 go-micro 使用客户端 RPC 调用服务端方法返回 408 怎么解决?
48 0