引言
本文主要从源码角度分析RocketMQ
的底层通信机制以及RPC
调用的过程。对于RocketMQ
通信机制的深入理解,是我们分析和领会整个RocketMQ
系统消息流转流程的基石。
- 消息
- 消息底层如何流转
- 总结
一、消息
我们可以设想一下,如果自己是RocketMQ的设计者,我们该如何设计一个消息系统。大家都知道RocketMQ是消息中间件,那么首先要解决的问题就是消息本身该如何设计。因为整个系统中它是传递的对象,是数据的载体。那么对于消息本身来说,我们需要定义消息的格式,这样客户端与服务端之间可以遵循定义好的消息格式来进行通信。在RocketMQ体系中,通过RemotingCommand对象来进行交互,,它对数据进行了封装。
(1)消息协议设计以及编解码
我们看下RemotingCommand
中的部分属性如下所示:
private int code; //操作码 private LanguageCode language = LanguageCode.JAVA; //实现语言 private int version = 0; //程序版本 private int opaque = requestId.getAndIncrement(); //reqeustId,在同一个连接上的不同请求标识码,与响应消息中的相对应 private int flag = 0; //区分是普通RPC还是onewayRPC的标志 private String remark; //传输自定义文本信息 private HashMap<String, String> extFields; //自定义扩展信息 private transient CommandCustomHeader customHeader;
我们再看下具体的消息的格式是怎样的,如下图所示:
消息长度:总长度,四个字节存储,占用一个int类型;
序列化类型&消息头长度:同样占用一个int类型,第一个字节表示序列化类型,后面三个字节表示消息头长度;
消息头数据:经过序列化后的消息头数据;
消息主体数据:消息主体的二进制字节数据内容。
所以消息本身结构以及消息如何传递,都是我们需要了解的
图片来源于网络
我们看下remoting模块中的protocal文件夹下的相关定义类,如下所示:
对于消息还需要进行编码以及解码操作来提高消息传递的效率,所以RemotingCommand 中还提供了编码、解码的方法。RemotingCommand 的encode()源码如下所示:
public ByteBuffer encode() { // 1> header length size int length = 4; //存储头部长度 // 2> header data length byte[] headerData = this.headerEncode(); //报文头部数据 length += headerData.length; // 3> body data length if (this.body != null) { length += body.length; } ByteBuffer result = ByteBuffer.allocate(4 + length); //分配字节缓冲区 // length result.putInt(length); // header length result.put(markProtocolType(headerData.length, serializeTypeCurrentRPC)); // header data result.put(headerData); // body data; if (this.body != null) { result.put(this.body); } result.flip(); //将缓冲区翻转,用于将ByteBuffer放到网络通道中进行传输 return result; }
我们再看下消息的解码操作,源码中的实现如下所示:
//解码操作 public static RemotingCommand decode(final byte[] array) { ByteBuffer byteBuffer = ByteBuffer.wrap(array); return decode(byteBuffer); } // public static RemotingCommand decode(final ByteBuffer byteBuffer) { int length = byteBuffer.limit(); int oriHeaderLen = byteBuffer.getInt(); int headerLength = getHeaderLength(oriHeaderLen); byte[] headerData = new byte[headerLength]; byteBuffer.get(headerData); RemotingCommand cmd = headerDecode(headerData, getProtocolType(oriHeaderLen)); int bodyLength = length - 4 - headerLength; byte[] bodyData = null; if (bodyLength > 0) { bodyData = new byte[bodyLength]; byteBuffer.get(bodyData); } cmd.body = bodyData; return cmd; }
(2)消息传输方式
在RocketMQ
系统中,消息的通信方式主要有三种:
1、可靠的同步传输;
2、可靠的异步传输;
3、单向传输;
二、消息如何流转
我们知道RocketMQ底层的通信是通过Netty实现。其中NettyRemotingClient与NettyRemotingServer是RemotingClient和RemotingServer这两个接口的实现类,也是比较重要的实现类。NettyRemotingClient与NettyRemotingServer也同时继承了NettyRemotingAbstract抽象类,该类抽象了invokeSync、invokeOneway等公用的方法实现。
(1)客户端请求发送
首先我们来看下 NettyRemotingClient
中的重要源码分析,如下所示:
public class NettyRemotingClient extends NettyRemotingAbstract implements RemotingClient { ... }
我们可以看出客户端继承了NettyRemotingAbstract抽象类同时实现了RemotingClient接口。其中比较重要的方法包括同步调用、异步调用以及单向调用。我么通过异步调用的方式来举例,NettyRemotingClient 根据地址信息获取或者创建channel,接着会进行invokeAsyncImpl方法的调用,此时客户端会将数据流转给公共的抽象类NettyRemotingAbstract 进行统一处理,在此类中完成真正的请求发送动作。发送消息的源码如下所示:
public void invokeAsyncImpl(final Channel channel, final RemotingCommand request, final long timeoutMillis, final InvokeCallback invokeCallback) throws InterruptedException, RemotingTooMuchRequestException, RemotingTimeoutException, RemotingSendRequestException { long beginStartTime = System.currentTimeMillis(); //请求的ID final int opaque = request.getOpaque(); boolean acquired = this.semaphoreAsync.tryAcquire(timeoutMillis, TimeUnit.MILLISECONDS); if (acquired) { final SemaphoreReleaseOnlyOnce once = new SemaphoreReleaseOnlyOnce(this.semaphoreAsync); long costTime = System.currentTimeMillis() - beginStartTime; if (timeoutMillis < costTime) { once.release(); throw new RemotingTimeoutException("invokeAsyncImpl call timeout"); } //构建ResponseFuture final ResponseFuture responseFuture = new ResponseFuture(channel, opaque, timeoutMillis - costTime, invokeCallback, once); //将ResponseFuture放入responseTable this.responseTable.put(opaque, responseFuture); try { //使用Netty的channel进行请求数据的发送 channel.writeAndFlush(request).addListener(new ChannelFutureListener() { //消息发送后执行动作 @Override public void operationComplete(ChannelFuture f) throws Exception { if (f.isSuccess()) { //如果消息发送成功,则直接将属性置为true,同时返回 responseFuture.setSendRequestOK(true); return; } //如果失败 requestFail(opaque); log.warn("send a request command to channel <{}> failed.", RemotingHelper.parseChannelRemoteAddr(channel)); } }); } catch (Exception e) { //异常处理过程 responseFuture.release(); log.warn("send a request command to channel <" + RemotingHelper.parseChannelRemoteAddr(channel) + "> Exception", e); throw new RemotingSendRequestException(RemotingHelper.parseChannelRemoteAddr(channel), e); } } else { if (timeoutMillis <= 0) { throw new RemotingTooMuchRequestException("invokeAsyncImpl invoke too fast"); } else { String info = String.format("invokeAsyncImpl tryAcquire semaphore timeout, %dms, waiting thread nums: %d semaphoreAsyncValue: %d", timeoutMillis, this.semaphoreAsync.getQueueLength(), this.semaphoreAsync.availablePermits() ); log.warn(info); throw new RemotingTimeoutException(info); } } }
此处有需要留意的部分,如下:
protected final ConcurrentMap<Integer /* opaque */, ResponseFuture> responseTable = new ConcurrentHashMap<Integer, ResponseFuture>(256);
opaque 表示请求码,用来标识不同的请求。这个请求码会与ResponseFuture
映射在一起。
(2)服务端响应流程
服务端代码如下,继承了NettyRemotingAbstract
同时实现了RemotingServer
。
public class NettyRemotingServer extends NettyRemotingAbstract implements RemotingServer { ... }
在该类中的start方法如下所示,它的主要作用是启动Netty服务器
@Override public void start() { this.defaultEventExecutorGroup = new DefaultEventExecutorGroup( nettyServerConfig.getServerWorkerThreads(), new ThreadFactory() { private AtomicInteger threadIndex = new AtomicInteger(0); @Override public Thread newThread(Runnable r) { return new Thread(r, "NettyServerCodecThread_" + this.threadIndex.incrementAndGet()); } }); ServerBootstrap childHandler = this.serverBootstrap.group(this.eventLoopGroupBoss, this.eventLoopGroupSelector) .channel(useEpoll() ? EpollServerSocketChannel.class : NioServerSocketChannel.class) .option(ChannelOption.SO_BACKLOG, 1024) .option(ChannelOption.SO_REUSEADDR, true) .option(ChannelOption.SO_KEEPALIVE, false) .childOption(ChannelOption.TCP_NODELAY, true) .childOption(ChannelOption.SO_SNDBUF, nettyServerConfig.getServerSocketSndBufSize()) .childOption(ChannelOption.SO_RCVBUF, nettyServerConfig.getServerSocketRcvBufSize()) .localAddress(new InetSocketAddress(this.nettyServerConfig.getListenPort())) .childHandler(new ChannelInitializer<SocketChannel>() { //向管道中添加数据处理逻辑 @Override public void initChannel(SocketChannel ch) throws Exception { ch.pipeline() .addLast(defaultEventExecutorGroup, HANDSHAKE_HANDLER_NAME, new HandshakeHandler(TlsSystemConfig.tlsMode)) .addLast(defaultEventExecutorGroup, new NettyEncoder(), new NettyDecoder(), new IdleStateHandler(0, 0, nettyServerConfig.getServerChannelMaxIdleTimeSeconds()), new NettyConnectManageHandler(), new NettyServerHandler() ); } }); if (nettyServerConfig.isServerPooledByteBufAllocatorEnable()) { childHandler.childOption(ChannelOption.ALLOCATOR, PooledByteBufAllocator.DEFAULT); } try { ChannelFuture sync = this.serverBootstrap.bind().sync(); InetSocketAddress addr = (InetSocketAddress) sync.channel().localAddress(); this.port = addr.getPort(); } catch (InterruptedException e1) { throw new RuntimeException("this.serverBootstrap.bind().sync() InterruptedException", e1); } if (this.channelEventListener != null) { this.nettyEventExecutor.start(); } this.timer.scheduleAtFixedRate(new TimerTask() { @Override public void run() { try { NettyRemotingServer.this.scanResponseTable(); } catch (Throwable e) { log.error("scanResponseTable exception", e); } } }, 1000 * 3, 1000); }
三、总结
本文主要介绍了RocketMQ
在消息传递过程中底层通信的过程,同时阐述了消息的格式以及相应的编解码的过程。下篇文章会和大家来一起继续拆解RocketMQ
上层的流程。