开发者社区> 晴天哥> 正文

Dubbo ZookeeperRegistry分析

简介: 开篇  这篇文章的目的是在于梳理Dubbo ZookeeperRegistry的注册流程,通过这个流程的分析能够延伸到更多的注册中心。  核心的关注点在于Registry和RegistryFactory对象,RegistryFactory负责动态创建Registry对象,Registry对象负责执行注册中心的注册。
+关注继续查看

开篇

 这篇文章的目的是在于梳理Dubbo ZookeeperRegistry的注册流程,通过这个流程的分析能够延伸到更多的注册中心。

 核心的关注点在于Registry和RegistryFactory对象,RegistryFactory负责动态创建Registry对象,Registry对象负责执行注册中心的注册。

  ServiceConfig类的Protocol的动态代理根据类型返回Protocol对象,注册中心的Protocol对象是RegistryProtocol。


源码分析过程

public class ServiceConfig<T> extends AbstractServiceConfig {
    // protocol是Protocol$Adaptive对象
    private static final Protocol protocol = 
      ExtensionLoader.getExtensionLoader(Protocol.class)
     .getAdaptiveExtension();


    private void doExportUrlsFor1Protocol(ProtocolConfig protocolConfig, List<URL> registryURLs) {
        
        String scope = url.getParameter(SCOPE_KEY);

        if (!SCOPE_NONE.equalsIgnoreCase(scope)) {
            if (!SCOPE_LOCAL.equalsIgnoreCase(scope)) {
                if (CollectionUtils.isNotEmpty(registryURLs)) {
                    for (URL registryURL : registryURLs) {
                        Invoker<?> invoker = PROXY_FACTORY.getInvoker(ref, (Class) interfaceClass, registryURL.addParameterAndEncoded(EXPORT_KEY, url.toFullString()));
                        DelegateProviderMetaDataInvoker wrapperInvoker = new DelegateProviderMetaDataInvoker(invoker, this);
                        // 执行Protocol$Adaptive的export()方法
                        Exporter<?> exporter = protocol.export(wrapperInvoker);
                        exporters.add(exporter);
                    }
                } else {

                    Invoker<?> invoker = PROXY_FACTORY.getInvoker(ref, (Class) interfaceClass, url);
                    DelegateProviderMetaDataInvoker wrapperInvoker = new DelegateProviderMetaDataInvoker(invoker, this);
                    // 执行Protocol$Adaptive的export()方法
                    Exporter<?> exporter = protocol.export(wrapperInvoker);
                    exporters.add(exporter);
                }
            }
        }
    }
}
  • ServiceConfig的doExportUrlsFor1Protocol()方法执行export()方法。
  • protocol.export()中的protocol指的是Protocol$Adaptive对象。
  • 继续阅读Protocol$Adaptive的export()方法。


public class Protocol$Adaptive implements org.apache.dubbo.rpc.Protocol {

    public org.apache.dubbo.rpc.Exporter export(org.apache.dubbo.rpc.Invoker arg0) throws org.apache.dubbo.rpc.RpcException {
        if (arg0 == null) throw new IllegalArgumentException("org.apache.dubbo.rpc.Invoker argument == null");
        
        if (arg0.getUrl() == null) throw new IllegalArgumentException("org.apache.dubbo.rpc.Invoker argument getUrl() == null");
        //  这里的url内容是registry:xxx,如下面的
        org.apache.dubbo.common.URL url = arg0.getUrl();
        // extName指的是url的协议名,这里是取的registry
        String extName = ( url.getProtocol() == null ? "dubbo" : url.getProtocol() );
        if(extName == null) throw new IllegalStateException("Failed to get extension (org.apache.dubbo.rpc.Protocol) name from url (" + url.toString() + ") use keys([protocol])");
        // 返回RegistryProtocol对象
        org.apache.dubbo.rpc.Protocol extension = (org.apache.dubbo.rpc.Protocol)ExtensionLoader.getExtensionLoader(org.apache.dubbo.rpc.Protocol.class).getExtension(extName);
        
        return extension.export(arg0);
    }   
}




