spark2.1.0之源码分析——RPC客户端TransportClient详解

简介: 版权声明:本文为博主原创文章,未经博主允许不得转载。 https://blog.csdn.net/beliefer/article/details/82143001 提示:阅读本文前最好先阅读:《Spark2.
版权声明:本文为博主原创文章,未经博主允许不得转载。 https://blog.csdn.net/beliefer/article/details/82143001

提示:阅读本文前最好先阅读:

  1. 《Spark2.1.0之内置RPC框架》
  2. 《spark2.1.0之源码分析——RPC配置TransportConf》
  3. 《spark2.1.0之源码分析——RPC客户端工厂TransportClientFactory》
  4. spark2.1.0之源码分析——RPC服务器TransportServer》
  5. 《spark2.1.0之源码分析——RPC管道初始化》
  6. spark2.1.0之源码分析——RPC传输管道处理器详解
  7. spark2.1.0之源码分析——服务端RPC处理器RpcHandler详解
  8. spark2.1.0之源码分析——RPC服务端引导程序TransportServerBootstrap

         在《spark2.1.0之源码分析——服务端RPC处理器RpcHandler详解》一文曾介绍过服务端RpcHandler对请求消息的处理,现在来看看客户端发送RPC请求的原理。我们在分析《spark2.1.0之源码分析——RPC管道初始化》中列出的代码清单2中的createChannelHandler方法时,看到调用了TransportClient的构造器(见代码清单1),其中TransportResponseHandler的引用将赋给handler属性。

代码清单1         TransportClient的构造器

  public TransportClient(Channel channel, TransportResponseHandler handler) {
    this.channel = Preconditions.checkNotNull(channel);
    this.handler = Preconditions.checkNotNull(handler);
    this.timedOut = false;
  }

TransportClient一共有五个方法用于发送请求,分别为:

  1. fetchChunk:从远端协商好的流中请求单个块;
  2. stream:使用流的ID,从远端获取流数据;
  3. sendRpc:向服务端发送RPC的请求,通过At least Once Delivery原则保证请求不会丢失;
  4. sendRpcSync:向服务端发送异步的RPC的请求,并根据指定的超时时间等待响应;
  5. send:向服务端发送RPC的请求,但是并不期望能获取响应,因而不能保证投递的可靠性;

本节只选择最常用的sendRpc和fetchChunk进行分析,其余实现都可以触类旁通。

发送RPC请求

         sendRpc方法的实现见代码清单2。

代码清单2         sendRpc的实现

  public long sendRpc(ByteBuffer message, final RpcResponseCallback callback) {
    final long startTime = System.currentTimeMillis();
    if (logger.isTraceEnabled()) {
      logger.trace("Sending RPC to {}", getRemoteAddress(channel));
    }
    // 使用UUID生成请求主键requestId
    final long requestId = Math.abs(UUID.randomUUID().getLeastSignificantBits());
    handler.addRpcRequest(requestId, callback);// 添加requestId与RpcResponseCallback的引用之间的关系
    // 发送RPC请求
    channel.writeAndFlush(new RpcRequest(requestId, new NioManagedBuffer(message))).addListener(
      new ChannelFutureListener() {
        @Override
        public void operationComplete(ChannelFuture future) throws Exception {
          if (future.isSuccess()) {
            long timeTaken = System.currentTimeMillis() - startTime;
            if (logger.isTraceEnabled()) {
              logger.trace("Sending request {} to {} took {} ms", requestId,
                getRemoteAddress(channel), timeTaken);
            }
          } else {
            String errorMsg = String.format("Failed to send RPC %s to %s: %s", requestId,
              getRemoteAddress(channel), future.cause());
            logger.error(errorMsg, future.cause());
            handler.removeRpcRequest(requestId);
            channel.close();
            try {
              callback.onFailure(new IOException(errorMsg, future.cause()));
            } catch (Exception e) {
              logger.error("Uncaught exception in RPC response callback handler!", e);
            }
          }
        }
      });

    return requestId;
  }

