Dubbo Consumer 初始化连接过程

简介:

开篇

  • 这篇文章的目的在于梳理Dubbo Consumer的连接过程,在这个过程中会涉及Exchanger、Transporter、NettyClient的整个过程梳理。
  • 在这个过程中会涉及到ChannelHandler的封装过程,调用过程中会涉及ChannelHandler的调用过程分析。
  • Dubbo Consumer的初始化连接过程本质就是对配置中的reference引入的服务进行连接建立,按照service+注册中心的维度,与某个注册中心下某个service的所有的provider建立连接的过程。


调用关系图

Dubbo Consumer 连接建立过程

  • Dubbo Consumer初始化连接过程的时序图如上图。

Dubbo Consumer 调用栈

  • Dubbo Consumer初始化连接过程的调用栈如上图。


RegistryDirectory

public class RegistryDirectory<T> extends AbstractDirectory<T> implements NotifyListener {
    // dubbo://192.168.1.5:20880/com.alibaba.dubbo.demo.DemoService?anyhost=true&application=demo-provider&bean.name=com.alibaba.dubbo.demo.DemoService&dubbo=2.0.2&generic=false&interface=com.alibaba.dubbo.demo.DemoService&methods=sayHello&pid=77830&side=provider&timestamp=1578407866021
    // dubbo://192.168.1.5:20881/com.alibaba.dubbo.demo.DemoService?anyhost=true&application=demo-provider&bean.name=com.alibaba.dubbo.demo.DemoService&dubbo=2.0.2&generic=false&interface=com.alibaba.dubbo.demo.DemoService&methods=sayHello&pid=77845&side=provider&timestamp=1578407879523
    private Map<String, Invoker<T>> toInvokers(List<URL> urls) {
        Map<String, Invoker<T>> newUrlInvokerMap = new HashMap<String, Invoker<T>>();
        if (urls == null || urls.isEmpty()) {
            return newUrlInvokerMap;
        }
        Set<String> keys = new HashSet<String>();
        String queryProtocols = this.queryMap.get(Constants.PROTOCOL_KEY);

        for (URL providerUrl : urls) {           
            URL url = mergeUrl(providerUrl);
            String key = url.toFullString(); // The parameter urls are sorted
            Map<String, Invoker<T>> localUrlInvokerMap = this.urlInvokerMap; // local reference
            Invoker<T> invoker = localUrlInvokerMap == null ? null : localUrlInvokerMap.get(key);
            if (invoker == null) { // Not in the cache, refer again
                try {
                    boolean enabled = true;
                    if (enabled) {
                        // protocol.refer(serviceType, url)负责和对应的provider建立连接过程
                        invoker = new InvokerDelegate<T>(protocol.refer(serviceType, url), url, providerUrl);
                    }
                } catch (Throwable t) {

                }
                if (invoker != null) { // Put new invoker in cache
                    newUrlInvokerMap.put(key, invoker);
                }
            } else {
                newUrlInvokerMap.put(key, invoker);
            }
        }
        keys.clear();
        return newUrlInvokerMap;
    }
}
  • 每个reference + 注册中心 包含一个RegistryDirectory对象,一个RegistryDirectory对象包含某个注册中心下某个service的所有providers。
  • toInvokers()参数中的urls就是provider的url地址,针对某个服务的reference过程就是和这个服务的所有provider建立连接。
  • protocol.refer(serviceType, url)实现和服务下的一个provider建立连接的过程,Dubbo协议对应的protocol对象为DubboProtocol。


DubboProtocol

public class DubboProtocol extends AbstractProtocol {

    public <T> Invoker<T> refer(Class<T> serviceType, URL url) throws RpcException {
        optimizeSerialization(url);
        // create rpc invoker.
        DubboInvoker<T> invoker = new DubboInvoker<T>(serviceType, url, getClients(url), invokers);
        invokers.add(invoker);
        return invoker;
    }

    private ExchangeClient[] getClients(URL url) {
        // 判断是否共享连接,判断依据url当中的connections参数
        int connections = url.getParameter(Constants.CONNECTIONS_KEY, 0);
        // 判断是否共享连接
        if (connections == 0) {
            service_share_connect = true;
            connections = 1;
        }
        // 如果非共享连接按照的connections的值创建指定的连接数
        // 如果是共享连接就创建一个连接数
        ExchangeClient[] clients = new ExchangeClient[connections];
        for (int i = 0; i < clients.length; i++) {
            if (service_share_connect) {
                clients[i] = getSharedClient(url);
            } else {
                clients[i] = initClient(url);
            }
        }
        return clients;
    }


