dubbo - 服务发布流程

本文涉及的产品
服务治理 MSE Sentinel/OpenSergo,Agent数量 不受限
简介: 说明: 图片来自dubbo剖析:一 服务发布 ServiceBean作为provider的发布入口 ServiceConfig负责创建exporter对象 RegistryProtocol负责注册URL到zookeeper DubboProtocol负责bind端口启动监听 发布过程 - 源码走读 provider的发布入口 说明: ServiceBean初始化后的afterPropertiesSet方法负责执行服务导出。

image

说明:

  • 图片来自dubbo剖析:一 服务发布
  • ServiceBean作为provider的发布入口
  • ServiceConfig负责创建exporter对象
  • RegistryProtocol负责注册URL到zookeeper
  • DubboProtocol负责bind端口启动监听


发布过程 - 源码走读

provider的发布入口

说明:

  • ServiceBean初始化后的afterPropertiesSet方法负责执行服务导出。
public class ServiceBean<T> extends ServiceConfig<T> implements 
  InitializingBean, DisposableBean, ApplicationContextAware, 
  ApplicationListener<ContextRefreshedEvent>, BeanNameAware {

    public void afterPropertiesSet() throws Exception {
        
        // 解析各种配置信息,省略相关代码
        if (!isDelay()) {
            export();
        }
    }
}


exporter的创建过程

说明:

  • doExportUrls负责遍历所有注册中心执行服务发布。
  • doExportUrlsFor1Protocol内部通过proxyFactory.getInvoker()创建invoker。
  • doExportUrlsFor1Protocol内部通过protocol.export()创建exporter。
  • protocol.export()进入RegistryProtocol的处理逻辑。
public class ServiceConfig<T> extends AbstractServiceConfig {

    public synchronized void export() {
        if (provider != null) {
            if (export == null) {
                export = provider.getExport();
            }
            if (delay == null) {
                delay = provider.getDelay();
            }
        }
        if (export != null && !export) {
            return;
        }

        if (delay != null && delay > 0) {
            delayExportExecutor.schedule(new Runnable() {
                public void run() {
                    doExport();
                }
            }, delay, TimeUnit.MILLISECONDS);
        } else {
            doExport();
        }
    }


    protected synchronized void doExport() {
        // 省略相关代码,服务发布走的流程路径之一
        doExportUrls();
    }


    private void doExportUrls() {
        List<URL> registryURLs = loadRegistries(true);
        // 遍历所有注册中心
        for (ProtocolConfig protocolConfig : protocols) {
            // 服务发布走的流程路径之一
            doExportUrlsFor1Protocol(protocolConfig, registryURLs);
        }
    }

    private void doExportUrlsFor1Protocol(ProtocolConfig protocolConfig, List<URL> registryURLs) {
        
        String scope = url.getParameter(Constants.SCOPE_KEY);
        if (!Constants.SCOPE_NONE.toString().equalsIgnoreCase(scope)) {
            if (!Constants.SCOPE_LOCAL.toString().equalsIgnoreCase(scope)) {
                // 走远程export逻辑
                if (registryURLs != null && registryURLs.size() > 0) {
                    for (URL registryURL : registryURLs) {
                        url = url.addParameterIfAbsent("dynamic", registryURL.getParameter("dynamic"));
                        URL monitorUrl = loadMonitor(registryURL);
                        if (monitorUrl != null) {
                            url = url.addParameterAndEncoded(Constants.MONITOR_KEY, monitorUrl.toFullString());
                        }
                        if (logger.isInfoEnabled()) {
                            logger.info("Register dubbo service " + interfaceClass.getName() + " url " + url + " to registry " + registryURL);
                        }
                        // proxyFactory负责创建invoker对象并且包装成wrapperInvoker
                        Invoker<?> invoker = proxyFactory.getInvoker(ref, (Class) interfaceClass, registryURL.addParameterAndEncoded(Constants.EXPORT_KEY, url.toFullString()));
                        DelegateProviderMetaDataInvoker wrapperInvoker = new DelegateProviderMetaDataInvoker(invoker, this);

                        // 走RegistryProtocol的export操作
                        Exporter<?> exporter = protocol.export(wrapperInvoker);
                        exporters.add(exporter);
                    }
                } else {
                    //  走本地export逻辑
                    Invoker<?> invoker = proxyFactory.getInvoker(ref, (Class) interfaceClass, url);
                    DelegateProviderMetaDataInvoker wrapperInvoker = new DelegateProviderMetaDataInvoker(invoker, this);

                    Exporter<?> exporter = protocol.export(wrapperInvoker);
                    exporters.add(exporter);
                }
            }
        }

        this.urls.add(url);
    }
}


export的过程

说明:

  • doLocalExport()内部执行protocol.export()进入DubboProtocol的处理逻辑。
  • register()负责注册导出服务到zookeeper节点,进入ZookeeperRegistry的处理逻辑。
  • registry.subscribe()负责订阅,具体订阅内容暂时没理解。
