前言
今天继续RocketMQ-Rpc通信模块(remoting)的源码分析。上一章提到了主要的start()方法执行流程,如果有不清楚的地方可以一起讨论哈,这篇文章会继续解读主要方法,按照惯例先看看NettyRemotingAbstract的类图,看类图知方法。和NettyEventExecutor以及MQ的交互流程。 按照惯例先看看NettyRemotingAbstract的类图,看类图知方法,文中会挑重要方法和主要流程解读。再看看其核心属性:
//oneway方式发送的限流控制 protected final Semaphore semaphoreOneway; //异步发送的限流控制 protected final Semaphore semaphoreAsync; //缓存是已经发送,但是还未收到回应的map protected final ConcurrentMapresponseTable =new ConcurrentHashMap(256); //事件code对应的处理器 protected final HashMap<integer *="" request="" code="" ,="" pair> processorTable = new HashMap<integer, pair>(64); //Netty事件处理如心跳,连接,关闭等 protected final NettyEventExecutor nettyEventExecutor = new NettyEventExecutor(); //默认处理器,上篇已经介绍使用方式 protected PairdefaultRequestProcessor; //消息处理器 //ssl相关 protected volatile SslContext sslContext; 复制代码
具体的交互流程:在RocketMQ消息队列中支持通信的方式主要同步接口(invokeSync),异步接口(invokeAsync) 直接发送(invokeOneway),以异步发送为例流程图如下所示:整个RPC交互流程成分为上边几个步骤,以异步调用为例分析下整个流程接下来一起看看主要的方法,
@1客户端调用NettyRemotingClient.invokeAsync
public void invokeAsync(String addr, RemotingCommand request, long timeoutMillis, InvokeCallback invokeCallback) throws InterruptedException, RemotingConnectException, RemotingTooMuchRequestException, RemotingTimeoutException, RemotingSendRequestException //发送的开始时间 long beginStartTime = System.currentTimeMillis(); //@2根据地址获取Channel对象 final Channel channel = this.getAndCreateChannel(addr); if (channel != null && channel.isActive()) { try { if (this.rpcHook != null) { //存在rpcHook的话执行Hook函数 this.rpcHook.doBeforeRequest(addr, request); } long costTime = System.currentTimeMillis() - beginStartTime; if (timeoutMillis < costTime) { throw new RemotingTooMuchRequestException("invokeAsync call timeout"); } //@3 发送消息 this.invokeAsyncImpl(channel, request, timeoutMillis - costTime, invokeCallback); } catch (RemotingSendRequestException e) { log.warn("invokeAsync: send request exception, so close the channel[{}]", addr); this.closeChannel(addr, channel); throw e; } } else { this.closeChannel(addr, channel); throw new RemotingConnectException(addr); } 复制代码
@2:根据addr获取到Netty的通道channel,如果通道不存在,就创建一个新的
@3:调用NettyRemotingAbstract.invokeAsyncImpl()方法发送异步消息
//异步发送消息 public void invokeAsyncImpl(final Channel channel, final RemotingCommand request, final long timeoutMillis, final InvokeCallback invokeCallback) throws InterruptedException, RemotingTooMuchRequestException, RemotingTimeoutException, RemotingSendRequestException { //startTime long beginStartTime = System.currentTimeMillis(); //获取请求Id final int opaque = request.getOpaque(); //@4 异步发送用信息量作为并发控制 boolean acquired = this.semaphoreAsync.tryAcquire(timeoutMillis, TimeUnit.MILLISECONDS); if (acquired) { final SemaphoreReleaseOnlyOnce once = new SemaphoreReleaseOnlyOnce(this.semaphoreAsync); long costTime = System.currentTimeMillis() - beginStartTime; if (timeoutMillis < costTime) { throw new RemotingTooMuchRequestException("invokeAsyncImpl call timeout"); } //@5 生成返回值并将回调函数设置到responseFuture存入responseTable中 final ResponseFuture responseFuture = new ResponseFuture(channel, opaque, timeoutMillis - costTime, invokeCallback, once); this.responseTable.put(opaque, responseFuture); try { //@6 发送消息 channel.writeAndFlush(request).addListener(new ChannelFutureListener() { @Override public void operationComplete(ChannelFuture f) throws Exception { if (f.isSuccess()) { responseFuture.setSendRequestOK(true); return; } requestFail(opaque); log.warn("send a request command to channel <{}> failed.", RemotingHelper.parseChannelRemoteAddr(channel)); } }); } catch (Exception e) { responseFuture.release(); log.warn("send a request command to channel <" + RemotingHelper.parseChannelRemoteAddr(channel) + "> Exception", e); throw new RemotingSendRequestException(RemotingHelper.parseChannelRemoteAddr(channel), e); } } else { if (timeoutMillis <= 0) { throw new RemotingTooMuchRequestException("invokeAsyncImpl invoke too fast"); } else { String info = String.format("invokeAsyncImpl tryAcquire semaphore timeout, %dms, waiting thread nums: %d semaphoreAsyncValue: %d", timeoutMillis, this.semaphoreAsync.getQueueLength(), this.semaphoreAsync.availablePermits() ); log.warn(info); throw new RemotingTimeoutException(info); } } } 复制代码