从源码全面解析 dubbo 服务订阅的来龙去脉

本文涉及的产品
服务治理 MSE Sentinel/OpenSergo,Agent数量 不受限
简介: 从源码全面解析 dubbo 服务订阅的来龙去脉

一、引言

对于 Java 开发者而言,关于 dubbo ,我们一般当做黑盒来进行使用,不需要去打开这个黑盒。

但随着目前程序员行业的发展,我们有必要打开这个黑盒,去探索其中的奥妙。

本期 dubbo 源码解析系列文章,将带你领略 dubbo 源码的奥秘

本期源码文章吸收了之前 SpringKakfaJUC源码文章的教训,将不再一行一行的带大家分析源码,我们将一些不重要的部分当做黑盒处理,以便我们更快、更有效的阅读源码。

虽然现在是互联网寒冬,但乾坤未定,你我皆是黑马!

废话不多说,发车!

二、消费者订阅服务

读过我们上一篇:从源码全面解析 dubbo 注解配置的来龙去脉 的文章的朋友,我们当时留了一个 EnableDubboConfig 注解里面的 ReferenceAnnotationBeanPostProcessor 方法

1、消费端配置

@DubboReference(protocol = "dubbo", timeout = 100)
private IUserService iUserService;

从这个配置我们可以得出一个信息,Spring 不会自动将 IUserService 注入 Bean 工厂中

当然这句话也是一个废话,人家 Dubbo 自定义的注解,Spring 怎么可能扫描到…

ReferenceAnnotationBeanPostProcessor 这个方法是消费端扫描 @Reference 使用的

本篇将正式的介绍下消费端是如何订阅我们服务端注册在 Zookeeper 上的服务的

2、扫描注解

我们先看这个类的实现:

public class ReferenceAnnotationBeanPostProcessor extends AbstractAnnotationBeanPostProcessor
    implements ApplicationContextAware, BeanFactoryPostProcessor {}

实现了 BeanFactoryPostProcessor 接口,这个时候如果看过博主的 Spring 的源码系列文章,DNA 应该已经开始活动了

没错,基本上这个接口就是为了往我们的 BeanDefinitionMap 里面注册 BeanDefinition 信息的

想必到这里,就算我们不看源码,也能猜到

这个哥们绝对是将 @DubboReference 的注解扫描封装成 BeanDefinition 注册至 BeanDefinitionMap

我们直接看源码

@Override
public void postProcessBeanFactory(ConfigurableListableBeanFactory beanFactory){
    // 拿到当前Spring工厂所有的bean名称
    String[] beanNames = beanFactory.getBeanDefinitionNames();
    for (String beanName : beanNames) {
        // 获取bean的类型
        beanType = beanFactory.getType(beanName);
        // 省略一些代码
        if (beanType != null) {
            // 获取元数据信息
            AnnotatedInjectionMetadata metadata = findInjectionMetadata(beanName, beanType, null);
            // 解析@DubboReference注解并注册至BeanDefinitionMap中
            prepareInjection(metadata);
        }
    }
}

我们详细看看这个 prepareInjection 方法是如何解析的 @DubboReference 注解并做了一些什么特殊的操作

protected void prepareInjection(AnnotatedInjectionMetadata metadata) throws BeansException {
    for (AnnotatedFieldElement fieldElement : metadata.getFieldElements()) {
        Class<?> injectedType = fieldElement.field.getType();
        // 配置的参数
        AnnotationAttributes attributes = fieldElement.attributes;
        // 解析&注册
        String referenceBeanName = registerReferenceBean(fieldElement.getPropertyName(), injectedType, attributes, fieldElement.field);
        }
    }
}

这个 registerReferenceBean 里面的逻辑较多,我们只取最关键的,感兴趣的朋友也可以自己去看一看

public String registerReferenceBean(String propertyName, Class<?> injectedType, Map<String, Object> attributes, Member member){
      RootBeanDefinition beanDefinition = new RootBeanDefinition();
      // ReferenceBean.class.getName() = org.apache.dubbo.config.spring.ReferenceBean
        beanDefinition.setBeanClassName(ReferenceBean.class.getName());
        beanDefinition.getPropertyValues().add(ReferenceAttributes.ID, referenceBeanName);
      // referenceProps = 配置的信息
      // interfaceName = com.common.service.IUserService
      // interfaceClass = interface com.common.service.IUserService
        beanDefinition.setAttribute(Constants.REFERENCE_PROPS, attributes);
        beanDefinition.setAttribute(ReferenceAttributes.INTERFACE_CLASS, interfaceClass);
        beanDefinition.setAttribute(ReferenceAttributes.INTERFACE_NAME, interfaceName);
        GenericBeanDefinition targetDefinition = new GenericBeanDefinition();
        targetDefinition.setBeanClass(interfaceClass);
        beanDefinition.setDecoratedDefinition(new BeanDefinitionHolder(targetDefinition, referenceBeanName + "_decorated"));
        beanDefinition.setAttribute(Constants.OBJECT_TYPE_ATTRIBUTE, interfaceClass);
      // 注册至BeanDefinitionMap中
        beanDefinitionRegistry.registerBeanDefinition(referenceBeanName, beanDefinition);
        referenceBeanManager.registerReferenceKeyAndBeanName(referenceKey, referenceBeanName);
        return referenceBeanName;
}

