RocketMQ源码分析-Rpc通信模块(remoting)一

简介: 上篇文章分析了Rocketmq的nameServer的源码,在继续分析源码之前,先考虑一个问题,设计一个mq并且是高性能的mq最最核心的问题是什么,我个人认为主要是有俩个方面,1:消息的网络传输,2:消息的读写,这两个决定了mq的高性能。

微信截图_20220531121135.png

前言 上篇文章分析了Rocketmq的nameServer的源码,在继续分析源码之前,先考虑一个问题,设计一个mq并且是高性能的mq最最核心的问题是什么,我个人认为主要是有俩个方面,1:消息的网络传输,2:消息的读写,这两个决定了mq的高性能。

本文主要分析Rocketmq的网络通信部分,源码位于remoting模块下,Rocketmq通信模块是基于Netty建设的,在阅读源码之前最好对Netty有个系统性的认知,这样在读起来更加迅速,另外要学会看类图和。 先看下Remote模块的核心类结构图 微信截图_20220531121030.png

RemotingServer解读(RemotingClient代码类似就不说了)
public interface RemotingServer extends RemotingService {
    //注册时间对应处理器
    void registerProcessor(final int requestCode, final NettyRequestProcessor processor,
                           final ExecutorService executor);
    //注册默认处理器
    void registerDefaultProcessor(final NettyRequestProcessor processor, final ExecutorService executor);
    //本地监听端口
    int localListenPort();
    Pair<NettyRequestProcessor, ExecutorService> getProcessorPair(final int requestCode);
    //同步发送
    RemotingCommand invokeSync(final Channel channel, final RemotingCommand request,
                               final long timeoutMillis) throws InterruptedException, RemotingSendRequestException,
            RemotingTimeoutException;
    //异步发送    
    void invokeAsync(final Channel channel, final RemotingCommand request, final long timeoutMillis,
                     final InvokeCallback invokeCallback) throws InterruptedException,
            RemotingTooMuchRequestException, RemotingTimeoutException, RemotingSendRequestException;
    //只发送 不关心结果    
    void invokeOneway(final Channel channel, final RemotingCommand request, final long timeoutMillis)
            throws InterruptedException, RemotingTooMuchRequestException, RemotingTimeoutException,
            RemotingSendRequestException;
}
复制代码
RemotingCommand解读,它是Netty传输的载体,并且承载着编解码的工作,其属性如下:
//请求命令编码
private int code;
//语言 每日Java
private LanguageCode language = LanguageCode.JAVA;
//版本号
private int version = 0;
//请求客户端的序列号
private int opaque = requestId.getAndIncrement();
//标记 表示请求类型 0 :request, 1 : response 
private int flag = 0;
//描述
private String remark;
//扩展属性
private HashMap<String, String> extFields;
// 每个请求的请求头
private transient CommandCustomHeader customHeader; 
复制代码
消息格式:

网络异常,图片无法展示
|