    private final Map<String, ReferenceCountExchangeClient> referenceClientMap = new ConcurrentHashMap<String, ReferenceCountExchangeClient>(); // <host:port,Exchanger>

    private ExchangeClient getSharedClient(URL url) {
        String key = url.getAddress();
        ReferenceCountExchangeClient client = referenceClientMap.get(key);

        // 省略非核心代码

        locks.putIfAbsent(key, new Object());
        synchronized (locks.get(key)) {
            if (referenceClientMap.containsKey(key)) {
                return referenceClientMap.get(key);
            }
            // initClient负责创建ExchangeClient
            ExchangeClient exchangeClient = initClient(url);
            client = new ReferenceCountExchangeClient(exchangeClient, ghostClientMap);
            referenceClientMap.put(key, client);
            ghostClientMap.remove(key);
            locks.remove(key);
            return client;
        }
    }

    private ExchangeClient initClient(URL url) {
        // 按照先client后server的顺序,DEFAULT_REMOTING_CLIENT = "netty";
        String str = url.getParameter(Constants.CLIENT_KEY, url.getParameter(Constants.SERVER_KEY, Constants.DEFAULT_REMOTING_CLIENT));
        // DubboCodec.NAME为dubbo
        url = url.addParameter(Constants.CODEC_KEY, DubboCodec.NAME);
        // DEFAULT_HEARTBEAT = 60 * 1000
        url = url.addParameterIfAbsent(Constants.HEARTBEAT_KEY, String.valueOf(Constants.DEFAULT_HEARTBEAT));

        ExchangeClient client;
        try {
            // connection should be lazy
            if (url.getParameter(Constants.LAZY_CONNECT_KEY, false)) {
                client = new LazyConnectExchangeClient(url, requestHandler);
            } else {
                client = Exchangers.connect(url, requestHandler);
            }
        } catch (RemotingException e) {
        }
        return client;
    }
}
  • DubboProtocol的refer执行getClients()返回ExchangeClient对象。
  • getClients()会根据是否共享连接或配置的连接数决定和一个provider创建多少连接数。
  • getClients()方法内部的initClient()方法会执行Exchangers.connect()方法返回ExchangeClient对象。
  • Exchangers.connect()的参数requestHandler是ExchangeHandlerAdapter对象,是Handler封装过程中最底层的ChannelHandler。
public class DubboProtocol extends AbstractProtocol {

    private ExchangeHandler requestHandler = new ExchangeHandlerAdapter() {

        @Override
        public Object reply(ExchangeChannel channel, Object message) throws RemotingException {
            if (message instanceof Invocation) {
                Invocation inv = (Invocation) message;
                Invoker<?> invoker = getInvoker(channel, inv);
                // need to consider backward-compatibility if it's a callback
                if (Boolean.TRUE.toString().equals(inv.getAttachments().get(IS_CALLBACK_SERVICE_INVOKE))) {
                    String methodsStr = invoker.getUrl().getParameters().get("methods");
                    boolean hasMethod = false;
                    if (methodsStr == null || methodsStr.indexOf(",") == -1) {
                        hasMethod = inv.getMethodName().equals(methodsStr);
                    } else {
                        String[] methods = methodsStr.split(",");
                        for (String method : methods) {
                            if (inv.getMethodName().equals(method)) {
                                hasMethod = true;
                                break;
                            }
                        }
                    }
                }
                RpcContext.getContext().setRemoteAddress(channel.getRemoteAddress());
                return invoker.invoke(inv);
            }
        }

        @Override
        public void received(Channel channel, Object message) throws RemotingException {
            if (message instanceof Invocation) {
                reply((ExchangeChannel) channel, message);
            } else {
                super.received(channel, message);
            }
        }

        @Override
        public void connected(Channel channel) throws RemotingException {
            invoke(channel, Constants.ON_CONNECT_KEY);
        }

        @Override
        public void disconnected(Channel channel) throws RemotingException {
            if (logger.isInfoEnabled()) {
                logger.info("disconnected from " + channel.getRemoteAddress() + ",url:" + channel.getUrl());
            }
            invoke(channel, Constants.ON_DISCONNECT_KEY);
        }

        private void invoke(Channel channel, String methodKey) {
            Invocation invocation = createInvocation(channel, channel.getUrl(), methodKey);
            if (invocation != null) {
                try {
                    received(channel, invocation);
                } catch (Throwable t) {
                    logger.warn("Failed to invoke event method " + invocation.getMethodName() + "(), cause: " + t.getMessage(), t);
                }
            }
        }

        private Invocation createInvocation(Channel channel, URL url, String methodKey) {
            String method = url.getParameter(methodKey);
            if (method == null || method.length() == 0) {
                return null;
            }
            RpcInvocation invocation = new RpcInvocation(method, new Class<?>[0], new Object[0]);
            invocation.setAttachment(Constants.PATH_KEY, url.getPath());
            invocation.setAttachment(Constants.GROUP_KEY, url.getParameter(Constants.GROUP_KEY));
            invocation.setAttachment(Constants.INTERFACE_KEY, url.getParameter(Constants.INTERFACE_KEY));
            invocation.setAttachment(Constants.VERSION_KEY, url.getParameter(Constants.VERSION_KEY));
            if (url.getParameter(Constants.STUB_EVENT_KEY, false)) {
                invocation.setAttachment(Constants.STUB_EVENT_KEY, Boolean.TRUE.toString());
            }
            return invocation;
        }
    };
}
  • ExchangeHandlerAdapter内部包含通用的处理的连接建立的方法。


