Netty 高效的Reactor线程模型

简介: Netty 高效的Reactor线程模型

高效的Reactor线程模型

1.Reactor单线程模型

所有IO操作都在同一个NIO线程上面完成的,NIO线程责任如下

1)作为NIO服务端,接收客户端的TCP连接.

2)作为NIO客户端,向服务端发起TCP连接.

3)读取通信对端的请求或者应答消息.

4)向通信对端发送请求或者应答消息.

 

对于小容量应用场景可以使用单线程模型,一个NIO线程无法满足海量信息的编码,解码,读取和发送.

EventLoopGroup bossGroup = new NioEventLoopGroup(1);

 

       try {

           ServerBootstrap b = new ServerBootstrap();

           b.group(bossGroup, bossGroup )

                  .channel(NioServerSocketChannel.class)

 

2.Reactor多线程模型

与单线程模型最大的区别在于有一组NIO线程处理IO操作

1)有一个专门的NIO线程---Acceptor线程用于监听服务端,接收客户端的TCP的连接请求.

2)网络的IO操作----读写等由一个NIO线程池负责,负责消息的解码,读取和发送.

3)1NIO线程可以同时处理N条链路,但是一个链路只对应一个NIO线程,防止发生并发操作问题.

 

在绝大数场景下,Reactor多线程模型都可以满足性能需求,但是,在极特殊应用场景中,一个NIO线程负责监听和处理所有的客户端连接可能会存在性能问题.

: 百万客户端并发连接,或者服务端需要对客户端的握手消息进行安全认证,认证本身很损耗性能