到这里,我们的 ReferenceAnnotationBeanPostProcessor 方法将 @DubboReference 扫描组装成 BeanDefinition 注册到了 BeanDefinitionMap

3、创建代理对象

在我们上面注册的时候,有这么一行代码:

beanDefinition.setBeanClassName(ReferenceBean.class.getName());

表明我们当前注册的 BeanClass 类型为 org.apache.dubbo.config.spring.ReferenceBean

当我们的 Spring 去实例化 BeanDefinitionMap 中的对象时,这个时候会调用 ReferenceBeangetObject 方法

Spring 在实例化时会获取每一个 BeanDefinition 的 Object,不存在则创建

我们发现,在 ReferenceBean 里面实际上是重写了 getObject 的方法:

public T getObject() {
    if (lazyProxy == null) {
        createLazyProxy();
    }
    return (T) lazyProxy;
}
private void createLazyProxy() {
    // 创建代理对象
    ProxyFactory proxyFactory = new ProxyFactory();
    proxyFactory.setTargetSource(new DubboReferenceLazyInitTargetSource());
    proxyFactory.addInterface(interfaceClass);
    Class<?>[] internalInterfaces = AbstractProxyFactory.getInternalInterfaces();
    for (Class<?> anInterface : internalInterfaces) {
        proxyFactory.addInterface(anInterface);
    }
    // 进行动态代理(生成动态代理的对象)
    // 这里动态代理用的是JdkDynamicAopProxy
    this.lazyProxy = proxyFactory.getProxy(this.beanClassLoader);
}

我们看下 JdkDynamicAopProxy 里面做了什么?

final class JdkDynamicAopProxy implements AopProxy, InvocationHandler, Serializable {
    // proxyFactory.setTargetSource(new DubboReferenceLazyInitTargetSource());
    // 这里的targetSource = DubboReferenceLazyInitTargetSource
    TargetSource targetSource = this.advised.targetSource;
    Object target = targetSource.getTarget()
}
public synchronized Object getTarget() throws Exception {
   // 第一次为null,未初始化
   if (this.lazyTarget == null) {
      logger.debug("Initializing lazy target object");
      this.lazyTarget = createObject();
   }
   return this.lazyTarget;
}

我们看下 DubboReferenceLazyInitTargetSourcecreateObject

// 第一次调用时,会初始化该方法
private class DubboReferenceLazyInitTargetSource extends AbstractLazyCreationTargetSource {
    @Override
    protected Object createObject() throws Exception {
        return getCallProxy();
    }
    @Override
    public synchronized Class<?> getTargetClass() {
        return getInterfaceClass();
    }
}

这个 getCallProxy 就是我们订阅服务的地方

4、订阅服务

当我们初始化完毕之后,在我们第一次调用的时候,会调用 getCallProxy() 该方法,去进行服务的订阅,这里会执行该方法:

private class DubboReferenceLazyInitTargetSource extends AbstractLazyCreationTargetSource {
    @Override
    protected Object createObject() throws Exception {
        return getCallProxy();
    }
    @Override
    public synchronized Class<?> getTargetClass() {
        return getInterfaceClass();
    }
}
private Object getCallProxy() throws Exception {
    synchronized(((DefaultSingletonBeanRegistry)getBeanFactory()).getSingletonMutex()) {
        // 获取reference
        return referenceConfig.get();
    }
}

我们继续向下看,这里会来到 ReferenceConfiginit 方法

protected synchronized void init() {
    ref = createProxy(referenceParameters);
}
private T createProxy(Map<String, String> referenceParameters) {
    // 1、监听注册中心
    // 2、本地保存服务
    createInvokerForRemote();
    URL consumerUrl = new ServiceConfigURL(CONSUMER_PROTOCOL, referenceParameters.get(REGISTER_IP_KEY), 0,
            referenceParameters.get(INTERFACE_KEY), referenceParameters);
    consumerUrl = consumerUrl.setScopeModel(getScopeModel());
    consumerUrl = consumerUrl.setServiceModel(consumerModel);
    MetadataUtils.publishServiceDefinition(consumerUrl, consumerModel.getServiceModel(), getApplicationModel());
    // 创建代理类
    return (T) proxyFactory.getProxy(invoker, ProtocolUtils.isGeneric(generic));
}
4.1 监听注册中心

