从源码全面解析 dubbo 服务订阅的来龙去脉

简介: 从源码全面解析 dubbo 服务订阅的来龙去脉

一、引言

对于 Java 开发者而言,关于 dubbo ,我们一般当做黑盒来进行使用,不需要去打开这个黑盒。

但随着目前程序员行业的发展,我们有必要打开这个黑盒,去探索其中的奥妙。

本期 dubbo 源码解析系列文章,将带你领略 dubbo 源码的奥秘

本期源码文章吸收了之前 SpringKakfaJUC源码文章的教训,将不再一行一行的带大家分析源码,我们将一些不重要的部分当做黑盒处理,以便我们更快、更有效的阅读源码。

虽然现在是互联网寒冬,但乾坤未定,你我皆是黑马!

废话不多说,发车!

二、消费者订阅服务

读过我们上一篇:从源码全面解析 dubbo 注解配置的来龙去脉 的文章的朋友,我们当时留了一个 EnableDubboConfig 注解里面的 ReferenceAnnotationBeanPostProcessor 方法

1、消费端配置

@DubboReference(protocol = "dubbo", timeout = 100)
private IUserService iUserService;

从这个配置我们可以得出一个信息,Spring 不会自动将 IUserService 注入 Bean 工厂中

当然这句话也是一个废话,人家 Dubbo 自定义的注解,Spring 怎么可能扫描到…

ReferenceAnnotationBeanPostProcessor 这个方法是消费端扫描 @Reference 使用的

本篇将正式的介绍下消费端是如何订阅我们服务端注册在 Zookeeper 上的服务的

2、扫描注解

我们先看这个类的实现:

public class ReferenceAnnotationBeanPostProcessor extends AbstractAnnotationBeanPostProcessor
    implements ApplicationContextAware, BeanFactoryPostProcessor {}

实现了 BeanFactoryPostProcessor 接口,这个时候如果看过博主的 Spring 的源码系列文章,DNA 应该已经开始活动了

没错,基本上这个接口就是为了往我们的 BeanDefinitionMap 里面注册 BeanDefinition 信息的

想必到这里,就算我们不看源码,也能猜到

这个哥们绝对是将 @DubboReference 的注解扫描封装成 BeanDefinition 注册至 BeanDefinitionMap

我们直接看源码

@Override
public void postProcessBeanFactory(ConfigurableListableBeanFactory beanFactory){
    // 拿到当前Spring工厂所有的bean名称
    String[] beanNames = beanFactory.getBeanDefinitionNames();
    for (String beanName : beanNames) {
        // 获取bean的类型
        beanType = beanFactory.getType(beanName);
        // 省略一些代码
        if (beanType != null) {
            // 获取元数据信息
            AnnotatedInjectionMetadata metadata = findInjectionMetadata(beanName, beanType, null);
            // 解析@DubboReference注解并注册至BeanDefinitionMap中
            prepareInjection(metadata);
        }
    }
}

我们详细看看这个 prepareInjection 方法是如何解析的 @DubboReference 注解并做了一些什么特殊的操作

protected void prepareInjection(AnnotatedInjectionMetadata metadata) throws BeansException {
    for (AnnotatedFieldElement fieldElement : metadata.getFieldElements()) {
        Class<?> injectedType = fieldElement.field.getType();
        // 配置的参数
        AnnotationAttributes attributes = fieldElement.attributes;
        // 解析&注册
        String referenceBeanName = registerReferenceBean(fieldElement.getPropertyName(), injectedType, attributes, fieldElement.field);
        }
    }
}

这个 registerReferenceBean 里面的逻辑较多,我们只取最关键的,感兴趣的朋友也可以自己去看一看

public String registerReferenceBean(String propertyName, Class<?> injectedType, Map<String, Object> attributes, Member member){
      RootBeanDefinition beanDefinition = new RootBeanDefinition();
      // ReferenceBean.class.getName() = org.apache.dubbo.config.spring.ReferenceBean
        beanDefinition.setBeanClassName(ReferenceBean.class.getName());
        beanDefinition.getPropertyValues().add(ReferenceAttributes.ID, referenceBeanName);
      // referenceProps = 配置的信息
      // interfaceName = com.common.service.IUserService
      // interfaceClass = interface com.common.service.IUserService
        beanDefinition.setAttribute(Constants.REFERENCE_PROPS, attributes);
        beanDefinition.setAttribute(ReferenceAttributes.INTERFACE_CLASS, interfaceClass);
        beanDefinition.setAttribute(ReferenceAttributes.INTERFACE_NAME, interfaceName);
        GenericBeanDefinition targetDefinition = new GenericBeanDefinition();
        targetDefinition.setBeanClass(interfaceClass);
        beanDefinition.setDecoratedDefinition(new BeanDefinitionHolder(targetDefinition, referenceBeanName + "_decorated"));
        beanDefinition.setAttribute(Constants.OBJECT_TYPE_ATTRIBUTE, interfaceClass);
      // 注册至BeanDefinitionMap中
        beanDefinitionRegistry.registerBeanDefinition(referenceBeanName, beanDefinition);
        referenceBeanManager.registerReferenceKeyAndBeanName(referenceKey, referenceBeanName);
        return referenceBeanName;
}

