深入理解Dubbo-5.服务注册源码分析(中):https://developer.aliyun.com/article/1414061
getTransporter
getTransporter是一个自适应扩展点,它针对bind方法添加了自适应注解,意味着,bing方法的具体实现,会基于Transporter$Adaptive方法进行适配,那么在这里面默认的通信协议是netty,所以它会采用netty4的实现,也就是 org.apache.dubbo.remoting.transport.netty4.NettyTransporter。
public static Transporter getTransporter() { return ExtensionLoader.getExtensionLoader(Transporter.class).getAdaptiveExtension();}
NettyTransporter.bind
创建一个nettyserver
public Server bind(URL url, ChannelHandler listener) throws RemotingException { return new NettyServer(url, listener); }
NettyServer
初始化一个nettyserver,并且从url中获得相应的ip/ port。然后调用 doOpen();
public NettyServer(URL url, ChannelHandler handler) throws RemotingException { super(ExecutorUtil.setThreadName(url, SERVER_THREAD_POOL_NAME), ChannelHandlers.wrap(handler, url)); } public AbstractServer(URL url, ChannelHandler handler) throws RemotingException { super(url, handler); localAddress = getUrl().toInetSocketAddress(); // 获取 ip 和端口 String bindIp = getUrl().getParameter(Constants.BIND_IP_KEY, getUrl().getHost()); int bindPort = getUrl().getParameter(Constants.BIND_PORT_KEY, getUrl().getPort()); if (url.getParameter(ANYHOST_KEY, false) || NetUtils.isInvalidLocalHost(bindIp)) { bindIp = ANYHOST_VALUE; } bindAddress = new InetSocketAddress(bindIp, bindPort); this.accepts = url.getParameter(ACCEPTS_KEY, DEFAULT_ACCEPTS); this.idleTimeout = url.getParameter(IDLE_TIMEOUT_KEY, DEFAULT_IDLE_TIMEOUT); try { doOpen(); // 调用模板方法 doOpen 启动服务器 if (logger.isInfoEnabled()) { logger.info("Start " + getClass().getSimpleName() + " bind " + getBindAddress() + ", export " + getLocalAddress()); } } catch (Throwable t) { throw new RemotingException(url.toInetSocketAddress(), null, "Failed to bind " + getClass().getSimpleName() + " on " + getLocalAddress() + ", cause: " + t.getMessage(), t); } executor = executorRepository.createExecutorIfAbsent(url); }
doOpen
开启netty服务
总体来说,这段代码的主要功能是使用 Netty 框架创建一个服务器,并进行一系列的初始化设置,包括线程池的创建、网络通道的初始化、服务器选项的设置以及管道工厂的创建等。最终将服务器绑定到指定地址上,准备接受客户端的连接请求。
// doOpen() 方法是一个受保护的方法,用于在子类中被调用以进行服务器的打开操作。 protected void doOpen() throws Throwable { NettyHelper.setNettyLoggerFactory(); // 这行代码用于设置 Netty 框架的日志工厂,以便配置日志记录器。 ExecutorService boss = Executors.newCachedThreadPool(new NamedThreadFactory("NettyServerBoss", true)); // 创建一个用于处理接受连接的线程池。 ExecutorService worker = Executors.newCachedThreadPool(new NamedThreadFactory("NettyServerWorker", true)); // 创建一个用于处理网络 IO 事件的线程池。 // 使用 NIO 方式创建 ServerSocketChannel 工厂,其中包含了 boss 线程池和 worker 线程池。 ChannelFactory channelFactory = new NioServerSocketChannelFactory(boss, worker, getUrl().getPositiveParameter(IO_THREADS_KEY, Constants.DEFAULT_IO_THREADS)); // 使用上一步创建的 ChannelFactory 初始化 ServerBootstrap 对象。 bootstrap = new ServerBootstrap(channelFactory); // NettyHandler 负责处理 Netty 的网络事件,这里创建了一个 NettyHandler 对象,并获取其通道列表。 final NettyHandler nettyHandler = new NettyHandler(getUrl(), this); channels = nettyHandler.getChannels(); /* 设置服务器选项: 设置 child.tcpNoDelay 为 true,表示禁用 Nagle 算法,即数据立即发送。 设置 backlog,指定了未完成连接队列的最大长度。 */ bootstrap.setOption("child.tcpNoDelay", true); bootstrap.setOption("backlog", getUrl().getPositiveParameter(BACKLOG_KEY, Constants.DEFAULT_BACKLOG)); bootstrap.setPipelineFactory(new ChannelPipelineFactory() { @Override public ChannelPipeline getPipeline() { NettyCodecAdapter adapter = new NettyCodecAdapter(getCodec(), getUrl(), NettyServer.this); ChannelPipeline pipeline = Channels.pipeline(); pipeline.addLast("decoder", adapter.getDecoder()); pipeline.addLast("encoder", adapter.getEncoder()); pipeline.addLast("handler", nettyHandler); return pipeline; } }); // bind // 使用 ServerBootstrap 绑定到指定的地址,并返回一个 ChannelFuture 对象。 channel = bootstrap.bind(getBindAddress()); }
然后需要要注意的是,它这里用到了一个handler来处理客户端传递过来的请求:
nettyServerHandler
NettyServerHandler nettyServerHandler = new NettyServerHandler(getUrl(), this);
这个handler是一个链路,它的正确组成应该是
MultiMessageHandler(heartbeatHandler(AllChannelHandler(DecodeHandler(HeaderExchangeHeadler(DubboProtocol))))
后续接收到的请求,会一层一层的处理。比较繁琐
服务注册流程
RegistryProtocol
public <T> Exporter<T> export(final Invoker<T> originInvoker) throws RpcException { URL registryUrl = getRegistryUrl(originInvoker); // url to export locally URL providerUrl = getProviderUrl(originInvoker); ...... // 到这里才是真正意义上启动一个Netty Server,发布Dubbo协议的服务 // dubbo还是基于url驱动,所有每次执行都会去改变url final ExporterChangeableWrapper<T> exporter = doLocalExport(originInvoker, providerUrl); ...... if (register) { // 然后到这里才注册了服务 register(registryUrl, registeredProviderUrl); } ...... }
了解了服务的发布之后,我们继续来看一下服务是如何发起注册的。
服务注册实际上就是把dubbo的协议url地址保存到第三方注册中心上。
private void register(URL registryUrl, URL registeredProviderUrl) { Registry registry = registryFactory.getRegistry(registryUrl); registry.register(registeredProviderUrl); }
getRegistry
- 把url转化为对应配置的注册中心的具体协议
- 根据具体协议,从registryFactory中获得指定的注册中心实现
那么这个registryFactory具体是怎么赋值的呢?
private Registry getRegistry(final Invoker<?> originInvoker) { //把url转化为配置的具体协议,比如zookeeper://ip:port. 这样后续获得的注册中心就会是基于zk的实现 URL registryUrl = getRegistryUrl(originInvoker); return registryFactory.getRegistry(registryUrl); }
在RegistryProtocol中存在一段这样的代码,很明显这是通过依赖注入来实现的扩展点。
private RegistryFactory registryFactory; public void setRegistryFactory(RegistryFactory registryFactory) { this.registryFactory = registryFactory; }
按照扩展点的加载规则,我们可以先看看/META-INF/dubbo/internal路径下找到RegistryFactory的配置文件.这个factory有多个扩展点的实现。
dubbo=org.apache.dubbo.registry.dubbo.DubboRegistryFactory multicast=org.apache.dubbo.registry.multicast.MulticastRegistryFactory zookeeper=org.apache.dubbo.registry.zookeeper.ZookeeperRegistryFactory redis=org.apache.dubbo.registry.redis.RedisRegistryFactory consul=org.apache.dubbo.registry.consul.ConsulRegistryFactory etcd3=org.apache.dubbo.registry.etcd.EtcdRegistryFactory
接着,找到RegistryFactory的实现, 发现它里面有一个自适应的方法,根据url中protocol传入的值进行适配
@SPI("dubbo") public interface RegistryFactory { @Adaptive({"protocol"}) Registry getRegistry(URL url);
RegistryFactory$Adaptive
由于在前面的代码中,url中的protocol已经改成了zookeeper,那么这个时候根据zookeeper获得的spi扩展点应该是RegistryFactoryWrapper
import org.apache.dubbo.common.extension.ExtensionLoader; public class RegistryFactory$Adaptive implements org.apache.dubbo.registry.RegistryFactory { public org.apache.dubbo.registry.Registry getRegistry(org.apache.dubbo.common.URL arg0) { if (arg0 == null) throw new IllegalArgumentException("url == null"); org.apache.dubbo.common.URL url = arg0; String extName = ( url.getProtocol() == null ? "dubbo" : url.getProtocol() ); if(extName == null) throw new IllegalStateException("Failed to get extension (org.apache.dubbo.registry.RegistryFactory) name from url (" + url.toString() + ") use keys([protocol])"); org.apache.dubbo.registry.RegistryFactory extension = (org.apache.dubbo.registry.RegistryFactory)ExtensionLoader.getExtensionLoader(org.apache.dubbo.registry.RegistryFactory.class).getExtension(extName); return extension.getRegistry(arg0); } }
RegistryFactoryWrapper
而registryFactory.getRegistry(url)中,由于此时的registryFactory已经是ZookeeperRegistryFactory,所以这里会得到一个zookeeperRegistry。
public class RegistryFactoryWrapper implements RegistryFactory { private RegistryFactory registryFactory; public RegistryFactoryWrapper(RegistryFactory registryFactory) { this.registryFactory = registryFactory; } @Override public Registry getRegistry(URL url) { return new ListenerRegistryWrapper(registryFactory.getRegistry(url), Collections.unmodifiableList(ExtensionLoader.getExtensionLoader(RegistryServiceListener.class).getActivateExtension(url, "registry.listeners"))); } }
因此最终返回的Registry=ListenerRegistryWrapper。
下面这段代码的含义是:
- 获得注册的服务提供者地址
- 调用register发起注册
final URL registeredProviderUrl = getUrlToRegistry(providerUrl, registryUrl); // decide if we need to delay publish boolean register = providerUrl.getParameter(REGISTER_KEY, true); if (register) { register(registryUrl, registeredProviderUrl); }
RegistryProtocol.register
发起注册流程,registry对象的实例是=ListenerRegistryWrapper。所以调用这个对象的register方法。
private void register(URL registryUrl, URL registeredProviderUrl) { Registry registry = registryFactory.getRegistry(registryUrl); registry.register(registeredProviderUrl); }
ListenerRegistryWrapper.register
这里做包装的目的,其实应该就是增加了一个监听器的处理过程。
@Override public void register(URL url) { try { registry.register(url); } finally { if (CollectionUtils.isNotEmpty(listeners)) { RuntimeException exception = null; for (RegistryServiceListener listener : listeners) { if (listener != null) { try { listener.onRegister(url); } catch (RuntimeException t) { logger.error(t.getMessage(), t); exception = t; } } } if (exception != null) { throw exception; } } } }
ZookeeperRegistry
这个方法中并没有register方法,而ZookeeperRegsitry继承了FailbackRegistry,所以直接进入到FailbackRegistry这个类
- FailbackRegistry,从名字上来看,是一个失败重试机制
- 调用父类的register方法,讲当前url添加到缓存集合中
public void register(URL url) { if (!acceptable(url)) { logger.info("URL " + url + " will not be registered to Registry. Registry " + url + " does not accept service of this protocol type."); return; } super.register(url); removeFailedRegistered(url); removeFailedUnregistered(url); try { // Sending a registration request to the server side doRegister(url); } catch (Exception e) { Throwable t = e; // If the startup detection is opened, the Exception is thrown directly. boolean check = getUrl().getParameter(Constants.CHECK_KEY, true) && url.getParameter(Constants.CHECK_KEY, true) && !CONSUMER_PROTOCOL.equals(url.getProtocol()); boolean skipFailback = t instanceof SkipFailbackWrapperException; if (check || skipFailback) { if (skipFailback) { t = t.getCause(); } throw new IllegalStateException("Failed to register " + url + " to registry " + getUrl().getAddress() + ", cause: " + t.getMessage(), t); } else { logger.error("Failed to register " + url + ", waiting for retry, cause: " + t.getMessage(), t); } // Record a failed registration request to a failed list, retry regularly addFailedRegistered(url); } }
ZookeeperRegistry.doRegister
通过curator客户端,把服务地址写入到注册中心。
@Override public void doRegister(URL url) { try { zkClient.create(toUrlPath(url), url.getParameter(DYNAMIC_KEY, true)); } catch (Throwable e) { throw new RpcException("Failed to register " + url + " to zookeeper " +getUrl() + ", cause: " + e.getMessage(), e); } }
Invoker是什么?
Invoker翻译成中文是调用器,它在Dubbo中其实是一个比较重要的领域对象,最核心的是在服务的发布和调用中,都是以Invoker的形态存在。
在刚刚的服务发布过程中,整体分为三个阶段
- 第一个阶段会创造一个invoker
- 第二个阶段会把经历过一系列处理的invoker(各种包装),在DubboProtocol中保存到exporterMap中
- 第三个阶段把dubbo协议的url地址注册到注册中心上
而Invoker的作用就是收到客户端请求的时候,根据接口的全路径作为key,找到实例方法,然后通过反射去调用。
前面没有分析Invoker,我们来简单看看Invoker到底是一个啥东西。
Invoker<?> invoker = PROXY_FACTORY.getInvoker(ref, (Class) interfaceClass, registryURL.addParameterAndEncoded(EXPORT_KEY, url.toFullString()));
ProxyFacotory.getInvoker
这个是一个代理工程,用来生成invoker,从它的定义来看,它是一个自适应扩展点,看到这样的扩展点,我们几乎可以不假思索的想到它会存在一个动态适配器类
ProxyFactory proxyFactory = ExtensionLoader.getExtensionLoader(ProxyFactory.class).getAdaptiveExtension();
ProxyFactory
这个方法的简单解读为: 它是一个spi扩展点,并且默认的扩展实现是javassit, 这个接口中有三个方法,并且都是加了@Adaptive的自适应扩展点。所以如果调用getInvoker方法,应该会返回一个ProxyFactory$Adaptive
@SPI("javassist") public interface ProxyFactory { @Adaptive({Constants.PROXY_KEY}) <T> T getProxy(Invoker<T> invoker) throws RpcException; @Adaptive({Constants.PROXY_KEY}) <T> T getProxy(Invoker<T> invoker, boolean generic) throws RpcException; @Adaptive({Constants.PROXY_KEY}) <T> Invoker<T> getInvoker(T proxy, Class<T> type, URL url) throws RpcException;
ProxyFactory$Adaptive
这个自适应扩展点,做了两件事情
- 通过ExtensionLoader.getExtensionLoader(ProxyFactory.class).getExtension(extName)获取了一个指定名称的扩展点。
- 在dubbo-rpc-api/resources/META-INF/com.alibaba.dubbo.rpc.ProxyFactory中,定义了javassis=JavassisProxyFactory
- 调用JavassisProxyFactory的getInvoker方法
public class ProxyFactory$Adaptive implements org.apache.dubbo.rpc.ProxyFactory { public java.lang.Object getProxy(org.apache.dubbo.rpc.Invoker arg0) throws org.apache.dubbo.rpc.RpcException { if (arg0 == null) throw new IllegalArgumentException("org.apache.dubbo.rpc.Invoker argument == null"); if (arg0.getUrl() == null) throw new IllegalArgumentException("org.apache.dubbo.rpc.Invoker argument getUrl() ==null"); org.apache.dubbo.common.URL url = arg0.getUrl(); String extName = url.getParameter("proxy", "javassist"); if(extName == null) throw new IllegalStateException("Failed to get extension (org.apache.dubbo.rpc.ProxyFactory) name from url (" + url.toString()+ ") use keys([proxy])"); org.apache.dubbo.rpc.ProxyFactory extension = (org.apache.dubbo.rpc.ProxyFactory)ExtensionLoader.getExtensionLoader(org.apache .dubbo.rpc.ProxyFactory.class).getExtension(extName); return extension.getProxy(arg0); } public java.lang.Object getProxy(org.apache.dubbo.rpc.Invoker arg0, boolean arg1) throws org.apache.dubbo.rpc.RpcException { if (arg0 == null) throw new IllegalArgumentException("org.apache.dubbo.rpc.Invoker argument == null"); if (arg0.getUrl() == null) throw new IllegalArgumentException("org.apache.dubbo.rpc.Invoker argument getUrl() ==null"); org.apache.dubbo.common.URL url = arg0.getUrl(); String extName = url.getParameter("proxy", "javassist"); if(extName == null) throw new IllegalStateException("Failed to get extension (org.apache.dubbo.rpc.ProxyFactory) name from url (" + url.toString()+ ") use keys([proxy])"); org.apache.dubbo.rpc.ProxyFactory extension = (org.apache.dubbo.rpc.ProxyFactory)ExtensionLoader.getExtensionLoader(org.apache .dubbo.rpc.ProxyFactory.class).getExtension(extName); return extension.getProxy(arg0, arg1); } public org.apache.dubbo.rpc.Invoker getInvoker(java.lang.Object arg0, java.lang.Class arg1, org.apache.dubbo.common.URL arg2) throws org.apache.dubbo.rpc.RpcException { if (arg2 == null) throw new IllegalArgumentException("url == null"); org.apache.dubbo.common.URL url = arg2; String extName = url.getParameter("proxy", "javassist"); if(extName == null) throw new IllegalStateException("Failed to get extension (org.apache.dubbo.rpc.ProxyFactory) name from url (" + url.toString()+ ") use keys([proxy])"); org.apache.dubbo.rpc.ProxyFactory extension = (org.apache.dubbo.rpc.ProxyFactory)ExtensionLoader.getExtensionLoader(org.apache .dubbo.rpc.ProxyFactory.class).getExtension(extName); return extension.getInvoker(arg0, arg1, arg2); } }
JavassistProxyFactory.getInvoker
javassist是一个动态类库,用来实现动态代理的。
proxy:接口的实现: com.gupaoedu.practice.dubbo.SayHelloServiceImpl
type:接口全称 com.gupaoedu.dubbo.ISayHelloService
@Override public <T> Invoker<T> getInvoker(T proxy, Class<T> type, URL url) { final Wrapper wrapper = Wrapper.getWrapper(proxy.getClass().getName().indexOf('$') < 0 ? proxy.getClass() : type); return new AbstractProxyInvoker<T>(proxy, type, url) { @Override protected Object doInvoke(T proxy, String methodName,Class<?>[] parameterTypes, Object[] arguments) throws Throwable { return wrapper.invokeMethod(proxy, methodName, parameterTypes,arguments); } }; }
javassist生成的动态代理代码
通过断点的方式,在Wrapper.getWrapper中的makeWrapper,会创建一个动态代理,核心的方法invokeMethod代码如下
public Object invokeMethod(Object o, String n, Class[] p, Object[] v) throws java.lang.reflect.InvocationTargetException { com.gupaoedu.dubbo.practice.ISayHelloService w; try { w = ((com.gupaoedu.dubbo.practice.ISayHelloService) $1); } catch (Throwable e) { throw new IllegalArgumentException(e); } try { if ("sayHello".equals($2) && $3.length == 1) { return ($w) w.sayHello((java.lang.String) $4[0]); } } catch (Throwable e) { throw new java.lang.reflect.InvocationTargetException(e); } throw new org.apache.dubbo.common.bytecode.NoSuchMethodException("Not found method \"" + $2 + "\" in class com.gupaoedu.dubbo.practice.ISayHelloService."); }
构建好了代理类之后,返回一个AbstractproxyInvoker,并且它实现了doInvoke方法,这个地方似乎看到了dubbo消费者调用过来的时候触发的影子,因为wrapper.invokeMethod本质上就是触发上面动态代理类的方法invokeMethod.
return new AbstractProxyInvoker<T>(proxy, type, url) { @Override protected Object doInvoke(T proxy, String methodName,Class<?>[] parameterTypes, Object[] arguments) throws Throwable { return wrapper.invokeMethod(proxy, methodName, parameterTypes,arguments); } };
所以,简单总结一下Invoke本质上应该是一个代理,经过层层包装最终进行了发布。当消费者发起请求的时候,会获得这个invoker进行调用。
最终发布出去的invoker, 也不是单纯的一个代理,也是经过多层包装
InvokerDelegate(DelegateProviderMetaDataInvoker(AbstractProxyInvoker()))

