Alluxio源码分析:RPC框架浅析(二)

本文涉及的产品
服务治理 MSE Sentinel/OpenSergo,Agent数量 不受限
简介:         Alluxio源码分析是一个基于内存的分布式文件系统,和HDFS、HBase等一样,也是由主从节点构成的。而节点之间的通信,一般都是采用的RPC通讯模型。Alluxio中RPC是基于何种技术如何实现的呢?它对于RPC请求是如何处理的?都涉及到哪些组件?本文将针对这些问题,为您一一解答。

        Alluxio源码分析是一个基于内存的分布式文件系统,和HDFS、HBase等一样,也是由主从节点构成的。而节点之间的通信,一般都是采用的RPC通讯模型。Alluxio中RPC是基于何种技术如何实现的呢?它对于RPC请求是如何处理的?都涉及到哪些组件?本文将针对这些问题,为您一一解答。

        继《Alluxio源码分析:RPC框架浅析(一)》一文后,本文继续讲解Alluxio中RPC实现。

        3、Server端实现:RPC Server端口绑定、传输协议等参数设置、Server启动

        AlluxioMaster是Alluxio中Master的实现,那么RPC服务端server自然就会落在它身上了。我们先看AlluxioMaster进程的启动main()方法,如下:

  /**
   * Starts the Alluxio master server via {@code java -cp <ALLUXIO-VERSION> alluxio.Master}.
   *
   * @param args there are no arguments used
   */
  public static void main(String[] args) {
    
	// 启动master时参数应为空
	if (args.length != 0) {
      LOG.info("java -cp {} alluxio.Master", Version.ALLUXIO_JAR);
      System.exit(-1);
    }

    // validate the conf
	// 验证配置信息
    if (!ValidateConf.validate()) {
      LOG.error("Invalid configuration found");
      System.exit(-1);
    }

    try {
    	
      // 调用get()方法,返回AlluxioMaster实例master
      AlluxioMaster master = get();
      
      // 调用实例master的start()方法,启动AlluxioMaster实例master
      master.start();
    } catch (Exception e) {
      LOG.error("Uncaught exception terminating Master", e);
      System.exit(-1);
    }
  }
        它主要干了两件事,一个就是调用get()方法,返回AlluxioMaster实例master,另一个就是调用实例master的start()方法,启动AlluxioMaster实例master。我们先看下get()方法,如下:

  /**
   * Returns a handle to the Alluxio master instance.
   *
   * @return Alluxio master handle
   */
  public static synchronized AlluxioMaster get() {
    
	// 静态AlluxioMaster类型成员变量sAlluxioMaster为空时,通过Factory.create()构造一个,否则返回sAlluxioMaster
	if (sAlluxioMaster == null) {
      sAlluxioMaster = Factory.create();
    }
    return sAlluxioMaster;
  }
        而Factory的create()方法,则会根据参数alluxio.zookeeper.enabled确定返回FaultTolerantAlluxioMaster实例还是AlluxioMaster实例,FaultTolerantAlluxioMaster继承自AlluxioMaster,默认是返回AlluxioMaster实例,代码如下:

    /**
     * @return {@link FaultTolerantAlluxioMaster} if Alluxio configuration is set to use zookeeper,
     *         otherwise, return {@link AlluxioMaster}.
     */
    public static AlluxioMaster create() {
    	
      // 根据参数alluxio.zookeeper.enabled确定返回FaultTolerantAlluxioMaster实例还是AlluxioMaster实例,
      // FaultTolerantAlluxioMaster继承自AlluxioMaster,默认是返回AlluxioMaster实例
      if (MasterContext.getConf().getBoolean(Constants.ZOOKEEPER_ENABLED)) {
        return new FaultTolerantAlluxioMaster();
      }
      return new AlluxioMaster();
    }
        在AlluxioMasterd的构造方法中,涉及RPC相关的,主要是Worker最大和最小线程数、服务端Socker的TServerSocket实例mTServerSocket等的构造,关键代码如下:

    Configuration conf = MasterContext.getConf();

    // Worker最大和最小线程数:分别取参数alluxio.master.worker.threads.max和alluxio.master.worker.threads.min
    mMinWorkerThreads = conf.getInt(Constants.MASTER_WORKER_THREADS_MIN);
    mMaxWorkerThreads = conf.getInt(Constants.MASTER_WORKER_THREADS_MAX);

      // 获取传输提供者mTransportProvider
      mTransportProvider = TransportProvider.Factory.create(conf);
      
      // 构造TServerSocket实例mTServerSocket
      // port取参数alluxio.master.port=19998
      mTServerSocket =
          new TServerSocket(NetworkAddressUtils.getBindAddress(ServiceType.MASTER_RPC, conf));

        再看实例master的start()方法,也就是AlluxioMaster的start()方法,代码如下:

  /**
   * Starts the Alluxio master server.
   *
   * @throws Exception if starting the master fails
   */
  public void start() throws Exception {
    startMasters(true);
    
    // 启动服务
    startServing();
  }
        继续看startServing()方法,如下:

  protected void startServing(String startMessage, String stopMessage) {
    mMasterMetricsSystem.start();
    
    // 启动web服务
    startServingWebServer();
    LOG.info("Alluxio Master version {} started @ {} {}", Version.VERSION, mMasterAddress,
        startMessage);
    
    // 启动RPC服务
    startServingRPCServer();
    LOG.info("Alluxio Master version {} ended @ {} {}", Version.VERSION, mMasterAddress,
        stopMessage);
  }
        撇开启动web服务不说,我们看下启动RPC服务的startServingRPCServer()方法,如下:

  protected void startServingRPCServer() {
    // set up multiplexed thrift processors
	// 构造多路复用处理器TMultiplexedProcessor实例processor
    TMultiplexedProcessor processor = new TMultiplexedProcessor();
    
    // 注册BlockMaster服务
    registerServices(processor, mBlockMaster.getServices());
    
    // 注册FileSystemMaster服务
    registerServices(processor, mFileSystemMaster.getServices());
    
    // 必要的话,注册LineageMaster服务
    if (LineageUtils.isLineageEnabled(MasterContext.getConf())) {
      registerServices(processor, mLineageMaster.getServices());
    }
    
    // register additional masters for RPC service
    // 注册额外的Masters服务
    for (Master master : mAdditionalMasters) {
      registerServices(processor, master.getServices());
    }

    // Return a TTransportFactory based on the authentication type
    TTransportFactory transportFactory;
    try {
    	
      // 获得传输工厂实例
      transportFactory = mTransportProvider.getServerTransportFactory();
    } catch (IOException e) {
      throw Throwables.propagate(e);
    }

    // create master thrift service with the multiplexed processor.
    
    // 构造TThreadPoolServer实例时需要的参数:
    // 服务端Socker:TServerSocket类型实例mTServerSocket
    // 最大Worker线程数mMaxWorkerThreads
    // 最小Worker线程数mMinWorkerThreads
    // 处理器processor
    // 传输工厂transportFactory
    // 协议工厂TBinaryProtocol:二进制协议TBinaryProtocol
    Args args = new TThreadPoolServer.Args(mTServerSocket).maxWorkerThreads(mMaxWorkerThreads)
        .minWorkerThreads(mMinWorkerThreads).processor(processor).transportFactory(transportFactory)
        .protocolFactory(new TBinaryProtocol.Factory(true, true));
    if (MasterContext.getConf().getBoolean(Constants.IN_TEST_MODE)) {
      args.stopTimeoutVal = 0;
    } else {
      args.stopTimeoutVal = Constants.THRIFT_STOP_TIMEOUT_SECONDS;
    }
    
    // 构造TThreadPoolServer实例mMasterServiceServer
    mMasterServiceServer = new TThreadPoolServer(args);

    // start thrift rpc server
    // 标志位正在提供服务mIsServing设置为true
    mIsServing = true;
    // 启动时间mStartTimeMs取当前时间
    mStartTimeMs = System.currentTimeMillis();
    
    // 启动TThreadPoolServer服务
    mMasterServiceServer.serve();
  }
         startServingRPCServer()方法主要处理流程如下:

        1、构造多路复用处理器TMultiplexedProcessor实例processor;

        2、调用registerServices()方法,注册BlockMaster、FileSystemMaster、LineageMaster、额外的Masters等各种服务;

        3、调用TransportProvider的getServerTransportFactory()方法获得传输工厂实例transportFactory;

        4、构造TThreadPoolServer实例时需要的参数:包括:

             (1)服务端Socker:TServerSocket类型实例mTServerSocket;

             (2)最大Worker线程数mMaxWorkerThreads;

             (3)最小Worker线程数mMinWorkerThreads;

             (4)多路复用处理器processor;

             (5)传输工厂transportFactory;

             (6)协议工厂TBinaryProtocol:二进制协议TBinaryProtocol;

        5、构造TThreadPoolServer实例mMasterServiceServer;

        6、标志位正在提供服务mIsServing设置为true;

        7、启动时间mStartTimeMs取当前时间;

        8、调用mMasterServiceServer的serve()方法启动TThreadPoolServer服务。
        先以FileSystemMaster服务为例,看下RPC服务是如何注册的,代码如下:

  private void registerServices(TMultiplexedProcessor processor, Map<String, TProcessor> services) {
    for (Map.Entry<String, TProcessor> service : services.entrySet()) {
      processor.registerProcessor(service.getKey(), service.getValue());
    }
  }

        注册很简单,多路复用处理器processor的registerProcessor()方法即可完成注册,关键是要看注册的是什么东西它的服务是通过FileSystemMaster的getServices()方法获取的,我们跟踪下:

  @Override
  public Map<String, TProcessor> getServices() {
    Map<String, TProcessor> services = new HashMap<String, TProcessor>();
    
    // FileSystemMasterClientService服务
    services.put(
    		
        // key为"FileSystemMasterClient"
        Constants.FILE_SYSTEM_MASTER_CLIENT_SERVICE_NAME,
        
        // 可以看出,FileSystemMasterClientService服务Master端实现者是FileSystemMasterClientServiceHandler
        new FileSystemMasterClientService.Processor<FileSystemMasterClientServiceHandler>(
            new FileSystemMasterClientServiceHandler(this)));
    
    // FileSystemMasterWorkerService服务  
    services.put(
    		
        // key为"FileSystemMasterWorker"
        Constants.FILE_SYSTEM_MASTER_WORKER_SERVICE_NAME,
        
        // 可以看出,FileSystemMasterWorkerService服务Master端实现者是FileSystemMasterWorkerServiceHandler
        new FileSystemMasterWorkerService.Processor<FileSystemMasterWorkerServiceHandler>(
            new FileSystemMasterWorkerServiceHandler(this)));
    return services;
  }
        就俩服务:FileSystemMasterClientService和FileSystemMasterWorkerService,它们在Master端的实现者分别是FileSystemMasterClientServiceHandler和FileSystemMasterWorkerServiceHandler,并且是通过各自Service的Processor来构造的,看到这里,你似乎应该明白什么了吧!这就是Processor的用途。
        再看下获得传输工厂实例transportFactory,它是通过TransportProvider实例mTransportProvider的getServerTransportFactory()方法来获取的,而mTransportProvider的初始化也是在AlluxioMaster构造方法中,通过TransportProvider.Factory.create(conf)来获取的,我们看下代码:

    public static TransportProvider create(Configuration conf) {
      AuthType authType = conf.getEnum(Constants.SECURITY_AUTHENTICATION_TYPE, AuthType.class);
      switch (authType) {
        case NOSASL:
          return new NoSaslTransportProvider(conf);
        case SIMPLE: // intended to fall through
        case CUSTOM:
          return new PlainSaslTransportProvider(conf);
        case KERBEROS:
          throw new UnsupportedOperationException(
              "getClientTransport: Kerberos is not supported currently.");
        default:
          throw new UnsupportedOperationException(
              "getClientTransport: Unsupported authentication type: " + authType.getAuthName());
      }
    }
        它目前仅支持NOSASL和CUSTOM两种类型,分别对应NoSaslTransportProvider和PlainSaslTransportProvider两个类。我们以CUSTOM类型的PlainSaslTransportProvider为例来看下getServerTransportFactory()方法,代码如下:

  @Override
  public TTransportFactory getServerTransportFactory() throws SaslException {
    AuthType authType =
        mConfiguration.getEnum(Constants.SECURITY_AUTHENTICATION_TYPE, AuthType.class);
    TSaslServerTransport.Factory saslFactory = new TSaslServerTransport.Factory();
    AuthenticationProvider provider =
        AuthenticationProvider.Factory.create(authType, mConfiguration);
    saslFactory
        .addServerDefinition(PlainSaslServerProvider.MECHANISM, null, null,
            new HashMap<String, String>(), new PlainSaslServerCallbackHandler(provider));
    return saslFactory;
  }
        剩余的TThreadPoolServer实例构造、参数选择等上面解释的已经很清晰,读者可自行分析。

        未完待续,请关注《Alluxio源码分析:RPC框架浅析(三)》









