深入理解Dubbo-5.服务注册源码分析(下)

简介: 深入理解Dubbo-5.服务注册源码分析

深入理解Dubbo-5.服务注册源码分析(中):https://developer.aliyun.com/article/1414061


getTransporter


getTransporter是一个自适应扩展点,它针对bind方法添加了自适应注解,意味着,bing方法的具体实现,会基于Transporter$Adaptive方法进行适配,那么在这里面默认的通信协议是netty,所以它会采用netty4的实现,也就是 org.apache.dubbo.remoting.transport.netty4.NettyTransporter。

public static Transporter getTransporter() { return
ExtensionLoader.getExtensionLoader(Transporter.class).getAdaptiveExtension();}

NettyTransporter.bind


创建一个nettyserver

public Server bind(URL url, ChannelHandler listener) throws RemotingException {
  return new NettyServer(url, listener);
}

NettyServer


初始化一个nettyserver,并且从url中获得相应的ip/ port。然后调用 doOpen();

public NettyServer(URL url, ChannelHandler handler) throws RemotingException {
        super(ExecutorUtil.setThreadName(url, SERVER_THREAD_POOL_NAME), ChannelHandlers.wrap(handler, url));
    }
public AbstractServer(URL url, ChannelHandler handler) throws RemotingException {
        super(url, handler);
        localAddress = getUrl().toInetSocketAddress();
    // 获取 ip 和端口
        String bindIp = getUrl().getParameter(Constants.BIND_IP_KEY, getUrl().getHost());
        int bindPort = getUrl().getParameter(Constants.BIND_PORT_KEY, getUrl().getPort());
        if (url.getParameter(ANYHOST_KEY, false) || NetUtils.isInvalidLocalHost(bindIp)) {
            bindIp = ANYHOST_VALUE;
        }
        bindAddress = new InetSocketAddress(bindIp, bindPort);
        this.accepts = url.getParameter(ACCEPTS_KEY, DEFAULT_ACCEPTS);
        this.idleTimeout = url.getParameter(IDLE_TIMEOUT_KEY, DEFAULT_IDLE_TIMEOUT);
        try {
            doOpen(); // 调用模板方法 doOpen 启动服务器
            if (logger.isInfoEnabled()) {
                logger.info("Start " + getClass().getSimpleName() + " bind " + getBindAddress() + ", export " + getLocalAddress());
            }
        } catch (Throwable t) {
            throw new RemotingException(url.toInetSocketAddress(), null, "Failed to bind " + getClass().getSimpleName()
                    + " on " + getLocalAddress() + ", cause: " + t.getMessage(), t);
        }
        executor = executorRepository.createExecutorIfAbsent(url);
    }

doOpen


开启netty服务


总体来说,这段代码的主要功能是使用 Netty 框架创建一个服务器,并进行一系列的初始化设置,包括线程池的创建、网络通道的初始化、服务器选项的设置以及管道工厂的创建等。最终将服务器绑定到指定地址上,准备接受客户端的连接请求。