Exchanger层

public class Exchangers {

    public static ExchangeClient connect(URL url, ExchangeHandler handler) throws RemotingException {
        if (url == null) {
            throw new IllegalArgumentException("url == null");
        }
        if (handler == null) {
            throw new IllegalArgumentException("handler == null");
        }
        url = url.addParameterIfAbsent(Constants.CODEC_KEY, "exchange");
        return getExchanger(url).connect(url, handler);
    }
}


public class HeaderExchanger implements Exchanger {

    public static final String NAME = "header";

    @Override
    public ExchangeClient connect(URL url, ExchangeHandler handler) throws RemotingException {
        return new HeaderExchangeClient(Transporters.connect(url, new DecodeHandler(new HeaderExchangeHandler(handler))), true);
    }

    @Override
    public ExchangeServer bind(URL url, ExchangeHandler handler) throws RemotingException {
        return new HeaderExchangeServer(Transporters.bind(url, new DecodeHandler(new HeaderExchangeHandler(handler))));
    }
}
  • Exchangers的connect()方法最终执行HeaderExchanger的connect()方法,返回HeaderExchangeClient对象。
  • Transporters.connect()的参数new DecodeHandler(new HeaderExchangeHandler(handler))),这里的handler为requestHandler = new ExchangeHandlerAdapter。
  • 按照DecodeHandler => HeaderExchangeHandler => ExchangeHandlerAdapter封装。


Transporter层

public class Transporters {

    public static Client connect(URL url, ChannelHandler... handlers) throws RemotingException {
        if (url == null) {
            throw new IllegalArgumentException("url == null");
        }
        ChannelHandler handler;
        if (handlers == null || handlers.length == 0) {
            handler = new ChannelHandlerAdapter();
        } else if (handlers.length == 1) {
            handler = handlers[0];
        } else {
            handler = new ChannelHandlerDispatcher(handlers);
        }
        return getTransporter().connect(url, handler);
    }
}


public class NettyTransporter implements Transporter {

    public static final String NAME = "netty3";

    @Override
    public Server bind(URL url, ChannelHandler listener) throws RemotingException {
        return new NettyServer(url, listener);
    }

    @Override
    public Client connect(URL url, ChannelHandler listener) throws RemotingException {
        return new NettyClient(url, listener);
    }

}
  • Transporters的connect()方法最终执行的是NettyTransporter的connect()方法。
  • NettyTransporter的connect()的方法负责生成NettyClient对象,参数对象ChannelHandler为DecodeHandler。


NettyClient层

public class NettyClient extends AbstractClient {

    private static final ChannelFactory channelFactory = new NioClientSocketChannelFactory(Executors.newCachedThreadPool(new NamedThreadFactory("NettyClientBoss", true)),
            Executors.newCachedThreadPool(new NamedThreadFactory("NettyClientWorker", true)),
            Constants.DEFAULT_IO_THREADS);
    private ClientBootstrap bootstrap;

