从源码全面解析 dubbo 服务订阅的来龙去脉

本文涉及的产品
云原生网关 MSE Higress,422元/月
注册配置 MSE Nacos/ZooKeeper,118元/月
服务治理 MSE Sentinel/OpenSergo,Agent数量 不受限
简介: 从源码全面解析 dubbo 服务订阅的来龙去脉

一、引言

对于 Java 开发者而言,关于 dubbo ,我们一般当做黑盒来进行使用,不需要去打开这个黑盒。

但随着目前程序员行业的发展,我们有必要打开这个黑盒,去探索其中的奥妙。

本期 dubbo 源码解析系列文章,将带你领略 dubbo 源码的奥秘

本期源码文章吸收了之前 SpringKakfaJUC源码文章的教训,将不再一行一行的带大家分析源码,我们将一些不重要的部分当做黑盒处理,以便我们更快、更有效的阅读源码。

虽然现在是互联网寒冬,但乾坤未定,你我皆是黑马!

废话不多说,发车!

二、消费者订阅服务

读过我们上一篇:从源码全面解析 dubbo 注解配置的来龙去脉 的文章的朋友,我们当时留了一个 EnableDubboConfig 注解里面的 ReferenceAnnotationBeanPostProcessor 方法

1、消费端配置

@DubboReference(protocol = "dubbo", timeout = 100)
private IUserService iUserService;

从这个配置我们可以得出一个信息,Spring 不会自动将 IUserService 注入 Bean 工厂中

当然这句话也是一个废话,人家 Dubbo 自定义的注解,Spring 怎么可能扫描到…

ReferenceAnnotationBeanPostProcessor 这个方法是消费端扫描 @Reference 使用的

本篇将正式的介绍下消费端是如何订阅我们服务端注册在 Zookeeper 上的服务的

2、扫描注解

我们先看这个类的实现:

public class ReferenceAnnotationBeanPostProcessor extends AbstractAnnotationBeanPostProcessor
    implements ApplicationContextAware, BeanFactoryPostProcessor {}

实现了 BeanFactoryPostProcessor 接口,这个时候如果看过博主的 Spring 的源码系列文章,DNA 应该已经开始活动了

没错,基本上这个接口就是为了往我们的 BeanDefinitionMap 里面注册 BeanDefinition 信息的

想必到这里,就算我们不看源码,也能猜到

这个哥们绝对是将 @DubboReference 的注解扫描封装成 BeanDefinition 注册至 BeanDefinitionMap

我们直接看源码

@Override
public void postProcessBeanFactory(ConfigurableListableBeanFactory beanFactory){
    // 拿到当前Spring工厂所有的bean名称
    String[] beanNames = beanFactory.getBeanDefinitionNames();
    for (String beanName : beanNames) {
        // 获取bean的类型
        beanType = beanFactory.getType(beanName);
        // 省略一些代码
        if (beanType != null) {
            // 获取元数据信息
            AnnotatedInjectionMetadata metadata = findInjectionMetadata(beanName, beanType, null);
            // 解析@DubboReference注解并注册至BeanDefinitionMap中
            prepareInjection(metadata);
        }
    }
}

我们详细看看这个 prepareInjection 方法是如何解析的 @DubboReference 注解并做了一些什么特殊的操作

protected void prepareInjection(AnnotatedInjectionMetadata metadata) throws BeansException {
    for (AnnotatedFieldElement fieldElement : metadata.getFieldElements()) {
        Class<?> injectedType = fieldElement.field.getType();
        // 配置的参数
        AnnotationAttributes attributes = fieldElement.attributes;
        // 解析&注册
        String referenceBeanName = registerReferenceBean(fieldElement.getPropertyName(), injectedType, attributes, fieldElement.field);
        }
    }
}

这个 registerReferenceBean 里面的逻辑较多,我们只取最关键的,感兴趣的朋友也可以自己去看一看

