Dubbo服务调用过程

本文涉及的产品
传统型负载均衡 CLB,每月750个小时 15LCU
应用型负载均衡 ALB,每月750个小时 15LCU
网络型负载均衡 NLB,每月750个小时 15LCU
简介: 前几篇文章讲述了 Dubbo 的注册和消费的流程、今天继续接着说下调用的流程(只谈及单注册中心、多注册中心只是前面多了一步、根据区域或者配置选择出其中一个注册中心而已)

前几篇文章讲述了 Dubbo 的注册和消费的流程、今天继续接着说下调用的流程(只谈及单注册中心、多注册中心只是前面多了一步、根据区域或者配置选择出其中一个注册中心而已)

网络异常,图片无法展示
|


我们在消费者端调用就是该代理工厂返回的对象

@Override
public <T> T getProxy(Invoker<T> invoker, boolean generic) throws RpcException {
    Set<Class<?>> interfaces = new HashSet<>();
    String config = invoker.getUrl().getParameter(INTERFACES);
    if (config != null && config.length() > 0) {
        String[] types = COMMA_SPLIT_PATTERN.split(config);
        for (String type : types) {
            // TODO can we load successfully for a different classloader?.
            interfaces.add(ReflectUtils.forName(type));
        }
    }
    if (generic) {
      .........
    }
    interfaces.add(invoker.getInterface());
    interfaces.addAll(Arrays.asList(INTERNAL_INTERFACES));
    return getProxy(invoker, interfaces.toArray(new Class<?>[0]));
}
JavassistProxyFactory
@Override
@SuppressWarnings("unchecked")
public <T> T getProxy(Invoker<T> invoker, Class<?>[] interfaces) {
  return (T) Proxy.getProxy(interfaces).newInstance(new InvokerInvocationHandler(invoker));
}
复制代码

invoker.getInterface() 返回的就是我们引用的类型


DubboProtocol

@Override
public <T> Invoker<T> protocolBindingRefer(Class<T> serviceType, URL url) throws RpcException {
  optimizeSerialization(url);
  // create rpc invoker.
  DubboInvoker<T> invoker = new DubboInvoker<T>(serviceType, url, getClients(url), invokers);
  invokers.add(invoker);
  return invoker;
}
public Class<T> getInterface() {
    return type;
}
复制代码

Proxy.getProxy(interfaces).newInstance(new InvokerInvocationHandler(invoker));


这个就是我们熟悉的 JDK 动态代理。InvokerInvocationHandler 的 invoke

@Override
public Object invoke(Object proxy, Method method, Object[] args) throws Throwable {
    if (method.getDeclaringClass() == Object.class) {
        return method.invoke(invoker, args);
    }
    String methodName = method.getName();
    Class<?>[] parameterTypes = method.getParameterTypes();
    if (parameterTypes.length == 0) {
        if ("toString".equals(methodName)) {
            return invoker.toString();
        } else if ("$destroy".equals(methodName)) {
            invoker.destroy();
            return null;
        } else if ("hashCode".equals(methodName)) {
            return invoker.hashCode();
        }
    } else if (parameterTypes.length == 1 && "equals".equals(methodName)) {
        return invoker.equals(args[0]);
    }
    RpcInvocation rpcInvocation = new RpcInvocation(method, invoker.getInterface().getName(), args);
    String serviceKey = invoker.getUrl().getServiceKey();
    rpcInvocation.setTargetServiceUniqueName(serviceKey);
    if (consumerModel != null) {
        rpcInvocation.put(Constants.CONSUMER_MODEL, consumerModel);
        rpcInvocation.put(Constants.METHOD_MODEL, consumerModel.getMethodModel(method));
    }
    return invoker.invoke(rpcInvocation).recreate();
}
复制代码


这个 Invoker 我们从消费者启动流程中可以知道、最外层是 MockClusterInvoker

用于服务的降级熔断

  • 正常调用
  • 熔断、直接使用 mock 相关配置
  • 降级、服务提供方异常后使用 mock 相关配置
