深入理解Dubbo-7.服务消费调用源码分析(上):https://developer.aliyun.com/article/1414097
Invoker.invoke
继续回到FailoverClusterInvoker这个类中的代码来,这里会通过负载返回的invoker对象,来调用invoke方法进行远程通信。
//发起远程调用 Result result = invoker.invoke(invocation);
这个Invoker其实在 RegistryDirectory 的 toInvokers 方法中,对Invoker进行初始化时就定义好了。
RegistryDirectory.toInvokers
invoker = new RegistryDirectory.InvokerDelegate(this.protocol.refer(this.serviceType, url), url, providerUrl);
所以最终我们的Invoker对象实际上是 RegistryDirectory$InvokerDelegate() ,在debug过程中也能够发现这一点。
RegistryDirectory.InvokerDelegate
private static class InvokerDelegate<T> extends InvokerWrapper<T> { private URL providerUrl; public InvokerDelegate(Invoker<T> invoker, URL url, URL providerUrl) { super(invoker, url); this.providerUrl = providerUrl; } public URL getProviderUrl() { return this.providerUrl; } }
但是其没有invoke方法,所以去其父类。
public class InvokerWrapper<T> implements Invoker<T> { ... public Result invoke(Invocation invocation) throws RpcException { return this.invoker.invoke(invocation); } ... }
再回到这个
invoker = new RegistryDirectory.InvokerDelegate(this.protocol.refer(this.serviceType, url), url, providerUrl);
因为protocol 是自适应扩展点,所以会根据配置对其进行包装,具体可以扩展点SPI知识。
所以最终debug中,可以看到最终的invoker是
ProtocolFilterWrapper
在ProtocolFilterWrapper的调用中,实际会调用一个匿名内部类的invoke方法,这里构建了一个filter进行逐项的过滤
private static <T> Invoker<T> buildInvokerChain(final Invoker<T> invoker, String key, String group) { final Invoker<T> last = invoker; List<Filter> filters = ExtensionLoader.getExtensionLoader(Filter.class).getActivateExtension(invoker.getUrl(), key, group); // 激活扩展点 if (!filters.isEmpty()) { for(int i = filters.size() - 1; i >= 0; --i) { final Filter filter = (Filter)filters.get(i); last = new Invoker<T>() { public Class<T> getInterface() { return invoker.getInterface(); } public URL getUrl() { return invoker.getUrl(); } public boolean isAvailable() { return invoker.isAvailable(); } public Result invoke(Invocation invocation) throws RpcException { Result asyncResult; try { asyncResult = filter.invoke(last, invocation); } catch (Exception var15) { Exception e = var15; if (filter instanceof ListenableFilter) { ListenableFilter listenableFilter = (ListenableFilter)filter; try { Listener listenerx = listenableFilter.listener(invocation); if (listenerx != null) { listenerx.onError(e, invoker, invocation); } } finally { listenableFilter.removeListener(invocation); } } else if (filter instanceof Listener) { Listener listener = (Listener)filter; listener.onError(var15, invoker, invocation); } throw var15; } finally { ; } return asyncResult.whenCompleteWithContext((r, t) -> { if (filter instanceof ListenableFilter) { ListenableFilter listenableFilter = (ListenableFilter)filter; Listener listener = listenableFilter.listener(invocation); try { if (listener != null) { if (t == null) { listener.onResponse(r, invoker, invocation); } else { listener.onError(t, invoker, invocation); } } } finally { listenableFilter.removeListener(invocation); } } else if (filter instanceof Listener) { Listener listenerx = (Listener)filter; if (t == null) { listenerx.onResponse(r, invoker, invocation); } else { listenerx.onError(t, invoker, invocation); } } }); } public void destroy() { invoker.destroy(); } public String toString() { return invoker.toString(); } }; } } return last; }
而实际的Invoker其实通过debug是
在toInvokers这个方法中,invoker是通过 protocol.refer来构建的。那么我们再来分析一下refer里面做了什么?
protocol.refer(serviceType, url), url, providerUrl)
我们去看一下它的实现,首先protocol,是被依赖注入进来的自适应扩展点Protocol$Adaptive. ,此时传进去的数,此时url对应的地址应该是dubbo://开头的协议地址,所以最终获得的是通过包装之后的DubboProtocol象。
QosProtocolWrapper(ProtocolFilterWrapper(ProtocolListenerWrapper(DubboProtocol)))
AbstractProtocol.refer
DubboProtocol中没有refer方法,而是调用父类的refer。
@Override public <T> Invoker<T> refer(Class<T> type, URL url) throws RpcException { return new AsyncToSyncInvoker<>(protocolBindingRefer(type, url)); }
AsyncToSyncInvoker.invoke
经过装饰器、过滤器对invoker进行增强和过滤之后,来到了AsyncToSyncInvoker.invoke方法,这里采用的是异步的方式来进行通信
public Result invoke(Invocation invocation) throws RpcException { Result asyncResult = this.invoker.invoke(invocation); try { //如果配置的是同步通信,则通过get阻塞式获取返回结果 if (InvokeMode.SYNC == ((RpcInvocation)invocation).getInvokeMode()) { asyncResult.get(2147483647L, TimeUnit.MILLISECONDS); } return asyncResult; } ...... }
DubboInvoker.invoke
DubboInvoker继承了AbstractInvoker这个抽象类,而DubboInvoker中没有invoke这个方法,所以这里调用的是AbstractInvoker.invoke方法。
进入到DubboInvoker这个方法中,那么意味着正式进入到服务通信层面了。前面的很多细节分析,无非就是做了三件事
- 多注册中心的拦截以及分发
- 负载均衡以及集群容错
- 请求过滤和包装
public Result invoke(Invocation inv) throws RpcException { // 检查当前 Invoker 是否已经被销毁 if (this.destroyed.get()) { this.logger.warn("Invoker for service " + this + " on consumer " + NetUtils.getLocalHost() + " is destroyed, , dubbo version is " + Version.getVersion() + ", this invoker should not be used any longer"); } // 将 Invocation 类型转换为 RpcInvocation 类型 RpcInvocation invocation = (RpcInvocation)inv; // 设置当前 Invoker 对象到 RpcInvocation 中 invocation.setInvoker(this); // 如果附加属性不为空,则添加到 RpcInvocation 对象中 if (CollectionUtils.isNotEmptyMap(this.attachment)) { invocation.addObjectAttachmentsIfAbsent(this.attachment); } // 将 RpcContext 中的上下文附加属性添加到 RpcInvocation 对象中 Map<String, Object> contextAttachments = RpcContext.getContext().getObjectAttachments(); if (CollectionUtils.isNotEmptyMap(contextAttachments)) { invocation.addObjectAttachments(contextAttachments); } // 设置调用模式到 RpcInvocation 中 invocation.setInvokeMode(RpcUtils.getInvokeMode(this.url, invocation)); // 如果是异步调用,则在 RpcInvocation 中附加调用 ID RpcUtils.attachInvocationIdIfAsync(this.getUrl(), invocation); AsyncRpcResult asyncResult; try { // 调用 doInvoke 方法进行远程调用 asyncResult = (AsyncRpcResult)this.doInvoke(invocation); } catch (InvocationTargetException var7) { Throwable te = var7.getTargetException(); if (te == null) { asyncResult = AsyncRpcResult.newDefaultAsyncResult((Object)null, var7, invocation); } else { if (te instanceof RpcException) { ((RpcException)te).setCode(3); } asyncResult = AsyncRpcResult.newDefaultAsyncResult((Object)null, te, invocation); } } catch (RpcException var8) { if (!var8.isBiz()) { throw var8; } asyncResult = AsyncRpcResult.newDefaultAsyncResult((Object)null, var8, invocation); } catch (Throwable var9) { asyncResult = AsyncRpcResult.newDefaultAsyncResult((Object)null, var9, invocation); } RpcContext.getContext().setFuture(new FutureAdapter(asyncResult.getResponseFuture())); return asyncResult; }
DubboInvoker.doInvoke
调用doInvoke方法发起远程请求。
protected Result doInvoke(final Invocation invocation) throws Throwable { // 将 Invocation 类型转换为 RpcInvocation 类型 RpcInvocation inv = (RpcInvocation)invocation; // 获取调用方法名,并将 path 和 version 附加到 RpcInvocation 中 String methodName = RpcUtils.getMethodName(invocation); inv.setAttachment("path", this.getUrl().getPath()); inv.setAttachment("version", this.version); ExchangeClient currentClient; // 从 clients 中选择一个 ExchangeClient 进行远程调用 if (this.clients.length == 1) { currentClient = this.clients[0]; } else { // 轮询 currentClient = this.clients[this.index.getAndIncrement() % this.clients.length]; } try { // 判断是否是单向调用 boolean isOneway = RpcUtils.isOneway(this.getUrl(), invocation); // 计算超时时间 int timeout = this.calculateTimeout(invocation, methodName); if (isOneway) { // 如果是单向调用,则将 RpcInvocation 对象发送到服务端 boolean isSent = this.getUrl().getMethodParameter(methodName, "sent", false); currentClient.send(inv, isSent); // 返回 AsyncRpcResult,表示调用结果还未返回 return AsyncRpcResult.newDefaultAsyncResult(invocation); } else { // 否则,创建一个线程池 executor,并将执行结果封装到 CompletableFuture<AppResponse> 对象中 ExecutorService executor = this.getCallbackExecutor(this.getUrl(), inv); CompletableFuture<AppResponse> appResponseFuture = currentClient.request(inv, timeout, executor).thenApply((obj) -> { return (AppResponse)obj; }); // 将 FutureContext 中的 Future 对象设置为 appResponseFuture FutureContext.getContext().setCompatibleFuture(appResponseFuture); // 返回 AsyncRpcResult,表示调用结果还未返回 AsyncRpcResult result = new AsyncRpcResult(appResponseFuture, inv); result.setExecutor(executor); return result; } } catch (Throwable e) { ...... } }
currentClient还记得是一个什么对象吗?
在DubboProtocol里的initClient()中
private ExchangeClient initClient(URL url) { ..... Object client; if (url.getParameter("lazy", false)) { client = new LazyConnectExchangeClient(url, this.requestHandler); } else { client = Exchangers.connect(url, this.requestHandler); } ...... } } public static ExchangeClient connect(URL url, ExchangeHandler handler) throws RemotingException { ..... url = url.addParameterIfAbsent("codec", "exchange"); return getExchanger(url).connect(url, handler); } } public static Exchanger getExchanger(URL url) { String type = url.getParameter("exchanger", "header"); return getExchanger(type); } public static Exchanger getExchanger(String type) { return (Exchanger)ExtensionLoader.getExtensionLoader(Exchanger.class).getExtension(type); }
由于是扩展点,所以相当于它实际是一个ReferenceCountExchangeClient(HeaderExchangeClient())
所以它的调用链路是
ReferenceCountExchangeClient->HeaderExchangeClient->HeaderExchangeChannel->(request方法)
ReferenceCountExchangeClient.request
最终,把构建好的RpcInvocation,组装到一个Request对象中进行传递
public CompletableFuture<Object> request(Object request, int timeout, ExecutorService executor) throws RemotingException { return this.client.request(request, timeout, executor); }
- ReferenceCountExchangeClient 用来记录调用次数
- HeaderExchangeClient 用来开启心跳机制、以及启动失败重连任务
HeaderExchangeClient.request
public HeaderExchangeClient(Client client, boolean startTimer) { Assert.notNull(client, "Client can't be null"); this.client = client; this.channel = new HeaderExchangeChannel(client); if (startTimer) { URL url = client.getUrl(); this.startReconnectTask(url); this.startHeartBeatTask(url); } } public CompletableFuture<Object> request(Object request, int timeout, ExecutorService executor) throws RemotingException { return this.channel.request(request, timeout, executor); }
HeaderExchangeChannel.request
进入到HeaderExchangeChannel.request 来发起请求,这个类的主要职责就是和服务端进行数据交互
public CompletableFuture<Object> request(Object request, int timeout, ExecutorService executor) throws RemotingException { if (this.closed) { throw new RemotingException(this.getLocalAddress(), (InetSocketAddress)null, "Failed to send request " + request + ", cause: The channel " + this + " is closed!"); } else { // 创建请求对象 Request req = new Request(); req.setVersion(Version.getProtocolVersion()); req.setTwoWay(true); req.setData(request); DefaultFuture future = DefaultFuture.newFuture(this.channel, req, timeout, executor); try { //发送请求 this.channel.send(req); return future; } catch (RemotingException var7) { future.cancel(); throw var7; } } }
服务端接收数据的处理流程
客户端请求发出去之后,服务端会收到这个请求的消息,然后触发调用。
服务端接收到消息
服务端这边接收消息的处理链路,也比较复杂,我们回到NettServer中创建io的过程。
bootstrap.group(bossGroup, workerGroup) .channel(NettyEventLoopFactory.serverSocketChannelClass()) .option(ChannelOption.SO_REUSEADDR, Boolean.TRUE) .childOption(ChannelOption.TCP_NODELAY, Boolean.TRUE) .childOption(ChannelOption.ALLOCATOR, PooledByteBufAllocator.DEFAULT) .childHandler(new ChannelInitializer<SocketChannel>() { @Override protected void initChannel(SocketChannel ch) throws Exception { // FIXME: should we use getTimeout()? int idleTimeout = UrlUtils.getIdleTimeout(getUrl()); NettyCodecAdapter adapter = new NettyCodecAdapter(getCodec(),getUrl(), NettyServer.this); if (getUrl().getParameter(SSL_ENABLED_KEY, false)) { ch.pipeline().addLast("negotiation", SslHandlerInitializer.sslServerHandler(getUrl(), nettyServerHandler)); } ch.pipeline() .addLast("decoder", adapter.getDecoder()) .addLast("encoder", adapter.getEncoder()) .addLast("server-idle-handler", new IdleStateHandler(0, 0, idleTimeout, MILLISECONDS)) .addLast("handler", nettyServerHandler); } });
服务端启动的时候,配置的消息处理是handler配置的是nettyServerHandler
final NettyServerHandler nettyServerHandler = new NettyServerHandler(getUrl(),this);
所以,服务端收到消息之后,会调用NettyServerHandler中的channelRead方法
深入理解Dubbo-7.服务消费调用源码分析(下):https://developer.aliyun.com/article/1414106