public String registerReferenceBean(String propertyName, Class<?> injectedType, Map<String, Object> attributes, Member member){
      RootBeanDefinition beanDefinition = new RootBeanDefinition();
      // ReferenceBean.class.getName() = org.apache.dubbo.config.spring.ReferenceBean
        beanDefinition.setBeanClassName(ReferenceBean.class.getName());
        beanDefinition.getPropertyValues().add(ReferenceAttributes.ID, referenceBeanName);
      // referenceProps = 配置的信息
      // interfaceName = com.common.service.IUserService
      // interfaceClass = interface com.common.service.IUserService
        beanDefinition.setAttribute(Constants.REFERENCE_PROPS, attributes);
        beanDefinition.setAttribute(ReferenceAttributes.INTERFACE_CLASS, interfaceClass);
        beanDefinition.setAttribute(ReferenceAttributes.INTERFACE_NAME, interfaceName);
        GenericBeanDefinition targetDefinition = new GenericBeanDefinition();
        targetDefinition.setBeanClass(interfaceClass);
        beanDefinition.setDecoratedDefinition(new BeanDefinitionHolder(targetDefinition, referenceBeanName + "_decorated"));
        beanDefinition.setAttribute(Constants.OBJECT_TYPE_ATTRIBUTE, interfaceClass);
      // 注册至BeanDefinitionMap中
        beanDefinitionRegistry.registerBeanDefinition(referenceBeanName, beanDefinition);
        referenceBeanManager.registerReferenceKeyAndBeanName(referenceKey, referenceBeanName);
        return referenceBeanName;
}

到这里,我们的 ReferenceAnnotationBeanPostProcessor 方法将 @DubboReference 扫描组装成 BeanDefinition 注册到了 BeanDefinitionMap

3、创建代理对象

在我们上面注册的时候,有这么一行代码:

beanDefinition.setBeanClassName(ReferenceBean.class.getName());

表明我们当前注册的 BeanClass 类型为 org.apache.dubbo.config.spring.ReferenceBean

当我们的 Spring 去实例化 BeanDefinitionMap 中的对象时,这个时候会调用 ReferenceBeangetObject 方法

Spring 在实例化时会获取每一个 BeanDefinition 的 Object,不存在则创建

我们发现,在 ReferenceBean 里面实际上是重写了 getObject 的方法:

public T getObject() {
    if (lazyProxy == null) {
        createLazyProxy();
    }
    return (T) lazyProxy;
}
private void createLazyProxy() {
    // 创建代理对象
    ProxyFactory proxyFactory = new ProxyFactory();
    proxyFactory.setTargetSource(new DubboReferenceLazyInitTargetSource());
    proxyFactory.addInterface(interfaceClass);
    Class<?>[] internalInterfaces = AbstractProxyFactory.getInternalInterfaces();
    for (Class<?> anInterface : internalInterfaces) {
        proxyFactory.addInterface(anInterface);
    }
    // 进行动态代理(生成动态代理的对象)
    // 这里动态代理用的是JdkDynamicAopProxy
    this.lazyProxy = proxyFactory.getProxy(this.beanClassLoader);
}

我们看下 JdkDynamicAopProxy 里面做了什么?

final class JdkDynamicAopProxy implements AopProxy, InvocationHandler, Serializable {
    // proxyFactory.setTargetSource(new DubboReferenceLazyInitTargetSource());
    // 这里的targetSource = DubboReferenceLazyInitTargetSource
    TargetSource targetSource = this.advised.targetSource;
    Object target = targetSource.getTarget()
}
public synchronized Object getTarget() throws Exception {
   // 第一次为null,未初始化
   if (this.lazyTarget == null) {
      logger.debug("Initializing lazy target object");
      this.lazyTarget = createObject();
   }
   return this.lazyTarget;
}

我们看下 DubboReferenceLazyInitTargetSourcecreateObject

// 第一次调用时,会初始化该方法
private class DubboReferenceLazyInitTargetSource extends AbstractLazyCreationTargetSource {
    @Override
    protected Object createObject() throws Exception {
        return getCallProxy();
    }
    @Override
    public synchronized Class<?> getTargetClass() {
        return getInterfaceClass();
    }
}

这个 getCallProxy 就是我们订阅服务的地方

