二、NIO vs BIO
2.1、stream与channel的区别
1、缓冲层面
stream不会自动缓冲数据,是比较高层的API,不会关心系统提供的一些缓冲功能(例如发送数据使用到的发送缓冲区sendbuffer,接收数据的receivebuffer)。
channel:例如socketchannel就能够利用系统提供的发送缓冲区,接收缓冲区,更为底层。(网卡直接读取的缓冲,你自己定义的缓冲还要复制到这个缓冲区)
2、阻塞
stream只支持阻塞。
channel同时支持阻塞、非阻塞API,网络channel可以配合selector实现多路复用,文件channel不能够配合selector。
3、二者均为全双工,即读写可以同时进行。
全双工通信:是双方可以同时通信(读的同时可以写,写的同时可以读),半双工是必须交替通信,不论是BIO还是NIO都是全双工。
2.2、IO模型
2.2.1、网络IO模型
一定要从网络上的IO模型上去理解,不要被同步、异步、阻塞、非阻塞组合的名词所干扰!
java的read()方法不能够真正完成读取数据的功能,从网络上读取数据的功能并不是java干的,而是需要由操作系统来干,这就牵扯到一次从用户程序空间-linux内核空间的切换,也就是java调用read方法实际上间接的是调用操作系统的方法,操作系统这边有一个read()方法能够真正的完成数据的读取。
数据读取分为两个阶段:①等待数据(调用read等待拷贝到网卡缓冲区时间)。②复制数据(从网卡中读取到缓冲区里)。复制结束了就会从linux内核空间切换到用户程序空间。
2.2.2、五种网络IO模型
五种网络IO模型:参考《UNIX 网络编程 -卷1》,用C语言写的触及更加底层的API
分别是:阻塞IO、非阻塞IO、多路复用、信号驱动、异步IO
阻塞IO:切换到内核空间这一整段时间用户程序是阻塞住的,在此期间什么活都不能干。
非阻塞IO:while(true){int result = read()} 能够不断的尝试获取读取到的数据,在等待数据过程中若是都能够直接得到结果,不过一般都是为0,一旦到复制数据阶段用户程序就会阻塞,这一段时间内会进行缓冲区拷贝!
多路复用:核心在于selector。相对于阻塞IO在做一件事时做不了另外一件事(例如在进行accept()等待时无法做read()事件)。一个是具体的事件阻塞,另一个是select阻塞,对于select可以响应多种事件,每一次响应可以做一批事件。
宏观上来看是进行了两次阻塞:
若是事件复杂呢,此时就能够看出多路复用的好处:阻塞IO是只能等待具体的事件某个阶段完成才能够往下执行,而多路复用是所有的事件都能往后执行。
总结:多路复用是事件驱动,阻塞IO是代码驱动。
信号驱动:不太常用。
异步IO:同步是线程自己获取结果(自己从头到尾把一件事情干完);异步则是线程不去自己获取结果,而是由其他线程送来结果。
同步:线程自己去获取结果(一个线程)。
异步:线程自己不去获取结果,而是由其它线程送结果(至少两个线程)。
针对于是哪个线程要得到结果取决于回调方法,如等你的数据都完成了,调用上一次定义的回调方法,调用方法时候把真正读取到的结果作为参数传过去,这就完成了两个线程之间的数据传递,这就是异步模式。
针对于IO模型来区分同步异步:同步阻塞、同步非阻塞、同步多路复用、异步阻塞(不存在)、异步非阻塞
阻塞IO:同步。
非阻塞IO:同步。
多路复用(可单线程、多线程):同步。
异步IO:异步。
2.3、零拷贝(文件传输)
2.3.1、传统IO问题
传统的 IO 将一个文件通过 socket 写出
File f = new File("helloword/data.txt"); RandomAccessFile file = new RandomAccessFile(file, "r"); byte[] buf = new byte[(int)f.length()]; file.read(buf); Socket socket = ...; socket.getOutputStream().write(buf);
内部工作流程是这样的:
java 本身并不具备 IO 读写能力,因此 read 方法调用后,要从 java 程序的用户态切换至内核态,去调用操作系统(Kernel)的读能力,将数据读入内核缓冲区。这期间用户线程阻塞,操作系统使用 DMA(Direct Memory Access)来实现文件读,其间也不会使用 cpu
DMA 也可以理解为硬件单元,用来解放 cpu 完成文件 IO
从内核态切换回用户态,将数据从内核缓冲区读入用户缓冲区(即 byte[] buf),这期间 cpu 会参与拷贝,无法利用 DMA
调用 write 方法,这时将数据从用户缓冲区(byte[] buf)写入 socket 缓冲区,cpu 会参与拷贝
接下来要向网卡写数据,这项能力 java 又不具备,因此又得从用户态切换至内核态,调用操作系统的写能力,使用 DMA 将 socket 缓冲区的数据写入网卡,不会使用 cpu
可以看到中间环节较多,java 的 IO 实际不是物理设备级别的读写,而是缓存的复制,底层的真正读写是操作系统来完成的
用户态与内核态的切换发生了 3 次,这个操作比较重量级
数据拷贝了共 4 次
2.3.2、NIO 优化
以下说的是filechannel,也就是文件传输零拷贝。
优化1
通过 DirectByteBuf
ByteBuffer.allocate(10) HeapByteBuffer 使用的还是 java 内存
ByteBuffer.allocateDirect(10) DirectByteBuffer 使用的是操作系统内存
[外链图片转存失败,源站可能有防盗链机制,建议将图片保存下来直接上传(img-yZATRSSv-1642430711320)(F:\在学资料\Netty网络编程\Netty教程源码资料\讲义\img\0025.png)]
大部分步骤与优化前相同,不再赘述。唯有一点:java 可以使用 DirectByteBuf 将堆外内存映射到 jvm 内存中来直接访问使用
这块内存不受 jvm 垃圾回收的影响,因此内存地址固定,有助于 IO 读写
java 中的 DirectByteBuf 对象仅维护了此内存的虚引用,内存回收分成两步
DirectByteBuf 对象被垃圾回收,将虚引用加入引用队列
通过专门线程访问引用队列,根据虚引用释放堆外内存
减少了一次数据拷贝,用户态与内核态的切换次数没有减少
优化2(linux 2.1)
进一步优化(底层采用了 linux 2.1 后提供的 sendFile 方法),java 中对应着两个 channel 调用 transferTo/transferFrom 方法拷贝数据
[外链图片转存失败,源站可能有防盗链机制,建议将图片保存下来直接上传(img-yHgDNJ1D-1642430711321)(F:\在学资料\Netty网络编程\Netty教程源码资料\讲义\img\0026.png)]
java 调用 transferTo 方法后,要从 java 程序的用户态切换至内核态,使用 DMA将数据读入内核缓冲区,不会使用 cpu
数据从内核缓冲区传输到 socket 缓冲区,cpu 会参与拷贝
最后使用 DMA 将 socket 缓冲区的数据写入网卡,不会使用 cpu
可以看到
只发生了一次用户态与内核态的切换
数据拷贝了 3 次
进一步优化(linux 2.4)
[外链图片转存失败,源站可能有防盗链机制,建议将图片保存下来直接上传(img-L2g3PF1s-1642430711321)(F:\在学资料\Netty网络编程\Netty教程源码资料\讲义\img\0027.png)]
java 调用 transferTo 方法后,要从 java 程序的用户态切换至内核态,使用 DMA将数据读入内核缓冲区,不会使用 cpu
只会将一些 offset 和 length 信息拷入 socket 缓冲区,几乎无消耗
使用 DMA 将 内核缓冲区的数据写入网卡,不会使用 cpu
零拷贝:内核态和用户态没有拷贝操作。
不用在java内存中间拷贝,拷贝操作都发生在操作系统和网络设备,socket的缓冲区,不需要复制到java内存中了。
整个过程仅只发生了一次用户态与内核态的切换,数据拷贝了 2 次。所谓的【零拷贝】,并不是真正无拷贝,而是在不会拷贝重复数据到 jvm 内存中,零拷贝的优点有:
更少的用户态与内核态的切换
不利用 cpu 计算,减少 cpu 缓存伪共享(这部分拷贝操作可以使用DMA来进行操作,并不会利用CPU来计算)
零拷贝适合小文件传输:尽量不要传输大文件,一般文件拷贝都是从缓冲区发送到网卡,那么过大文件的话就会有问题,可能会占用其他文件的读写。
2.4、AIO
2.4.1、AIO说明
AIO介绍
AIO 用来解决数据复制阶段的阻塞问题
同步意味着,在进行读写操作时,线程需要等待结果,还是相当于闲置
异步意味着,在进行读写操作时,线程不必等待结果,而是将来由操作系统来通过回调方式由另外的线程来获得结果。
AIO的相关问题
对于AIO,两个阶段都能够腾出手来不让cpu去做,但是linux对于异步IO的支持并不好,真正对异步IO性能操作好的是windows,通过一套IOCP的API实现了真正的异步IO。
一般我们将java程序部署到linux上,由于并没有真正支持IO,所以并没有进行大量使用,并且编程的复杂度也较高。
netty5.0也对异步IO进行了实现,结果发现性能没有优势,并且引入了没有必要的复杂性,维护成本高,之后netty就将5.0废弃了。
现在Linux全面支持了异步IO,新API叫做io_uring。AIO 的新归宿:io_uring
注意:AIO支持文件、网络;多路复用只支持网络
2.4.2、文件AIO
目的:读取文件内容到bytebuffer中,读取的操作是异步操作,非主线程来完成,而是让额外的线程执行!
import lombok.extern.slf4j.Slf4j; import java.io.IOException; import java.nio.ByteBuffer; import java.nio.channels.AsynchronousFileChannel; import java.nio.channels.CompletionHandler; import java.nio.file.Paths; import java.nio.file.StandardOpenOption; import static com.changlu.ByteBuffer.utils.ByteBufferUtil.debugAll; /** * @ClassName FileAIO * @Author ChangLu * @Date 2021/12/28 19:01 * @Description 异步文件读取 */ @Slf4j public class FileAIO { public static void main(String[] args) { try (AsynchronousFileChannel fileChannel = AsynchronousFileChannel.open(Paths.get("data1.txt"), StandardOpenOption.READ)) { final ByteBuffer buffer = ByteBuffer.allocate(16); //1、首先执行 log.info("read before..."); //第三个参数是异步接口 fileChannel.read(buffer, 0, null, new CompletionHandler<Integer, ByteBuffer>() { //3、实际读取完成之后其他线程执行 @Override public void completed(Integer result, ByteBuffer attachment) { log.info("read completed..."); buffer.flip(); debugAll(buffer); } @Override public void failed(Throwable exc, ByteBuffer attachment) { exc.printStackTrace(); } }); //2、紧接着执行 log.info("read end ..."); //注意:这里使用一个读取字符阻塞API是为了防止主线程结束而导致守护线程也直接停止。默认文件 AIO 使用的线程都是守护线程,所以最后要执行 System.in.read() 以避免守护线程意外结束。直到守护线程执行完后我们敲回车结束程序。 System.in.read(); } catch (IOException e) { e.printStackTrace(); } } }
2.4.3、网络AIO
补充了WriteHandler
server:
import lombok.extern.slf4j.Slf4j; import java.io.IOException; import java.net.InetSocketAddress; import java.nio.ByteBuffer; import java.nio.channels.AsynchronousServerSocketChannel; import java.nio.channels.AsynchronousSocketChannel; import java.nio.channels.CompletionHandler; import java.nio.charset.Charset; /** * @ClassName AioServer * @Author ChangLu * @Date 2021/12/28 19:16 * @Description 网络异步:编写处理连接、读、写handler */ @Slf4j public class AioServer { public static void main(String[] args){ try (AsynchronousServerSocketChannel aioChannel = AsynchronousServerSocketChannel.open()) { aioChannel.bind(new InetSocketAddress(8080)); //关联一个连接事件handler aioChannel.accept(null,new AcceptHandler(aioChannel)); System.in.read(); } catch (IOException e) { e.printStackTrace(); } } //关闭channel private static void closeChannel(AsynchronousSocketChannel sc) { try { System.out.printf("[%s] %s close\n", Thread.currentThread().getName(), sc.getRemoteAddress()); sc.close(); } catch (IOException e) { e.printStackTrace(); } } //读事件handler private static class ReadHandler implements CompletionHandler<Integer, ByteBuffer> { private final AsynchronousSocketChannel sc; public ReadHandler(AsynchronousSocketChannel sc) { this.sc = sc; } @Override public void completed(Integer result, ByteBuffer attachment) { try { if (result == -1) { closeChannel(sc); return; } System.out.printf("[%s] %s read\n", Thread.currentThread().getName(), sc.getRemoteAddress()); attachment.flip(); System.out.println(Charset.defaultCharset().decode(attachment)); attachment.clear(); // 处理完第一个 read 时,需要再次调用 read 方法来处理下一个 read 事件 sc.read(attachment, attachment, this); } catch (IOException e) { e.printStackTrace(); } } @Override public void failed(Throwable exc, ByteBuffer attachment) { closeChannel(sc); exc.printStackTrace(); } } //连接事件handler private static class AcceptHandler implements CompletionHandler<AsynchronousSocketChannel, Object> { private final AsynchronousServerSocketChannel ssc; public AcceptHandler(AsynchronousServerSocketChannel ssc) { this.ssc = ssc; } @Override public void completed(AsynchronousSocketChannel sc, Object attachment) { try { System.out.printf("[%s] %s connected\n", Thread.currentThread().getName(), sc.getRemoteAddress()); } catch (IOException e) { e.printStackTrace(); } ByteBuffer buffer = ByteBuffer.allocate(16); // 读事件由 ReadHandler 处理 sc.read(buffer, buffer, new ReadHandler(sc)); // 写事件由 WriteHandler 处理(直接交由sc直接写出去) sc.write(Charset.defaultCharset().encode("server hello!"), ByteBuffer.allocate(16), new WriteHandler(sc)); // 处理完第一个 accpet 时,需要再次调用 accept 方法来处理下一个 accept 事件 ssc.accept(null, this); } @Override public void failed(Throwable exc, Object attachment) { exc.printStackTrace(); } } //写事件handler private static class WriteHandler implements CompletionHandler<Integer, ByteBuffer>{ private final AsynchronousSocketChannel ssc; public WriteHandler(AsynchronousSocketChannel ssc) { this.ssc = ssc; } @Override public void completed(Integer result, ByteBuffer attachment) { log.info("WriteHandle进行写事件..."); //若是一次没有写完再次执行调用! if (attachment.hasRemaining()) { ssc.write(attachment,ByteBuffer.allocate(16),this); } } @Override public void failed(Throwable exc, ByteBuffer attachment) { } } }
client:
import java.net.InetSocketAddress; import java.nio.ByteBuffer; import java.nio.channels.SocketChannel; import static com.changlu.ByteBuffer.utils.ByteBufferUtil.debugAll; /** * @ClassName NioClient * @Author ChangLu * @Date 2021/12/28 19:29 * @Description 客户端 */ public class NioClient { public static void main(String[] args) throws Exception{ final SocketChannel sc = SocketChannel.open(); final boolean result = sc.connect(new InetSocketAddress("localhost", 8080)); if (result) { System.out.println("客户端连接成功"); }else{ sc.finishConnect(); } //接收数据 final ByteBuffer buffer = ByteBuffer.allocate(16); sc.read(buffer); debugAll(buffer); //示例(在Evaluate中执行):sc.write(StandardCharsets.UTF_8.encode("hello!")) System.in.read(); } }
注意一下处理事件的线程:对应执行回调函数的线程并不是自己事先开辟的,你可以看到每次通知的线程都可能不相同!下面是两个连接处理操作: