深入理解Dubbo-5.服务注册源码分析(中)

简介: 深入理解Dubbo-5.服务注册源码分析

深入理解Dubbo-5.服务注册源码分析(上):https://developer.aliyun.com/article/1414041


initialize()

public void initialize() {
        if (!initialized.compareAndSet(false, true)) {
            return;
        }
    // 初始化ApplicationModel:初始化应用模型,其中会维护很多配置信息。
        ApplicationModel.initFrameworkExts();
      // 启动配置中心,用于管理和获取配置信息。
        startConfigCenter();
    // 加载远程配置信息。
        loadRemoteConfigs();
    // 检查全局配置信息。
        checkGlobalConfigs();
        // @since 2.7.8
      // 在2.7.8版本之后的Dubbo中启动元数据中心。
        startMetadataCenter();
    // 初始化元数据服务。
        initMetadataService();
    // 初始化事件监听器。
        initEventListener();
    // 打印初始化完成的日志信息。
        if (logger.isInfoEnabled()) {
            logger.info(NAME + " has been initialized!");
        }
    }

exportServices


发布Dubbo服务

private void exportServices() {
    // 这里面有我们要发布服务的列表
    configManager.getServices().forEach(sc -> {
        // TODO, compatible with ServiceConfig.export()
        ServiceConfig serviceConfig = (ServiceConfig) sc;
        serviceConfig.setBootstrap(this);
        // 是否异步发布还是同步发布
        if (exportAsync) {
            // 使用线程池来异步发布
            ExecutorService executor = executorRepository.getServiceExporterExecutor();
            Future<?> future = executor.submit(() -> {
                sc.export();
                exportedServices.add(sc);
            });
            asyncExportingFutures.add(future);
        } else {
            sc.export();
            exportedServices.add(sc);
        }
    });
}

遍历所有dubbo服务,进行服务发布.

<dubbo:service beanName="ServiceBean:com.gupaoedu.springboot.dubbo.springbootdubbosampleprovider.services.IDemoService" />
<dubbo:service beanName="ServiceBean:com.gupaoedu.springboot.dubbo.ISayHelloService" />
dubbo://ip:port?com.gupaoedu.springboot.dubbo.springbootdubbosampleprovider.services.IDemoService&xxx&xxx
dubbo://ip:port?com.gupaoedu.springboot.dubbo.ISayHelloService&xxx&xxx

一个dubbo服务需要发布几次,取决于协议的配置数,如果一个dubbo服务配置了3个协议,rest、webservice、dubbo。


这个时候实际上就会生成三个地址:


dubbo://

rest://

webservice://


export

public synchronized void export() {
    // 首先检查 是否为空,如果为空则获取 DubboBootstrap 的实例,并进行初始化。
        if (bootstrap == null) {
            bootstrap = DubboBootstrap.getInstance();
            bootstrap.initialize();
        }
    // 调用 方法,用于检查和更新子配置。
        checkAndUpdateSubConfigs();
        //init serviceMetadata
      // 初始化 ,设置服务的版本、分组、接口类型、接口名称和目标引用。
        serviceMetadata.setVersion(getVersion());
        serviceMetadata.setGroup(getGroup());
        serviceMetadata.setDefaultGroup(getGroup());
        serviceMetadata.setServiceType(getInterfaceClass());
        serviceMetadata.setServiceInterfaceName(getInterface());
        serviceMetadata.setTarget(getRef());
    // 如果不应该导出服务,则直接返回。
        if (!shouldExport()) {
            return;
        }
    // 如果应该延迟导出服务,则使用 延迟执行 方法,延迟时间由 方法返回,时间单位为毫秒。DELAY_EXPORT_EXECUTORdoExportgetDelay()
    /*
    为什么延迟发布呢?
    之前老的版本里面,考虑spring配置的装载和 dubbo服务启动配置之间,会有一个先后的关系,如果说spring的一些配置还没有加载,但是dubbo服务已经启动了,这个时候就会导致一定的问题,有的时候就是为了延迟几秒钟等到spring环境加载好了,再去启动dubbo。也是为了保障服务启动的安全性。
    */
        if (shouldDelay()) {
            DELAY_EXPORT_EXECUTOR.schedule(this::doExport, getDelay(), TimeUnit.MILLISECONDS);
        } else {
            // 如果不需要延迟导出,则直接调用 方法导出服务。
            doExport();
        }
    // 最后调用 方法,表示服务已经导出。exported()
        exported();
    }

exported

public void exported() {
        List<URL> exportedURLs = this.getExportedUrls();
        exportedURLs.forEach(url -> {
            Map<String, String> parameters = getApplication().getParameters();
            ServiceNameMapping.getExtension(parameters != null ? parameters.get(MAPPING_KEY) : null).map(url);
        });
        // dispatch a ServiceConfigExportedEvent since 2.7.4
    // 发布完成以后 会发布一个事件,服务配置启动成功的事件
        dispatch(new ServiceConfigExportedEvent(this));
    }