相关实践学习
基于MSE实现微服务的全链路灰度
通过本场景的实验操作,您将了解并实现在线业务的微服务全链路灰度能力。
相关文章
|
8月前
|
负载均衡 Dubbo Java
RPC框架-dubbo:架构及源码分析-初篇
在自学或面试dubbo时,相关的问题有很多,例如dubbo 的基本工作原理,这是使用过dubbo后应该知道的。包括dubbo的分层架构、长短链接选择、二进制协议支持;之后是使用方式(服务的注册、发现、调用方式),基础配置(超时时间、线程数),这些是最基本的。 在这些问题之后,就可以继续深入底层:关于连接方式,使用长连接还是短连接?为什么? dubbo的二进制协议支持哪些,之间有什么区别/优缺点等等,也可以考察在使用过程中遇到过哪些问题,是如何解决的。这些都需要深入理解,并且有真实、长时间使用经验。
169 0
|
11月前
|
监控 负载均衡 网络协议
分布式RPC框架Dubbo详解
分布式RPC框架Dubbo详解
107 0
|
缓存 运维 监控
|
分布式计算 网络协议 算法
最简单的RPC框架实现
最简单的RPC框架实现
182 0
最简单的RPC框架实现
|
Dubbo NoSQL Java
Dubbo 高性能 RPC 框架实践
Dubbo 高性能 RPC 框架实践
Dubbo 高性能 RPC 框架实践
|
Dubbo 网络协议 JavaScript
RPC框架:从原理到选型,一文带你搞懂RPC(一)
RPC系列的文章是我去年写的,当时写的比较散,现在重新进行整理。对于想学习RPC框架的同学,通过这篇文章,让你知其然并知其所以然,便于以后技术选型
975 0
RPC框架:从原理到选型,一文带你搞懂RPC(一)
|
XML JSON 运维
RPC框架:从原理到选型,一文带你搞懂RPC(二)
RPC系列的文章是我去年写的,当时写的比较散,现在重新进行整理。对于想学习RPC框架的同学,通过这篇文章,让你知其然并知其所以然,便于以后技术选型
410 0
RPC框架:从原理到选型,一文带你搞懂RPC(二)
|
大数据 调度 流计算
你有必要了解一下Flink底层RPC使用的框架和原理
5万人关注的大数据成神之路,不来了解一下吗?5万人关注的大数据成神之路,真的不来了解一下吗?5万人关注的大数据成神之路,确定真的不来了解一下吗? 欢迎您关注《大数据成神之路》 前言 对于Flink中各个组件(JobMaster、TaskManager、Dispatcher等),其底层RPC框架基于Akka实现,本文着重分析Flink中的Rpc框架实现机制及梳理其通信流程。
2120 0
|
JavaScript Dubbo Java
高并发架构系列:如何从0到1设计一个类Dubbo的RPC框架
在过去持续分享的几十期阿里Java面试题中,几乎每次都会问到Dubbo相关问题,比如:“如何从0到1设计一个Dubbo的RPC框架”,这个问题主要考察以下几个方面: 你对RPC框架的底层原理掌握程度。 考验你的整体RPC框架系统设计能力。本文详解~
9696 0
|
网络协议 前端开发 Java
谈谈如何使用Netty开发实现高性能的RPC服务器
RPC(Remote Procedure Call Protocol)远程过程调用协议,它是一种通过网络,从远程计算机程序上请求服务,而不必了解底层网络技术的协议。说的再直白一点,就是客户端在不必知道调用细节的前提之下,调用远程计算机上运行的某个对象,使用起来就像调用本地的对象一样。
1262 0