public class RegistryProtocol implements Protocol {

    public <T> Exporter<T> export(final Invoker<T> originInvoker) throws RpcException {
        // 这里走服务export的核心流程,会走到dubbo协议的发布流程
        final ExporterChangeableWrapper<T> exporter = doLocalExport(originInvoker);

        URL registryUrl = getRegistryUrl(originInvoker);

        //registry provider
        final Registry registry = getRegistry(originInvoker);
        final URL registedProviderUrl = getRegistedProviderUrl(originInvoker);

        boolean register = registedProviderUrl.getParameter("register", true);
        ProviderConsumerRegTable.registerProvider(originInvoker, registryUrl, registedProviderUrl);
        if (register) {
            // 注册到zookeeper的注册中心
            register(registryUrl, registedProviderUrl);
            ProviderConsumerRegTable.getProviderWrapper(originInvoker).setReg(true);
        }

        // Subscribe the override data
        // FIXME When the provider subscribes, it will affect the scene : a certain JVM exposes the service and call the same service. Because the subscribed is cached key with the name of the service, it causes the subscription information to cover.
        final URL overrideSubscribeUrl = getSubscribedOverrideUrl(registedProviderUrl);
        final OverrideListener overrideSubscribeListener = new OverrideListener(overrideSubscribeUrl, originInvoker);
        overrideListeners.put(overrideSubscribeUrl, overrideSubscribeListener);
        registry.subscribe(overrideSubscribeUrl, overrideSubscribeListener);
        //Ensure that a new exporter instance is returned every time export
        return new Exporter<T>() {
            public Invoker<T> getInvoker() {
                return exporter.getInvoker();
            }

            public void unexport() {
                try {
                    exporter.unexport();
                } catch (Throwable t) {
                    logger.warn(t.getMessage(), t);
                }
                try {
                    registry.unregister(registedProviderUrl);
                } catch (Throwable t) {
                    logger.warn(t.getMessage(), t);
                }
                try {
                    overrideListeners.remove(overrideSubscribeUrl);
                    registry.unsubscribe(overrideSubscribeUrl, overrideSubscribeListener);
                } catch (Throwable t) {
                    logger.warn(t.getMessage(), t);
                }
            }
        };
    }



    private <T> ExporterChangeableWrapper<T> doLocalExport(final Invoker<T> originInvoker) {
        String key = getCacheKey(originInvoker);
        ExporterChangeableWrapper<T> exporter = (ExporterChangeableWrapper<T>) bounds.get(key);
        if (exporter == null) {
            synchronized (bounds) {
                exporter = (ExporterChangeableWrapper<T>) bounds.get(key);
                if (exporter == null) {
                    final Invoker<?> invokerDelegete = new InvokerDelegete<T>(originInvoker, getProviderUrl(originInvoker));
                    // protocol.export()执行导出exporter,是DobboProtocol,通过ExporterChangeableWrapper包装
                    exporter = new ExporterChangeableWrapper<T>((Exporter<T>) protocol.export(invokerDelegete), originInvoker);
                    bounds.put(key, exporter);
                }
            }
        }
        return exporter;
    }


    public void register(URL registryUrl, URL registedProviderUrl) {
        Registry registry = registryFactory.getRegistry(registryUrl);
        registry.register(registedProviderUrl);
    }
}


bind端口启动监听

说明:

  • DubboProtocol通过exporterMap保存导出的exporter对象。
  • DubboProtocol通过openServer()方法bind端口启动监听。
public class DubboProtocol extends AbstractProtocol {

    public <T> Exporter<T> export(Invoker<T> invoker) throws RpcException {
        URL url = invoker.getUrl();

        // export service.
        String key = serviceKey(url);
        DubboExporter<T> exporter = new DubboExporter<T>(invoker, key, exporterMap);
        exporterMap.put(key, exporter);

        // 核心在于打开socket连接
        openServer(url);
        optimizeSerialization(url);
        return exporter;
    }


    private void openServer(URL url) {
        // find server.
        String key = url.getAddress();
        //client can export a service which's only for server to invoke
        boolean isServer = url.getParameter(Constants.IS_SERVER_KEY, true);
        if (isServer) {
            ExchangeServer server = serverMap.get(key);
            if (server == null) {
                serverMap.put(key, createServer(url));
            } else {
                // server supports reset, use together with override
                server.reset(url);
            }
        }
    }