到这里,我们的 ReferenceAnnotationBeanPostProcessor 方法将 @DubboReference 扫描组装成 BeanDefinition 注册到了 BeanDefinitionMap

3、创建代理对象

在我们上面注册的时候,有这么一行代码:

beanDefinition.setBeanClassName(ReferenceBean.class.getName());

表明我们当前注册的 BeanClass 类型为 org.apache.dubbo.config.spring.ReferenceBean

当我们的 Spring 去实例化 BeanDefinitionMap 中的对象时,这个时候会调用 ReferenceBeangetObject 方法

Spring 在实例化时会获取每一个 BeanDefinition 的 Object,不存在则创建

我们发现,在 ReferenceBean 里面实际上是重写了 getObject 的方法:

public T getObject() {
    if (lazyProxy == null) {
        createLazyProxy();
    }
    return (T) lazyProxy;
}
private void createLazyProxy() {
    // 创建代理对象
    ProxyFactory proxyFactory = new ProxyFactory();
    proxyFactory.setTargetSource(new DubboReferenceLazyInitTargetSource());
    proxyFactory.addInterface(interfaceClass);
    Class<?>[] internalInterfaces = AbstractProxyFactory.getInternalInterfaces();
    for (Class<?> anInterface : internalInterfaces) {
        proxyFactory.addInterface(anInterface);
    }
    // 进行动态代理(生成动态代理的对象)
    // 这里动态代理用的是JdkDynamicAopProxy
    this.lazyProxy = proxyFactory.getProxy(this.beanClassLoader);
}

我们看下 JdkDynamicAopProxy 里面做了什么?

final class JdkDynamicAopProxy implements AopProxy, InvocationHandler, Serializable {
    // proxyFactory.setTargetSource(new DubboReferenceLazyInitTargetSource());
    // 这里的targetSource = DubboReferenceLazyInitTargetSource
    TargetSource targetSource = this.advised.targetSource;
    Object target = targetSource.getTarget()
}
public synchronized Object getTarget() throws Exception {
   // 第一次为null,未初始化
   if (this.lazyTarget == null) {
      logger.debug("Initializing lazy target object");
      this.lazyTarget = createObject();
   }
   return this.lazyTarget;
}

我们看下 DubboReferenceLazyInitTargetSourcecreateObject

// 第一次调用时,会初始化该方法
private class DubboReferenceLazyInitTargetSource extends AbstractLazyCreationTargetSource {
    @Override
    protected Object createObject() throws Exception {
        return getCallProxy();
    }
    @Override
    public synchronized Class<?> getTargetClass() {
        return getInterfaceClass();
    }
}

这个 getCallProxy 就是我们订阅服务的地方

4、订阅服务

当我们初始化完毕之后,在我们第一次调用的时候,会调用 getCallProxy() 该方法,去进行服务的订阅,这里会执行该方法:

private class DubboReferenceLazyInitTargetSource extends AbstractLazyCreationTargetSource {
    @Override
    protected Object createObject() throws Exception {
        return getCallProxy();
    }
    @Override
    public synchronized Class<?> getTargetClass() {
        return getInterfaceClass();
    }
}
private Object getCallProxy() throws Exception {
    synchronized(((DefaultSingletonBeanRegistry)getBeanFactory()).getSingletonMutex()) {
        // 获取reference
        return referenceConfig.get();
    }
}

我们继续向下看,这里会来到 ReferenceConfiginit 方法

protected synchronized void init() {
    ref = createProxy(referenceParameters);
}
private T createProxy(Map<String, String> referenceParameters) {
    // 1、监听注册中心
    // 2、本地保存服务
    createInvokerForRemote();
    URL consumerUrl = new ServiceConfigURL(CONSUMER_PROTOCOL, referenceParameters.get(REGISTER_IP_KEY), 0,
            referenceParameters.get(INTERFACE_KEY), referenceParameters);
    consumerUrl = consumerUrl.setScopeModel(getScopeModel());
    consumerUrl = consumerUrl.setServiceModel(consumerModel);
    MetadataUtils.publishServiceDefinition(consumerUrl, consumerModel.getServiceModel(), getApplicationModel());
    // 创建代理类
    return (T) proxyFactory.getProxy(invoker, ProtocolUtils.isGeneric(generic));
}
4.1 监听注册中心