    private volatile Channel channel; // volatile, please copy reference to use

    public NettyClient(final URL url, final ChannelHandler handler) throws RemotingException {
        // wrapChannelHandler负责包装ChannelHandler对象
        super(url, wrapChannelHandler(url, handler));
    }

    @Override
    protected void doOpen() throws Throwable {
        NettyHelper.setNettyLoggerFactory();
        bootstrap = new ClientBootstrap(channelFactory);
        // config
        // @see org.jboss.netty.channel.socket.SocketChannelConfig
        bootstrap.setOption("keepAlive", true);
        bootstrap.setOption("tcpNoDelay", true);
        bootstrap.setOption("connectTimeoutMillis", getConnectTimeout());
        final NettyHandler nettyHandler = new NettyHandler(getUrl(), this);
        bootstrap.setPipelineFactory(new ChannelPipelineFactory() {
            @Override
            public ChannelPipeline getPipeline() {
                NettyCodecAdapter adapter = new NettyCodecAdapter(getCodec(), getUrl(), NettyClient.this);
                ChannelPipeline pipeline = Channels.pipeline();
                pipeline.addLast("decoder", adapter.getDecoder());
                pipeline.addLast("encoder", adapter.getEncoder());
                pipeline.addLast("handler", nettyHandler);
                return pipeline;
            }
        });
    }

    @Override
    protected void doConnect() throws Throwable {
        long start = System.currentTimeMillis();
        ChannelFuture future = bootstrap.connect(getConnectAddress());
        try {
            boolean ret = future.awaitUninterruptibly(getConnectTimeout(), TimeUnit.MILLISECONDS);

            if (ret && future.isSuccess()) {
                Channel newChannel = future.getChannel();
                newChannel.setInterestOps(Channel.OP_READ_WRITE);
                try {
                    // Close old channel
                    Channel oldChannel = NettyClient.this.channel; // copy reference
                    if (oldChannel != null) {
                        try {
                            oldChannel.close();
                        } finally {
                            NettyChannel.removeChannelIfDisconnected(oldChannel);
                        }
                    }
                } finally {
                    if (NettyClient.this.isClosed()) {
                        try {
                            newChannel.close();
                        } finally {
                            NettyClient.this.channel = null;
                            NettyChannel.removeChannelIfDisconnected(newChannel);
                        }
                    } else {
                        NettyClient.this.channel = newChannel;
                    }
                }
            } 
    }

    protected static ChannelHandler wrapChannelHandler(URL url, ChannelHandler handler) {
        url = ExecutorUtil.setThreadName(url, CLIENT_THREAD_POOL_NAME);
        url = url.addParameterIfAbsent(Constants.THREADPOOL_KEY, Constants.DEFAULT_CLIENT_THREADPOOL);
        return ChannelHandlers.wrap(handler, url);
    }
}


public abstract class AbstractClient extends AbstractEndpoint implements Client {

    public AbstractClient(URL url, ChannelHandler handler) throws RemotingException {
        super(url, handler);

        send_reconnect = url.getParameter(Constants.SEND_RECONNECT_KEY, false);

        shutdown_timeout = url.getParameter(Constants.SHUTDOWN_TIMEOUT_KEY, Constants.DEFAULT_SHUTDOWN_TIMEOUT);

        // The default reconnection interval is 2s, 1800 means warning interval is 1 hour.
        reconnect_warning_period = url.getParameter("reconnect.waring.period", 1800);

        try {
            doOpen();
        } catch (Throwable t) {
            // 省略无关代码
        }

        try {
            // connect.
            connect();
        } catch (RemotingException t) {
            // 省略无关代码
        } catch (Throwable t) {
            // 省略无关代码            
        }

        executor = (ExecutorService) ExtensionLoader.getExtensionLoader(DataStore.class)
                .getDefaultExtension().get(Constants.CONSUMER_SIDE, Integer.toString(url.getPort()));
        ExtensionLoader.getExtensionLoader(DataStore.class)
                .getDefaultExtension().remove(Constants.CONSUMER_SIDE, Integer.toString(url.getPort()));
    }
}
  • NettyClient在其父类AbstractClient的构造函数中执行doOpen()和connect()方法,具体的方法在子类NettyClient具体实现。
  • NettyClient的实现本质上就是Netty的通用的客户端实现,doOpen()负责创建Netty客户端对象,connect()负责建立连接。
public class ChannelHandlers {

