在rocketmq中,整个同步调用主要包括两个过程:
(1)请求方生成消息,发送给响应方,并等待响应方回包;
(2)响应方收到请求消息后,消费这条消息,并发出一条响应消息给请求方。
整个过程实质上是两个消息收发过程的组合。所以这里最关键的问题是如何将异步的消息收发过程构建成一个同步的过程。其中主要有两个问题需要解决:
2.1 请求方如何同步等待回包
这个问题的解决方案中,一个关键的数据结构是RequestResponseFuture。
public class RequestResponseFuture {
private final String correlationId;
private final RequestCallback requestCallback;
private final long beginTimestamp = System.currentTimeMillis();
private final Message requestMsg = null;
private long timeoutMillis;
private CountDownLatch countDownLatch = new CountDownLatch(1);
private volatile Message responseMsg = null;
private volatile boolean sendRequestOk = true;
private volatile Throwable cause = null;
}