private void createInvokerForRemote() {
    if (urls.size() == 1) {
        // URL:
        // registry://127.0.0.1:2181/org.apache.dubbo.registry.RegistryService?application=dubbo-consumer&dubbo=2.0.2&pid=2532&qos.enable=true&registry=zookeeper&release=3.1.8&timestamp=1686063555583
        URL curUrl = urls.get(0);
        invoker = protocolSPI.refer(interfaceClass, curUrl);
    }
}
public <T> Invoker<T> refer(Class<T> type, URL url) throws RpcException {
    // zookeeper://127.0.0.1:2181/org.apache.dubbo.registry.RegistryService?application=dubbo-consumer&dubbo=2.0.2&pid=14952&qos.enable=true&release=3.1.8&timestamp=1686063658064
    url = getRegistryUrl(url);
    Registry registry = getRegistry(url);
    if (RegistryService.class.equals(type)) {
        return proxyFactory.getInvoker((T) registry, type, url);
    }
    // group="a,b" or group="*"
    Map<String, String> qs = (Map<String, String>) url.getAttribute(REFER_KEY);
    String group = qs.get(GROUP_KEY);
    if (StringUtils.isNotEmpty(group)) {
        if ((COMMA_SPLIT_PATTERN.split(group)).length > 1 || "*".equals(group)) {
            return doRefer(Cluster.getCluster(url.getScopeModel(), MergeableCluster.NAME), registry, type, url, qs);
        }
    }
    Cluster cluster = Cluster.getCluster(url.getScopeModel(), qs.get(CLUSTER_KEY));
    return doRefer(cluster, registry, type, url, qs);
}

这里的 QS 如下:

我们直接跳到 MigrationRuleHandlerdoMigrate 中:

public synchronized void doMigrate(MigrationRule rule) {
    MigrationStep step = MigrationStep.APPLICATION_FIRST;
    float threshold = -1f;
    step = rule.getStep(consumerURL);
    threshold = rule.getThreshold(consumerURL);
    if (refreshInvoker(step, threshold, rule)) {
        setMigrationRule(rule);
    }
}

在这个 refreshInvoker 里面,会判断当前注册的方式

private boolean refreshInvoker(MigrationStep step, Float threshold, MigrationRule newRule) {
        MigrationStep originStep = currentStep;
        if ((currentStep == null || currentStep != step) || !currentThreshold.equals(threshold)) {
            boolean success = true;
            switch (step) {
                // 接口&应用
                case APPLICATION_FIRST:
                    migrationInvoker.migrateToApplicationFirstInvoker(newRule);
                    break;
                // 应用
                case FORCE_APPLICATION:
                    success = migrationInvoker.migrateToForceApplicationInvoker(newRule);
                    break;
                // 接口
                case FORCE_INTERFACE:
                default:
                    success = migrationInvoker.migrateToForceInterfaceInvoker(newRule);
            }
            return success;
        }
        return true;
    }

我们本次只讲 接口注册,我们直接跳到:ZookeeperRegistrydoSubscribe 方法

public void doSubscribe(final URL url, final NotifyListener listener) {
    List<URL> urls = new ArrayList<>();
    for (String path : toCategoriesPath(url)) {
        // 创建目录
        zkClient.create(path, false, true);
        // 增加监听
        // 1、/dubbo/com.msb.common.service.IUserService/providers
        // 2、/dubbo/com.msb.common.service.IUserService/configurators
        // 3、/dubbo/com.msb.common.service.IUserService/routers
        List<String> children = zkClient.addChildListener(path, zkListener);
        if (children != null) {
            urls.addAll(toUrlsWithEmpty(url, path, children));
        }
    }
    // 将Zookeeper服务保存
    notify(url, listener, urls);
}
4.2 本地保存服务

直接跳到 AbstractRegistrydoSaveProperties 方法

  • 创建文件
  • 将服务端数据存入文件中
public void doSaveProperties(long version) {
    File lockfile = null;
    // 创建文件
    lockfile = new File(file.getAbsolutePath() + ".lock");
    tmpProperties = new Properties();
    Set<Map.Entry<Object, Object>> entries = properties.entrySet();
    for (Map.Entry<Object, Object> entry : entries) {
        tmpProperties.setProperty((String) entry.getKey(), (String) entry.getValue());
    }
    try (FileOutputStream outputFile = new FileOutputStream(file)) {
        tmpProperties.store(outputFile, "Dubbo Registry Cache");
    }
}

