深入理解Dubbo-7.服务消费调用源码分析(中)

简介: 深入理解Dubbo-7.服务消费调用源码分析

深入理解Dubbo-7.服务消费调用源码分析(上):https://developer.aliyun.com/article/1414097


Invoker.invoke


继续回到FailoverClusterInvoker这个类中的代码来,这里会通过负载返回的invoker对象,来调用invoke方法进行远程通信。

//发起远程调用
Result result = invoker.invoke(invocation);

这个Invoker其实在 RegistryDirectory 的 toInvokers 方法中,对Invoker进行初始化时就定义好了。


RegistryDirectory.toInvokers


invoker = new RegistryDirectory.InvokerDelegate(this.protocol.refer(this.serviceType, url), url, providerUrl);

所以最终我们的Invoker对象实际上是 RegistryDirectory$InvokerDelegate() ,在debug过程中也能够发现这一点。


RegistryDirectory.InvokerDelegate


private static class InvokerDelegate<T> extends InvokerWrapper<T> {
        private URL providerUrl;
        public InvokerDelegate(Invoker<T> invoker, URL url, URL providerUrl) {
            super(invoker, url);
            this.providerUrl = providerUrl;
        }
        public URL getProviderUrl() {
            return this.providerUrl;
        }
    }

但是其没有invoke方法,所以去其父类。

public class InvokerWrapper<T> implements Invoker<T> {
...
    public Result invoke(Invocation invocation) throws RpcException {
        return this.invoker.invoke(invocation);
    }
...
}

再回到这个

invoker = new RegistryDirectory.InvokerDelegate(this.protocol.refer(this.serviceType, url), url, providerUrl);

因为protocol 是自适应扩展点,所以会根据配置对其进行包装,具体可以扩展点SPI知识。

所以最终debug中,可以看到最终的invoker是


ProtocolFilterWrapper


在ProtocolFilterWrapper的调用中,实际会调用一个匿名内部类的invoke方法,这里构建了一个filter进行逐项的过滤

private static <T> Invoker<T> buildInvokerChain(final Invoker<T> invoker, String key, String group) {
        final Invoker<T> last = invoker;
        List<Filter> filters = ExtensionLoader.getExtensionLoader(Filter.class).getActivateExtension(invoker.getUrl(), key, group); // 激活扩展点
        if (!filters.isEmpty()) {
            for(int i = filters.size() - 1; i >= 0; --i) {
                final Filter filter = (Filter)filters.get(i);
                last = new Invoker<T>() {
                    public Class<T> getInterface() {
                        return invoker.getInterface();
                    }
                    public URL getUrl() {
                        return invoker.getUrl();
                    }
                    public boolean isAvailable() {
                        return invoker.isAvailable();
                    }
                    public Result invoke(Invocation invocation) throws RpcException {
                        Result asyncResult;
                        try {
                            asyncResult = filter.invoke(last, invocation);
                        } catch (Exception var15) {
                            Exception e = var15;
                            if (filter instanceof ListenableFilter) {
                                ListenableFilter listenableFilter = (ListenableFilter)filter;
                                try {
                                    Listener listenerx = listenableFilter.listener(invocation);
                                    if (listenerx != null) {
                                        listenerx.onError(e, invoker, invocation);
                                    }
                                } finally {
                                    listenableFilter.removeListener(invocation);
                                }
                            } else if (filter instanceof Listener) {
                                Listener listener = (Listener)filter;
                                listener.onError(var15, invoker, invocation);
                            }
                            throw var15;
                        } finally {
                            ;
                        }
                        return asyncResult.whenCompleteWithContext((r, t) -> {
                            if (filter instanceof ListenableFilter) {
                                ListenableFilter listenableFilter = (ListenableFilter)filter;
                                Listener listener = listenableFilter.listener(invocation);
                                try {
                                    if (listener != null) {
                                        if (t == null) {
                                            listener.onResponse(r, invoker, invocation);
                                        } else {
                                            listener.onError(t, invoker, invocation);
                                        }
                                    }
                                } finally {
                                    listenableFilter.removeListener(invocation);
                                }
                            } else if (filter instanceof Listener) {
                                Listener listenerx = (Listener)filter;
                                if (t == null) {
                                    listenerx.onResponse(r, invoker, invocation);
                                } else {
                                    listenerx.onError(t, invoker, invocation);
                                }
                            }
                        });
                    }
                    public void destroy() {
                        invoker.destroy();
                    }
                    public String toString() {
                        return invoker.toString();
                    }
                };
            }
        }
        return last;
    }

而实际的Invoker其实通过debug是

在toInvokers这个方法中,invoker是通过 protocol.refer来构建的。那么我们再来分析一下refer里面做了什么?

