Dubbo Diretory

简介: Cluster 文章中我们提及到会从 Directory 中获取 invoker 列表

网络异常,图片无法展示
|

网络异常,图片无法展示
|


Cluster 文章中我们提及到会从 Directory 中获取 invoker 列表

网络异常,图片无法展示
|


AbstractDirectory 没有封装什么特别的逻辑

protected RouterChain<T> routerChain;
复制代码


内部持有一个路由链

@Override
public List<Invoker<T>> list(Invocation invocation) throws RpcException {
    if (destroyed) {
        throw new RpcException("Directory already destroyed .url: " + getUrl());
    }
    return doList(invocation);
}
protected abstract List<Invoker<T>> doList(Invocation invocation) throws RpcException;
复制代码


实现接口的 list 接口、抽象方法 doList 给子类实现

先看简单的 StaticDirectory

在多注册中心的时候、我们会创建 StaticDirectory 保存

if (registryURL != null) { // registry url is available
    // for multi-subscription scenario, use 'zone-aware' policy by default
    URL u = registryURL.addParameterIfAbsent(CLUSTER_KEY, ZoneAwareCluster.NAME);
    // The invoker wrap relation would be like: ZoneAwareClusterInvoker(StaticDirectory) -> FailoverClusterInvoker(RegistryDirectory, routing happens here) -> Invoker
    invoker = CLUSTER.join(new StaticDirectory(u, invokers));
} else { // not a registry url, must be direct invoke.
    invoker = CLUSTER.join(new StaticDirectory(invokers));
}
复制代码


这里要返回的是接口的类型、所以直接获取 invoker 列表的第一个、返回对应的接口就行了

public Class<T> getInterface() {
    return invokers.get(0).getInterface();
}
复制代码


这里就是返回对应的 invoker 列表、如果路由链不为 null、那么就根据路由配置筛选之后再返回 invoker 列表

@Override
protected List<Invoker<T>> doList(Invocation invocation) throws RpcException {
    List<Invoker<T>> finalInvokers = invokers;
    if (routerChain != null) {
        try {
            finalInvokers = routerChain.route(getConsumerUrl(), invocation);
        } catch (Throwable t) {
            logger.error("Failed to execute router: " + getUrl() + ", cause: " + t.getMessage(), t);
        }
    }
    return finalInvokers == null ? Collections.emptyList() : finalInvokers;
}
复制代码


RegistryDirectory 不单继承了 AbstractDirectory 还实现了接口 NotifyListener。因为它是动态更新服务提供者列表以及动态配置的、所以这里就涉及到订阅和监听

