本文代码摘录的时候,将一些与本流程无关的内容去掉了,如有需要请看源码。
如果大家对Dubbo RPC原理原理感兴趣,可以看我之前写过的另外一篇博客《 Dubbo RPC源码解读》。
一、 思考与目标
1. 思考
并发情况下,dubbo的RPC模型如下图所示:
如图所示,Consumer端可能同时有多个线程调用Provider的服务,此时Provider会启动多个线程来分别处理这些并发调用,处理完以后将数据返回。在这种多线程环境中,Dubbo是如何做到Consumer Thread A不会拿到其他线程的请求结果?
其实这是一个非常普遍的问题,例如我们的服务器与MQ服务器、数据库、缓存等等第三方服务器通信时,都需要确保并发环境下数据不会错乱,并且要尽可能降低服务器的性能损耗。
2. 目标
l  探究Dubbo RPC多线程通信原理。
l  写一个简单的示例,实现多线程数据交换
二、 Dubbo多线程通信原理
1. 流程图
如上图所示,消费端多线程并行消费服务的场景,主要流程如下
- 获取DubboInvoker对象;
- 将请求体信息封装在一个Request对象中,Request中会包括一个自增的id;然后将Request存到一个ConcurrentHashMap中(key=id,value= DefaultFuture)
- 将request数据写入Channel,
- Consumer Thread执行Defaultfuture#get()方法等待返回结果;
- 服务提供方创建多线程处理用户请求,并将放回结果封装在Response中(包括Request#id)
- 将response写入Channel
- 消费方从Channel中收到数据以后,解析出id,从Map中解析出DefaultFuture
- 唤醒Consumer Thread,返回结果;
- DefaultFuture也会启动一个定时程序,检查在timeout内,结果是否返回,如果未返回,将DefaultFuture从map中移除,并抛出超时异常。
2. 多线程下的通信
DubboInvoker#doInvoke方法中,在ExchangeClient#request(inv, timeout)调用时,返回一个DefaultFuture对象,接着会调用DefaultFuture.get()方法(等待返回结果)。
对于consumer端而言,服务器会为每一个请求创建一个线程,因为rpc操作是一个慢动作,为了节省资源,当线程发送rpc请求后,需要让当前线程释放资源、进入等待队列,当获取到返回结果以后,再唤醒这个线程。
RPC请求的过程为:每一个RPC请求都有一个唯一的id,RPC请求的时候,会将此id也发送给provider;provider处理完请求后会将此id和返回结果一同返回给consumer;consumer收到返回信息以后解析出id,然后从FUTURES中找到相对应的DefaultFuture,并通过DefaultFuture.done#signal()唤醒之前等待线程。
下面根据源码详细讨论一下多线程情况下rpc请求的细节,即dubbo多线程模型的实现。
1) DefaultFuture#field
这里列出了与多线程相关的几个重要的属性
private final Lock lock = new ReentrantLock();
private final Condition done = lock.newCondition();
private static final Map<Long, DefaultFuture> FUTURES = new ConcurrentHashMap<Long, DefaultFuture>();
2) DefaultFuture#构造函数
创建好DefaultFuture对象以后,将DefaultFuture存入了FUTURES中。其实每一次请求,多会生成一个唯一的id,即对于每个服务器而言,id唯一。
public DefaultFuture(Channel channel, Request request, int timeout){
this.channel = channel;
this.request = request;
this.id = request.getId();
this.timeout = timeout > 0 ? timeout : channel.getUrl().getPositiveParameter(Constants.TIMEOUT_KEY, Constants.DEFAULT_TIMEOUT);
// put into waiting map.
FUTURES.put(id, this);
CHANNELS.put(id, channel);
}
3) DefaultFuture#get
主要逻辑是:获取锁,调用await方法,此时当前线程进入等待队列,此线程会有两种结果过:要么超时,要么被唤醒;如果被唤醒,则返回rpc的结果。
public Object get(int timeout) throws RemotingException {
if (timeout <= 0) {
timeout = Constants.DEFAULT_TIMEOUT;
}
if (! isDone()) {
long start = System.currentTimeMillis();
lock.lock();
try {
while (! isDone()) {
done.await(timeout, TimeUnit.MILLISECONDS);
if (isDone() || System.currentTimeMillis() - start > timeout) {
break;
}
}
} catch (InterruptedException e) {
throw new RuntimeException(e);
} finally {
lock.unlock();
}
if (! isDone()) {
throw new TimeoutException(sent > 0, channel, getTimeoutMessage(false));
}
}
return returnFromResponse();
}
4) DefaultFuture#received
收到返回结果时,调用此方法。首先从FUTURES中根据id获取DefaultFuture,如果不存在,打印一条日志;如果存在则通过signal释放一个唤醒信号,将线程从等待队列中唤醒。
public static void received(Channel channel, Response response) {
try {
DefaultFuture future = FUTURES.remove(response.getId());
if (future != null) {
future.doReceived(response);
} else {
logger.warn("The timeout response finally returned at ")。
}
} finally {
CHANNELS.remove(response.getId());
}
}
private void doReceived(Response res) {
lock.lock();
try {
response = res;
if (done != null) {
done.signal();
}
} finally {
lock.unlock();
}
if (callback != null) {
invokeCallback(callback);
}
}
5) DefaultFuture#received
以下代码是用来从FUTURES清理rpc请求超时的DefaultFuture
private static class RemotingInvocationTimeoutScan implements Runnable {
public void run() {
while (true) {
try {
for (DefaultFuture future : FUTURES.values()) {
if (future == null || future.isDone()) {
continue;
}
if (System.currentTimeMillis() - future.getStartTimestamp() > future.getTimeout()) {
// create exception response.
Response timeoutResponse = new Response(future.getId());
// set timeout status.
timeoutResponse.setStatus(future.isSent() ? Response.SERVER_TIMEOUT : Response.CLIENT_TIMEOUT);
timeoutResponse.setErrorMessage(future.getTimeoutMessage(true));
// handle response.
DefaultFuture.received(future.getChannel(), timeoutResponse);
}
}
Thread.sleep(30);
} catch (Throwable e) {
logger.error("Exception when scan the timeout invocation of remoting.", e);
}
}
}
}
static {
Thread th = new Thread(new RemotingInvocationTimeoutScan(), "DubboResponseTimeoutScanTimer");
th.setDaemon(true);
th.start();
}
6) HeaderExchangeHandler#handleRequest
创建Response对象时,带上请求id。
Response handleRequest(ExchangeChannel channel, Request req) throws RemotingException {
Response res = new Response(req.getId(), req.getVersion());
if (req.isBroken()) {
Object data = req.getData();
String msg;
if (data == null) msg = null;
else if (data instanceof Throwable) msg = StringUtils.toString((Throwable) data);
else msg = data.toString();
res.setErrorMessage("Fail to decode request due to: " + msg);
res.setStatus(Response.BAD_REQUEST);
return res;
}
// find handler by message class.
Object msg = req.getData();
try {
// handle data.
Object result = handler.reply(channel, msg);
res.setStatus(Response.OK);
res.setResult(result);
} catch (Throwable e) {
res.setStatus(Response.SERVICE_ERROR);
res.setErrorMessage(StringUtils.toString(e));
}
return res;
}
三、 示例
根据dubbo的实现方案,我写了一个简单的示例,以方便理解,见https://gitee.com/wuzhengfei/great-truth中ConcurrentRWTester代码