1 stream vs channel
- stream 不会自动缓冲数据,channel 会利用系统提供的发送缓冲区、接收缓冲区(更为底层)
- stream 仅支持阻塞 API,channel 同时支持阻塞、非阻塞 API,网络 channel 可配合 selector 实现多路复用
- 二者均为全双工,即读写可以同时进行
2 IO 模型
同步阻塞、同步非阻塞、同步多路复用、异步阻塞(没有此情况)、异步非阻塞
- 同步:线程自己去获取结果(一个线程)
- 异步:线程自己不去获取结果,而是由其它线程送结果(至少两个线程)
当调用一次 channel.read 或 stream.read 后,会切换至操作系统内核态来完成真正数据读取,而读取又分为两个阶段,分别为:
- 等待数据阶段
- 复制数据阶段
- 阻塞 IO
- 非阻塞 IO
- 多路复用
- 信号驱动
- 异步 IO
- 阻塞 IO vs 多路复用
3 零拷贝
3.1 传统 IO 问题
传统的 IO 将一个文件通过 socket 写出
File f = new File("helloword/data.txt"); RandomAccessFile file = new RandomAccessFile(file, "r"); byte[] buf = new byte[(int)f.length()]; file.read(buf); Socket socket = ...; socket.getOutputStream().write(buf);
内部工作流程是这样的:
- java 本身并不具备 IO 读写能力,因此 read 方法调用后,要从 java 程序的用户态切换至内核态,去调用操作系统(Kernel)的读能力,将数据读入内核缓冲区。这期间用户线程阻塞,操作系统使用 DMA(Direct Memory Access)来实现文件读,其间也不会使用 cpu
DMA 也可以理解为硬件单元,用来解放 cpu 完成文件 IO
- 从内核态切换回用户态,将数据从内核缓冲区读入用户缓冲区(即 byte[] buf),这期间 cpu 会参与拷贝,无法利用 DMA
- 调用 write 方法,这时将数据从用户缓冲区(byte[] buf)写入 socket 缓冲区,cpu 会参与拷贝
- 接下来要向网卡写数据,这项能力 java 又不具备,因此又得从用户态切换至内核态,调用操作系统的写能力,使用 DMA 将 socket 缓冲区的数据写入网卡,不会使用 cpu
可以看到中间环节较多,java 的 IO 实际不是物理设备级别的读写,而是缓存的复制,底层的真正读写是操作系统来完成的
- 用户态与内核态的切换发生了 3 次,这个操作比较重量级
- 数据拷贝了共 4 次
3.2 NIO 优化
通过 DirectByteBuf
- ByteBuffer.allocate(10) HeapByteBuffer 使用的还是 java 内存
- ByteBuffer.allocateDirect(10) DirectByteBuffer 使用的是操作系统内存
大部分步骤与优化前相同,不再赘述。唯有一点:java 可以使用 DirectByteBuf 将堆外内存映射到 jvm 内存中来直接访问使用
- 这块内存不受 jvm 垃圾回收的影响,因此内存地址固定,有助于 IO 读写
java 中的 DirectByteBuf 对象仅维护了此内存的虚引用,内存回收分成两步
- DirectByteBuf 对象被垃圾回收,将虚引用加入引用队列
- 通过专门线程访问引用队列,根据虚引用释放堆外内存
- 减少了一次数据拷贝,用户态与内核态的切换次数没有减少
进一步优化(底层采用了 linux 2.1 后提供的 sendFile 方法),java 中对应着两个 channel 调用 transferTo/transferFrom 方法拷贝数据
- java 调用 transferTo 方法后,要从 java 程序的用户态切换至内核态,使用 DMA将数据读入内核缓冲区,不会使用 cpu
- 数据从内核缓冲区传输到 socket 缓冲区,cpu 会参与拷贝
- 最后使用 DMA 将 socket 缓冲区的数据写入网卡,不会使用 cpu
可以看到
- 只发生了一次用户态与内核态的切换
- 数据拷贝了 3 次
进一步优化(linux 2.4)
- java 调用 transferTo 方法后,要从 java 程序的用户态切换至内核态,使用 DMA将数据读入内核缓冲区,不会使用 cpu
- 只会将一些 offset 和 length 信息拷入 socket 缓冲区,几乎无消耗
- 使用 DMA 将 内核缓冲区的数据写入网卡,不会使用 cpu
整个过程仅只发生了一次用户态与内核态的切换,数据拷贝了 2 次。所谓的【零拷贝】,并不是真正无拷贝,而是在不会拷贝重复数据到 jvm 内存中,零拷贝的优点有
- 更少的用户态与内核态的切换
- 不利用 cpu 计算,减少 cpu 缓存伪共享
- 零拷贝适合小文件传输
4 AIO
AIO 用来解决数据复制阶段的阻塞问题
- 同步意味着,在进行读写操作时,线程需要等待结果,还是相当于闲置
- 异步意味着,在进行读写操作时,线程不必等待结果,而是将来由操作系统来通过回调方式由另外的线程来获得结果
异步模型需要底层操作系统(Kernel)提供支持
- Windows 系统通过 IOCP 实现了真正的异步 IO
- Linux 系统异步 IO 在 2.6 版本引入,但其底层实现还是用多路复用模拟了异步 IO,性能没有优势
4.1 文件 AIO
先来看看 AsynchronousFileChannel
@Slf4j public class AioDemo1 { public static void main(String[] args) throws IOException { try{ AsynchronousFileChannel s = AsynchronousFileChannel.open( Paths.get("1.txt"), StandardOpenOption.READ); ByteBuffer buffer = ByteBuffer.allocate(2); log.debug("begin..."); s.read(buffer, 0, null, new CompletionHandler<Integer, ByteBuffer>() { @Override public void completed(Integer result, ByteBuffer attachment) { log.debug("read completed...{}", result); buffer.flip(); debug(buffer); } @Override public void failed(Throwable exc, ByteBuffer attachment) { log.debug("read failed..."); } }); } catch (IOException e) { e.printStackTrace(); } log.debug("do other things..."); System.in.read(); } }
输出
13:44:56 [DEBUG] [main] c.i.aio.AioDemo1 - begin... 13:44:56 [DEBUG] [main] c.i.aio.AioDemo1 - do other things... 13:44:56 [DEBUG] [Thread-5] c.i.aio.AioDemo1 - read completed...2 +-------------------------------------------------+ | 0 1 2 3 4 5 6 7 8 9 a b c d e f | +--------+-------------------------------------------------+----------------+ |00000000| 61 0d |a. | +--------+-------------------------------------------------+----------------+
可以看到
- 响应文件读取成功的是另一个线程 Thread-5
- 主线程并没有 IO 操作阻塞
4.2 守护线程
默认文件 AIO 使用的线程都是守护线程,所以最后要执行 System.in.read()
以避免守护线程意外结束
4.3 网络 AIO
public class AioServer { public static void main(String[] args) throws IOException { AsynchronousServerSocketChannel ssc = AsynchronousServerSocketChannel.open(); ssc.bind(new InetSocketAddress(8080)); ssc.accept(null, new AcceptHandler(ssc)); System.in.read(); } private static void closeChannel(AsynchronousSocketChannel sc) { try { System.out.printf("[%s] %s close\n", Thread.currentThread().getName(), sc.getRemoteAddress()); sc.close(); } catch (IOException e) { e.printStackTrace(); } } private static class ReadHandler implements CompletionHandler<Integer, ByteBuffer> { private final AsynchronousSocketChannel sc; public ReadHandler(AsynchronousSocketChannel sc) { this.sc = sc; } @Override public void completed(Integer result, ByteBuffer attachment) { try { if (result == -1) { closeChannel(sc); return; } System.out.printf("[%s] %s read\n", Thread.currentThread().getName(), sc.getRemoteAddress()); attachment.flip(); System.out.println(Charset.defaultCharset().decode(attachment)); attachment.clear(); // 处理完第一个 read 时,需要再次调用 read 方法来处理下一个 read 事件 sc.read(attachment, attachment, this); } catch (IOException e) { e.printStackTrace(); } } @Override public void failed(Throwable exc, ByteBuffer attachment) { closeChannel(sc); exc.printStackTrace(); } } private static class WriteHandler implements CompletionHandler<Integer, ByteBuffer> { private final AsynchronousSocketChannel sc; private WriteHandler(AsynchronousSocketChannel sc) { this.sc = sc; } @Override public void completed(Integer result, ByteBuffer attachment) { // 如果作为附件的 buffer 还有内容,需要再次 write 写出剩余内容 if (attachment.hasRemaining()) { sc.write(attachment); } } @Override public void failed(Throwable exc, ByteBuffer attachment) { exc.printStackTrace(); closeChannel(sc); } } private static class AcceptHandler implements CompletionHandler<AsynchronousSocketChannel, Object> { private final AsynchronousServerSocketChannel ssc; public AcceptHandler(AsynchronousServerSocketChannel ssc) { this.ssc = ssc; } @Override public void completed(AsynchronousSocketChannel sc, Object attachment) { try { System.out.printf("[%s] %s connected\n", Thread.currentThread().getName(), sc.getRemoteAddress()); } catch (IOException e) { e.printStackTrace(); } ByteBuffer buffer = ByteBuffer.allocate(16); // 读事件由 ReadHandler 处理 sc.read(buffer, buffer, new ReadHandler(sc)); // 写事件由 WriteHandler 处理 sc.write(Charset.defaultCharset().encode("server hello!"), ByteBuffer.allocate(16), new WriteHandler(sc)); // 处理完第一个 accpet 时,需要再次调用 accept 方法来处理下一个 accept 事件 ssc.accept(null, this); } @Override public void failed(Throwable exc, Object attachment) { exc.printStackTrace(); } } }