dubbo - 服务发布流程-阿里云开发者社区

开发者社区> 晴天哥> 正文

dubbo - 服务发布流程

简介: 说明: 图片来自dubbo剖析:一 服务发布 ServiceBean作为provider的发布入口 ServiceConfig负责创建exporter对象 RegistryProtocol负责注册URL到zookeeper DubboProtocol负责bind端口启动监听 发布过程 - 源码走读 provider的发布入口 说明: ServiceBean初始化后的afterPropertiesSet方法负责执行服务导出。
+关注继续查看

image

说明:

  • 图片来自dubbo剖析:一 服务发布
  • ServiceBean作为provider的发布入口
  • ServiceConfig负责创建exporter对象
  • RegistryProtocol负责注册URL到zookeeper
  • DubboProtocol负责bind端口启动监听


发布过程 - 源码走读

provider的发布入口

说明:

  • ServiceBean初始化后的afterPropertiesSet方法负责执行服务导出。
public class ServiceBean<T> extends ServiceConfig<T> implements 
  InitializingBean, DisposableBean, ApplicationContextAware, 
  ApplicationListener<ContextRefreshedEvent>, BeanNameAware {

    public void afterPropertiesSet() throws Exception {
        
        // 解析各种配置信息,省略相关代码
        if (!isDelay()) {
            export();
        }
    }
}


exporter的创建过程

说明:

  • doExportUrls负责遍历所有注册中心执行服务发布。
  • doExportUrlsFor1Protocol内部通过proxyFactory.getInvoker()创建invoker。
  • doExportUrlsFor1Protocol内部通过protocol.export()创建exporter。
  • protocol.export()进入RegistryProtocol的处理逻辑。
public class ServiceConfig<T> extends AbstractServiceConfig {

    public synchronized void export() {
        if (provider != null) {
            if (export == null) {
                export = provider.getExport();
            }
            if (delay == null) {
                delay = provider.getDelay();
            }
        }
        if (export != null && !export) {
            return;
        }

        if (delay != null && delay > 0) {
            delayExportExecutor.schedule(new Runnable() {
                public void run() {
                    doExport();
                }
            }, delay, TimeUnit.MILLISECONDS);
        } else {
            doExport();
        }
    }


    protected synchronized void doExport() {
        // 省略相关代码,服务发布走的流程路径之一
        doExportUrls();
    }


    private void doExportUrls() {
        List<URL> registryURLs = loadRegistries(true);
        // 遍历所有注册中心
        for (ProtocolConfig protocolConfig : protocols) {
            // 服务发布走的流程路径之一
            doExportUrlsFor1Protocol(protocolConfig, registryURLs);
        }
    }

    private void doExportUrlsFor1Protocol(ProtocolConfig protocolConfig, List<URL> registryURLs) {
        
        String scope = url.getParameter(Constants.SCOPE_KEY);
        if (!Constants.SCOPE_NONE.toString().equalsIgnoreCase(scope)) {
            if (!Constants.SCOPE_LOCAL.toString().equalsIgnoreCase(scope)) {
                // 走远程export逻辑
                if (registryURLs != null && registryURLs.size() > 0) {
                    for (URL registryURL : registryURLs) {
                        url = url.addParameterIfAbsent("dynamic", registryURL.getParameter("dynamic"));
                        URL monitorUrl = loadMonitor(registryURL);
                        if (monitorUrl != null) {
                            url = url.addParameterAndEncoded(Constants.MONITOR_KEY, monitorUrl.toFullString());
                        }
                        if (logger.isInfoEnabled()) {
                            logger.info("Register dubbo service " + interfaceClass.getName() + " url " + url + " to registry " + registryURL);
                        }
                        // proxyFactory负责创建invoker对象并且包装成wrapperInvoker
                        Invoker<?> invoker = proxyFactory.getInvoker(ref, (Class) interfaceClass, registryURL.addParameterAndEncoded(Constants.EXPORT_KEY, url.toFullString()));
                        DelegateProviderMetaDataInvoker wrapperInvoker = new DelegateProviderMetaDataInvoker(invoker, this);

                        // 走RegistryProtocol的export操作
                        Exporter<?> exporter = protocol.export(wrapperInvoker);
                        exporters.add(exporter);
                    }
                } else {
                    //  走本地export逻辑
                    Invoker<?> invoker = proxyFactory.getInvoker(ref, (Class) interfaceClass, url);
                    DelegateProviderMetaDataInvoker wrapperInvoker = new DelegateProviderMetaDataInvoker(invoker, this);

                    Exporter<?> exporter = protocol.export(wrapperInvoker);
                    exporters.add(exporter);
                }
            }
        }

        this.urls.add(url);
    }
}


export的过程

