spark2.1.0之源码分析——服务端RPC处理器RpcHandler详解

简介: 版权声明:本文为博主原创文章,未经博主允许不得转载。 https://blog.csdn.net/beliefer/article/details/81624875  提示:阅读本文前最好先阅读:《Spark2.
版权声明:本文为博主原创文章,未经博主允许不得转载。 https://blog.csdn.net/beliefer/article/details/81624875

 

提示:阅读本文前最好先阅读:

  1. 《Spark2.1.0之内置RPC框架》
  2. 《spark2.1.0之源码分析——RPC配置TransportConf》
  3. 《spark2.1.0之源码分析——RPC客户端工厂TransportClientFactory》
  4. spark2.1.0之源码分析——RPC服务器TransportServer》
  5. 《spark2.1.0之源码分析——RPC管道初始化》
  6. spark2.1.0之源码分析——RPC传输管道处理器详解

在《spark2.1.0之源码分析——RPC传输管道处理器详解》一文中详细介绍了TransportRequestHandler。

由于TransportRequestHandler实际是把请求消息交给RpcHandler进一步处理的,所以这里对RpcHandler首先做个介绍。RpcHandler是一个抽象类,定义了一些RPC处理器的规范,其主要实现见代码清单1。

代码清单1         RpcHandler的实现

public abstract class RpcHandler {

  private static final RpcResponseCallback ONE_WAY_CALLBACK = new OneWayRpcCallback();

  public abstract void receive(
      TransportClient client,
      ByteBuffer message,
      RpcResponseCallback callback);

  public abstract StreamManager getStreamManager();

  public void receive(TransportClient client, ByteBuffer message) {
    receive(client, message, ONE_WAY_CALLBACK);
  }

  public void channelActive(TransportClient client) { }

  public void channelInactive(TransportClient client) { }

  public void exceptionCaught(Throwable cause, TransportClient client) { }

  private static class OneWayRpcCallback implements RpcResponseCallback {

    private static final Logger logger = LoggerFactory.getLogger(OneWayRpcCallback.class);

    @Override
    public void onSuccess(ByteBuffer response) {
      logger.warn("Response provided for one-way RPC.");
    }

    @Override
    public void onFailure(Throwable e) {
      logger.error("Error response provided for one-way RPC.", e);
    }

  }

}

代码清单1中RpcHandler的各个方法的作用如下:

  • receive:这是一个抽象方法,用来接收单一的RPC消息,具体处理逻辑需要子类去实现。receive接收三个参数,分别是TransportClientByteBufferRpcResponseCallbackRpcResponseCallback用于对请求处理结束后进行回调,无论处理结果是成功还是失败,RpcResponseCallback都会被调用一次。RpcResponseCallback的接口定义如下:
public interface RpcResponseCallback {
  void onSuccess(ByteBuffer response);
  void onFailure(Throwable e);
}
  • 重载的receive:只接收TransportClientByteBuffer两个参数,RpcResponseCallback为默认的ONE_WAY_CALLBACK,其类型为OneWayRpcCallback,从代码清单3-27中OneWayRpcCallback的实现可以看出其onSuccessonFailure只是打印日志,并没有针对客户端做回复处理。
  • channelActive:当与给定客户端相关联的channel处于活动状态时调用。
  • channelInactive:当与给定客户端相关联的channel处于非活动状态时调用。
  • exceptionCaught:当channel产生异常时调用。
  • getStreamManager:获取StreamManager,StreamManager可以从流中获取单个的块,因此它也包含着当前正在被TransportClient获取的流的状态。、

介绍完RpcHandler,现在回到TransportRequestHandler的处理过程。TransportRequestHandler处理以上四种RequestMessage的实现见代码清单2。

代码清单2         TransportRequestHandler的handle方法

  @Override
  public void handle(RequestMessage request) {
    if (request instanceof ChunkFetchRequest) {
      processFetchRequest((ChunkFetchRequest) request);
    } else if (request instanceof RpcRequest) {
      processRpcRequest((RpcRequest) request);
    } else if (request instanceof OneWayMessage) {
      processOneWayMessage((OneWayMessage) request);
    } else if (request instanceof StreamRequest) {
      processStreamRequest((StreamRequest) request);
    } else {
      throw new IllegalArgumentException("Unknown request type: " + request);
    }
  }

结合代码清单2,下面逐一详细分析这四种类型请求的处理过程。

处理块获取请求

         processFetchRequest方法用于处理ChunkFetchRequest类型的消息,其实现见代码清单3。

代码清单3         processFetchRequest的实现

  private void processFetchRequest(final ChunkFetchRequest req) {
    if (logger.isTraceEnabled()) {
      logger.trace("Received req from {} to fetch block {}", getRemoteAddress(channel),
        req.streamChunkId);
    }

    ManagedBuffer buf;
    try {
      streamManager.checkAuthorization(reverseClient, req.streamChunkId.streamId);
      streamManager.registerChannel(channel, req.streamChunkId.streamId);
      buf = streamManager.getChunk(req.streamChunkId.streamId, req.streamChunkId.chunkIndex);
    } catch (Exception e) {
      logger.error(String.format("Error opening block %s for request from %s",
        req.streamChunkId, getRemoteAddress(channel)), e);
      respond(new ChunkFetchFailure(req.streamChunkId, Throwables.getStackTraceAsString(e)));
      return;
    }

    respond(new ChunkFetchSuccess(req.streamChunkId, buf));
  }