doExport

protected synchronized void doExport() {
        if (unexported) {
            throw new IllegalStateException("The service " + interfaceClass.getName() + " has already unexported!");
        }
        if (exported) {
            return;
        }
        exported = true;
        if (StringUtils.isEmpty(path)) {
            path = interfaceName;
        }
        doExportUrls();
    }

doExportUrls


去发布这个url,也就是基于url的驱动去进行服务的发布,也就到了最关键的阶段了。

private void doExportUrls() {
    // 构建一个 ServiceRepository,将一些服务的描述信息都存储到这里,后续如果要去用的话,就从这里面拿到
    ServiceRepository repository = ApplicationModel.getServiceRepository();
    ServiceDescriptor serviceDescriptor = repository.registerService(getInterfaceClass());
    repository.registerProvider(
        getUniqueServiceName(),
        ref,
        serviceDescriptor,
        this,
        serviceMetadata
    );
    // 在这里先去拿到注册中心的url,之前前面提到过,一个服务可以配置多个注册中心,也可以配置多个协议
    List<URL> registryURLs = ConfigValidationUtils.loadRegistries(this, true);
    // 遍历所有的协议,如果有多个协议的话,就采用不同的协议去发布
    for (ProtocolConfig protocolConfig : protocols) {
        String pathKey = URL.buildKey(getContextPath(protocolConfig)
                                      .map(p -> p + "/" + path)
                                      .orElse(path), group, version);
        // In case user specified path, register service one more time to map it to path.
        repository.registerService(pathKey, interfaceClass);
        // TODO, uncomment this line once service key is unified
        serviceMetadata.setServiceKey(pathKey);
        // 这里将对应的协议 和 多个注册中心传递过去
        doExportUrlsFor1Protocol(protocolConfig, registryURLs);
    }
}

doExportUrlsFor1Protocol


到了这里,也就是最核心的操作


  • 生成url
  • 根据url中配置的协议类型,调用指定协议进行服务的发布
  • 启动服务
  • 注册服务
