RocketMQ源码分析-Rpc通信模块(remoting)二

简介: 今天继续RocketMQ-Rpc通信模块(remoting)的源码分析。上一章提到了主要的start()方法执行流程,如果有不清楚的地方可以一起讨论哈,这篇文章会继续解读主要方法,按照惯例先看看NettyRemotingAbstract的类图,看类图知方法。和NettyEventExecutor以及MQ的交互流程。按照惯例先看看NettyRemotingAbstract的类图,看类图知方法,文中会挑重要方法和主要流程解读。

微信截图_20220531121135.png

前言

今天继续RocketMQ-Rpc通信模块(remoting)的源码分析。上一章提到了主要的start()方法执行流程,如果有不清楚的地方可以一起讨论哈,这篇文章会继续解读主要方法,按照惯例先看看NettyRemotingAbstract的类图,看类图知方法。和NettyEventExecutor以及MQ的交互流程。 按照惯例先看看NettyRemotingAbstract的类图,看类图知方法,文中会挑重要方法和主要流程解读。微信截图_20220531121636.png再看看其核心属性:

//oneway方式发送的限流控制
    protected final Semaphore semaphoreOneway;
    //异步发送的限流控制
    protected final Semaphore semaphoreAsync;
//缓存是已经发送,但是还未收到回应的map
    protected final ConcurrentMapresponseTable =new ConcurrentHashMap(256);
//事件code对应的处理器
    protected final HashMap<integer *="" request="" code="" ,="" pair> processorTable =
            new HashMap<integer, pair>(64);
    //Netty事件处理如心跳,连接,关闭等
    protected final NettyEventExecutor nettyEventExecutor = new NettyEventExecutor();
//默认处理器,上篇已经介绍使用方式
    protected PairdefaultRequestProcessor; //消息处理器
    //ssl相关
    protected volatile SslContext sslContext;
复制代码

具体的交互流程:在RocketMQ消息队列中支持通信的方式主要同步接口(invokeSync),异步接口(invokeAsync) 直接发送(invokeOneway),以异步发送为例流程图如下所示:微信截图_20220531121717.png整个RPC交互流程成分为上边几个步骤,以异步调用为例分析下整个流程接下来一起看看主要的方法,

@1客户端调用NettyRemotingClient.invokeAsync
public void invokeAsync(String addr, RemotingCommand request, long timeoutMillis, InvokeCallback invokeCallback)
        throws InterruptedException, RemotingConnectException, RemotingTooMuchRequestException, RemotingTimeoutException,
        RemotingSendRequestException