protocol.refer(serviceType, url), url, providerUrl)

我们去看一下它的实现,首先protocol,是被依赖注入进来的自适应扩展点Protocol$Adaptive. ,此时传进去的数,此时url对应的地址应该是dubbo://开头的协议地址,所以最终获得的是通过包装之后的DubboProtocol象。

QosProtocolWrapper(ProtocolFilterWrapper(ProtocolListenerWrapper(DubboProtocol)))


AbstractProtocol.refer


DubboProtocol中没有refer方法,而是调用父类的refer。

@Override
public <T> Invoker<T> refer(Class<T> type, URL url) throws RpcException {
  return new AsyncToSyncInvoker<>(protocolBindingRefer(type, url));
}


AsyncToSyncInvoker.invoke


经过装饰器、过滤器对invoker进行增强和过滤之后,来到了AsyncToSyncInvoker.invoke方法,这里采用的是异步的方式来进行通信

public Result invoke(Invocation invocation) throws RpcException {
        Result asyncResult = this.invoker.invoke(invocation);
        try {
            //如果配置的是同步通信,则通过get阻塞式获取返回结果
            if (InvokeMode.SYNC == ((RpcInvocation)invocation).getInvokeMode()) {
                asyncResult.get(2147483647L, TimeUnit.MILLISECONDS);
            }
            return asyncResult;
        } 
    ......
    }


DubboInvoker.invoke


DubboInvoker继承了AbstractInvoker这个抽象类,而DubboInvoker中没有invoke这个方法,所以这里调用的是AbstractInvoker.invoke方法。


进入到DubboInvoker这个方法中,那么意味着正式进入到服务通信层面了。前面的很多细节分析,无非就是做了三件事


  • 多注册中心的拦截以及分发
  • 负载均衡以及集群容错
  • 请求过滤和包装
public Result invoke(Invocation inv) throws RpcException {
      // 检查当前 Invoker 是否已经被销毁
        if (this.destroyed.get()) {
            this.logger.warn("Invoker for service " + this + " on consumer " + NetUtils.getLocalHost() + " is destroyed, , dubbo version is " + Version.getVersion() + ", this invoker should not be used any longer");
        }
    // 将 Invocation 类型转换为 RpcInvocation 类型
        RpcInvocation invocation = (RpcInvocation)inv;
      // 设置当前 Invoker 对象到 RpcInvocation 中
        invocation.setInvoker(this);
      // 如果附加属性不为空,则添加到 RpcInvocation 对象中
        if (CollectionUtils.isNotEmptyMap(this.attachment)) {
            invocation.addObjectAttachmentsIfAbsent(this.attachment);
        }
    // 将 RpcContext 中的上下文附加属性添加到 RpcInvocation 对象中
        Map<String, Object> contextAttachments = RpcContext.getContext().getObjectAttachments();
        if (CollectionUtils.isNotEmptyMap(contextAttachments)) {
            invocation.addObjectAttachments(contextAttachments);
        }
    // 设置调用模式到 RpcInvocation 中
        invocation.setInvokeMode(RpcUtils.getInvokeMode(this.url, invocation));
      // 如果是异步调用,则在 RpcInvocation 中附加调用 ID
        RpcUtils.attachInvocationIdIfAsync(this.getUrl(), invocation);
        AsyncRpcResult asyncResult;
        try {
            // 调用 doInvoke 方法进行远程调用
            asyncResult = (AsyncRpcResult)this.doInvoke(invocation);
        } catch (InvocationTargetException var7) {
            Throwable te = var7.getTargetException();
            if (te == null) {
                asyncResult = AsyncRpcResult.newDefaultAsyncResult((Object)null, var7, invocation);
            } else {
                if (te instanceof RpcException) {
                    ((RpcException)te).setCode(3);
                }
                asyncResult = AsyncRpcResult.newDefaultAsyncResult((Object)null, te, invocation);
            }
        } catch (RpcException var8) {
            if (!var8.isBiz()) {
                throw var8;
            }
            asyncResult = AsyncRpcResult.newDefaultAsyncResult((Object)null, var8, invocation);
        } catch (Throwable var9) {
            asyncResult = AsyncRpcResult.newDefaultAsyncResult((Object)null, var9, invocation);
        }
        RpcContext.getContext().setFuture(new FutureAdapter(asyncResult.getResponseFuture()));
        return asyncResult;
    }


DubboInvoker.doInvoke


调用doInvoke方法发起远程请求。

