private static void channelCopy(ReadableByteChannel src,
WritableByteChannel dest)
throws IOException {
ByteBuffer buffer = ByteBuffer.allocateDirect(16 * 1024);while (src.read(buffer) != -1) {
// Prepare the buffer to be drainedbuffer.flip();// Write to the channel; may block
dest.write(buffer);// If partial transfer, shift remainder down// If buffer is empty, same as doing clear( )buffer.compact();
}
// EOF will leave buffer in fill statebuffer.flip();// Make sure that the buffer is fully drainedwhile (buffer.hasRemaining()) {
dest.write(buffer);
}
}
private void run(int port) throws IOException {
// Allocate buffer
ByteBuffer echoBuffer = ByteBuffer.allocate(1024);// Create a new selector
Selector selector = Selector.open();// Open a listener on the port, and register with the selector
ServerSocketChannel ssc = ServerSocketChannel.open();
ssc.configureBlocking(false);
ServerSocket ss = ssc.socket();
InetSocketAddress address = new InetSocketAddress(port);
ss.bind(address);
SelectionKey key = ssc.register(selector, SelectionKey.OP_ACCEPT);System.out.println("Going to listen on " + port);for (;;){int num = selector.select();
Set selectedKeys = selector.selectedKeys();
Iterator it = selectedKeys.iterator();while (it.hasNext()) {
SelectionKey selectionKey = (SelectionKey) it.next();if ((selectionKey.readyOps() & SelectionKey.OP_ACCEPT)
== SelectionKey.OP_ACCEPT) {
// Accept the new connection
ServerSocketChannel serverSocketChannel = (ServerSocketChannel) selectionKey.channel();
SocketChannel sc = serverSocketChannel.accept();
sc.configureBlocking(false);// Add the new connection to the selector
SelectionKey newKey = sc.register(selector, SelectionKey.OP_READ);
it.remove();System.out.println("Got connection from " + sc);
} elseif ((selectionKey.readyOps() & SelectionKey.OP_READ)
== SelectionKey.OP_READ) {
// Read the data
SocketChannel sc = (SocketChannel) selectionKey.channel();// Echo dataint bytesEchoed = 0;while (true) {
echoBuffer.clear();int r = sc.read(echoBuffer);if (r <= 0) {
break;
}
echoBuffer.flip();
sc.write(echoBuffer);
bytesEchoed += r;
}
System.out.println("Echoed " + bytesEchoed + " from " + sc);
it.remove();
}
}
}
}
I/O多路复用模式
I/O多路复用有两种经典模式:基于同步I/O的reactor和基于异步I/O的proactor。
• Reactor
o 某个事件处理者宣称它对某个socket上的读事件很感兴趣;
o 事件分离者等着这个事件的发生;
o 当事件发生了,事件分离器被唤醒,这负责通知先前那个事件处理者;
o 事件处理者收到消息,于是去那个socket上读数据了. 如果需要,它再次宣称对这个socket上的读事件感兴趣,一直重复上面的步骤;
• Proactor
o 事件处理者直接投递发一个写操作(当然,操作系统必须支持这个异步操作). 这个时候,事件处理者根本不关心读事件,它只管发这么个请求,它魂牵梦萦的是这个写操作的完成事件。这个处理者很拽,发个命令就不管具体的事情了,只等着别人(系统)帮他搞定的时候给他回个话。
o 事件分离者等着这个读事件的完成(比较下与Reactor的不同);
o 当事件分离者默默等待完成事情到来的同时,操作系统已经在一边开始干活了,它从目标读取数据,放入用户提供的缓存区中,最后通知事件分离者,这个事情我搞完了;
o 事件分享者通知之前的事件处理者: 你吩咐的事情搞定了;
o 事件处理者这时会发现想要读的数据已经乖乖地放在他提供的缓存区中,想怎么处理都行了。如果有需要,事件处理者还像之前一样发起另外一个写操作,和上面的几个步骤一样。
异步的proactor固然不错,但它局限于操作系统(要支持异步操作),为了开发真正独立平台的通用接口,我们可以通过reactor模拟来实现proactor。
• Proactor(模拟)
o 等待事件 (Proactor 的工作)
o 读数据(看,这里变成成了让 Proactor 做这个事情)
o 把数据已经准备好的消息给用户处理函数,即事件处理者(Proactor 要做的)
o 处理数据 (用户代码要做的)
总结
本文介绍了 I/O的一些基础概念及5种I/O模型,NIO是5种模型中的I/O复用模型;接着进入主题Java NIO,分别讲了NIO中三个最重要的概念:缓冲区、通道、选择器;我们也明白了NIO是如何实现I/O复用模型的。最后讨论了I/O多路复用模式中的两 种模式:reactor和proactor,以及如何用reactor模拟proactor。
参考资料
O'Reilly Java NIO
Richard Stevens《UNIX网络编程 卷1:套接字联网API》
两种高性能I/O设计模式(Reactor/Proactor)的比较
Understanding Network I/O
Understanding Disk I/O - when should you be worried?