深入理解Dubbo-7.服务消费调用源码分析(下)

简介: 深入理解Dubbo-7.服务消费调用源码分析

深入理解Dubbo-7.服务消费调用源码分析(中):https://developer.aliyun.com/article/1414100


handler.channelRead()


服务端收到读的请求是,会进入这个方法。


接着通过handler.received来处理msg,这个handle的链路很长,比较复杂,我们需要逐步剖析

public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
        NettyChannel channel = NettyChannel.getOrAddChannel(ctx.channel(), this.url, this.handler);
        this.handler.received(channel, msg);
    }

服务端收到读的请求是,会进入这个方法。接着通过handler.received来处理msg ,而这个handler 是在服务发布的时候构建得。


DubboProtocol.createServer:


server = Exchangers.bind(url, this.requestHandler);


Exchanger.bind


public static ExchangeServer bind(URL url, ExchangeHandler handler) throws RemotingException {
        if (url == null) {
            throw new IllegalArgumentException("url == null");
        } else if (handler == null) {
            throw new IllegalArgumentException("handler == null");
        } else {
            url = url.addParameterIfAbsent("codec", "exchange");
            return getExchanger(url).bind(url, handler);
        }
    }


通过扩展点选择到HeaderExchanger


public ExchangeServer bind(URL url, ExchangeHandler handler) throws RemotingException {
        return new HeaderExchangeServer(Transporters.bind(url, new ChannelHandler[]{new DecodeHandler(new HeaderExchangeHandler(handler))}));
    }

起链路如下:


MultiMessageHandler(复合消息处理) —> HeartbeatHandle(心跳消息处理,接收心跳并发送心跳响应) —> AllChannelHandler (业务线程转化处理器,把接收到的消息封装成ChannelEventRunnable可执行任

务给线程池处理)—> DecodeHandler (业务解码处理器)—> HeaderExchangeHandler —> DubboProtocol#requestHandler(new ExchangeHandlerAdapter())


而在构建 NettyServerHandler 得时候将 this 传了进去。this 即 NettyServer 。NettyServer是 AbstractPeer 得子类。所以 handler.received 此时会调用AbsstractPeer.received方法,这个方法用来判断服务端是否关闭了,如果关闭就直接返回,否则,通过handler处理链进行层层调用。

public void received(Channel ch, Object msg) throws RemotingException {
  if (closed) {
    return;
  }
  handler.received(ch, msg);
}


HeaderExchangeHandler.received


交互层请求响应处理,有三种处理方式


  1. handlerRequest,双向请求
  2. handler.received 单向请求
  3. handleResponse 响应消息
public void received(Channel channel, Object message) throws RemotingException {
        ExchangeChannel exchangeChannel = HeaderExchangeChannel.getOrAddChannel(channel);
      // 如果接收到的消息是请求类型    
      if (message instanceof Request) {
            Request request = (Request)message;
             // 如果请求是事件类型
            if (request.isEvent()) {
                this.handlerEvent(channel, request);
            // 如果请求是双向类型
            } else if (request.isTwoWay()) {
                this.handleRequest(exchangeChannel, request);
            // 如果请求不是事件类型也不是双向类型
            } else {
                this.handler.received(exchangeChannel, request.getData());
            }
        // 如果接收到的消息是响应类型
        } else if (message instanceof Response) {
            handleResponse(channel, (Response)message);
        // 如果接收到的消息是字符串类型
        } else if (message instanceof String) {
            // 如果是客户端发送的字符串消息,则报错
            if (isClientSide(channel)) {
                Exception e = new Exception("Dubbo client can not supported string message: " + message + " in channel: " + channel + ", url: " + channel.getUrl());
                logger.error(e.getMessage(), e);
            // 如果是服务器端接收到的字符串消息,则进行处理
            } else {
                String echo = this.handler.telnet(channel, (String)message);
                if (echo != null && echo.length() > 0) {
                    channel.send(echo);
                }
            }
          // 如果接收到的消息不是请求、响应或字符串类型
        } else {
            this.handler.received(exchangeChannel, message);//单向请求
        }
    }


handleRequest


接着调用handleRequest方法。这个方法中,构建返回的对象Response,并且最终会通过异步的方式来把msg传递到invoker中进行调用 handler.reply

