一、引言
对于 Java
开发者而言,关于 dubbo
,我们一般当做黑盒来进行使用,不需要去打开这个黑盒。
但随着目前程序员行业的发展,我们有必要打开这个黑盒,去探索其中的奥妙。
本期 dubbo
源码解析系列文章,将带你领略 dubbo
源码的奥秘
本期源码文章吸收了之前 Spring
、Kakfa
、JUC
源码文章的教训,将不再一行一行的带大家分析源码,我们将一些不重要的部分当做黑盒处理,以便我们更快、更有效的阅读源码。
虽然现在是互联网寒冬,但乾坤未定,你我皆是黑马!
废话不多说,发车!
二、消费者订阅服务
读过我们上一篇:从源码全面解析 dubbo 注解配置的来龙去脉 的文章的朋友,我们当时留了一个 EnableDubboConfig
注解里面的 ReferenceAnnotationBeanPostProcessor
方法
1、消费端配置
@DubboReference(protocol = "dubbo", timeout = 100) private IUserService iUserService;
从这个配置我们可以得出一个信息,Spring
不会自动将 IUserService
注入 Bean
工厂中
当然这句话也是一个废话,人家 Dubbo
自定义的注解,Spring
怎么可能扫描到…
而 ReferenceAnnotationBeanPostProcessor
这个方法是消费端扫描 @Reference
使用的
本篇将正式的介绍下消费端是如何订阅我们服务端注册在 Zookeeper
上的服务的
2、扫描注解
我们先看这个类的实现:
public class ReferenceAnnotationBeanPostProcessor extends AbstractAnnotationBeanPostProcessor implements ApplicationContextAware, BeanFactoryPostProcessor {}
实现了 BeanFactoryPostProcessor
接口,这个时候如果看过博主的 Spring
的源码系列文章,DNA
应该已经开始活动了
没错,基本上这个接口就是为了往我们的 BeanDefinitionMap
里面注册 BeanDefinition
信息的
想必到这里,就算我们不看源码,也能猜到
这个哥们绝对是将 @DubboReference
的注解扫描封装成 BeanDefinition
注册至 BeanDefinitionMap
的
我们直接看源码
@Override public void postProcessBeanFactory(ConfigurableListableBeanFactory beanFactory){ // 拿到当前Spring工厂所有的bean名称 String[] beanNames = beanFactory.getBeanDefinitionNames(); for (String beanName : beanNames) { // 获取bean的类型 beanType = beanFactory.getType(beanName); // 省略一些代码 if (beanType != null) { // 获取元数据信息 AnnotatedInjectionMetadata metadata = findInjectionMetadata(beanName, beanType, null); // 解析@DubboReference注解并注册至BeanDefinitionMap中 prepareInjection(metadata); } } }
我们详细看看这个 prepareInjection
方法是如何解析的 @DubboReference
注解并做了一些什么特殊的操作
protected void prepareInjection(AnnotatedInjectionMetadata metadata) throws BeansException { for (AnnotatedFieldElement fieldElement : metadata.getFieldElements()) { Class<?> injectedType = fieldElement.field.getType(); // 配置的参数 AnnotationAttributes attributes = fieldElement.attributes; // 解析&注册 String referenceBeanName = registerReferenceBean(fieldElement.getPropertyName(), injectedType, attributes, fieldElement.field); } } }
这个 registerReferenceBean
里面的逻辑较多,我们只取最关键的,感兴趣的朋友也可以自己去看一看
public String registerReferenceBean(String propertyName, Class<?> injectedType, Map<String, Object> attributes, Member member){ RootBeanDefinition beanDefinition = new RootBeanDefinition(); // ReferenceBean.class.getName() = org.apache.dubbo.config.spring.ReferenceBean beanDefinition.setBeanClassName(ReferenceBean.class.getName()); beanDefinition.getPropertyValues().add(ReferenceAttributes.ID, referenceBeanName); // referenceProps = 配置的信息 // interfaceName = com.common.service.IUserService // interfaceClass = interface com.common.service.IUserService beanDefinition.setAttribute(Constants.REFERENCE_PROPS, attributes); beanDefinition.setAttribute(ReferenceAttributes.INTERFACE_CLASS, interfaceClass); beanDefinition.setAttribute(ReferenceAttributes.INTERFACE_NAME, interfaceName); GenericBeanDefinition targetDefinition = new GenericBeanDefinition(); targetDefinition.setBeanClass(interfaceClass); beanDefinition.setDecoratedDefinition(new BeanDefinitionHolder(targetDefinition, referenceBeanName + "_decorated")); beanDefinition.setAttribute(Constants.OBJECT_TYPE_ATTRIBUTE, interfaceClass); // 注册至BeanDefinitionMap中 beanDefinitionRegistry.registerBeanDefinition(referenceBeanName, beanDefinition); referenceBeanManager.registerReferenceKeyAndBeanName(referenceKey, referenceBeanName); return referenceBeanName; }
到这里,我们的 ReferenceAnnotationBeanPostProcessor
方法将 @DubboReference
扫描组装成 BeanDefinition
注册到了 BeanDefinitionMap
中
3、创建代理对象
在我们上面注册的时候,有这么一行代码:
beanDefinition.setBeanClassName(ReferenceBean.class.getName());
表明我们当前注册的 Bean
的 Class
类型为 org.apache.dubbo.config.spring.ReferenceBean
当我们的 Spring
去实例化 BeanDefinitionMap
中的对象时,这个时候会调用 ReferenceBean
的 getObject
方法
Spring 在实例化时会获取每一个
BeanDefinition
的 Object,不存在则创建
我们发现,在 ReferenceBean
里面实际上是重写了 getObject
的方法:
public T getObject() { if (lazyProxy == null) { createLazyProxy(); } return (T) lazyProxy; } private void createLazyProxy() { // 创建代理对象 ProxyFactory proxyFactory = new ProxyFactory(); proxyFactory.setTargetSource(new DubboReferenceLazyInitTargetSource()); proxyFactory.addInterface(interfaceClass); Class<?>[] internalInterfaces = AbstractProxyFactory.getInternalInterfaces(); for (Class<?> anInterface : internalInterfaces) { proxyFactory.addInterface(anInterface); } // 进行动态代理(生成动态代理的对象) // 这里动态代理用的是JdkDynamicAopProxy this.lazyProxy = proxyFactory.getProxy(this.beanClassLoader); }
我们看下 JdkDynamicAopProxy
里面做了什么?
final class JdkDynamicAopProxy implements AopProxy, InvocationHandler, Serializable { // proxyFactory.setTargetSource(new DubboReferenceLazyInitTargetSource()); // 这里的targetSource = DubboReferenceLazyInitTargetSource TargetSource targetSource = this.advised.targetSource; Object target = targetSource.getTarget() } public synchronized Object getTarget() throws Exception { // 第一次为null,未初始化 if (this.lazyTarget == null) { logger.debug("Initializing lazy target object"); this.lazyTarget = createObject(); } return this.lazyTarget; }
我们看下 DubboReferenceLazyInitTargetSource
的 createObject
// 第一次调用时,会初始化该方法 private class DubboReferenceLazyInitTargetSource extends AbstractLazyCreationTargetSource { @Override protected Object createObject() throws Exception { return getCallProxy(); } @Override public synchronized Class<?> getTargetClass() { return getInterfaceClass(); } }
这个 getCallProxy
就是我们订阅服务的地方
4、订阅服务
当我们初始化完毕之后,在我们第一次调用的时候,会调用 getCallProxy()
该方法,去进行服务的订阅,这里会执行该方法:
private class DubboReferenceLazyInitTargetSource extends AbstractLazyCreationTargetSource { @Override protected Object createObject() throws Exception { return getCallProxy(); } @Override public synchronized Class<?> getTargetClass() { return getInterfaceClass(); } } private Object getCallProxy() throws Exception { synchronized(((DefaultSingletonBeanRegistry)getBeanFactory()).getSingletonMutex()) { // 获取reference return referenceConfig.get(); } }
我们继续向下看,这里会来到 ReferenceConfig
的 init
方法
protected synchronized void init() { ref = createProxy(referenceParameters); } private T createProxy(Map<String, String> referenceParameters) { // 1、监听注册中心 // 2、本地保存服务 createInvokerForRemote(); URL consumerUrl = new ServiceConfigURL(CONSUMER_PROTOCOL, referenceParameters.get(REGISTER_IP_KEY), 0, referenceParameters.get(INTERFACE_KEY), referenceParameters); consumerUrl = consumerUrl.setScopeModel(getScopeModel()); consumerUrl = consumerUrl.setServiceModel(consumerModel); MetadataUtils.publishServiceDefinition(consumerUrl, consumerModel.getServiceModel(), getApplicationModel()); // 创建代理类 return (T) proxyFactory.getProxy(invoker, ProtocolUtils.isGeneric(generic)); }
4.1 监听注册中心
private void createInvokerForRemote() { if (urls.size() == 1) { // URL: // registry://127.0.0.1:2181/org.apache.dubbo.registry.RegistryService?application=dubbo-consumer&dubbo=2.0.2&pid=2532&qos.enable=true®istry=zookeeper&release=3.1.8×tamp=1686063555583 URL curUrl = urls.get(0); invoker = protocolSPI.refer(interfaceClass, curUrl); } } public <T> Invoker<T> refer(Class<T> type, URL url) throws RpcException { // zookeeper://127.0.0.1:2181/org.apache.dubbo.registry.RegistryService?application=dubbo-consumer&dubbo=2.0.2&pid=14952&qos.enable=true&release=3.1.8×tamp=1686063658064 url = getRegistryUrl(url); Registry registry = getRegistry(url); if (RegistryService.class.equals(type)) { return proxyFactory.getInvoker((T) registry, type, url); } // group="a,b" or group="*" Map<String, String> qs = (Map<String, String>) url.getAttribute(REFER_KEY); String group = qs.get(GROUP_KEY); if (StringUtils.isNotEmpty(group)) { if ((COMMA_SPLIT_PATTERN.split(group)).length > 1 || "*".equals(group)) { return doRefer(Cluster.getCluster(url.getScopeModel(), MergeableCluster.NAME), registry, type, url, qs); } } Cluster cluster = Cluster.getCluster(url.getScopeModel(), qs.get(CLUSTER_KEY)); return doRefer(cluster, registry, type, url, qs); }
这里的 QS
如下:
我们直接跳到 MigrationRuleHandler
的 doMigrate
中:
public synchronized void doMigrate(MigrationRule rule) { MigrationStep step = MigrationStep.APPLICATION_FIRST; float threshold = -1f; step = rule.getStep(consumerURL); threshold = rule.getThreshold(consumerURL); if (refreshInvoker(step, threshold, rule)) { setMigrationRule(rule); } }
在这个 refreshInvoker
里面,会判断当前注册的方式
private boolean refreshInvoker(MigrationStep step, Float threshold, MigrationRule newRule) { MigrationStep originStep = currentStep; if ((currentStep == null || currentStep != step) || !currentThreshold.equals(threshold)) { boolean success = true; switch (step) { // 接口&应用 case APPLICATION_FIRST: migrationInvoker.migrateToApplicationFirstInvoker(newRule); break; // 应用 case FORCE_APPLICATION: success = migrationInvoker.migrateToForceApplicationInvoker(newRule); break; // 接口 case FORCE_INTERFACE: default: success = migrationInvoker.migrateToForceInterfaceInvoker(newRule); } return success; } return true; }
我们本次只讲 接口注册
,我们直接跳到:ZookeeperRegistry
的 doSubscribe
方法
public void doSubscribe(final URL url, final NotifyListener listener) { List<URL> urls = new ArrayList<>(); for (String path : toCategoriesPath(url)) { // 创建目录 zkClient.create(path, false, true); // 增加监听 // 1、/dubbo/com.msb.common.service.IUserService/providers // 2、/dubbo/com.msb.common.service.IUserService/configurators // 3、/dubbo/com.msb.common.service.IUserService/routers List<String> children = zkClient.addChildListener(path, zkListener); if (children != null) { urls.addAll(toUrlsWithEmpty(url, path, children)); } } // 将Zookeeper服务保存 notify(url, listener, urls); }
4.2 本地保存服务
直接跳到 AbstractRegistry
的 doSaveProperties
方法
- 创建文件
- 将服务端数据存入文件中
public void doSaveProperties(long version) { File lockfile = null; // 创建文件 lockfile = new File(file.getAbsolutePath() + ".lock"); tmpProperties = new Properties(); Set<Map.Entry<Object, Object>> entries = properties.entrySet(); for (Map.Entry<Object, Object> entry : entries) { tmpProperties.setProperty((String) entry.getKey(), (String) entry.getValue()); } try (FileOutputStream outputFile = new FileOutputStream(file)) { tmpProperties.store(outputFile, "Dubbo Registry Cache"); } }
这里存储的数据如下:简单理解,各种服务端的信息
com.common.service.IUserService -> empty://192.168.0.103/com.common.service.IUserService?application=dubbo-consumer&background=false&category=routers&dubbo=2.0.2&interface=com.msb.common.service.IUserService&lazy=true&methods=getUserById&pid=13528&protocol=dubbo&qos.enable=true&release=3.1.8&side=consumer&sticky=false&timeout=100×tamp=1686064969935&unloadClusterRelated=false empty://192.168.0.103/com.msb.common.service.IUserService?application=dubbo-consumer&background=false&category=configurators&dubbo=2.0.2&interface=com.msb.common.service.IUserService&lazy=true&methods=getUserById&pid=13528&protocol=dubbo&qos.enable=true&release=3.1.8&side=consumer&sticky=false&timeout=100×tamp=1686064969935&unloadClusterRelated=false dubbo://192.168.0.103:20883/com.msb.common.service.IUserService?anyhost=true&application=dubbo-provider&background=false&check=false&deprecated=false&dubbo=2.0.2&dynamic=true&generic=false&interface=com.msb.common.service.IUserService&metadata-type=remote&methods=getUserById®ister-mode=inter
如果后续我们的注册中心(Zookeeper)挂掉之后,我们的系统从本地磁盘读取服务信息也可以正常通信。
只是没有办法及时更新服务
4.3 创建动态代理类
private T createProxy(Map<String, String> referenceParameters) { // 省略代码 // create service proxy return (T) proxyFactory.getProxy(invoker, ProtocolUtils.isGeneric(generic)); }
这里我们直接跳到 JavassistProxyFactory
的 getProxy
方法
public <T> T getProxy(Invoker<T> invoker, Class<?>[] interfaces) { return (T) Proxy.getProxy(interfaces).newInstance(new InvokerInvocationHandler(invoker)); }
这里直接创建代理类,当我们去调用 InvokerInvocationHandler
这个方法
至于为什么要调用 InvokerInvocationHandler
,大家可以看下之前写的动态代理文章:2023年再不会动态代理,就要被淘汰了
public Object invoke(Object proxy, Method method, Object[] args) throws Throwable { if (method.getDeclaringClass() == Object.class) { return method.invoke(invoker, args); } String methodName = method.getName(); Class<?>[] parameterTypes = method.getParameterTypes(); if (parameterTypes.length == 0) { if ("toString".equals(methodName)) { return invoker.toString(); } else if ("$destroy".equals(methodName)) { invoker.destroy(); return null; } else if ("hashCode".equals(methodName)) { return invoker.hashCode(); } } else if (parameterTypes.length == 1 && "equals".equals(methodName)) { return invoker.equals(args[0]); } RpcInvocation rpcInvocation = new RpcInvocation(serviceModel, method.getName(), invoker.getInterface().getName(), protocolServiceKey, method.getParameterTypes(), args); if (serviceModel instanceof ConsumerModel) { rpcInvocation.put(Constants.CONSUMER_MODEL, serviceModel); rpcInvocation.put(Constants.METHOD_MODEL, ((ConsumerModel) serviceModel).getMethodModel(method)); } return InvocationUtil.invoke(invoker, rpcInvocation); }
三、流程图
- 原图可私信获取
四、总结
鲁迅先生曾说:独行难,众行易,和志同道合的人一起进步。彼此毫无保留的分享经验,才是对抗互联网寒冬的最佳选择。
其实很多时候,并不是我们不够努力,很可能就是自己努力的方向不对,如果有一个人能稍微指点你一下,你真的可能会少走几年弯路。