Alluxio源码分析:RPC框架浅析(三)

简介:         Alluxio源码分析是一个基于内存的分布式文件系统,和HDFS、HBase等一样,也是由主从节点构成的。而节点之间的通信,一般都是采用的RPC通讯模型。Alluxio中RPC是基于何种技术如何实现的呢?它对于RPC请求是如何处理的?都涉及到哪些组件?本文将针对这些问题,为您一一解答。

        Alluxio源码分析是一个基于内存的分布式文件系统,和HDFS、HBase等一样,也是由主从节点构成的。而节点之间的通信,一般都是采用的RPC通讯模型。Alluxio中RPC是基于何种技术如何实现的呢?它对于RPC请求是如何处理的?都涉及到哪些组件?本文将针对这些问题,为您一一解答。

        继《Alluxio源码分析:RPC框架浅析(二)》一文后,本文继续讲解Alluxio中RPC实现。

        4、Client端实现

       以FileSystemMasterClientService为例,Client端实现在FileSystemMasterClient类中,其内部有一个FileSystemMasterClientService.Client类型的成员变量FileSystemMasterClientService.Client,如下:

  private FileSystemMasterClientService.Client mClient = null;
        它的初始化是在afterConnect()方法中完成的,如下:

  @Override
  protected void afterConnect() throws IOException {
    mClient = new FileSystemMasterClientService.Client(mProtocol);
  }
        而使用呢,以读文件时需要的getStatus()方法为例,如下:

  /**
   * @param path the file path
   * @return the file info for the given file id
   * @throws IOException if an I/O error occurs
   * @throws AlluxioException if an Alluxio error occurs
   */
  public synchronized URIStatus getStatus(final AlluxioURI path) throws IOException,
      AlluxioException {
	  
	// RPC调用,执行call()方法
    return retryRPC(new RpcCallableThrowsAlluxioTException<URIStatus>() {
      @Override
      public URIStatus call() throws AlluxioTException, TException {
    	  
    	// 构造一个URIStatus实例,并返回
    	// 构造URIStatus实例时,需要使用FileInfo对象,其通过FileSystemMasterClientService.Client的getStatus()方法获得
        return new URIStatus(ThriftUtils.fromThrift(mClient.getStatus(path.getPath())));
      }
    });
  }
        执行retryRPC()方法,发起一个RPC调用,然后执行call()方法,构造一个URIStatus实例,并返回,构造URIStatus实例时,需要使用FileInfo对象,其通过FileSystemMasterClientService.Client的getStatus()方法获得,也就是我们需要远程调用服务端的FileSystemMasterClientService服务的getStatus()方法。

        我们看下retryRPC(RpcCallableThrowsAlluxioTException)方法,它的定义在其祖先类AbstractClient中,如下:

  /**
   * Similar to {@link #retryRPC(RpcCallable)} except that the RPC call may throw
   * {@link AlluxioTException} and once it is thrown, it will be transformed into
   * {@link AlluxioException} and be thrown.
   *
   * @param rpc the RPC call to be executed
   * @param <V> type of return value of the RPC call
   * @return the return value of the RPC call
   * @throws AlluxioException when {@link AlluxioTException} is thrown by the RPC call
   * @throws IOException when retries exceeds {@link #RPC_MAX_NUM_RETRY} or {@link #close()} has
   *         been called before calling this method or during the retry
   */
  protected synchronized <V> V retryRPC(RpcCallableThrowsAlluxioTException<V> rpc)
      throws AlluxioException, IOException {
    int retry = 0;
    
    // 如果客户端未关闭,即标志位mClosed为false,且重试次数retry小于30次,执行while循环
    while (!mClosed && (retry++) <= RPC_MAX_NUM_RETRY) {
      
      // 调用connect()进行连接
      connect();
      
      try {
    	// 调用rpc的call()方法,这里也就是外部重定义的RpcCallableThrowsAlluxioTException的call()方法
        return rpc.call();
      } catch (AlluxioTException e) {
        throw AlluxioException.from(e);
      } catch (ThriftIOException e) {
        throw new IOException(e);
      } catch (TException e) {
        LOG.error(e.getMessage(), e);
        mConnected = false;
      }
    }
    throw new IOException("Failed after " + retry + " retries.");
  }
        看下connect()方法,如下:

  /**
   * Connects with the remote.
   *
   * @throws IOException if an I/O error occurs
   * @throws ConnectionFailedException if network connection failed
   */
  public synchronized void connect() throws IOException, ConnectionFailedException {
    
	// 标志位 mConnected如果为true,标识连接已经建立,直接返回 
	if (mConnected) {
      return;
    }
	
	// 调用disconnect()方法,处理标志位mConnected、传输协议mProtocol等
    disconnect();
    
    // 检测客户端状态mClosed
    Preconditions.checkState(!mClosed, "Client is closed, will not try to connect.");

    // 连接最大重试次数maxConnectsTry取参数"alluxio.master.retry"
    int maxConnectsTry = mConfiguration.getInt(Constants.MASTER_RETRY_COUNT);
    final int BASE_SLEEP_MS = 50;
    RetryPolicy retry =
        new ExponentialBackoffRetry(BASE_SLEEP_MS, Constants.SECOND_MS, maxConnectsTry);
    
    while (!mClosed) {// 客户端未关闭的话,一直进行while循环,直到连接成功
    	
      // 获取Master地址mAddress
      mAddress = getAddress();
      
      LOG.info("Alluxio client (version {}) is trying to connect with {} {} @ {}", Version.VERSION,
              getServiceName(), mMode, mAddress);

      // 创建传输协议TBinaryProtocol实例binaryProtocol,这是一个二进制协议
      TProtocol binaryProtocol =
          new TBinaryProtocol(mTransportProvider.getClientTransport(mAddress));
      
      // 创建多路复用协议TMultiplexedProtocol实例mProtocol
      mProtocol = new TMultiplexedProtocol(binaryProtocol, getServiceName());
      try {
    	  
    	// 多路复用协议mProtocol的open()方法打开连接
        mProtocol.getTransport().open();
        LOG.info("Client registered with {} {} @ {}", getServiceName(), mMode, mAddress);
        
        // 标志位 mConnected设置为true,标识连接已经建立 
        mConnected = true;
        
        // 调用afterConnect()方法,创建client实例
        afterConnect();
        
        // 检测版本号
        checkVersion(getClient(), getServiceVersion());
        
        return;
      } catch (TTransportException e) {
        LOG.error("Failed to connect (" + retry.getRetryCount() + ") to " + getServiceName() + " "
            + mMode + " @ " + mAddress + " : " + e.getMessage());
        if (!retry.attemptRetry()) {
          break;
        }
      }
    }
    // Reaching here indicates that we did not successfully connect.
    throw new ConnectionFailedException("Failed to connect to " + getServiceName() + " " + mMode
        + " @ " + mAddress + " after " + (retry.getRetryCount()) + " attempts");
  }
        比较简单,前面涉及一些状态位mConnected、mClosed等的判断,避免重复连接或者错误连接等,而最重要的几步如下:

        1、获取Master地址mAddress;

        2、创建传输协议TBinaryProtocol实例binaryProtocol,这是一个二进制协议;

        3、创建多路复用协议TMultiplexedProtocol实例mProtocol;

        4、多路复用协议mProtocol的open()方法打开连接,启动rpc连接;

        5、标志位 mConnected设置为true,标识连接已经建立 ;

        6、调用afterConnect()方法,创建client实例。

        RPC服务一但启动,通讯链接一旦建立,那么剩下的就是类似调用本地方法一样进行远程过程调用了,我们最后再看下读文件时需要使用的getStatus()方法吧,如下:

  /**
   * @param path the file path
   * @return the file info for the given file id
   * @throws IOException if an I/O error occurs
   * @throws AlluxioException if an Alluxio error occurs
   */
  public synchronized URIStatus getStatus(final AlluxioURI path) throws IOException,
      AlluxioException {
	  
	// RPC调用,执行call()方法
    return retryRPC(new RpcCallableThrowsAlluxioTException<URIStatus>() {
      @Override
      public URIStatus call() throws AlluxioTException, TException {
    	  
    	// 构造一个URIStatus实例,并返回
    	// 构造URIStatus实例时,需要使用FileInfo对象,其通过FileSystemMasterClientService.Client的getStatus()方法获得
        return new URIStatus(ThriftUtils.fromThrift(mClient.getStatus(path.getPath())));
      }
    });
  }
        调用FileSystemMasterClientService.Client的getStatus()方法,然后利用ThriftUtils的数据转换方法fromThrift()进行对象转换,就能得到我们需要的对象数据!是不是so easy,屏蔽了很多底层细节,让我们感觉像调用本地方法一样?

        就是这么简单!