4、订阅服务

当我们初始化完毕之后,在我们第一次调用的时候,会调用 getCallProxy() 该方法,去进行服务的订阅,这里会执行该方法:

private class DubboReferenceLazyInitTargetSource extends AbstractLazyCreationTargetSource {
    @Override
    protected Object createObject() throws Exception {
        return getCallProxy();
    }
    @Override
    public synchronized Class<?> getTargetClass() {
        return getInterfaceClass();
    }
}
private Object getCallProxy() throws Exception {
    synchronized(((DefaultSingletonBeanRegistry)getBeanFactory()).getSingletonMutex()) {
        // 获取reference
        return referenceConfig.get();
    }
}

我们继续向下看,这里会来到 ReferenceConfiginit 方法

protected synchronized void init() {
    ref = createProxy(referenceParameters);
}
private T createProxy(Map<String, String> referenceParameters) {
    // 1、监听注册中心
    // 2、本地保存服务
    createInvokerForRemote();
    URL consumerUrl = new ServiceConfigURL(CONSUMER_PROTOCOL, referenceParameters.get(REGISTER_IP_KEY), 0,
            referenceParameters.get(INTERFACE_KEY), referenceParameters);
    consumerUrl = consumerUrl.setScopeModel(getScopeModel());
    consumerUrl = consumerUrl.setServiceModel(consumerModel);
    MetadataUtils.publishServiceDefinition(consumerUrl, consumerModel.getServiceModel(), getApplicationModel());
    // 创建代理类
    return (T) proxyFactory.getProxy(invoker, ProtocolUtils.isGeneric(generic));
}
4.1 监听注册中心

private void createInvokerForRemote() {
    if (urls.size() == 1) {
        // URL:
        // registry://127.0.0.1:2181/org.apache.dubbo.registry.RegistryService?application=dubbo-consumer&dubbo=2.0.2&pid=2532&qos.enable=true&registry=zookeeper&release=3.1.8&timestamp=1686063555583
        URL curUrl = urls.get(0);
        invoker = protocolSPI.refer(interfaceClass, curUrl);
    }
}
public <T> Invoker<T> refer(Class<T> type, URL url) throws RpcException {
    // zookeeper://127.0.0.1:2181/org.apache.dubbo.registry.RegistryService?application=dubbo-consumer&dubbo=2.0.2&pid=14952&qos.enable=true&release=3.1.8&timestamp=1686063658064
    url = getRegistryUrl(url);
    Registry registry = getRegistry(url);
    if (RegistryService.class.equals(type)) {
        return proxyFactory.getInvoker((T) registry, type, url);
    }
    // group="a,b" or group="*"
    Map<String, String> qs = (Map<String, String>) url.getAttribute(REFER_KEY);
    String group = qs.get(GROUP_KEY);
    if (StringUtils.isNotEmpty(group)) {
        if ((COMMA_SPLIT_PATTERN.split(group)).length > 1 || "*".equals(group)) {
            return doRefer(Cluster.getCluster(url.getScopeModel(), MergeableCluster.NAME), registry, type, url, qs);
        }
    }
    Cluster cluster = Cluster.getCluster(url.getScopeModel(), qs.get(CLUSTER_KEY));
    return doRefer(cluster, registry, type, url, qs);
}

这里的 QS 如下:

我们直接跳到 MigrationRuleHandlerdoMigrate 中:

public synchronized void doMigrate(MigrationRule rule) {
    MigrationStep step = MigrationStep.APPLICATION_FIRST;
    float threshold = -1f;
    step = rule.getStep(consumerURL);
    threshold = rule.getThreshold(consumerURL);
    if (refreshInvoker(step, threshold, rule)) {
        setMigrationRule(rule);
    }
}

在这个 refreshInvoker 里面,会判断当前注册的方式