@Override
public Result invoke(Invocation invocation) throws RpcException {
    Result result = null;
    String value = getUrl().getMethodParameter(invocation.getMethodName(), MOCK_KEY, Boolean.FALSE.toString()).trim();
    if (value.length() == 0 || "false".equalsIgnoreCase(value)) {
        //no mock
        result = this.invoker.invoke(invocation);
    } else if (value.startsWith("force")) {
        //force:direct mock
        result = doMockInvoke(invocation, null);
    } else {
        //fail-mock
        try {
            result = this.invoker.invoke(invocation);
            //fix:#4585
            if(result.getException() != null && result.getException() instanceof RpcException){
                RpcException rpcException= (RpcException)result.getException();
                if(rpcException.isBiz()){
                    throw  rpcException;
                }else {
                    result = doMockInvoke(invocation, rpcException);
                }
            }
        } catch (RpcException e) {
            if (e.isBiz()) {
                throw e;
            }
            result = doMockInvoke(invocation, e);
        }
    }
    return result;
}
复制代码
private Result doMockInvoke(Invocation invocation, RpcException e) {
    Result result = null;
    Invoker<T> minvoker;
    List<Invoker<T>> mockInvokers = selectMockInvoker(invocation);
    if (CollectionUtils.isEmpty(mockInvokers)) {
        minvoker = (Invoker<T>) new MockInvoker(getUrl(), directory.getInterface());
    } else {
        minvoker = mockInvokers.get(0);
    }
    try {
        result = minvoker.invoke(invocation);
    } catch (RpcException me) {
        if (me.isBiz()) {
            result = AsyncRpcResult.newDefaultAsyncResult(me.getCause(), invocation);
        } else {
            throw new RpcException(me.getCode(), getMockExceptionMessage(e, me), me.getCause());
        }
    } catch (Throwable me) {
        throw new RpcException(getMockExceptionMessage(e, me), me.getCause());
    }
    return result;
}
复制代码


再看一下他是如何选择出 invoker 的

private List<Invoker<T>> selectMockInvoker(Invocation invocation) {
    List<Invoker<T>> invokers = null;
    //TODO generic invoker?
    if (invocation instanceof RpcInvocation) {
        //Note the implicit contract (although the description is added to the interface declaration, but extensibility is a problem. The practice placed in the attachment needs to be improved)
        ((RpcInvocation) invocation).setAttachment(INVOCATION_NEED_MOCK, Boolean.TRUE.toString());
        //directory will return a list of normal invokers if Constants.INVOCATION_NEED_MOCK is present in invocation, otherwise, a list of mock invokers will return.
        try {
            invokers = directory.list(invocation);
        } catch (RpcException e) {
            if (logger.isInfoEnabled()) {
                logger.info("Exception when try to invoke mock. Get mock invokers error for service:"
                        + getUrl().getServiceInterface() + ", method:" + invocation.getMethodName()
                        + ", will construct a new mock with 'new MockInvoker()'.", e);
            }
        }
    }
    return invokers;
}
复制代码

这里设置了一个 attachment ((RpcInvocation) invocation).setAttachment(INVOCATION_NEED_MOCK, Boolean.TRUE.toString());


我们再来看一下 MockInvokersSelector、入参 invokers 是我们在 RegistryDireotry 中正常的 invoker

@Override
public <T> List<Invoker<T>> route(final List<Invoker<T>> invokers,
                                  URL url, final Invocation invocation) throws RpcException {
    if (CollectionUtils.isEmpty(invokers)) {
        return invokers;
    }
    if (invocation.getObjectAttachments() == null) {
        return getNormalInvokers(invokers);
    } else {
        String value = (String) invocation.getObjectAttachments().get(INVOCATION_NEED_MOCK);
        if (value == null) {
            return getNormalInvokers(invokers);
        } else if (Boolean.TRUE.toString().equalsIgnoreCase(value)) {
            return getMockedInvokers(invokers);
        }
    }
    return invokers;
}
复制代码


因为我们设置了  INVOCATION_NEED_MOCK 的值为 true

private <T> List<Invoker<T>> getMockedInvokers(final List<Invoker<T>> invokers) {
    if (!hasMockProviders(invokers)) {
        return null;
    }
    List<Invoker<T>> sInvokers = new ArrayList<Invoker<T>>(1);
    for (Invoker<T> invoker : invokers) {
        if (invoker.getUrl().getProtocol().equals(MOCK_PROTOCOL)) {
            sInvokers.add(invoker);
        }
    }
    return sInvokers;
}
复制代码