这里存储的数据如下:简单理解,各种服务端的信息

com.common.service.IUserService -> empty://192.168.0.103/com.common.service.IUserService?application=dubbo-consumer&background=false&category=routers&dubbo=2.0.2&interface=com.msb.common.service.IUserService&lazy=true&methods=getUserById&pid=13528&protocol=dubbo&qos.enable=true&release=3.1.8&side=consumer&sticky=false&timeout=100&timestamp=1686064969935&unloadClusterRelated=false empty://192.168.0.103/com.msb.common.service.IUserService?application=dubbo-consumer&background=false&category=configurators&dubbo=2.0.2&interface=com.msb.common.service.IUserService&lazy=true&methods=getUserById&pid=13528&protocol=dubbo&qos.enable=true&release=3.1.8&side=consumer&sticky=false&timeout=100&timestamp=1686064969935&unloadClusterRelated=false dubbo://192.168.0.103:20883/com.msb.common.service.IUserService?anyhost=true&application=dubbo-provider&background=false&check=false&deprecated=false&dubbo=2.0.2&dynamic=true&generic=false&interface=com.msb.common.service.IUserService&metadata-type=remote&methods=getUserById&register-mode=inter

如果后续我们的注册中心(Zookeeper)挂掉之后,我们的系统从本地磁盘读取服务信息也可以正常通信。

只是没有办法及时更新服务

4.3 创建动态代理类

private T createProxy(Map<String, String> referenceParameters) {
    // 省略代码
    // create service proxy
    return (T) proxyFactory.getProxy(invoker, ProtocolUtils.isGeneric(generic));
}

这里我们直接跳到 JavassistProxyFactorygetProxy 方法

public <T> T getProxy(Invoker<T> invoker, Class<?>[] interfaces) {
    return (T) Proxy.getProxy(interfaces).newInstance(new InvokerInvocationHandler(invoker));
}

这里直接创建代理类,当我们去调用 InvokerInvocationHandler 这个方法

至于为什么要调用 InvokerInvocationHandler ,大家可以看下之前写的动态代理文章:2023年再不会动态代理,就要被淘汰了

public Object invoke(Object proxy, Method method, Object[] args) throws Throwable {
        if (method.getDeclaringClass() == Object.class) {
            return method.invoke(invoker, args);
        }
        String methodName = method.getName();
        Class<?>[] parameterTypes = method.getParameterTypes();
        if (parameterTypes.length == 0) {
            if ("toString".equals(methodName)) {
                return invoker.toString();
            } else if ("$destroy".equals(methodName)) {
                invoker.destroy();
                return null;
            } else if ("hashCode".equals(methodName)) {
                return invoker.hashCode();
            }
        } else if (parameterTypes.length == 1 && "equals".equals(methodName)) {
            return invoker.equals(args[0]);
        }
        RpcInvocation rpcInvocation = new RpcInvocation(serviceModel, method.getName(), invoker.getInterface().getName(), protocolServiceKey, method.getParameterTypes(), args);
        if (serviceModel instanceof ConsumerModel) {
            rpcInvocation.put(Constants.CONSUMER_MODEL, serviceModel);
            rpcInvocation.put(Constants.METHOD_MODEL, ((ConsumerModel) serviceModel).getMethodModel(method));
        }
        return InvocationUtil.invoke(invoker, rpcInvocation);
    }

三、流程图

  • 原图可私信获取

四、总结

鲁迅先生曾说:独行难,众行易,和志同道合的人一起进步。彼此毫无保留的分享经验,才是对抗互联网寒冬的最佳选择。

其实很多时候,并不是我们不够努力,很可能就是自己努力的方向不对,如果有一个人能稍微指点你一下,你真的可能会少走几年弯路。