void handleRequest(final ExchangeChannel channel, Request req) throws RemotingException {
        Response res = new Response(req.getId(), req.getVersion());
        Object data;
        if (req.isBroken()) {
            data = req.getData();
            String msg;
            if (data == null) {
                msg = null;
            } else if (data instanceof Throwable) {
                msg = StringUtils.toString((Throwable)data);
            } else {
                msg = data.toString();
            }
            res.setErrorMessage("Fail to decode request due to: " + msg);
            res.setStatus((byte)40);
            channel.send(res);
        } else {
            data = req.getData();
            try {
                CompletionStage<Object> future = this.handler.reply(channel, data);// 可以返回一个结果
                future.whenComplete((appResult, t) -> {
                    try {
                        if (t == null) {
                            res.setStatus((byte)20);
                            res.setResult(appResult);
                        } else {
                            res.setStatus((byte)70);
                            res.setErrorMessage(StringUtils.toString(t));
                        }
                        channel.send(res);
                    } catch (RemotingException var5) {
                        logger.warn("Send result to consumer failed, channel is " + channel + ", msg is " + var5);
                    }
                });
            } catch (Throwable var6) {
                res.setStatus((byte)70);
                res.setErrorMessage(StringUtils.toString(var6));
                channel.send(res);
            }
        }
    }

此时的handler.reply,应该是DubboProtocol中构建的匿名内部类

所以调用handler.reply方法,自然就进入到了该匿名内部类中的reply方法中来。


DubboProtocol$requestHandler


private ExchangeHandler requestHandler = new ExchangeHandlerAdapter() {
        @Override
        public CompletableFuture<Object> reply(ExchangeChannel channel, Object message) throws RemotingException {
        //如果消息类型不是invocation,则抛出异常表示无法识别
            if (!(message instanceof Invocation)) {
                throw new RemotingException(channel, "Unsupported request: "
                        + (message == null ? null : (message.getClass().getName() + ": " + message))
                        + ", channel: consumer: " + channel.getRemoteAddress() + " --> provider: " + channel.getLocalAddress());
            }
        //获得请求参数
            Invocation inv = (Invocation) message;
          // 获取 invoker 领域对象,这个对象是在发布服务的时候构建,然后封装成 exporter 存在map里面的。
            //根据key从发布的服务列表中查找到指定的服务端invoke,这个就是之前在讲服务发布时,涉及到的invoke对象。
            Invoker<?> invoker = getInvoker(channel, inv);
            // need to consider backward-compatibility if it's a callback
            if (Boolean.TRUE.toString().equals(inv.getObjectAttachments().get(IS_CALLBACK_SERVICE_INVOKE))) {
                String methodsStr = invoker.getUrl().getParameters().get("methods");
                boolean hasMethod = false;
                if (methodsStr == null || !methodsStr.contains(",")) {
                    hasMethod = inv.getMethodName().equals(methodsStr);
                } else {
                    String[] methods = methodsStr.split(",");
                    for (String method : methods) {
                        if (inv.getMethodName().equals(method)) {
                            hasMethod = true;
                            break;
                        }
                    }
                }
                if (!hasMethod) {
                    logger.warn(new IllegalStateException("The methodName " + inv.getMethodName()
                            + " not found in callback service interface ,invoke will be ignored."
                            + " please update the api interface. url is:"
                            + invoker.getUrl()) + " ,invocation is :" + inv);
                    return null;
                }
            }
            RpcContext.getContext().setRemoteAddress(channel.getRemoteAddress());
            //发起请求调用,此时得到的invoker对象
            Result result = invoker.invoke(inv); // 发起对应调用
            return result.thenApply(Function.identity());
        }
  //......省略代码
};


getInvoker


Invoker<?> invoker = getInvoker(channel, inv);

相当于根据key来获取一个value值


回顾下之前,在发布的时候,调用了一个DubboProtocol.export

public <T> Exporter<T> export(Invoker<T> invoker) throws RpcException {
        URL url = invoker.getUrl();
        String key = serviceKey(url);
        DubboExporter<T> exporter = new DubboExporter(invoker, key, this.exporterMap);
      // 构建好了之后,把key 和 value存进去
        this.exporterMap.put(key, exporter);
    }
