Dubbo Consumer 服务订阅过程

本文涉及的产品
注册配置 MSE Nacos/ZooKeeper,118元/月
服务治理 MSE Sentinel/OpenSergo,Agent数量 不受限
云原生网关 MSE Higress,422元/月
简介: 开篇 整个Dubbo Consumer的引用过程比较复杂,这部分的文章会比较多,这篇文章的目的是描述Consumer的订阅过程,侧重于Consumer发现Provider的URL并生成对应的invoker的过程。

开篇

 整个Dubbo Consumer的引用过程比较复杂,这部分的文章会比较多,这篇文章的目的是描述Consumer的订阅过程,侧重于Consumer发现Provider的URL并生成对应的invoker的过程。

 在这篇文章中,主要分为两个部分讲解,第一部分RegistryProtocol的refer过程侧重于描述RegistryProtocol的流程,ZookeeperRegistry的subscribe过程侧重于服务订阅过程。

RegistryProtocol的refer过程

  • RegistryProtocol调用refer() => doRefer()方法。
  • RegistryProtocol的registry是ZookeeperRegistry对象。
public class RegistryProtocol implements Protocol {

    public <T> Invoker<T> refer(Class<T> type, URL url) throws RpcException {
        // registry://127.0.0.1:2181/org.apache.dubbo.registry.RegistryService?
        // application=dubbo-demo-api-consumer&dubbo=2.0.2&pid=58968
        // &refer=application=dubbo-demo-api-consumer&dubbo=2.0.2 
        // &interface=org.apache.dubbo.demo.DemoService
        // &lazy=false&methods=sayHello&pid=58968
        // &register.ip=172.17.32.176&side=consumer&sticky=false
        // &timestamp=1571824631224&registry=zookeeper&timestamp=1571824632206
        url = URLBuilder.from(url)
                .setProtocol(url.getParameter(REGISTRY_KEY, DEFAULT_REGISTRY))
                .removeParameter(REGISTRY_KEY)
                .build();
        // registry为ZookeeperRegistry对象
        Registry registry = registryFactory.getRegistry(url);
        if (RegistryService.class.equals(type)) {
            return proxyFactory.getInvoker((T) registry, type, url);
        }

        // group="a,b" or group="*"
        Map<String, String> qs = StringUtils.parseQueryString(url.getParameterAndDecoded(REFER_KEY));
        String group = qs.get(GROUP_KEY);
        if (group != null && group.length() > 0) {
            if ((COMMA_SPLIT_PATTERN.split(group)).length > 1 || "*".equals(group)) {
                return doRefer(getMergeableCluster(), registry, type, url);
            }
        }

        // 执行doRefer动作
        return doRefer(cluster, registry, type, url);
    }


  • RegistryProtocol的doRefer()执行一系列核心步骤,在代码中都备有注释。
  • 创建RegistryDirectory对象。
  • 生成服务消费者链接并在consumers目录下新增节点注册服务消费者。
  • 创建路由规则链。
  • 订阅 providers、configurators、routers 等节点数据。
  • 将多个服务提供者合并为一个invoker。
  • 重点关注订阅 providers、configurators、routers 等节点数据的流程
public class RegistryProtocol implements Protocol {

    private <T> Invoker<T> doRefer(Cluster cluster, Registry registry, Class<T> type, URL url) {
        // 创建RegistryDirectory实例
        RegistryDirectory<T> directory = new RegistryDirectory<T>(type, url);
        // 设置注册中心
        directory.setRegistry(registry);
        // 设置协议
        directory.setProtocol(protocol);

        // 所有属性放到map中
        Map<String, String> parameters = new HashMap<String, String>(directory.getUrl().getParameters());
        
        // 生成服务消费者链接
        URL subscribeUrl = new URL(CONSUMER_PROTOCOL, parameters.remove(REGISTER_IP_KEY), 0, type.getName(), parameters);

        // 注册服务消费者,在 consumers 目录下新节点
        if (!ANY_VALUE.equals(url.getServiceInterface()) && url.getParameter(REGISTER_KEY, true)) {
            directory.setRegisteredConsumerUrl(getRegisteredConsumerUrl(subscribeUrl, url));
            registry.register(directory.getRegisteredConsumerUrl());
        }

        // 创建路由规则链
        directory.buildRouterChain(subscribeUrl);

        //  订阅 providers、configurators、routers 等节点数据
        directory.subscribe(subscribeUrl.addParameter(CATEGORY_KEY,
                PROVIDERS_CATEGORY + "," + CONFIGURATORS_CATEGORY + "," + ROUTERS_CATEGORY));

        // 一个注册中心可能有多个服务提供者,因此这里需要将多个服务提供者合并为一个,生成一个invoker
        Invoker invoker = cluster.join(directory);
        // 在服务提供者处注册消费者
        ProviderConsumerRegTable.registerConsumer(invoker, url, subscribeUrl, directory);
        return invoker;
    }
}


