不满足于RPC,详解Dubbo的服务调用链路(2)

简介: 不满足于RPC,详解Dubbo的服务调用链路

三、网络通信

1. Exchange

在进行通讯的时候,Dubbo定义了Exchange这样的部分,也就是说提供了一种统一的消息交换机,使得服务消费者和提供者可以进行可靠的通信。它封装了底层的网络细节,处理网络的传输、编解码、序列化和反序列化等操作。


通过Exchange,Dubbo可以支持多种通信协议(如Dubbo, HTTP ,RMI)和序列化协议(如Hession, JSON, Protobuf)等。其主要有两个接口和其实现类组成


  • ExchangeClient(接口):用于在消费者端发送请求和接收响应。它定义了发送请求、接收响应和关闭连接等方法。

  • ExchangeServer(接口):在提供者端用于接收请求并发送响应。它定义了启动服务器、接收请求、发送响应和关闭服务器等方法。

  • Exchanger(SPI接口):是Exchange的核心接口,用于创建客户端和服务器端的实例。

  • HeaderExchangeClient:是ExchangeClient接口的默认实现类,负责在消费者端处理请求和响应的编解码、序列化和反序列化等操作。

  • HeaderExchangeServer:是ExchangeServer接口的默认实现类,负责在提供者端处理请求和响应的编解码、序列化和反序列化等操作


2. Netty

当然,Dubbo并没有真正复写底层的通信代码,而是采用了另一个专业的网络框架——Netty,这个框架笔者看源码是也是深感佩服,对性能的强调,要求是非常高的。Dubbo 则是嫁接在其之上,利用 Netty 的客户端与服务端进行通信的,当然,作为发送请求的消费者,自然是客户端。而接收请求的服务提供方,则是服务端(概念上的)

259a23505a27411e9c904e61a07a81e3.png

当然,关于Netty的部分,我们不可能在这里进行讲解,我们只需要知道,Dubbo的通信底层是调用的高性能网络通信框架Netty即可,关于Netty框架的讲解,我们会在一个新的系列中进行。


四、服务执行

1. 消息接收

其实通过上面的网络章节就能看出来。真正的通信都是依赖于Netty实现的,因此在消息接收上,依赖的也是Netty的机制。如下图,Dubbo内置着Netty的服务端

60d1c2876b2a42179e5ba96ef12ecab1.png

那抛去这一部分。那Dubbo本身做了怎样的准备呢?消息经由一个分发器Dispatcher(SPI接口)的自适应实现类实现分发后,然后由一个handler链来处理。

fe9acd8c114c478ca31cf7af3ee21219.png

至此,我们就看到了exchange层次的另一个重要类 HeaderExchangeHandler,当消息发送来时,会触发其执行 received - handleRequest 方法

void handleRequest(final ExchangeChannel channel, Request req) throws RemotingException {
    Response res = new Response(req.getId(), req.getVersion());
    Object msg = req.getData();
    try {
      // 此处的handler 为 DubboProtocol.ExchangeHandler
        CompletionStage<Object> future = handler.reply(channel, msg);
        future.whenComplete((appResult, t) -> {
            try {
                if (t == null) {
                    res.setStatus(Response.OK);
                    res.setResult(appResult);
                } else {
                    res.setStatus(Response.SERVICE_ERROR);
                    res.setErrorMessage(StringUtils.toString(t));
                }
                // 此处为返回响应
                channel.send(res);
            } catch (RemotingException e) {
                logger.warn(TRANSPORT_FAILED_RESPONSE, "", "", "Send result to consumer failed, channel is " + channel + ", msg is " + e);
            }
        });
    } catch (Throwable e) {
        res.setStatus(Response.SERVICE_ERROR);
        res.setErrorMessage(StringUtils.toString(e));
        channel.send(res);
    }
}

2. 获取服务并调用

DubboProtocol.ExchangeHandler.reply 中有一个重要方法

Invoker<?> invoker = getInvoker(channel, inv); 

我们仔细来看看这个方法:DubboProtocol.getInvoker。

    Invoker<?> getInvoker(Channel channel, Invocation inv) throws RemotingException {
      // ...
        String serviceKey = serviceKey(
            port,
            path,
            (String) inv.getObjectAttachmentWithoutConvert(VERSION_KEY),
            (String) inv.getObjectAttachmentWithoutConvert(GROUP_KEY)
        );
        DubboExporter<?> exporter = (DubboExporter<?>) exporterMap.get(serviceKey);
        if (exporter == null) {
            throw new RemotingException(channel, "Not found exported service: " + serviceKey + " in " + exporterMap.keySet() + ", may be version or group mismatch " +
                ", channel: consumer: " + channel.getRemoteAddress() + " --> provider: " + channel.getLocalAddress() + ", message:" + getInvocationWithoutData(inv));
        }
        return exporter.getInvoker();
    }