private void doExportUrlsFor1Protocol(ProtocolConfig protocolConfig, List<URL> registryURLs) {
    // 如果我们配置协议,默认采用Dubbo协议发布
    String name = protocolConfig.getName();
    if (StringUtils.isEmpty(name)) {
        name = DUBBO;
    }
    //用来存储所有的配置信息
    /* dubbo:service
        dubbo:method
            dubbo:argument*/
    Map<String, String> map = new HashMap<String, String>();
    map.put(SIDE_KEY, PROVIDER_SIDE);
    ServiceConfig.appendRuntimeParameters(map);
    AbstractConfig.appendParameters(map, getMetrics());
    AbstractConfig.appendParameters(map, getApplication());
    AbstractConfig.appendParameters(map, getModule());
    // remove 'default.' prefix for configs from ProviderConfig
    // appendParameters(map, provider, Constants.DEFAULT_KEY);
    AbstractConfig.appendParameters(map, provider);
    AbstractConfig.appendParameters(map, protocolConfig);
    AbstractConfig.appendParameters(map, this);
    MetadataReportConfig metadataReportConfig = getMetadataReportConfig();
    if (metadataReportConfig != null && metadataReportConfig.isValid()) {
        map.putIfAbsent(METADATA_KEY, REMOTE_METADATA_STORAGE_TYPE);
    }
    if (CollectionUtils.isNotEmpty(getMethods())) {
        // 去遍历解析所有的methods
        for (MethodConfig method : getMethods()) {
            AbstractConfig.appendParameters(map, method, method.getName());
            String retryKey = method.getName() + ".retry";
            if (map.containsKey(retryKey)) {
                String retryValue = map.remove(retryKey);
                if ("false".equals(retryValue)) {
                    map.put(method.getName() + ".retries", "0");
                }
            }
            List<ArgumentConfig> arguments = method.getArguments();
            if (CollectionUtils.isNotEmpty(arguments)) {
                // 然后再去遍历解析里面的 arguments
                for (ArgumentConfig argument : arguments) {
                    // convert argument type
                    if (argument.getType() != null && argument.getType().length() > 0) {
                        Method[] methods = interfaceClass.getMethods();
                        // visit all methods
                        if (methods.length > 0) {
                            for (int i = 0; i < methods.length; i++) {
                                String methodName = methods[i].getName();
                                // target the method, and get its signature
                                if (methodName.equals(method.getName())) {
                                    Class<?>[] argtypes = methods[i].getParameterTypes();
                                    // one callback in the method
                                    if (argument.getIndex() != -1) {
                                        if (argtypes[argument.getIndex()].getName().equals(argument.getType())) {
                                            AbstractConfig.appendParameters(map, argument, method.getName() + "." + argument.getIndex());
                                        } else {
                                            throw new IllegalArgumentException("Argument config error : the index attribute and type attribute not match :index :" + argument.getIndex() + ", type:" + argument.getType());
                                        }
                                    } else {
                                        // multiple callbacks in the method
                                        for (int j = 0; j < argtypes.length; j++) {
                                            Class<?> argclazz = argtypes[j];
                                            if (argclazz.getName().equals(argument.getType())) {
                                                AbstractConfig.appendParameters(map, argument, method.getName() + "." + j);
                                                if (argument.getIndex() != -1 && argument.getIndex() != j) {
                                                    throw new IllegalArgumentException("Argument config error : the index attribute and type attribute not match :index :" + argument.getIndex() + ", type:" + argument.getType());
                                                }
                                            }
                                        }
                                    }
                                }
                            }
                        }
                    } else if (argument.getIndex() != -1) {
                        AbstractConfig.appendParameters(map, argument, method.getName() + "." + argument.getIndex());
                    } else {
                        throw new IllegalArgumentException("Argument config must set index or type attribute.eg: <dubbo:argument index='0' .../> or <dubbo:argument type=xxx .../>");
                    }
                }
            }
        } // end of methods for
    }
    // 最终这个if结束完了之后,参数也就组装完成了。
    // 针对泛化添加的参数
    if (ProtocolUtils.isGeneric(generic)) {
        map.put(GENERIC_KEY, generic);
        map.put(METHODS_KEY, ANY_VALUE);
    } else {
        String revision = Version.getVersion(interfaceClass, version);
        if (revision != null && revision.length() > 0) {
            map.put(REVISION_KEY, revision);
        }
        String[] methods = Wrapper.getWrapper(interfaceClass).getMethodNames();
        if (methods.length == 0) {
            logger.warn("No method found in service interface " + interfaceClass.getName());
            map.put(METHODS_KEY, ANY_VALUE);
        } else {
            map.put(METHODS_KEY, StringUtils.join(new HashSet<String>(Arrays.asList(methods)), ","));
        }
    }
    // 针对token添加的参数
    /**
         * Here the token value configured by the provider is used to assign the value to ServiceConfig#token
         */
    if(ConfigUtils.isEmpty(token) && provider != null) {
        token = provider.getToken();
    }
    if (!ConfigUtils.isEmpty(token)) {
        if (ConfigUtils.isDefault(token)) {
            map.put(TOKEN_KEY, UUID.randomUUID().toString());
        } else {
            map.put(TOKEN_KEY, token);
        }
    }
    //init serviceMetadata attachments
    serviceMetadata.getAttachments().putAll(map);

在这之前,实际上都是参数的组装

这些数据就是service的元数据,也就是要将这些数据组装成元数据

    // export service
  // 最终获取到 host的地址 以及 端口号
    String host = findConfigedHosts(protocolConfig, registryURLs, map);
    Integer port = findConfigedPorts(protocolConfig, name, map);
    // 然后就是组装url了
    URL url = new URL(name, host, port, getContextPath(protocolConfig).map(p -> p + "/" + path).orElse(path), map);

这也就是我们注册到服务中心的一个地址,但是还没有注册 和 发布

    // You can customize Configurator to append extra parameters
// 这里就到了扩展点,如果有扩展的配置需要去装载的话,有的话,会根据扩展的配置去比对url,替换url里面的各种参数。
// 当有了扩展点的知识后,现在看这里就清晰很多了
/*
首先,代码通过 ExtensionLoader.getExtensionLoader(ConfiguratorFactory.class) 获取 ConfiguratorFactory 的扩展加载器。
然后,它检查是否有针对指定 URL 协议的 ConfiguratorFactory 扩展。这里的 url.getProtocol() 可能是获取 URL 的协议部分,比如 "http"、"https" 等。
如果存在针对该协议的 ConfiguratorFactory 扩展,代码会调用它的 getConfigurator 方法来获取一个 Configurator 实例,并且使用该实例来对原始的 URL 进行配置,得到新的 URL。
*/
f (ExtensionLoader.getExtensionLoader(ConfiguratorFactory.class)
        .hasExtension(url.getProtocol())) {
        url = ExtensionLoader.getExtensionLoader(ConfiguratorFactory.class)
            .getExtension(url.getProtocol()).getConfigurator(url).configure(url);
    }
  // 根据其范围来发布
    String scope = url.getParameter(SCOPE_KEY);
    // 如果scope != null
    if (!SCOPE_NONE.equalsIgnoreCase(scope)) {
        // 如果其不等于远程,就发送一个本地
        if (!SCOPE_REMOTE.equalsIgnoreCase(scope)) {
            exportLocal(url);
-----------------------------------------------------------------------------
    /*
这段代码也是一个 Java 代码片段,看起来是在使用 Dubbo 框架进行服务导出(export)的过程。让我来解释一下:
首先,代码通过 URLBuilder.from(url) 创建了一个新的 URLBuilder 对象,并将传入的 url 作为初始 URL。
接着,它使用 URLBuilder 的 setProtocol、setHost、setPort 方法分别设置了协议(LOCAL_PROTOCOL)、主机(LOCALHOST_VALUE)和端口(0)。
然后,通过调用 build() 方法,将 URLBuilder 对象构建为一个新的 URL 对象,即 local。
接下来,代码使用 PROXY_FACTORY.getInvoker 方法,根据 ref、interfaceClass 和 local 创建了一个 Invoker 对象。 PROXY_FACTORY 可能是一个代理工厂,用于创建服务的代理对象。
然后,通过 PROTOCOL.export 方法将该 Invoker 导出为一个 Exporter 对象。 PROTOCOL 可能是 Dubbo 的协议实现,用于处理服务的导出和通信。
最后,代码将导出的 Exporter 对象添加到 exporters 集合中,并打印日志信息,记录导出的服务接口类名和本地注册的 URL。
简而言之,这段代码的作用是将一个服务根据指定的 URL 导出到本地注册中心,以供本地消费者调用。它会构建一个新的本地 URL,创建 Invoker 对象,并使用 Dubbo 的协议实现导出服务,最后记录日志并将 Exporter 对象添加到 exporters 集合中。
    */
    private void exportLocal(URL url) {
        URL local = URLBuilder.from(url)
                .setProtocol(LOCAL_PROTOCOL)
                .setHost(LOCALHOST_VALUE)
                .setPort(0)
                .build();
        Exporter<?> exporter = PROTOCOL.export(
                PROXY_FACTORY.getInvoker(ref, (Class) interfaceClass, local));
        exporters.add(exporter);
        logger.info("Export dubbo service " + interfaceClass.getName() + " to local registry url : " + local);
    }       
-----------------------------------------------------------------------------
        }
        // 发布远程服务
        if (!SCOPE_LOCAL.equalsIgnoreCase(scope)) {
            if (CollectionUtils.isNotEmpty(registryURLs)) {
                // 判断发布的服务要注册到那几个注册中心
                // 循环遍历配置的注册中心的列表
                for (URL registryURL : registryURLs) {
                    //if protocol is only injvm ,not register
                    if (LOCAL_PROTOCOL.equalsIgnoreCase(url.getProtocol())) {
                        continue;
                    }
                    // 添加参数
                    url = url.addParameterIfAbsent(DYNAMIC_KEY, registryURL.getParameter(DYNAMIC_KEY));
                    URL monitorUrl = ConfigValidationUtils.loadMonitor(this, registryURL);
                    if (monitorUrl != null) {
                        url = url.addParameterAndEncoded(MONITOR_KEY, monitorUrl.toFullString());
                    }
                    if (logger.isInfoEnabled()) {
                        if (url.getParameter(REGISTER_KEY, true)) {
                            logger.info("Register dubbo service " + interfaceClass.getName() + " url " + url + " to registry " + registryURL);
                        } else {
                            logger.info("Export dubbo service " + interfaceClass.getName() + " to url " + url);
                        }
                    }
                    // For providers, this is used to enable custom proxy to generate invoker
                    String proxy = url.getParameter(PROXY_KEY);
                    if (StringUtils.isNotEmpty(proxy)) {
                        // registry://ip:port
                        registryURL = registryURL.addParameter(PROXY_KEY, proxy);
                    }

Invoker ,调用器. 服务提供者、服务的消费者。

          // 生成一个Invoker
                    Invoker<?> invoker = PROXY_FACTORY.getInvoker(ref, (Class) interfaceClass, registryURL.addParameterAndEncoded(EXPORT_KEY, url.toFullString()));
                    DelegateProviderMetaDataInvoker wrapperInvoker = new DelegateProviderMetaDataInvoker(invoker, this);
          // 将这个发布出去
                    Exporter<?> exporter = PROTOCOL.export(wrapperInvoker);
          exporters.add(exporter);
                }
            } else {
                if (logger.isInfoEnabled()) {
                    logger.info("Export dubbo service " + interfaceClass.getName() + " to url " + url);
                }
                Invoker<?> invoker = PROXY_FACTORY.getInvoker(ref, (Class) interfaceClass, url);
                DelegateProviderMetaDataInvoker wrapperInvoker = new DelegateProviderMetaDataInvoker(invoker, this);
                Exporter<?> exporter = PROTOCOL.export(wrapperInvoker);
                exporters.add(exporter);
            }
            /**
                 * @since 2.7.0
                 * ServiceData Store
                 */
            WritableMetadataService metadataService = WritableMetadataService.getExtension(url.getParameter(METADATA_KEY, DEFAULT_METADATA_STORAGE_TYPE));
            if (metadataService != null) {
                metadataService.publishServiceDefinition(url);
            }
        }
    }
    this.urls.add(url);
}
-------------------------------------------------------------------------------
Protocol PROTOCOL = ExtensionLoader.getExtensionLoader(Protocol.class).getAdaptiveExtension();
// 其得到的是一个自适应的扩展点,会动态生成Protocol$Adaptive ,然后会调用这个实例里面的export方法
// 理论上应该返回 extension = RegistryProtocol
// 实际返回的是这个QosProtocolWrapper(ProtocolFilterWrapper(ProtocolListenerWrapper(RegistryProtocol))

