RocketMQ源码分析-Rpc通信模块(remoting)一

简介: 上篇文章分析了Rocketmq的nameServer的源码,在继续分析源码之前,先考虑一个问题,设计一个mq并且是高性能的mq最最核心的问题是什么,我个人认为主要是有俩个方面,1:消息的网络传输,2:消息的读写,这两个决定了mq的高性能。

微信截图_20220531121135.png

前言 上篇文章分析了Rocketmq的nameServer的源码,在继续分析源码之前,先考虑一个问题,设计一个mq并且是高性能的mq最最核心的问题是什么,我个人认为主要是有俩个方面,1:消息的网络传输,2:消息的读写,这两个决定了mq的高性能。

本文主要分析Rocketmq的网络通信部分,源码位于remoting模块下,Rocketmq通信模块是基于Netty建设的,在阅读源码之前最好对Netty有个系统性的认知,这样在读起来更加迅速,另外要学会看类图和。 先看下Remote模块的核心类结构图 微信截图_20220531121030.png

RemotingServer解读(RemotingClient代码类似就不说了)
public interface RemotingServer extends RemotingService {
    //注册时间对应处理器
    void registerProcessor(final int requestCode, final NettyRequestProcessor processor,
                           final ExecutorService executor);
    //注册默认处理器
    void registerDefaultProcessor(final NettyRequestProcessor processor, final ExecutorService executor);
    //本地监听端口
    int localListenPort();
    Pair<NettyRequestProcessor, ExecutorService> getProcessorPair(final int requestCode);
    //同步发送
    RemotingCommand invokeSync(final Channel channel, final RemotingCommand request,
                               final long timeoutMillis) throws InterruptedException, RemotingSendRequestException,
            RemotingTimeoutException;
    //异步发送    
    void invokeAsync(final Channel channel, final RemotingCommand request, final long timeoutMillis,
                     final InvokeCallback invokeCallback) throws InterruptedException,
            RemotingTooMuchRequestException, RemotingTimeoutException, RemotingSendRequestException;
    //只发送 不关心结果    
    void invokeOneway(final Channel channel, final RemotingCommand request, final long timeoutMillis)
            throws InterruptedException, RemotingTooMuchRequestException, RemotingTimeoutException,
            RemotingSendRequestException;
}
复制代码
RemotingCommand解读,它是Netty传输的载体,并且承载着编解码的工作,其属性如下:
//请求命令编码
private int code;
//语言 每日Java
private LanguageCode language = LanguageCode.JAVA;
//版本号
private int version = 0;
//请求客户端的序列号
private int opaque = requestId.getAndIncrement();
//标记 表示请求类型 0 :request, 1 : response 
private int flag = 0;
//描述
private String remark;
//扩展属性
private HashMap<String, String> extFields;
// 每个请求的请求头
private transient CommandCustomHeader customHeader; 
复制代码
消息格式:

网络异常,图片无法展示
|

