public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
    NettyChannel channel = NettyChannel.getOrAddChannel(ctx.channel(), this.url, this.handler);
    this.handler.received(channel, msg);
}
When the server receives a read event, Netty calls this method. The message is then handed to handler.received; that handler is built when the service is exported.
server = Exchangers.bind(url, this.requestHandler);
// Exchangers#bind
public static ExchangeServer bind(URL url, ExchangeHandler handler) throws RemotingException {
    if (url == null) {
        throw new IllegalArgumentException("url == null");
    } else if (handler == null) {
        throw new IllegalArgumentException("handler == null");
    } else {
        url = url.addParameterIfAbsent("codec", "exchange");
        return getExchanger(url).bind(url, handler);
    }
}
// HeaderExchanger#bind
public ExchangeServer bind(URL url, ExchangeHandler handler) throws RemotingException {
    return new HeaderExchangeServer(Transporters.bind(url, new ChannelHandler[]{new DecodeHandler(new HeaderExchangeHandler(handler))}));
}
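MultiMessageHandler (handles composite messages) -> HeartbeatHandler (handles heartbeats: receives a heartbeat and sends the heartbeat response) -> AllChannelHandler (hands work over to business threads: wraps each received message in a ChannelEventRunnable task and submits it to the thread pool) -> DecodeHandler (decodes the business payload) -> HeaderExchangeHandler -> DubboProtocol#requestHandler (new ExchangeHandlerAdapter())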
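When NettyServerHandler is constructed, this is passed in as its handler, and this is the NettyServer. NettyServer is a subclass of AbstractPeer, so handler.received at this point resolves to AbstractPeer.received. That method checks whether the server has been closed: if it has, it returns immediately; otherwise it passes the message down the handler chain, layer by layer.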
public void received(Channel ch, Object msg) throws RemotingException {
    if (closed) {
        return;
    }
    handler.received(ch, msg);
}
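To make the layering concrete, here is a minimal sketch of a guarded entry point in front of a small handler chain. It is not Dubbo's code: `SketchHandler`, `GuardedPeer`, `HeartbeatSketchHandler`, `PoolDispatchHandler` and the `"HEARTBEAT"` string are hypothetical stand-ins, and only three of the layers described above are modeled.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;

// Hypothetical, simplified stand-in for Dubbo's ChannelHandler.
interface SketchHandler {
    void received(String channel, Object msg);
}

// Entry point: the same closed check that AbstractPeer.received performs.
class GuardedPeer implements SketchHandler {
    private final SketchHandler next;
    volatile boolean closed;

    GuardedPeer(SketchHandler next) { this.next = next; }

    @Override
    public void received(String channel, Object msg) {
        if (closed) {
            return; // a closed server drops the message
        }
        next.received(channel, msg);
    }
}

// Answers heartbeats on the spot and forwards everything else.
class HeartbeatSketchHandler implements SketchHandler {
    private final SketchHandler next;
    final List<String> replies = new ArrayList<>();

    HeartbeatSketchHandler(SketchHandler next) { this.next = next; }

    @Override
    public void received(String channel, Object msg) {
        if ("HEARTBEAT".equals(msg)) {
            replies.add("HEARTBEAT_ACK");
            return;
        }
        next.received(channel, msg);
    }
}

// Moves the message onto a thread pool, the role AllChannelHandler plays.
class PoolDispatchHandler implements SketchHandler {
    private final ExecutorService pool;
    private final SketchHandler next;

    PoolDispatchHandler(ExecutorService pool, SketchHandler next) {
        this.pool = pool;
        this.next = next;
    }

    @Override
    public void received(String channel, Object msg) {
        pool.execute(() -> next.received(channel, msg));
    }
}

class HandlerChainDemo {
    // Sends three messages and returns those that reached the business handler.
    static List<Object> run() {
        List<Object> seen = Collections.synchronizedList(new ArrayList<>());
        ExecutorService pool = Executors.newFixedThreadPool(2);
        SketchHandler business = (ch, m) -> seen.add(m);
        GuardedPeer peer = new GuardedPeer(
                new HeartbeatSketchHandler(new PoolDispatchHandler(pool, business)));
        peer.received("ch-1", "HEARTBEAT"); // answered by the heartbeat layer
        peer.received("ch-1", "request-1"); // reaches the business handler
        peer.closed = true;
        peer.received("ch-1", "request-2"); // dropped by the closed check
        pool.shutdown();
        try {
            pool.awaitTermination(5, TimeUnit.SECONDS);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
        return seen;
    }

    public static void main(String[] args) {
        System.out.println(run()); // prints [request-1]
    }
}
```

Each layer only knows the next one, which is why the order in which the wrappers are nested decides the order in which a message is processed.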
- handleRequest: two-way requests
- handler.received: one-way requests
- handleResponse: response messages
public void received(Channel channel, Object message) throws RemotingException {
    ExchangeChannel exchangeChannel = HeaderExchangeChannel.getOrAddChannel(channel);
    if (message instanceof Request) {
        // The received message is a request.
        Request request = (Request) message;
        if (request.isEvent()) {
            // Event request.
            this.handlerEvent(channel, request);
        } else if (request.isTwoWay()) {
            // Two-way request.
            this.handleRequest(exchangeChannel, request);
        } else {
            // Neither an event nor a two-way request.
            this.handler.received(exchangeChannel, request.getData());
        }
    } else if (message instanceof Response) {
        // The received message is a response.
        handleResponse(channel, (Response) message);
    } else if (message instanceof String) {
        // The received message is a string.
        if (isClientSide(channel)) {
            // String messages are not supported on the client side: log an error.
            Exception e = new Exception("Dubbo client can not supported string message: " + message + " in channel: " + channel + ", url: " + channel.getUrl());
            logger.error(e.getMessage(), e);
        } else {
            // On the server side the string is processed as a telnet command.
            String echo = this.handler.telnet(channel, (String) message);
            if (echo != null && echo.length() > 0) {
                channel.send(echo);
            }
        }
    } else {
        // The message is not a request, a response, or a string.
        this.handler.received(exchangeChannel, message); // one-way request
    }
}
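The branching in received can be reduced to a small type-based dispatcher. The sketch below is only an illustration: `SketchRequest`, `SketchResponse` and `DispatchSketch` are hypothetical types that keep just the flags the branches depend on, and each branch records a label instead of doing real work.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical request type carrying only the two flags the dispatcher inspects.
class SketchRequest {
    final Object data;
    final boolean event;
    final boolean twoWay;

    SketchRequest(Object data, boolean event, boolean twoWay) {
        this.data = data;
        this.event = event;
        this.twoWay = twoWay;
    }
}

// Hypothetical response type.
class SketchResponse {
    final Object result;

    SketchResponse(Object result) { this.result = result; }
}

class DispatchSketch {
    // Records which branch handled each message.
    final List<String> log = new ArrayList<>();

    void received(Object message) {
        if (message instanceof SketchRequest) {
            SketchRequest request = (SketchRequest) message;
            if (request.event) {
                log.add("event:" + request.data);
            } else if (request.twoWay) {
                log.add("two-way:" + request.data);   // a response must be sent back
            } else {
                log.add("one-way:" + request.data);   // fire and forget
            }
        } else if (message instanceof SketchResponse) {
            log.add("response:" + ((SketchResponse) message).result);
        } else if (message instanceof String) {
            log.add("telnet:" + message);
        } else {
            log.add("one-way:" + message);
        }
    }

    public static void main(String[] args) {
        DispatchSketch d = new DispatchSketch();
        d.received(new SketchRequest("sayHello", false, true));
        d.received(new SketchRequest("notify", false, false));
        d.received(new SketchResponse("ok"));
        d.received("ls");
        System.out.println(d.log);
    }
}
```

The point to notice is that the event check comes before the two-way check, so an event request never reaches the business handler through the two-way path.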
Next, handleRequest is called. It builds the Response object to be returned and then, asynchronously, passes the message on to the invoker through handler.reply.
void handleRequest(final ExchangeChannel channel, Request req) throws RemotingException {
    Response res = new Response(req.getId(), req.getVersion());
    Object data;
    if (req.isBroken()) {
        data = req.getData();
        String msg;
        if (data == null) {
            msg = null;
        } else if (data instanceof Throwable) {
            msg = StringUtils.toString((Throwable) data);
        } else {
            msg = data.toString();
        }
        res.setErrorMessage("Fail to decode request due to: " + msg);
        res.setStatus((byte) 40);
        channel.send(res);
    } else {
        data = req.getData();
        try {
            // reply can return a result
            CompletionStage<Object> future = this.handler.reply(channel, data);
            future.whenComplete((appResult, t) -> {
                try {
                    if (t == null) {
                        res.setStatus((byte) 20);
                        res.setResult(appResult);
                    } else {
                        res.setStatus((byte) 70);
                        res.setErrorMessage(StringUtils.toString(t));
                    }
                    channel.send(res);
                } catch (RemotingException var5) {
                    logger.warn("Send result to consumer failed, channel is " + channel + ", msg is " + var5);
                }
            });
        } catch (Throwable var6) {
            res.setStatus((byte) 70);
            res.setErrorMessage(StringUtils.toString(var6));
            channel.send(res);
        }
    }
}
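The asynchronous part of handleRequest comes down to "fill in the response when the future completes, or right away if the call throws". A minimal sketch of that pattern with plain `CompletableFuture` follows. `SketchReply`, `AsyncReplySketch` and the constant names are hypothetical; the values 20 and 70 are simply the literals from the excerpt, and "sending" is modeled by completing a second future.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionStage;
import java.util.function.Function;

// Hypothetical, trimmed-down response object.
class SketchReply {
    final long id;
    byte status;
    Object result;
    String errorMessage;

    SketchReply(long id) { this.id = id; }
}

class AsyncReplySketch {
    // Names are assumptions; the values are the literals used in the excerpt.
    static final byte STATUS_OK = 20;
    static final byte STATUS_SERVICE_ERROR = 70;

    // Returns a future that completes once the reply would have been sent.
    static CompletableFuture<SketchReply> handle(
            long requestId, Object data, Function<Object, CompletionStage<Object>> replyFn) {
        SketchReply res = new SketchReply(requestId);
        CompletableFuture<SketchReply> sent = new CompletableFuture<>();
        try {
            replyFn.apply(data).whenComplete((appResult, t) -> {
                if (t == null) {
                    res.status = STATUS_OK;
                    res.result = appResult;
                } else {
                    res.status = STATUS_SERVICE_ERROR;
                    res.errorMessage = t.toString();
                }
                sent.complete(res); // stands in for channel.send(res)
            });
        } catch (Throwable e) {
            // The handler failed before it could even return a future.
            res.status = STATUS_SERVICE_ERROR;
            res.errorMessage = e.toString();
            sent.complete(res);
        }
        return sent;
    }

    public static void main(String[] args) {
        SketchReply ok = handle(1L, "ping",
                d -> CompletableFuture.completedFuture("pong:" + d)).join();
        System.out.println(ok.status + " " + ok.result); // prints 20 pong:ping
    }
}
```

Both failure paths end in the same error status, so the consumer cannot tell from the status alone whether the handler threw synchronously or the future failed later.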
private ExchangeHandler requestHandler = new ExchangeHandlerAdapter() {
    @Override
    public CompletableFuture<Object> reply(ExchangeChannel channel, Object message) throws RemotingException {
        // If the message is not an Invocation, throw an exception: it cannot be handled.
        if (!(message instanceof Invocation)) {
            throw new RemotingException(channel, "Unsupported request: "
                    + (message == null ? null : (message.getClass().getName() + ": " + message))
                    + ", channel: consumer: " + channel.getRemoteAddress() + " --> provider: " + channel.getLocalAddress());
        }
        // Get the request parameters.
        Invocation inv = (Invocation) message;
        // Get the invoker domain object. It is built when the service is exported,
        // wrapped in an exporter, and stored in a map.
        // The lookup by key finds the provider-side invoker among the exported services;
        // this is the invoker object covered in the service export walkthrough.
        Invoker<?> invoker = getInvoker(channel, inv);
        // need to consider backward-compatibility if it's a callback
        if (Boolean.TRUE.toString().equals(inv.getObjectAttachments().get(IS_CALLBACK_SERVICE_INVOKE))) {
            String methodsStr = invoker.getUrl().getParameters().get("methods");
            boolean hasMethod = false;
            if (methodsStr == null || !methodsStr.contains(",")) {
                hasMethod = inv.getMethodName().equals(methodsStr);
            } else {
                String[] methods = methodsStr.split(",");
                for (String method : methods) {
                    if (inv.getMethodName().equals(method)) {
                        hasMethod = true;
                        break;
                    }
                }
            }
            if (!hasMethod) {
                logger.warn(new IllegalStateException("The methodName " + inv.getMethodName()
                        + " not found in callback service interface ,invoke will be ignored."
                        + " please update the api interface. url is:"
                        + invoker.getUrl()) + " ,invocation is :" + inv);
                return null;
            }
        }
        RpcContext.getContext().setRemoteAddress(channel.getRemoteAddress());
        // Invoke the request on the invoker obtained above.
        Result result = invoker.invoke(inv);
        return result.thenApply(Function.identity());
    }
    // ...... (code omitted)
};
Invoker<?> invoker = getInvoker(channel, inv);
public <T> Exporter<T> export(Invoker<T> invoker) throws RpcException {
    URL url = invoker.getUrl();
    String key = serviceKey(url);
    DubboExporter<T> exporter = new DubboExporter(invoker, key, this.exporterMap);
    // Once the exporter is built, store it in the map under its key.
    this.exporterMap.put(key, exporter);
    // (the excerpt omits the rest of the method, including the return of exporter)
}

// getInvoker later reads the same entry back from the map.
Invoker<?> getInvoker(Channel channel, Invocation inv) throws RemotingException {
    // ......
    DubboExporter<?> exporter = (DubboExporter) this.exporterMap.get(serviceKey);
    if (exporter == null) {
        throw new RemotingException(channel, "Not found exported service: " + serviceKey + " in " + this.exporterMap.keySet()
                + ", may be version or group mismatch , channel: consumer: " + channel.getRemoteAddress()
                + " --> provider: " + channel.getLocalAddress() + ", message:" + this.getInvocationWithoutData(inv));
    } else {
        return exporter.getInvoker();
    }
}
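The export and lookup pair is essentially a keyed registry: export stores an entry under a service key, and every incoming call rebuilds the same key to find it. The sketch below assumes a hypothetical `ExporterRegistrySketch` class, models the invoker as a plain `Function`, and uses an illustrative key format; the real serviceKey may be built differently.

```java
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.function.Function;

class ExporterRegistrySketch {
    // Service key -> invoker; the invoker is modeled as a plain function here.
    private final Map<String, Function<String, String>> exporterMap = new ConcurrentHashMap<>();

    // Illustrative key format only (an assumption, not taken from the excerpt).
    static String serviceKey(String group, String serviceName, String version, int port) {
        return group + "/" + serviceName + ":" + version + ":" + port;
    }

    // Called once at export time.
    void export(String key, Function<String, String> invoker) {
        exporterMap.put(key, invoker);
    }

    // Called for every incoming request.
    Function<String, String> getInvoker(String key) {
        Function<String, String> invoker = exporterMap.get(key);
        if (invoker == null) {
            throw new IllegalStateException("Not found exported service: " + key
                    + " in " + exporterMap.keySet() + ", may be version or group mismatch");
        }
        return invoker;
    }

    public static void main(String[] args) {
        ExporterRegistrySketch registry = new ExporterRegistrySketch();
        String key = serviceKey("g1", "com.example.DemoService", "1.0.0", 20880);
        registry.export(key, arg -> "echo:" + arg);
        System.out.println(registry.getInvoker(key).apply("hi")); // prints echo:hi
    }
}
```

Because the key includes group and version, a consumer that asks for a different version gets the "not found" error even though the interface itself is exported.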
The invoker is built and wrapped in ServiceConfig#doExportUrlsFor1Protocol. The resulting call chain is:
RegistryProtocol.InvokerDelegate.invoke -> DelegateProviderMetaDataInvoker.invoke -> AbstractProxyInvoker.invoke -> AbstractProxyInvoker (JavassistProxyFactory#getInvoker)
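InvokerDelegate does not override the invoke method of its parent class InvokerWrapper, so the call lands in InvokerWrapper.invoke. InvokerWrapper is an Invoker wrapper class that holds the URL information and the real Invoker proxy object.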
// InvokerWrapper#invoke
public Result invoke(Invocation invocation) throws RpcException {
    return invoker.invoke(invocation);
}
// DelegateProviderMetaDataInvoker#invoke delegates in the same way
public Result invoke(Invocation invocation) throws RpcException {
    return invoker.invoke(invocation);
}
// AbstractProxyInvoker#invoke
public Result invoke(Invocation invocation) throws RpcException {
    try {
        Object value = this.doInvoke(this.proxy, invocation.getMethodName(), invocation.getParameterTypes(), invocation.getArguments());
        CompletableFuture<Object> future = this.wrapWithFuture(value);
        CompletableFuture<AppResponse> appResponseFuture = future.handle((obj, t) -> {
            AppResponse result = new AppResponse();
            if (t != null) {
                if (t instanceof CompletionException) {
                    result.setException(t.getCause());
                } else {
                    result.setException(t);
                }
            } else {
                result.setValue(obj);
            }
            return result;
        });
        return new AsyncRpcResult(appResponseFuture, invocation);
    } catch (InvocationTargetException var5) {
        if (RpcContext.getContext().isAsyncStarted() && !RpcContext.getContext().stopAsync()) {
            this.logger.error("Provider async started, but got an exception from the original method, cannot write the exception back to consumer because an async result may have returned the new thread.", var5);
        }
        return AsyncRpcResult.newDefaultAsyncResult((Object) null, var5.getTargetException(), invocation);
    } catch (Throwable var6) {
        throw new RpcException("Failed to invoke remote proxy method " + invocation.getMethodName() + " to " + this.getUrl() + ", cause: " + var6.getMessage(), var6);
    }
}
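The two future steps in this method, wrapping the return value and mapping it into a response, can be tried in isolation. In this sketch `SketchAppResponse` and `WrapWithFutureSketch` are hypothetical, and `wrapWithFuture` is a simplified assumption that only distinguishes "already a CompletableFuture" from "plain value"; the real method may handle more cases.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionException;

// Hypothetical, trimmed-down counterpart of AppResponse.
class SketchAppResponse {
    Object value;
    Throwable exception;
}

class WrapWithFutureSketch {
    // Simplified assumption: reuse an existing future, wrap anything else.
    @SuppressWarnings("unchecked")
    static CompletableFuture<Object> wrapWithFuture(Object value) {
        if (value instanceof CompletableFuture) {
            return (CompletableFuture<Object>) value;
        }
        return CompletableFuture.completedFuture(value);
    }

    // Maps success or failure into a response, unwrapping CompletionException
    // the same way the excerpt does.
    static CompletableFuture<SketchAppResponse> toResponse(Object value) {
        return wrapWithFuture(value).handle((obj, t) -> {
            SketchAppResponse result = new SketchAppResponse();
            if (t != null) {
                result.exception = (t instanceof CompletionException) ? t.getCause() : t;
            } else {
                result.value = obj;
            }
            return result;
        });
    }

    public static void main(String[] args) {
        // A synchronous service method returned a plain value.
        System.out.println(toResponse("hello").join().value); // prints hello
        // An asynchronous service method returned a future that fails.
        Object async = CompletableFuture.supplyAsync(() -> {
            throw new IllegalStateException("boom");
        });
        System.out.println(toResponse(async).join().exception.getMessage()); // prints boom
    }
}
```

Using `handle` rather than `thenApply` means the failure becomes data inside the response instead of propagating as an exceptional future.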
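Finally, the call reaches the concrete subclass, the anonymous AbstractProxyInvoker that was built when the service was exported. The code below is the getInvoker method named in the call chain above (JavassistProxyFactory#getInvoker):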
@Override
public <T> Invoker<T> getInvoker(T proxy, Class<T> type, URL url) {
    // TODO Wrapper cannot handle this scenario correctly: the classname contains '$'
    final Wrapper wrapper = Wrapper.getWrapper(proxy.getClass().getName().indexOf('$') < 0 ? proxy.getClass() : type);
    return new AbstractProxyInvoker<T>(proxy, type, url) {
        @Override
        protected Object doInvoke(T proxy, String methodName, Class<?>[] parameterTypes, Object[] arguments) throws Throwable {
            return wrapper.invokeMethod(proxy, methodName, parameterTypes, arguments);
        }
    };
}
Invoker<?> invoker = PROXY_FACTORY.getInvoker(ref, (Class) interfaceClass, url);
The ref field is populated at Spring startup: when the bean is created, the service implementation instance is injected and stored in ref.
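To see what doInvoke ultimately does with ref, here is a sketch that calls a method on the implementation object by name. It deliberately swaps in plain `java.lang.reflect` for the generated Wrapper class, so it shows the idea rather than Dubbo's mechanism. `GreetingService`, `GreetingServiceImpl` and `ReflectiveInvokerSketch` are hypothetical names.

```java
import java.lang.reflect.InvocationTargetException;
import java.lang.reflect.Method;

// Hypothetical service interface and implementation (the "ref").
interface GreetingService {
    String sayHello(String name);
}

class GreetingServiceImpl implements GreetingService {
    @Override
    public String sayHello(String name) {
        return "Hello, " + name;
    }
}

// Plays the role of the anonymous AbstractProxyInvoker, using reflection
// where the excerpt calls wrapper.invokeMethod.
class ReflectiveInvokerSketch<T> {
    private final T ref;
    private final Class<T> type;

    ReflectiveInvokerSketch(T ref, Class<T> type) {
        this.ref = ref;
        this.type = type;
    }

    Object doInvoke(String methodName, Class<?>[] parameterTypes, Object[] arguments) {
        try {
            // Resolve the method on the interface, then call it on the implementation.
            Method method = type.getMethod(methodName, parameterTypes);
            return method.invoke(ref, arguments);
        } catch (InvocationTargetException e) {
            throw new IllegalStateException("target method threw", e.getCause());
        } catch (ReflectiveOperationException e) {
            throw new IllegalStateException("cannot invoke " + methodName, e);
        }
    }

    public static void main(String[] args) {
        ReflectiveInvokerSketch<GreetingService> invoker =
                new ReflectiveInvokerSketch<>(new GreetingServiceImpl(), GreetingService.class);
        Object result = invoker.doInvoke("sayHello", new Class<?>[]{String.class}, new Object[]{"dubbo"});
        System.out.println(result); // prints Hello, dubbo
    }
}
```

The method name, parameter types and arguments are exactly the three pieces of the Invocation that invoke passes to doInvoke, which is why an Invocation is enough to reach the right method on ref.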