前几篇文章讲述了 Dubbo 的注册和消费的流程、今天继续接着说下调用的流程(只谈及单注册中心、多注册中心只是前面多了一步、根据区域或者配置选择出其中一个注册中心而已)
我们在消费者端调用就是该代理工厂返回的对象
@Override public <T> T getProxy(Invoker<T> invoker, boolean generic) throws RpcException { Set<Class<?>> interfaces = new HashSet<>(); String config = invoker.getUrl().getParameter(INTERFACES); if (config != null && config.length() > 0) { String[] types = COMMA_SPLIT_PATTERN.split(config); for (String type : types) { // TODO can we load successfully for a different classloader?. interfaces.add(ReflectUtils.forName(type)); } } if (generic) { ......... } interfaces.add(invoker.getInterface()); interfaces.addAll(Arrays.asList(INTERNAL_INTERFACES)); return getProxy(invoker, interfaces.toArray(new Class<?>[0])); } JavassistProxyFactory @Override @SuppressWarnings("unchecked") public <T> T getProxy(Invoker<T> invoker, Class<?>[] interfaces) { return (T) Proxy.getProxy(interfaces).newInstance(new InvokerInvocationHandler(invoker)); } 复制代码
invoker.getInterface() 返回的就是我们引用的类型
DubboProtocol
@Override public <T> Invoker<T> protocolBindingRefer(Class<T> serviceType, URL url) throws RpcException { optimizeSerialization(url); // create rpc invoker. DubboInvoker<T> invoker = new DubboInvoker<T>(serviceType, url, getClients(url), invokers); invokers.add(invoker); return invoker; } public Class<T> getInterface() { return type; } 复制代码
Proxy.getProxy(interfaces).newInstance(new InvokerInvocationHandler(invoker));
这个就是我们熟悉的 JDK 动态代理。InvokerInvocationHandler 的 invoke
@Override public Object invoke(Object proxy, Method method, Object[] args) throws Throwable { if (method.getDeclaringClass() == Object.class) { return method.invoke(invoker, args); } String methodName = method.getName(); Class<?>[] parameterTypes = method.getParameterTypes(); if (parameterTypes.length == 0) { if ("toString".equals(methodName)) { return invoker.toString(); } else if ("$destroy".equals(methodName)) { invoker.destroy(); return null; } else if ("hashCode".equals(methodName)) { return invoker.hashCode(); } } else if (parameterTypes.length == 1 && "equals".equals(methodName)) { return invoker.equals(args[0]); } RpcInvocation rpcInvocation = new RpcInvocation(method, invoker.getInterface().getName(), args); String serviceKey = invoker.getUrl().getServiceKey(); rpcInvocation.setTargetServiceUniqueName(serviceKey); if (consumerModel != null) { rpcInvocation.put(Constants.CONSUMER_MODEL, consumerModel); rpcInvocation.put(Constants.METHOD_MODEL, consumerModel.getMethodModel(method)); } return invoker.invoke(rpcInvocation).recreate(); } 复制代码
这个 Invoker 我们从消费者启动流程中可以知道、最外层是 MockClusterInvoker
用于服务的降级熔断
- 正常调用
- 熔断、直接使用 mock 相关配置
- 降级、服务提供方异常后使用 mock 相关配置
@Override public Result invoke(Invocation invocation) throws RpcException { Result result = null; String value = getUrl().getMethodParameter(invocation.getMethodName(), MOCK_KEY, Boolean.FALSE.toString()).trim(); if (value.length() == 0 || "false".equalsIgnoreCase(value)) { //no mock result = this.invoker.invoke(invocation); } else if (value.startsWith("force")) { //force:direct mock result = doMockInvoke(invocation, null); } else { //fail-mock try { result = this.invoker.invoke(invocation); //fix:#4585 if(result.getException() != null && result.getException() instanceof RpcException){ RpcException rpcException= (RpcException)result.getException(); if(rpcException.isBiz()){ throw rpcException; }else { result = doMockInvoke(invocation, rpcException); } } } catch (RpcException e) { if (e.isBiz()) { throw e; } result = doMockInvoke(invocation, e); } } return result; } 复制代码
private Result doMockInvoke(Invocation invocation, RpcException e) { Result result = null; Invoker<T> minvoker; List<Invoker<T>> mockInvokers = selectMockInvoker(invocation); if (CollectionUtils.isEmpty(mockInvokers)) { minvoker = (Invoker<T>) new MockInvoker(getUrl(), directory.getInterface()); } else { minvoker = mockInvokers.get(0); } try { result = minvoker.invoke(invocation); } catch (RpcException me) { if (me.isBiz()) { result = AsyncRpcResult.newDefaultAsyncResult(me.getCause(), invocation); } else { throw new RpcException(me.getCode(), getMockExceptionMessage(e, me), me.getCause()); } } catch (Throwable me) { throw new RpcException(getMockExceptionMessage(e, me), me.getCause()); } return result; } 复制代码
再看一下他是如何选择出 invoker 的
private List<Invoker<T>> selectMockInvoker(Invocation invocation) { List<Invoker<T>> invokers = null; //TODO generic invoker? if (invocation instanceof RpcInvocation) { //Note the implicit contract (although the description is added to the interface declaration, but extensibility is a problem. The practice placed in the attachment needs to be improved) ((RpcInvocation) invocation).setAttachment(INVOCATION_NEED_MOCK, Boolean.TRUE.toString()); //directory will return a list of normal invokers if Constants.INVOCATION_NEED_MOCK is present in invocation, otherwise, a list of mock invokers will return. try { invokers = directory.list(invocation); } catch (RpcException e) { if (logger.isInfoEnabled()) { logger.info("Exception when try to invoke mock. Get mock invokers error for service:" + getUrl().getServiceInterface() + ", method:" + invocation.getMethodName() + ", will construct a new mock with 'new MockInvoker()'.", e); } } } return invokers; } 复制代码
这里设置了一个 attachment ((RpcInvocation) invocation).setAttachment(INVOCATION_NEED_MOCK, Boolean.TRUE.toString());
我们再来看一下 MockInvokersSelector、入参 invokers 是我们在 RegistryDireotry 中正常的 invoker
@Override public <T> List<Invoker<T>> route(final List<Invoker<T>> invokers, URL url, final Invocation invocation) throws RpcException { if (CollectionUtils.isEmpty(invokers)) { return invokers; } if (invocation.getObjectAttachments() == null) { return getNormalInvokers(invokers); } else { String value = (String) invocation.getObjectAttachments().get(INVOCATION_NEED_MOCK); if (value == null) { return getNormalInvokers(invokers); } else if (Boolean.TRUE.toString().equalsIgnoreCase(value)) { return getMockedInvokers(invokers); } } return invokers; } 复制代码
因为我们设置了 INVOCATION_NEED_MOCK 的值为 true
private <T> List<Invoker<T>> getMockedInvokers(final List<Invoker<T>> invokers) { if (!hasMockProviders(invokers)) { return null; } List<Invoker<T>> sInvokers = new ArrayList<Invoker<T>>(1); for (Invoker<T> invoker : invokers) { if (invoker.getUrl().getProtocol().equals(MOCK_PROTOCOL)) { sInvokers.add(invoker); } } return sInvokers; } 复制代码
找到以 mock 作为协议的 invoker
如果没有的话、则会到上方调用方那里、自己去创建一个 MockInvoker。然后根据配置信息返回
if (CollectionUtils.isEmpty(mockInvokers)) { minvoker = (Invoker<T>) new MockInvoker(getUrl(), directory.getInterface()); 复制代码
如果不是 mock 的话、我们再进入到 org.apache.dubbo.rpc.cluster.support.wrapper.AbstractCluster.InterceptorInvokerNode 的 invoke 方法中
@Override public Result invoke(Invocation invocation) throws RpcException { Result asyncResult; try { interceptor.before(next, invocation); asyncResult = interceptor.intercept(next, invocation); } catch (Exception e) { // onError callback if (interceptor instanceof ClusterInterceptor.Listener) { ClusterInterceptor.Listener listener = (ClusterInterceptor.Listener) interceptor; listener.onError(e, clusterInvoker, invocation); } throw e; } finally { interceptor.after(next, invocation); } return asyncResult.whenCompleteWithContext((r, t) -> { // onResponse callback if (interceptor instanceof ClusterInterceptor.Listener) { ClusterInterceptor.Listener listener = (ClusterInterceptor.Listener) interceptor; if (t == null) { listener.onMessage(r, clusterInvoker, invocation); } else { listener.onError(t, clusterInvoker, invocation); } } }); } 复制代码
这里的拦截器最终会调用 FailoverClusterInvoker 的 invoker 方法
FailoverClusterInvoker 的父类 AbstractClusterInvoker
@Override public Result invoke(final Invocation invocation) throws RpcException { checkWhetherDestroyed(); // binding attachments into invocation. Map<String, Object> contextAttachments = RpcContext.getContext().getObjectAttachments(); if (contextAttachments != null && contextAttachments.size() != 0) { ((RpcInvocation) invocation).addObjectAttachments(contextAttachments); } List<Invoker<T>> invokers = list(invocation); LoadBalance loadbalance = initLoadBalance(invokers, invocation); RpcUtils.attachInvocationIdIfAsync(getUrl(), invocation); return doInvoke(invocation, invokers, loadbalance); } 复制代码
这里的 list 会默认返回所有注册的服务提供者列表
protected List<Invoker<T>> list(Invocation invocation) throws RpcException { return directory.list(invocation); } 复制代码
initLoadBalance 则会选择出一个负载均衡策略、默认是随机
protected LoadBalance initLoadBalance(List<Invoker<T>> invokers, Invocation invocation) { if (CollectionUtils.isNotEmpty(invokers)) { return ExtensionLoader.getExtensionLoader(LoadBalance.class).getExtension(invokers.get(0).getUrl() .getMethodParameter(RpcUtils.getMethodName(invocation), LOADBALANCE_KEY, DEFAULT_LOADBALANCE)); } else { return ExtensionLoader.getExtensionLoader(LoadBalance.class).getExtension(DEFAULT_LOADBALANCE); } } 复制代码
然后进入到 FailoverClusterInvoker 的 doInvoke 中
@Override @SuppressWarnings({"unchecked", "rawtypes"}) public Result doInvoke(Invocation invocation, final List<Invoker<T>> invokers, LoadBalance loadbalance) throws RpcException { List<Invoker<T>> copyInvokers = invokers; checkInvokers(copyInvokers, invocation); String methodName = RpcUtils.getMethodName(invocation); int len = getUrl().getMethodParameter(methodName, RETRIES_KEY, DEFAULT_RETRIES) + 1; if (len <= 0) { len = 1; } // retry loop. RpcException le = null; // last exception. List<Invoker<T>> invoked = new ArrayList<Invoker<T>>(copyInvokers.size()); // invoked invokers. Set<String> providers = new HashSet<String>(len); for (int i = 0; i < len; i++) { if (i > 0) { checkWhetherDestroyed(); copyInvokers = list(invocation); // check again checkInvokers(copyInvokers, invocation); } Invoker<T> invoker = select(loadbalance, invocation, copyInvokers, invoked); invoked.add(invoker); RpcContext.getContext().setInvokers((List) invoked); try { Result result = invoker.invoke(invocation); return result; } catch (RpcException e) { if (e.isBiz()) { // biz exception. throw e; } le = e; } catch (Throwable e) { le = new RpcException(e.getMessage(), e); } finally { providers.add(invoker.getUrl().getAddress()); } } throw new RpcException(le.getCode()......); } 复制代码
Invoker invoker = select(loadbalance, invocation, copyInvokers, invoked);
这个就是从一堆 Invoker 中选出一个、这里涉及到失败重试的机制、如果不是业务异常、那么就会选择另一个 Invoker 进行重试
protected Invoker<T> select(LoadBalance loadbalance, Invocation invocation, List<Invoker<T>> invokers, List<Invoker<T>> selected) throws RpcException { if (CollectionUtils.isEmpty(invokers)) { return null; } String methodName = invocation == null ? StringUtils.EMPTY_STRING : invocation.getMethodName(); boolean sticky = invokers.get(0).getUrl() .getMethodParameter(methodName, CLUSTER_STICKY_KEY, DEFAULT_CLUSTER_STICKY); //ignore overloaded method if (stickyInvoker != null && !invokers.contains(stickyInvoker)) { stickyInvoker = null; } //ignore concurrency problem if (sticky && stickyInvoker != null && (selected == null || !selected.contains(stickyInvoker))) { if (availablecheck && stickyInvoker.isAvailable()) { return stickyInvoker; } } Invoker<T> invoker = doSelect(loadbalance, invocation, invokers, selected); if (sticky) { stickyInvoker = invoker; } return invoker; } 复制代码
private Invoker<T> doSelect(LoadBalance loadbalance, Invocation invocation, List<Invoker<T>> invokers, List<Invoker<T>> selected) throws RpcException { if (CollectionUtils.isEmpty(invokers)) { return null; } if (invokers.size() == 1) { return invokers.get(0); } Invoker<T> invoker = loadbalance.select(invokers, getUrl(), invocation); //If the `invoker` is in the `selected` or invoker is unavailable && availablecheck is true, reselect. if ((selected != null && selected.contains(invoker)) || (!invoker.isAvailable() && getUrl() != null && availablecheck)) { try { Invoker<T> rInvoker = reselect(loadbalance, invocation, invokers, selected, availablecheck); if (rInvoker != null) { invoker = rInvoker; } else { int index = invokers.indexOf(invoker); try { //Avoid collision invoker = invokers.get((index + 1) % invokers.size()); } catch (Exception e) { } } } catch (Throwable t) { } } return invoker; } 复制代码
selected.contains(invoker) selected 是已经调用过的 invoker、参数 invoker 则是新选出来的。如果已经是选过的 invoker 的、那么就 reselect
private Invoker<T> reselect(LoadBalance loadbalance, Invocation invocation, List<Invoker<T>> invokers, List<Invoker<T>> selected, boolean availablecheck) throws RpcException { //Allocating one in advance, this list is certain to be used. List<Invoker<T>> reselectInvokers = new ArrayList<>( invokers.size() > 1 ? (invokers.size() - 1) : invokers.size()); // First, try picking a invoker not in `selected`. for (Invoker<T> invoker : invokers) { if (availablecheck && !invoker.isAvailable()) { continue; } if (selected == null || !selected.contains(invoker)) { reselectInvokers.add(invoker); } } if (!reselectInvokers.isEmpty()) { return loadbalance.select(reselectInvokers, getUrl(), invocation); } // Just pick an available invoker using loadbalance policy if (selected != null) { for (Invoker<T> invoker : selected) { if ((invoker.isAvailable()) // available first && !reselectInvokers.contains(invoker)) { reselectInvokers.add(invoker); } } } if (!reselectInvokers.isEmpty()) { return loadbalance.select(reselectInvokers, getUrl(), invocation); } return null; } 复制代码
逻辑也是比较简单、有没调用过的就用没调用过的、没有只好选择已经调用过但是是可用状态的
我们继续往下调用。这个是 RegistryDiretory 的部分代码、详细可以看上几篇文章
invoker = new InvokerDelegate<>(protocol.refer(serviceType, url), url, providerUrl); 复制代码
InvokerDelegate 其实就是个代理类、啥都不做
@Override public Result invoke(Invocation invocation) throws RpcException { return invoker.invoke(invocation); } 复制代码
protocol.refer(serviceType, url) 则会被几个 Wrapper 类所装饰
ProtocolFilterWrapper 会调用所有的 Filter 接口方法、然后才回调用下一个 Invoker
ListenerInvokerWrapper 则是将 invoker 给到 listener、看实现类基本是没啥用的这个 Listener
经过完 Wrapper 之后、来到了 AsyncToSyncInvoker
@Override public <T> Invoker<T> refer(Class<T> type, URL url) throws RpcException { return new AsyncToSyncInvoker<>(protocolBindingRefer(type, url)); } 复制代码
顾名思义、就是一步转同步的
@Override public Result invoke(Invocation invocation) throws RpcException { Result asyncResult = invoker.invoke(invocation); try { if (InvokeMode.SYNC == ((RpcInvocation) invocation).getInvokeMode()) { asyncResult.get(Integer.MAX_VALUE, TimeUnit.MILLISECONDS); } } catch (InterruptedException e) { ..... } return asyncResult; } 复制代码
最后终于来到了 DubboInvoker
@Override public Result invoke(Invocation inv) throws RpcException { RpcInvocation invocation = (RpcInvocation) inv; invocation.setInvoker(this); if (CollectionUtils.isNotEmptyMap(attachment)) { invocation.addObjectAttachmentsIfAbsent(attachment); } Map<String, Object> contextAttachments = RpcContext.getContext().getObjectAttachments(); if (CollectionUtils.isNotEmptyMap(contextAttachments)) { invocation.addObjectAttachments(contextAttachments); } invocation.setInvokeMode(RpcUtils.getInvokeMode(url, invocation)); RpcUtils.attachInvocationIdIfAsync(getUrl(), invocation); AsyncRpcResult asyncResult; try { asyncResult = (AsyncRpcResult) doInvoke(invocation); } catch (InvocationTargetException e) { // biz exception ....... } RpcContext.getContext().setFuture(new FutureAdapter(asyncResult.getResponseFuture())); return asyncResult; } 复制代码
这里就通过 Netty 与 provider 进行通信了。currentClient.request
@Override protected Result doInvoke(final Invocation invocation) throws Throwable { RpcInvocation inv = (RpcInvocation) invocation; final String methodName = RpcUtils.getMethodName(invocation); inv.setAttachment(PATH_KEY, getUrl().getPath()); inv.setAttachment(VERSION_KEY, version); ExchangeClient currentClient; if (clients.length == 1) { currentClient = clients[0]; } else { currentClient = clients[index.getAndIncrement() % clients.length]; } try { boolean isOneway = RpcUtils.isOneway(getUrl(), invocation); int timeout = calculateTimeout(invocation, methodName); if (isOneway) { boolean isSent = getUrl().getMethodParameter(methodName, Constants.SENT_KEY, false); currentClient.send(inv, isSent); return AsyncRpcResult.newDefaultAsyncResult(invocation); } else { ExecutorService executor = getCallbackExecutor(getUrl(), inv); CompletableFuture<AppResponse> appResponseFuture = currentClient.request(inv, timeout, executor).thenApply(obj -> (AppResponse) obj); // save for 2.6.x compatibility, for example, TraceFilter in Zipkin uses com.alibaba.xxx.FutureAdapter FutureContext.getContext().setCompatibleFuture(appResponseFuture); AsyncRpcResult result = new AsyncRpcResult(appResponseFuture, inv); result.setExecutor(executor); return result; } } catch (TimeoutException e) { ..... } } 复制代码
整个调用链的流程大致就是如此、整个流程事实上就是消费者端创建代理的过程