前言 上篇文章分析了Rocketmq的nameServer的源码,在继续分析源码之前,先考虑一个问题,设计一个mq并且是高性能的mq最最核心的问题是什么,我个人认为主要是有俩个方面,1:消息的网络传输,2:消息的读写,这两个决定了mq的高性能。
本文主要分析Rocketmq的网络通信部分,源码位于remoting模块下,Rocketmq通信模块是基于Netty建设的,在阅读源码之前最好对Netty有个系统性的认知,这样在读起来更加迅速,另外要学会看类图和。 先看下Remote模块的核心类结构图
RemotingServer解读(RemotingClient代码类似就不说了)
public interface RemotingServer extends RemotingService { //注册时间对应处理器 void registerProcessor(final int requestCode, final NettyRequestProcessor processor, final ExecutorService executor); //注册默认处理器 void registerDefaultProcessor(final NettyRequestProcessor processor, final ExecutorService executor); //本地监听端口 int localListenPort(); Pair<NettyRequestProcessor, ExecutorService> getProcessorPair(final int requestCode); //同步发送 RemotingCommand invokeSync(final Channel channel, final RemotingCommand request, final long timeoutMillis) throws InterruptedException, RemotingSendRequestException, RemotingTimeoutException; //异步发送 void invokeAsync(final Channel channel, final RemotingCommand request, final long timeoutMillis, final InvokeCallback invokeCallback) throws InterruptedException, RemotingTooMuchRequestException, RemotingTimeoutException, RemotingSendRequestException; //只发送 不关心结果 void invokeOneway(final Channel channel, final RemotingCommand request, final long timeoutMillis) throws InterruptedException, RemotingTooMuchRequestException, RemotingTimeoutException, RemotingSendRequestException; } 复制代码
RemotingCommand解读,它是Netty传输的载体,并且承载着编解码的工作,其属性如下:
//请求命令编码 private int code; //语言 每日Java private LanguageCode language = LanguageCode.JAVA; //版本号 private int version = 0; //请求客户端的序列号 private int opaque = requestId.getAndIncrement(); //标记 表示请求类型 0 :request, 1 : response private int flag = 0; //描述 private String remark; //扩展属性 private HashMap<String, String> extFields; // 每个请求的请求头 private transient CommandCustomHeader customHeader; 复制代码
消息格式:
网络异常,图片无法展示
|
消息编码encode解读
public ByteBuffer encode() { //消息总长度 int length = 4; //消息头部数据 byte[] headerData = this.headerEncode(); //加上消息头长度 length += headerData.length; //加上body体长度 if (this.body != null) { length += body.length; } //分配ByteBuffer +4是因为加上消息头部的长度 ByteBuffer result = ByteBuffer.allocate(4 + length); //放入4字节的消息总长度 result.putInt(length); //将消息头长度放入ByteBuffer result.put(markProtocolType(headerData.length, serializeTypeCurrentRPC)); //将消息头数据放入ByteBuffer result.put(headerData); // 将消息主体放入ByteBuffer if (this.body != null) { result.put(this.body); } //重置position位置 result.flip(); return result; } //source是消息 public static byte[] markProtocolType(int source, SerializeType type) { byte[] result = new byte[4]; //序列化的类型 result[0] = type.getCode(); //source右移16位&11111111 获取16-23位的值 result[1] = (byte) ((source >> 16) & 0xFF); //右移8位 获取8-15位的值 result[2] = (byte) ((source >> 8) & 0xFF); //获取0-7位的值 result[3] = (byte) (source & 0xFF); return result; } 复制代码
start()方法解读
@Override public void start() { //初始化默认线程池,用与处理多个hander this.defaultEventExecutorGroup = new DefaultEventExecutorGroup( nettyServerConfig.getServerWorkerThreads(), new ThreadFactory() { private AtomicInteger threadIndex = new AtomicInteger(0); @Override public Thread newThread(Runnable r) { return new Thread(r, "NettyServerCodecThread_" + this.threadIndex.incrementAndGet()); } }); ServerBootstrap childHandler = //设置boss,worker线程池 this.serverBootstrap.group(this.eventLoopGroupBoss, this.eventLoopGroupSelector) //使用epoll还是select .channel(useEpoll() ? EpollServerSocketChannel.class : NioServerSocketChannel.class) //tcp的缓冲队列个数包括已经建立的链接和处于三次握手过程中的链接 .option(ChannelOption.SO_BACKLOG, 1024) //允许重复使用本地地址和端口 .option(ChannelOption.SO_REUSEADDR, true) //keepalive .option(ChannelOption.SO_KEEPALIVE, false) //禁止使用Nagle算法,使用于小数据即时传输 .childOption(ChannelOption.TCP_NODELAY, true) //发送缓冲区 .childOption(ChannelOption.SO_SNDBUF, nettyServerConfig.getServerSocketSndBufSize()) //接收缓冲区大小 .childOption(ChannelOption.SO_RCVBUF, nettyServerConfig.getServerSocketRcvBufSize()) //绑定地址 .localAddress(new InetSocketAddress(this.nettyServerConfig.getListenPort())) .childHandler(new ChannelInitializer<SocketChannel>() { @Override public void initChannel(SocketChannel ch) throws Exception { ch.pipeline() .addLast(defaultEventExecutorGroup, HANDSHAKE_HANDLER_NAME, new HandshakeHandler(TlsSystemConfig.tlsMode)) .addLast(defaultEventExecutorGroup, new NettyEncoder(),//编码器 new NettyDecoder(),//解码器 new IdleStateHandler(0, 0, nettyServerConfig.getServerChannelMaxIdleTimeSeconds()),//netty的心跳 //连接管理器,他负责捕获新连接、连接断开、异常等事件,然后统一调度到NettyEventExecuter处理器处理 new NettyConnectManageHandler(), //当一个消息经过前面的解码等步骤后,然后调度到channelRead0方法,然后根据消息类型进行分发 new NettyServerHandler() ); } }); if (nettyServerConfig.isServerPooledByteBufAllocatorEnable()) { childHandler.childOption(ChannelOption.ALLOCATOR, PooledByteBufAllocator.DEFAULT); } try { //异步绑定地址端口 ChannelFuture sync = this.serverBootstrap.bind().sync(); InetSocketAddress addr = (InetSocketAddress) sync.channel().localAddress(); this.port = addr.getPort(); } catch (InterruptedException e1) { throw new RuntimeException("this.serverBootstrap.bind().sync() InterruptedException", e1); } if (this.channelEventListener != null) { this.nettyEventExecutor.start(); } //定时清理responseTable的超时结果 this.timer.scheduleAtFixedRate(new TimerTask() { @Override public void run() { try { NettyRemotingServer.this.scanResponseTable(); } catch (Throwable e) { log.error("scanResponseTable exception", e); } } }, 1000 * 3, 1000); } 复制代码
通过start()方法可以看出Rocketmq的通信方式使用Netty的主从多线程模型
结合这上图大致说下主从多线程模型