private void createInvokerForRemote() {
    if (urls.size() == 1) {
        // URL:
        // registry://127.0.0.1:2181/org.apache.dubbo.registry.RegistryService?application=dubbo-consumer&dubbo=2.0.2&pid=2532&qos.enable=true&registry=zookeeper&release=3.1.8&timestamp=1686063555583
        URL curUrl = urls.get(0);
        invoker = protocolSPI.refer(interfaceClass, curUrl);
    }
}
public <T> Invoker<T> refer(Class<T> type, URL url) throws RpcException {
    // zookeeper://127.0.0.1:2181/org.apache.dubbo.registry.RegistryService?application=dubbo-consumer&dubbo=2.0.2&pid=14952&qos.enable=true&release=3.1.8&timestamp=1686063658064
    url = getRegistryUrl(url);
    Registry registry = getRegistry(url);
    if (RegistryService.class.equals(type)) {
        return proxyFactory.getInvoker((T) registry, type, url);
    }
    // group="a,b" or group="*"
    Map<String, String> qs = (Map<String, String>) url.getAttribute(REFER_KEY);
    String group = qs.get(GROUP_KEY);
    if (StringUtils.isNotEmpty(group)) {
        if ((COMMA_SPLIT_PATTERN.split(group)).length > 1 || "*".equals(group)) {
            return doRefer(Cluster.getCluster(url.getScopeModel(), MergeableCluster.NAME), registry, type, url, qs);
        }
    }
    Cluster cluster = Cluster.getCluster(url.getScopeModel(), qs.get(CLUSTER_KEY));
    return doRefer(cluster, registry, type, url, qs);
}

这里的 QS 如下:

我们直接跳到 MigrationRuleHandlerdoMigrate 中:

public synchronized void doMigrate(MigrationRule rule) {
    MigrationStep step = MigrationStep.APPLICATION_FIRST;
    float threshold = -1f;
    step = rule.getStep(consumerURL);
    threshold = rule.getThreshold(consumerURL);
    if (refreshInvoker(step, threshold, rule)) {
        setMigrationRule(rule);
    }
}

在这个 refreshInvoker 里面,会判断当前注册的方式

private boolean refreshInvoker(MigrationStep step, Float threshold, MigrationRule newRule) {
        MigrationStep originStep = currentStep;
        if ((currentStep == null || currentStep != step) || !currentThreshold.equals(threshold)) {
            boolean success = true;
            switch (step) {
                // 接口&应用
                case APPLICATION_FIRST:
                    migrationInvoker.migrateToApplicationFirstInvoker(newRule);
                    break;
                // 应用
                case FORCE_APPLICATION:
                    success = migrationInvoker.migrateToForceApplicationInvoker(newRule);
                    break;
                // 接口
                case FORCE_INTERFACE:
                default:
                    success = migrationInvoker.migrateToForceInterfaceInvoker(newRule);
            }
            return success;
        }
        return true;
    }

我们本次只讲 接口注册,我们直接跳到:ZookeeperRegistrydoSubscribe 方法

public void doSubscribe(final URL url, final NotifyListener listener) {
    List<URL> urls = new ArrayList<>();
    for (String path : toCategoriesPath(url)) {
        // 创建目录
        zkClient.create(path, false, true);
        // 增加监听
        // 1、/dubbo/com.msb.common.service.IUserService/providers
        // 2、/dubbo/com.msb.common.service.IUserService/configurators
        // 3、/dubbo/com.msb.common.service.IUserService/routers
        List<String> children = zkClient.addChildListener(path, zkListener);
        if (children != null) {
            urls.addAll(toUrlsWithEmpty(url, path, children));
        }
    }
    // 将Zookeeper服务保存
    notify(url, listener, urls);
}
4.2 本地保存服务

直接跳到 AbstractRegistrydoSaveProperties 方法

  • 创建文件
  • 将服务端数据存入文件中
public void doSaveProperties(long version) {
    File lockfile = null;
    // 创建文件
    lockfile = new File(file.getAbsolutePath() + ".lock");
    tmpProperties = new Properties();
    Set<Map.Entry<Object, Object>> entries = properties.entrySet();
    for (Map.Entry<Object, Object> entry : entries) {
        tmpProperties.setProperty((String) entry.getKey(), (String) entry.getValue());
    }
    try (FileOutputStream outputFile = new FileOutputStream(file)) {
        tmpProperties.store(outputFile, "Dubbo Registry Cache");
    }
}

这里存储的数据如下:简单理解,各种服务端的信息