代码清单3中的streamManager是通过调用RpcHandler的getStreamManager方法获取的StreamManager。processFetchRequest的处理都依托于RpcHandler的StreamManager,其处理步骤如下:

  1. 调用StreamManager的checkAuthorization方法,校验客户端是否有权限从给定的流中读取;
  2. 调用StreamManager的registerChannel方法,将一个流和一条(只能是一条)客户端的TCP连接关联起来,这可以保证对于单个的流只会有一个客户端读取。流关闭之后就永远不能够重用了;
  3. 调用StreamManager的getChunk方法,获取单个的块(块被封装为ManagedBuffer)。由于单个的流只能与单个的TCP连接相关联,因此getChunk方法不能为了某个特殊的流而并行调用;
  4. 将ManagedBuffer和流的块Id封装为ChunkFetchSuccess后,调用respond方法返回给客户端。

有关StreamManager的具体实现,读者可以参考《Spark内核设计的艺术 架构设计与实现》一书5.3.5节介绍的NettyStreamManager和6.9.2节介绍的NettyBlockRpcServer中的OneForOneStreamManager。

处理RPC请求

         processRpcRequest方法用于处理RpcRequest类型的消息,其实现见代码清单4。

代码清单4         processRpcRequest的实现

  private void processRpcRequest(final RpcRequest req) {
    try {
      rpcHandler.receive(reverseClient, req.body().nioByteBuffer(), new RpcResponseCallback() {
        @Override
        public void onSuccess(ByteBuffer response) {
          respond(new RpcResponse(req.requestId, new NioManagedBuffer(response)));
        }

        @Override
        public void onFailure(Throwable e) {
          respond(new RpcFailure(req.requestId, Throwables.getStackTraceAsString(e)));
        }
      });
    } catch (Exception e) {
      logger.error("Error while invoking RpcHandler#receive() on RPC id " + req.requestId, e);
      respond(new RpcFailure(req.requestId, Throwables.getStackTraceAsString(e)));
    } finally {
      req.body().release();
    }
  }

代码清单4中将RpcRequest消息的内容体、发送消息的客户端以及一个RpcResponseCallback类型的匿名内部类作为参数传递给了RpcHandler的receive方法。这就是说真正用于处理RpcRequest消息的是RpcHandler,而非TransportRequestHandler。由于RpcHandler是抽象类(见代码清单1),其receive方法也是抽象方法,所以具体的操作将由RpcHandler的实现了receive方法的子类来完成。所有继承RpcHandler的子类都需要在其receive方法的具体实现中回调RpcResponseCallback的onSuccess(处理成功时)或者onFailure(处理失败时)方法。从RpcResponseCallback的实现来看,无论处理结果成功还是失败,都将调用respond方法对客户端进行响应。

处理流请求

         processStreamRequest方法用于处理StreamRequest类型的消息,其实现见代码清单5。

代码清单5         processStreamRequest的实现

  private void processStreamRequest(final StreamRequest req) {
    ManagedBuffer buf;
    try {
      buf = streamManager.openStream(req.streamId);// 将获取到的流数据封装为ManagedBuffer
    } catch (Exception e) {
      logger.error(String.format(
        "Error opening stream %s for request from %s", req.streamId, getRemoteAddress(channel)), e);
      respond(new StreamFailure(req.streamId, Throwables.getStackTraceAsString(e)));
      return;
    }

    if (buf != null) {
      respond(new StreamResponse(req.streamId, buf.size(), buf));
    } else {
      respond(new StreamFailure(req.streamId, String.format(
        "Stream '%s' was not found.", req.streamId)));
    }
  }

代码清单5中也使用了RpcHandler的StreamManager,其处理步骤如下:

  1. 调用StreamManager的openStream方法将获取到的流数据封装为ManagedBuffer;
  2. 当成功或失败时调用respond方法向客户端响应。

处理无需回复的RPC请求

         processOneWayMessage方法用于处理StreamRequest类型的消息,其实现见代码清单6。

代码清单6         processOneWayMessage的实现

  private void processOneWayMessage(OneWayMessage req) {
    try {
      rpcHandler.receive(reverseClient, req.body().nioByteBuffer());
    } catch (Exception e) {
      logger.error("Error while invoking RpcHandler#receive() for one-way message.", e);
    } finally {
      req.body().release();
    }
  }

processOneWayMessage方法的实现processRpcRequest非常相似,区别在于processOneWayMessage调用了代码清单1中ONE_WAY_CALLBACK的receive方法,因而processOneWayMessage在处理完RPC请求后不会对客户端作出响应。

         从以上四种处理的分析可以看出最终的处理都由RpcHandler及其内部组件完成。除了OneWayMessage的消息外,其余三种消息都是最终调用respond方法响应客户端,其实现见代码清单7。