class RegistryDirectory<T> extends AbstractDirectory<T> implements NotifyListener {
复制代码

网络异常,图片无法展示
|


订阅和取消订阅

public void subscribe(URL url) {
    setConsumerUrl(url);
    CONSUMER_CONFIGURATION_LISTENER.addNotifyListener(this);
    serviceConfigurationListener = new ReferenceConfigurationListener(this, url);
    registry.subscribe(url, this);
}
public void unSubscribe(URL url) {
    setConsumerUrl(null);
    CONSUMER_CONFIGURATION_LISTENER.removeNotifyListener(this);
    serviceConfigurationListener.stop();
    registry.unsubscribe(url, this);
}
复制代码


RegistryDiretory 自己也作为一个监听器传递过去

public synchronized void notify(List<URL> urls) {
    Map<String, List<URL>> categoryUrls = urls.stream()
            .filter(Objects::nonNull)
            .filter(this::isValidCategory)
            .filter(this::isNotCompatibleFor26x)
            .collect(Collectors.groupingBy(this::judgeCategory));
    List<URL> configuratorURLs = categoryUrls.getOrDefault(CONFIGURATORS_CATEGORY, Collections.emptyList());
    this.configurators = Configurator.toConfigurators(configuratorURLs).orElse(this.configurators);
    List<URL> routerURLs = categoryUrls.getOrDefault(ROUTERS_CATEGORY, Collections.emptyList());
    toRouters(routerURLs).ifPresent(this::addRouters);
    // providers
    List<URL> providerURLs = categoryUrls.getOrDefault(PROVIDERS_CATEGORY, Collections.emptyList());
    /**
     * 3.x added for extend URL address
     */
    ExtensionLoader<AddressListener> addressListenerExtensionLoader = ExtensionLoader.getExtensionLoader(AddressListener.class);
    List<AddressListener> supportedListeners = addressListenerExtensionLoader.getActivateExtension(getUrl(), (String[]) null);
    if (supportedListeners != null && !supportedListeners.isEmpty()) {
        for (AddressListener addressListener : supportedListeners) {
            providerURLs = addressListener.notify(providerURLs, getConsumerUrl(),this);
        }
    }
    refreshOverrideAndInvoker(providerURLs);
}
复制代码
private void refreshOverrideAndInvoker(List<URL> urls) {
    // mock zookeeper://xxx?mock=return null
    overrideDirectoryUrl();
    refreshInvoker(urls);
}
复制代码


private void refreshInvoker(List<URL> invokerUrls) {
    Assert.notNull(invokerUrls, "invokerUrls should not be null");
    if (invokerUrls.size() == 1
            && invokerUrls.get(0) != null
            && EMPTY_PROTOCOL.equals(invokerUrls.get(0).getProtocol())) {
        this.forbidden = true; // Forbid to access
        this.invokers = Collections.emptyList();
        routerChain.setInvokers(this.invokers);
        destroyAllInvokers(); // Close all invokers
    } else {
        this.forbidden = false; // Allow to access
        Map<String, Invoker<T>> oldUrlInvokerMap = this.urlInvokerMap; // local reference
        if (invokerUrls == Collections.<URL>emptyList()) {
            invokerUrls = new ArrayList<>();
        }
        if (invokerUrls.isEmpty() && this.cachedInvokerUrls != null) {
            invokerUrls.addAll(this.cachedInvokerUrls);
        } else {
            this.cachedInvokerUrls = new HashSet<>();
            this.cachedInvokerUrls.addAll(invokerUrls);//Cached invoker urls, convenient for comparison
        }
        if (invokerUrls.isEmpty()) {
            return;
        }
        Map<String, Invoker<T>> newUrlInvokerMap = toInvokers(invokerUrls);// Translate url list to Invoker map
        /**
         * If the calculation is wrong, it is not processed.
         *
         * 1. The protocol configured by the client is inconsistent with the protocol of the server.
         *    eg: consumer protocol = dubbo, provider only has other protocol services(rest).
         * 2. The registration center is not robust and pushes illegal specification data.
         *
         */
        if (CollectionUtils.isEmptyMap(newUrlInvokerMap)) {
            logger.error(new IllegalStateException("urls to invokers error .invokerUrls.size :" + invokerUrls.size() + ", invoker.size :0. urls :" + invokerUrls
                    .toString()));
            return;
        }
        List<Invoker<T>> newInvokers = Collections.unmodifiableList(new ArrayList<>(newUrlInvokerMap.values()));
        // pre-route and build cache, notice that route cache should build on original Invoker list.
        // toMergeMethodInvokerMap() will wrap some invokers having different groups, those wrapped invokers not should be routed.
        routerChain.setInvokers(newInvokers);
        this.invokers = multiGroup ? toMergeInvokerList(newInvokers) : newInvokers;
        this.urlInvokerMap = newUrlInvokerMap;
        try {
            destroyUnusedInvokers(oldUrlInvokerMap, newUrlInvokerMap); // Close the unused Invoker
        } catch (Exception e) {
            logger.warn("destroyUnusedInvokers error. ", e);
        }
    }
}
复制代码


这个在消费者启动创建服务代理的时候曾经说过一下


对于 router 的擦不是参数、将其包装成路由规则、然后更新本地路由信息、会忽略 empty 开头的 URL


对于 configurator 类的参数、解释并根线本地的 Configurator 配置


对于 invoker 参数、如果是 empty 协议的 url、则会禁用该服务、并销毁本地缓存的 Invoker。将新的 URL 和本地的进行合并、找出差异的老 Invoker 并将其销毁。

目录
相关文章
|
11月前
|
Java 测试技术 Nacos
SpringCloud 整合Nacos config
SpringCloud 整合Nacos config
112 0
|
开发框架 分布式计算 负载均衡
SpringBoot整合Dubbo+Zookeeper
SpringBoot整合Dubbo+Zookeeper
721 0
SpringBoot整合Dubbo+Zookeeper
ZooKeeper启动dubbo-admin闪退问题解决
ZooKeeper启动dubbo-admin闪退问题解决
83 0
|
Dubbo 前端开发 Java
基于SpringBoot2.0.x+Dubbo2.6.x+zookeeper3.4.1x创建RPC服务
下图是dubbo官网给的图。在系统越来越庞大的情况下,分布式就显得尤为重要了,应用之间交互是不可避免的,将核心业务抽取出来,作为独立的服务,然后逐渐形成稳定的服务中心,使前端应用能更快速的响应多变的市场需求。此时,用于提高业务复用及整合的分布式服务框架(RPC)是关键。
117 0
|
Java 测试技术 Nacos
Springcloud之Nacos Config服务配置
Springcloud之Nacos Config服务配置
|
Dubbo Java 应用服务中间件
JMeter Dubbo请求插件jmeter-plugin-dubbo.jar
JMeter Dubbo请求插件jmeter-plugin-dubbo.jar
227 0
|
存储 Java Apache
SpringCloud Config 分布式配置
SpringCloud Config 分布式配置
91 1
SpringCloud Config 分布式配置
|
监控 前端开发 JavaScript
dubbo-admin_2.6.x 安装使用
dubbo-admin_2.6.x 安装使用
136 0
dubbo-admin_2.6.x 安装使用
|
JavaScript 前端开发 Dubbo
dubbo-admin安装
dubbo-admin 是一个前后端分离的项目。前端使用vue,后端使用springboot,安装 dubbo-admin 其实就是部署该项目。
|
缓存 负载均衡 监控
Dubbo 常用配置
Dubbo 常用配置
186 0