如果看过之前的 Dubbo最核心功能——服务暴露的配置、使用及原理 话,看到 exporterMap 就已经明白了,我们曾经说过,服务的暴露最终就存在这样的 exporterMap 里,键为接口信息,此处为com.zhanfu.dubbo.api.DemoService:20880; 值就是对该接口实现类的包装

e4668b8b4dfb47cf8370a95c35af07ad.png

e0eb5d8081d14b4a8516969df6c64e83.png

那么没有悬念的,最终拿到了我们在服务暴露时生成的exporter,如果你没忘,当时我们举得是injvm的例子,这次我们是在进行真实的RPC调用,当然,他们invoker生成的形式是一样的,都是在 ServiceConfig.doExportUrl 里


// 前情回顾

private void doExportUrl(URL url, boolean withMetaData) {

Invoker invoker = proxyFactory.getInvoker(ref, (Class) interfaceClass, url); if (withMetaData) { invoker = new DelegateProviderMetaDataInvoker(invoker, this); } Exporter exporter = protocolSPI.export(invoker);

exporters.add(exporter); }


而此刻,我们获得了这个invoker,一般情况下,这个 invoker 也是被FilterChain层层包裹着的,比如我们在消费端就见过的 CallbackRegistrationInvoker ,以及profileServerFilter 、EchoFilter等等很多中间层

8e04db7a59b64934a1edc68ca059b198.png

最终,到达了我们想要的那一层,也就是所谓的代理Invoker —— AbstractProxyInvoker,这个Invoker对象内包含了我们服务的实现类,如下图:

b4e3a0147e324c2a96c141617b59499b.png

而该代理其实是由javassistProxyFactory负责创建的,所以这个代理的调用方法是早就被定义好的,就是执行目标方法的调用,而所谓目标自然就是我们的服务实现类

return new AbstractProxyInvoker<T>(proxy, type, url) {
    @Override
    protected Object doInvoke(T proxy, String methodName,
                              Class<?>[] parameterTypes,
                              Object[] arguments) throws Throwable {
        return wrapper.invokeMethod(proxy, methodName, parameterTypes, arguments);
    }
}


e2a0de090ece4c9989a7fb3bea7c6b47.png

五、返回结果

1. 提供方返回

终于走到了整个流程的最后一个步骤,也就是服务执行完的结果返回。

对于服务提供者来说,要做的就是把刚刚的嵌套再走一遍,栈帧一层层退出并把结果返回。直到我们的Exchange层,我们再来看一遍 HeaderExchangeHandler 的核心代码

void handleRequest(final ExchangeChannel channel, Request req) throws RemotingException {
  // ...
  CompletionStage<Object> future = handler.reply(channel, msg);
  // 在异步任务完成后会触发下列方法,最终返回数据
  future.whenComplete((appResult, t) -> {
      try {
          if (t == null) {
             res.setStatus(Response.OK);
              res.setResult(appResult);
          } else {
              res.setStatus(Response.SERVICE_ERROR);
              res.setErrorMessage(StringUtils.toString(t));
          }
          channel.send(res);
      } catch (RemotingException e) {
          logger.warn(TRANSPORT_FAILED_RESPONSE, "", "", "Send result to consumer failed, channel is " + channel + ", msg is " + e);
      }
  });
  // ...
}

很直观的可以看见,处理完的结果会用同一个channel往回进行消息发送。发送的内容就包含了方法的返回值mResult。


091809709e0e4e7caa7b50ff2c144a23.png

2. 消费方获取结果

当然,我们更关心的部分在于,对于服务消费者,收到这个消息该怎么返回至原方法,这一切通信是同步还是异步的?如果是异步,又该怎么找到这一笔返回结果对应着哪次请求呢?

别急,我们回到上文出现过的DubboInvoker的这张图

aff8ab79e7e24dc7ac6bb6c7976628d7.png

我们可以看到,请求发送后形成的是一个CompletableFuture 的子类 DefaultFuture,这个对象属于个即刻返回的对象,但并不代表该对象里有返回值。不了解这个的也可以去看 JUC基础(二)—— Future接口 及其实现,我们先看这个Future的创建,可以看到,我们在这个Future静态属性里对象里面保存了请求id与本Future的关系,这一点很关键。

private static final Map<Long, Channel> CHANNELS = new ConcurrentHashMap<>();
private static final Map<Long, DefaultFuture> FUTURES = new ConcurrentHashMap<>();
private DefaultFuture(Channel channel, Request request, int timeout) {
    this.channel = channel;
    this.request = request;
    this.id = request.getId();
    this.timeout = timeout > 0 ? timeout : channel.getUrl().getPositiveParameter(TIMEOUT_KEY, DEFAULT_TIMEOUT);
    // put into waiting map.
    FUTURES.put(id, this);
    CHANNELS.put(id, channel);
}

