深入理解Dubbo-7.服务消费调用源码分析(下)

简介: 深入理解Dubbo-7.服务消费调用源码分析

深入理解Dubbo-7.服务消费调用源码分析(中):https://developer.aliyun.com/article/1414100


handler.channelRead()


服务端收到读的请求是,会进入这个方法。


接着通过handler.received来处理msg,这个handle的链路很长,比较复杂,我们需要逐步剖析

public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
        NettyChannel channel = NettyChannel.getOrAddChannel(ctx.channel(), this.url, this.handler);
        this.handler.received(channel, msg);
    }

服务端收到读的请求是,会进入这个方法。接着通过handler.received来处理msg ,而这个handler 是在服务发布的时候构建得。


DubboProtocol.createServer:


server = Exchangers.bind(url, this.requestHandler);


Exchanger.bind


public static ExchangeServer bind(URL url, ExchangeHandler handler) throws RemotingException {
        if (url == null) {
            throw new IllegalArgumentException("url == null");
        } else if (handler == null) {
            throw new IllegalArgumentException("handler == null");
        } else {
            url = url.addParameterIfAbsent("codec", "exchange");
            return getExchanger(url).bind(url, handler);
        }
    }


通过扩展点选择到HeaderExchanger


public ExchangeServer bind(URL url, ExchangeHandler handler) throws RemotingException {
        return new HeaderExchangeServer(Transporters.bind(url, new ChannelHandler[]{new DecodeHandler(new HeaderExchangeHandler(handler))}));
    }

起链路如下:


MultiMessageHandler(复合消息处理) —> HeartbeatHandle(心跳消息处理,接收心跳并发送心跳响应) —> AllChannelHandler (业务线程转化处理器,把接收到的消息封装成ChannelEventRunnable可执行任

务给线程池处理)—> DecodeHandler (业务解码处理器)—> HeaderExchangeHandler —> DubboProtocol#requestHandler(new ExchangeHandlerAdapter())


而在构建 NettyServerHandler 得时候将 this 传了进去。this 即 NettyServer 。NettyServer是 AbstractPeer 得子类。所以 handler.received 此时会调用AbsstractPeer.received方法,这个方法用来判断服务端是否关闭了,如果关闭就直接返回,否则,通过handler处理链进行层层调用。

public void received(Channel ch, Object msg) throws RemotingException {
  if (closed) {
    return;
  }
  handler.received(ch, msg);
}


HeaderExchangeHandler.received


交互层请求响应处理,有三种处理方式


  1. handlerRequest,双向请求
  2. handler.received 单向请求
  3. handleResponse 响应消息
public void received(Channel channel, Object message) throws RemotingException {
        ExchangeChannel exchangeChannel = HeaderExchangeChannel.getOrAddChannel(channel);
      // 如果接收到的消息是请求类型    
      if (message instanceof Request) {
            Request request = (Request)message;
             // 如果请求是事件类型
            if (request.isEvent()) {
                this.handlerEvent(channel, request);
            // 如果请求是双向类型
            } else if (request.isTwoWay()) {
                this.handleRequest(exchangeChannel, request);
            // 如果请求不是事件类型也不是双向类型
            } else {
                this.handler.received(exchangeChannel, request.getData());
            }
        // 如果接收到的消息是响应类型
        } else if (message instanceof Response) {
            handleResponse(channel, (Response)message);
        // 如果接收到的消息是字符串类型
        } else if (message instanceof String) {
            // 如果是客户端发送的字符串消息,则报错
            if (isClientSide(channel)) {
                Exception e = new Exception("Dubbo client can not supported string message: " + message + " in channel: " + channel + ", url: " + channel.getUrl());
                logger.error(e.getMessage(), e);
            // 如果是服务器端接收到的字符串消息,则进行处理
            } else {
                String echo = this.handler.telnet(channel, (String)message);
                if (echo != null && echo.length() > 0) {
                    channel.send(echo);
                }
            }
          // 如果接收到的消息不是请求、响应或字符串类型
        } else {
            this.handler.received(exchangeChannel, message);//单向请求
        }
    }


handleRequest


接着调用handleRequest方法。这个方法中,构建返回的对象Response,并且最终会通过异步的方式来把msg传递到invoker中进行调用 handler.reply