说明:

  • doLocalExport()内部执行protocol.export()进入DubboProtocol的处理逻辑。
  • register()负责注册导出服务到zookeeper节点,进入ZookeeperRegistry的处理逻辑。
  • registry.subscribe()负责订阅,具体订阅内容暂时没理解。
public class RegistryProtocol implements Protocol {

    public <T> Exporter<T> export(final Invoker<T> originInvoker) throws RpcException {
        // 这里走服务export的核心流程,会走到dubbo协议的发布流程
        final ExporterChangeableWrapper<T> exporter = doLocalExport(originInvoker);

        URL registryUrl = getRegistryUrl(originInvoker);

        //registry provider
        final Registry registry = getRegistry(originInvoker);
        final URL registedProviderUrl = getRegistedProviderUrl(originInvoker);

        boolean register = registedProviderUrl.getParameter("register", true);
        ProviderConsumerRegTable.registerProvider(originInvoker, registryUrl, registedProviderUrl);
        if (register) {
            // 注册到zookeeper的注册中心
            register(registryUrl, registedProviderUrl);
            ProviderConsumerRegTable.getProviderWrapper(originInvoker).setReg(true);
        }

        // Subscribe the override data
        // FIXME When the provider subscribes, it will affect the scene : a certain JVM exposes the service and call the same service. Because the subscribed is cached key with the name of the service, it causes the subscription information to cover.
        final URL overrideSubscribeUrl = getSubscribedOverrideUrl(registedProviderUrl);
        final OverrideListener overrideSubscribeListener = new OverrideListener(overrideSubscribeUrl, originInvoker);
        overrideListeners.put(overrideSubscribeUrl, overrideSubscribeListener);
        registry.subscribe(overrideSubscribeUrl, overrideSubscribeListener);
        //Ensure that a new exporter instance is returned every time export
        return new Exporter<T>() {
            public Invoker<T> getInvoker() {
                return exporter.getInvoker();
            }

            public void unexport() {
                try {
                    exporter.unexport();
                } catch (Throwable t) {
                    logger.warn(t.getMessage(), t);
                }
                try {
                    registry.unregister(registedProviderUrl);
                } catch (Throwable t) {
                    logger.warn(t.getMessage(), t);
                }
                try {
                    overrideListeners.remove(overrideSubscribeUrl);
                    registry.unsubscribe(overrideSubscribeUrl, overrideSubscribeListener);
                } catch (Throwable t) {
                    logger.warn(t.getMessage(), t);
                }
            }
        };
    }



    private <T> ExporterChangeableWrapper<T> doLocalExport(final Invoker<T> originInvoker) {
        String key = getCacheKey(originInvoker);
        ExporterChangeableWrapper<T> exporter = (ExporterChangeableWrapper<T>) bounds.get(key);
        if (exporter == null) {
            synchronized (bounds) {
                exporter = (ExporterChangeableWrapper<T>) bounds.get(key);
                if (exporter == null) {
                    final Invoker<?> invokerDelegete = new InvokerDelegete<T>(originInvoker, getProviderUrl(originInvoker));
                    // protocol.export()执行导出exporter,是DobboProtocol,通过ExporterChangeableWrapper包装
                    exporter = new ExporterChangeableWrapper<T>((Exporter<T>) protocol.export(invokerDelegete), originInvoker);
                    bounds.put(key, exporter);
                }
            }
        }
        return exporter;
    }


    public void register(URL registryUrl, URL registedProviderUrl) {
        Registry registry = registryFactory.getRegistry(registryUrl);
        registry.register(registedProviderUrl);
    }
}


bind端口启动监听

说明:

  • DubboProtocol通过exporterMap保存导出的exporter对象。
  • DubboProtocol通过openServer()方法bind端口启动监听。
public class DubboProtocol extends AbstractProtocol {

    public <T> Exporter<T> export(Invoker<T> invoker) throws RpcException {
        URL url = invoker.getUrl();

        // export service.
        String key = serviceKey(url);
        DubboExporter<T> exporter = new DubboExporter<T>(invoker, key, exporterMap);
        exporterMap.put(key, exporter);

        // 核心在于打开socket连接
        openServer(url);
        optimizeSerialization(url);
        return exporter;
    }


    private void openServer(URL url) {
        // find server.
        String key = url.getAddress();
        //client can export a service which's only for server to invoke
        boolean isServer = url.getParameter(Constants.IS_SERVER_KEY, true);
        if (isServer) {
            ExchangeServer server = serverMap.get(key);
            if (server == null) {
                serverMap.put(key, createServer(url));
            } else {
                // server supports reset, use together with override
                server.reset(url);
            }
        }
    }