registry://127.0.0.1:2181/org.apache.dubbo.registry.RegistryService?
application=dubbo-demo-api-provider&dubbo=2.0.2
&export=dubbo://192.168.1.5:20880/org.apache.dubbo.demo.DemoService?
anyhost=true&application=dubbo-demo-api-provider&bind.ip=192.168.1.5
&bind.port=20880&deprecated=false&dubbo=2.0.2&dynamic=true&generic=false
&interface=org.apache.dubbo.demo.DemoService&methods=sayHello&pid=24212
&release=&side=provider&timestamp=1571482218716&pid=24212
&registry=zookeeper&timestamp=1571482215899
  • Protocol$Adaptive的export()方法内部根据URL获取扩展名,url的协议名是registry,返回的扩展是RegistryProtocol对象。
  • 继续阅读RegistryProtocol的export()方法。


public class RegistryProtocol implements Protocol {

    private Cluster cluster;
    private Protocol protocol;
    // registryFactory是RegistryFactory$Adaptive对象
    private RegistryFactory registryFactory;
    private ProxyFactory proxyFactory;

    public <T> Exporter<T> export(final Invoker<T> originInvoker) throws RpcException {
        URL registryUrl = getRegistryUrl(originInvoker);
        URL providerUrl = getProviderUrl(originInvoker);

        final URL overrideSubscribeUrl = getSubscribedOverrideUrl(providerUrl);
        final OverrideListener overrideSubscribeListener = new OverrideListener(overrideSubscribeUrl, originInvoker);
        overrideListeners.put(overrideSubscribeUrl, overrideSubscribeListener);

        providerUrl = overrideUrlWithConfig(providerUrl, overrideSubscribeListener);
        //export invoker
        final ExporterChangeableWrapper<T> exporter = doLocalExport(originInvoker, providerUrl);

        // url to registry
        final Registry registry = getRegistry(originInvoker);
        final URL registeredProviderUrl = getRegisteredProviderUrl(providerUrl, registryUrl);
        ProviderInvokerWrapper<T> providerInvokerWrapper = ProviderConsumerRegTable.registerProvider(originInvoker,
                registryUrl, registeredProviderUrl);
        //to judge if we need to delay publish
        boolean register = providerUrl.getParameter(REGISTER_KEY, true);
        if (register) {
            register(registryUrl, registeredProviderUrl);
            providerInvokerWrapper.setReg(true);
        }

        registry.subscribe(overrideSubscribeUrl, overrideSubscribeListener);

        exporter.setRegisterUrl(registeredProviderUrl);
        exporter.setSubscribeUrl(overrideSubscribeUrl);

        return new DestroyableExporter<>(exporter);
    }



    private Registry getRegistry(final Invoker<?> originInvoker) {
        URL registryUrl = getRegistryUrl(originInvoker);

        return registryFactory.getRegistry(registryUrl);
    }


    public void register(URL registryUrl, URL registeredProviderUrl) {
        Registry registry = registryFactory.getRegistry(registryUrl);
        registry.register(registeredProviderUrl);
    }
}



public class RegistryFactory$Adaptive implements RegistryFactory {

    public Registry getRegistry(URL uRL) {
        String string;
        if (uRL == null) {
            throw new IllegalArgumentException("url == null");
        }

        URL uRL2 = uRL;
        String string2 = string = uRL2.getProtocol() == null ? "dubbo" : uRL2.getProtocol();
        if (string == null) {
            throw new IllegalStateException(new StringBuffer().append("Failed to get extension (org.apache.dubbo.registry.RegistryFactory) name from url (").append(uRL2.toString()).append(") use keys([protocol])").toString());
        }

        RegistryFactory registryFactory = (RegistryFactory)ExtensionLoader.getExtensionLoader(RegistryFactory.class).getExtension(string);
        
        return registryFactory.getRegistry(uRL);
    }
}
providerUrl的例子:
dubbo://192.168.1.5:20880/org.apache.dubbo.demo.DemoService?
anyhost=true&application=dubbo-demo-api-provider&deprecated=false&dubbo=2.0.2
&dynamic=true&generic=false&interface=org.apache.dubbo.demo.DemoService
&methods=sayHello&pid=25662&release=&side=provider&timestamp=1571485618358