void handleRequest(final ExchangeChannel channel, Request req) throws RemotingException {
        Response res = new Response(req.getId(), req.getVersion());
        Object data;
        if (req.isBroken()) {
            data = req.getData();
            String msg;
            if (data == null) {
                msg = null;
            } else if (data instanceof Throwable) {
                msg = StringUtils.toString((Throwable)data);
            } else {
                msg = data.toString();
            }
            res.setErrorMessage("Fail to decode request due to: " + msg);
            res.setStatus((byte)40);
            channel.send(res);
        } else {
            data = req.getData();
            try {
                CompletionStage<Object> future = this.handler.reply(channel, data);// 可以返回一个结果
                future.whenComplete((appResult, t) -> {
                    try {
                        if (t == null) {
                            res.setStatus((byte)20);
                            res.setResult(appResult);
                        } else {
                            res.setStatus((byte)70);
                            res.setErrorMessage(StringUtils.toString(t));
                        }
                        channel.send(res);
                    } catch (RemotingException var5) {
                        logger.warn("Send result to consumer failed, channel is " + channel + ", msg is " + var5);
                    }
                });
            } catch (Throwable var6) {
                res.setStatus((byte)70);
                res.setErrorMessage(StringUtils.toString(var6));
                channel.send(res);
            }
        }
    }

此时的handler.reply,应该是DubboProtocol中构建的匿名内部类

所以调用handler.reply方法,自然就进入到了该匿名内部类中的reply方法中来。


DubboProtocol$requestHandler


private ExchangeHandler requestHandler = new ExchangeHandlerAdapter() {
        @Override
        public CompletableFuture<Object> reply(ExchangeChannel channel, Object message) throws RemotingException {
        //如果消息类型不是invocation,则抛出异常表示无法识别
            if (!(message instanceof Invocation)) {
                throw new RemotingException(channel, "Unsupported request: "
                        + (message == null ? null : (message.getClass().getName() + ": " + message))
                        + ", channel: consumer: " + channel.getRemoteAddress() + " --> provider: " + channel.getLocalAddress());
            }
        //获得请求参数
            Invocation inv = (Invocation) message;
          // 获取 invoker 领域对象,这个对象是在发布服务的时候构建,然后封装成 exporter 存在map里面的。
            //根据key从发布的服务列表中查找到指定的服务端invoke,这个就是之前在讲服务发布时,涉及到的invoke对象。
            Invoker<?> invoker = getInvoker(channel, inv);
            // need to consider backward-compatibility if it's a callback
            if (Boolean.TRUE.toString().equals(inv.getObjectAttachments().get(IS_CALLBACK_SERVICE_INVOKE))) {
                String methodsStr = invoker.getUrl().getParameters().get("methods");
                boolean hasMethod = false;
                if (methodsStr == null || !methodsStr.contains(",")) {
                    hasMethod = inv.getMethodName().equals(methodsStr);
                } else {
                    String[] methods = methodsStr.split(",");
                    for (String method : methods) {
                        if (inv.getMethodName().equals(method)) {
                            hasMethod = true;
                            break;
                        }
                    }
                }
                if (!hasMethod) {
                    logger.warn(new IllegalStateException("The methodName " + inv.getMethodName()
                            + " not found in callback service interface ,invoke will be ignored."
                            + " please update the api interface. url is:"
                            + invoker.getUrl()) + " ,invocation is :" + inv);
                    return null;
                }
            }
            RpcContext.getContext().setRemoteAddress(channel.getRemoteAddress());
            //发起请求调用,此时得到的invoker对象
            Result result = invoker.invoke(inv); // 发起对应调用
            return result.thenApply(Function.identity());
        }
  //......省略代码
};


getInvoker


Invoker<?> invoker = getInvoker(channel, inv);

相当于根据key来获取一个value值


回顾下之前,在发布的时候,调用了一个DubboProtocol.export

public <T> Exporter<T> export(Invoker<T> invoker) throws RpcException {
        URL url = invoker.getUrl();
        String key = serviceKey(url);
        DubboExporter<T> exporter = new DubboExporter(invoker, key, this.exporterMap);
      // 构建好了之后,把key 和 value存进去
        this.exporterMap.put(key, exporter);
    }
// 而getInvoker也会从map中拿到这个值
Invoker<?> getInvoker(Channel channel, Invocation inv) throws RemotingException {
        ......
        DubboExporter<?> exporter = (DubboExporter)this.exporterMap.get(serviceKey);
        if (exporter == null) {
            throw new RemotingException(channel, "Not found exported service: " + serviceKey + " in " + this.exporterMap.keySet() + ", may be version or group mismatch , channel: consumer: " + channel.getRemoteAddress() + " --> provider: " + channel.getLocalAddress() + ", message:" + this.getInvocationWithoutData(inv));
        } else {
            return exporter.getInvoker();
        }
    }


invoker.invoke()


invoker.invoke,发起本地服务调用,但是此时调用之前,invoke并不是一个直接调用的对象,而是包装过的。