消息编码encode解读
public ByteBuffer encode() {
   //消息总长度
   int length = 4;
   //消息头部数据
   byte[] headerData = this.headerEncode();
   //加上消息头长度
   length += headerData.length;
   //加上body体长度
   if (this.body != null) {
       length += body.length;
   }
   //分配ByteBuffer +4是因为加上消息头部的长度
   ByteBuffer result = ByteBuffer.allocate(4 + length);
   //放入4字节的消息总长度
   result.putInt(length);
   //将消息头长度放入ByteBuffer
   result.put(markProtocolType(headerData.length, serializeTypeCurrentRPC));
   //将消息头数据放入ByteBuffer
   result.put(headerData);
   // 将消息主体放入ByteBuffer
   if (this.body != null) {
       result.put(this.body);
   }
   //重置position位置
   result.flip();
   return result;
}
//source是消息
public static byte[] markProtocolType(int source, SerializeType type) {
   byte[] result = new byte[4];
   //序列化的类型
   result[0] = type.getCode();
   //source右移16位&11111111 获取16-23位的值
   result[1] = (byte) ((source >> 16) & 0xFF);
   //右移8位 获取8-15位的值
   result[2] = (byte) ((source >> 8) & 0xFF);
   //获取0-7位的值
   result[3] = (byte) (source & 0xFF);
   return result;
}
复制代码
start()方法解读
@Override
public void start() {
    //初始化默认线程池,用与处理多个hander
    this.defaultEventExecutorGroup = new DefaultEventExecutorGroup(
            nettyServerConfig.getServerWorkerThreads(),
            new ThreadFactory() {
                private AtomicInteger threadIndex = new AtomicInteger(0);
                @Override
                public Thread newThread(Runnable r) {
                    return new Thread(r, "NettyServerCodecThread_" + this.threadIndex.incrementAndGet());
                }
            });
    ServerBootstrap childHandler =
            //设置boss,worker线程池
            this.serverBootstrap.group(this.eventLoopGroupBoss, this.eventLoopGroupSelector)
                    //使用epoll还是select
                    .channel(useEpoll() ? EpollServerSocketChannel.class : NioServerSocketChannel.class)
                    //tcp的缓冲队列个数包括已经建立的链接和处于三次握手过程中的链接
                    .option(ChannelOption.SO_BACKLOG, 1024)
                    //允许重复使用本地地址和端口
                    .option(ChannelOption.SO_REUSEADDR, true)
                    //keepalive
                    .option(ChannelOption.SO_KEEPALIVE, false)
                    //禁止使用Nagle算法,使用于小数据即时传输
                    .childOption(ChannelOption.TCP_NODELAY, true)
                    //发送缓冲区
                    .childOption(ChannelOption.SO_SNDBUF, nettyServerConfig.getServerSocketSndBufSize())
                    //接收缓冲区大小
                    .childOption(ChannelOption.SO_RCVBUF, nettyServerConfig.getServerSocketRcvBufSize())
                    //绑定地址
                    .localAddress(new InetSocketAddress(this.nettyServerConfig.getListenPort()))
                    .childHandler(new ChannelInitializer<SocketChannel>() {
                        @Override
                        public void initChannel(SocketChannel ch) throws Exception {
                            ch.pipeline()
                                    .addLast(defaultEventExecutorGroup, HANDSHAKE_HANDLER_NAME,
                                            new HandshakeHandler(TlsSystemConfig.tlsMode))
                                    .addLast(defaultEventExecutorGroup,
                                            new NettyEncoder(),//编码器
                                            new NettyDecoder(),//解码器
                                            new IdleStateHandler(0, 0, nettyServerConfig.getServerChannelMaxIdleTimeSeconds()),//netty的心跳
                                            //连接管理器,他负责捕获新连接、连接断开、异常等事件,然后统一调度到NettyEventExecuter处理器处理
                                            new NettyConnectManageHandler(),
                                            //当一个消息经过前面的解码等步骤后,然后调度到channelRead0方法,然后根据消息类型进行分发 
                                            new NettyServerHandler()
                                    );
                        }
                    });
    if (nettyServerConfig.isServerPooledByteBufAllocatorEnable()) {
        childHandler.childOption(ChannelOption.ALLOCATOR, PooledByteBufAllocator.DEFAULT);
    }
    try {
        //异步绑定地址端口
        ChannelFuture sync = this.serverBootstrap.bind().sync();
        InetSocketAddress addr = (InetSocketAddress) sync.channel().localAddress();
        this.port = addr.getPort();
    } catch (InterruptedException e1) {
        throw new RuntimeException("this.serverBootstrap.bind().sync() InterruptedException", e1);
    }
    if (this.channelEventListener != null) {
        this.nettyEventExecutor.start();
    }
    //定时清理responseTable的超时结果
    this.timer.scheduleAtFixedRate(new TimerTask() {
        @Override
        public void run() {
            try {
                NettyRemotingServer.this.scanResponseTable();
            } catch (Throwable e) {
                log.error("scanResponseTable exception", e);
            }
        }
    }, 1000 * 3, 1000);
}
复制代码
通过start()方法可以看出Rocketmq的通信方式使用Netty的主从多线程模型