// doOpen() 方法是一个受保护的方法,用于在子类中被调用以进行服务器的打开操作。
protected void doOpen() throws Throwable {
        NettyHelper.setNettyLoggerFactory(); // 这行代码用于设置 Netty 框架的日志工厂,以便配置日志记录器。
        ExecutorService boss = Executors.newCachedThreadPool(new NamedThreadFactory("NettyServerBoss", true)); // 创建一个用于处理接受连接的线程池。
        ExecutorService worker = Executors.newCachedThreadPool(new NamedThreadFactory("NettyServerWorker", true)); // 创建一个用于处理网络 IO 事件的线程池。
    // 使用 NIO 方式创建 ServerSocketChannel 工厂,其中包含了 boss 线程池和 worker 线程池。
        ChannelFactory channelFactory = new NioServerSocketChannelFactory(boss, worker, getUrl().getPositiveParameter(IO_THREADS_KEY, Constants.DEFAULT_IO_THREADS));
    // 使用上一步创建的 ChannelFactory 初始化 ServerBootstrap 对象。
        bootstrap = new ServerBootstrap(channelFactory);
    // NettyHandler 负责处理 Netty 的网络事件,这里创建了一个 NettyHandler 对象,并获取其通道列表。
        final NettyHandler nettyHandler = new NettyHandler(getUrl(), this);
        channels = nettyHandler.getChannels();
    /*
    设置服务器选项:
设置 child.tcpNoDelay 为 true,表示禁用 Nagle 算法,即数据立即发送。
设置 backlog,指定了未完成连接队列的最大长度。
    */
        bootstrap.setOption("child.tcpNoDelay", true);
        bootstrap.setOption("backlog", getUrl().getPositiveParameter(BACKLOG_KEY, Constants.DEFAULT_BACKLOG));
        bootstrap.setPipelineFactory(new ChannelPipelineFactory() {
            @Override
            public ChannelPipeline getPipeline() {
                NettyCodecAdapter adapter = new NettyCodecAdapter(getCodec(), getUrl(), NettyServer.this);
                ChannelPipeline pipeline = Channels.pipeline();
                pipeline.addLast("decoder", adapter.getDecoder());
                pipeline.addLast("encoder", adapter.getEncoder());
                pipeline.addLast("handler", nettyHandler);
                return pipeline;
            }
        });
        // bind
    // 使用 ServerBootstrap 绑定到指定的地址,并返回一个 ChannelFuture 对象。
        channel = bootstrap.bind(getBindAddress());
    }

然后需要要注意的是,它这里用到了一个handler来处理客户端传递过来的请求:

nettyServerHandler

NettyServerHandler nettyServerHandler = new NettyServerHandler(getUrl(), this);

这个handler是一个链路,它的正确组成应该是