紧接着,我们把这个 Future 放进了关键类 AsyncRpcResult 中,至此,还没有什么问题,然后DubboInvoker就把这个 AsyncRpcResult返回给上层 AbstractInvoker了,而这里才是真正获取结果的位置

public Result invoke(Invocation inv) throws RpcException {
    RpcInvocation invocation = (RpcInvocation) inv;
    // 准备调用
    prepareInvocation(invocation);
    // 调用并返回同步结果
    AsyncRpcResult asyncResult = doInvokeAndReturn(invocation);
    // 如果是同步的话等待远程结果返回
    waitForResultIfSync(asyncResult, invocation);
    return asyncResult;
}

我们关注 waitForResultIfSync方法,其中的核心代码就两个:如果是采用同步调用模式,或者本次调用时同步调用,就需要先获取远程值,即最终进入就是我们说的那个 AsyncRpcResult进行获取。如果都不是同步,则直接把半成品 asyncResult 返回。默认都是采用的同步调用,怎么设置成异步呢?


可以通过注解@DubboReference,@DubboService 的 async 属性可以进行设置,又或者如下的xml配置


<!-- 服务提供者配置 -->
<dubbo:service interface="com.zhanfu.dubbo.api" ref="demoService" async="true"/>
<!-- 服务消费者配置 -->
<dubbo:reference id="demoService" interface="com.zhanfu.dubbo.api" async="true"/>

因为默认采用的都是同步,在我们的Demo中,随后会进入阻塞获取阶段

private void waitForResultIfSync(AsyncRpcResult asyncResult, RpcInvocation invocation) {
    if (InvokeMode.SYNC != invocation.getInvokeMode()) {
        return;
    }
    try {
        Object timeoutKey = invocation.getObjectAttachmentWithoutConvert(TIMEOUT_KEY);
        long timeout = RpcUtils.convertToNumber(timeoutKey, Integer.MAX_VALUE);
        asyncResult.get(timeout, TimeUnit.MILLISECONDS);
    } catch (InterruptedException e) 
        // ...
    } 
}

不出所料,最后进行的其实还是利用了 Future.get 来进行阻塞获取。但这里,我们还需要留心上方的 waitAndDrain 方法,这个方法会去驱动线程池执行代办任务,所谓的代办其实就是如果有消息,得有个线程去处理消息。


public Result get(long timeout, TimeUnit unit) throws InterruptedException, ExecutionException, TimeoutException {
    if (executor != null && executor instanceof ThreadlessExecutor) {
        ThreadlessExecutor threadlessExecutor = (ThreadlessExecutor) executor;
        threadlessExecutor.waitAndDrain();
    }
    return responseFuture.get(timeout, unit);
}

但是远程的结果不可能突然就准备好了,这中间一定还有一个事件触发,最终准备好返回结果,然后 Future.get 才得以返回。


3. 事件触发

信道事件本身属于通信的范畴,我们此次并不细说,但是我们知道信道事件的发生,最终会触发一系列我们设置的处理器去执行,其中又包含了我们屡次提到的 HeaderExchangeHandler,不过当时我们看的还是 handleRequest,现在我们要看 handleResponse了


static void handleResponse(Channel channel, Response response) throws RemotingException {
    if (response != null && !response.isHeartbeat()) {
        DefaultFuture.received(channel, response);
    }
}


而此时的Response就是返回值的包装了

e2a830c2c555481d8410d280e3580d76.png

最终的设置结果,通过 complete 方法把结果值塞进 CompletableFuture 的 result 属性,然后唤醒阻塞的线程

private void doReceived(Response res) {
    if (res == null) {
        throw new IllegalStateException("response cannot be null");
    }
    if (res.getStatus() == Response.OK) {
        this.complete(res.getResult());
    } else if (res.getStatus() == Response.CLIENT_TIMEOUT || res.getStatus() == Response.SERVER_TIMEOUT) {
        this.completeExceptionally(new TimeoutException(res.getStatus() == Response.SERVER_TIMEOUT, channel, res.getErrorMessage()));
    } else {
        this.completeExceptionally(new RemotingException(channel, res.getErrorMessage()));
    }
    // ...
}

六、总结

至此,我们完整的看完了服务暴露、引用、调用的方法。不难体会到,Dubbo本身很好的完成了一个RPC框架的任务,除此以外,它利用SPI提供的众多扩展点形成了庞大的自定义体系,而且具有服务注册、发现、容错、路由、负载均衡、异步调用等诸多功能,这些都给予了Dubbo突破一般RPC框架的底气,而且其通讯底层采用了高性能框架Netty,网络通信效率亦是有所保障。当然,对于开发者来说,最舒心的还是它与Spring框架的适配非常融洽,使得我们可以靠注解完成大部分配置

