三、网络通信
1. Exchange
在进行通讯的时候,Dubbo定义了Exchange这样的部分,也就是说提供了一种统一的消息交换机,使得服务消费者和提供者可以进行可靠的通信。它封装了底层的网络细节,处理网络的传输、编解码、序列化和反序列化等操作。
通过Exchange,Dubbo可以支持多种通信协议(如Dubbo, HTTP ,RMI)和序列化协议(如Hession, JSON, Protobuf)等。其主要有两个接口和其实现类组成
- ExchangeClient(接口):用于在消费者端发送请求和接收响应。它定义了发送请求、接收响应和关闭连接等方法。
- ExchangeServer(接口):在提供者端用于接收请求并发送响应。它定义了启动服务器、接收请求、发送响应和关闭服务器等方法。
- Exchanger(SPI接口):是Exchange的核心接口,用于创建客户端和服务器端的实例。
- HeaderExchangeClient:是ExchangeClient接口的默认实现类,负责在消费者端处理请求和响应的编解码、序列化和反序列化等操作。
- HeaderExchangeServer:是ExchangeServer接口的默认实现类,负责在提供者端处理请求和响应的编解码、序列化和反序列化等操作
2. Netty
当然,Dubbo并没有真正复写底层的通信代码,而是采用了另一个专业的网络框架——Netty,这个框架笔者看源码是也是深感佩服,对性能的强调,要求是非常高的。Dubbo 则是嫁接在其之上,利用 Netty 的客户端与服务端进行通信的,当然,作为发送请求的消费者,自然是客户端。而接收请求的服务提供方,则是服务端(概念上的)
当然,关于Netty的部分,我们不可能在这里进行讲解,我们只需要知道,Dubbo的通信底层是调用的高性能网络通信框架Netty即可,关于Netty框架的讲解,我们会在一个新的系列中进行。
四、服务执行
1. 消息接收
其实通过上面的网络章节就能看出来。真正的通信都是依赖于Netty实现的,因此在消息接收上,依赖的也是Netty的机制。如下图,Dubbo内置着Netty的服务端
那抛去这一部分。那Dubbo本身做了怎样的准备呢?消息经由一个分发器Dispatcher(SPI接口)的自适应实现类实现分发后,然后由一个handler链来处理。
至此,我们就看到了exchange层次的另一个重要类 HeaderExchangeHandler,当消息发送来时,会触发其执行 received - handleRequest 方法
void handleRequest(final ExchangeChannel channel, Request req) throws RemotingException { Response res = new Response(req.getId(), req.getVersion()); Object msg = req.getData(); try { // 此处的handler 为 DubboProtocol.ExchangeHandler CompletionStage<Object> future = handler.reply(channel, msg); future.whenComplete((appResult, t) -> { try { if (t == null) { res.setStatus(Response.OK); res.setResult(appResult); } else { res.setStatus(Response.SERVICE_ERROR); res.setErrorMessage(StringUtils.toString(t)); } // 此处为返回响应 channel.send(res); } catch (RemotingException e) { logger.warn(TRANSPORT_FAILED_RESPONSE, "", "", "Send result to consumer failed, channel is " + channel + ", msg is " + e); } }); } catch (Throwable e) { res.setStatus(Response.SERVICE_ERROR); res.setErrorMessage(StringUtils.toString(e)); channel.send(res); } }
2. 获取服务并调用
DubboProtocol.ExchangeHandler.reply 中有一个重要方法
Invoker<?> invoker = getInvoker(channel, inv);
我们仔细来看看这个方法:DubboProtocol.getInvoker。
Invoker<?> getInvoker(Channel channel, Invocation inv) throws RemotingException { // ... String serviceKey = serviceKey( port, path, (String) inv.getObjectAttachmentWithoutConvert(VERSION_KEY), (String) inv.getObjectAttachmentWithoutConvert(GROUP_KEY) ); DubboExporter<?> exporter = (DubboExporter<?>) exporterMap.get(serviceKey); if (exporter == null) { throw new RemotingException(channel, "Not found exported service: " + serviceKey + " in " + exporterMap.keySet() + ", may be version or group mismatch " + ", channel: consumer: " + channel.getRemoteAddress() + " --> provider: " + channel.getLocalAddress() + ", message:" + getInvocationWithoutData(inv)); } return exporter.getInvoker(); }
如果看过之前的 Dubbo最核心功能——服务暴露的配置、使用及原理 话,看到 exporterMap 就已经明白了,我们曾经说过,服务的暴露最终就存在这样的 exporterMap 里,键为接口信息,此处为com.zhanfu.dubbo.api.DemoService:20880; 值就是对该接口实现类的包装
那么没有悬念的,最终拿到了我们在服务暴露时生成的exporter,如果你没忘,当时我们举得是injvm的例子,这次我们是在进行真实的RPC调用,当然,他们invoker生成的形式是一样的,都是在 ServiceConfig.doExportUrl 里
// 前情回顾
private void doExportUrl(URL url, boolean withMetaData) {
Invoker invoker = proxyFactory.getInvoker(ref, (Class) interfaceClass, url); if (withMetaData) { invoker = new DelegateProviderMetaDataInvoker(invoker, this); } Exporter exporter = protocolSPI.export(invoker);
exporters.add(exporter); }
而此刻,我们获得了这个invoker,一般情况下,这个 invoker 也是被FilterChain层层包裹着的,比如我们在消费端就见过的 CallbackRegistrationInvoker ,以及profileServerFilter 、EchoFilter等等很多中间层
最终,到达了我们想要的那一层,也就是所谓的代理Invoker —— AbstractProxyInvoker,这个Invoker对象内包含了我们服务的实现类,如下图:
而该代理其实是由javassistProxyFactory负责创建的,所以这个代理的调用方法是早就被定义好的,就是执行目标方法的调用,而所谓目标自然就是我们的服务实现类
return new AbstractProxyInvoker<T>(proxy, type, url) { @Override protected Object doInvoke(T proxy, String methodName, Class<?>[] parameterTypes, Object[] arguments) throws Throwable { return wrapper.invokeMethod(proxy, methodName, parameterTypes, arguments); } }
五、返回结果
1. 提供方返回
终于走到了整个流程的最后一个步骤,也就是服务执行完的结果返回。
对于服务提供者来说,要做的就是把刚刚的嵌套再走一遍,栈帧一层层退出并把结果返回。直到我们的Exchange层,我们再来看一遍 HeaderExchangeHandler 的核心代码
void handleRequest(final ExchangeChannel channel, Request req) throws RemotingException { // ... CompletionStage<Object> future = handler.reply(channel, msg); // 在异步任务完成后会触发下列方法,最终返回数据 future.whenComplete((appResult, t) -> { try { if (t == null) { res.setStatus(Response.OK); res.setResult(appResult); } else { res.setStatus(Response.SERVICE_ERROR); res.setErrorMessage(StringUtils.toString(t)); } channel.send(res); } catch (RemotingException e) { logger.warn(TRANSPORT_FAILED_RESPONSE, "", "", "Send result to consumer failed, channel is " + channel + ", msg is " + e); } }); // ... }
很直观的可以看见,处理完的结果会用同一个channel往回进行消息发送。发送的内容就包含了方法的返回值mResult。
2. 消费方获取结果
当然,我们更关心的部分在于,对于服务消费者,收到这个消息该怎么返回至原方法,这一切通信是同步还是异步的?如果是异步,又该怎么找到这一笔返回结果对应着哪次请求呢?
别急,我们回到上文出现过的DubboInvoker的这张图
我们可以看到,请求发送后形成的是一个CompletableFuture 的子类 DefaultFuture,这个对象属于个即刻返回的对象,但并不代表该对象里有返回值。不了解这个的也可以去看 JUC基础(二)—— Future接口 及其实现,我们先看这个Future的创建,可以看到,我们在这个Future静态属性里对象里面保存了请求id与本Future的关系,这一点很关键。
private static final Map<Long, Channel> CHANNELS = new ConcurrentHashMap<>(); private static final Map<Long, DefaultFuture> FUTURES = new ConcurrentHashMap<>(); private DefaultFuture(Channel channel, Request request, int timeout) { this.channel = channel; this.request = request; this.id = request.getId(); this.timeout = timeout > 0 ? timeout : channel.getUrl().getPositiveParameter(TIMEOUT_KEY, DEFAULT_TIMEOUT); // put into waiting map. FUTURES.put(id, this); CHANNELS.put(id, channel); }
紧接着,我们把这个 Future 放进了关键类 AsyncRpcResult 中,至此,还没有什么问题,然后DubboInvoker就把这个 AsyncRpcResult返回给上层 AbstractInvoker了,而这里才是真正获取结果的位置
public Result invoke(Invocation inv) throws RpcException { RpcInvocation invocation = (RpcInvocation) inv; // 准备调用 prepareInvocation(invocation); // 调用并返回同步结果 AsyncRpcResult asyncResult = doInvokeAndReturn(invocation); // 如果是同步的话等待远程结果返回 waitForResultIfSync(asyncResult, invocation); return asyncResult; }
我们关注 waitForResultIfSync方法,其中的核心代码就两个:如果是采用同步调用模式,或者本次调用时同步调用,就需要先获取远程值,即最终进入就是我们说的那个 AsyncRpcResult进行获取。如果都不是同步,则直接把半成品 asyncResult 返回。默认都是采用的同步调用,怎么设置成异步呢?
可以通过注解@DubboReference,@DubboService 的 async 属性可以进行设置,又或者如下的xml配置
<!-- 服务提供者配置 --> <dubbo:service interface="com.zhanfu.dubbo.api" ref="demoService" async="true"/> <!-- 服务消费者配置 --> <dubbo:reference id="demoService" interface="com.zhanfu.dubbo.api" async="true"/>
因为默认采用的都是同步,在我们的Demo中,随后会进入阻塞获取阶段
private void waitForResultIfSync(AsyncRpcResult asyncResult, RpcInvocation invocation) { if (InvokeMode.SYNC != invocation.getInvokeMode()) { return; } try { Object timeoutKey = invocation.getObjectAttachmentWithoutConvert(TIMEOUT_KEY); long timeout = RpcUtils.convertToNumber(timeoutKey, Integer.MAX_VALUE); asyncResult.get(timeout, TimeUnit.MILLISECONDS); } catch (InterruptedException e) // ... } }
不出所料,最后进行的其实还是利用了 Future.get 来进行阻塞获取。但这里,我们还需要留心上方的 waitAndDrain 方法,这个方法会去驱动线程池执行代办任务,所谓的代办其实就是如果有消息,得有个线程去处理消息。
public Result get(long timeout, TimeUnit unit) throws InterruptedException, ExecutionException, TimeoutException { if (executor != null && executor instanceof ThreadlessExecutor) { ThreadlessExecutor threadlessExecutor = (ThreadlessExecutor) executor; threadlessExecutor.waitAndDrain(); } return responseFuture.get(timeout, unit); }
但是远程的结果不可能突然就准备好了,这中间一定还有一个事件触发,最终准备好返回结果,然后 Future.get 才得以返回。
3. 事件触发
信道事件本身属于通信的范畴,我们此次并不细说,但是我们知道信道事件的发生,最终会触发一系列我们设置的处理器去执行,其中又包含了我们屡次提到的 HeaderExchangeHandler,不过当时我们看的还是 handleRequest,现在我们要看 handleResponse了
static void handleResponse(Channel channel, Response response) throws RemotingException { if (response != null && !response.isHeartbeat()) { DefaultFuture.received(channel, response); } }
而此时的Response就是返回值的包装了
最终的设置结果,通过 complete 方法把结果值塞进 CompletableFuture 的 result 属性,然后唤醒阻塞的线程
private void doReceived(Response res) { if (res == null) { throw new IllegalStateException("response cannot be null"); } if (res.getStatus() == Response.OK) { this.complete(res.getResult()); } else if (res.getStatus() == Response.CLIENT_TIMEOUT || res.getStatus() == Response.SERVER_TIMEOUT) { this.completeExceptionally(new TimeoutException(res.getStatus() == Response.SERVER_TIMEOUT, channel, res.getErrorMessage())); } else { this.completeExceptionally(new RemotingException(channel, res.getErrorMessage())); } // ... }
六、总结
至此,我们完整的看完了服务暴露、引用、调用的方法。不难体会到,Dubbo本身很好的完成了一个RPC框架的任务,除此以外,它利用SPI提供的众多扩展点形成了庞大的自定义体系,而且具有服务注册、发现、容错、路由、负载均衡、异步调用等诸多功能,这些都给予了Dubbo突破一般RPC框架的底气,而且其通讯底层采用了高性能框架Netty,网络通信效率亦是有所保障。当然,对于开发者来说,最舒心的还是它与Spring框架的适配非常融洽,使得我们可以靠注解完成大部分配置
不过,我们的学习才刚刚开始,了解了整体的流程以及基本原理后,我们还会对Dubbo 的配置、监控、功能细节、新特性进行讲解;当然,也会进行一些面试题的QA,感兴趣的可以直接订阅Dubbo 专栏获取最新的解读