    private ExchangeServer createServer(URL url) {
        // send readonly event when server closes, it's enabled by default
        url = url.addParameterIfAbsent(Constants.CHANNEL_READONLYEVENT_SENT_KEY, Boolean.TRUE.toString());
        // enable heartbeat by default
        url = url.addParameterIfAbsent(Constants.HEARTBEAT_KEY, String.valueOf(Constants.DEFAULT_HEARTBEAT));
        String str = url.getParameter(Constants.SERVER_KEY, Constants.DEFAULT_REMOTING_SERVER);

        if (str != null && str.length() > 0 && !ExtensionLoader.getExtensionLoader(Transporter.class).hasExtension(str))
            throw new RpcException("Unsupported server type: " + str + ", url: " + url);

        url = url.addParameter(Constants.CODEC_KEY, DubboCodec.NAME);
        ExchangeServer server;
        try {
            server = Exchangers.bind(url, requestHandler);
        } catch (RemotingException e) {
            throw new RpcException("Fail to start server(url: " + url + ") " + e.getMessage(), e);
        }
        str = url.getParameter(Constants.CLIENT_KEY);
        if (str != null && str.length() > 0) {
            Set<String> supportedTypes = ExtensionLoader.getExtensionLoader(Transporter.class).getSupportedExtensions();
            if (!supportedTypes.contains(str)) {
                throw new RpcException("Unsupported client type: " + str);
            }
        }
        return server;
    }
}


注册发布的服务到zookeeper中心

说明:

  • ZookeeperRegistry的doRegister负责创建zookeeper节点保存发布URL信息。
public abstract class FailbackRegistry extends AbstractRegistry {

    public void register(URL url) {
        if (destroyed.get()){
            return;
        }
        super.register(url);
        failedRegistered.remove(url);
        failedUnregistered.remove(url);
        try {
            // 注册zookeeper节点上
            doRegister(url);
        } catch (Exception e) {
           // 省略相关代码
        }
    }

    protected abstract void doRegister(URL url);
}

public class ZookeeperRegistry extends FailbackRegistry {

    protected void doRegister(URL url) {
        try {
            zkClient.create(toUrlPath(url), url.getParameter(Constants.DYNAMIC_KEY, true));
        } catch (Throwable e) {
            throw new RpcException("Failed to register " + url + " to zookeeper " + getUrl() + ", cause: " + e.getMessage(), e);
        }
    }
}

版权声明:本文内容由阿里云实名注册用户自发贡献,版权归原作者所有,阿里云开发者社区不拥有其著作权,亦不承担相应法律责任。具体规则请查看《阿里云开发者社区用户服务协议》和《阿里云开发者社区知识产权保护指引》。如果您发现本社区中有涉嫌抄袭的内容,填写侵权投诉表单进行举报,一经查实,本社区将立刻删除涉嫌侵权内容。

相关文章
Dubbo服务消费者调用过程
上图是服务消费的主过程: 首先通过ReferenceConfig类的private void init()方法会先检查初始化所有的配置信息后,调用private T createProxy(Map map)创建代理,消费者最终得到的是服务的代理, 在createProxy接着调用Protocol接口实现的 Invoker refer(Class type, URL url)方法生成Invoker实例(如上图中的红色部分),这是服务消费的关键。
955 0
阿里云服务器备案流程(ICP备案)
阿里云服务器备案流程(ICP备案)
828 0
使用Zipkin和Brave 实现dubbo服务调用跟踪
使用Zipkin和Brave 实现dubbo服务调用跟踪 通过dubbo的Filter来实现dubbo调用链的跟踪信息,跟踪实现类为DrpcClientInterceptor和DrpcServerInterceptor,分别实现消费方与提供方的服务跟踪。
3258 0
源码分析Dubbo服务消费端启动流程
通过前面文章详解,我们知道Dubbo服务消费者标签dubbo:reference最终会在Spring容器中创建一个对应的ReferenceBean实例,而ReferenceBean实现了Spring生命周期接口:InitializingBean,接下来应该看一下其afterPropertiesSet方法的实现。
1466 0
使用OpenApi弹性释放和设置云服务器ECS释放
云服务器ECS的一个重要特性就是按需创建资源。您可以在业务高峰期按需弹性的自定义规则进行资源创建,在完成业务计算的时候释放资源。本篇将提供几个Tips帮助您更加容易和自动化的完成云服务器的释放和弹性设置。
11934 0
数据管理 DMS :SQLServer 2008的性能优化服务发布
数据库性能诊断和优化是提高数据库性能和稳定性的关键技术之一。快速的发现异常、定位根因并且进行止损,是每个用户的需求。 数据管理DMS 增强了对SQL Server 2008 R2 性能优化服务的支持,希望可以帮助用户更快速的定位问题和解决问题。
2185 0
+关注
晴天哥
专注java技术,热爱长跑和阅读开源代码
400
文章
0
问答
文章排行榜
最热
最新
相关电子书
更多
《2021云上架构与运维峰会演讲合集》
立即下载
《零基础CSS入门教程》
立即下载
《零基础HTML入门教程》
立即下载