b86be30a2eed455984017042f6aad494.png


不过,我们的学习才刚刚开始,了解了整体的流程以及基本原理后,我们还会对Dubbo 的配置、监控、功能细节、新特性进行讲解;当然,也会进行一些面试题的QA,感兴趣的可以直接订阅Dubbo 专栏获取最新的解读


目录
相关文章
|
7月前
|
Dubbo Java 应用服务中间件
微服务学习 | Springboot整合Dubbo+Nacos实现RPC调用
微服务学习 | Springboot整合Dubbo+Nacos实现RPC调用
|
7月前
|
Dubbo Cloud Native 网络协议
【Dubbo3技术专题】「服务架构体系」第一章之Dubbo3新特性要点之RPC协议分析介绍
【Dubbo3技术专题】「服务架构体系」第一章之Dubbo3新特性要点之RPC协议分析介绍
100 1
|
1月前
|
Dubbo 安全 应用服务中间件
Apache Dubbo 正式发布 HTTP/3 版本 RPC 协议,弱网效率提升 6 倍
在 Apache Dubbo 3.3.0 版本之后,官方推出了全新升级的 Triple X 协议,全面支持 HTTP/1、HTTP/2 和 HTTP/3 协议。本文将围绕 Triple 协议对 HTTP/3 的支持进行详细阐述,包括其设计目标、实际应用案例、性能测试结果以及源码架构分析等内容。
|
3月前
|
XML 负载均衡 监控
分布式-dubbo-简易版的RPC框架
分布式-dubbo-简易版的RPC框架
|
4月前
|
负载均衡 Dubbo 应用服务中间件
Dubbo服务调用过程原理
该文章主要介绍了Dubbo服务调用过程的原理,包括服务调用的主要阶段和服务调用的具体步骤。
Dubbo服务调用过程原理
|
7月前
|
Dubbo Java 应用服务中间件
Spring Boot整合Dubbo+Zookeeper实现RPC调用
Spring Boot整合Dubbo+Zookeeper实现RPC调用 技术栈说明 Dubbo:Dubbo作为RPC框架,能在多个服务之间实现远程服务的调用。比如有两个独立的微服务A和B,A服务想要调用B服务时,因为两者不在同个内存空间中,不能直接调用,所以可以通过Dubbo实现这点。 功能和Spring Cloud的Feign相同,两者都是应用于微服务架构的远程调用框架 Zookeeper:作为注册中心去管理Dubbo服务,这点和Eureka、Nacos相同。 概述 通过一个示例说明Dubbo+Zookeeper在Spring Boot中的应用。 现有两个服务provider和con
196 4
|
7月前
|
Java fastjson 数据安全/隐私保护
【Dubbo3技术专题】「云原生微服务开发实战」 一同探索和分析研究RPC服务的底层原理和实现
【Dubbo3技术专题】「云原生微服务开发实战」 一同探索和分析研究RPC服务的底层原理和实现
166 0
|
7月前
|
设计模式 负载均衡 网络协议
【分布式技术专题】「分布式技术架构」实践见真知,手把手教你如何实现一个属于自己的RPC框架(架构技术引导篇)
【分布式技术专题】「分布式技术架构」实践见真知,手把手教你如何实现一个属于自己的RPC框架(架构技术引导篇)
299 0
|
1月前
|
自然语言处理 负载均衡 API
gRPC 一种现代、开源、高性能的远程过程调用 (RPC) 可以在任何地方运行的框架
gRPC 是一种现代开源高性能远程过程调用(RPC)框架,支持多种编程语言,可在任何环境中运行。它通过高效的连接方式,支持负载平衡、跟踪、健康检查和身份验证,适用于微服务架构、移动设备和浏览器客户端连接后端服务等场景。gRPC 使用 Protocol Buffers 作为接口定义语言,支持四种服务方法:一元 RPC、服务器流式处理、客户端流式处理和双向流式处理。
|
4月前
|
Dubbo 网络协议 Java
RPC框架:一文带你搞懂RPC
这篇文章全面介绍了RPC(远程过程调用)的概念、原理和应用场景,解释了RPC如何工作以及为什么在分布式系统中广泛使用,并探讨了几种常用的RPC框架如Thrift、gRPC、Dubbo和Spring Cloud,同时详细阐述了RPC调用流程和实现透明化远程服务调用的关键技术,包括动态代理和消息的编码解码过程。
RPC框架:一文带你搞懂RPC