客户端发起调用
在上一节课中,我们已经服务消费者在启动时被注入一个动态代理类的实现过程,大家再来回顾一下服务消费者启动过程中做了什么事情呢?
服务启动过程中,主要会构建一个动态代理类,并且在构建动态代理之前,会从注册中心上获取服务提供者的地址,并且会订阅服务提供者的状态。
然后,采用DubboProtocol协议,和服务端建立一个远程通信,并保存到Invoker中进行返回。那接下来,我们再去看服务调用的时候,请求的执行过程。
JavassistProxyFactory.getProxy
在创建代理对象时,会执行下面这段代码,一旦代码被调用,就会触发InvokerInvocationHandler。
public <T> T getProxy(Invoker<T> invoker, Class<?>[] interfaces) { return (T) Proxy.getProxy(interfaces).newInstance(new InvokerInvocationHandler(invoker)); }
当调用sayHello方法时,会触发handler.invoker
public java.lang.String sayHello(java.lang.String arg0){ Object[] args = new Object[1]; args[0] = ($w)$1; Object ret = handler.invoke(this, methods[0], args); return (java.lang.String)ret; }
InvokerInvocationHandler.invoke
public Object invoke(Object proxy, Method method, Object[] args) throws Throwable { if (method.getDeclaringClass() == Object.class) { return method.invoke(this.invoker, args); } else { String methodName = method.getName(); Class<?>[] parameterTypes = method.getParameterTypes(); // 如果判断是属于Object的方法,就不用反射调用了 if (parameterTypes.length == 0) { if ("toString".equals(methodName)) { return this.invoker.toString(); } if ("$destroy".equals(methodName)) { this.invoker.destroy(); return null; } if ("hashCode".equals(methodName)) { return this.invoker.hashCode(); } } else if (parameterTypes.length == 1 && "equals".equals(methodName)) { return this.invoker.equals(args[0]); } //数据传输对象 RpcInvocation rpcInvocation = new RpcInvocation(method, this.invoker.getInterface().getName(), args); String serviceKey = this.invoker.getUrl().getServiceKey(); rpcInvocation.setTargetServiceUniqueName(serviceKey); if (this.consumerModel != null) { rpcInvocation.put("consumerModel", this.consumerModel); rpcInvocation.put("methodModel", this.consumerModel.getMethodModel(method)); } // 此时的invoker取决于我们传递过来的invoker是什么 return this.invoker.invoke(rpcInvocation).recreate(); } }
进入到InvokerInvocationHandler.invoke方法。
其中invoker这个对象, 是在启动注入动态代理类时,初始化的一个调用器对象,我们得先要知道它是谁,才能知道它下一步调用的是哪个对象的方法.
它应该是: MockClusterInvoker,因为它是通过MockClusterWrapper来进行包装的。这个可以看前面的cluster.join()部分,就能够发现。
MockClusterWrapper
public <T> Invoker<T> join(Directory<T> directory) throws RpcException { return new MockClusterInvoker(directory, this.cluster.join(directory)); }
MockClusterInvoker
public Result invoke(Invocation invocation) throws RpcException { Result result = null; // mock配置参数 String value = this.getUrl().getMethodParameter(invocation.getMethodName(), "mock", Boolean.FALSE.toString()).trim(); if (value.length() != 0 && !"false".equalsIgnoreCase(value)) { // 如果 mock 参数以 "force" 开头,则强制进行 mock 操作 if (value.startsWith("force")) { if (logger.isWarnEnabled()) { logger.warn("force-mock: " + invocation.getMethodName() + " force-mock enabled , url : " + this.getUrl()); } // 执行 mock 操作 result = this.doMockInvoke(invocation, (RpcException)null); } else { try { // 执行服务方法 result = this.invoker.invoke(invocation); // 如果结果出现异常,则进行 mock 操作 if (result.getException() != null && result.getException() instanceof RpcException) { RpcException rpcException = (RpcException)result.getException(); if (rpcException.isBiz()) { throw rpcException; } result = this.doMockInvoke(invocation, rpcException); } } catch (RpcException var5) { // 如果出现异常,则进行 mock 操作 if (var5.isBiz()) { throw var5; } if (logger.isWarnEnabled()) { logger.warn("fail-mock: " + invocation.getMethodName() + " fail-mock enabled , url : " + this.getUrl(), var5); } // 执行 mock 操作 result = this.doMockInvoke(invocation, var5); } } } else { // 直接执行服务方法 // 无mock result = this.invoker.invoke(invocation); } return result; }
AbstractCluster$InterceptorInvokerNode.invoker
拦截器链的组成是:ConsumerContextClusterInterceptor -> ZoneAwareClusterInvoker。
在调用服务接口之前,ConsumerContextClusterInterceptor会负责设置上下文信息,以确保上下文在整个调用链中可用。
然后,调用interceptor.intercept方法来进行拦截处理。这个方法会依次调用拦截器链中的每个拦截器的intercept方法。
通过拦截器链的处理,可以在调用服务接口前后进行一些额外的操作,如参数校验、日志记录等。它提供了对服务调用过程的灵活控制和扩展能力。
public Result invoke(Invocation invocation) throws RpcException { Result asyncResult; try { this.interceptor.before(this.next, invocation); asyncResult = this.interceptor.intercept(this.next, invocation); } ......
其中before方法是设置上下文信息,接着调用interceptor.interceppt方法进行拦截处理
ClusterInterceptor.intercept
调用ClusterInterceptor的默认方法。
default Result intercept(AbstractClusterInvoker<?> clusterInvoker, Invocation invocation) throws RpcException { return clusterInvoker.invoke(invocation); }
此时传递过来的clusterInvoker对象,是拦截器链中的第二个节点 ZoneAwareClusterInvoker
AbstractClusterInvoker.invoke
因为ZoneAwareClusterInvoker 中没有invoke方法,所以实际上是调用其父类的AbstractClusterInvoker.invoke
public Result invoke(final Invocation invocation) throws RpcException { this.checkWhetherDestroyed(); // 绑定attachment到invocation中 Map<String, Object> contextAttachments = RpcContext.getContext().getObjectAttachments(); if (contextAttachments != null && contextAttachments.size() != 0) { ((RpcInvocation)invocation).addObjectAttachments(contextAttachments); } //获取invoker列表,这里的列表应该是直接从directory中获取 List<Invoker<T>> invokers = this.list(invocation); //初始化负载均衡算法 LoadBalance loadbalance = this.initLoadBalance(invokers, invocation); //调用子类的doInvoke方法 RpcUtils.attachInvocationIdIfAsync(this.getUrl(), invocation); return this.doInvoke(invocation, invokers, loadbalance); }
ZoneAwareClusterInvoker.doInvoke
ZonAwareCluster,就是之前我们说过的,如果一个服务注册在多个注册中心,那么消费者去消费时,会根据区域进行路由,选择一个注册中心进行服务消费。
public Result doInvoke(Invocation invocation, final List<Invoker<T>> invokers, LoadBalance loadbalance) throws RpcException { //遍历注册中心 Iterator var4 = invokers.iterator(); Invoker balancedInvoker; while(var4.hasNext()) { balancedInvoker = (Invoker)var4.next(); // 判断是否需要通过mockInvoker来触发调用 MockClusterInvoker<T> mockClusterInvoker = (MockClusterInvoker)balancedInvoker; if (mockClusterInvoker.isAvailable() && mockClusterInvoker.getRegistryUrl().getParameter("registry.preferred", false)) { return mockClusterInvoker.invoke(invocation); } } // 是否制定了zone进行调用 String zone = invocation.getAttachment("registry_zone"); if (StringUtils.isNotEmpty(zone)) { Iterator var10 = invokers.iterator(); while(var10.hasNext()) { Invoker<T> invoker = (Invoker)var10.next(); MockClusterInvoker<T> mockClusterInvoker = (MockClusterInvoker)invoker; if (mockClusterInvoker.isAvailable() && zone.equals(mockClusterInvoker.getRegistryUrl().getParameter("registry.zone"))) { return mockClusterInvoker.invoke(invocation); } } String force = invocation.getAttachment("registry_zone_force"); if (StringUtils.isNotEmpty(force) && "true".equalsIgnoreCase(force)) { throw new IllegalStateException("No registry instance in zone or no available providers in the registry, zone: " + zone + ", registries: " + (String)invokers.stream().map((invokerx) -> { return ((MockClusterInvoker)invokerx).getRegistryUrl().toString(); }).collect(Collectors.joining(","))); } } // 通过负载均衡算法,从多个注册中心中随机选择一个节点 balancedInvoker = this.select(loadbalance, invocation, invokers, (List)null); if (balancedInvoker.isAvailable()) {//进入到指定注册中心的服务列表进行调用 return balancedInvoker.invoke(invocation); } else { Iterator var13 = invokers.iterator(); MockClusterInvoker mockClusterInvoker; //如果没有一个invoker通过负载均衡算法被指定,则选择第一个有效的invoker进行调用。 do { if (!var13.hasNext()) { throw new RpcException("No provider available in " + invokers); } Invoker<T> invoker = (Invoker)var13.next(); mockClusterInvoker = (MockClusterInvoker)invoker; //选择指定的一个区域的invoker进行调用 } while(!mockClusterInvoker.isAvailable()); return mockClusterInvoker.invoke(invocation); } }
调用链路又会经过一遍 MockClusterInvoker - > AbstractCluster$InterceptorInvokerNode
AbstractCluster$InterceptorInvokerNode.invoker
再次进入到这个方法中,不过此时的调用链路发生了变化。
这个拦截器是的组成是: ConsumerContextClusterInterceptor -> FailoverClusterInvoker
继续进入到AbstractClusterInvoker中的invoke,但是此时AbstractClusterInvoker是通过
FailoverClusterInvoker来实现的,所以再次调用doInvoke时,会调用FailoverClusterInvoker中的doInvoke方法
AbstractClusterInvoker.invoke
因为FailoverClusterInvoker中没有invoke方法,所以实际上是调用其父类的AbstractClusterInvoker.invoke
public Result invoke(final Invocation invocation) throws RpcException { this.checkWhetherDestroyed(); // 绑定attachment到invocation中 Map<String, Object> contextAttachments = RpcContext.getContext().getObjectAttachments(); if (contextAttachments != null && contextAttachments.size() != 0) { ((RpcInvocation)invocation).addObjectAttachments(contextAttachments); } //获取invoker列表,这里的列表应该是直接从directory中获取 List<Invoker<T>> invokers = this.list(invocation); //初始化负载均衡算法 LoadBalance loadbalance = this.initLoadBalance(invokers, invocation); //调用子类的doInvoke方法 RpcUtils.attachInvocationIdIfAsync(this.getUrl(), invocation); return this.doInvoke(invocation, invokers, loadbalance); }
FailoverClusterInvoker.doInvoke
FailoverClusterInvoker,顾名思义,就是集群容错的处理,默认的集群容错策略是重试,所以也不难猜出这里面的实现方式。
这段代码逻辑也很好理解,因为我们之前在讲Dubbo的时候说过容错机制,而failover是失败重试,所以这里面应该会实现容错的逻辑
- 获得重试的次数,并且进行循环
- 获得目标服务,并且记录当前已经调用过的目标服务防止下次继续将请求发送过去
- 如果执行成功,则返回结果
- 如果出现异常,判断是否为业务异常,如果是则抛出,否则,进行下一次重试
- 这里的 Invoker 是 Provider 的一个可调用 Service 的抽象, Invoker 封装了 Provider 地址及 Service 接口信息
- Directory 代表多个 Invoker ,可以把它看成 List ,但与 List 不同的是,它的值可能是动态变化的,比如注册中心推送变更
- Cluster 将 Directory 中的多个 Invoker 伪装成一个 Invoker ,对上层透明,伪装过程包含了容错逻辑,调用失败后,重试另一个
- Router 负责从多个 Invoker 中按路由规则选出子集,比如读写分离,应用隔离等LoadBalance 负责从多个 Invoker 中选出具体的一个用于本次调用,选的过程包含了负载均衡
- 算法,调用失败后,需要重选
public Result doInvoke(Invocation invocation, final List<Invoker<T>> invokers, LoadBalance loadbalance) throws RpcException { //获取服务提供者的协议invoker List<Invoker<T>> copyInvokers = invokers; // 校验invoker this.checkInvokers(invokers, invocation); //获取调用的目标方法名 String methodName = RpcUtils.getMethodName(invocation); //获得重试次数 int len = this.getUrl().getMethodParameter(methodName, "retries", 2) + 1; if (len <= 0) { len = 1; } RpcException le = null; List<Invoker<T>> invoked = new ArrayList(invokers.size()); Set<String> providers = new HashSet(len); //for循环进行重试 for(int i = 0; i < len; ++i) { if (i > 0) { this.checkWhetherDestroyed(); copyInvokers = this.list(invocation); this.checkInvokers(copyInvokers, invocation); } //从多个invoker中通过负载均衡算法,选择一个inovke进行调用。 Invoker<T> invoker = this.select(loadbalance, invocation, copyInvokers, invoked); invoked.add(invoker);//记录已经调用过的目标服务,如果重试时,已经调用过的目标服务不再发起调用。 RpcContext.getContext().setInvokers(invoked); try { //发起远程调用 Result result = invoker.invoke(invocation); if (le != null && logger.isWarnEnabled()) { logger.warn("Although retry the method " + methodName + " in the service " + this.getInterface().getName() + " was successful by the provider " + invoker.getUrl().getAddress() + ", but there have been failed providers " + providers + " (" + providers.size() + "/" + copyInvokers.size() + ") from the registry " + this.directory.getUrl().getAddress() + " on the consumer " + NetUtils.getLocalHost() + " using the dubbo version " + Version.getVersion() + ". Last error is: " + le.getMessage(), le); } Result var13 = result; return var13; } catch (RpcException var18) { if (var18.isBiz()) { throw var18; } le = var18; } catch (Throwable var19) { le = new RpcException(var19.getMessage(), var19); } finally { providers.add(invoker.getUrl().getAddress()); } } throw new RpcException(le.getCode(), "Failed to invoke the method " + methodName + " in the service " + this.getInterface().getName() + ". Tried " + len + " times of the providers " + providers + " (" + providers.size() + "/" + copyInvokers.size() + ") from the registry " + this.directory.getUrl().getAddress() + " on the consumer " + NetUtils.getLocalHost() + " using the dubbo version " + Version.getVersion() + ". Last error is: " + le.getMessage(), (Throwable)(le.getCause() != null ? le.getCause() : le)); }
负载均衡算法
负载均衡初始
//初始化负载均衡算法 LoadBalance loadbalance = this.initLoadBalance(invokers, invocation); // 扩展点 protected LoadBalance initLoadBalance(List<Invoker<T>> invokers, Invocation invocation) { return CollectionUtils.isNotEmpty(invokers) ? (LoadBalance)ExtensionLoader.getExtensionLoader(LoadBalance.class).getExtension(((Invoker)invokers.get(0)).getUrl().getMethodParameter(RpcUtils.getMethodName(invocation), "loadbalance", "random")) : (LoadBalance)ExtensionLoader.getExtensionLoader(LoadBalance.class).getExtension("random"); }
select
Invoker<T> invoker = this.select(loadbalance, invocation, copyInvokers, invoked);
- loadbalance 表示具体的负载均衡算法实例
- invocation 表示请求的参数
- invokers,表示服务提供者的实例列表,如果有多个,这里就是一个集合
protected Invoker<T> select(LoadBalance loadbalance, Invocation invocation, List<Invoker<T>> invokers, List<Invoker<T>> selected) throws RpcException { // 如果服务提供者列表为空,返回null if (CollectionUtils.isEmpty(invokers)) { return null; } else { // 获取调用的方法名 String methodName = invocation == null ? "" : invocation.getMethodName(); // 获取sticky参数的值,默认为false boolean sticky = ((Invoker)invokers.get(0)).getUrl().getMethodParameter(methodName, "sticky", false); // 如果stickyInvoker不为空,并且它不在服务提供者列表中,则将stickyInvoker置为null。stickyInvoker是之前选中的服务提供者。 if (this.stickyInvoker != null && !invokers.contains(this.stickyInvoker)) { this.stickyInvoker = null; } // 如果sticky为true,并且stickyInvoker不为空,并且selected为空或者不包含stickyInvoker,并且进行可用性检查并且stickyInvoker是可用的,则返回stickyInvoker if (sticky && this.stickyInvoker != null && (selected == null || !selected.contains(this.stickyInvoker)) && this.availablecheck && this.stickyInvoker.isAvailable()) { return this.stickyInvoker; } else { // 如果不满足上述条件,则调用doSelect方法选择一个合适的服务提供者 Invoker<T> invoker = this.doSelect(loadbalance, invocation, invokers, selected); // 如果sticky为true,则将选中的服务提供者赋值给stickyInvoker if (sticky) { this.stickyInvoker = invoker; } // 返回选中的服务提供者 return invoker; } } }
AbstractClusterInvoker.doSelect
- 如果invokers只有一个,则直接返回
- 否则,调用负载均衡算法获得一个目标invoker
private Invoker<T> doSelect(LoadBalance loadbalance, Invocation invocation, List<Invoker<T>> invokers, List<Invoker<T>> selected) throws RpcException { // 如果服务提供者列表为空,返回null if (CollectionUtils.isEmpty(invokers)) { return null; // 如果只有一个服务提供者,直接返回该服务提供者 } else if (invokers.size() == 1) { return (Invoker)invokers.get(0); } else { // 使用负载均衡算法选择一个服务提供者 Invoker<T> invoker = loadbalance.select(invokers, this.getUrl(), invocation); //如果selected集合中包含这次选择出来的invoker, 或这invoker是一个失效的服务,则重新选择一个新的invoker返回。 if (selected != null && selected.contains(invoker) || !invoker.isAvailable() && this.getUrl() != null && this.availablecheck) { try { // 重新选择一个合适的服务提供者 Invoker<T> rInvoker = this.reselect(loadbalance, invocation, invokers, selected, this.availablecheck); if (rInvoker != null) { invoker = rInvoker; } else { int index = invokers.indexOf(invoker); try { // 如果无法重新选择,则选择下一个服务提供者 invoker = (Invoker)invokers.get((index + 1) % invokers.size()); } catch (Exception var9) { logger.warn(var9.getMessage() + " may because invokers list dynamic change, ignore.", var9); } } } catch (Throwable var10) { logger.error("cluster reselect fail reason is :" + var10.getMessage() + " if can not solve, you can set cluster.availablecheck=false in url", var10); } } return invoker; } }
Invoker invoker = loadbalance.select(invokers, this.getUrl(), invocation);
因为前面介绍过,在负载均衡初始的时候,使用了扩展点,所以loadbalance 其实是 RandomLoadBalance
RandomLoadBalance.doSelect
执行随机负载均衡算法。
protected <T> Invoker<T> doSelect(List<Invoker<T>> invokers, URL url, Invocation invocation) { int length = invokers.size(); boolean sameWeight = true; int[] weights = new int[length]; // 下面这个循环有两个作用,第一是计算总权重 totalWeight, // 第二是检测每个服务提供者的权重是否相同 int firstWeight = this.getWeight((Invoker)invokers.get(0), invocation); weights[0] = firstWeight; int totalWeight = firstWeight; int offset; int i; for(offset = 1; offset < length; ++offset) { i = this.getWeight((Invoker)invokers.get(offset), invocation); weights[offset] = i; // 累加权重 totalWeight += i; // 检测当前服务提供者的权重与上一个服务提供者的权重是否相同, // 不相同的话,则将 sameWeight 置为 false。 if (sameWeight && i != firstWeight) { sameWeight = false; } } // 下面的 if 分支主要用于获取随机数,并计算随机数落在哪个区间上 if (totalWeight > 0 && !sameWeight) { // 随机获取一个 [0, totalWeight) 区间内的数字 offset = ThreadLocalRandom.current().nextInt(totalWeight); // 循环让 offset 数减去服务提供者权重值,当 offset 小于0时,返回相应的 Invoker。 // 举例说明一下,我们有 servers = [A, B, C],weights = [5, 3, 2],offset =7。 // 第一次循环,offset - 5 = 2 > 0,即 offset > 5, // 表明其不会落在服务器 A 对应的区间上。 // 第二次循环,offset - 3 = -1 < 0,即 5 < offset < 8, // 表明其会落在服务器 B 对应的区间上 for(i = 0; i < length; ++i) { // 让随机值 offset 减去权重值 offset -= weights[i]; if (offset < 0) { // 返回相应的 Invoker return (Invoker)invokers.get(i); } } } // 如果所有服务提供者权重值相同,此时直接随机返回一个即可 return (Invoker)invokers.get(ThreadLocalRandom.current().nextInt(length)); }
以上代码也就是抽奖的核心思路。
抽奖
总结
两台服务器的通信,用最基本的常识去思考,如果要保证一定成功,对客户端来说就是不断重试,对服务端来说要避免多次重试所带来的数据变更的问题,要考虑幂等的问题。
深入理解Dubbo-7.服务消费调用源码分析(中):https://developer.aliyun.com/article/1414100