  • RegistryDirectory中的registry是ZookeeperRegistry对象。
  • ZookeeperRegistry中subscribe(url, this)方法中将RegistryDirectory对象作为NotifyListener参数。
  • 执行ZookeeperRegistry对象的subscribe()方法,至此进入了服务订阅过程
public class RegistryDirectory<T> extends AbstractDirectory<T> implements NotifyListener {

    public void subscribe(URL url) {
        setConsumerUrl(url);
        CONSUMER_CONFIGURATION_LISTENER.addNotifyListener(this);
        serviceConfigurationListener = new ReferenceConfigurationListener(this, url);
        // registry为ZookeeperRegistry对象。
        registry.subscribe(url, this);
    }
}


ZookeeperRegistry的subscribe过程

  • ZookeeperRegistry的整个subscribe过程涉及到ZookeeperRegistry、FailbackRegistry 、AbstractRegistry这几个类,整个订阅过程如下图所示。
  • 从图中可以发现父类FailbackRegistry作为执行subscribe()的入口,真正执行的是ZookeeperRegistry的doSubscribe()方法。

ZookeeperRegistry订阅时序图


  • ZookeeperRegistry、FailbackRegistry 、AbstractRegistry三者的类关系如下图,涉及到父类子类具体实现方法的调用。

ZookeeperRegistry


  • 从ZookeeperRegistry订阅时序图可以看出来订阅首先执行FailbackRegistry的subscribe()方法。
  • FailbackRegistry的subscribe()方法调用子类的ZookeeperRegistry的doSubscribe()方法。
public abstract class FailbackRegistry extends AbstractRegistry {

    public void subscribe(URL url, NotifyListener listener) {
        super.subscribe(url, listener);
        removeFailedSubscribed(url, listener);
        try {
            // 执行订阅操作
            doSubscribe(url, listener);
        } catch (Exception e) {
        }
    }
}


  • ZookeeperRegistry的doSubscribe()主要执行订阅操作,核心和zookeeper的特性比较相关。
  • 获取待订阅的path信息,包括providers,configurators,routers三类路径。
  • 针对每个待订阅的path,会针对path增加children维度的监听。
  • 针对每个待订阅的path,会一次性获取children的变量并进入notify()流程。
  • zkListeners先根据consumer的URL维度 和 listener维度唯一确定Zookeeper子节点监听器ChildListener对象。
  • ZookeeperRegistry的path的子节点监听器ChildListener回调中执行的ZookeeperRegistry.this.notify()方法。
  • 针对处理path下的children的URL路径会通过toUrlsWithEmpty()方法进行匹配,获取匹配的URL进行处理。
  • 首次获取及后续回调都是执行ZookeeperRegistry.this.notify()方法。
  • notify()的urls是指订阅path下所有符合要求的urls,通过toUrlsWithEmpty()进行匹配。
public class ZookeeperRegistry extends FailbackRegistry {

    // 变量url的值
    // consumer://172.17.32.176/org.apache.dubbo.demo.DemoService?
    // application=dubbo-demo-api-consumer&category=providers,configurators,routers&dubbo=2.0.2
    // &interface=org.apache.dubbo.demo.DemoService&lazy=false&methods=sayHello
    // &pid=58968&side=consumer&sticky=false&timestamp=1571824631224
    public void doSubscribe(final URL url, final NotifyListener listener) {
        try {
            if (ANY_VALUE.equals(url.getServiceInterface())) {
                // 暂时不关注这部分逻辑
            } else {
                List<URL> urls = new ArrayList<>();
                // 处理providers、configurators、routers等路径
                // /dubbo/org.apache.dubbo.demo.DemoService/providers
                // /dubbo/org.apache.dubbo.demo.DemoService/configurators
                // /dubbo/org.apache.dubbo.demo.DemoService/routers
                for (String path : toCategoriesPath(url)) {
                    ConcurrentMap<NotifyListener, ChildListener> listeners = zkListeners.get(url);
                    if (listeners == null) {
                        zkListeners.putIfAbsent(url, new ConcurrentHashMap<>());
                        listeners = zkListeners.get(url);
                    }
                    ChildListener zkListener = listeners.get(listener);
                    if (zkListener == null) {
                        // 创建zk节点变化回调监听器
                        listeners.putIfAbsent(listener, 
                        (parentPath, currentChilds) -> ZookeeperRegistry.this.notify(
                                url, listener, toUrlsWithEmpty(url, parentPath, currentChilds)));
                        zkListener = listeners.get(listener);
                    }
                    // 创建path对应的节点
                    zkClient.create(path, false);
                    // 添加path下的children的监听
                    List<String> children = zkClient.addChildListener(path, zkListener);
                    // 处理path下的children
                    if (children != null) {
                        urls.addAll(toUrlsWithEmpty(url, path, children));
                    }
                }

                // 通知回调notify动作
                notify(url, listener, urls);
            }
        } catch (Throwable e) {
        }
    }
}