    private ExchangeServer createServer(URL url) {
        // send readonly event when server closes, it's enabled by default
        url = url.addParameterIfAbsent(Constants.CHANNEL_READONLYEVENT_SENT_KEY, Boolean.TRUE.toString());
        // enable heartbeat by default
        url = url.addParameterIfAbsent(Constants.HEARTBEAT_KEY, String.valueOf(Constants.DEFAULT_HEARTBEAT));
        String str = url.getParameter(Constants.SERVER_KEY, Constants.DEFAULT_REMOTING_SERVER);

        if (str != null && str.length() > 0 && !ExtensionLoader.getExtensionLoader(Transporter.class).hasExtension(str))
            throw new RpcException("Unsupported server type: " + str + ", url: " + url);

        url = url.addParameter(Constants.CODEC_KEY, DubboCodec.NAME);
        ExchangeServer server;
        try {
            server = Exchangers.bind(url, requestHandler);
        } catch (RemotingException e) {
            throw new RpcException("Fail to start server(url: " + url + ") " + e.getMessage(), e);
        }
        str = url.getParameter(Constants.CLIENT_KEY);
        if (str != null && str.length() > 0) {
            Set<String> supportedTypes = ExtensionLoader.getExtensionLoader(Transporter.class).getSupportedExtensions();
            if (!supportedTypes.contains(str)) {
                throw new RpcException("Unsupported client type: " + str);
            }
        }
        return server;
    }
}


注册发布的服务到zookeeper中心

说明:

  • ZookeeperRegistry的doRegister负责创建zookeeper节点保存发布URL信息。
public abstract class FailbackRegistry extends AbstractRegistry {

    public void register(URL url) {
        if (destroyed.get()){
            return;
        }
        super.register(url);
        failedRegistered.remove(url);
        failedUnregistered.remove(url);
        try {
            // 注册zookeeper节点上
            doRegister(url);
        } catch (Exception e) {
           // 省略相关代码
        }
    }

    protected abstract void doRegister(URL url);
}

public class ZookeeperRegistry extends FailbackRegistry {

    protected void doRegister(URL url) {
        try {
            zkClient.create(toUrlPath(url), url.getParameter(Constants.DYNAMIC_KEY, true));
        } catch (Throwable e) {
            throw new RpcException("Failed to register " + url + " to zookeeper " + getUrl() + ", cause: " + e.getMessage(), e);
        }
    }
}
相关实践学习
基于MSE实现微服务的全链路灰度
通过本场景的实验操作,您将了解并实现在线业务的微服务全链路灰度能力。
目录
相关文章
|
8天前
|
XML Dubbo Java
【Dubbo3高级特性】「框架与服务」服务的异步调用实践以及开发模式
【Dubbo3高级特性】「框架与服务」服务的异步调用实践以及开发模式
39 0
|
8天前
|
Dubbo Java 应用服务中间件
Dubbo服务暴露机制解密:深入探讨服务提供者的奥秘【九】
Dubbo服务暴露机制解密:深入探讨服务提供者的奥秘【九】
28 0
|
8天前
|
缓存 运维 监控
Dubbo服务降级:保障稳定性的终极指南【六】
Dubbo服务降级:保障稳定性的终极指南【六】
43 0
|
8天前
|
SpringCloudAlibaba Dubbo Java
SpringCloud Alibaba集成Dubbo实现远程服务间调用
SpringCloud Alibaba集成Dubbo实现远程服务间调用
|
8天前
|
XML Dubbo Java
【Dubbo3高级特性】「框架与服务」 Nacos作为注册中心-服务分组及服务分组聚合实现
【Dubbo3高级特性】「框架与服务」 Nacos作为注册中心-服务分组及服务分组聚合实现
67 0
|
8天前
|
Cloud Native Dubbo 应用服务中间件
【Dubbo3高级特性】「微服务云原生架构」带你从零基础认识搭建公司内部服务用户中心体系(实战指南-序章)
【Dubbo3高级特性】「微服务云原生架构」带你从零基础认识搭建公司内部服务用户中心体系(实战指南-序章)
71 0
|
8天前
|
Java fastjson 数据安全/隐私保护
【Dubbo3技术专题】「云原生微服务开发实战」 一同探索和分析研究RPC服务的底层原理和实现
【Dubbo3技术专题】「云原生微服务开发实战」 一同探索和分析研究RPC服务的底层原理和实现
51 0
|
8天前
|
XML Dubbo Java
【Dubbo3高级特性】「提升系统安全性」手把手教你如何通过令牌进行Dubbo3服务验证及服务鉴权控制实战指南(二)
【Dubbo3高级特性】「提升系统安全性」手把手教你如何通过令牌进行Dubbo3服务验证及服务鉴权控制实战指南
59 0
|
8天前
|
XML Cloud Native Dubbo
【Dubbo3高级特性】「提升系统安全性」手把手教你如何通过令牌进行Dubbo3服务验证及服务鉴权控制实战指南(一)
【Dubbo3高级特性】「提升系统安全性」手把手教你如何通过令牌进行Dubbo3服务验证及服务鉴权控制实战指南
56 1
|
8天前
|
Kubernetes Dubbo 应用服务中间件
【Dubbo3终极特性】「流量治理体系」一文教你如何搭建Dubbo3的控制台服务Dubbo-Admin
【Dubbo3终极特性】「流量治理体系」一文教你如何搭建Dubbo3的控制台服务Dubbo-Admin
78 0