    private static ChannelHandlers INSTANCE = new ChannelHandlers();

    protected ChannelHandlers() {
    }

    public static ChannelHandler wrap(ChannelHandler handler, URL url) {
        return ChannelHandlers.getInstance().wrapInternal(handler, url);
    }

    protected ChannelHandler wrapInternal(ChannelHandler handler, URL url) {
        return new MultiMessageHandler(new HeartbeatHandler(ExtensionLoader.getExtensionLoader(Dispatcher.class)
                .getAdaptiveExtension().dispatch(handler, url)));
    }
}
  • ChannelHandlers的wrapInternal负责针对ChannelHandler进行封装,整个顺序为
    MultiMessageHandler => HeartbeatHandler => AllChannelHandler => DecodeHandler => HeaderExchangeHandler => ExchangeHandlerAdapter。


Consumer handler封装关系

handler封装关系


AllChannelHandler

public class AllChannelHandler extends WrappedChannelHandler {

    public AllChannelHandler(ChannelHandler handler, URL url) {
        super(handler, url);
    }

    @Override
    public void connected(Channel channel) throws RemotingException {
        ExecutorService cexecutor = getExecutorService();
        try {
            cexecutor.execute(new ChannelEventRunnable(channel, handler, ChannelState.CONNECTED));
        } catch (Throwable t) {
            throw new ExecutionException("connect event", channel, getClass() + " error when process connected event .", t);
        }
    }

    @Override
    public void disconnected(Channel channel) throws RemotingException {
        ExecutorService cexecutor = getExecutorService();
        try {
            cexecutor.execute(new ChannelEventRunnable(channel, handler, ChannelState.DISCONNECTED));
        } catch (Throwable t) {
            throw new ExecutionException("disconnect event", channel, getClass() + " error when process disconnected event .", t);
        }
    }

    @Override
    public void received(Channel channel, Object message) throws RemotingException {
        ExecutorService cexecutor = getExecutorService();
        try {
            cexecutor.execute(new ChannelEventRunnable(channel, handler, ChannelState.RECEIVED, message));
        } catch (Throwable t) {
            //TODO A temporary solution to the problem that the exception information can not be sent to the opposite end after the thread pool is full. Need a refactoring
            //fix The thread pool is full, refuses to call, does not return, and causes the consumer to wait for time out
            if(message instanceof Request && t instanceof RejectedExecutionException){
                Request request = (Request)message;
                if(request.isTwoWay()){
                    String msg = "Server side(" + url.getIp() + "," + url.getPort() + ") threadpool is exhausted ,detail msg:" + t.getMessage();
                    Response response = new Response(request.getId(), request.getVersion());
                    response.setStatus(Response.SERVER_THREADPOOL_EXHAUSTED_ERROR);
                    response.setErrorMessage(msg);
                    channel.send(response);
                    return;
                }
            }
            throw new ExecutionException(message, channel, getClass() + " error when process received event .", t);
        }
    }