找到以 mock 作为协议的 invoker


如果没有的话、则会到上方调用方那里、自己去创建一个 MockInvoker。然后根据配置信息返回

if (CollectionUtils.isEmpty(mockInvokers)) {
    minvoker = (Invoker<T>) new MockInvoker(getUrl(), directory.getInterface());  
复制代码

网络异常,图片无法展示
|


如果不是 mock 的话、我们再进入到 org.apache.dubbo.rpc.cluster.support.wrapper.AbstractCluster.InterceptorInvokerNode 的 invoke 方法中

@Override
public Result invoke(Invocation invocation) throws RpcException {
    Result asyncResult;
    try {
        interceptor.before(next, invocation);
        asyncResult = interceptor.intercept(next, invocation);
    } catch (Exception e) {
        // onError callback
        if (interceptor instanceof ClusterInterceptor.Listener) {
            ClusterInterceptor.Listener listener = (ClusterInterceptor.Listener) interceptor;
            listener.onError(e, clusterInvoker, invocation);
        }
        throw e;
    } finally {
        interceptor.after(next, invocation);
    }
    return asyncResult.whenCompleteWithContext((r, t) -> {
        // onResponse callback
        if (interceptor instanceof ClusterInterceptor.Listener) {
            ClusterInterceptor.Listener listener = (ClusterInterceptor.Listener) interceptor;
            if (t == null) {
                listener.onMessage(r, clusterInvoker, invocation);
            } else {
                listener.onError(t, clusterInvoker, invocation);
            }
        }
    });
}
复制代码


这里的拦截器最终会调用 FailoverClusterInvoker 的 invoker 方法

FailoverClusterInvoker 的父类 AbstractClusterInvoker

@Override
public Result invoke(final Invocation invocation) throws RpcException {
    checkWhetherDestroyed();
    // binding attachments into invocation.
    Map<String, Object> contextAttachments = RpcContext.getContext().getObjectAttachments();
    if (contextAttachments != null && contextAttachments.size() != 0) {
        ((RpcInvocation) invocation).addObjectAttachments(contextAttachments);
    }
    List<Invoker<T>> invokers = list(invocation);
    LoadBalance loadbalance = initLoadBalance(invokers, invocation);
    RpcUtils.attachInvocationIdIfAsync(getUrl(), invocation);
    return doInvoke(invocation, invokers, loadbalance);
}
复制代码


这里的 list 会默认返回所有注册的服务提供者列表

protected List<Invoker<T>> list(Invocation invocation) throws RpcException {
    return directory.list(invocation);
}
复制代码


initLoadBalance 则会选择出一个负载均衡策略、默认是随机

protected LoadBalance initLoadBalance(List<Invoker<T>> invokers, Invocation invocation) {
    if (CollectionUtils.isNotEmpty(invokers)) {
        return ExtensionLoader.getExtensionLoader(LoadBalance.class).getExtension(invokers.get(0).getUrl()
                .getMethodParameter(RpcUtils.getMethodName(invocation), LOADBALANCE_KEY, DEFAULT_LOADBALANCE));
    } else {
        return ExtensionLoader.getExtensionLoader(LoadBalance.class).getExtension(DEFAULT_LOADBALANCE);
    }
}
复制代码


然后进入到 FailoverClusterInvoker 的 doInvoke 中

@Override
@SuppressWarnings({"unchecked", "rawtypes"})
public Result doInvoke(Invocation invocation, final List<Invoker<T>> invokers, LoadBalance loadbalance) throws RpcException {
    List<Invoker<T>> copyInvokers = invokers;
    checkInvokers(copyInvokers, invocation);
    String methodName = RpcUtils.getMethodName(invocation);
    int len = getUrl().getMethodParameter(methodName, RETRIES_KEY, DEFAULT_RETRIES) + 1;
    if (len <= 0) {
        len = 1;
    }
    // retry loop.
    RpcException le = null; // last exception.
    List<Invoker<T>> invoked = new ArrayList<Invoker<T>>(copyInvokers.size()); // invoked invokers.
    Set<String> providers = new HashSet<String>(len);
    for (int i = 0; i < len; i++) {
        if (i > 0) {
            checkWhetherDestroyed();
            copyInvokers = list(invocation);
            // check again
            checkInvokers(copyInvokers, invocation);
        }
        Invoker<T> invoker = select(loadbalance, invocation, copyInvokers, invoked);
        invoked.add(invoker);
        RpcContext.getContext().setInvokers((List) invoked);
        try {
            Result result = invoker.invoke(invocation);
            return result;
        } catch (RpcException e) {
            if (e.isBiz()) { // biz exception.
                throw e;
            }
            le = e;
        } catch (Throwable e) {
            le = new RpcException(e.getMessage(), e);
        } finally {
            providers.add(invoker.getUrl().getAddress());
        }
    }
    throw new RpcException(le.getCode()......);
}
复制代码

Invoker invoker = select(loadbalance, invocation, copyInvokers, invoked);


这个就是从一堆 Invoker 中选出一个、这里涉及到失败重试的机制、如果不是业务异常、那么就会选择另一个 Invoker 进行重试

protected Invoker<T> select(LoadBalance loadbalance, Invocation invocation,
                            List<Invoker<T>> invokers, List<Invoker<T>> selected) throws RpcException {
    if (CollectionUtils.isEmpty(invokers)) {
        return null;
    }
    String methodName = invocation == null ? StringUtils.EMPTY_STRING : invocation.getMethodName();
    boolean sticky = invokers.get(0).getUrl()
            .getMethodParameter(methodName, CLUSTER_STICKY_KEY, DEFAULT_CLUSTER_STICKY);
    //ignore overloaded method
    if (stickyInvoker != null && !invokers.contains(stickyInvoker)) {
        stickyInvoker = null;
    }
    //ignore concurrency problem
    if (sticky && stickyInvoker != null && (selected == null || !selected.contains(stickyInvoker))) {
        if (availablecheck && stickyInvoker.isAvailable()) {
            return stickyInvoker;
        }
    }
    Invoker<T> invoker = doSelect(loadbalance, invocation, invokers, selected);
    if (sticky) {
        stickyInvoker = invoker;
    }
    return invoker;
}
复制代码
private Invoker<T> doSelect(LoadBalance loadbalance, Invocation invocation,
                            List<Invoker<T>> invokers, List<Invoker<T>> selected) throws RpcException {
    if (CollectionUtils.isEmpty(invokers)) {
        return null;
    }
    if (invokers.size() == 1) {
        return invokers.get(0);
    }
    Invoker<T> invoker = loadbalance.select(invokers, getUrl(), invocation);
    //If the `invoker` is in the  `selected` or invoker is unavailable && availablecheck is true, reselect.
    if ((selected != null && selected.contains(invoker))
            || (!invoker.isAvailable() && getUrl() != null && availablecheck)) {
        try {
            Invoker<T> rInvoker = reselect(loadbalance, invocation, invokers, selected, availablecheck);
            if (rInvoker != null) {
                invoker = rInvoker;
            } else {
                int index = invokers.indexOf(invoker);
                try {
                    //Avoid collision
                    invoker = invokers.get((index + 1) % invokers.size());
                } catch (Exception e) {
                }
            }
        } catch (Throwable t) {
        }
    }
    return invoker;
}
复制代码

selected.contains(invoker)  selected 是已经调用过的 invoker、参数 invoker 则是新选出来的。如果已经是选过的 invoker 的、那么就 reselect

private Invoker<T> reselect(LoadBalance loadbalance, Invocation invocation,
                            List<Invoker<T>> invokers, List<Invoker<T>> selected, boolean availablecheck) throws RpcException {
    //Allocating one in advance, this list is certain to be used.
    List<Invoker<T>> reselectInvokers = new ArrayList<>(
            invokers.size() > 1 ? (invokers.size() - 1) : invokers.size());
    // First, try picking a invoker not in `selected`.
    for (Invoker<T> invoker : invokers) {
        if (availablecheck && !invoker.isAvailable()) {
            continue;
        }
        if (selected == null || !selected.contains(invoker)) {
            reselectInvokers.add(invoker);
        }
    }
    if (!reselectInvokers.isEmpty()) {
        return loadbalance.select(reselectInvokers, getUrl(), invocation);
    }
    // Just pick an available invoker using loadbalance policy
    if (selected != null) {
        for (Invoker<T> invoker : selected) {
            if ((invoker.isAvailable()) // available first
                    && !reselectInvokers.contains(invoker)) {
                reselectInvokers.add(invoker);
            }
        }
    }
    if (!reselectInvokers.isEmpty()) {
        return loadbalance.select(reselectInvokers, getUrl(), invocation);
    }
    return null;
}
复制代码


逻辑也是比较简单、有没调用过的就用没调用过的、没有只好选择已经调用过但是是可用状态的

我们继续往下调用。这个是 RegistryDiretory 的部分代码、详细可以看上几篇文章

invoker = new InvokerDelegate<>(protocol.refer(serviceType, url), url, providerUrl);
复制代码


InvokerDelegate 其实就是个代理类、啥都不做

@Override
public Result invoke(Invocation invocation) throws RpcException {
    return invoker.invoke(invocation);
}
复制代码

protocol.refer(serviceType, url) 则会被几个 Wrapper 类所装饰

ProtocolFilterWrapper 会调用所有的 Filter 接口方法、然后才回调用下一个 Invoker

ListenerInvokerWrapper 则是将 invoker 给到 listener、看实现类基本是没啥用的这个 Listener


经过完 Wrapper 之后、来到了 AsyncToSyncInvoker

@Override
public <T> Invoker<T> refer(Class<T> type, URL url) throws RpcException {
    return new AsyncToSyncInvoker<>(protocolBindingRefer(type, url));
}
复制代码


顾名思义、就是一步转同步的

@Override
public Result invoke(Invocation invocation) throws RpcException {
    Result asyncResult = invoker.invoke(invocation);
    try {
        if (InvokeMode.SYNC == ((RpcInvocation) invocation).getInvokeMode()) {
            asyncResult.get(Integer.MAX_VALUE, TimeUnit.MILLISECONDS);
        }
    } catch (InterruptedException e) {
      .....
    }
    return asyncResult;
}
复制代码


最后终于来到了 DubboInvoker

@Override
public Result invoke(Invocation inv) throws RpcException {
    RpcInvocation invocation = (RpcInvocation) inv;
    invocation.setInvoker(this);
    if (CollectionUtils.isNotEmptyMap(attachment)) {
        invocation.addObjectAttachmentsIfAbsent(attachment);
    }
    Map<String, Object> contextAttachments = RpcContext.getContext().getObjectAttachments();
    if (CollectionUtils.isNotEmptyMap(contextAttachments)) {
        invocation.addObjectAttachments(contextAttachments);
    }
    invocation.setInvokeMode(RpcUtils.getInvokeMode(url, invocation));
    RpcUtils.attachInvocationIdIfAsync(getUrl(), invocation);
    AsyncRpcResult asyncResult;
    try {
        asyncResult = (AsyncRpcResult) doInvoke(invocation);
    } catch (InvocationTargetException e) { // biz exception
      .......
    }
    RpcContext.getContext().setFuture(new FutureAdapter(asyncResult.getResponseFuture()));
    return asyncResult;
}
复制代码


这里就通过 Netty 与 provider 进行通信了。currentClient.request

@Override
protected Result doInvoke(final Invocation invocation) throws Throwable {
    RpcInvocation inv = (RpcInvocation) invocation;
    final String methodName = RpcUtils.getMethodName(invocation);
    inv.setAttachment(PATH_KEY, getUrl().getPath());
    inv.setAttachment(VERSION_KEY, version);
    ExchangeClient currentClient;
    if (clients.length == 1) {
        currentClient = clients[0];
    } else {
        currentClient = clients[index.getAndIncrement() % clients.length];
    }
    try {
        boolean isOneway = RpcUtils.isOneway(getUrl(), invocation);
        int timeout = calculateTimeout(invocation, methodName);
        if (isOneway) {
            boolean isSent = getUrl().getMethodParameter(methodName, Constants.SENT_KEY, false);
            currentClient.send(inv, isSent);
            return AsyncRpcResult.newDefaultAsyncResult(invocation);
        } else {
            ExecutorService executor = getCallbackExecutor(getUrl(), inv);
            CompletableFuture<AppResponse> appResponseFuture =
                    currentClient.request(inv, timeout, executor).thenApply(obj -> (AppResponse) obj);
            // save for 2.6.x compatibility, for example, TraceFilter in Zipkin uses com.alibaba.xxx.FutureAdapter
            FutureContext.getContext().setCompatibleFuture(appResponseFuture);
            AsyncRpcResult result = new AsyncRpcResult(appResponseFuture, inv);
            result.setExecutor(executor);
            return result;
        }
    } catch (TimeoutException e) {
       .....
    }
}
复制代码

整个调用链的流程大致就是如此、整个流程事实上就是消费者端创建代理的过程



相关实践学习
SLB负载均衡实践
本场景通过使用阿里云负载均衡 SLB 以及对负载均衡 SLB 后端服务器 ECS 的权重进行修改,快速解决服务器响应速度慢的问题
负载均衡入门与产品使用指南
负载均衡(Server Load Balancer)是对多台云服务器进行流量分发的负载均衡服务,可以通过流量分发扩展应用系统对外的服务能力,通过消除单点故障提升应用系统的可用性。 本课程主要介绍负载均衡的相关技术以及阿里云负载均衡产品的使用方法。
目录
相关文章
|
7月前
|
Dubbo Java 应用服务中间件
从源码全面解析 dubbo 服务端服务调用的来龙去脉
从源码全面解析 dubbo 服务端服务调用的来龙去脉
|
4月前
|
负载均衡 Dubbo 应用服务中间件
Dubbo服务调用过程原理
该文章主要介绍了Dubbo服务调用过程的原理,包括服务调用的主要阶段和服务调用的具体步骤。
Dubbo服务调用过程原理
|
自然语言处理 Dubbo 应用服务中间件
Higress和dubbo-go-pixiu都与网关和服务调用有关
Higress和dubbo-go-pixiu都与网关和服务调用有关
125 1
|
7月前
|
缓存 负载均衡 Dubbo
从源码全面解析 dubbo 消费端服务调用的来龙去脉
从源码全面解析 dubbo 消费端服务调用的来龙去脉
|
XML 编解码 JSON
不满足于RPC,详解Dubbo的服务调用链路(2)
不满足于RPC,详解Dubbo的服务调用链路
230 0
|
负载均衡 Dubbo 应用服务中间件
不满足于RPC,详解Dubbo的服务调用链路(1)
不满足于RPC,详解Dubbo的服务调用链路
365 0
|
开发框架 Dubbo 网络协议
SpringCloud + Gateway(网关) + Nacos(注册中心+配置中心)+ Dubbo(内部服务调用)
SpringCloud + Gateway(网关) + Nacos(注册中心+配置中心)+ Dubbo(内部服务调用)
2174 0
SpringCloud + Gateway(网关) + Nacos(注册中心+配置中心)+ Dubbo(内部服务调用)
|
Dubbo Java 应用服务中间件
我发现一个关于Dubbo服务调用的一个BUG
我发现一个关于Dubbo服务调用的一个BUG
我发现一个关于Dubbo服务调用的一个BUG
|
负载均衡 监控 Dubbo
分布式RPC服务调用框架选型:使用Dubbo实现分布式服务调用
本文是一篇详细介绍分布式RPC调用框架Dubbo的文章,介绍了Dubbo服务治理和服务调用的实现。分析了Dubbo中的核心功能,包括Remoting,Cluster和RetRegistry的作用和功能。详细说明了Dubbo中几个角色以及各个角色之间的调用关系。通过这篇文章,可以快速了解Dubbo框架的基本面貌和重要原理,为以后更加深入细致的学习RPC调用框架做出准备。
392 0
分布式RPC服务调用框架选型:使用Dubbo实现分布式服务调用
|
监控 Dubbo 应用服务中间件
这么说吧,dubbo很简单,其实就是一个远程服务调用的框架
极简教程,五分钟快速入门之dubbo,为后面的dubbo实战以及dubbo源码分析做铺垫。 一、dubbo是什么? 1)本质:一个Jar包,一个分布式框架,,一个远程服务调用的分布式框架。