com.common.service.IUserService -> empty://192.168.0.103/com.common.service.IUserService?application=dubbo-consumer&background=false&category=routers&dubbo=2.0.2&interface=com.msb.common.service.IUserService&lazy=true&methods=getUserById&pid=13528&protocol=dubbo&qos.enable=true&release=3.1.8&side=consumer&sticky=false&timeout=100&timestamp=1686064969935&unloadClusterRelated=false empty://192.168.0.103/com.msb.common.service.IUserService?application=dubbo-consumer&background=false&category=configurators&dubbo=2.0.2&interface=com.msb.common.service.IUserService&lazy=true&methods=getUserById&pid=13528&protocol=dubbo&qos.enable=true&release=3.1.8&side=consumer&sticky=false&timeout=100&timestamp=1686064969935&unloadClusterRelated=false dubbo://192.168.0.103:20883/com.msb.common.service.IUserService?anyhost=true&application=dubbo-provider&background=false&check=false&deprecated=false&dubbo=2.0.2&dynamic=true&generic=false&interface=com.msb.common.service.IUserService&metadata-type=remote&methods=getUserById&register-mode=inter

如果后续我们的注册中心(Zookeeper)挂掉之后,我们的系统从本地磁盘读取服务信息也可以正常通信。

只是没有办法及时更新服务

4.3 创建动态代理类

private T createProxy(Map<String, String> referenceParameters) {
    // 省略代码
    // create service proxy
    return (T) proxyFactory.getProxy(invoker, ProtocolUtils.isGeneric(generic));
}

这里我们直接跳到 JavassistProxyFactorygetProxy 方法

public <T> T getProxy(Invoker<T> invoker, Class<?>[] interfaces) {
    return (T) Proxy.getProxy(interfaces).newInstance(new InvokerInvocationHandler(invoker));
}

这里直接创建代理类,当我们去调用 InvokerInvocationHandler 这个方法

至于为什么要调用 InvokerInvocationHandler ,大家可以看下之前写的动态代理文章:2023年再不会动态代理,就要被淘汰了

public Object invoke(Object proxy, Method method, Object[] args) throws Throwable {
        if (method.getDeclaringClass() == Object.class) {
            return method.invoke(invoker, args);
        }
        String methodName = method.getName();
        Class<?>[] parameterTypes = method.getParameterTypes();
        if (parameterTypes.length == 0) {
            if ("toString".equals(methodName)) {
                return invoker.toString();
            } else if ("$destroy".equals(methodName)) {
                invoker.destroy();
                return null;
            } else if ("hashCode".equals(methodName)) {
                return invoker.hashCode();
            }
        } else if (parameterTypes.length == 1 && "equals".equals(methodName)) {
            return invoker.equals(args[0]);
        }
        RpcInvocation rpcInvocation = new RpcInvocation(serviceModel, method.getName(), invoker.getInterface().getName(), protocolServiceKey, method.getParameterTypes(), args);
        if (serviceModel instanceof ConsumerModel) {
            rpcInvocation.put(Constants.CONSUMER_MODEL, serviceModel);
            rpcInvocation.put(Constants.METHOD_MODEL, ((ConsumerModel) serviceModel).getMethodModel(method));
        }
        return InvocationUtil.invoke(invoker, rpcInvocation);
    }

三、流程图

  • 原图可私信获取

四、总结

鲁迅先生曾说:独行难,众行易,和志同道合的人一起进步。彼此毫无保留的分享经验,才是对抗互联网寒冬的最佳选择。

其实很多时候,并不是我们不够努力,很可能就是自己努力的方向不对,如果有一个人能稍微指点你一下,你真的可能会少走几年弯路。


相关文章
|
3天前
|
XML 人工智能 Java
Spring Bean名称生成规则(含源码解析、自定义Spring Bean名称方式)
Spring Bean名称生成规则(含源码解析、自定义Spring Bean名称方式)
|
5天前
|
存储 开发工具 对象存储
Javaweb之SpringBootWeb案例之阿里云OSS服务入门的详细解析
Javaweb之SpringBootWeb案例之阿里云OSS服务入门的详细解析
11 0
|
11天前
yolo-world 源码解析(六)(2)
yolo-world 源码解析(六)
22 0
|
11天前
yolo-world 源码解析(六)(1)
yolo-world 源码解析(六)
15 0
|
11天前
yolo-world 源码解析(五)(4)
yolo-world 源码解析(五)
22 0
|
11天前
yolo-world 源码解析(五)(1)
yolo-world 源码解析(五)
33 0
|
11天前
yolo-world 源码解析(二)(2)
yolo-world 源码解析(二)
22 0
|
25天前
|
XML Java Android开发
Android实现自定义进度条(源码+解析)
Android实现自定义进度条(源码+解析)
53 1
|
11天前
Marker 源码解析(二)(3)
Marker 源码解析(二)
16 0

推荐镜像

更多