    @Override
    public void caught(Channel channel, Throwable exception) throws RemotingException {
        ExecutorService cexecutor = getExecutorService();
        try {
            cexecutor.execute(new ChannelEventRunnable(channel, handler, ChannelState.CAUGHT, exception));
        } catch (Throwable t) {
            throw new ExecutionException("caught event", channel, getClass() + " error when process caught event .", t);
        }
    }
}
  • AllChannelHandler的事件处理是通过线程池cexecutor去完成的,而cexecutor的实现是cached类型线程池,所以在大量响应需要处理的时候会有大量的线程池产生。
目录
相关文章
|
5月前
|
Dubbo 应用服务中间件 Nacos
bug篇之基于docker安装nacos(2.1.1)使用dubbo连接不上的问题
bug篇之基于docker安装nacos(2.1.1)使用dubbo连接不上的问题
|
负载均衡 监控 Dubbo
Dubbo连接注册中心和直连的区别
Dubbo连接注册中心和直连的区别
213 0
|
Dubbo Java 应用服务中间件
Dubbo新版本zk注册中心连接问题
目录 一、使用zkclient作为zk连接客户端问题 1、Maven依赖如下 2、服务提供者配置文件 3、启动服务提供者 4、原因分析 5、解决办法 (1)降低dubbo版本 (2)添加ZookeeperTransporter拓展实现 二、使用curator作为zk连接客户端版本问题 1、Maven依赖如下 2、服务提供者配置文件 3、启动服务提供者 4、curator作为zk连接客户端可行依赖 三、总结
Dubbo新版本zk注册中心连接问题
|
开发框架 Dubbo 应用服务中间件
《Dubbo 如何成为连接各种异构微服务体系的服务开发框架》电子版地址
Dubbo 如何成为连接各种异构微服务体系的服务开发框架
84 0
《Dubbo 如何成为连接各种异构微服务体系的服务开发框架》电子版地址
|
负载均衡 Dubbo 安全
Dubbo3 源码解读-宋小生-2:启动服务前服务配置ServiceConfig类型是如何初始化的?
## 2.1 示例源码回顾: 为了方便我们理解记忆,这里先来回顾下上一章我们说的示例代码,如下所示: ```java public class Application { public static void main(String[] args) throws Exception { startWithBootstrap(); } private
163 0
Dubbo3 源码解读-宋小生-2:启动服务前服务配置ServiceConfig类型是如何初始化的?
|
存储 运维 Dubbo
Dubbo3 源码解读-宋小生-3:框架,应用程序,模块领域模型Model对象的初始化
> Dubbo3 已经全面取代 HSF2 成为阿里的下一代服务框架,2022 双 11 基于 Dubbo3 首次实现了关键业务不停推、不降级的全面用户体验提升,从技术上,大幅提高研发与运维效率的同时地址推送等关键资源利用率提升超 40%,基于三位一体的开源中间件体系打造了阿里在云上的单元化最佳实践和统一标准,同时将规模化实践经验与技术创新贡献开源社区,极大的推动了开源技术与标准的发展。 > 本文
465 0
Dubbo3 源码解读-宋小生-3:框架,应用程序,模块领域模型Model对象的初始化
|
开发框架 Dubbo Cloud Native
Dubbo 如何成为连接异构微服务体系的最佳服务开发框架
从编程开发的角度来说,Dubbo 首先是一款 RPC 服务框架,它最大的优势在于提供了面向接口代理的服务编程模型,对开发者屏蔽了底层的远程通信细节。同时 Dubbo 也是一款服务治理框架,它为分布式部署的微服务提供了服务发现、流量调度等服务治理解决方案。 在这篇文章中,我们将以以上基础能力为背景,尝试突破 Dubbo 体系自身,探索如何利用 Dubbo 对多协议、多服务发现模型的支持,来实现
178 0
Dubbo 如何成为连接异构微服务体系的最佳服务开发框架
|
运维 Dubbo Cloud Native
Dubbo3 源码解读-宋小生-8:Dubbo启动器DubboBootstrap借助双重校验锁的单例模式进行对象的初始化
> Dubbo3 已经全面取代 HSF2 成为阿里的下一代服务框架,2022 双十一基于 Dubbo3 首次实现了关键业务不停推、不降级的全面用户体验提升,从技术上,大幅提高研发与运维效率的同时地址推送等关键资源利用率提升超 40%,基于三位一体的开源中间件体系打造了阿里在云上的单元化最佳实践和统一标准,同时将规模化实践经验与技术创新贡献开源社区,极大的推动了开源技术与标准的发展。 > 本文是
246 0
|
Dubbo 应用服务中间件 容器
问题记录:Dubbo服务实现类的成员变量未初始化问题
开发时碰到了一个这种问题:在类中定义了三个boolean类型的成员变量,这个类是Dubbo的服务实现类,也就是在类上使用DubboService注解的类,笔者在这个类里面声明了三个boolean类型的成员变量,默认初始值都是false,但是当程序进入到方法以后,还没开始操作这三个变量,其中有两个就变成了true,然后笔者不得不在方法内从新给他们初始化一次,因为这个问题导致了程序的流程出现了问题,因为笔者是使用他们来控制一个流程走向的。
90 0
|
存储 Dubbo 应用服务中间件
Dubbo Consumer 发送请求
在 Dubbo 系列文章的最后,我们回过头来看一下整个 RPC 过程是如何运作起来的,本文着重介绍整个调用链路中 Consumer 发送请求的执行过程。
下一篇
无影云桌面