结合代码清单2,我们知道sendRpc方法的实现步骤如下:

  1. 使用UUID生成请求主键requestId;
  2. 调用addRpcRequest向handler(特别提醒下读者这里的handler不是RpcHandler,而是通过TransportClient构造器传入的TransportResponseHandler)添加requestId与回调类RpcResponseCallback的引用之间的关系。TransportResponseHandler的addRpcRequest方法(见代码清单3)将更新最后一次请求的时间为当前系统时间,然后将requestId与RpcResponseCallback之间的映射加入到outstandingRpcs缓存中。outstandingRpcs专门用于缓存发出的RPC请求信息。
  3. 调用Channel的writeAndFlush方法将RPC请求发送出去,这和在《spark2.1.0之源码分析——服务端RPC处理器RpcHandler详解》一文列出的代码清单7中服务端调用的respond方法响应客户端的一样,都是使用Channel的writeAndFlush方法。当发送成功或者失败时会回调ChannelFutureListener的operationComplete方法。如果发送成功,那么只会打印requestId、远端地址及花费时间的日志,如果发送失败,除了打印错误日志外,还要调用TransportResponseHandler的removeRpcRequest方法(见代码清单4)将此次请求从outstandingRpcs缓存中移除。

代码清单3        添加RPC请求到缓存

  public void addRpcRequest(long requestId, RpcResponseCallback callback) {
    updateTimeOfLastRequest();
    outstandingRpcs.put(requestId, callback);
  }

代码清单4         从缓存中删除RPC请求

  public void removeRpcRequest(long requestId) {
    outstandingRpcs.remove(requestId);
  }

请求发送成功后,客户端将等待接收服务端的响应。根据《spark2.1.0之源码分析——RPC管道初始化》一文的图1,返回的消息也会传递给TransportChannelHandler的channelRead方法(见《spark2.1.0之源码分析——RPC传输管道处理器详解》一文的代码清单1),根据之前的分析,消息的分析将最后交给TransportResponseHandler的handle方法来处理。TransportResponseHandler的handle方法分别对《spark2.1.0之源码分析——RPC传输管道处理器详解》一文的图2中的六种ResponseMessage进行处理,由于服务端使用processRpcRequest方法(见《spark2.1.0之源码分析——服务端RPC处理器RpcHandler详解》一文的代码清单4)处理RpcRequest类型的消息后返回给客户端的消息为RpcResponse或RpcFailure,所以我们来看看客户端的TransportResponseHandler的handle方法是如何处理RpcResponse和RpcFailure,见代码清单5。

