高效的Reactor线程模型
1.Reactor单线程模型
所有IO操作都在同一个NIO线程上面完成的,NIO线程责任如下
1)作为NIO服务端,接收客户端的TCP连接.
2)作为NIO客户端,向服务端发起TCP连接.
3)读取通信对端的请求或者应答消息.
4)向通信对端发送请求或者应答消息.
对于小容量应用场景可以使用单线程模型,一个NIO线程无法满足海量信息的编码,解码,读取和发送.
EventLoopGroup bossGroup = new NioEventLoopGroup(1);
try {
ServerBootstrap b = new ServerBootstrap();
b.group(bossGroup, bossGroup )
.channel(NioServerSocketChannel.class)
2.Reactor多线程模型
与单线程模型最大的区别在于有一组NIO线程处理IO操作
1)有一个专门的NIO线程---Acceptor线程用于监听服务端,接收客户端的TCP的连接请求.
2)网络的IO操作----读写等由一个NIO线程池负责,负责消息的解码,读取和发送.
3)1个NIO线程可以同时处理N条链路,但是一个链路只对应一个NIO线程,防止发生并发操作问题.
在绝大数场景下,Reactor多线程模型都可以满足性能需求,但是,在极特殊应用场景中,一个NIO线程负责监听和处理所有的客户端连接可能会存在性能问题.
如: 百万客户端并发连接,或者服务端需要对客户端的握手消息进行安全认证,认证本身很损耗性能
EventLoopGroup bossGroup = new NioEventLoopGroup(1);
EventLoopGroup workerGroup = new NioEventLoopGroup();
try {
ServerBootstrap b = new ServerBootstrap();
b.group(bossGroup, workerGroup)
.channel(NioServerSocketChannel.class)
3.主从线程模型
服务端接收客户端连接不再是一个单独线程,而是一个独立的线程池.
EventLoopGroup bossGroup = new NioEventLoopGroup();
EventLoopGroup workerGroup = new NioEventLoopGroup();
try {
ServerBootstrap b = new ServerBootstrap();
b.group(bossGroup, workerGroup)
.channel(NioServerSocketChannel.class)
Acceptor接收到客户端TCP连接请求并完成处理后,将新创建的socketChannel注册到IO线程池的某个线程上,
由他负责socketchannel的读写和编解码工作。Acceptor线程池仅仅用于客户端的登录、握手和安全认证,一旦链路建立成功,就将链路注册到后端subReactor线程池的I/O线程上,由I/O线程负责后续的I/O操作。
Netty用于接收客户端请求的线程池职责如下。
(1)接收客户端TCP连接,初始化Channel参数;
(2)将链路状态变更事件通知给ChannelPipeline。
Netty处理I/O操作的Reactor线程池职责如下。
(1)异步读取通信对端的数据报,发送读事件到ChannelPipeline;
(2)异步发送消息到通信对端,调用ChannelPipeline的消息发送接口;
(3)执行系统调用Task;
(4)执行定时任务Task,例如链路空闲状态监测定时任务。
Netty的零拷贝
传统意义上的拷贝
是在发送数据的时候,
1.File.read(bytes)
2.Socket.send(bytes)
这种需要四次数据拷贝和四次上下文切换;
1. 数据从磁盘读取到内核的read buffer
- 数据从内核缓冲区拷贝到用户缓冲区
- 数据从用户缓冲区拷贝到内核的socket buffer
- 数据从内核的socket buffer拷贝到网卡接口的缓冲区
零拷贝的概念
明显第二步和第三步是没有必要的,通过java的FileChannel.transferTo方法,可以避免2次多余的拷贝
- 调用transferTo,数据从文件由DMA引擎拷贝到内核read buffer
- 接着DMA从内核read buffer将数据拷贝到网卡接口buffer
上面的两次操作都不需要CPU参与,所以就达到了零拷贝
主要体现在三个方面:
- Netty 提供了 CompositeByteBuf 类, 它可以将多个 ByteBuf 合并为一个逻辑上的 ByteBuf, 避免了各个 ByteBuf 之间的拷贝.
ByteBuf header = ...
ByteBuf body = ...
CompositeByteBuf compositeByteBuf = Unpooled.compositeBuffer();
compositeByteBuf.addComponents(true, header, body);
不过需要注意的是, 虽然看起来 CompositeByteBuf 是由两个 ByteBuf 组合而成的, 不过在 CompositeByteBuf 内部, 这两个 ByteBuf 都是单独存在的, CompositeByteBuf 只是逻辑上是一个整体.
上面 CompositeByteBuf 代码还以一个地方值得注意的是, 我们调用 addComponents(boolean increaseWriterIndex, ByteBuf... buffers) 来添加两个 ByteBuf, 其中第一个参数是 true, 表示当添加新的 ByteBuf 时, 自动递增 CompositeByteBuf 的 writeIndex.
如果我们调用的是
compositeByteBuf.addComponents(header, body);
那么其实 compositeByteBuf 的 writeIndex 仍然是0, 因此此时我们就不可能从 compositeByteBuf 中读取到数据, 这一点希望大家要特别注意.
除了上面直接使用 CompositeByteBuf 类外, 我们还可以使用 Unpooled.wrappedBuffer 方法, 它底层封装了 CompositeByteBuf 操作, 因此使用起来更加方便:
ByteBuf header = ...
ByteBuf body = ...
ByteBuf allByteBuf = Unpooled.wrappedBuffer(header, body);
- 通过 wrap 操作, 我们可以将 byte[] 数组、ByteBuf、ByteBuffer等包装成一个 Netty ByteBuf 对象, 进而避免了拷贝操作.
我们可以使用 Unpooled 的相关方法, 包装这个 byte 数组, 生成一个新的 ByteBuf 实例, 而不需要进行拷贝操作. 上面的代码可以改为:
byte[] bytes = ...
ByteBuf byteBuf = Unpooled.wrappedBuffer(bytes);
可以看到, 我们通过 Unpooled.wrappedBuffer 方法来将 bytes 包装成为一个 UnpooledHeapByteBuf 对象, 而在包装的过程中, 是不会有拷贝操作的. 即最后我们生成的生成的 ByteBuf 对象是和 bytes 数组共用了同一个存储空间, 对 bytes 的修改也会反映到 ByteBuf 对象中.
- ByteBuf 支持 slice 操作, 因此可以将 ByteBuf 分解为多个共享同一个存储区域的 ByteBuf, 避免了内存的拷贝.
slice 操作和 wrap 操作刚好相反, Unpooled.wrappedBuffer 可以将多个 ByteBuf 合并为一个, 而 slice 操作可以将一个 ByteBuf 切片 为多个共享一个存储区域的 ByteBuf 对象.
ByteBuf 提供了两个 slice 操作方法:
public ByteBuf slice();
public ByteBuf slice(int index, intlength);
不带参数的 slice 方法等同于 buf.slice(buf.readerIndex(), buf.readableBytes()) 调用, 即返回 buf 中可读部分的切片. 而 slice(int index, int length) 方法相对就比较灵活了, 我们可以设置不同的参数来获取到 buf 的不同区域的切片.
下面的例子展示了 ByteBuf.slice 方法的简单用法:
ByteBuf byteBuf = ...
ByteBuf header = byteBuf.slice(0, 5);
ByteBuf body = byteBuf.slice(5, 10);
用 slice 方法产生 header 和 body 的过程是没有拷贝操作的, header 和 body 对象在内部其实是共享了 byteBuf 存储空间的不同部分而已. 即:
- 通过 FileRegion 包装的FileChannel.tranferTo 实现文件传输, 可以直接将文件缓冲区的数据发送到目标 Channel, 避免了传统通过循环 write 方式导致的内存拷贝问题.
public static void copyFileWithFileChannel(String srcFileName, String destFileName) throwsException {
RandomAccessFile srcFile = new RandomAccessFile(srcFileName,"r");
FileChannel srcFileChannel = srcFile.getChannel();
RandomAccessFile destFile = newRandomAccessFile(destFileName, "rw");
FileChannel destFileChannel = destFile.getChannel();
long position = 0;
long count = srcFileChannel.size();
srcFileChannel.transferTo(position, count, destFileChannel);
}
1.netty的接收和发送ByteBuffer,使用堆外直接内存进行读写,不需要进行字节缓冲区二次拷贝
内存池
使用内存池分配器创建直接内存缓冲区
ByteBuf byteBuf = PooledByteBufAllocator.DEFAULT.directBuffer(1024);
直接使用堆内存创建
ByteBuf byteBuf1 = Unpooled.directBuffer(1024);
采用内存池创建速度是直接创建的23倍.
内存池实现:从Cache中获得内存区域,调用allocate进行内存分配,通过recycler的get方法循环使用ByteBuf对象
合理的设置TCP参数在某些场景下对于性能的提升可以起到显著作用,
TCP_NODELAY
解释:是否启用Nagle算法,改算法将小的碎片数据连接成更大的报文来最小化所发送的报文的数量。
使用建议:如果需要发送一些较小的报文,则需要禁用该算法,从而最小化报文传输延时。只有在网络通信非常大时(通常指已经到100k+/秒了),设置为false会有些许优势,因此建议大部分情况下均应设置为true。
SO_LINGER
解释: Socket参数,关闭Socket的延迟时间,默认值为-1,表示禁用该功能。-1表示socket.close()方法立即返回,但OS底层会将发送缓冲区全部发送到对端。0表示socket.close()方法立即返回,OS放弃发送缓冲区的数据直接向对端发送RST包,对端收到复位错误。非0整数值表示调用socket.close()方法的线程被阻塞直到延迟时间到或发送缓冲区中的数据发送完毕,若超时,则对端会收到复位错误。
使用建议: 使用默认值,不做设置。
SO_SNDBUF
解释: Socket参数,TCP数据发送缓冲区大小,即TCP发送滑动窗口,linux操作系统可使用命令:cat /proc/sys/net/ipv4/tcp_smem查询其大小。缓冲区的大小决定了网络通信的吞吐量(网络吞吐量=缓冲区大小/网络时延)。
使用建议: 缓冲区大小设为网络吞吐量达到带宽上限时的值,即缓冲区大小=网络带宽*网络时延。以千兆网卡为例进行计算,假设网络时延为1ms,缓冲区大小=1000Mb/s * 1ms = 128KB。
SO_RCVBUF
与SO_SNDBUF同理。
SO_REUSEADDR
解释:是否复用处于TIME_WAIT状态连接的端口,适用于有大量处于TIME_WAIT状态连接的场景,如高并发量的Http短连接场景等。
SO_BACKLOG
解释: Backlog主要是指当ServerSocket还没执行accept时,这个时候的请求会放在os层面的一个队列里,这个队列的大小即为backlog值,这个参数对于大量连接涌入的场景非常重要,例如服务端重启,所有客户端自动重连,瞬间就会涌入很多连接,如backlog不够大的话,可能会造成客户端接到连接失败的状况,再次重连,结果就会导致服务端一直处理不过来(当然,客户端重连最好是采用类似tcp的自动退让策略);
使用建议: backlog的默认值为os对应的net.core.somaxconn,调整backlog队列的大小一定要确认ulimit -n中允许打开的文件数是够的。
SO_KEEPALIVE
解释:是否使用TCP的心跳机制;
使用建议: 心跳机制由应用层自己实现;
Reactor线程保护
如果仅仅捕获IO异常可能导致Reactor线程跑飞,在循环体内一定要捕获Throwable,而不是IO异常或者Exception.
捕获了Throwable后,即便发生了意外未知对异常,线程也不会跑飞,它休眠1s,防止死循环导致的异常绕接,然后继续恢复执行.这样处理的核心概念就是:
1.某个消息的一茶馆不应该导致整条链路不可用.
2.某条链路不可用不应该导致其他链路不可用.
3.某个进程不可用不应该导致其他集群节点不可用
需要指出的是,尽管我们可以将单个进程打开的最大句柄数修改的非常大,但是当句柄数达到一定数量级之后,处理效率将出现明显下降,因此,需要根据服务器的硬件配置和处理能力进行合理设置。如果单个服务器性能不行也可以通过集群的方式实现。