//发送的开始时间
long beginStartTime = System.currentTimeMillis();
//@2根据地址获取Channel对象
final Channel channel = this.getAndCreateChannel(addr);
if (channel != null && channel.isActive()) {
    try {
        if (this.rpcHook != null) {
            //存在rpcHook的话执行Hook函数
            this.rpcHook.doBeforeRequest(addr, request);
        }
        long costTime = System.currentTimeMillis() - beginStartTime;
        if (timeoutMillis < costTime) {
            throw new RemotingTooMuchRequestException("invokeAsync call timeout");
        }
        //@3 发送消息
        this.invokeAsyncImpl(channel, request, timeoutMillis - costTime, invokeCallback);
    } catch (RemotingSendRequestException e) {
        log.warn("invokeAsync: send request exception, so close the channel[{}]", addr);
        this.closeChannel(addr, channel);
        throw e;
    }
} else {
    this.closeChannel(addr, channel);
    throw new RemotingConnectException(addr);
}
复制代码
@2:根据addr获取到Netty的通道channel,如果通道不存在,就创建一个新的
@3:调用NettyRemotingAbstract.invokeAsyncImpl()方法发送异步消息
//异步发送消息
public void invokeAsyncImpl(final Channel channel, final RemotingCommand request, final long timeoutMillis,
                            final InvokeCallback invokeCallback)
        throws InterruptedException, RemotingTooMuchRequestException, RemotingTimeoutException, RemotingSendRequestException {
    //startTime    
    long beginStartTime = System.currentTimeMillis();
    //获取请求Id
    final int opaque = request.getOpaque();
    //@4 异步发送用信息量作为并发控制
    boolean acquired = this.semaphoreAsync.tryAcquire(timeoutMillis, TimeUnit.MILLISECONDS);
    if (acquired) {
        final SemaphoreReleaseOnlyOnce once = new SemaphoreReleaseOnlyOnce(this.semaphoreAsync);
        long costTime = System.currentTimeMillis() - beginStartTime;
        if (timeoutMillis < costTime) {
            throw new RemotingTooMuchRequestException("invokeAsyncImpl call timeout");
        }
        //@5 生成返回值并将回调函数设置到responseFuture存入responseTable中
        final ResponseFuture responseFuture = new ResponseFuture(channel, opaque, timeoutMillis - costTime, invokeCallback, once);
        this.responseTable.put(opaque, responseFuture);
        try {
            //@6 发送消息
            channel.writeAndFlush(request).addListener(new ChannelFutureListener() {
                @Override
                public void operationComplete(ChannelFuture f) throws Exception {
                    if (f.isSuccess()) {
                        responseFuture.setSendRequestOK(true);
                        return;
                    }
                    requestFail(opaque);
                    log.warn("send a request command to channel <{}> failed.", RemotingHelper.parseChannelRemoteAddr(channel));
                }
            });
        } catch (Exception e) {
            responseFuture.release();
            log.warn("send a request command to channel <" + RemotingHelper.parseChannelRemoteAddr(channel) + "> Exception", e);
            throw new RemotingSendRequestException(RemotingHelper.parseChannelRemoteAddr(channel), e);
        }
    } else {
        if (timeoutMillis <= 0) {
            throw new RemotingTooMuchRequestException("invokeAsyncImpl invoke too fast");
        } else {
            String info =
                    String.format("invokeAsyncImpl tryAcquire semaphore timeout, %dms, waiting thread nums: %d semaphoreAsyncValue: %d",
                            timeoutMillis,
                            this.semaphoreAsync.getQueueLength(),
                            this.semaphoreAsync.availablePermits()
                    );
            log.warn(info);
            throw new RemotingTimeoutException(info);
        }
    }
}
复制代码
@4:申请信号量,异步发送是通过信号量来控制流量,主要靠ResponseFuture类
@5:构建ResponseFuture对象并放入responseTable中,responseTable保存了ResponseFuture返回响应对象,在收到服务端的响应时通过从responseTable获取到响应对象来控制锁的释放以及回调函数的触发
@6:channel.writeAndFlush方法发送到Server端服务端接收Client端的消NettyServerHandler.channelRead0()
@7:NettyRemotingAbstract.processMessageReceived处理接收到的消息,可以看到方法内部是区分request和response的,根据流程来,进入@8
@8:处理消息请求,这个方法在上篇文章那解析过,主要内容就是根据RemotingCommand的请求业务码来匹配到相应的业务处理器,然后生成一个新的线程提交至对应的业务线程池进行异步处理,并在处理完成后发送响应给客户端,进入@9
NettyClientHandler.channelRead0和server端处理方式一样调用processMessageReceived处理进入@10rocessResponseCommand@10:首先获取到opaque,从responseTable获取到对应的ResponseFuture,异步请求的话触发回调函数,同步的话就释放锁。
总结
Rpc通信是MQ的基石,RocketMQ的通信模型是一套很标准的实时通讯模型,涉及到到网络部分很值得去学习,笔者水平有限,理解不到位的地方希望能多多指出,感谢!继续努力。
相关实践学习
消息队列RocketMQ版:基础消息收发功能体验
本实验场景介绍消息队列RocketMQ版的基础消息收发功能,涵盖实例创建、Topic、Group资源创建以及消息收发体验等基础功能模块。
消息队列 MNS 入门课程
1、消息队列MNS简介 本节课介绍消息队列的MNS的基础概念 2、消息队列MNS特性 本节课介绍消息队列的MNS的主要特性 3、MNS的最佳实践及场景应用 本节课介绍消息队列的MNS的最佳实践及场景应用案例 4、手把手系列:消息队列MNS实操讲 本节课介绍消息队列的MNS的实际操作演示 5、动手实验:基于MNS,0基础轻松构建 Web Client 本节课带您一起基于MNS,0基础轻松构建 Web Client
目录
相关文章
|
6月前
|
消息中间件 Java 应用服务中间件
详解rocketMq通信模块&升级构想(下)
详解rocketMq通信模块&升级构想(下)
430 0
详解rocketMq通信模块&升级构想(下)
|
6月前
|
消息中间件 Java 中间件
详解rocketMq通信模块&升级构想(上)
详解rocketMq通信模块&升级构想(上)
191 0
|
Dubbo Java 应用服务中间件
由浅入深RPC通信原理实战1
由浅入深RPC通信原理实战1
73 0
|
6月前
|
Java 应用服务中间件 API
干翻RPC系列之HesssionRPC:HesssionRPC的开发体验和源码分析
干翻RPC系列之HesssionRPC:HesssionRPC的开发体验和源码分析
|
6月前
|
消息中间件 存储 RocketMQ
RocketMQ源码分析之事务消息实现原理下篇-消息服务器Broker提交回滚事务实现原理
RocketMQ源码分析之事务消息实现原理下篇-消息服务器Broker提交回滚事务实现原理
|
3月前
|
XML 存储 JSON
(十二)探索高性能通信与RPC框架基石:Json、ProtoBuf、Hessian序列化详解
如今这个分布式风靡的时代,网络通信技术,是每位技术人员必须掌握的技能,因为无论是哪种分布式技术,都离不开心跳、选举、节点感知、数据同步……等机制,而究其根本,这些技术的本质都是网络间的数据交互。正因如此,想要构建一个高性能的分布式组件/系统,不得不思考一个问题:怎么才能让数据传输的速度更快?
|
消息中间件 编解码 网络协议
聊聊 RocketMQ 网络通讯模块
RocketMQ 的网络通讯模块负责生产者、消费者与 Broker 之间的网络通信。 笔者学习 RocketMQ 也是从通讯模块源码开始的,并且从源码里汲取了很多营养。
37135 3
聊聊 RocketMQ 网络通讯模块
|
6月前
|
存储 消息中间件 对象存储
RocketMQ 中冷热分离的随机索引模块详解
本文主要介绍了RocketMQ 中冷热分离的随机索引特点、具体内容、与其他系统对比等内容。
102650 11
|
6月前
|
消息中间件 缓存 API
下一篇
无影云桌面