// 而getInvoker也会从map中拿到这个值
Invoker<?> getInvoker(Channel channel, Invocation inv) throws RemotingException {
        ......
        DubboExporter<?> exporter = (DubboExporter)this.exporterMap.get(serviceKey);
        if (exporter == null) {
            throw new RemotingException(channel, "Not found exported service: " + serviceKey + " in " + this.exporterMap.keySet() + ", may be version or group mismatch , channel: consumer: " + channel.getRemoteAddress() + " --> provider: " + channel.getLocalAddress() + ", message:" + this.getInvocationWithoutData(inv));
        } else {
            return exporter.getInvoker();
        }
    }


invoker.invoke()


invoker.invoke,发起本地服务调用,但是此时调用之前,invoke并不是一个直接调用的对象,而是包装过的。


在 ServiceConfig#doExportUrlsFor1Protocol 构建包装。最后的调用链路如下:


RegistryProtocol.InvokerDelegate.invoke —> DelegateProviderMetaDataInvoker.invoke —> AbstractProxyInvoker.invoke —> AbstractProxyInvoker(JavassistProxyFactory#getInvoker)

InvokerDelegate 未实现父类 InvokerWrapper invoke方法。进入到InvokerWrapper.invoke方法,这个是一个Invoker包装类,包装了URL地址信息和真正的Invoker代理对象。

public Result invoke(Invocation invocation) throws RpcException {
  return invoker.invoke(invocation);
}


DelegateProviderMetaDataInvoker


这里是一个委派类,它提供了服务提供者的元数序信息。

public Result invoke(Invocation invocation) throws RpcException {
  return invoker.invoke(invocation);
}


AbstractProxyInvoker


接着进入到AbstractProxyInvoker的invoke方法,在这个方法中,我们可以看到它会调用子类的doInvoke方法,获得返回结果。


其中proxy,表示服务端的对象实例,这个实例很显然是在构建动态代理Invoker对象时保存进来的。

public Result invoke(Invocation invocation) throws RpcException {
        try {
            Object value = this.doInvoke(this.proxy, invocation.getMethodName(), invocation.getParameterTypes(), invocation.getArguments());
            CompletableFuture<Object> future = this.wrapWithFuture(value);
            CompletableFuture<AppResponse> appResponseFuture = future.handle((obj, t) -> {
                AppResponse result = new AppResponse();
                if (t != null) {
                    if (t instanceof CompletionException) {
                        result.setException(t.getCause());
                    } else {
                        result.setException(t);
                    }
                } else {
                    result.setValue(obj);
                }
                return result;
            });
            return new AsyncRpcResult(appResponseFuture, invocation);
        } catch (InvocationTargetException var5) {
            if (RpcContext.getContext().isAsyncStarted() && !RpcContext.getContext().stopAsync()) {
                this.logger.error("Provider async started, but got an exception from the original method, cannot write the exception back to consumer because an async result may have returned the new thread.", var5);
            }
            return AsyncRpcResult.newDefaultAsyncResult((Object)null, var5.getTargetException(), invocation);
        } catch (Throwable var6) {
            throw new RpcException("Failed to invoke remote proxy method " + invocation.getMethodName() + " to " + this.getUrl() + ", cause: " + var6.getMessage(), var6);
        }
    }


JavassistProxyFactory.doInvoke


最后进入到具体的子类,也就是在服务的发布的时候通过 构建的

@Override
public <T> Invoker<T> getInvoker(T proxy, Class<T> type, URL url) {
        // TODO Wrapper cannot handle this scenario correctly: the classname contains '$'
        final Wrapper wrapper = Wrapper.getWrapper(proxy.getClass().getName().indexOf('$') < 0 ? proxy.getClass() : type);
        return new AbstractProxyInvoker<T>(proxy, type, url) {
            @Override
            protected Object doInvoke(T proxy, String methodName,
                                      Class<?>[] parameterTypes,
                                      Object[] arguments) throws Throwable {
                return wrapper.invokeMethod(proxy, methodName, parameterTypes, arguments);
            }
        };
}


服务实例是什么时候生成的


从上面的代码中可以看到,getInvoker中传递的proxy,实际就是对象实例,而这个参数是在serviceConfig中,

Invoker<?> invoker = PROXY_FACTORY.getInvoker(ref, (Class) interfaceClass, url);

而 ref这个成员变量,是在spring启动时创建bean对象时,会注入这个对象的实例保存到ref中。


总结


至此,服务消费的处理流程就分析完了。

ow new RpcException("Failed to invoke remote proxy method " + invocation.getMethodName() + " to " + this.getUrl() + ", cause: " + var6.getMessage(), var6);

}

}