EventLoopGroup bossGroup = new NioEventLoopGroup(1);

       EventLoopGroup workerGroup = new NioEventLoopGroup();

 

       try {

           ServerBootstrap b = new ServerBootstrap();

           b.group(bossGroup, workerGroup)

                  .channel(NioServerSocketChannel.class)

 

 

3.主从线程模型

服务端接收客户端连接不再是一个单独线程,而是一个独立的线程池.

EventLoopGroup bossGroup = new NioEventLoopGroup();

       EventLoopGroup workerGroup = new NioEventLoopGroup();

 

       try {

           ServerBootstrap b = new ServerBootstrap();

           b.group(bossGroup, workerGroup)

                  .channel(NioServerSocketChannel.class)

Acceptor接收到客户端TCP连接请求并完成处理后,将新创建的socketChannel注册到IO线程池的某个线程上,

由他负责socketchannel的读写和编解码工作Acceptor线程池仅仅用于客户端的登录、握手和安全认证,一旦链路建立成功,就将链路注册到后端subReactor线程池的I/O线程上,由I/O线程负责后续的I/O操作。

 

Netty用于接收客户端请求的线程池职责如下。

(1)接收客户端TCP连接,初始化Channel参数;

(2)将链路状态变更事件通知给ChannelPipeline。

Netty处理I/O操作的Reactor线程池职责如下。

(1)异步读取通信对端的数据报,发送读事件到ChannelPipeline;

(2)异步发送消息到通信对端,调用ChannelPipeline的消息发送接口;

(3)执行系统调用Task;

(4)执行定时任务Task,例如链路空闲状态监测定时任务。

 

 

Netty的零拷贝

 

传统意义上的拷贝

 

是在发送数据的时候,

1.File.read(bytes)

2.Socket.send(bytes)

这种需要四次数据拷贝和四次上下文切换;

 

 1.  数据从磁盘读取到内核的read buffer

  1. 数据从内核缓冲区拷贝到用户缓冲区
  2. 数据从用户缓冲区拷贝到内核的socket buffer
  3. 数据从内核的socket buffer拷贝到网卡接口的缓冲区

 

零拷贝的概念

 

明显第二步和第三步是没有必要的,通过javaFileChannel.transferTo方法,可以避免2次多余的拷贝

  1. 调用transferTo,数据从文件由DMA引擎拷贝到内核read buffer
  2. 接着DMA从内核read buffer将数据拷贝到网卡接口buffer

上面的两次操作都不需要CPU参与,所以就达到了零拷贝

主要体现在三个方面:

  • Netty 提供了 CompositeByteBuf 类, 它可以将多个 ByteBuf 合并为一个逻辑上的 ByteBuf, 避免了各个 ByteBuf      之间的拷贝.

ByteBuf header = ...

ByteBuf body = ...

CompositeByteBuf compositeByteBuf = Unpooled.compositeBuffer();

compositeByteBuf.addComponents(true, header, body);

 

不过需要注意的是, 虽然看起来 CompositeByteBuf 是由两个 ByteBuf 组合而成的, 不过在 CompositeByteBuf 内部, 这两个 ByteBuf 都是单独存在的, CompositeByteBuf 只是逻辑上是一个整体.

上面 CompositeByteBuf 代码还以一个地方值得注意的是, 我们调用 addComponents(boolean increaseWriterIndex, ByteBuf... buffers) 来添加两个 ByteBuf, 其中第一个参数是 true, 表示当添加新的 ByteBuf 时, 自动递增 CompositeByteBuf 的 writeIndex.

如果我们调用的是

compositeByteBuf.addComponents(header, body);

那么其实 compositeByteBufwriteIndex 仍然是0, 因此此时我们就不可能从 compositeByteBuf 中读取到数据, 这一点希望大家要特别注意.

 

除了上面直接使用 CompositeByteBuf 类外, 我们还可以使用 Unpooled.wrappedBuffer 方法, 它底层封装了 CompositeByteBuf 操作, 因此使用起来更加方便:

ByteBuf header = ...

ByteBuf body = ...

ByteBuf allByteBuf = Unpooled.wrappedBuffer(header, body);

 

 

  • 通过      wrap 操作, 我们可以将 byte[] 数组、ByteBuf、ByteBuffer等包装成一个 Netty ByteBuf 对象,      进而避免了拷贝操作.

我们可以使用 Unpooled 的相关方法, 包装这个 byte 数组, 生成一个新的 ByteBuf 实例, 而不需要进行拷贝操作. 上面的代码可以改为:

byte[] bytes = ...

ByteBuf byteBuf = Unpooled.wrappedBuffer(bytes);

可以看到, 我们通过 Unpooled.wrappedBuffer 方法来将 bytes 包装成为一个 UnpooledHeapByteBuf 对象, 而在包装的过程中, 是不会有拷贝操作的. 即最后我们生成的生成的 ByteBuf 对象是和 bytes 数组共用了同一个存储空间, 对 bytes 的修改也会反映到 ByteBuf 对象中.

 

 

  • ByteBuf      支持 slice 操作, 因此可以将 ByteBuf 分解为多个共享同一个存储区域的 ByteBuf, 避免了内存的拷贝.

slice 操作和 wrap 操作刚好相反, Unpooled.wrappedBuffer 可以将多个 ByteBuf 合并为一个, 而 slice 操作可以将一个 ByteBuf 切片 为多个共享一个存储区域的 ByteBuf 对象.

ByteBuf 提供了两个 slice 操作方法:

public ByteBuf slice();

public ByteBuf slice(int index, intlength);

不带参数的 slice 方法等同于 buf.slice(buf.readerIndex(), buf.readableBytes()) 调用, 即返回 buf 中可读部分的切片. 而 slice(int index, int length) 方法相对就比较灵活了, 我们可以设置不同的参数来获取到 buf 的不同区域的切片.

下面的例子展示了 ByteBuf.slice 方法的简单用法:

ByteBuf byteBuf = ...

ByteBuf header = byteBuf.slice(0, 5);

ByteBuf body = byteBuf.slice(5, 10);

slice 方法产生 header 和 body 的过程是没有拷贝操作的, header 和 body 对象在内部其实是共享了 byteBuf 存储空间的不同部分而已. 即:

 

 

  • 通过 FileRegion 包装的FileChannel.tranferTo 实现文件传输, 可以直接将文件缓冲区的数据发送到目标 Channel, 避免了传统通过循环 write      方式导致的内存拷贝问题.

 

public static void copyFileWithFileChannel(String srcFileName, String destFileName) throwsException {

   RandomAccessFile srcFile = new RandomAccessFile(srcFileName,"r");

   FileChannel srcFileChannel = srcFile.getChannel();

RandomAccessFile destFile = newRandomAccessFile(destFileName, "rw");

   FileChannel destFileChannel = destFile.getChannel();

long position = 0;

   long count = srcFileChannel.size();

srcFileChannel.transferTo(position, count, destFileChannel);

}

 

 

 

 

 

 

1.netty的接收和发送ByteBuffer,使用堆外直接内存进行读写,不需要进行字节缓冲区二次拷贝

内存池

使用内存池分配器创建直接内存缓冲区

ByteBuf byteBuf = PooledByteBufAllocator.DEFAULT.directBuffer(1024);

直接使用堆内存创建

ByteBuf byteBuf1 = Unpooled.directBuffer(1024);

 

采用内存池创建速度是直接创建的23.

内存池实现:Cache中获得内存区域,调用allocate进行内存分配,通过recyclerget方法循环使用ByteBuf对象

 

 

合理的设置TCP参数在某些场景下对于性能的提升可以起到显著作用,

TCP_NODELAY

  解释:是否启用Nagle算法,改算法将小的碎片数据连接成更大的报文来最小化所发送的报文的数量。

  使用建议:如果需要发送一些较小的报文,则需要禁用该算法,从而最小化报文传输延时。只有在网络通信非常大时(通常指已经到100k+/秒了),设置为false会有些许优势,因此建议大部分情况下均应设置为true

 

SO_LINGER

  解释: Socket参数,关闭Socket的延迟时间,默认值为-1,表示禁用该功能。-1表示socket.close()方法立即返回,但OS底层会将发送缓冲区全部发送到对端。0表示socket.close()方法立即返回,OS放弃发送缓冲区的数据直接向对端发送RST包,对端收到复位错误。非0整数值表示调用socket.close()方法的线程被阻塞直到延迟时间到或发送缓冲区中的数据发送完毕,若超时,则对端会收到复位错误。

  使用建议: 使用默认值,不做设置。

 

SO_SNDBUF

  解释: Socket参数,TCP数据发送缓冲区大小,即TCP发送滑动窗口,linux操作系统可使用命令:cat /proc/sys/net/ipv4/tcp_smem查询其大小。缓冲区的大小决定了网络通信的吞吐量(网络吞吐量=缓冲区大小/网络时延)。

  使用建议: 缓冲区大小设为网络吞吐量达到带宽上限时的值,即缓冲区大小=网络带宽*网络时延。以千兆网卡为例进行计算,假设网络时延为1ms,缓冲区大小=1000Mb/s * 1ms = 128KB

 

SO_RCVBUF

  与SO_SNDBUF同理。

 

SO_REUSEADDR

  解释:是否复用处于TIME_WAIT状态连接的端口,适用于有大量处于TIME_WAIT状态连接的场景,如高并发量的Http短连接场景等。

 

SO_BACKLOG

  解释: Backlog主要是指当ServerSocket还没执行accept时,这个时候的请求会放在os层面的一个队列里,这个队列的大小即为backlog值,这个参数对于大量连接涌入的场景非常重要,例如服务端重启,所有客户端自动重连,瞬间就会涌入很多连接,如backlog不够大的话,可能会造成客户端接到连接失败的状况,再次重连,结果就会导致服务端一直处理不过来(当然,客户端重连最好是采用类似tcp的自动退让策略);

  使用建议: backlog的默认值为os对应的net.core.somaxconn,调整backlog队列的大小一定要确认ulimit -n中允许打开的文件数是够的。

 

SO_KEEPALIVE

  解释:是否使用TCP的心跳机制;

  使用建议: 心跳机制由应用层自己实现;

 

 

Reactor线程保护

如果仅仅捕获IO异常可能导致Reactor线程跑飞,在循环体内一定要捕获Throwable,而不是IO异常或者Exception.

捕获了Throwable,即便发生了意外未知对异常,线程也不会跑飞,它休眠1s,防止死循环导致的异常绕接,然后继续恢复执行.这样处理的核心概念就是:

1.某个消息的一茶馆不应该导致整条链路不可用.

2.某条链路不可用不应该导致其他链路不可用.

3.某个进程不可用不应该导致其他集群节点不可用

 

需要指出的是,尽管我们可以将单个进程打开的最大句柄数修改的非常大,但是当句柄数达到一定数量级之后,处理效率将出现明显下降,因此,需要根据服务器的硬件配置和处理能力进行合理设置。如果单个服务器性能不行也可以通过集群的方式实现。

 

 

 

相关文章
|
5月前
|
编解码 网络协议 API
Netty运行原理问题之Netty的主次Reactor多线程模型工作的问题如何解决
Netty运行原理问题之Netty的主次Reactor多线程模型工作的问题如何解决
|
3月前
|
并行计算 JavaScript 前端开发
单线程模型
【10月更文挑战第15天】
|
3月前
|
安全 Java
Java多线程通信新解:本文通过生产者-消费者模型案例,深入解析wait()、notify()、notifyAll()方法的实用技巧
【10月更文挑战第20天】Java多线程通信新解:本文通过生产者-消费者模型案例,深入解析wait()、notify()、notifyAll()方法的实用技巧,包括避免在循环外调用wait()、优先使用notifyAll()、确保线程安全及处理InterruptedException等,帮助读者更好地掌握这些方法的应用。
32 1
|
4月前
|
消息中间件 存储 NoSQL
剖析 Redis List 消息队列的三种消费线程模型
Redis 列表(List)是一种简单的字符串列表,它的底层实现是一个双向链表。 生产环境,很多公司都将 Redis 列表应用于轻量级消息队列 。这篇文章,我们聊聊如何使用 List 命令实现消息队列的功能以及剖析消费者线程模型 。
119 20
剖析 Redis List 消息队列的三种消费线程模型
|
3月前
|
NoSQL Java Redis
Reactor实战,创建一个简单的单线程Reactor(理解了就相当于理解了多线程的Reactor)
本文通过一个简单的单线程Reactor模式的Java代码示例,展示了如何使用NIO创建一个服务端,处理客户端的连接和数据读写,帮助理解Reactor模式的核心原理。
50 0
Reactor实战,创建一个简单的单线程Reactor(理解了就相当于理解了多线程的Reactor)
|
3月前
|
NoSQL Redis 数据库
Redis单线程模型 redis 为什么是单线程?为什么 redis 单线程效率还能那么高,速度还能特别快
本文解释了Redis为什么采用单线程模型,以及为什么Redis单线程模型的效率和速度依然可以非常高,主要原因包括Redis操作主要访问内存、核心操作简单、单线程避免了线程竞争开销,以及使用了IO多路复用机制epoll。
66 0
Redis单线程模型 redis 为什么是单线程?为什么 redis 单线程效率还能那么高,速度还能特别快
|
4月前
|
存储 机器人 Linux
Netty(二)-服务端网络编程常见网络IO模型讲解
Netty(二)-服务端网络编程常见网络IO模型讲解
|
3月前
|
安全 调度 C#
STA模型、同步上下文和多线程、异步调度
【10月更文挑战第19天】本文介绍了 STA 模型、同步上下文和多线程、异步调度的概念及其优缺点。STA 模型适用于单线程环境,确保资源访问的顺序性;同步上下文和多线程提高了程序的并发性和响应性,但增加了复杂性;异步调度提升了程序的响应性和资源利用率,但也带来了编程复杂性和错误处理的挑战。选择合适的模型需根据具体应用场景和需求进行权衡。
|
3月前
|
消息中间件 NoSQL 关系型数据库
【多线程-从零开始-捌】阻塞队列,消费者生产者模型
【多线程-从零开始-捌】阻塞队列,消费者生产者模型
42 0
|
5月前
|
存储 Kubernetes NoSQL
Tair的发展问题之Tair在适配不同的存储介质时对于线程模型该如何选择
Tair的发展问题之Tair在适配不同的存储介质时对于线程模型该如何选择