private boolean refreshInvoker(MigrationStep step, Float threshold, MigrationRule newRule) {
        MigrationStep originStep = currentStep;
        if ((currentStep == null || currentStep != step) || !currentThreshold.equals(threshold)) {
            boolean success = true;
            switch (step) {
                // 接口&应用
                case APPLICATION_FIRST:
                    migrationInvoker.migrateToApplicationFirstInvoker(newRule);
                    break;
                // 应用
                case FORCE_APPLICATION:
                    success = migrationInvoker.migrateToForceApplicationInvoker(newRule);
                    break;
                // 接口
                case FORCE_INTERFACE:
                default:
                    success = migrationInvoker.migrateToForceInterfaceInvoker(newRule);
            }
            return success;
        }
        return true;
    }

我们本次只讲 接口注册,我们直接跳到:ZookeeperRegistrydoSubscribe 方法

public void doSubscribe(final URL url, final NotifyListener listener) {
    List<URL> urls = new ArrayList<>();
    for (String path : toCategoriesPath(url)) {
        // 创建目录
        zkClient.create(path, false, true);
        // 增加监听
        // 1、/dubbo/com.msb.common.service.IUserService/providers
        // 2、/dubbo/com.msb.common.service.IUserService/configurators
        // 3、/dubbo/com.msb.common.service.IUserService/routers
        List<String> children = zkClient.addChildListener(path, zkListener);
        if (children != null) {
            urls.addAll(toUrlsWithEmpty(url, path, children));
        }
    }
    // 将Zookeeper服务保存
    notify(url, listener, urls);
}
4.2 本地保存服务

直接跳到 AbstractRegistrydoSaveProperties 方法

  • 创建文件
  • 将服务端数据存入文件中
public void doSaveProperties(long version) {
    File lockfile = null;
    // 创建文件
    lockfile = new File(file.getAbsolutePath() + ".lock");
    tmpProperties = new Properties();
    Set<Map.Entry<Object, Object>> entries = properties.entrySet();
    for (Map.Entry<Object, Object> entry : entries) {
        tmpProperties.setProperty((String) entry.getKey(), (String) entry.getValue());
    }
    try (FileOutputStream outputFile = new FileOutputStream(file)) {
        tmpProperties.store(outputFile, "Dubbo Registry Cache");
    }
}

这里存储的数据如下:简单理解,各种服务端的信息

com.common.service.IUserService -> empty://192.168.0.103/com.common.service.IUserService?application=dubbo-consumer&background=false&category=routers&dubbo=2.0.2&interface=com.msb.common.service.IUserService&lazy=true&methods=getUserById&pid=13528&protocol=dubbo&qos.enable=true&release=3.1.8&side=consumer&sticky=false&timeout=100&timestamp=1686064969935&unloadClusterRelated=false empty://192.168.0.103/com.msb.common.service.IUserService?application=dubbo-consumer&background=false&category=configurators&dubbo=2.0.2&interface=com.msb.common.service.IUserService&lazy=true&methods=getUserById&pid=13528&protocol=dubbo&qos.enable=true&release=3.1.8&side=consumer&sticky=false&timeout=100&timestamp=1686064969935&unloadClusterRelated=false dubbo://192.168.0.103:20883/com.msb.common.service.IUserService?anyhost=true&application=dubbo-provider&background=false&check=false&deprecated=false&dubbo=2.0.2&dynamic=true&generic=false&interface=com.msb.common.service.IUserService&metadata-type=remote&methods=getUserById&register-mode=inter

如果后续我们的注册中心(Zookeeper)挂掉之后,我们的系统从本地磁盘读取服务信息也可以正常通信。

只是没有办法及时更新服务

4.3 创建动态代理类

private T createProxy(Map<String, String> referenceParameters) {
    // 省略代码
    // create service proxy
    return (T) proxyFactory.getProxy(invoker, ProtocolUtils.isGeneric(generic));
}

这里我们直接跳到 JavassistProxyFactorygetProxy 方法

public <T> T getProxy(Invoker<T> invoker, Class<?>[] interfaces) {
    return (T) Proxy.getProxy(interfaces).newInstance(new InvokerInvocationHandler(invoker));
}

这里直接创建代理类,当我们去调用 InvokerInvocationHandler 这个方法