registryUrl的例子:
zookeeper://127.0.0.1:2181/org.apache.dubbo.registry.RegistryService?
application=dubbo-demo-api-provider&dubbo=2.0.2
&export=dubbo://192.168.1.5:20880/org.apache.dubbo.demo.DemoService?
anyhost=true&application=dubbo-demo-api-provider&bind.ip=192.168.1.5
&bind.port=20880&deprecated=false&dubbo=2.0.2&dynamic=true&generic=false
&interface=org.apache.dubbo.demo.DemoService&methods=sayHello&pid=31349
&release=&side=provider&timestamp=1571498589827&pid=31349&timestamp=1571498589815
  • RegistryProtocol的export()方法内部执行核心三个步骤,获取Reigstry对象、执行register()、执行subscribe()。
  • providerUrl和registryUrl的内容例子如上图。
  • RegistryProtocol的getRegistry()方法通过registryFactory.getRegistry()返回ZookeeperRegistry对象。
  • registryFactory对象是RegistryFactory$Adaptive对象。
  • RegistryFactory$Adaptive的getRegistry()的URL参数协议是zookeeper,通过扩展返回的是ZookeeperRegistryFactory对象。
  • registryFactory.getRegistry(uRL)执行ZookeeperRegistryFactory.getRegistry()返回ZookeeperRegistry对象。
  • RegistryProtocol获取Reigstry对象是ZookeeperRegistry,执行ZookeeperRegistry的register()和subscribe()方法。
  • ExtensionLoader.getExtensionLoader(RegistryFactory.class).getExtension(string)中string是"zookeeper",返回的ZookeeperRegistryFactory对象。


public class ZookeeperRegistryFactory extends AbstractRegistryFactory {

    private ZookeeperTransporter zookeeperTransporter;

    public void setZookeeperTransporter(ZookeeperTransporter zookeeperTransporter) {
        this.zookeeperTransporter = zookeeperTransporter;
    }

    @Override
    public Registry createRegistry(URL url) {
        return new ZookeeperRegistry(url, zookeeperTransporter);
    }
}


public abstract class AbstractRegistryFactory implements RegistryFactory {

    public Registry getRegistry(URL url) {
        url = URLBuilder.from(url)
                .setPath(RegistryService.class.getName())
                .addParameter(INTERFACE_KEY, RegistryService.class.getName())
                .removeParameters(EXPORT_KEY, REFER_KEY)
                .build();
        String key = url.toServiceStringWithoutResolving();
        // Lock the registry access process to ensure a single instance of the registry
        LOCK.lock();
        try {
            Registry registry = REGISTRIES.get(key);
            if (registry != null) {
                return registry;
            }
            //create registry by spi/ioc
            registry = createRegistry(url);
            if (registry == null) {
                throw new IllegalStateException("Can not create registry " + url);
            }
            REGISTRIES.put(key, registry);
            return registry;
        } finally {
            // Release the lock
            LOCK.unlock();
        }
    }
}
  • ZookeeperRegistryFactory作为AbstractRegistryFactory的子类,父类AbstractRegistryFactory的getRegistry会调用子类的createRegistry()方法返回ZookeeperRegistry对象。
  • 执行ZookeeperRegistry的register()和subscribe()方法。


public class ZookeeperRegistry extends FailbackRegistry {

    private final static String DEFAULT_ROOT = "dubbo";
    private final String root;
    private final Set<String> anyServices = new ConcurrentHashSet<>();
    private final ConcurrentMap<URL, ConcurrentMap<NotifyListener, ChildListener>> zkListeners = new ConcurrentHashMap<>();
    private final ZookeeperClient zkClient;