相关文章
|
弹性计算 运维 安全
优化管理与服务:操作系统控制平台的订阅功能解析
本文介绍了如何通过操作系统控制平台提升系统效率,优化资源利用。首先,通过阿里云官方平台开通服务并安装SysOM组件,体验操作系统控制平台的功能。接着,详细讲解了订阅管理功能,包括创建订阅、查看和管理ECS实例的私有YUM仓库权限。订阅私有YUM仓库能够集中管理软件包版本、提升安全性,并提供灵活的配置选项。最后总结指出,使用阿里云的订阅和私有YUM仓库功能,可以提高系统可靠性和运维效率,确保业务顺畅运行。
|
10月前
|
网络协议 安全 Devops
Infoblox DDI (NIOS) 9.0 - DNS、DHCP 和 IPAM (DDI) 核心网络服务管理
Infoblox DDI (NIOS) 9.0 - DNS、DHCP 和 IPAM (DDI) 核心网络服务管理
419 4
|
算法 测试技术 C语言
深入理解HTTP/2:nghttp2库源码解析及客户端实现示例
通过解析nghttp2库的源码和实现一个简单的HTTP/2客户端示例,本文详细介绍了HTTP/2的关键特性和nghttp2的核心实现。了解这些内容可以帮助开发者更好地理解HTTP/2协议,提高Web应用的性能和用户体验。对于实际开发中的应用,可以根据需要进一步优化和扩展代码,以满足具体需求。
1174 29
|
前端开发 数据安全/隐私保护 CDN
二次元聚合短视频解析去水印系统源码
二次元聚合短视频解析去水印系统源码
490 4
|
JavaScript 算法 前端开发
JS数组操作方法全景图,全网最全构建完整知识网络!js数组操作方法全集(实现筛选转换、随机排序洗牌算法、复杂数据处理统计等情景详解,附大量源码和易错点解析)
这些方法提供了对数组的全面操作,包括搜索、遍历、转换和聚合等。通过分为原地操作方法、非原地操作方法和其他方法便于您理解和记忆,并熟悉他们各自的使用方法与使用范围。详细的案例与进阶使用,方便您理解数组操作的底层原理。链式调用的几个案例,让您玩转数组操作。 只有锻炼思维才能可持续地解决问题,只有思维才是真正值得学习和分享的核心要素。如果这篇博客能给您带来一点帮助,麻烦您点个赞支持一下,还可以收藏起来以备不时之需,有疑问和错误欢迎在评论区指出~
|
存储 前端开发 JavaScript
在线教育网课系统源码开发指南:功能设计与技术实现深度解析
在线教育网课系统是近年来发展迅猛的教育形式的核心载体,具备用户管理、课程管理、教学互动、学习评估等功能。本文从功能和技术两方面解析其源码开发,涵盖前端(HTML5、CSS3、JavaScript等)、后端(Java、Python等)、流媒体及云计算技术,并强调安全性、稳定性和用户体验的重要性。
|
负载均衡 JavaScript 前端开发
分片上传技术全解析:原理、优势与应用(含简单实现源码)
分片上传通过将大文件分割成多个小的片段或块,然后并行或顺序地上传这些片段,从而提高上传效率和可靠性,特别适用于大文件的上传场景,尤其是在网络环境不佳时,分片上传能有效提高上传体验。 博客不应该只有代码和解决方案,重点应该在于给出解决方案的同时分享思维模式,只有思维才能可持续地解决问题,只有思维才是真正值得学习和分享的核心要素。如果这篇博客能给您带来一点帮助,麻烦您点个赞支持一下,还可以收藏起来以备不时之需,有疑问和错误欢迎在评论区指出~
|
Dubbo Java 应用服务中间件
微服务学习 | Springboot整合Dubbo+Nacos实现RPC调用
微服务学习 | Springboot整合Dubbo+Nacos实现RPC调用
|
Dubbo Java 应用服务中间件
Spring Cloud Dubbo:微服务通信的高效解决方案
【10月更文挑战第15天】随着信息技术的发展,微服务架构成为企业应用开发的主流。Spring Cloud Dubbo结合了Dubbo的高性能RPC和Spring Cloud的生态系统,提供高效、稳定的微服务通信解决方案。它支持多种通信协议,具备服务注册与发现、负载均衡及容错机制,简化了服务调用的复杂性,使开发者能更专注于业务逻辑的实现。
344 2
|
Dubbo Java 应用服务中间件
💥Spring Cloud Dubbo火爆来袭!微服务通信的终极利器,你知道它有多强大吗?🔥
【8月更文挑战第29天】随着信息技术的发展,微服务架构成为企业应用开发的主流模式,而高效的微服务通信至关重要。Spring Cloud Dubbo通过整合Dubbo与Spring Cloud的优势,提供高性能RPC通信及丰富的生态支持,包括服务注册与发现、负载均衡和容错机制等,简化了服务调用管理并支持多种通信协议,提升了系统的可伸缩性和稳定性,成为微服务通信领域的优选方案。开发者仅需关注业务逻辑,而无需过多关心底层通信细节,使得Spring Cloud Dubbo在未来微服务开发中将更加受到青睐。
260 0

推荐镜像

更多
  • DNS