代码清单7         respond的实现

  private void respond(final Encodable result) {
    final SocketAddress remoteAddress = channel.remoteAddress();
    channel.writeAndFlush(result).addListener(
      new ChannelFutureListener() {
        @Override
        public void operationComplete(ChannelFuture future) throws Exception {
          if (future.isSuccess()) {
            logger.trace("Sent result {} to client {}", result, remoteAddress);
          } else {
            logger.error(String.format("Error sending result %s to %s; closing connection",
              result, remoteAddress), future.cause());
            channel.close();
          }
        }
      }
    );
  }

可以看到respond方法中实际调用了Channel的writeAndFlush方法[1]来响应客户端。

 


[1] Channel的writeAndFlush方法涉及Netty的实现细节及原理,这并不是本书所要阐述的内容,有兴趣的读者可以访问Netty官网:http://netty.io获取更多信息。

 

关于《Spark内核设计的艺术 架构设计与实现》

经过近一年的准备,《Spark内核设计的艺术 架构设计与实现》一书现已出版发行,图书如图:

 

纸质版售卖链接如下:

京东:https://item.jd.com/12302500.html

相关文章
|
5月前
|
Java 应用服务中间件 API
干翻RPC系列之HesssionRPC:HesssionRPC的开发体验和源码分析
干翻RPC系列之HesssionRPC:HesssionRPC的开发体验和源码分析
|
7月前
|
Go API 数据库
Go 微服务框架 go-micro 使用客户端 RPC 调用服务端方法返回 408 怎么解决?
Go 微服务框架 go-micro 使用客户端 RPC 调用服务端方法返回 408 怎么解决?
50 0
|
8月前
|
负载均衡 Dubbo Java
RPC框架-dubbo:架构及源码分析-初篇
在自学或面试dubbo时,相关的问题有很多,例如dubbo 的基本工作原理,这是使用过dubbo后应该知道的。包括dubbo的分层架构、长短链接选择、二进制协议支持;之后是使用方式(服务的注册、发现、调用方式),基础配置(超时时间、线程数),这些是最基本的。 在这些问题之后,就可以继续深入底层:关于连接方式,使用长连接还是短连接?为什么? dubbo的二进制协议支持哪些,之间有什么区别/优缺点等等,也可以考察在使用过程中遇到过哪些问题,是如何解决的。这些都需要深入理解,并且有真实、长时间使用经验。
170 0
|
JSON 网络协议 安全
gRPC(八)生态 grpc-gateway 应用:同一个服务端支持Rpc和Restful Api
版权声明:本文为博主原创文章,未经博主允许不得转载。https://blog.csdn.net/weixin_46618592/article/details/127776709?spm=1001.2014.3001.5501
411 0
gRPC(八)生态 grpc-gateway 应用:同一个服务端支持Rpc和Restful Api
|
网络协议 Linux 网络安全
AnolisOS8.6做NFS服务端,挂载失败 mount: RPC: Unable to receive; errno = Connection refused
anolis8.6安装nfs服务端,在显示共享目录时,始终报错
|
消息中间件 Java RocketMQ
RocketMQ源码分析-Rpc通信模块(remoting)二
今天继续RocketMQ-Rpc通信模块(remoting)的源码分析。上一章提到了主要的start()方法执行流程,如果有不清楚的地方可以一起讨论哈,这篇文章会继续解读主要方法,按照惯例先看看NettyRemotingAbstract的类图,看类图知方法。和NettyEventExecutor以及MQ的交互流程。 按照惯例先看看NettyRemotingAbstract的类图,看类图知方法,文中会挑重要方法和主要流程解读。
407 0
RocketMQ源码分析-Rpc通信模块(remoting)二
|
消息中间件 编解码 网络协议
RocketMQ源码分析-Rpc通信模块(remoting)一
上篇文章分析了Rocketmq的nameServer的源码,在继续分析源码之前,先考虑一个问题,设计一个mq并且是高性能的mq最最核心的问题是什么,我个人认为主要是有俩个方面,1:消息的网络传输,2:消息的读写,这两个决定了mq的高性能。
480 0
RocketMQ源码分析-Rpc通信模块(remoting)一
RPC框架(7 - 实现服务端自动注册服务)
RPC框架(7 - 实现服务端自动注册服务)
|
3月前
|
负载均衡 Dubbo Java
Dubbo 3.x:探索阿里巴巴的开源RPC框架新技术
随着微服务架构的兴起,远程过程调用(RPC)框架成为了关键组件。Dubbo,作为阿里巴巴的开源RPC框架,已经演进到了3.x版本,带来了许多新特性和技术改进。本文将探讨Dubbo 3.x中的一些最新技术,包括服务注册与发现、负载均衡、服务治理等,并通过代码示例展示其使用方式。
87 9
|
8月前
|
消息中间件 负载均衡 Dubbo
如何自己设计一个类似Dubbo的RPC框架?
如何自己设计一个类似Dubbo的RPC框架?
64 0