    public ZookeeperRegistry(URL url, ZookeeperTransporter zookeeperTransporter) {
        super(url);
        if (url.isAnyHost()) {
            throw new IllegalStateException("registry address == null");
        }
        String group = url.getParameter(GROUP_KEY, DEFAULT_ROOT);
        if (!group.startsWith(PATH_SEPARATOR)) {
            group = PATH_SEPARATOR + group;
        }
        this.root = group;
        zkClient = zookeeperTransporter.connect(url);
        zkClient.addStateListener(state -> {
            if (state == StateListener.RECONNECTED) {
                try {
                    recover();
                } catch (Exception e) {
                    logger.error(e.getMessage(), e);
                }
            }
        });
    }

    @Override
    public void doRegister(URL url) {
        try {
            zkClient.create(toUrlPath(url), url.getParameter(DYNAMIC_KEY, true));
        } catch (Throwable e) {
            throw new RpcException("Failed to register " + url + " to zookeeper " + getUrl() + ", cause: " + e.getMessage(), e);
        }
    }


    public void doSubscribe(final URL url, final NotifyListener listener) {
        try {
            if (ANY_VALUE.equals(url.getServiceInterface())) {
                String root = toRootPath();
                ConcurrentMap<NotifyListener, ChildListener> listeners = zkListeners.get(url);
                if (listeners == null) {
                    zkListeners.putIfAbsent(url, new ConcurrentHashMap<>());
                    listeners = zkListeners.get(url);
                }
                ChildListener zkListener = listeners.get(listener);
                if (zkListener == null) {
                    listeners.putIfAbsent(listener, (parentPath, currentChilds) -> {
                        for (String child : currentChilds) {
                            child = URL.decode(child);
                            if (!anyServices.contains(child)) {
                                anyServices.add(child);
                                subscribe(url.setPath(child).addParameters(INTERFACE_KEY, child,
                                        Constants.CHECK_KEY, String.valueOf(false)), listener);
                            }
                        }
                    });
                    zkListener = listeners.get(listener);
                }
                zkClient.create(root, false);
                List<String> services = zkClient.addChildListener(root, zkListener);
                if (CollectionUtils.isNotEmpty(services)) {
                    for (String service : services) {
                        service = URL.decode(service);
                        anyServices.add(service);
                        subscribe(url.setPath(service).addParameters(INTERFACE_KEY, service,
                                Constants.CHECK_KEY, String.valueOf(false)), listener);
                    }
                }
            } else {
                List<URL> urls = new ArrayList<>();
                for (String path : toCategoriesPath(url)) {
                    ConcurrentMap<NotifyListener, ChildListener> listeners = zkListeners.get(url);
                    if (listeners == null) {
                        zkListeners.putIfAbsent(url, new ConcurrentHashMap<>());
                        listeners = zkListeners.get(url);
                    }
                    ChildListener zkListener = listeners.get(listener);
                    if (zkListener == null) {
                        listeners.putIfAbsent(listener, (parentPath, currentChilds) -> ZookeeperRegistry.this.notify(url, listener, toUrlsWithEmpty(url, parentPath, currentChilds)));
                        zkListener = listeners.get(listener);
                    }
                    zkClient.create(path, false);
                    List<String> children = zkClient.addChildListener(path, zkListener);
                    if (children != null) {
                        urls.addAll(toUrlsWithEmpty(url, path, children));
                    }
                }
                notify(url, listener, urls);
            }
        } catch (Throwable e) {
            throw new RpcException("Failed to subscribe " + url + " to zookeeper " + getUrl() + ", cause: " + e.getMessage(), e);
        }
    }

}



public abstract class FailbackRegistry extends AbstractRegistry {