Wrapper包装


在ExtensionLoader.loadClass这个方法中,有一段这样的判断,如果当前这个类是一个wrapper包装类,也就是这个wrapper中有构造方法,参数是当前被加载的扩展点的类型,则把这个wrapper类加入到cacheWrapperClass缓存中。

else if (isWrapperClass(clazz)) {
  cacheWrapperClass(clazz);
}
private boolean isWrapperClass(Class<?> clazz) {
  try {
    clazz.getConstructor(type);
    return true;
  } catch (NoSuchMethodException e) {
    return false;
  }
}
//上面的判断是说,只要针对当前扩展点的类,如果存在一个构造方法,参数是当前需要加载的扩展点的对
象,那么就会进行包装
public ProtocolListenerWrapper(Protocol protocol) {
  if (protocol == null) {
    throw new IllegalArgumentException("protocol == null");
  }
  this.protocol = protocol;
}

我们可以在dubbo的配置文件中找到三个Wrapper org.apache.dubbo.rpc.Protocol 。

qos=org.apache.dubbo.qos.protocol.QosProtocolWrapper
filter=org.apache.dubbo.rpc.protocol.ProtocolFilterWrapper
listener=org.apache.dubbo.rpc.protocol.ProtocolListenerWrapper

ProtocolListenerWrapper, 用于服务export时候插入监听机制