  • ZookeeperRegistry.notify()方法会调用父类FailbackRegistry.notify()方法进而执行doNotify()方法。
  • FailbackRegistry.doNotify()方法会调用父类AbstractRegistry.notify()。
public abstract class FailbackRegistry extends AbstractRegistry {
    protected void notify(URL url, NotifyListener listener, List<URL> urls) {

        try {
            doNotify(url, listener, urls);
        } catch (Exception t) {
            addFailedNotified(url, listener, urls);
        }
    }


    protected void doNotify(URL url, NotifyListener listener, List<URL> urls) {
        super.notify(url, listener, urls);
    }
}


  • AbstractRegistry的notify()的urls是指订阅path下所有符合要求的urls。
  • 将所有的urls按照url的category包括(providers,configurators,routers)三类。
  • 将所有的urls按照分组进行回调处理。
  • 回调的listener为RegistryDirectory对象,实现了NotifyListener接口。
public abstract class AbstractRegistry implements Registry {

    protected void notify(URL url, NotifyListener listener, List<URL> urls) {
        Map<String, List<URL>> result = new HashMap<>();
        for (URL u : urls) {
            // 符合要求的URL按照category作为分组key的Map。
            if (UrlUtils.isMatch(url, u)) {
                String category = u.getParameter(CATEGORY_KEY, DEFAULT_CATEGORY);
                List<URL> categoryList = result.computeIfAbsent(category, k -> new ArrayList<>());
                categoryList.add(u);
            }
        }
        if (result.size() == 0) {
            return;
        }
        Map<String, List<URL>> categoryNotified = notified.computeIfAbsent(url, u -> new ConcurrentHashMap<>());
        for (Map.Entry<String, List<URL>> entry : result.entrySet()) {
            String category = entry.getKey();
            // 同一类category的URL进行回调,譬如providers的URL一并进行回调。
            List<URL> categoryList = entry.getValue();
            categoryNotified.put(category, categoryList);
            // 调用监听回调
            listener.notify(categoryList);
            // 保存URL信息
            saveProperties(url);
        }
    }
}


RegistryDirectory的notify过程

  • 按照configurators、routers、providers组装成List对象。
  • 针对configurators执行toConfigurators()动作。
  • 针对routers执行toRouters()动作。
  • 针对providers执行refreshOverrideAndInvoker()动作。
  • 每个细节后面分篇幅继续讲解。
public class RegistryDirectory<T> extends AbstractDirectory<T> implements NotifyListener {