在 ServiceConfig#doExportUrlsFor1Protocol 构建包装。最后的调用链路如下:


RegistryProtocol.InvokerDelegate.invoke —> DelegateProviderMetaDataInvoker.invoke —> AbstractProxyInvoker.invoke —> AbstractProxyInvoker(JavassistProxyFactory#getInvoker)

InvokerDelegate 未实现父类 InvokerWrapper invoke方法。进入到InvokerWrapper.invoke方法,这个是一个Invoker包装类,包装了URL地址信息和真正的Invoker代理对象。

public Result invoke(Invocation invocation) throws RpcException {
  return invoker.invoke(invocation);
}


DelegateProviderMetaDataInvoker


这里是一个委派类,它提供了服务提供者的元数序信息。

public Result invoke(Invocation invocation) throws RpcException {
  return invoker.invoke(invocation);
}


AbstractProxyInvoker


接着进入到AbstractProxyInvoker的invoke方法,在这个方法中,我们可以看到它会调用子类的doInvoke方法,获得返回结果。


其中proxy,表示服务端的对象实例,这个实例很显然是在构建动态代理Invoker对象时保存进来的。

public Result invoke(Invocation invocation) throws RpcException {
        try {
            Object value = this.doInvoke(this.proxy, invocation.getMethodName(), invocation.getParameterTypes(), invocation.getArguments());
            CompletableFuture<Object> future = this.wrapWithFuture(value);
            CompletableFuture<AppResponse> appResponseFuture = future.handle((obj, t) -> {
                AppResponse result = new AppResponse();
                if (t != null) {
                    if (t instanceof CompletionException) {
                        result.setException(t.getCause());
                    } else {
                        result.setException(t);
                    }
                } else {
                    result.setValue(obj);
                }
                return result;
            });
            return new AsyncRpcResult(appResponseFuture, invocation);
        } catch (InvocationTargetException var5) {
            if (RpcContext.getContext().isAsyncStarted() && !RpcContext.getContext().stopAsync()) {
                this.logger.error("Provider async started, but got an exception from the original method, cannot write the exception back to consumer because an async result may have returned the new thread.", var5);
            }
            return AsyncRpcResult.newDefaultAsyncResult((Object)null, var5.getTargetException(), invocation);
        } catch (Throwable var6) {
            throw new RpcException("Failed to invoke remote proxy method " + invocation.getMethodName() + " to " + this.getUrl() + ", cause: " + var6.getMessage(), var6);
        }
    }


JavassistProxyFactory.doInvoke


最后进入到具体的子类,也就是在服务的发布的时候通过 构建的

@Override
public <T> Invoker<T> getInvoker(T proxy, Class<T> type, URL url) {
        // TODO Wrapper cannot handle this scenario correctly: the classname contains '$'
        final Wrapper wrapper = Wrapper.getWrapper(proxy.getClass().getName().indexOf('$') < 0 ? proxy.getClass() : type);
        return new AbstractProxyInvoker<T>(proxy, type, url) {
            @Override
            protected Object doInvoke(T proxy, String methodName,
                                      Class<?>[] parameterTypes,
                                      Object[] arguments) throws Throwable {
                return wrapper.invokeMethod(proxy, methodName, parameterTypes, arguments);
            }
        };
}


服务实例是什么时候生成的


从上面的代码中可以看到,getInvoker中传递的proxy,实际就是对象实例,而这个参数是在serviceConfig中,

Invoker<?> invoker = PROXY_FACTORY.getInvoker(ref, (Class) interfaceClass, url);

而 ref这个成员变量,是在spring启动时创建bean对象时,会注入这个对象的实例保存到ref中。


总结


至此,服务消费的处理流程就分析完了。

ow new RpcException("Failed to invoke remote proxy method " + invocation.getMethodName() + " to " + this.getUrl() + ", cause: " + var6.getMessage(), var6);

}

}