消息编码encode解读
public ByteBuffer encode() {
   //消息总长度
   int length = 4;
   //消息头部数据
   byte[] headerData = this.headerEncode();
   //加上消息头长度
   length += headerData.length;
   //加上body体长度
   if (this.body != null) {
       length += body.length;
   }
   //分配ByteBuffer +4是因为加上消息头部的长度
   ByteBuffer result = ByteBuffer.allocate(4 + length);
   //放入4字节的消息总长度
   result.putInt(length);
   //将消息头长度放入ByteBuffer
   result.put(markProtocolType(headerData.length, serializeTypeCurrentRPC));
   //将消息头数据放入ByteBuffer
   result.put(headerData);
   // 将消息主体放入ByteBuffer
   if (this.body != null) {
       result.put(this.body);
   }
   //重置position位置
   result.flip();
   return result;
}
//source是消息
public static byte[] markProtocolType(int source, SerializeType type) {
   byte[] result = new byte[4];
   //序列化的类型
   result[0] = type.getCode();
   //source右移16位&11111111 获取16-23位的值
   result[1] = (byte) ((source >> 16) & 0xFF);
   //右移8位 获取8-15位的值
   result[2] = (byte) ((source >> 8) & 0xFF);
   //获取0-7位的值
   result[3] = (byte) (source & 0xFF);
   return result;
}
复制代码
start()方法解读
@Override
public void start() {
    //初始化默认线程池,用与处理多个hander
    this.defaultEventExecutorGroup = new DefaultEventExecutorGroup(
            nettyServerConfig.getServerWorkerThreads(),
            new ThreadFactory() {
                private AtomicInteger threadIndex = new AtomicInteger(0);
                @Override
                public Thread newThread(Runnable r) {
                    return new Thread(r, "NettyServerCodecThread_" + this.threadIndex.incrementAndGet());
                }
            });
    ServerBootstrap childHandler =
            //设置boss,worker线程池
            this.serverBootstrap.group(this.eventLoopGroupBoss, this.eventLoopGroupSelector)
                    //使用epoll还是select
                    .channel(useEpoll() ? EpollServerSocketChannel.class : NioServerSocketChannel.class)
                    //tcp的缓冲队列个数包括已经建立的链接和处于三次握手过程中的链接
                    .option(ChannelOption.SO_BACKLOG, 1024)
                    //允许重复使用本地地址和端口
                    .option(ChannelOption.SO_REUSEADDR, true)
                    //keepalive
                    .option(ChannelOption.SO_KEEPALIVE, false)
                    //禁止使用Nagle算法,使用于小数据即时传输
                    .childOption(ChannelOption.TCP_NODELAY, true)
                    //发送缓冲区
                    .childOption(ChannelOption.SO_SNDBUF, nettyServerConfig.getServerSocketSndBufSize())
                    //接收缓冲区大小
                    .childOption(ChannelOption.SO_RCVBUF, nettyServerConfig.getServerSocketRcvBufSize())
                    //绑定地址
                    .localAddress(new InetSocketAddress(this.nettyServerConfig.getListenPort()))
                    .childHandler(new ChannelInitializer<SocketChannel>() {
                        @Override
                        public void initChannel(SocketChannel ch) throws Exception {
                            ch.pipeline()
                                    .addLast(defaultEventExecutorGroup, HANDSHAKE_HANDLER_NAME,
                                            new HandshakeHandler(TlsSystemConfig.tlsMode))
                                    .addLast(defaultEventExecutorGroup,
                                            new NettyEncoder(),//编码器
                                            new NettyDecoder(),//解码器
                                            new IdleStateHandler(0, 0, nettyServerConfig.getServerChannelMaxIdleTimeSeconds()),//netty的心跳
                                            //连接管理器,他负责捕获新连接、连接断开、异常等事件,然后统一调度到NettyEventExecuter处理器处理
                                            new NettyConnectManageHandler(),
                                            //当一个消息经过前面的解码等步骤后,然后调度到channelRead0方法,然后根据消息类型进行分发 
                                            new NettyServerHandler()
                                    );
                        }
                    });
    if (nettyServerConfig.isServerPooledByteBufAllocatorEnable()) {
        childHandler.childOption(ChannelOption.ALLOCATOR, PooledByteBufAllocator.DEFAULT);
    }
    try {
        //异步绑定地址端口
        ChannelFuture sync = this.serverBootstrap.bind().sync();
        InetSocketAddress addr = (InetSocketAddress) sync.channel().localAddress();
        this.port = addr.getPort();
    } catch (InterruptedException e1) {
        throw new RuntimeException("this.serverBootstrap.bind().sync() InterruptedException", e1);
    }
    if (this.channelEventListener != null) {
        this.nettyEventExecutor.start();
    }
    //定时清理responseTable的超时结果
    this.timer.scheduleAtFixedRate(new TimerTask() {
        @Override
        public void run() {
            try {
                NettyRemotingServer.this.scanResponseTable();
            } catch (Throwable e) {
                log.error("scanResponseTable exception", e);
            }
        }
    }, 1000 * 3, 1000);
}
复制代码
通过start()方法可以看出Rocketmq的通信方式使用Netty的主从多线程模型

微信截图_20220531120939.png结合这上图大致说下主从多线程模型

