Hadoop源码学习:RPC-阿里云开发者社区

开发者社区> 大数据> 正文

Hadoop源码学习:RPC

简介: <br><div style="orphans:2; widows:2; font-size:13px; margin:0px; font-family:Monaco,Menlo,'Ubuntu Mono',Consolas,source-code-pro,SimSun,Song,宋体,幼圆,Heiti,黑体,文泉驿等宽正黑,文泉驿正黑,monospace"> <h1 style="fo

Hadoop源码学习:RPC

Hadoop RPC使用java NIO编写,达到高性能,轻量级,可控性。 主要分为四层:序列化层,函数调用层,网络传输层,服务器端处理框架

  • 序列化层:实现Writable接口
  • 函数调用层:java反射机制和动态代理实现函数调用
  • 网络传输层:使用Socket机制
  • 服务器端处理框架:基于Reactor设计模式的事件驱动I/O模型

如何使用Hadoop RPC:

RPC Server:

  • 1.定义一个协议,实现VersionedProtocol接口,
      public interface TestProtocol extends VersionedProtocol {
      public static final long versionID = 1L;
      void ping() throws IOException;
    }
  • 2.实现RPC协议

     public static class TestImpl implements TestProtocol {
      int fastPingCounter = 0;
    
      public long getProtocolVersion(String protocol, long clientVersion) {
        return TestProtocol.versionID;
      }
      public int add(int v1, int v2) {
        return v1 + v2;
      }
  • 3.服务端开启服务
      Server server = RPC.getServer(new TestImpl(), 0.0.0.0, 0, conf);
      server.start();

RPC Client

  • 1.根据接口,远程调用
      TestProtocol proxy = null;
      proxy = (TestProtocol)RPC.getProxy(
      TestProtocol.class, TestProtocol.versionID, addr, conf);
      int intResult = proxy.add(1, 2);

ipc.RPC类分析

我们来逐行分析代码

    //客户端调用服务
    proxy = (TestProtocol)RPC.getProxy(
    TestProtocol.class, TestProtocol.versionID, addr, conf);

getProxy方法,通过动态代理,将TestProtocol的调用交由RPC.Invoker管理,所有对TestProtocol的方法调用,都将交由RPC.Invoker的invoke()方法处理。

int intResult = proxy.add(1, 2);//RPC.Invoker的invoke()方法处理

RPC.Invoker将函数调用信息(函数名、函数参数列表等)打包成可序列化的RPC.Invocation对象,由ipc.Client发起链接

//从连接池中取出socket链接
client.call(new Invocation(method, args), remoteId);

ipc.Client类分析

  • 用ConnectionId标识一个链接,同一个InetSocketAddress将共享同一个链接
  • 链接被封住成Connection,每个方法调用都被封装成一个Call,Connection维护一个Call的HashMap

ipc.Server类分析

Hadoop采用Master/Slave结构,ipc.Server采用了线程池、事件驱动和Reactor设计模式来提高并发性。
ipc.Server是一个虚类
1.单线程Listener监听客户端请求,从Reader线程池中取出Reader线程处理请求,Reader读取客户端的调用请求并放入CallQueue队列
2.Handler从CallQueue队列中取出Call,执行函数调用,返回结果
3.如果函数调用返回的结果过大或者网络异常,,由单线程Respondeer使用异步方式继续发送未发送完成的结果  详细分析代码:

  • ipc.Server.Listener类

       //服务端开启监听
        ServerSocketChannel acceptChannel = ServerSocketChannel. open();
       //设置为非阻塞模式
        acceptChannel.configureBlocking( false );
    
        // Bind the server socket to the local host and port
        bind(acceptChannel .socket(), address, backlogLength);
        port = acceptChannel.socket().getLocalPort(); //Could be an ephemeral port
        // create a selector,用于监听事件;
        selector= Selector. open();
       //客户端连接服务器将会发生OP_ACCEPT事件
        acceptChannel .register( selector, SelectionKey. OP_ACCEPT );
        while ( running) {
          SelectionKey key = null ;
          try {
            //阻塞监听事件
            selector.select();
            //发生事件了,获取SelectionKey,SelectionKey.channel()可以获取到发生事件的Channel
            Iterator<SelectionKey> iter = selector.selectedKeys().iterator();
            while (iter.hasNext()) {
              key = iter.next();
              iter.remove();
              try {
                if (key.isValid()) {
                  if (key.isAcceptable())
                    //建立链接,执行后续的读取客户端的请求内容
                    doAccept(key);
                }
              } catch (IOException e) {
              }
              key = null ;
            }
          } catch (OutOfMemoryError e) {
          }
       //建立链接,执行后续的读取客户端的请求内容
       void doAccept(SelectionKey key) throws IOException,  OutOfMemoryError {
        Connection c = null ;
        //发生事件的Channel
        ServerSocketChannel server = (ServerSocketChannel) key.channel();
        SocketChannel channel;
        //与客户端建立链接
        while ((channel = server.accept()) != null ) {
          channel.configureBlocking( false );
          channel.socket().setTcpNoDelay( tcpNoDelay );
          //从读者线程池中获取reader,做后续操作
          Reader reader = getReader();
          try {
            reader.startAdd();
            //将客户端链接Channel注册到读者线程的监控器readSelector,一旦可读,激活读者线程开始读取,向Select注册事件返回一个Key
            SelectionKey readKey = reader.registerChannel(channel);
            c = new Connection(readKey, channel, System.currentTimeMillis ());
            //将链接类Connection的引用存到readKey,reader线程可以通过readSelector .selectedKeys()再次获取Key
            //并通过readKey.attachment()获取寄存的引用
            //Connection负责通信,reader也是通过调用Connection.readAndProcess()读取客户端的远程调用请求,并将请求
            //封装成Call,放入callQueue。Handler线程会从callQueue中获取Call,执行远程调用
            readKey.attach(c);
            synchronized (connectionList ) {
              connectionList .add(numConnections , c);
              numConnections ++;
            }finally {
            reader.finishAdd();
          }
    
        }
      }
  • ipc.Server.Listener.Reader类

        public void run() {
          LOG.info("Starting " + getName());
          synchronized (this) {
            while (running) {
              SelectionKey key = null;
              try {
                //阻塞监听客户端的可读事件
                readSelector.select();
                while (adding) {
                  this.wait(1000);
                }              
    
                Iterator<SelectionKey> iter = readSelector.selectedKeys().iterator();
                while (iter.hasNext()) {
                  key = iter.next();
                  iter.remove();
                  if (key.isValid()) {
                    if (key.isReadable()) {
                      //执行读操作
                      doRead(key);
                    }
                  }
                  key = null;
                }
              } 
              ...
            }
          }
        }
      void doRead(SelectionKey key) throws InterruptedException {
        int count = 0;
        //获取寄存的链接的引用
        Connection c = (Connection)key.attachment();
        if (c == null) {
          return;  
        }
        c.setLastContact(System.currentTimeMillis());
    
        try {
          //读取客户端的远程调用请求,封住成Call,put到callQueue队列中
          count = c.readAndProcess();
        } 
        ...
      }
  • ipc.Server.Handler类,调用ipc.RPC.Server.Call()来执行远程调用,将结果返回给客户端、

    public Writable call(Class<?> protocol, Writable param, long receivedTime) 
      throws IOException {
        try {
          Invocation call = (Invocation)param;
          if (verbose) log("Call: " + call);
    
          Method method =
            protocol.getMethod(call.getMethodName(),
                                     call.getParameterClasses());
          method.setAccessible(true);
    
          long startTime = System.currentTimeMillis();
          //反射调用,instance为实例化的Protocol,call.getParameters()为客户端发送过来的要调用的函数以及参数
          Object value = method.invoke(instance, call.getParameters());
    ...
    }
本博客已迁移至:http://edwardsbean.github.io

版权声明:本文内容由阿里云实名注册用户自发贡献,版权归原作者所有,阿里云开发者社区不拥有其著作权,亦不承担相应法律责任。具体规则请查看《阿里云开发者社区用户服务协议》和《阿里云开发者社区知识产权保护指引》。如果您发现本社区中有涉嫌抄袭的内容,填写侵权投诉表单进行举报,一经查实,本社区将立刻删除涉嫌侵权内容。

分享:
大数据
使用钉钉扫一扫加入圈子
+ 订阅

大数据计算实践乐园,近距离学习前沿技术

其他文章