#### JavassistProxyFactory.doInvoke
最后进入到具体的子类,也就是在服务的发布的时候通过 构建的
```java
@Override
public <T> Invoker<T> getInvoker(T proxy, Class<T> type, URL url) {
        // TODO Wrapper cannot handle this scenario correctly: the classname contains '$'
        final Wrapper wrapper = Wrapper.getWrapper(proxy.getClass().getName().indexOf('$') < 0 ? proxy.getClass() : type);
        return new AbstractProxyInvoker<T>(proxy, type, url) {
            @Override
            protected Object doInvoke(T proxy, String methodName,
                                      Class<?>[] parameterTypes,
                                      Object[] arguments) throws Throwable {
                return wrapper.invokeMethod(proxy, methodName, parameterTypes, arguments);
            }
        };
}


服务实例是什么时候生成的


从上面的代码中可以看到,getInvoker中传递的proxy,实际就是对象实例,而这个参数是在serviceConfig中,

Invoker<?> invoker = PROXY_FACTORY.getInvoker(ref, (Class) interfaceClass, url);

而 ref这个成员变量,是在spring启动时创建bean对象时,会注入这个对象的实例保存到ref中。


总结


至此,服务消费的处理流程就分析完了。

目录
相关文章
|
7月前
|
XML Dubbo Java
【Dubbo3高级特性】「框架与服务」服务的异步调用实践以及开发模式
【Dubbo3高级特性】「框架与服务」服务的异步调用实践以及开发模式
170 0
|
2月前
|
监控 Dubbo Java
dubbo学习三:springboot整合dubbo+zookeeper,并使用dubbo管理界面监控服务是否注册到zookeeper上。
这篇文章详细介绍了如何将Spring Boot与Dubbo和Zookeeper整合,并通过Dubbo管理界面监控服务注册情况。
176 0
dubbo学习三:springboot整合dubbo+zookeeper,并使用dubbo管理界面监控服务是否注册到zookeeper上。
|
4月前
|
JSON Dubbo Java
【Dubbo协议指南】揭秘高性能服务通信,选择最佳协议的终极攻略!
【8月更文挑战第24天】在分布式服务架构中,Apache Dubbo作为一款高性能的Java RPC框架,支持多种通信协议,包括Dubbo协议、HTTP协议及Hessian协议等。Dubbo协议是默认选择,采用NIO异步通讯,适用于高要求的内部服务通信。HTTP协议通用性强,利于跨语言调用;Hessian协议则在数据传输效率上有优势。选择合适协议需综合考虑性能需求、序列化方式、网络环境及安全性等因素。通过合理配置,可实现服务性能最优化及系统可靠性提升。
67 3
|
4月前
|
缓存 Dubbo Java
Dubbo服务消费者启动与订阅原理
该文章主要介绍了Dubbo服务消费者启动与订阅的原理,包括服务消费者的启动时机、启动过程以及订阅和感知最新提供者信息的方式。
Dubbo服务消费者启动与订阅原理
|
4月前
|
Dubbo 网络协议 Java
深入掌握Dubbo服务提供者发布与注册原理
该文章主要介绍了Dubbo服务提供者发布与注册的原理,包括服务发布的流程、多协议发布、构建Invoker、注册到注册中心等过程。
深入掌握Dubbo服务提供者发布与注册原理
|
4月前
|
负载均衡 Dubbo Java
Dubbo服务Spi机制和原理
该文章主要介绍了Dubbo中的SPI(Service Provider Interface)机制和原理,包括SPI的基本概念、Dubbo中的SPI分类以及SPI机制的实现细节。
Dubbo服务Spi机制和原理
|
4月前
|
C# 开发者 Windows
勇敢迈出第一步:手把手教你如何在WPF开源项目中贡献你的第一行代码,从选择项目到提交PR的全过程解析与实战技巧分享
【8月更文挑战第31天】本文指导您如何在Windows Presentation Foundation(WPF)相关的开源项目中贡献代码。无论您是初学者还是有经验的开发者,参与这类项目都能加深对WPF框架的理解并拓展职业履历。文章推荐了一些适合入门的项目如MvvmLight和MahApps.Metro,并详细介绍了从选择项目、设置开发环境到提交代码的全过程。通过具体示例,如添加按钮点击事件处理程序,帮助您迈出第一步。此外,还强调了提交Pull Request时保持专业沟通的重要性。参与开源不仅能提升技能,还能促进社区交流。
52 0
|
6月前
|
Dubbo 前端开发 Java
Dubbo3 服务原生支持 http 访问,兼具高性能与易用性
本文展示了 Dubbo3 triple 协议是如何简化从协议规范与实现上简化开发测试、入口流量接入成本的,同时提供高性能通信、面向接口的易用性编码。
16651 13
|
4月前
|
缓存 负载均衡 Dubbo
Dubbo服务集群容错原理(重要)
该文章主要介绍了Dubbo服务集群容错的原理,包括集群容错技术的概念、Dubbo中使用的集群容错技术种类及其原理。
|
4月前
|
负载均衡 Dubbo 算法
Dubbo服务负载均衡原理
该文章主要介绍了Dubbo服务负载均衡的原理,包括Dubbo中负载均衡的实现位置、为什么需要负载均衡机制、Dubbo支持的负载均衡算法以及随机负载均衡策略的源码分析。