至于为什么要调用 InvokerInvocationHandler ,大家可以看下之前写的动态代理文章:2023年再不会动态代理,就要被淘汰了

public Object invoke(Object proxy, Method method, Object[] args) throws Throwable {
        if (method.getDeclaringClass() == Object.class) {
            return method.invoke(invoker, args);
        }
        String methodName = method.getName();
        Class<?>[] parameterTypes = method.getParameterTypes();
        if (parameterTypes.length == 0) {
            if ("toString".equals(methodName)) {
                return invoker.toString();
            } else if ("$destroy".equals(methodName)) {
                invoker.destroy();
                return null;
            } else if ("hashCode".equals(methodName)) {
                return invoker.hashCode();
            }
        } else if (parameterTypes.length == 1 && "equals".equals(methodName)) {
            return invoker.equals(args[0]);
        }
        RpcInvocation rpcInvocation = new RpcInvocation(serviceModel, method.getName(), invoker.getInterface().getName(), protocolServiceKey, method.getParameterTypes(), args);
        if (serviceModel instanceof ConsumerModel) {
            rpcInvocation.put(Constants.CONSUMER_MODEL, serviceModel);
            rpcInvocation.put(Constants.METHOD_MODEL, ((ConsumerModel) serviceModel).getMethodModel(method));
        }
        return InvocationUtil.invoke(invoker, rpcInvocation);
    }

三、流程图

  • 原图可私信获取

四、总结

鲁迅先生曾说:独行难,众行易,和志同道合的人一起进步。彼此毫无保留的分享经验,才是对抗互联网寒冬的最佳选择。

其实很多时候,并不是我们不够努力,很可能就是自己努力的方向不对,如果有一个人能稍微指点你一下,你真的可能会少走几年弯路。