1.Server端eventLoopGroupBoss线程池监听TCP连接请求
2.连接建立好之后,转发给eventLoopGroupSelector中的线程,eventLoopGroupSelector将建立连接的socket注册到selector上
3.eventLoopGroupSelector中的selector监听I/O事件,同时处理I/O事件,经过PipleLine使用defaultEventExecutorGroup一个一个的handler处理下去
4.NettyServerHandler负责使用业务线程池处理对应的业务事件启动结束后,开始看下client和server端是怎么交互的但是在此之前看下二者共同的父类
下个章节继续解读NettyRemotingAbstract以及NettyEventExecutor的实现。
后话 源码解读是一个漫长而且乏味的过程,这也是很多人坚持不下去的原因,不忘初心,方得始终。
相关实践学习
消息队列RocketMQ版:基础消息收发功能体验
本实验场景介绍消息队列RocketMQ版的基础消息收发功能,涵盖实例创建、Topic、Group资源创建以及消息收发体验等基础功能模块。
消息队列 MNS 入门课程
1、消息队列MNS简介 本节课介绍消息队列的MNS的基础概念 2、消息队列MNS特性 本节课介绍消息队列的MNS的主要特性 3、MNS的最佳实践及场景应用 本节课介绍消息队列的MNS的最佳实践及场景应用案例 4、手把手系列:消息队列MNS实操讲 本节课介绍消息队列的MNS的实际操作演示 5、动手实验:基于MNS,0基础轻松构建 Web Client 本节课带您一起基于MNS,0基础轻松构建 Web Client
目录
相关文章
|
6月前
|
消息中间件 Java 应用服务中间件
详解rocketMq通信模块&升级构想(下)
详解rocketMq通信模块&升级构想(下)
429 0
详解rocketMq通信模块&升级构想(下)
|
6月前
|
消息中间件 Java 中间件
详解rocketMq通信模块&升级构想(上)
详解rocketMq通信模块&升级构想(上)
186 0
|
Dubbo Java 应用服务中间件
由浅入深RPC通信原理实战1
由浅入深RPC通信原理实战1
64 0
|
6月前
|
Java 应用服务中间件 API
干翻RPC系列之HesssionRPC:HesssionRPC的开发体验和源码分析
干翻RPC系列之HesssionRPC:HesssionRPC的开发体验和源码分析
|
3月前
|
XML 存储 JSON
(十二)探索高性能通信与RPC框架基石:Json、ProtoBuf、Hessian序列化详解
如今这个分布式风靡的时代,网络通信技术,是每位技术人员必须掌握的技能,因为无论是哪种分布式技术,都离不开心跳、选举、节点感知、数据同步……等机制,而究其根本,这些技术的本质都是网络间的数据交互。正因如此,想要构建一个高性能的分布式组件/系统,不得不思考一个问题:怎么才能让数据传输的速度更快?
|
消息中间件 编解码 网络协议
聊聊 RocketMQ 网络通讯模块
RocketMQ 的网络通讯模块负责生产者、消费者与 Broker 之间的网络通信。 笔者学习 RocketMQ 也是从通讯模块源码开始的,并且从源码里汲取了很多营养。
37119 3
聊聊 RocketMQ 网络通讯模块
|
6月前
|
消息中间件 缓存 API
|
6月前
|
存储 消息中间件 对象存储
RocketMQ 中冷热分离的随机索引模块详解
本文主要介绍了RocketMQ 中冷热分离的随机索引特点、具体内容、与其他系统对比等内容。
102644 8
|
消息中间件 负载均衡 中间件
【Alibaba中间件技术系列】「RocketMQ技术专题」让我们一起探索一下DefaultMQPullConsumer的实现原理及源码分析
【Alibaba中间件技术系列】「RocketMQ技术专题」让我们一起探索一下DefaultMQPullConsumer的实现原理及源码分析
185 3
【Alibaba中间件技术系列】「RocketMQ技术专题」让我们一起探索一下DefaultMQPullConsumer的实现原理及源码分析
|
6月前
|
消息中间件 关系型数据库 MySQL
使用Nginx的stream模块实现MySQL反向代理与RabbitMQ负载均衡
使用Nginx的stream模块实现MySQL反向代理与RabbitMQ负载均衡
383 0

热门文章

最新文章