protected Result doInvoke(final Invocation invocation) throws Throwable {
    // 将 Invocation 类型转换为 RpcInvocation 类型
    RpcInvocation inv = (RpcInvocation)invocation;
    // 获取调用方法名,并将 path 和 version 附加到 RpcInvocation 中
    String methodName = RpcUtils.getMethodName(invocation);
    inv.setAttachment("path", this.getUrl().getPath());
    inv.setAttachment("version", this.version);
    ExchangeClient currentClient;
    // 从 clients 中选择一个 ExchangeClient 进行远程调用
    if (this.clients.length == 1) {
        currentClient = this.clients[0];
    } else {
        // 轮询
        currentClient = this.clients[this.index.getAndIncrement() % this.clients.length];
    }
    try {
        // 判断是否是单向调用
        boolean isOneway = RpcUtils.isOneway(this.getUrl(), invocation);
        // 计算超时时间
        int timeout = this.calculateTimeout(invocation, methodName);
        if (isOneway) {
            // 如果是单向调用,则将 RpcInvocation 对象发送到服务端
            boolean isSent = this.getUrl().getMethodParameter(methodName, "sent", false);
            currentClient.send(inv, isSent);
            // 返回 AsyncRpcResult,表示调用结果还未返回
            return AsyncRpcResult.newDefaultAsyncResult(invocation);
        } else {
            // 否则,创建一个线程池 executor,并将执行结果封装到 CompletableFuture<AppResponse> 对象中
            ExecutorService executor = this.getCallbackExecutor(this.getUrl(), inv);
            CompletableFuture<AppResponse> appResponseFuture = currentClient.request(inv, timeout, executor).thenApply((obj) -> {
                return (AppResponse)obj;
            });
            // 将 FutureContext 中的 Future 对象设置为 appResponseFuture
            FutureContext.getContext().setCompatibleFuture(appResponseFuture);
            // 返回 AsyncRpcResult,表示调用结果还未返回
            AsyncRpcResult result = new AsyncRpcResult(appResponseFuture, inv);
            result.setExecutor(executor);
            return result;
        }
    } catch (Throwable e) {
        ......
    }
}

currentClient还记得是一个什么对象吗?


在DubboProtocol里的initClient()中

private ExchangeClient initClient(URL url) {
        .....
                Object client;
                if (url.getParameter("lazy", false)) {
                    client = new LazyConnectExchangeClient(url, this.requestHandler);
                } else {
                    client = Exchangers.connect(url, this.requestHandler);
                }
               ......
        }
    }
public static ExchangeClient connect(URL url, ExchangeHandler handler) throws RemotingException {
       .....
            url = url.addParameterIfAbsent("codec", "exchange");
            return getExchanger(url).connect(url, handler);
        }
    }
  public static Exchanger getExchanger(URL url) {
        String type = url.getParameter("exchanger", "header");
        return getExchanger(type);
    }
    public static Exchanger getExchanger(String type) {
        return (Exchanger)ExtensionLoader.getExtensionLoader(Exchanger.class).getExtension(type);
    }

由于是扩展点,所以相当于它实际是一个ReferenceCountExchangeClient(HeaderExchangeClient())


所以它的调用链路是


ReferenceCountExchangeClient->HeaderExchangeClient->HeaderExchangeChannel->(request方法)


ReferenceCountExchangeClient.request


最终,把构建好的RpcInvocation,组装到一个Request对象中进行传递

public CompletableFuture<Object> request(Object request, int timeout, ExecutorService executor) throws RemotingException {
        return this.client.request(request, timeout, executor);
    }
  • ReferenceCountExchangeClient 用来记录调用次数
  • HeaderExchangeClient 用来开启心跳机制、以及启动失败重连任务


HeaderExchangeClient.request


public HeaderExchangeClient(Client client, boolean startTimer) {
        Assert.notNull(client, "Client can't be null");
        this.client = client;
        this.channel = new HeaderExchangeChannel(client);
        if (startTimer) {
            URL url = client.getUrl();
            this.startReconnectTask(url);
            this.startHeartBeatTask(url);
        }
    }
    public CompletableFuture<Object> request(Object request, int timeout, ExecutorService executor) throws RemotingException {
        return this.channel.request(request, timeout, executor);
    }


HeaderExchangeChannel.request


进入到HeaderExchangeChannel.request 来发起请求,这个类的主要职责就是和服务端进行数据交互

 public CompletableFuture<Object> request(Object request, int timeout, ExecutorService executor) throws RemotingException {
        if (this.closed) {
            throw new RemotingException(this.getLocalAddress(), (InetSocketAddress)null, "Failed to send request " + request + ", cause: The channel " + this + " is closed!");
        } else {
            // 创建请求对象
            Request req = new Request();
            req.setVersion(Version.getProtocolVersion());
            req.setTwoWay(true);
            req.setData(request);
            DefaultFuture future = DefaultFuture.newFuture(this.channel, req, timeout, executor);
            try {
                //发送请求
                this.channel.send(req);
                return future;
            } catch (RemotingException var7) {
                future.cancel();
                throw var7;
            }
        }
    }