QosprotocolWrapper, 如果当前配置了注册中心,则会启动一个Qos server.qos是dubbo的在线运维命令,dubbo2.5.8新版本重构了telnet模块,提供了新的telnet命令支持,新版本的telnet端口与dubbo协议的端口是不同的端口,默认为22222


ProtocolFilterWrapper,对invoker进行filter的包装,实现请求的过滤


接着,在getExtension->createExtension方法中,会对cacheWrapperClass集合进行判断,如果集合不为空,则进行包装

Set<Class<?>> wrapperClasses = cachedWrapperClasses;
  if (CollectionUtils.isNotEmpty(wrapperClasses)) {
    for (Class<?> wrapperClass : wrapperClasses) {
      instance = injectExtension((T)wrapperClass.getConstructor(type).newInstance(instance));
    }
  }

这三个扩展点在注册场景中都不会生效,执行的逻辑中会先判断当前是否是注册协议,如果是则直接基

于协议去发布服务

public <T> Exporter<T> export(Invoker<T> invoker) throws RpcException {
  if (Constants.REGISTRY_PROTOCOL.equals(invoker.getUrl().getProtocol())) {
    return protocol.export(invoker);
  }
  return protocol.export(buildInvokerChain(invoker,Constants.SERVICE_FILTER_KEY, Constants.PROVIDER));
}
public class Protocol$Adaptive implements org.apache.dubbo.rpc.Protocol{
  public void destroy() {
    throw new UnsupportedOperationException("The method public abstractvoid org.apache.dubbo.rpc.Protocol.destroy() of interface org.apache.dubbo.rpc.Protocol is not adaptive method!");
  }
  public int getDefaultPort() {
    throw new UnsupportedOperationException("The method public abstract int org.apache.dubbo.rpc.Protocol.getDefaultPort() of interface org.apache.dubbo.rpc.Protocol is not adaptive method!");
  }
  public Exporter export(org.apache.dubbo.rpc.Invoker arg0) throws org.apache.dubbo.rpc.RpcException {
    if (arg0 == null) throw new IllegalArgumentException("org.apache.dubbo.rpc.Invoker argument == null");
    if (arg0.getUrl() == null) throw new
IllegalArgumentException("org.apache.dubbo.rpc.Invoker argument getUrl() ==
null");
    URL url = arg0.getUrl();
    String extName = ( url.getProtocol() == null ? "dubbo" :url.getProtocol() );
    if(extName == null) throw new IllegalStateException("Failed to get extension (org.apache.dubbo.rpc.Protocol) name from url (" + url.toString() + ") use keys([protocol])");
    Protocol extension =
(org.apache.dubbo.rpc.Protocol)ExtensionLoader.getExtensionLoader(org.apache.dub
bo.rpc.Protocol.class).getExtension(extName);
        // 实际上调用的是动态生成的适配类中的export();
    return extension.export(arg0);
}
  public org.apache.dubbo.rpc.Invoker refer(java.lang.Class arg0,
org.apache.dubbo.common.URL arg1) throws org.apache.dubbo.rpc.RpcException {
    if (arg1 == null) throw new IllegalArgumentException("url == null");
org.apache.dubbo.common.URL url = arg1;
    String extName = ( url.getProtocol() == null ? "dubbo" :url.getProtocol() );
    if(extName == null) throw new IllegalStateException("Failed to get extension (org.apache.dubbo.rpc.Protocol) name from url (" + url.toString() + ") use keys([protocol])");
    org.apache.dubbo.rpc.Protocol extension =(org.apache.dubbo.rpc.Protocol)
ExtensionLoader.getExtensionLoader(org.apache.dubbo.rpc.Protocol.class).getExtension(extName);
    return extension.refer(arg0, arg1);
}
  public java.util.List getServers() {
    throw new UnsupportedOperationException("The method public default java.util.List org.apache.dubbo.rpc.Protocol.getServers() of interface org.apache.dubbo.rpc.Protocol is not adaptive method!");
  }
}
-------------------------------------------------------------------------------            

RegistryProtocol

public <T> Exporter<T> export(final Invoker<T> originInvoker) throws RpcException {
        URL registryUrl = getRegistryUrl(originInvoker);
        // url to export locally
        URL providerUrl = getProviderUrl(originInvoker);
        // Subscribe the override data
        // FIXME When the provider subscribes, it will affect the scene : a certain JVM exposes the service and call
        //  the same service. Because the subscribed is cached key with the name of the service, it causes the
        //  subscription information to cover.
        final URL overrideSubscribeUrl = getSubscribedOverrideUrl(providerUrl);
        final OverrideListener overrideSubscribeListener = new OverrideListener(overrideSubscribeUrl, originInvoker);
        overrideListeners.put(overrideSubscribeUrl, overrideSubscribeListener);
        providerUrl = overrideUrlWithConfig(providerUrl, overrideSubscribeListener);
        //export invoker
      // 到这里才是真正意义上启动一个Netty Server,发布Dubbo协议的服务
      // dubbo还是基于url驱动,所有每次执行都会去改变url
        final ExporterChangeableWrapper<T> exporter = doLocalExport(originInvoker, providerUrl);
        // url to registry
        final Registry registry = getRegistry(originInvoker);
        final URL registeredProviderUrl = getUrlToRegistry(providerUrl, registryUrl);
        // decide if we need to delay publish
        boolean register = providerUrl.getParameter(REGISTER_KEY, true);
        if (register) {
            // 然后到这里才注册了服务
            register(registryUrl, registeredProviderUrl);
        }
        // register stated url on provider model
        registerStatedUrl(registryUrl, registeredProviderUrl, register);
        exporter.setRegisterUrl(registeredProviderUrl);
        exporter.setSubscribeUrl(overrideSubscribeUrl);
        // Deprecated! Subscribe to override rules in 2.6.x or before.
        registry.subscribe(overrideSubscribeUrl, overrideSubscribeListener);
        notifyExport(exporter);
        //Ensure that a new exporter instance is returned every time export
        return new DestroyableExporter<>(exporter);
    }


服务发布流程


doLocalExport


服务启动过程

private <T> ExporterChangeableWrapper<T> doLocalExport(final Invoker<T> originInvoker, URL providerUrl) {
        String key = getCacheKey(originInvoker);
        return (ExporterChangeableWrapper<T>) bounds.computeIfAbsent(key, s -> {
            Invoker<?> invokerDelegate = new InvokerDelegate<>(originInvoker, providerUrl);
            return new ExporterChangeableWrapper<>((Exporter<T>) protocol.export(invokerDelegate), originInvoker);
        });
    }

protocol.export


protocol是通过依赖注入来初始化的一个协议扩展点,并且我们可以看到这个protocol.export()方法上

增加了@Adaptive注解,表示它是一个动态适配的扩展点,意味着最终的执行链路应该是


ProtocolListenerWrapper ->QosProtocolWrapper ->ProtocolFilterWrapper-DubboProtocol

所以这里又回到了自适应扩展


如果 ProviderUrl: dubbo:// 是这样,那么就会选择 DubboProtocol


所以最终会去调用 DubboProtocol.export()

public <T> Exporter<T> export(Invoker<T> invoker) throws RpcException {
        URL url = invoker.getUrl();
        // export service.
        String key = serviceKey(url);
        DubboExporter<T> exporter = new DubboExporter<T>(invoker, key, exporterMap);
    // 把服务对应的invoker 存储,将来调用的时候,从map中拿到即可
        exporterMap.put(key, exporter);
        //export an stub service for dispatching event
        Boolean isStubSupportEvent = url.getParameter(STUB_EVENT_KEY, DEFAULT_STUB_EVENT);
        Boolean isCallbackservice = url.getParameter(IS_CALLBACK_SERVICE, false);
        if (isStubSupportEvent && !isCallbackservice) {
            String stubServiceMethods = url.getParameter(STUB_EVENT_METHODS_KEY);
            if (stubServiceMethods == null || stubServiceMethods.length() == 0) {
                if (logger.isWarnEnabled()) {
                    logger.warn(new IllegalStateException("consumer [" + url.getParameter(INTERFACE_KEY) +
                            "], has set stubproxy support event ,but no stub methods founded."));
                }
            }
        }
    // 开启一个服务
        openServer(url);
      // 优化序列化
        optimizeSerialization(url);
        return exporter;
    }

openServer


往下看这个过程,进入到openServer(),从名字来看它是用来开启一个服务。


去开启一个服务,并且放入到缓存中->在同一台机器上(单网卡),同一个端口上仅允许启动一个服务器实例。

private void openServer(URL url) {
      // 获取 host:port,并将其作为服务器实例的 key,用于标识当前的服务器实例
        String key = url.getAddress();
      //client 也可以暴露一个只有server可以调用的服务
        boolean isServer = url.getParameter(IS_SERVER_KEY, true);
        if (isServer) {
            //是否在serverMap中缓存了
            ProtocolServer server = serverMap.get(key);
            if (server == null) {
                synchronized (this) {
                    server = serverMap.get(key);
                    if (server == null) {
                        // 创建服务器实例
                        serverMap.put(key, createServer(url));
                    }
                }
            } else {
                // 服务器已创建,则根据 url 中的配置重置服务器
                server.reset(url);
            }
        }
    }

createServer


创建服务.

在很多地方,这个地址一直伴随着dubbo的启动、消费、以及整个生命周期中。

private ProtocolServer createServer(URL url) {
    //组装url,在url中添加心跳时间、编解码参数
        url = URLBuilder.from(url)
                // 当服务关闭以后,发送一个只读的事件,默认是开启状态
                .addParameterIfAbsent(CHANNEL_READONLYEVENT_SENT_KEY, Boolean.TRUE.toString())
                // 启动心跳配置
                .addParameterIfAbsent(HEARTBEAT_KEY, String.valueOf(DEFAULT_HEARTBEAT))
                .addParameter(CODEC_KEY, DubboCodec.NAME)
                .build();
        String str = url.getParameter(SERVER_KEY, DEFAULT_REMOTING_SERVER);
    //通过 SPI 检测是否存在 server 参数所代表的 Transporter 拓展,不存在则抛出异常
        if (str != null && str.length() > 0 && !ExtensionLoader.getExtensionLoader(Transporter.class).hasExtension(str)) {
            throw new RpcException("Unsupported server type: " + str + ", url: " + url);
        }
    //创建ExchangeServer.
        ExchangeServer server;
        try {
            server = Exchangers.bind(url, requestHandler);
        } catch (RemotingException e) {
            throw new RpcException("Fail to start server(url: " + url + ") " + e.getMessage(), e);
        }
        str = url.getParameter(CLIENT_KEY);
        if (str != null && str.length() > 0) {
            Set<String> supportedTypes = ExtensionLoader.getExtensionLoader(Transporter.class).getSupportedExtensions();
            if (!supportedTypes.contains(str)) {
                throw new RpcException("Unsupported client type: " + str);
            }
        }
        return new DubboProtocolServer(server);
    }

Exchangers.bind

public static ExchangeServer bind(URL url, ExchangeHandler handler) throws RemotingException {
        if (url == null) {
            throw new IllegalArgumentException("url == null");
        }
        if (handler == null) {
            throw new IllegalArgumentException("handler == null");
        }
    //获取 Exchanger,默认为 HeaderExchanger。
  //调用 HeaderExchanger 的 bind 方法创建 ExchangeServer 实例
        url = url.addParameterIfAbsent(Constants.CODEC_KEY, "exchange");
        return getExchanger(url).bind(url, handler);
    }

headerExchanger.bind


这里面包含多个逻辑:


  • new DecodeHandler(new HeaderExchangeHandler(handler))
  • Transporters.bind
  • new HeaderExchangeServer


这段代码是一个 Java 方法,其作用是将一个 ExchangeHandler 对象绑定到某个 URL 上,以创建一个 ExchangeServer 对象并返回。ExchangeServer 是一个 Dubbo 框架中的概念,表示一个 Dubbo 服务提供方所使用的网络通信服务器。


具体来说,这个方法接收两个参数:


  • URL url:表示要绑定到的目标 URL。这个 URL 包含了一些必要的信息,比如主机名、端口号、协议类型等。
  • ExchangeHandler handler:表示要绑定的 ExchangeHandler 对象,它是 Dubbo 框架中的核心组件之一,负责处理网络通信协议的编解码、消息的序列化和反序列化等工作。


在实现中,这个方法首先通过 Transporters.bind() 方法创建一个网络服务器,该方法会根据传入的 URL 中指定的协议类型(比如 TCP、HTTP 等),创建对应的网络服务器。然后,将这个新创建的服务器对象传递给一个 DecodeHandler 对象进行进一步的处理。DecodeHandler 是 Dubbo 框架中的一个组件,用于对网络数据进行解码和转换。最后,将这个 DecodeHandler 对象封装在一个 HeaderExchangeHandler 对象中,完成 Dubbo 协议头的添加和解析工作。HeaderExchangeHandler 是 Dubbo 框架中的一个核心组件,负责处理 Dubbo 协议头的生成和解析。


最终,这个方法会返回一个新的 HeaderExchangeServer 对象,该对象封装了之前创建的网络服务器以及协议头解析器等组件。这个 HeaderExchangeServer 对象可以被用来启动 Dubbo 服务提供方的网络通信服务。

public ExchangeServer bind(URL url, ExchangeHandler handler) throws RemotingException {
        return new HeaderExchangeServer(Transporters.bind(url, new DecodeHandler(new HeaderExchangeHandler(handler))));
    }
public static RemotingServer bind(URL url, ChannelHandler... handlers) throws RemotingException {
        if (url == null) {
            throw new IllegalArgumentException("url == null");
        }
        if (handlers == null || handlers.length == 0) {
            throw new IllegalArgumentException("handlers == null");
        }
        ChannelHandler handler;
        if (handlers.length == 1) {
            handler = handlers[0];
        } else {
            handler = new ChannelHandlerDispatcher(handlers);
        }
        return getTransporter().bind(url, handler);
    }


深入理解Dubbo-5.服务注册源码分析(下):https://developer.aliyun.com/article/1414078

目录
相关文章
|
3月前
|
存储 负载均衡 监控
深入理解Dubbo-6.服务消费源码分析(下)
深入理解Dubbo-6.服务消费源码分析
33 0
|
3月前
|
缓存 Dubbo Java
深入理解Dubbo-5.服务注册源码分析(下)
深入理解Dubbo-5.服务注册源码分析
30 0
|
3月前
|
Dubbo Java 应用服务中间件
深入理解Dubbo-5.服务注册源码分析(上)
深入理解Dubbo-5.服务注册源码分析
42 0
|
2月前
|
Dubbo Java 应用服务中间件
Dubbo 第四节: Spring与Dubbo整合原理与源码分析
DubboConfigConfigurationRegistrar的主要作⽤就是对propties⽂件进⾏解析并根据不同的配置项项⽣成对应类型的Bean对象。
|
3月前
|
Java Spring
深入理解Dubbo-7.服务消费调用源码分析(下)
深入理解Dubbo-7.服务消费调用源码分析
37 0
|
3月前
|
负载均衡 Dubbo 应用服务中间件
深入理解Dubbo-7.服务消费调用源码分析(中)
深入理解Dubbo-7.服务消费调用源码分析
43 0
|
3月前
|
负载均衡 算法 Dubbo
深入理解Dubbo-7.服务消费调用源码分析(上)
深入理解Dubbo-7.服务消费调用源码分析
77 0
|
3月前
|
Dubbo Java 应用服务中间件
深入理解Dubbo-6.服务消费源码分析(上)
深入理解Dubbo-6.服务消费源码分析
35 1
|
7月前
|
负载均衡 Dubbo Java
RPC框架-dubbo:架构及源码分析-初篇
在自学或面试dubbo时,相关的问题有很多,例如dubbo 的基本工作原理,这是使用过dubbo后应该知道的。包括dubbo的分层架构、长短链接选择、二进制协议支持;之后是使用方式(服务的注册、发现、调用方式),基础配置(超时时间、线程数),这些是最基本的。 在这些问题之后,就可以继续深入底层:关于连接方式,使用长连接还是短连接?为什么? dubbo的二进制协议支持哪些,之间有什么区别/优缺点等等,也可以考察在使用过程中遇到过哪些问题,是如何解决的。这些都需要深入理解,并且有真实、长时间使用经验。
146 0
|
5月前
|
负载均衡 Dubbo 应用服务中间件
微服务技术系列教程(31) - Dubbo-原理及负载均衡分析
微服务技术系列教程(31) - Dubbo-原理及负载均衡分析
55 0