    public synchronized void notify(List<URL> urls) {
        Map<String, List<URL>> categoryUrls = urls.stream()
                .filter(Objects::nonNull)
                .filter(this::isValidCategory)
                .filter(this::isNotCompatibleFor26x)
                .collect(Collectors.groupingBy(url -> {
                    if (UrlUtils.isConfigurator(url)) {
                        return CONFIGURATORS_CATEGORY;
                    } else if (UrlUtils.isRoute(url)) {
                        return ROUTERS_CATEGORY;
                    } else if (UrlUtils.isProvider(url)) {
                        return PROVIDERS_CATEGORY;
                    }
                    return "";
                }));

        List<URL> configuratorURLs = categoryUrls.getOrDefault(CONFIGURATORS_CATEGORY, Collections.emptyList());
        this.configurators = Configurator.toConfigurators(configuratorURLs).orElse(this.configurators);

        List<URL> routerURLs = categoryUrls.getOrDefault(ROUTERS_CATEGORY, Collections.emptyList());
        toRouters(routerURLs).ifPresent(this::addRouters);

        // providers
        List<URL> providerURLs = categoryUrls.getOrDefault(PROVIDERS_CATEGORY, Collections.emptyList());
        refreshOverrideAndInvoker(providerURLs);
    }
}
相关实践学习
基于MSE实现微服务的全链路灰度
通过本场景的实验操作,您将了解并实现在线业务的微服务全链路灰度能力。
目录
相关文章
|
2月前
|
JSON Dubbo Java
【Dubbo协议指南】揭秘高性能服务通信,选择最佳协议的终极攻略!
【8月更文挑战第24天】在分布式服务架构中,Apache Dubbo作为一款高性能的Java RPC框架,支持多种通信协议,包括Dubbo协议、HTTP协议及Hessian协议等。Dubbo协议是默认选择,采用NIO异步通讯,适用于高要求的内部服务通信。HTTP协议通用性强,利于跨语言调用;Hessian协议则在数据传输效率上有优势。选择合适协议需综合考虑性能需求、序列化方式、网络环境及安全性等因素。通过合理配置,可实现服务性能最优化及系统可靠性提升。
45 3
|
2月前
|
缓存 Dubbo Java
Dubbo服务消费者启动与订阅原理
该文章主要介绍了Dubbo服务消费者启动与订阅的原理,包括服务消费者的启动时机、启动过程以及订阅和感知最新提供者信息的方式。
Dubbo服务消费者启动与订阅原理
|
2月前
|
Dubbo 网络协议 Java
深入掌握Dubbo服务提供者发布与注册原理
该文章主要介绍了Dubbo服务提供者发布与注册的原理,包括服务发布的流程、多协议发布、构建Invoker、注册到注册中心等过程。
深入掌握Dubbo服务提供者发布与注册原理
|
2月前
|
负载均衡 Dubbo Java
Dubbo服务Spi机制和原理
该文章主要介绍了Dubbo中的SPI(Service Provider Interface)机制和原理,包括SPI的基本概念、Dubbo中的SPI分类以及SPI机制的实现细节。
Dubbo服务Spi机制和原理
|
2月前
|
C# 开发者 Windows
勇敢迈出第一步:手把手教你如何在WPF开源项目中贡献你的第一行代码,从选择项目到提交PR的全过程解析与实战技巧分享
【8月更文挑战第31天】本文指导您如何在Windows Presentation Foundation(WPF)相关的开源项目中贡献代码。无论您是初学者还是有经验的开发者,参与这类项目都能加深对WPF框架的理解并拓展职业履历。文章推荐了一些适合入门的项目如MvvmLight和MahApps.Metro,并详细介绍了从选择项目、设置开发环境到提交代码的全过程。通过具体示例,如添加按钮点击事件处理程序,帮助您迈出第一步。此外,还强调了提交Pull Request时保持专业沟通的重要性。参与开源不仅能提升技能,还能促进社区交流。
32 0
|
2月前
|
缓存 负载均衡 Dubbo
Dubbo服务集群容错原理(重要)
该文章主要介绍了Dubbo服务集群容错的原理,包括集群容错技术的概念、Dubbo中使用的集群容错技术种类及其原理。
|
2月前
|
负载均衡 Dubbo 算法
Dubbo服务负载均衡原理
该文章主要介绍了Dubbo服务负载均衡的原理,包括Dubbo中负载均衡的实现位置、为什么需要负载均衡机制、Dubbo支持的负载均衡算法以及随机负载均衡策略的源码分析。
|
5月前
|
Dubbo Java 应用服务中间件
微服务学习 | Springboot整合Dubbo+Nacos实现RPC调用
微服务学习 | Springboot整合Dubbo+Nacos实现RPC调用
|
2月前
|
Dubbo Java 应用服务中间件
💥Spring Cloud Dubbo火爆来袭!微服务通信的终极利器,你知道它有多强大吗?🔥
【8月更文挑战第29天】随着信息技术的发展,微服务架构成为企业应用开发的主流模式,而高效的微服务通信至关重要。Spring Cloud Dubbo通过整合Dubbo与Spring Cloud的优势,提供高性能RPC通信及丰富的生态支持,包括服务注册与发现、负载均衡和容错机制等,简化了服务调用管理并支持多种通信协议,提升了系统的可伸缩性和稳定性,成为微服务通信领域的优选方案。开发者仅需关注业务逻辑,而无需过多关心底层通信细节,使得Spring Cloud Dubbo在未来微服务开发中将更加受到青睐。
69 0
|
11天前
|
Dubbo 应用服务中间件 Apache
Star 4w+,Apache Dubbo 3.3 全新发布,Triple X 领衔,开启微服务通信新时代
在 Apache Dubbo 突破 4w Star 之际,Apache Dubbo 团队正式宣布,Dubbo 3.3 正式发布!作为全球领先的开源微服务框架,Dubbo 一直致力于为开发者提供高性能、可扩展且灵活的分布式服务解决方案。此次发布的 Dubbo 3.3,通过 Triple X 的全新升级,突破了以往局限,实现了对南北向与东西向流量的全面支持,并提升了对云原生架构的友好性。
下一篇
无影云桌面