客户端注册
实现猜想
- 生成远程服务的代理
- 获得目标服务的url地址
- 还需要建立和注册中心的动态感知
- 网络连接的建立
- 服务通信的过程中
- filter过滤
- 实现负载均衡
- 实现集群容错
注入方式
Dubbo的服务消费者注入也有两种方式:
- 通过xml形式
- 基于注解的方式
@RestController public class SayController { @DubboReference(registry = {"shanghai","hunan"}, protocol = "dubbo", loadbalance = "consistenthash", mock = "com.gupaoedu.springboot.dubbo.springbootdubbosampleconsumer.MockSayHelloService", timeout = 500, cluster = "failfast",check = false,methods = { @Method(loadbalance = "",name ="" ) },retries = 5) ISayHelloService sayHelloService; @GetMapping("/say") public String say(){ return sayHelloService.sayHello("Mic"); } }
当前Bean被加载的时候,去识别这个Bean里面的成员变量的时候,需要去扫描这个注解@DubboReference。
所以首先会在DubboAutoConfiguration中配置一个自动装配机制
DubboAutoConfiguration
@ConditionalOnMissingBean @Bean(name = ReferenceAnnotationBeanPostProcessor.BEAN_NAME) public ReferenceAnnotationBeanPostProcessor referenceAnnotationBeanPostProcessor() { return new ReferenceAnnotationBeanPostProcessor(); } // 会将这三种不同的注解都传过去,要识别的注解类型是哪些? public ReferenceAnnotationBeanPostProcessor() { super(new Class[]{DubboReference.class, Reference.class, com.alibaba.dubbo.config.annotation.Reference.class}); }
最终会执行 ReferenceAnnotationBeanPostProcessor 中的重写方法 doGetInjectedBean ,也就是实现bean的依赖注入的方法。
(前置介绍:
ReferenceAnnotationBeanPostProcessor 是一个 Spring Bean 后置处理器,它实现了 BeanPostProcessor 接口。当 Spring 容器创建完一个 Bean 后,会自动调用所有注册的后置处理器的 postProcessBeforeInitialization 和 postProcessAfterInitialization 方法,来对 Bean 进行前置/后置处理。
在这里,ReferenceAnnotationBeanPostProcessor 的主要作用是对使用 DubboReference、Reference 或者 com.alibaba.dubbo.config.annotation.Reference 标注的属性进行依赖注入,这些属性都表示 Dubbo RPC 服务的引用。为了实现依赖注入,ReferenceAnnotationBeanPostProcessor 需要在 Bean 创建完成后,扫描 Bean 中的属性,检查是否有需要注入的 Dubbo 引用,并通过 Dubbo 的引用获取相应的实例,将其注入到 Bean 的属性中。
具体来说,当 Spring 容器创建完一个 Bean 后,ReferenceAnnotationBeanPostProcessor 的 postProcessAfterInitialization 方法会被调用,在该方法中,会遍历 Bean 中的所有属性,检查是否使用了 Dubbo 引用注解(如 DubboReference),如果发现有,就会调用 doGetInjectedBean 方法,来完成对该属性值的注入。这个过程需要通过 Dubbo 的引用获取相应的实例,以完成依赖注入。)
doGetInjectedBean
在这个方法中,主要做两个事情
- 注册一个ReferenceBean到Spring IOC容器中
- 调用 getOrCreateProxy 返回一个动态代理对象
protected Object doGetInjectedBean(AnnotationAttributes attributes, Object bean, String beanName, Class<?> injectedType, InjectedElement injectedElement) throws Exception { // 根据注解的属性和注入类型,通过 buildReferencedBeanName 方法构建引用 Bean 的名称 String referencedBeanName = this.buildReferencedBeanName(attributes, injectedType); // 获取标记有 @Reference 注解的属性上指定的 Bean 名称 String referenceBeanName = this.getReferenceBeanName(attributes, injectedType); // 根据指定的 referenceBeanName、注解属性和注入类型构建 ReferenceBean 实例 (referenceBean),如果该实例不存在的话。接下来,通过调用 isLocalServiceBean 方法,判断 referencedBeanName 是否是本地服务 Bean。 ReferenceBean referenceBean = this.buildReferenceBeanIfAbsent(referenceBeanName, attributes, injectedType); boolean localServiceBean = this.isLocalServiceBean(referencedBeanName, referenceBean, attributes); // 将 referenceBean 注册到 Spring 容器中,并设置相关属性,如是否是本地服务 Bean、注入类型等。 this.registerReferenceBean(referencedBeanName, referenceBean, attributes, localServiceBean, injectedType); this.cacheInjectedReferenceBean(referenceBean, injectedElement); // 根据引用的 Bean 名称 (referencedBeanName)、referenceBean、是否是本地服务 Bean (localServiceBean) 和注入类型 (injectedType),获取或创建动态代理。这个动态代理可以用于在执行方法调用时进行远程调用。 return this.getOrCreateProxy(referencedBeanName, referenceBean, localServiceBean, injectedType); }
getOrCreateProxy
获取或者创建一个动态代理对象。
private Object getOrCreateProxy(String referencedBeanName, ReferenceBean referenceBean, boolean localServiceBean, Class<?> serviceInterfaceType) { if (localServiceBean) { // 如果是本地服务bean,则new一个动态代理 return Proxy.newProxyInstance(this.getClassLoader(), new Class[]{serviceInterfaceType}, this.newReferencedBeanInvocationHandler(referencedBeanName)); } else { this.exportServiceBeanIfNecessary(referencedBeanName); return referenceBean.get(); } }
可以看到,最终返回的动态代理对象,是通过referenceBean.get();来获得的。
ReferenceConfig.get
public synchronized T get() { if (this.destroyed) { throw new IllegalStateException("The invoker of ReferenceConfig(" + this.url + ") has already destroyed!"); } else { if (this.ref == null) { this.init(); } return this.ref; } }
init方法
开始调用init方法进行ref也就是代理对象的初始化动作.
- 检查配置信息
- 根据dubbo配置,构建map集合。
- 调用 createProxy 创建动态代理对象
public synchronized void init() { //如果已经初始化,则直接返回 if (!this.initialized) { if (this.bootstrap == null) { this.bootstrap = DubboBootstrap.getInstance(); this.bootstrap.init(); } //检查配置 this.checkAndUpdateSubConfigs(); //检查本地存根 local与stub this.checkStubAndLocal(this.interfaceClass); ConfigValidationUtils.checkMock(this.interfaceClass, this); Map<String, String> map = new HashMap(); map.put("side", "consumer"); //添加运行时参数 ReferenceConfigBase.appendRuntimeParameters(map); if (!ProtocolUtils.isGeneric(this.generic)) { //获取版本信息 String revision = Version.getVersion(this.interfaceClass, this.version); if (revision != null && revision.length() > 0) { map.put("revision", revision); } //获取接口方法列表,添加到map中 String[] methods = Wrapper.getWrapper(this.interfaceClass).getMethodNames(); if (methods.length == 0) { logger.warn("No method found in service interface " + this.interfaceClass.getName()); map.put("methods", "*"); } else { map.put("methods", StringUtils.join(new HashSet(Arrays.asList(methods)), ",")); } } // 其实这里和服务端一样,也需要解析配置信息拼接成url,可以通过注册中心拿到配置信息,也可以使用自己配置的信息 //通过class加载配置信息 map.put("interface", this.interfaceName); AbstractConfig.appendParameters(map, this.getMetrics()); AbstractConfig.appendParameters(map, this.getApplication()); AbstractConfig.appendParameters(map, this.getModule()); AbstractConfig.appendParameters(map, this.consumer); AbstractConfig.appendParameters(map, this); //将元数据配置信息放入到map中 MetadataReportConfig metadataReportConfig = this.getMetadataReportConfig(); if (metadataReportConfig != null && metadataReportConfig.isValid()) { map.putIfAbsent("metadata-type", "remote"); } //遍历methodConfig,组装method参数信息 Map<String, AsyncMethodInfo> attributes = null; if (CollectionUtils.isNotEmpty(this.getMethods())) { attributes = new HashMap(); Iterator var4 = this.getMethods().iterator(); while(var4.hasNext()) { MethodConfig methodConfig = (MethodConfig)var4.next(); AbstractConfig.appendParameters(map, methodConfig, methodConfig.getName()); String retryKey = methodConfig.getName() + ".retry"; if (map.containsKey(retryKey)) { String retryValue = (String)map.remove(retryKey); if ("false".equals(retryValue)) { map.put(methodConfig.getName() + ".retries", "0"); } } AsyncMethodInfo asyncMethodInfo = AbstractConfig.convertMethodConfig2AsyncInfo(methodConfig); if (asyncMethodInfo != null) { attributes.put(methodConfig.getName(), asyncMethodInfo); } } } //获取服务消费者ip地址 String hostToRegistry = ConfigUtils.getSystemProperty("DUBBO_IP_TO_REGISTRY"); if (StringUtils.isEmpty(hostToRegistry)) { hostToRegistry = NetUtils.getLocalHost(); } else if (NetUtils.isInvalidLocalHost(hostToRegistry)) { throw new IllegalArgumentException("Specified invalid registry ip from property:DUBBO_IP_TO_REGISTRY, value:" + hostToRegistry); } map.put("register.ip", hostToRegistry); this.serviceMetadata.getAttachments().putAll(map); this.ref = this.createProxy(map); this.serviceMetadata.setTarget(this.ref); this.serviceMetadata.addAttribute("refClass", this.ref); ConsumerModel consumerModel = this.repository.lookupReferredService(this.serviceMetadata.getServiceKey()); consumerModel.setProxyObject(this.ref); consumerModel.init(attributes); this.initialized = true; this.dispatch(new ReferenceConfigInitializedEvent(this, this.invoker)); } }
createProxy
我们先来思考一下,创建动态代理对象这个过程中,它可能会有哪些操作步骤?这个方法要能猜出来,那必然需要对dubbo的使用比较熟悉。
首先我们需要注意一个点,这里是创建一个代理对象,而这个代理对象应该也和协议有关系,也就是不同的协议,使用的代理对象也应该不一样。
观察下面的代码,我们发现没有这么简单,正常创建动态代理,通过那两种方式即可去构建就行,但是这里会有很多前置的东西。
private T createProxy(Map<String, String> map) { URL u; // 是不是同JVM调用 if (this.shouldJvmRefer(map)) { URL url = (new URL("injvm", "127.0.0.1", 0, this.interfaceClass.getName())).addParameters(map); // 那么就从本地的代理调用 this.invoker = REF_PROTOCOL.refer(this.interfaceClass, url); if (logger.isInfoEnabled()) { logger.info("Using injvm service " + this.interfaceClass.getName()); } } else { // 如果不是本地调用,那么就是远程调用 this.urls.clear(); URL monitorUrl; // <dubbo:reference url = "dubbo:// ; dubbo://(可以配置多套参数)"> 点对点调用,是指直接在服务提供者和服务消费者之间建立直连的通信通道,绕过注册中心的调用方式。 if (this.url != null && this.url.length() > 0) { // 处理url,然后去遍历 // 构建url 然后添加到urls里面 String[] us = CommonConstants.SEMICOLON_SPLIT_PATTERN.split(this.url); if (us != null && us.length > 0) { String[] var11 = us; int var14 = us.length; for(int var17 = 0; var17 < var14; ++var17) { String u = var11[var17]; URL url = URL.valueOf(u); if (StringUtils.isEmpty(url.getPath())) { url = url.setPath(this.interfaceName); } if (UrlUtils.isRegistry(url)) { this.urls.add(url.addParameterAndEncoded("refer", StringUtils.toQueryString(map))); } else { this.urls.add(ClusterUtils.mergeUrl(url, map)); } } } } // 如果我们发布的协议不是injvm协议,injvm是本地协议 else if (!"injvm".equalsIgnoreCase(this.getProtocol())) { // 检查注册中心的配置 this.checkRegistry();// 配置的dubbo.registry List<URL> us = ConfigValidationUtils.loadRegistries(this, false); if (CollectionUtils.isNotEmpty(us)) { for(Iterator var3 = us.iterator(); var3.hasNext(); this.urls.add(u.addParameterAndEncoded("refer", StringUtils.toQueryString(map)))) { u = (URL)var3.next(); monitorUrl = ConfigValidationUtils.loadMonitor(this, u); if (monitorUrl != null) { map.put("monitor", URL.encode(monitorUrl.toFullString())); } } } if (this.urls.isEmpty()) { throw new IllegalStateException("No such any registry to reference " + this.interfaceName + " on the consumer " + NetUtils.getLocalHost() + " use dubbo version " + Version.getVersion() + ", please config <dubbo:registry address=\"...\" /> to your spring config."); } } if (this.urls.size() == 1) { this.invoker = REF_PROTOCOL.refer(this.interfaceClass, (URL)this.urls.get(0)); } else { List<Invoker<?>> invokers = new ArrayList(); URL registryURL = null; Iterator var16 = this.urls.iterator(); while(var16.hasNext()) { monitorUrl = (URL)var16.next(); invokers.add(REF_PROTOCOL.refer(this.interfaceClass, monitorUrl)); if (UrlUtils.isRegistry(monitorUrl)) { registryURL = monitorUrl; } } if (registryURL != null) { u = registryURL.addParameterIfAbsent("cluster", "zone-aware"); this.invoker = CLUSTER.join(new StaticDirectory(u, invokers)); // 在这里之所以通过 StaticDirectory 去维护,是因为我们现在配置的 registry是静态的,主要是注册地址是写死的 } else { this.invoker = CLUSTER.join(new StaticDirectory(invokers)); } } }
...... // 构建好了之后,通过这个方式,创建动态代理 return PROXY_FACTORY.getProxy(this.invoker, ProtocolUtils.isGeneric(this.generic)); } }
在上面这个方法中,有两个核心的代码需要关注,分别是。
- REF_PROTOCOL.refer, 这个是生成invoker对象,之前我们说过,它是一个调用器,是dubbo中比较重要的领域对象,它在这里承担这服务调用的核心逻辑.
- PROXY_FACTORY.getProxy(invoker, ProtocolUtils.isGeneric(generic)), 构建一个代理对象,代理客户端的请求。
REF_PROTOCOL.ref
我们先来分析refer方法。
REF_PROTOCOL是一个自适应扩展点,现在我们看到这个代码,应该是比较熟悉了。它会生成一个
Protocol$Adaptive的类,然后根据refer传递的的url参数来决定当前路由到哪个具体的协议处理器。
Protocol REF_PROTOCOL = ExtensionLoader.getExtensionLoader(Protocol.class).getAdaptiveExtension();
前面我们分析服务发布的时候,已经说过了这个过程,所以就跳过,直接进入到RegistryProtocol.refer中
RegistryProtocol.refer
RegistryProtocol这个类我们已经很熟悉了,服务注册和服务启动都是在这个类里面触发的。
现在我们又通过这个方法来获得一个inovker对象,那我们继续去分析refer里面做了什么事情。
这里面的代码逻辑比较简单
public <T> Invoker<T> refer(Class<T> type, URL url) throws RpcException { // 获得注册中心的url地址 // 此时,这里得到的是zookeeper:// url = this.getRegistryUrl(url); //registryFactory,是一个自适应扩展点,RegistryFactory$Adaptive //定位到org.apache.dubbo.registry.RegistryFactory这个类可以知道,返回的实例是:ZookeeperRegistryFactory,并且是一个被RegistryFactoryWrapper包装的实例 Registry registry = this.registryFactory.getRegistry(url); if (RegistryService.class.equals(type)) { return this.proxyFactory.getInvoker(registry, type, url); } else { Map<String, String> qs = StringUtils.parseQueryString(url.getParameterAndDecoded("refer")); String group = (String)qs.get("group"); return group == null || group.length() <= 0 || CommonConstants.COMMA_SPLIT_PATTERN.split(group).length <= 1 && !"*".equals(group) ? this.doRefer(this.cluster, registry, type, url) : this.doRefer(this.getMergeableCluster(), registry, type, url); } }
doRefer
doRefer方法创建一个RegistryDirectory实例,然后生成服务者消费者连接,并向注册中心进行注册。注册完毕后,紧接着订阅providers、configurators、roters。
等节点下的数据。完成订阅后,RegistryDirectory会收到到这几个节点下的子节点信息。由于一个服务可能部署在多台服务器上,这样就会在providers产生多个节点。
这个时候就需要Cluster将多个服务节点合并为一个,并生成一个invoker。
private <T> Invoker<T> doRefer(Cluster cluster, Registry registry, Class<T> type, URL url) { //初始化RegistryDirectory(注册中心的目录) RegistryDirectory<T> directory = new RegistryDirectory(type, url); directory.setRegistry(registry);// 注册中心 directory.setProtocol(this.protocol);// 协议 Map<String, String> parameters = new HashMap(directory.getConsumerUrl().getParameters()); //注册consumer://协议url URL subscribeUrl = new URL("consumer", (String)parameters.remove("register.ip"), 0, type.getName(), parameters); if (directory.isShouldRegister()) { //注册服务消费者的url地址 directory.setRegisteredConsumerUrl(subscribeUrl); registry.register(directory.getRegisteredConsumerUrl()); } directory.buildRouterChain(subscribeUrl); //进行订阅 订阅地址的变化 //subscribe订阅信息消费url、通知监听、配置监听、订阅url //toSubscribeUrl:订阅信息:category、providers、configurators、routers directory.subscribe(toSubscribeUrl(subscribeUrl)); //一个注册中心会存在多个服务提供者,所以在这里需要把多个服务提供者通过cluster.join合并成一个 Invoker<T> invoker = cluster.join(directory); // 只是初始化了一个RegistryDirectory,然后通过 Cluster.join 来返回一个Invoker对象 List<RegistryProtocolListener> listeners = this.findRegistryProtocolListeners(url); if (CollectionUtils.isEmpty(listeners)) { return invoker; } else { //通过RegistryInvokerWrapper进行包装 RegistryInvokerWrapper<T> registryInvokerWrapper = new RegistryInvokerWrapper(directory, cluster, invoker, subscribeUrl); Iterator var11 = listeners.iterator(); while(var11.hasNext()) { RegistryProtocolListener listener = (RegistryProtocolListener)var11.next(); listener.onRefer(this, registryInvokerWrapper); } return registryInvokerWrapper; } }
Cluster是什么?
我们只关注一下Invoker这个代理类的创建过程,其他的暂且不关心
// 把directory放进去代表将来能从这里面拿到地址列表 Invoker invoker=cluster.join(directory)
cluster其实是在RegistryProtocol中通过set方法完成依赖注入的,并且,它还是一个被包装的。
public void setCluster(Cluster cluster) { this.cluster = cluster; }
所以,Cluster是一个被依赖注入的自适应扩展点,注入的对象实例是一个Cluster$Adaptive的动态代理类。
如下可以看到Cluster的定义
@SPI(FailoverCluster.NAME) public interface Cluster { @Adaptive <T> Invoker<T> join(Directory<T> directory) throws RpcException; }
Cluster$Adaptive
在动态适配的类中会基于extName,选择一个合适的扩展点进行适配,由于默认情况下cluster:failover,所以getExtension(“failover”)理论上应该返回FailOverCluster。但实际上,这里做了包装MockClusterWrapper(FailOverCluster)
public class Cluster$Adaptive implements org.apache.dubbo.rpc.cluster.Cluster { public org.apache.dubbo.rpc.Invoker join(org.apache.dubbo.rpc.cluster.Directory arg0) throws org.apache.dubbo.rpc.RpcException { if (arg0 == null) throw new IllegalArgumentException("org.apache.dubbo.rpc.cluster.Directory argument == null"); if (arg0.getUrl() == null) throw new IllegalArgumentException("org.apache.dubbo.rpc.cluster.Directory argumentgetUrl() == null"); org.apache.dubbo.common.URL url = arg0.getUrl(); String extName = url.getParameter("cluster", "failover"); if(extName == null) throw new IllegalStateException("Failed to get extension (org.apache.dubbo.rpc.cluster.Cluster) name from url (" + url.toString() + ") use keys([cluster])"); org.apache.dubbo.rpc.cluster.Cluster extension = (org.apache.dubbo.rpc.cluster.Cluster)ExtensionLoader.getExtensionLoader(org.apa che.dubbo.rpc.cluster.Cluster.class).getExtension(extName); return extension.join(arg0); } }
cluster.join
所以再回到doRefer方法,下面这段代码, 实际是调用MockClusterWrapper(FailOverCluster.join)
public class MockClusterWrapper implements Cluster { private Cluster cluster; public MockClusterWrapper(Cluster cluster) { this.cluster = cluster; } @Override public <T> Invoker<T> join(Directory<T> directory) throws RpcException { return new MockClusterInvoker<T>(directory, this.cluster.join(directory)); } }
再调用AbstractCluster中的join方法
@Override public <T> Invoker<T> join(Directory<T> directory) throws RpcException { return buildClusterInterceptors(doJoin(directory), directory.getUrl().getParameter(REFERENCE_INTERCEPTOR_KEY)); }
doJoin返回的是FailoverClusterInvoker。
buildClusterInterceptors从名字可以看出,这里是构建一个Cluster的拦截器。
private <T> Invoker<T> buildClusterInterceptors(AbstractClusterInvoker<T> clusterInvoker, String key) { AbstractClusterInvoker<T> last = clusterInvoker; //通过激活扩展点来获得ClusterInterceptor集合. 如果没有配置激活参数,默认会有一个ConsumerContextClusterInterceptor拦截器. List<ClusterInterceptor> interceptors = ExtensionLoader.getExtensionLoader(ClusterInterceptor.class).getActivateExtension(clusterInvoker.getUrl(), key); //遍历拦截器集合,构建一个拦截器链. if (!interceptors.isEmpty()) { for(int i = interceptors.size() - 1; i >= 0; --i) { ClusterInterceptor interceptor = (ClusterInterceptor)interceptors.get(i); last = new AbstractCluster.InterceptorInvokerNode(clusterInvoker, interceptor, (AbstractClusterInvoker)last); } } return (Invoker)last; }
context=org.apache.dubbo.rpc.cluster.interceptor.ConsumerContextClusterIntercept or zone-aware=org.apache.dubbo.rpc.cluster.interceptor.ZoneAwareClusterInterceptor
Cluster.join总结
因此 Cluster.join,实际上是获得一个Invoker对象,这个Invoker实现了Directory的包装,并且配置了拦截器。至于它是干嘛的,我们后续再分析。
深入理解Dubbo-6.服务消费源码分析(下):https://developer.aliyun.com/article/1414085