#### JavassistProxyFactory.doInvoke
最后进入到具体的子类,也就是在服务的发布的时候通过 构建的
```java
@Override
public <T> Invoker<T> getInvoker(T proxy, Class<T> type, URL url) {
        // TODO Wrapper cannot handle this scenario correctly: the classname contains '$'
        final Wrapper wrapper = Wrapper.getWrapper(proxy.getClass().getName().indexOf('$') < 0 ? proxy.getClass() : type);
        return new AbstractProxyInvoker<T>(proxy, type, url) {
            @Override
            protected Object doInvoke(T proxy, String methodName,
                                      Class<?>[] parameterTypes,
                                      Object[] arguments) throws Throwable {
                return wrapper.invokeMethod(proxy, methodName, parameterTypes, arguments);
            }
        };
}


服务实例是什么时候生成的


从上面的代码中可以看到,getInvoker中传递的proxy,实际就是对象实例,而这个参数是在serviceConfig中,

Invoker<?> invoker = PROXY_FACTORY.getInvoker(ref, (Class) interfaceClass, url);

而 ref这个成员变量,是在spring启动时创建bean对象时,会注入这个对象的实例保存到ref中。


总结


至此,服务消费的处理流程就分析完了。

目录
相关文章
|
29天前
|
XML Dubbo Java
【Dubbo3高级特性】「框架与服务」服务的异步调用实践以及开发模式
【Dubbo3高级特性】「框架与服务」服务的异步调用实践以及开发模式
29 0
|
2月前
|
Dubbo Java 应用服务中间件
Dubbo服务暴露机制解密:深入探讨服务提供者的奥秘【九】
Dubbo服务暴露机制解密:深入探讨服务提供者的奥秘【九】
23 0
|
2月前
|
缓存 运维 监控
Dubbo服务降级:保障稳定性的终极指南【六】
Dubbo服务降级:保障稳定性的终极指南【六】
36 0
|
3月前
|
Dubbo Java 应用服务中间件
Spring Boot Dubbo 构建分布式服务
Spring Boot Dubbo 构建分布式服务
47 0
|
3月前
|
存储 负载均衡 监控
深入理解Dubbo-6.服务消费源码分析(下)
深入理解Dubbo-6.服务消费源码分析
32 0
|
1月前
|
SpringCloudAlibaba Dubbo Java
SpringCloud Alibaba集成Dubbo实现远程服务间调用
SpringCloud Alibaba集成Dubbo实现远程服务间调用
|
29天前
|
Java fastjson 数据安全/隐私保护
【Dubbo3技术专题】「云原生微服务开发实战」 一同探索和分析研究RPC服务的底层原理和实现
【Dubbo3技术专题】「云原生微服务开发实战」 一同探索和分析研究RPC服务的底层原理和实现
39 0
|
29天前
|
Kubernetes Dubbo 应用服务中间件
【Dubbo3终极特性】「流量治理体系」一文教你如何搭建Dubbo3的控制台服务Dubbo-Admin
【Dubbo3终极特性】「流量治理体系」一文教你如何搭建Dubbo3的控制台服务Dubbo-Admin
50 0
|
2月前
|
Dubbo Java 应用服务中间件
Dubbo 第四节: Spring与Dubbo整合原理与源码分析
DubboConfigConfigurationRegistrar的主要作⽤就是对propties⽂件进⾏解析并根据不同的配置项项⽣成对应类型的Bean对象。
|
3月前
|
Dubbo Java 应用服务中间件
Dubbo 3.x结合Zookeeper实现远程服务基本调用
ZooKeeper和Dubbo是两个在分布式系统中常用的开源框架,它们可以协同工作,提供服务注册与发现、分布式协调等功能。