微信截图_20220531120939.png结合这上图大致说下主从多线程模型

1.Server端eventLoopGroupBoss线程池监听TCP连接请求
2.连接建立好之后,转发给eventLoopGroupSelector中的线程,eventLoopGroupSelector将建立连接的socket注册到selector上
3.eventLoopGroupSelector中的selector监听I/O事件,同时处理I/O事件,经过PipleLine使用defaultEventExecutorGroup一个一个的handler处理下去
4.NettyServerHandler负责使用业务线程池处理对应的业务事件启动结束后,开始看下client和server端是怎么交互的但是在此之前看下二者共同的父类
下个章节继续解读NettyRemotingAbstract以及NettyEventExecutor的实现。
后话 源码解读是一个漫长而且乏味的过程,这也是很多人坚持不下去的原因,不忘初心,方得始终。
相关实践学习
消息队列RocketMQ版:基础消息收发功能体验
本实验场景介绍消息队列RocketMQ版的基础消息收发功能,涵盖实例创建、Topic、Group资源创建以及消息收发体验等基础功能模块。
消息队列 MNS 入门课程
1、消息队列MNS简介 本节课介绍消息队列的MNS的基础概念 2、消息队列MNS特性 本节课介绍消息队列的MNS的主要特性 3、MNS的最佳实践及场景应用 本节课介绍消息队列的MNS的最佳实践及场景应用案例 4、手把手系列:消息队列MNS实操讲 本节课介绍消息队列的MNS的实际操作演示 5、动手实验:基于MNS,0基础轻松构建 Web Client 本节课带您一起基于MNS,0基础轻松构建 Web Client
目录
相关文章
|
2月前
|
存储 消息中间件 安全
JUC组件实战:实现RRPC(Java与硬件通过MQTT的同步通信)
【10月更文挑战第9天】本文介绍了如何利用JUC组件实现Java服务与硬件通过MQTT的同步通信(RRPC)。通过模拟MQTT通信流程,使用`LinkedBlockingQueue`作为消息队列,详细讲解了消息发送、接收及响应的同步处理机制,包括任务超时处理和内存泄漏的预防措施。文中还提供了具体的类设计和方法实现,帮助理解同步通信的内部工作原理。
JUC组件实战:实现RRPC(Java与硬件通过MQTT的同步通信)
|
3月前
|
消息中间件 Kafka 数据安全/隐私保护
RabbitMQ异步通信详解
RabbitMQ异步通信详解
108 16
|
7月前
|
消息中间件 存储 RocketMQ
RocketMQ源码分析之事务消息实现原理下篇-消息服务器Broker提交回滚事务实现原理
RocketMQ源码分析之事务消息实现原理下篇-消息服务器Broker提交回滚事务实现原理
|
4月前
|
物联网 C# Windows
看看如何使用 C# 代码让 MQTT 进行完美通信
看看如何使用 C# 代码让 MQTT 进行完美通信
648 0
|
4月前
|
物联网 网络性能优化 Python
"掌握MQTT协议,开启物联网通信新篇章——揭秘轻量级消息传输背后的力量!"
【8月更文挑战第21天】MQTT是一种轻量级的消息传输协议,以其低功耗、低带宽的特点在物联网和移动应用领域广泛应用。基于发布/订阅模型,MQTT支持三种服务质量级别,非常适合受限网络环境。本文详细阐述了MQTT的工作原理及特点,并提供了使用Python `paho-mqtt`库实现的发布与订阅示例代码,帮助读者快速掌握MQTT的应用技巧。
94 0
|
7月前
|
消息中间件 存储 JSON
服务器的异步通信——RabbitMQ2
服务器的异步通信——RabbitMQ
58 0
|
7月前
|
消息中间件 缓存 中间件
服务器的异步通信——RabbitMQ1
服务器的异步通信——RabbitMQ
61 0
|
7月前
|
消息中间件 缓存 API
下一篇
DataWorks