相关文章
|
24天前
|
自然语言处理 负载均衡 API
gRPC 一种现代、开源、高性能的远程过程调用 (RPC) 可以在任何地方运行的框架
gRPC 是一种现代开源高性能远程过程调用(RPC)框架,支持多种编程语言,可在任何环境中运行。它通过高效的连接方式,支持负载平衡、跟踪、健康检查和身份验证,适用于微服务架构、移动设备和浏览器客户端连接后端服务等场景。gRPC 使用 Protocol Buffers 作为接口定义语言,支持四种服务方法:一元 RPC、服务器流式处理、客户端流式处理和双向流式处理。
|
3月前
|
Dubbo 网络协议 Java
RPC框架:一文带你搞懂RPC
这篇文章全面介绍了RPC(远程过程调用)的概念、原理和应用场景,解释了RPC如何工作以及为什么在分布式系统中广泛使用,并探讨了几种常用的RPC框架如Thrift、gRPC、Dubbo和Spring Cloud,同时详细阐述了RPC调用流程和实现透明化远程服务调用的关键技术,包括动态代理和消息的编码解码过程。
RPC框架:一文带你搞懂RPC
|
2月前
|
XML 负载均衡 监控
分布式-dubbo-简易版的RPC框架
分布式-dubbo-简易版的RPC框架
|
3月前
|
XML 存储 JSON
(十二)探索高性能通信与RPC框架基石:Json、ProtoBuf、Hessian序列化详解
如今这个分布式风靡的时代,网络通信技术,是每位技术人员必须掌握的技能,因为无论是哪种分布式技术,都离不开心跳、选举、节点感知、数据同步……等机制,而究其根本,这些技术的本质都是网络间的数据交互。正因如此,想要构建一个高性能的分布式组件/系统,不得不思考一个问题:怎么才能让数据传输的速度更快?
|
5月前
|
存储 缓存 Linux
【实战指南】嵌入式RPC框架设计实践:六大核心类构建高效RPC框架
在先前的文章基础上,本文讨论如何通过分层封装提升一个针对嵌入式Linux的RPC框架的易用性。设计包括自动服务注册、高性能通信、泛型序列化和简洁API。框架分为6个关键类:BindingHub、SharedRingBuffer、Parcel、Binder、IBinder和BindInterface。BindingHub负责服务注册,SharedRingBuffer实现高效数据传输,Parcel处理序列化,而Binder和IBinder分别用于服务端和客户端交互。BindInterface提供简单的初始化接口,简化应用集成。测试案例展示了客户端和服务端的交互,验证了RPC功能的有效性。
412 6
|
4月前
|
分布式计算 负载均衡 数据安全/隐私保护
什么是RPC?有哪些RPC框架?
RPC(Remote Procedure Call,远程过程调用)是一种允许运行在一台计算机上的程序调用另一台计算机上子程序的技术。这种技术屏蔽了底层的网络通信细节,使得程序间的远程通信如同本地调用一样简单。RPC机制使得开发者能够构建分布式计算系统,其中不同的组件可以分布在不同的计算机上,但它们之间可以像在同一台机器上一样相互调用。
160 8
|
4月前
|
网络协议 Dubbo Java
什么是RPC?RPC和HTTP对比?RPC有什么缺点?市面上常用的RPC框架?
选择合适的RPC框架和通信协议,对于构建高效、稳定的分布式系统至关重要。开发者需要根据自己的业务需求和系统架构,综合考虑各种因素,做出适宜的技术选型。
461 1
|
4月前
|
负载均衡 Java
使用Java实现RPC框架
使用Java实现RPC框架
|
4月前
|
负载均衡 Java
|
5月前
|
分布式计算 资源调度 网络协议
分布式系统详解--框架(Hadoop--RPC协议)
分布式系统详解--框架(Hadoop--RPC协议)
47 0
下一篇
无影云桌面