服务端接收数据的处理流程


客户端请求发出去之后,服务端会收到这个请求的消息,然后触发调用。


服务端接收到消息


服务端这边接收消息的处理链路,也比较复杂,我们回到NettServer中创建io的过程。

bootstrap.group(bossGroup, workerGroup)
  .channel(NettyEventLoopFactory.serverSocketChannelClass())
  .option(ChannelOption.SO_REUSEADDR, Boolean.TRUE)
  .childOption(ChannelOption.TCP_NODELAY, Boolean.TRUE)
  .childOption(ChannelOption.ALLOCATOR, PooledByteBufAllocator.DEFAULT)
  .childHandler(new ChannelInitializer<SocketChannel>() {
    @Override
    protected void initChannel(SocketChannel ch) throws Exception {
      // FIXME: should we use getTimeout()?
      int idleTimeout = UrlUtils.getIdleTimeout(getUrl());
      NettyCodecAdapter adapter = new NettyCodecAdapter(getCodec(),getUrl(), NettyServer.this);
      if (getUrl().getParameter(SSL_ENABLED_KEY, false)) {
        ch.pipeline().addLast("negotiation",
SslHandlerInitializer.sslServerHandler(getUrl(), nettyServerHandler));
      }
      ch.pipeline()
        .addLast("decoder", adapter.getDecoder())
        .addLast("encoder", adapter.getEncoder())
        .addLast("server-idle-handler", new IdleStateHandler(0, 0,
idleTimeout, MILLISECONDS))
        .addLast("handler", nettyServerHandler);
      }
    });

服务端启动的时候,配置的消息处理是handler配置的是nettyServerHandler

final NettyServerHandler nettyServerHandler = new NettyServerHandler(getUrl(),this);

所以,服务端收到消息之后,会调用NettyServerHandler中的channelRead方法


深入理解Dubbo-7.服务消费调用源码分析(下):https://developer.aliyun.com/article/1414106

目录
相关文章
|
29天前
|
XML Dubbo Java
【Dubbo3高级特性】「框架与服务」服务的异步调用实践以及开发模式
【Dubbo3高级特性】「框架与服务」服务的异步调用实践以及开发模式
30 0
|
2月前
|
Dubbo Java 应用服务中间件
Dubbo服务暴露机制解密:深入探讨服务提供者的奥秘【九】
Dubbo服务暴露机制解密:深入探讨服务提供者的奥秘【九】
23 0
|
2月前
|
缓存 运维 监控
Dubbo服务降级:保障稳定性的终极指南【六】
Dubbo服务降级:保障稳定性的终极指南【六】
36 0
|
3月前
|
Dubbo Java 应用服务中间件
Spring Boot Dubbo 构建分布式服务
Spring Boot Dubbo 构建分布式服务
47 0
|
3月前
|
存储 负载均衡 监控
深入理解Dubbo-6.服务消费源码分析(下)
深入理解Dubbo-6.服务消费源码分析
33 0
|
1月前
|
SpringCloudAlibaba Dubbo Java
SpringCloud Alibaba集成Dubbo实现远程服务间调用
SpringCloud Alibaba集成Dubbo实现远程服务间调用
|
29天前
|
Java fastjson 数据安全/隐私保护
【Dubbo3技术专题】「云原生微服务开发实战」 一同探索和分析研究RPC服务的底层原理和实现
【Dubbo3技术专题】「云原生微服务开发实战」 一同探索和分析研究RPC服务的底层原理和实现
39 0
|
29天前
|
Kubernetes Dubbo 应用服务中间件
【Dubbo3终极特性】「流量治理体系」一文教你如何搭建Dubbo3的控制台服务Dubbo-Admin
【Dubbo3终极特性】「流量治理体系」一文教你如何搭建Dubbo3的控制台服务Dubbo-Admin
52 0
|
2月前
|
Dubbo Java 应用服务中间件
Dubbo 第四节: Spring与Dubbo整合原理与源码分析
DubboConfigConfigurationRegistrar的主要作⽤就是对propties⽂件进⾏解析并根据不同的配置项项⽣成对应类型的Bean对象。
|
3月前
|
Dubbo Java 应用服务中间件
Dubbo 3.x结合Zookeeper实现远程服务基本调用
ZooKeeper和Dubbo是两个在分布式系统中常用的开源框架,它们可以协同工作,提供服务注册与发现、分布式协调等功能。