服务消费者端代码
@DubboReference(version = "1.0.0") private DemoService demoService; public static void main(String[] args) { SpringApplication.run(DubboAutoConfigurationConsumerBootstrap.class).close(); } @Bean public ApplicationRunner runner() { return new ApplicationRunner() { public void run(ApplicationArguments args) throws Exception { System.out.println(demoService.sayHello("mercyblitz")); } }; } 复制代码
我们再次来到 DubboComponentScanRegistrar 中、
@Override public void registerBeanDefinitions(AnnotationMetadata importingClassMetadata, BeanDefinitionRegistry registry) { Set<String> packagesToScan = getPackagesToScan(importingClassMetadata); registerServiceAnnotationBeanPostProcessor(packagesToScan, registry); // @since 2.7.6 Register the common beans registerCommonBeans(registry); } static void registerCommonBeans(BeanDefinitionRegistry registry) { // Since 2.5.7 Register @Reference Annotation Bean Processor as an infrastructure Bean registerInfrastructureBean(registry, ReferenceAnnotationBeanPostProcessor.BEAN_NAME, ReferenceAnnotationBeanPostProcessor.class); // Since 2.7.4 [Feature] https://github.com/apache/dubbo/issues/5093 registerInfrastructureBean(registry, DubboConfigAliasPostProcessor.BEAN_NAME, DubboConfigAliasPostProcessor.class); // Since 2.7.5 Register DubboLifecycleComponentApplicationListener as an infrastructure Bean registerInfrastructureBean(registry, DubboLifecycleComponentApplicationListener.BEAN_NAME, DubboLifecycleComponentApplicationListener.class); // Since 2.7.4 Register DubboBootstrapApplicationListener as an infrastructure Bean registerInfrastructureBean(registry, DubboBootstrapApplicationListener.BEAN_NAME, DubboBootstrapApplicationListener.class); // Since 2.7.6 Register DubboConfigDefaultPropertyValueBeanPostProcessor as an infrastructure Bean registerInfrastructureBean(registry, DubboConfigDefaultPropertyValueBeanPostProcessor.BEAN_NAME, DubboConfigDefaultPropertyValueBeanPostProcessor.class); } 复制代码
我们看到了 ReferenceAnnotationBeanPostProcessor、该类同时也实现了 InstantiationAwareBeanPostProcessor 接口。
该方法会在 Spring 填充属性的时候被调用、通过该方法返回注入的对象
@Override protected Object doGetInjectedBean(AnnotationAttributes attributes, Object bean, String beanName, Class<?> injectedType, InjectionMetadata.InjectedElement injectedElement) throws Exception { /** * The name of bean that annotated Dubbo's {@link Service @Service} in local Spring {@link ApplicationContext} */ String referencedBeanName = buildReferencedBeanName(attributes, injectedType); /** * The name of bean that is declared by {@link Reference @Reference} annotation injection */ String referenceBeanName = getReferenceBeanName(attributes, injectedType); ReferenceBean referenceBean = buildReferenceBeanIfAbsent(referenceBeanName, attributes, injectedType); boolean localServiceBean = isLocalServiceBean(referencedBeanName, referenceBean, attributes); prepareReferenceBean(referencedBeanName, referenceBean, localServiceBean); registerReferenceBean(referencedBeanName, referenceBean, attributes, localServiceBean, injectedType); cacheInjectedReferenceBean(referenceBean, injectedElement); return referenceBean.get(); } 复制代码
我们可以看到 最终调用 ReferenceBean 的 get 方法
public synchronized T get() { if (destroyed) { throw new IllegalStateException("The invoker of ReferenceConfig(" + url + ") has already destroyed!"); } if (ref == null) { init(); } return ref; } 复制代码
ref 此时为 null 进入到 init 方法中
public synchronized void init() { ....... // 构造参数 map ref = createProxy(map); ....... } private T createProxy(Map<String, String> map) { if (shouldJvmRefer(map)) { ........ } else { urls.clear(); if (url != null && url.length() > 0) { // user specified URL, could be peer-to-peer address, or register center's address. ......//点对点通信 } else { // assemble URL from register center's configuration // if protocols not injvm checkRegistry if (!LOCAL_PROTOCOL.equalsIgnoreCase(getProtocol())) { checkRegistry(); List<URL> us = ConfigValidationUtils.loadRegistries(this, false); if (CollectionUtils.isNotEmpty(us)) { for (URL u : us) { URL monitorUrl = ConfigValidationUtils.loadMonitor(this, u); if (monitorUrl != null) { map.put(MONITOR_KEY, URL.encode(monitorUrl.toFullString())); } urls.add(u.addParameterAndEncoded(REFER_KEY, StringUtils.toQueryString(map))); } } if (urls.isEmpty()) { throw new IllegalStateException("No such any registry to reference " + interfaceName + " on the consumer " + NetUtils.getLocalHost() + " use dubbo version " + Version.getVersion() + ", please config <dubbo:registry address=\"...\" /> to your spring config."); } } } // 单个注册中心 if (urls.size() == 1) { invoker = REF_PROTOCOL.refer(interfaceClass, urls.get(0)); } else { // 多个注册中心 List<Invoker<?>> invokers = new ArrayList<Invoker<?>>(); URL registryURL = null; for (URL url : urls) { invokers.add(REF_PROTOCOL.refer(interfaceClass, url)); if (UrlUtils.isRegistry(url)) { registryURL = url; // use last registry url } } if (registryURL != null) { // registry url is available // for multi-subscription scenario, use 'zone-aware' policy by default URL u = registryURL.addParameterIfAbsent(CLUSTER_KEY, ZoneAwareCluster.NAME); // The invoker wrap relation would be like: ZoneAwareClusterInvoker(StaticDirectory) -> FailoverClusterInvoker(RegistryDirectory, routing happens here) -> Invoker invoker = CLUSTER.join(new StaticDirectory(u, invokers)); } else { // not a registry url, must be direct invoke. invoker = CLUSTER.join(new StaticDirectory(invokers)); } } } ......... // create service proxy return (T) PROXY_FACTORY.getProxy(invoker, ProtocolUtils.isGeneric(generic)); } 复制代码
无论配置了单个注册中心、还是多个、都需要从注册中心中获取服务提供者的地址
REF_PROTOCOL.refer(interfaceClass, url)
RegistryProtocol
@Override @SuppressWarnings("unchecked") public <T> Invoker<T> refer(Class<T> type, URL url) throws RpcException { url = getRegistryUrl(url); Registry registry = registryFactory.getRegistry(url); if (RegistryService.class.equals(type)) { return proxyFactory.getInvoker((T) registry, type, url); } // group="a,b" or group="*" Map<String, String> qs = StringUtils.parseQueryString(url.getParameterAndDecoded(REFER_KEY)); String group = qs.get(GROUP_KEY); if (group != null && group.length() > 0) { if ((COMMA_SPLIT_PATTERN.split(group)).length > 1 || "*".equals(group)) { return doRefer(getMergeableCluster(), registry, type, url); } } return doRefer(cluster, registry, type, url); } 复制代码
直接进入到 doRefer 中干实事
private <T> Invoker<T> doRefer(Cluster cluster, Registry registry, Class<T> type, URL url) { RegistryDirectory<T> directory = new RegistryDirectory<T>(type, url); directory.setRegistry(registry); directory.setProtocol(protocol); // all attributes of REFER_KEY Map<String, String> parameters = new HashMap<String, String>(directory.getConsumerUrl().getParameters()); URL subscribeUrl = new URL(CONSUMER_PROTOCOL, parameters.remove(REGISTER_IP_KEY), 0, type.getName(), parameters); if (directory.isShouldRegister()) { directory.setRegisteredConsumerUrl(subscribeUrl); registry.register(directory.getRegisteredConsumerUrl()); } directory.buildRouterChain(subscribeUrl); directory.subscribe(toSubscribeUrl(subscribeUrl)); Invoker<T> invoker = cluster.join(directory); List<RegistryProtocolListener> listeners = findRegistryProtocolListeners(url); if (CollectionUtils.isEmpty(listeners)) { return invoker; } RegistryInvokerWrapper<T> registryInvokerWrapper = new RegistryInvokerWrapper<>(directory, cluster, invoker, subscribeUrl); for (RegistryProtocolListener listener : listeners) { listener.onRefer(this, registryInvokerWrapper); } return registryInvokerWrapper; } 复制代码
进入到订阅服务注册中心的路径、会在此处获取到服务提供者的真实地址
public void subscribe(URL url) { setConsumerUrl(url); CONSUMER_CONFIGURATION_LISTENER.addNotifyListener(this); serviceConfigurationListener = new ReferenceConfigurationListener(this, url); registry.subscribe(url, this); } 复制代码
url 的地址如下。
consumer://192.168.1.103/com.demo.api.DemoService?application=dubbo-auto-configure-consumer-sample&category=providers,configurators,routers&dubbo=2.0.2&init=false&interface=com.demo.api.DemoService&metadata-type=remote&methods=sayHello&pid=85321&qos.enable=false&release=2.7.8&revision=1.0.0&side=consumer&sticky=false×tamp=1645947096408&version=1.0.0
订阅了 providers,configurators,routers 这三个目录
进入到 ZookeeperRegistry
@Override public void doSubscribe(final URL url, final NotifyListener listener) { try { if (ANY_VALUE.equals(url.getServiceInterface())) { ...... } else { List<URL> urls = new ArrayList<>(); for (String path : toCategoriesPath(url)) { ConcurrentMap<NotifyListener, ChildListener> listeners = zkListeners.computeIfAbsent(url, k -> new ConcurrentHashMap<>()); ChildListener zkListener = listeners.computeIfAbsent(listener, k -> (parentPath, currentChilds) -> ZookeeperRegistry.this.notify(url, k, toUrlsWithEmpty(url, parentPath, currentChilds))); zkClient.create(path, false); List<String> children = zkClient.addChildListener(path, zkListener); if (children != null) { urls.addAll(toUrlsWithEmpty(url, path, children)); } } notify(url, listener, urls); } } catch (Throwable e) { throw new RpcException("Failed to subscribe " + url + " to zookeeper " + getUrl() + ", cause: " + e.getMessage(), e); } } 复制代码
path 分别为
/dubbo/com.demo.api.DemoService/providers
/dubbo/com.demo.api.DemoService/configurators
/dubbo/com.demo.api.DemoService/routers
providers 获取到到值
dubbo%3A%2F%2F127.0.0.1%3A12345%2Fcom.demo.api.DemoService%3Fanyhost%3Dtrue%26application%3Ddubbo-auto-configuration-provider-demo%26deprecated%3Dfalse%26dubbo%3D2.0.2%26dynamic%3Dtrue%26generic%3Dfalse%26interface%3Dcom.demo.api.DemoService%26methods%3DsayHello%26pid%3D85689%26release%3D2.7.7%26revision%3D1.0.0%26side%3Dprovider%26timestamp%3D1645947588375%26version%3D1.0.0
@Override protected void notify(URL url, NotifyListener listener, List<URL> urls) { ....... doNotify(url, listener, urls); } protected void doNotify(URL url, NotifyListener listener, List<URL> urls) { super.notify(url, listener, urls); } protected void notify(URL url, NotifyListener listener, List<URL> urls) { ....... Map<String, List<URL>> result = new HashMap<>(); for (URL u : urls) { if (UrlUtils.isMatch(url, u)) { String category = u.getParameter(CATEGORY_KEY, DEFAULT_CATEGORY); List<URL> categoryList = result.computeIfAbsent(category, k -> new ArrayList<>()); categoryList.add(u); } } Map<String, List<URL>> categoryNotified = notified.computeIfAbsent(url, u -> new ConcurrentHashMap<>()); for (Map.Entry<String, List<URL>> entry : result.entrySet()) { String category = entry.getKey(); List<URL> categoryList = entry.getValue(); categoryNotified.put(category, categoryList); listener.notify(categoryList); saveProperties(url); } } 复制代码
listener.notify(categoryList); 复制代码
这里的 listener 就是我们一开始的 DiretoryRegistry
@Override public synchronized void notify(List<URL> urls) { Map<String, List<URL>> categoryUrls = urls.stream() .filter(Objects::nonNull) .filter(this::isValidCategory) .filter(this::isNotCompatibleFor26x) .collect(Collectors.groupingBy(this::judgeCategory)); List<URL> configuratorURLs = categoryUrls.getOrDefault(CONFIGURATORS_CATEGORY, Collections.emptyList()); this.configurators = Configurator.toConfigurators(configuratorURLs).orElse(this.configurators); List<URL> routerURLs = categoryUrls.getOrDefault(ROUTERS_CATEGORY, Collections.emptyList()); toRouters(routerURLs).ifPresent(this::addRouters); // providers List<URL> providerURLs = categoryUrls.getOrDefault(PROVIDERS_CATEGORY, Collections.emptyList()); /** * 3.x added for extend URL address */ ExtensionLoader<AddressListener> addressListenerExtensionLoader = ExtensionLoader.getExtensionLoader(AddressListener.class); List<AddressListener> supportedListeners = addressListenerExtensionLoader.getActivateExtension(getUrl(), (String[]) null); if (supportedListeners != null && !supportedListeners.isEmpty()) { for (AddressListener addressListener : supportedListeners) { providerURLs = addressListener.notify(providerURLs, getConsumerUrl(),this); } } refreshOverrideAndInvoker(providerURLs); } 复制代码
进入到 refreshOverrideAndInvoker 中
private void refreshOverrideAndInvoker(List<URL> urls) { // mock zookeeper://xxx?mock=return null overrideDirectoryUrl(); refreshInvoker(urls); } 复制代码
private void refreshInvoker(List<URL> invokerUrls) { Assert.notNull(invokerUrls, "invokerUrls should not be null"); if (invokerUrls.size() == 1 && invokerUrls.get(0) != null && EMPTY_PROTOCOL.equals(invokerUrls.get(0).getProtocol())) { ....... } else { ...... Map<String, Invoker<T>> newUrlInvokerMap = toInvokers(invokerUrls);// Translate url list to Invoker map if (CollectionUtils.isEmptyMap(newUrlInvokerMap)) { logger.error(new IllegalStateException("urls to invokers error .invokerUrls.size :" + invokerUrls.size() + ", invoker.size :0. urls :" + invokerUrls .toString())); return; } List<Invoker<T>> newInvokers = Collections.unmodifiableList(new ArrayList<>(newUrlInvokerMap.values())); routerChain.setInvokers(newInvokers); this.invokers = multiGroup ? toMergeInvokerList(newInvokers) : newInvokers; this.urlInvokerMap = newUrlInvokerMap; try { destroyUnusedInvokers(oldUrlInvokerMap, newUrlInvokerMap); // Close the unused Invoker } catch (Exception e) { logger.warn("destroyUnusedInvokers error. ", e); } } } 复制代码
toInvokers(invokerUrls)
将 url 转化成 Invoker
private Map<String, Invoker<T>> toInvokers(List<URL> urls) { Map<String, Invoker<T>> newUrlInvokerMap = new HashMap<>(); if (urls == null || urls.isEmpty()) { return newUrlInvokerMap; } Set<String> keys = new HashSet<>(); String queryProtocols = this.queryMap.get(PROTOCOL_KEY); for (URL providerUrl : urls) { // If protocol is configured at the reference side, only the matching protocol is selected ............ if (EMPTY_PROTOCOL.equals(providerUrl.getProtocol())) { continue; } if (!ExtensionLoader.getExtensionLoader(Protocol.class).hasExtension(providerUrl.getProtocol())) { ........ continue; } URL url = mergeUrl(providerUrl); String key = url.toFullString(); // The parameter urls are sorted if (keys.contains(key)) { // Repeated url continue; } keys.add(key); // Cache key is url that does not merge with consumer side parameters, regardless of how the consumer combines parameters, if the server url changes, then refer again Map<String, Invoker<T>> localUrlInvokerMap = this.urlInvokerMap; // local reference Invoker<T> invoker = localUrlInvokerMap == null ? null : localUrlInvokerMap.get(key); if (invoker == null) { // Not in the cache, refer again try { boolean enabled = true; if (url.hasParameter(DISABLED_KEY)) { enabled = !url.getParameter(DISABLED_KEY, false); } else { enabled = url.getParameter(ENABLED_KEY, true); } if (enabled) { invoker = new InvokerDelegate<>(protocol.refer(serviceType, url), url, providerUrl); } } catch (Throwable t) { logger.error("Failed to refer invoker for interface:" + serviceType + ",url:(" + url + ")" + t.getMessage(), t); } if (invoker != null) { // Put new invoker in cache newUrlInvokerMap.put(key, invoker); } } else { newUrlInvokerMap.put(key, invoker); } } keys.clear(); return newUrlInvokerMap; } 复制代码
根据协议调用 refer 方法
invoker = new InvokerDelegate<>(protocol.refer(serviceType, url), url, providerUrl);
ProtocolFilterWrapper 这里也会创建一系列的 Filter、在调用真正的 Invoker 之前执行
@Override public <T> Invoker<T> refer(Class<T> type, URL url) throws RpcException { if (UrlUtils.isRegistry(url)) { return protocol.refer(type, url); } return buildInvokerChain(protocol.refer(type, url), REFERENCE_FILTER_KEY, CommonConstants.CONSUMER); } 复制代码
AbstractProtocol 异步转同步
@Override public <T> Invoker<T> refer(Class<T> type, URL url) throws RpcException { return new AsyncToSyncInvoker<>(protocolBindingRefer(type, url)); } 复制代码
DubboProtocol 这里直接返回一个 DubboInvoker
@Override public <T> Invoker<T> protocolBindingRefer(Class<T> serviceType, URL url) throws RpcException { optimizeSerialization(url); // create rpc invoker. DubboInvoker<T> invoker = new DubboInvoker<T>(serviceType, url, getClients(url), invokers); invokers.add(invoker); return invoker; } 复制代码
getClients 则会去与服务提供者建立连接
private ExchangeClient[] getClients(URL url) { boolean useShareConnect = false; int connections = url.getParameter(CONNECTIONS_KEY, 0); List<ReferenceCountExchangeClient> shareClients = null; if (connections == 0) { useShareConnect = true; String shareConnectionsStr = url.getParameter(SHARE_CONNECTIONS_KEY, (String) null); connections = Integer.parseInt(StringUtils.isBlank(shareConnectionsStr) ? ConfigUtils.getProperty(SHARE_CONNECTIONS_KEY, DEFAULT_SHARE_CONNECTIONS) : shareConnectionsStr); shareClients = getSharedClient(url, connections); } ExchangeClient[] clients = new ExchangeClient[connections]; for (int i = 0; i < clients.length; i++) { if (useShareConnect) { clients[i] = shareClients.get(i); } else { clients[i] = initClient(url); } } return clients; } 复制代码
我们现在回到 RegistryProtocol 中 directory.subscribe(toSubscribeUrl(subscribeUrl));
的流程上面我们已经讲了
private <T> Invoker<T> doRefer(Cluster cluster, Registry registry, Class<T> type, URL url) { RegistryDirectory<T> directory = new RegistryDirectory<T>(type, url); directory.setRegistry(registry); directory.setProtocol(protocol); // all attributes of REFER_KEY Map<String, String> parameters = new HashMap<String, String>(directory.getConsumerUrl().getParameters()); URL subscribeUrl = new URL(CONSUMER_PROTOCOL, parameters.remove(REGISTER_IP_KEY), 0, type.getName(), parameters); if (directory.isShouldRegister()) { directory.setRegisteredConsumerUrl(subscribeUrl); registry.register(directory.getRegisteredConsumerUrl()); } directory.buildRouterChain(subscribeUrl); directory.subscribe(toSubscribeUrl(subscribeUrl)); Invoker<T> invoker = cluster.join(directory); List<RegistryProtocolListener> listeners = findRegistryProtocolListeners(url); if (CollectionUtils.isEmpty(listeners)) { return invoker; } RegistryInvokerWrapper<T> registryInvokerWrapper = new RegistryInvokerWrapper<>(directory, cluster, invoker, subscribeUrl); for (RegistryProtocolListener listener : listeners) { listener.onRefer(this, registryInvokerWrapper); } return registryInvokerWrapper; 复制代码
Invoker<T> invoker = cluster.join(directory);
下面进入到这个方法中
MockClusterWrapper 就是我们常用的 mock 功能
@Override public <T> Invoker<T> join(Directory<T> directory) throws RpcException { return new MockClusterInvoker<T>(directory, this.cluster.join(directory)); } 复制代码
AbstractCluster 则会创建一系列的 Interceptor 、在真正调用 Invoker 之前执行
@Override public <T> Invoker<T> join(Directory<T> directory) throws RpcException { return buildClusterInterceptors(doJoin(directory), directory.getUrl().getParameter(REFERENCE_INTERCEPTOR_KEY)); } 复制代码
FailoverCluster 我们默认是 Failover 策略
@Override public <T> AbstractClusterInvoker<T> doJoin(Directory<T> directory) throws RpcException { return new FailoverClusterInvoker<>(directory); } 复制代码
再回到我们的 org.apache.dubbo.config.ReferenceConfig#createProxy 中、如果是多个注册中心、则会再 join 一次、因为也要对不同的注册中心进行选择对应的服务提供者。多一层负载均衡
在 org.apache.dubbo.config.ReferenceConfig#createProxy 最后
return (T) PROXY_FACTORY.getProxy(invoker, ProtocolUtils.isGeneric(generic)); 复制代码
通过代理工厂创建一个代理 JavassistProxyFactory
@SuppressWarnings("unchecked") public <T> T getProxy(Invoker<T> invoker, Class<?>[] interfaces) { return (T) Proxy.getProxy(interfaces).newInstance(new InvokerInvocationHandler(invoker)); } 复制代码
至于真正发起调用的逻辑、我们放到下一个文章去写