MultiMessageHandler(heartbeatHandler(AllChannelHandler(DecodeHandler(HeaderExchangeHeadler(DubboProtocol))))


后续接收到的请求,会一层一层的处理。比较繁琐


服务注册流程


RegistryProtocol

public <T> Exporter<T> export(final Invoker<T> originInvoker) throws RpcException {
        URL registryUrl = getRegistryUrl(originInvoker);
        // url to export locally
        URL providerUrl = getProviderUrl(originInvoker);
        ......
      // 到这里才是真正意义上启动一个Netty Server,发布Dubbo协议的服务
      // dubbo还是基于url驱动,所有每次执行都会去改变url
        final ExporterChangeableWrapper<T> exporter = doLocalExport(originInvoker, providerUrl);
        ......
        if (register) {
            // 然后到这里才注册了服务
            register(registryUrl, registeredProviderUrl);
        }
        ......
    }

了解了服务的发布之后,我们继续来看一下服务是如何发起注册的。


服务注册实际上就是把dubbo的协议url地址保存到第三方注册中心上。

private void register(URL registryUrl, URL registeredProviderUrl) {
        Registry registry = registryFactory.getRegistry(registryUrl);
        registry.register(registeredProviderUrl);
    }

getRegistry


  1. 把url转化为对应配置的注册中心的具体协议
  2. 根据具体协议,从registryFactory中获得指定的注册中心实现


那么这个registryFactory具体是怎么赋值的呢?

private Registry getRegistry(final Invoker<?> originInvoker) {
  //把url转化为配置的具体协议,比如zookeeper://ip:port. 这样后续获得的注册中心就会是基于zk的实现
  URL registryUrl = getRegistryUrl(originInvoker);
  return registryFactory.getRegistry(registryUrl);
}

在RegistryProtocol中存在一段这样的代码,很明显这是通过依赖注入来实现的扩展点。

private RegistryFactory registryFactory;
public void setRegistryFactory(RegistryFactory registryFactory) {
  this.registryFactory = registryFactory;
}

按照扩展点的加载规则,我们可以先看看/META-INF/dubbo/internal路径下找到RegistryFactory的配置文件.这个factory有多个扩展点的实现。

dubbo=org.apache.dubbo.registry.dubbo.DubboRegistryFactory
multicast=org.apache.dubbo.registry.multicast.MulticastRegistryFactory
zookeeper=org.apache.dubbo.registry.zookeeper.ZookeeperRegistryFactory
redis=org.apache.dubbo.registry.redis.RedisRegistryFactory
consul=org.apache.dubbo.registry.consul.ConsulRegistryFactory
etcd3=org.apache.dubbo.registry.etcd.EtcdRegistryFactory

接着,找到RegistryFactory的实现, 发现它里面有一个自适应的方法,根据url中protocol传入的值进行适配

@SPI("dubbo")
public interface RegistryFactory {
  @Adaptive({"protocol"})
  Registry getRegistry(URL url);

RegistryFactory$Adaptive


由于在前面的代码中,url中的protocol已经改成了zookeeper,那么这个时候根据zookeeper获得的spi扩展点应该是RegistryFactoryWrapper

import org.apache.dubbo.common.extension.ExtensionLoader;
public class RegistryFactory$Adaptive implements
org.apache.dubbo.registry.RegistryFactory {
  public org.apache.dubbo.registry.Registry getRegistry(org.apache.dubbo.common.URL arg0) {
    if (arg0 == null) throw new IllegalArgumentException("url == null");
org.apache.dubbo.common.URL url = arg0;
    String extName = ( url.getProtocol() == null ? "dubbo" :
url.getProtocol() );
    if(extName == null) throw new IllegalStateException("Failed to get extension (org.apache.dubbo.registry.RegistryFactory) name from url (" + url.toString() + ") use keys([protocol])");
    org.apache.dubbo.registry.RegistryFactory extension =
(org.apache.dubbo.registry.RegistryFactory)ExtensionLoader.getExtensionLoader(org.apache.dubbo.registry.RegistryFactory.class).getExtension(extName);
    return extension.getRegistry(arg0);
  }
}

RegistryFactoryWrapper


而registryFactory.getRegistry(url)中,由于此时的registryFactory已经是ZookeeperRegistryFactory,所以这里会得到一个zookeeperRegistry。

public class RegistryFactoryWrapper implements RegistryFactory {
  private RegistryFactory registryFactory;
  public RegistryFactoryWrapper(RegistryFactory registryFactory) {
    this.registryFactory = registryFactory;
  }
  @Override
  public Registry getRegistry(URL url) {
    return new ListenerRegistryWrapper(registryFactory.getRegistry(url),
    Collections.unmodifiableList(ExtensionLoader.getExtensionLoader(RegistryServiceListener.class).getActivateExtension(url, "registry.listeners")));
  }
}

因此最终返回的Registry=ListenerRegistryWrapper。


下面这段代码的含义是:


  • 获得注册的服务提供者地址
  • 调用register发起注册
final URL registeredProviderUrl = getUrlToRegistry(providerUrl, registryUrl);
// decide if we need to delay publish
boolean register = providerUrl.getParameter(REGISTER_KEY, true);
  if (register) {
    register(registryUrl, registeredProviderUrl);
  }

RegistryProtocol.register


发起注册流程,registry对象的实例是=ListenerRegistryWrapper。所以调用这个对象的register方法。

private void register(URL registryUrl, URL registeredProviderUrl) {
  Registry registry = registryFactory.getRegistry(registryUrl);
  registry.register(registeredProviderUrl);
}

ListenerRegistryWrapper.register


这里做包装的目的,其实应该就是增加了一个监听器的处理过程。

@Override
    public void register(URL url) {
        try {
            registry.register(url);
        } finally {
            if (CollectionUtils.isNotEmpty(listeners)) {
                RuntimeException exception = null;
                for (RegistryServiceListener listener : listeners) {
                    if (listener != null) {
                        try {
                            listener.onRegister(url);
                        } catch (RuntimeException t) {
                            logger.error(t.getMessage(), t);
                            exception = t;
                        }
                    }
                }
                if (exception != null) {
                    throw exception;
                }
            }
        }
    }

ZookeeperRegistry


这个方法中并没有register方法,而ZookeeperRegsitry继承了FailbackRegistry,所以直接进入到FailbackRegistry这个类


  • FailbackRegistry,从名字上来看,是一个失败重试机制
  • 调用父类的register方法,讲当前url添加到缓存集合中
public void register(URL url) {
        if (!acceptable(url)) {
            logger.info("URL " + url + " will not be registered to Registry. Registry " + url + " does not accept service of this protocol type.");
            return;
        }
        super.register(url);
        removeFailedRegistered(url);
        removeFailedUnregistered(url);
        try {
            // Sending a registration request to the server side
            doRegister(url);
        } catch (Exception e) {
            Throwable t = e;
            // If the startup detection is opened, the Exception is thrown directly.
            boolean check = getUrl().getParameter(Constants.CHECK_KEY, true)
                    && url.getParameter(Constants.CHECK_KEY, true)
                    && !CONSUMER_PROTOCOL.equals(url.getProtocol());
            boolean skipFailback = t instanceof SkipFailbackWrapperException;
            if (check || skipFailback) {
                if (skipFailback) {
                    t = t.getCause();
                }
                throw new IllegalStateException("Failed to register " + url + " to registry " + getUrl().getAddress() + ", cause: " + t.getMessage(), t);
            } else {
                logger.error("Failed to register " + url + ", waiting for retry, cause: " + t.getMessage(), t);
            }
            // Record a failed registration request to a failed list, retry regularly
            addFailedRegistered(url);
        }
    }

ZookeeperRegistry.doRegister


通过curator客户端,把服务地址写入到注册中心。

@Override
public void doRegister(URL url) {
  try {
    zkClient.create(toUrlPath(url), url.getParameter(DYNAMIC_KEY, true));
  } catch (Throwable e) {
    throw new RpcException("Failed to register " + url + " to zookeeper " +getUrl() + ", cause: " + e.getMessage(), e);
  }
}

Invoker是什么?


Invoker翻译成中文是调用器,它在Dubbo中其实是一个比较重要的领域对象,最核心的是在服务的发布和调用中,都是以Invoker的形态存在。


在刚刚的服务发布过程中,整体分为三个阶段


  • 第一个阶段会创造一个invoker
  • 第二个阶段会把经历过一系列处理的invoker(各种包装),在DubboProtocol中保存到exporterMap中
  • 第三个阶段把dubbo协议的url地址注册到注册中心上


而Invoker的作用就是收到客户端请求的时候,根据接口的全路径作为key,找到实例方法,然后通过反射去调用。


前面没有分析Invoker,我们来简单看看Invoker到底是一个啥东西。

Invoker<?> invoker = PROXY_FACTORY.getInvoker(ref, (Class) interfaceClass,
registryURL.addParameterAndEncoded(EXPORT_KEY, url.toFullString()));

ProxyFacotory.getInvoker


这个是一个代理工程,用来生成invoker,从它的定义来看,它是一个自适应扩展点,看到这样的扩展点,我们几乎可以不假思索的想到它会存在一个动态适配器类

ProxyFactory proxyFactory =
ExtensionLoader.getExtensionLoader(ProxyFactory.class).getAdaptiveExtension();

ProxyFactory


这个方法的简单解读为: 它是一个spi扩展点,并且默认的扩展实现是javassit, 这个接口中有三个方法,并且都是加了@Adaptive的自适应扩展点。所以如果调用getInvoker方法,应该会返回一个ProxyFactory$Adaptive

@SPI("javassist")
public interface ProxyFactory {
  @Adaptive({Constants.PROXY_KEY})
  <T> T getProxy(Invoker<T> invoker) throws RpcException;
  @Adaptive({Constants.PROXY_KEY})
  <T> T getProxy(Invoker<T> invoker, boolean generic) throws RpcException;
  @Adaptive({Constants.PROXY_KEY})
  <T> Invoker<T> getInvoker(T proxy, Class<T> type, URL url) throws RpcException;

ProxyFactory$Adaptive


这个自适应扩展点,做了两件事情


  • 通过ExtensionLoader.getExtensionLoader(ProxyFactory.class).getExtension(extName)获取了一个指定名称的扩展点。
  • 在dubbo-rpc-api/resources/META-INF/com.alibaba.dubbo.rpc.ProxyFactory中,定义了javassis=JavassisProxyFactory
  • 调用JavassisProxyFactory的getInvoker方法
public class ProxyFactory$Adaptive implements org.apache.dubbo.rpc.ProxyFactory
{
  public java.lang.Object getProxy(org.apache.dubbo.rpc.Invoker arg0) throws
org.apache.dubbo.rpc.RpcException {
    if (arg0 == null) throw new
IllegalArgumentException("org.apache.dubbo.rpc.Invoker argument == null");
    if (arg0.getUrl() == null) throw new
IllegalArgumentException("org.apache.dubbo.rpc.Invoker argument getUrl() ==null");
    org.apache.dubbo.common.URL url = arg0.getUrl();
    String extName = url.getParameter("proxy", "javassist");
    if(extName == null) throw new IllegalStateException("Failed to get extension (org.apache.dubbo.rpc.ProxyFactory) name from url (" + url.toString()+ ") use keys([proxy])");
    org.apache.dubbo.rpc.ProxyFactory extension =
(org.apache.dubbo.rpc.ProxyFactory)ExtensionLoader.getExtensionLoader(org.apache
.dubbo.rpc.ProxyFactory.class).getExtension(extName);
    return extension.getProxy(arg0);
  }
  public java.lang.Object getProxy(org.apache.dubbo.rpc.Invoker arg0, boolean
arg1) throws org.apache.dubbo.rpc.RpcException {
    if (arg0 == null) throw new
IllegalArgumentException("org.apache.dubbo.rpc.Invoker argument == null");
    if (arg0.getUrl() == null) throw new
IllegalArgumentException("org.apache.dubbo.rpc.Invoker argument getUrl() ==null");
    org.apache.dubbo.common.URL url = arg0.getUrl();
    String extName = url.getParameter("proxy", "javassist");
    if(extName == null) throw new IllegalStateException("Failed to get extension (org.apache.dubbo.rpc.ProxyFactory) name from url (" + url.toString()+ ") use keys([proxy])");
    org.apache.dubbo.rpc.ProxyFactory extension =
(org.apache.dubbo.rpc.ProxyFactory)ExtensionLoader.getExtensionLoader(org.apache
.dubbo.rpc.ProxyFactory.class).getExtension(extName);
    return extension.getProxy(arg0, arg1);
}
  public org.apache.dubbo.rpc.Invoker getInvoker(java.lang.Object arg0,
java.lang.Class arg1, org.apache.dubbo.common.URL arg2) throws
org.apache.dubbo.rpc.RpcException {
    if (arg2 == null) throw new IllegalArgumentException("url == null");
    org.apache.dubbo.common.URL url = arg2;
    String extName = url.getParameter("proxy", "javassist");
    if(extName == null) throw new IllegalStateException("Failed to get extension (org.apache.dubbo.rpc.ProxyFactory) name from url (" + url.toString()+ ") use keys([proxy])");
    org.apache.dubbo.rpc.ProxyFactory extension =
(org.apache.dubbo.rpc.ProxyFactory)ExtensionLoader.getExtensionLoader(org.apache
.dubbo.rpc.ProxyFactory.class).getExtension(extName);
    return extension.getInvoker(arg0, arg1, arg2);
  }
}

JavassistProxyFactory.getInvoker


javassist是一个动态类库,用来实现动态代理的。


proxy:接口的实现: com.gupaoedu.practice.dubbo.SayHelloServiceImpl


type:接口全称 com.gupaoedu.dubbo.ISayHelloService

@Override
public <T> Invoker<T> getInvoker(T proxy, Class<T> type, URL url) {
  final Wrapper wrapper = Wrapper.getWrapper(proxy.getClass().getName().indexOf('$') < 0 ? proxy.getClass() : type);
  return new AbstractProxyInvoker<T>(proxy, type, url) {
    @Override
    protected Object doInvoke(T proxy, String methodName,Class<?>[] parameterTypes,
Object[] arguments) throws Throwable {
      return wrapper.invokeMethod(proxy, methodName, parameterTypes,arguments);
    }
  };
}

javassist生成的动态代理代码


通过断点的方式,在Wrapper.getWrapper中的makeWrapper,会创建一个动态代理,核心的方法invokeMethod代码如下

public Object invokeMethod(Object o, String n, Class[] p, Object[] v) throws
java.lang.reflect.InvocationTargetException {
    com.gupaoedu.dubbo.practice.ISayHelloService w;
    try {
      w = ((com.gupaoedu.dubbo.practice.ISayHelloService) $1);
    } catch (Throwable e) {
      throw new IllegalArgumentException(e);
    }
    try {
      if ("sayHello".equals($2) && $3.length == 1) {
        return ($w) w.sayHello((java.lang.String) $4[0]);
      }
    } catch (Throwable e) {
      throw new java.lang.reflect.InvocationTargetException(e);
    }
    throw new org.apache.dubbo.common.bytecode.NoSuchMethodException("Not
found method \"" + $2 + "\" in class com.gupaoedu.dubbo.practice.ISayHelloService.");
}

构建好了代理类之后,返回一个AbstractproxyInvoker,并且它实现了doInvoke方法,这个地方似乎看到了dubbo消费者调用过来的时候触发的影子,因为wrapper.invokeMethod本质上就是触发上面动态代理类的方法invokeMethod.

return new AbstractProxyInvoker<T>(proxy, type, url) {
  @Override
  protected Object doInvoke(T proxy, String methodName,Class<?>[] parameterTypes,
Object[] arguments) throws Throwable {
    return wrapper.invokeMethod(proxy, methodName, parameterTypes,arguments);
  }
};

所以,简单总结一下Invoke本质上应该是一个代理,经过层层包装最终进行了发布。当消费者发起请求的时候,会获得这个invoker进行调用。


最终发布出去的invoker, 也不是单纯的一个代理,也是经过多层包装


InvokerDelegate(DelegateProviderMetaDataInvoker(AbstractProxyInvoker()))


目录
相关文章
|
存储 负载均衡 监控
深入理解Dubbo-6.服务消费源码分析(下)
深入理解Dubbo-6.服务消费源码分析
226 0
|
缓存 编解码 Dubbo
深入理解Dubbo-5.服务注册源码分析(中)
深入理解Dubbo-5.服务注册源码分析
198 0
|
XML Dubbo Java
【Dubbo3技术专题】回顾Dubbo2.x的技术原理和功能实现及源码分析(温故而知新)(二)
【Dubbo3技术专题】回顾Dubbo2.x的技术原理和功能实现及源码分析(温故而知新)
210 2
|
XML 监控 Dubbo
【Dubbo3技术专题】回顾Dubbo2.x的技术原理和功能实现及源码分析(温故而知新)(一)
【Dubbo3技术专题】回顾Dubbo2.x的技术原理和功能实现及源码分析(温故而知新)
271 1
|
Dubbo Java 应用服务中间件
深入理解Dubbo-6.服务消费源码分析(上)
深入理解Dubbo-6.服务消费源码分析
226 1
|
Dubbo Java 应用服务中间件
Dubbo 第四节: Spring与Dubbo整合原理与源码分析
DubboConfigConfigurationRegistrar的主要作⽤就是对propties⽂件进⾏解析并根据不同的配置项项⽣成对应类型的Bean对象。
359 0
|
Java Spring
深入理解Dubbo-7.服务消费调用源码分析(下)
深入理解Dubbo-7.服务消费调用源码分析
173 0
|
负载均衡 Dubbo 应用服务中间件
深入理解Dubbo-7.服务消费调用源码分析(中)
深入理解Dubbo-7.服务消费调用源码分析
206 0
|
负载均衡 算法 Dubbo
深入理解Dubbo-7.服务消费调用源码分析(上)
深入理解Dubbo-7.服务消费调用源码分析
375 0
|
Dubbo Java 应用服务中间件
微服务学习 | Springboot整合Dubbo+Nacos实现RPC调用
微服务学习 | Springboot整合Dubbo+Nacos实现RPC调用