深入理解Dubbo-7.服务消费调用源码分析(中):https://developer.aliyun.com/article/1414100
handler.channelRead()
服务端收到读的请求是,会进入这个方法。
接着通过handler.received来处理msg,这个handle的链路很长,比较复杂,我们需要逐步剖析
public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception { NettyChannel channel = NettyChannel.getOrAddChannel(ctx.channel(), this.url, this.handler); this.handler.received(channel, msg); }
服务端收到读的请求是,会进入这个方法。接着通过handler.received来处理msg ,而这个handler 是在服务发布的时候构建得。
DubboProtocol.createServer:
server = Exchangers.bind(url, this.requestHandler);
Exchanger.bind
public static ExchangeServer bind(URL url, ExchangeHandler handler) throws RemotingException { if (url == null) { throw new IllegalArgumentException("url == null"); } else if (handler == null) { throw new IllegalArgumentException("handler == null"); } else { url = url.addParameterIfAbsent("codec", "exchange"); return getExchanger(url).bind(url, handler); } }
通过扩展点选择到HeaderExchanger
public ExchangeServer bind(URL url, ExchangeHandler handler) throws RemotingException { return new HeaderExchangeServer(Transporters.bind(url, new ChannelHandler[]{new DecodeHandler(new HeaderExchangeHandler(handler))})); }
起链路如下:
MultiMessageHandler(复合消息处理) —> HeartbeatHandle(心跳消息处理,接收心跳并发送心跳响应) —> AllChannelHandler (业务线程转化处理器,把接收到的消息封装成ChannelEventRunnable可执行任
务给线程池处理)—> DecodeHandler (业务解码处理器)—> HeaderExchangeHandler —> DubboProtocol#requestHandler(new ExchangeHandlerAdapter())
而在构建 NettyServerHandler 得时候将 this 传了进去。this 即 NettyServer 。NettyServer是 AbstractPeer 得子类。所以 handler.received 此时会调用AbsstractPeer.received方法,这个方法用来判断服务端是否关闭了,如果关闭就直接返回,否则,通过handler处理链进行层层调用。
public void received(Channel ch, Object msg) throws RemotingException { if (closed) { return; } handler.received(ch, msg); }
HeaderExchangeHandler.received
交互层请求响应处理,有三种处理方式
- handlerRequest,双向请求
- handler.received 单向请求
- handleResponse 响应消息
public void received(Channel channel, Object message) throws RemotingException { ExchangeChannel exchangeChannel = HeaderExchangeChannel.getOrAddChannel(channel); // 如果接收到的消息是请求类型 if (message instanceof Request) { Request request = (Request)message; // 如果请求是事件类型 if (request.isEvent()) { this.handlerEvent(channel, request); // 如果请求是双向类型 } else if (request.isTwoWay()) { this.handleRequest(exchangeChannel, request); // 如果请求不是事件类型也不是双向类型 } else { this.handler.received(exchangeChannel, request.getData()); } // 如果接收到的消息是响应类型 } else if (message instanceof Response) { handleResponse(channel, (Response)message); // 如果接收到的消息是字符串类型 } else if (message instanceof String) { // 如果是客户端发送的字符串消息,则报错 if (isClientSide(channel)) { Exception e = new Exception("Dubbo client can not supported string message: " + message + " in channel: " + channel + ", url: " + channel.getUrl()); logger.error(e.getMessage(), e); // 如果是服务器端接收到的字符串消息,则进行处理 } else { String echo = this.handler.telnet(channel, (String)message); if (echo != null && echo.length() > 0) { channel.send(echo); } } // 如果接收到的消息不是请求、响应或字符串类型 } else { this.handler.received(exchangeChannel, message);//单向请求 } }
handleRequest
接着调用handleRequest方法。这个方法中,构建返回的对象Response,并且最终会通过异步的方式来把msg传递到invoker中进行调用 handler.reply
void handleRequest(final ExchangeChannel channel, Request req) throws RemotingException { Response res = new Response(req.getId(), req.getVersion()); Object data; if (req.isBroken()) { data = req.getData(); String msg; if (data == null) { msg = null; } else if (data instanceof Throwable) { msg = StringUtils.toString((Throwable)data); } else { msg = data.toString(); } res.setErrorMessage("Fail to decode request due to: " + msg); res.setStatus((byte)40); channel.send(res); } else { data = req.getData(); try { CompletionStage<Object> future = this.handler.reply(channel, data);// 可以返回一个结果 future.whenComplete((appResult, t) -> { try { if (t == null) { res.setStatus((byte)20); res.setResult(appResult); } else { res.setStatus((byte)70); res.setErrorMessage(StringUtils.toString(t)); } channel.send(res); } catch (RemotingException var5) { logger.warn("Send result to consumer failed, channel is " + channel + ", msg is " + var5); } }); } catch (Throwable var6) { res.setStatus((byte)70); res.setErrorMessage(StringUtils.toString(var6)); channel.send(res); } } }
此时的handler.reply,应该是DubboProtocol中构建的匿名内部类
所以调用handler.reply方法,自然就进入到了该匿名内部类中的reply方法中来。
DubboProtocol$requestHandler
private ExchangeHandler requestHandler = new ExchangeHandlerAdapter() { @Override public CompletableFuture<Object> reply(ExchangeChannel channel, Object message) throws RemotingException { //如果消息类型不是invocation,则抛出异常表示无法识别 if (!(message instanceof Invocation)) { throw new RemotingException(channel, "Unsupported request: " + (message == null ? null : (message.getClass().getName() + ": " + message)) + ", channel: consumer: " + channel.getRemoteAddress() + " --> provider: " + channel.getLocalAddress()); } //获得请求参数 Invocation inv = (Invocation) message; // 获取 invoker 领域对象,这个对象是在发布服务的时候构建,然后封装成 exporter 存在map里面的。 //根据key从发布的服务列表中查找到指定的服务端invoke,这个就是之前在讲服务发布时,涉及到的invoke对象。 Invoker<?> invoker = getInvoker(channel, inv); // need to consider backward-compatibility if it's a callback if (Boolean.TRUE.toString().equals(inv.getObjectAttachments().get(IS_CALLBACK_SERVICE_INVOKE))) { String methodsStr = invoker.getUrl().getParameters().get("methods"); boolean hasMethod = false; if (methodsStr == null || !methodsStr.contains(",")) { hasMethod = inv.getMethodName().equals(methodsStr); } else { String[] methods = methodsStr.split(","); for (String method : methods) { if (inv.getMethodName().equals(method)) { hasMethod = true; break; } } } if (!hasMethod) { logger.warn(new IllegalStateException("The methodName " + inv.getMethodName() + " not found in callback service interface ,invoke will be ignored." + " please update the api interface. url is:" + invoker.getUrl()) + " ,invocation is :" + inv); return null; } } RpcContext.getContext().setRemoteAddress(channel.getRemoteAddress()); //发起请求调用,此时得到的invoker对象 Result result = invoker.invoke(inv); // 发起对应调用 return result.thenApply(Function.identity()); } //......省略代码 };
getInvoker
Invoker<?> invoker = getInvoker(channel, inv);
相当于根据key来获取一个value值
回顾下之前,在发布的时候,调用了一个DubboProtocol.export
public <T> Exporter<T> export(Invoker<T> invoker) throws RpcException { URL url = invoker.getUrl(); String key = serviceKey(url); DubboExporter<T> exporter = new DubboExporter(invoker, key, this.exporterMap); // 构建好了之后,把key 和 value存进去 this.exporterMap.put(key, exporter); } // 而getInvoker也会从map中拿到这个值 Invoker<?> getInvoker(Channel channel, Invocation inv) throws RemotingException { ...... DubboExporter<?> exporter = (DubboExporter)this.exporterMap.get(serviceKey); if (exporter == null) { throw new RemotingException(channel, "Not found exported service: " + serviceKey + " in " + this.exporterMap.keySet() + ", may be version or group mismatch , channel: consumer: " + channel.getRemoteAddress() + " --> provider: " + channel.getLocalAddress() + ", message:" + this.getInvocationWithoutData(inv)); } else { return exporter.getInvoker(); } }
invoker.invoke()
invoker.invoke,发起本地服务调用,但是此时调用之前,invoke并不是一个直接调用的对象,而是包装过的。
在 ServiceConfig#doExportUrlsFor1Protocol 构建包装。最后的调用链路如下:
RegistryProtocol.InvokerDelegate.invoke —> DelegateProviderMetaDataInvoker.invoke —> AbstractProxyInvoker.invoke —> AbstractProxyInvoker(JavassistProxyFactory#getInvoker)
InvokerDelegate 未实现父类 InvokerWrapper invoke方法。进入到InvokerWrapper.invoke方法,这个是一个Invoker包装类,包装了URL地址信息和真正的Invoker代理对象。
public Result invoke(Invocation invocation) throws RpcException { return invoker.invoke(invocation); }
DelegateProviderMetaDataInvoker
这里是一个委派类,它提供了服务提供者的元数序信息。
public Result invoke(Invocation invocation) throws RpcException { return invoker.invoke(invocation); }
AbstractProxyInvoker
接着进入到AbstractProxyInvoker的invoke方法,在这个方法中,我们可以看到它会调用子类的doInvoke方法,获得返回结果。
其中proxy,表示服务端的对象实例,这个实例很显然是在构建动态代理Invoker对象时保存进来的。
public Result invoke(Invocation invocation) throws RpcException { try { Object value = this.doInvoke(this.proxy, invocation.getMethodName(), invocation.getParameterTypes(), invocation.getArguments()); CompletableFuture<Object> future = this.wrapWithFuture(value); CompletableFuture<AppResponse> appResponseFuture = future.handle((obj, t) -> { AppResponse result = new AppResponse(); if (t != null) { if (t instanceof CompletionException) { result.setException(t.getCause()); } else { result.setException(t); } } else { result.setValue(obj); } return result; }); return new AsyncRpcResult(appResponseFuture, invocation); } catch (InvocationTargetException var5) { if (RpcContext.getContext().isAsyncStarted() && !RpcContext.getContext().stopAsync()) { this.logger.error("Provider async started, but got an exception from the original method, cannot write the exception back to consumer because an async result may have returned the new thread.", var5); } return AsyncRpcResult.newDefaultAsyncResult((Object)null, var5.getTargetException(), invocation); } catch (Throwable var6) { throw new RpcException("Failed to invoke remote proxy method " + invocation.getMethodName() + " to " + this.getUrl() + ", cause: " + var6.getMessage(), var6); } }
JavassistProxyFactory.doInvoke
最后进入到具体的子类,也就是在服务的发布的时候通过 构建的
@Override public <T> Invoker<T> getInvoker(T proxy, Class<T> type, URL url) { // TODO Wrapper cannot handle this scenario correctly: the classname contains '$' final Wrapper wrapper = Wrapper.getWrapper(proxy.getClass().getName().indexOf('$') < 0 ? proxy.getClass() : type); return new AbstractProxyInvoker<T>(proxy, type, url) { @Override protected Object doInvoke(T proxy, String methodName, Class<?>[] parameterTypes, Object[] arguments) throws Throwable { return wrapper.invokeMethod(proxy, methodName, parameterTypes, arguments); } }; }
服务实例是什么时候生成的
从上面的代码中可以看到,getInvoker中传递的proxy,实际就是对象实例,而这个参数是在serviceConfig中,
Invoker<?> invoker = PROXY_FACTORY.getInvoker(ref, (Class) interfaceClass, url);
而 ref这个成员变量,是在spring启动时创建bean对象时,会注入这个对象的实例保存到ref中。
总结
至此,服务消费的处理流程就分析完了。
ow new RpcException("Failed to invoke remote proxy method " + invocation.getMethodName() + " to " + this.getUrl() + ", cause: " + var6.getMessage(), var6);
}
}
#### JavassistProxyFactory.doInvoke 最后进入到具体的子类,也就是在服务的发布的时候通过 构建的 ```java @Override public <T> Invoker<T> getInvoker(T proxy, Class<T> type, URL url) { // TODO Wrapper cannot handle this scenario correctly: the classname contains '$' final Wrapper wrapper = Wrapper.getWrapper(proxy.getClass().getName().indexOf('$') < 0 ? proxy.getClass() : type); return new AbstractProxyInvoker<T>(proxy, type, url) { @Override protected Object doInvoke(T proxy, String methodName, Class<?>[] parameterTypes, Object[] arguments) throws Throwable { return wrapper.invokeMethod(proxy, methodName, parameterTypes, arguments); } }; }
服务实例是什么时候生成的
从上面的代码中可以看到,getInvoker中传递的proxy,实际就是对象实例,而这个参数是在serviceConfig中,
Invoker<?> invoker = PROXY_FACTORY.getInvoker(ref, (Class) interfaceClass, url);
而 ref这个成员变量,是在spring启动时创建bean对象时,会注入这个对象的实例保存到ref中。
总结
至此,服务消费的处理流程就分析完了。