代码清单5         RpcResponse和RpcFailure消息的处理

    } else if (message instanceof RpcResponse) {
      RpcResponse resp = (RpcResponse) message;
      RpcResponseCallback listener = outstandingRpcs.get(resp.requestId);// 获取RpcResponseCallback
      if (listener == null) {
        logger.warn("Ignoring response for RPC {} from {} ({} bytes) since it is not outstanding",
          resp.requestId, getRemoteAddress(channel), resp.body().size());
      } else {
        outstandingRpcs.remove(resp.requestId);
        try {
          listener.onSuccess(resp.body().nioByteBuffer());
        } finally {
          resp.body().release();
        }
      }
    } else if (message instanceof RpcFailure) {
      RpcFailure resp = (RpcFailure) message;
      RpcResponseCallback listener = outstandingRpcs.get(resp.requestId); // 获取RpcResponseCallback
      if (listener == null) {
        logger.warn("Ignoring response for RPC {} from {} ({}) since it is not outstanding",
          resp.requestId, getRemoteAddress(channel), resp.errorString);
      } else {
        outstandingRpcs.remove(resp.requestId);
        listener.onFailure(new RuntimeException(resp.errorString));
      }

从代码清单5看到,处理RpcResponse的逻辑为:

  1. 使用RpcResponse对应的RpcRequest的主键requestId,从outstandingRpcs缓存中获取注册的RpcResponseCallback,此处的RpcResponseCallback即为代码清单2中传递给sendRpc方法的RpcResponseCallback;
  2. 移除outstandingRpcs缓存中requestId和RpcResponseCallback的注册信息;
  3. 调用RpcResponseCallback的onSuccess方法,处理成功响应后的具体逻辑。这里的RpcResponseCallback需要各个使用TransportClient的sendRpc方法的场景中分别实现;
  4. 最后释放RpcResponse的body,回收资源。

处理RpcFailure的逻辑为:

  1. 使用RpcFailure对应的RpcRequest的主键requestId,从outstandingRpcs缓存中获取注册的RpcResponseCallback,此处的RpcResponseCallback即为代码清单2中传递给sendRpc方法的RpcResponseCallback;
  2. 移除outstandingRpcs缓存中requestId和RpcResponseCallback的注册信息;
  3. 调用RpcResponseCallback的onFailure方法,处理失败响应后的具体逻辑。这里的RpcResponseCallback需要在使用TransportClient的sendRpc方法时指定或实现。

发送获取块请求

         fetchChunk的实现见代码清单6。

代码清单6         fetchChunk的实现

  public void fetchChunk(
      long streamId,
      final int chunkIndex,
      final ChunkReceivedCallback callback) {
    final long startTime = System.currentTimeMillis();
    if (logger.isDebugEnabled()) {
      logger.debug("Sending fetch chunk request {} to {}", chunkIndex, getRemoteAddress(channel));
    }

    final StreamChunkId streamChunkId = new StreamChunkId(streamId, chunkIndex);// 创建StreamChunkId
    // 添加StreamChunkId与ChunkReceivedCallback之间的对应关系
    handler.addFetchRequest(streamChunkId, callback);
    // 发送块请求
    channel.writeAndFlush(new ChunkFetchRequest(streamChunkId)).addListener(
      new ChannelFutureListener() {
        @Override
        public void operationComplete(ChannelFuture future) throws Exception {
          if (future.isSuccess()) {
            long timeTaken = System.currentTimeMillis() - startTime;
            if (logger.isTraceEnabled()) {
              logger.trace("Sending request {} to {} took {} ms", streamChunkId,
                getRemoteAddress(channel), timeTaken);
            }
          } else {
            String errorMsg = String.format("Failed to send request %s to %s: %s", streamChunkId,
              getRemoteAddress(channel), future.cause());
            logger.error(errorMsg, future.cause());
            handler.removeFetchRequest(streamChunkId);
            channel.close();
            try {
              callback.onFailure(chunkIndex, new IOException(errorMsg, future.cause()));
            } catch (Exception e) {
              logger.error("Uncaught exception in RPC response callback handler!", e);
            }
          }
        }
      });
  }

结合代码清单6,我们知道fetchChunk方法的实现步骤如下:

  1. 使用流的标记streamId和块的索引chunkIndex创建StreamChunkId;
  2. 调用addFetchRequest向handler(特别提醒下读者这里的handler不是RpcHandler,而是通过TransportClient构造器传入的TransportResponseHandler)添加StreamChunkId与回调类ChunkReceivedCallback的引用之间的关系。TransportResponseHandler的addFetchRequest方法(见代码清单7)将更新最后一次请求的时间为当前系统时间,然后将StreamChunkId与ChunkReceivedCallback之间的映射加入到outstandingFetches缓存中。outstandingFetches专门用于缓存发出的块请求信息。
  3. 调用Channel的writeAndFlush方法将块请求发送出去,这和在《spark2.1.0之源码分析——服务端RPC处理器RpcHandler详解》一文列出的代码清单7中服务端调用的respond方法响应客户端的一样,都是使用Channel的writeAndFlush方法。当发送成功或者失败时会回调ChannelFutureListener的operationComplete方法。如果发送成功,那么只会打印StreamChunkId、远端地址及花费时间的日志,如果发送失败,除了打印错误日志外,还要调用TransportResponseHandler的removeFetchRequest方法(见代码清单8)将此次请求从outstandingFetches缓存中移除。

代码清单7         添加块请求到缓存

  public void addFetchRequest(StreamChunkId streamChunkId, ChunkReceivedCallback callback) {
    updateTimeOfLastRequest();
    outstandingFetches.put(streamChunkId, callback);
  }

代码清单8         从缓存中删除块请求

  public void removeFetchRequest(StreamChunkId streamChunkId) {
    outstandingFetches.remove(streamChunkId);
  }

请求发送成功后,客户端将等待接收服务端的响应。根据《spark2.1.0之源码分析——RPC管道初始化》一文的图1,返回的消息也会传递给TransportChannelHandler的channelRead方法(见《spark2.1.0之源码分析——RPC传输管道处理器详解》一文的代码清单1),根据之前的分析,消息的分析将最后交给TransportResponseHandler的handle方法来处理。TransportResponseHandler的handle方法分别对《spark2.1.0之源码分析——RPC传输管道处理器详解》一文的图2中的六种处理结果进行处理,由于服务端使用processFetchRequest方法(见《spark2.1.0之源码分析——服务端RPC处理器RpcHandler详解》一文的代码清单3)处理ChunkFetchRequest类型的消息后返回给客户端的消息为ChunkFetchSuccess或ChunkFetchFailure,所以我们来看看客户端的TransportResponseHandler的handle方法是如何处理ChunkFetchSuccess和ChunkFetchFailure,见代码清单9

代码清单9         ChunkFetchSuccess和ChunkFetchFailure消息的处理

    if (message instanceof ChunkFetchSuccess) {
      ChunkFetchSuccess resp = (ChunkFetchSuccess) message;
      ChunkReceivedCallback listener = outstandingFetches.get(resp.streamChunkId);
      if (listener == null) {
        logger.warn("Ignoring response for block {} from {} since it is not outstanding",
          resp.streamChunkId, getRemoteAddress(channel));
        resp.body().release();
      } else {
        outstandingFetches.remove(resp.streamChunkId);
        listener.onSuccess(resp.streamChunkId.chunkIndex, resp.body());
        resp.body().release();
      }
    } else if (message instanceof ChunkFetchFailure) {
      ChunkFetchFailure resp = (ChunkFetchFailure) message;
      ChunkReceivedCallback listener = outstandingFetches.get(resp.streamChunkId);
      if (listener == null) {
        logger.warn("Ignoring response for block {} from {} ({}) since it is not outstanding",
          resp.streamChunkId, getRemoteAddress(channel), resp.errorString);
      } else {
        outstandingFetches.remove(resp.streamChunkId);
        listener.onFailure(resp.streamChunkId.chunkIndex, new ChunkFetchFailureException(
          "Failure while fetching " + resp.streamChunkId + ": " + resp.errorString));
      }
    }

从代码清单9看到,处理ChunkFetchSuccess的逻辑为:

  1. 使用ChunkFetchSuccess对应的StreamChunkId,从outstandingFetches缓存中获取注册的ChunkReceivedCallback,此处的ChunkReceivedCallback即为代码清单6中传递给fetchChunk方法的ChunkReceivedCallback;
  2. 移除outstandingFetches缓存中StreamChunkId和ChunkReceivedCallback的注册信息;
  3. 调用ChunkReceivedCallback的onSuccess方法,处理成功响应后的具体逻辑。这里的ChunkReceivedCallback需要各个使用TransportClient的fetchChunk方法的场景中分别实现;
  4. 最后释放ChunkFetchSuccess的body,回收资源。

处理ChunkFetchFailure的逻辑为:

  1. 使用ChunkFetchFailure对应的StreamChunkId,从outstandingFetches缓存中获取注册的ChunkReceivedCallback,此处的ChunkReceivedCallback即为代码清单6中传递给fetchChunk方法的ChunkReceivedCallback;
  2. 移除outstandingFetches缓存中StreamChunkId和ChunkReceivedCallback的注册信息;
  3. 调用ChunkReceivedCallback的onFailure方法,处理失败响应后的具体逻辑。这里的ChunkReceivedCallback需要各个使用TransportClient的fetchChunk方法的场景中分别实现。

在详细介绍了TransportClient和TransportResponseHandler之后,对于客户端我们就可以扩展《spark2.1.0之源码分析——RPC管道初始化》一文的图1,把TransportResponseHandler及TransportClient的处理流程增加进来,如下图所示。

客户端请求、响应流程图客户端请求、响应流程图

上图中的序号①表示调用TransportResponseHandler的addRpcRequest方法(或addFetchRequest方法)将更新最后一次请求的时间为当前系统时间,然后将requestId与RpcResponseCallback之间的映射加入到outstandingRpcs缓存中(或将StreamChunkId与ChunkReceivedCallback之间的映射加入到outstandingFetches缓存中)。②表示调用Channel的writeAndFlush方法将RPC请求发送出去。图中的虚线表示当TransportResponseHandler处理RpcResponse和RpcFailure时将从outstandingRpcs缓存中获取此请求对应的RpcResponseCallback(或处理ChunkFetchSuccess和ChunkFetchFailure时将从outstandingFetches缓存中获取StreamChunkId对应的ChunkReceivedCallback),并执行回调。此外,TransportClientBootstrap将可能存在于上图中任何两个组件的箭头连线中间。

 

关于《Spark内核设计的艺术 架构设计与实现》

经过近一年的准备,《Spark内核设计的艺术 架构设计与实现》一书现已出版发行,图书如图:

 

纸质版售卖链接如下:

京东:https://item.jd.com/12302500.html

相关实践学习
日志服务之使用Nginx模式采集日志
本文介绍如何通过日志服务控制台创建Nginx模式的Logtail配置快速采集Nginx日志并进行多维度分析。
相关文章
|
4月前
|
Java 应用服务中间件 API
干翻RPC系列之HesssionRPC:HesssionRPC的开发体验和源码分析
干翻RPC系列之HesssionRPC:HesssionRPC的开发体验和源码分析
|
7月前
|
负载均衡 Dubbo Java
RPC框架-dubbo:架构及源码分析-初篇
在自学或面试dubbo时,相关的问题有很多,例如dubbo 的基本工作原理,这是使用过dubbo后应该知道的。包括dubbo的分层架构、长短链接选择、二进制协议支持;之后是使用方式(服务的注册、发现、调用方式),基础配置(超时时间、线程数),这些是最基本的。 在这些问题之后,就可以继续深入底层:关于连接方式,使用长连接还是短连接?为什么? dubbo的二进制协议支持哪些,之间有什么区别/优缺点等等,也可以考察在使用过程中遇到过哪些问题,是如何解决的。这些都需要深入理解,并且有真实、长时间使用经验。
145 0
|
Go 流计算
gRPC阅读日记(七)客户端的RPC构建2
gRPC阅读日记(七)客户端的RPC构建2
|
Go 流计算
gRPC阅读日记(六)来看看客户端的rpc请求如何实现
gRPC阅读日记(六)来看看客户端的rpc请求如何实现
|
消息中间件 Java RocketMQ
RocketMQ源码分析-Rpc通信模块(remoting)二
今天继续RocketMQ-Rpc通信模块(remoting)的源码分析。上一章提到了主要的start()方法执行流程,如果有不清楚的地方可以一起讨论哈,这篇文章会继续解读主要方法,按照惯例先看看NettyRemotingAbstract的类图,看类图知方法。和NettyEventExecutor以及MQ的交互流程。 按照惯例先看看NettyRemotingAbstract的类图,看类图知方法,文中会挑重要方法和主要流程解读。
405 0
RocketMQ源码分析-Rpc通信模块(remoting)二
|
消息中间件 编解码 网络协议
RocketMQ源码分析-Rpc通信模块(remoting)一
上篇文章分析了Rocketmq的nameServer的源码,在继续分析源码之前,先考虑一个问题,设计一个mq并且是高性能的mq最最核心的问题是什么,我个人认为主要是有俩个方面,1:消息的网络传输,2:消息的读写,这两个决定了mq的高性能。
470 0
RocketMQ源码分析-Rpc通信模块(remoting)一
|
编解码 JSON 网络协议
透视RPC协议:SOFA-BOLT协议源码分析
最近在看Netty相关的资料,刚好SOFA-BOLT是一个比较成熟的Netty自定义协议栈实现,于是决定研读SOFA-BOLT的源码,详细分析其协议的组成,简单分析其客户端和服务端的源码实现。当前阅读的源码是2021-08左右的SOFA-BOLT仓库的master分支源码。
290 0
透视RPC协议:SOFA-BOLT协议源码分析
|
分布式计算 Java Hadoop
Spark对接Lindorm Phoenix5.x轻客户端
Lindorm兼容Phoenix提供的是Phoenix 5.x轻客户端,在Spark官网上对接Phoenix的例子大多是Phoenix 4.x重客户端,因此本文给出Spark对接Phoenix 5.x轻客户端的例子,方便大家参考。
1806 0
|
分布式计算 Java Spark
Spark2.1.0之内置RPC框架
版权声明:本文为博主原创文章,未经博主允许不得转载。 https://blog.csdn.net/beliefer/article/details/80799622         在Spark中很多地方都涉及网络通信,比如Spark各个组件间的消息互通、用户文件与Jar包的上传、节点间的Shuffle过程、Block数据的复制与备份等。
1974 0
|
3月前
|
机器学习/深度学习 SQL 分布式计算
Apache Spark 的基本概念和在大数据分析中的应用
介绍 Apache Spark 的基本概念和在大数据分析中的应用
159 0