    public void register(URL url) {
        super.register(url);
        removeFailedRegistered(url);
        removeFailedUnregistered(url);
        try {
            doRegister(url);
        } catch (Exception e) {
        }
    }

    public void subscribe(URL url, NotifyListener listener) {
        super.subscribe(url, listener);
        removeFailedSubscribed(url, listener);
        try {
            // Sending a subscription request to the server side
            doSubscribe(url, listener);
        } catch (Exception e) {
           
        }
    }
}
  • ZookeeperRegistry是FailbackRegistry的子类,父类FailbackRegistry统一的register()和subscribe()入口,具体的实现在子类ZookeeperRegistry。
  • 子类ZookeeperRegistry的doRegister()和doSubscribe()是执行具体的注册和订阅工作。
  • ZookeeperRegistry的doRegister()核心是在zookeeper节点上创建zk临时节点。
  • ZookeeperRegistry的doSubscribe()过于复杂,在consumer端进行分析。


Registry类图

Registry

RegistryFactory

版权声明:本文内容由阿里云实名注册用户自发贡献,版权归原作者所有,阿里云开发者社区不拥有其著作权,亦不承担相应法律责任。具体规则请查看《阿里云开发者社区用户服务协议》和《阿里云开发者社区知识产权保护指引》。如果您发现本社区中有涉嫌抄袭的内容,填写侵权投诉表单进行举报,一经查实,本社区将立刻删除涉嫌侵权内容。

相关文章
strcmp函数实现及分析
最近看C,看到strcmp函数,对它的实现原型不很清楚,于是到网上搜。网上算法一大堆,看了很多代码后自己做了一下总结  strcmp函数是C/C++中基本的函数,它对两个字符串进行比较,然后返回比较结果,函数形式如下:int strcmp(const char* str1, const char* str2); 其中str1和str2可以是字符串常量或者字符串变量,返回值为整形。
1603 0
微服务容错组件Hystrix设计分析
在分布式微服务场景下,由于各个业务服务的纵向拆分,加上通常会使用集群技术来保障业务服务的可靠性,由此导致了应用服务节点的爆炸式增长,服务节点的增多会导致出故障的概率也随之增加。如之前文章所阐述的,某个应用节点的不可用可能导致最终整个平台正常运行受影响,因此我们需要一些手段去应对这种异常情况。Hystrix正是一种专门针对微服务容错处理的基础组件,本文主要针对容错组件Hystrix进行设计分析,希望对大家有所裨益。
22 0
【集合框架】JDK1.8源码分析之TreeMap(五)
  当我们需要把插入的元素进行排序的时候,就是时候考虑TreeMap了,从名字上来看,TreeMap肯定是和树是脱不了干系的,它是一个排序了的Map,下面我们来着重分析其源码,理解其底层如何实现排序功能。下面,开始分析。
8 0
Spring、Struts2优点分析以及Spring MVC、Struts2优点比较
  Spring 及其优点   大部分项目都少不了Spring的身影,为什么大家对他如此青睐,而且对他的追捧丝毫没有减退之势呢Spring是什么:Spring是一个轻量级的DI和AOP容器框架。说它轻量级有一大部分原因是相对与EJB的(虽然本人从没有接触过EJB的应用),重要的是,Spring是非侵入式的,基于spring开发的应用一般不依赖于spring的类。DI:称作依赖注入(Dependency Injection),和控制反转一个概念,具体的讲,当一个角色需要另外一个角色协助的时候,在传统的程序设计中,通常有调用者来创建被调用者的实例。但是在spring中创建被调用者将不再有调用者完
41 0
+关注
晴天哥
专注java技术,热爱长跑和阅读开源代码 邮箱 lebron374@163.com
403
文章
0
问答
来源圈子
更多
+ 订阅
文章排行榜
最热
最新
相关电子书
更多
OceanBase 入门到实战教程
立即下载
阿里云图数据库GDB,加速开启“图智”未来.ppt
立即下载
实时数仓Hologres技术实战一本通2.0版(下)
立即下载