相关文章
|
10天前
|
存储 设计模式 算法
【23种设计模式·全精解析 | 行为型模式篇】11种行为型模式的结构概述、案例实现、优缺点、扩展对比、使用场景、源码解析
行为型模式用于描述程序在运行时复杂的流程控制,即描述多个类或对象之间怎样相互协作共同完成单个对象都无法单独完成的任务,它涉及算法与对象间职责的分配。行为型模式分为类行为模式和对象行为模式,前者采用继承机制来在类间分派行为,后者采用组合或聚合在对象间分配行为。由于组合关系或聚合关系比继承关系耦合度低,满足“合成复用原则”,所以对象行为模式比类行为模式具有更大的灵活性。 行为型模式分为: • 模板方法模式 • 策略模式 • 命令模式 • 职责链模式 • 状态模式 • 观察者模式 • 中介者模式 • 迭代器模式 • 访问者模式 • 备忘录模式 • 解释器模式
【23种设计模式·全精解析 | 行为型模式篇】11种行为型模式的结构概述、案例实现、优缺点、扩展对比、使用场景、源码解析
|
10天前
|
设计模式 存储 安全
【23种设计模式·全精解析 | 创建型模式篇】5种创建型模式的结构概述、实现、优缺点、扩展、使用场景、源码解析
结构型模式描述如何将类或对象按某种布局组成更大的结构。它分为类结构型模式和对象结构型模式,前者采用继承机制来组织接口和类,后者釆用组合或聚合来组合对象。由于组合关系或聚合关系比继承关系耦合度低,满足“合成复用原则”,所以对象结构型模式比类结构型模式具有更大的灵活性。 结构型模式分为以下 7 种: • 代理模式 • 适配器模式 • 装饰者模式 • 桥接模式 • 外观模式 • 组合模式 • 享元模式
【23种设计模式·全精解析 | 创建型模式篇】5种创建型模式的结构概述、实现、优缺点、扩展、使用场景、源码解析
|
10天前
|
设计模式 存储 安全
【23种设计模式·全精解析 | 创建型模式篇】5种创建型模式的结构概述、实现、优缺点、扩展、使用场景、源码解析
创建型模式的主要关注点是“怎样创建对象?”,它的主要特点是"将对象的创建与使用分离”。这样可以降低系统的耦合度,使用者不需要关注对象的创建细节。创建型模式分为5种:单例模式、工厂方法模式抽象工厂式、原型模式、建造者模式。
【23种设计模式·全精解析 | 创建型模式篇】5种创建型模式的结构概述、实现、优缺点、扩展、使用场景、源码解析
|
29天前
|
PyTorch Shell API
Ascend Extension for PyTorch的源码解析
本文介绍了Ascend对PyTorch代码的适配过程,包括源码下载、编译步骤及常见问题,详细解析了torch-npu编译后的文件结构和三种实现昇腾NPU算子调用的方式:通过torch的register方式、定义算子方式和API重定向映射方式。这对于开发者理解和使用Ascend平台上的PyTorch具有重要指导意义。
|
11天前
|
安全 搜索推荐 数据挖掘
陪玩系统源码开发流程解析,成品陪玩系统源码的优点
我们自主开发的多客陪玩系统源码,整合了市面上主流陪玩APP功能,支持二次开发。该系统适用于线上游戏陪玩、语音视频聊天、心理咨询等场景,提供用户注册管理、陪玩者资料库、预约匹配、实时通讯、支付结算、安全隐私保护、客户服务及数据分析等功能,打造综合性社交平台。随着互联网技术发展,陪玩系统正成为游戏爱好者的新宠,改变游戏体验并带来新的商业模式。
|
8月前
|
Dubbo Java 应用服务中间件
微服务学习 | Springboot整合Dubbo+Nacos实现RPC调用
微服务学习 | Springboot整合Dubbo+Nacos实现RPC调用
|
3月前
|
Dubbo Java 应用服务中间件
Spring Cloud Dubbo:微服务通信的高效解决方案
【10月更文挑战第15天】随着信息技术的发展,微服务架构成为企业应用开发的主流。Spring Cloud Dubbo结合了Dubbo的高性能RPC和Spring Cloud的生态系统,提供高效、稳定的微服务通信解决方案。它支持多种通信协议,具备服务注册与发现、负载均衡及容错机制,简化了服务调用的复杂性,使开发者能更专注于业务逻辑的实现。
80 2
|
5月前
|
Dubbo Java 应用服务中间件
💥Spring Cloud Dubbo火爆来袭!微服务通信的终极利器,你知道它有多强大吗?🔥
【8月更文挑战第29天】随着信息技术的发展,微服务架构成为企业应用开发的主流模式,而高效的微服务通信至关重要。Spring Cloud Dubbo通过整合Dubbo与Spring Cloud的优势,提供高性能RPC通信及丰富的生态支持,包括服务注册与发现、负载均衡和容错机制等,简化了服务调用管理并支持多种通信协议,提升了系统的可伸缩性和稳定性,成为微服务通信领域的优选方案。开发者仅需关注业务逻辑,而无需过多关心底层通信细节,使得Spring Cloud Dubbo在未来微服务开发中将更加受到青睐。
92 0
|
2月前
|
Dubbo Cloud Native 应用服务中间件
阿里云的 Dubbo 和 Nacos 深度整合,提供了高效的服务注册与发现、配置管理等关键功能,简化了微服务治理,提升了系统的灵活性和可靠性。
在云原生时代,微服务架构成为主流。阿里云的 Dubbo 和 Nacos 深度整合,提供了高效的服务注册与发现、配置管理等关键功能,简化了微服务治理,提升了系统的灵活性和可靠性。示例代码展示了如何在项目中实现两者的整合,通过 Nacos 动态调整服务状态和配置,适应多变的业务需求。
51 2
|
3月前
|
Dubbo Java 应用服务中间件
Dubbo学习圣经:从入门到精通 Dubbo3.0 + SpringCloud Alibaba 微服务基础框架
尼恩团队的15大技术圣经,旨在帮助开发者系统化、体系化地掌握核心技术,提升技术实力,从而在面试和工作中脱颖而出。本文介绍了如何使用Dubbo3.0与Spring Cloud Gateway进行整合,解决传统Dubbo架构缺乏HTTP入口的问题,实现高性能的微服务网关。

推荐镜像

更多