请求发起,等待响应
首先前面两节我们都说到了客户端用户线程的等待,也就是一次请求在等待响应。
这个等待在代码里面是怎么体现的呢?
答案藏在这个方法里面:
org.apache.dubbo.rpc.protocol.AsyncToSyncInvoker#invoke
首先你看这个类名,AsyncToSyncInvoker,异步调用转同步调用,就感觉不简单,里面肯定搞事情了。
标号为 ① 的地方,是 invoker 调用,调用之后的返回是一个AsyncRpcResult。
在这个方法继续往下 Debug,没几步就可以走到这个地方:
org.apache.dubbo.remoting.exchange.support.header.HeaderExchangeChannel#request(java.lang.Object, int, java.util.concurrent.ExecutorService)
135 行就是 channel.send(req)。在往外发请求了,在发请求之前构建了一个 DefaultFuture。然后在请求发送出去之后,140 行返回了这个 future 。
最关键的秘密就藏在 133 行的这个 newFuture 里面。
看一看对应代码:
这个 newFuture 主要干了两件事:
- 初始化 DefaultFuture 。
- 检测是否超时。
我们看看初始化 DefaultFuture 的时候干了啥事:
这个 id 是 AtomicLong 从 0 开始自增出来的。
代码里面还给了这样一行注释:
getAndIncrement() When it grows to MAX_VALUE, it will grow to MIN_VALUE, and the negative can be used as ID
说这个方法当增加到 MAX_VALUE 后再次调用会变成 MIN_VALUE。但是没有关系,负数也是可以当做 ID 来用的。
这个 DefaultFuture 对象构建完成后是返回回去了。
返回到哪里去呢?
就是 DubboInvoker 的 doInvoker 方法中下面框起来的代码:
接着说说标号为 ② 的地方。
首先是判断当前调用模式是否是同步调用。我们这里就是同步调用,于是进入到 if 判断里面的逻辑。在这里面一看,调用的 get 方法,还带有超时时间。
看一下这个 get 方法是怎么样的:
可以看到这个 get 方法不是一个简单的异步编程的 CompletableFuture.get 。里面还包含了一个 ThreadlessExecutor 的 waitAndDrain 方法的逻辑。
这个方法一进来就是 queue.take 方法,阻塞等待。
这个队列里面装的是什么东西?
全局查找往这个队列里面放东西的逻辑,只有下面这一处:
说明这个队列里面扔的是一个 runable 的任务。
这个任务是什么呢?
我们这里先买个关子,放到下一小节里面去讲。
你只要知道:如果队列里面没有任务,那么用户线程就会一直在 take 这里阻塞等待。