本博客是《Netty权威指南》的读书笔记,如有错误环境指正、探讨,谢谢!此书源码见附件。
此博客涉及的代码地址:https://gitee.com/wuzhengfei/great-truth;参考com.wzf.greattruth.nio包中的代码。
关于IO模型,请参考《IO模型》
一、 NIO简介
NIO主要有三大核心部分:Channel(通道),Buffer(缓冲区), Selector。
传统IO基于字节流和字符流进行操作,而NIO基于Channel和Buffer(缓冲区)进行操作,数据总是从通道读取到缓冲区中,或者从缓冲区写入到通道中。Selector(选择区)用于监听多个通道的事件(比如:连接打开,数据到达)。因此,单个线程可以监听多个数据通道。
NIO的原理见《IO模型》中IO多路复用部分,地址如下:
http://blog.csdn.net/wuzhengfei1112/article/details/78242004
二、 Java IO VS NIO
1. 流 VS 缓冲区
IO是面向流的,NIO是面向缓冲区的。
Java IO面向流意味着每次从流中读一个或多个字节,直至读取所有字节,它们没有被缓存在任何地方。它不能前后移动流中的数据。如果需要前后移动从流中读取的数据,需要先将它缓存到一个缓冲区。
NIO的数据读取到一个它稍后处理的缓冲区,需要时可在缓冲区中前后移动。这就增加了处理过程中的灵活性。还需要检查是否该缓冲区中包含所有您需要处理的数据。而且,需确保当更多的数据读入缓冲区时,不要覆盖缓冲区里尚未处理的数据。
2. 阻塞 VS 非阻塞
Java IO的各种流是阻塞的。当一个线程调用read()或 write()时,该线程被阻塞,直到有一些数据被读取,或数据完全写入。该线程在此期间不能再干任何事情了。
NIO的非阻塞模式。例如:一个线程从某channel读取数据时,如果有数据已经存在缓冲去了,那么直接读取,如果没有就不获取,线程不会被阻塞,还可以去做其他的事情。写操作也是如此。线程通常将非阻塞IO的空闲时间用于在其它通道上执行IO操作,所以一个单独的线程现在可以管理多个输入和输出通道(channel)。
四、 核心组件
1. 通道Channel
IO中的 Stream是单向的,如InputStream, OutputStream。NIO中的Channel是双向的,既可以用来进行读操作,又可以用来进行写操作。
NIO中的Channel分两大类:用于网络读写的SelectableChannel和用于文件操作的FileChannel,其的主要实现有:
FileChannel:从文件中读写数据。
DatagramChannel:能通过UDP读写网络中的数据。
SocketChannel:能通过TCP读写网络中的数据。
ServerSocketChannel:可以监听新进来的TCP连接,像Web服务器那样。对每一个新进来的连接都会创建一个SocketChannel。
2. 缓冲区Buffer
缓冲区实质上是一个数组,NIO中的缓冲区提供了对数组接过话访问以及维护了其读写信息。在NIO库中,所有数据都是用缓冲区处理的,在读取数据时,它是直接读到缓冲区中的;在写入数据时,它也是写入到缓冲区中的。
1) NIO中的关键Buffer实现
ByteBuffer
CharBuffer
DoubleBuffer
FloatBuffer
IntBuffer
LongBuffer
ShortBuffer
分别对应基本数据类型: byte, char, double, float, int,long, short。
另外还有:MappedByteBuffer, HeapByteBuffer,DirectByteBuffer等。
2) 常用方法:
allocate():分配一块缓冲区
put():向缓冲区写数据
get():向缓冲区读数据
filp():将缓冲区从写模式切换到读模式
clear():从读模式切换到写模式,不会清空数据,但后续写数据会覆盖原来的数据,即使有部分数据没有读,也会被遗忘;
compact():从读数据切换到写模式,数据不会被清空,会将所有未读的数据copy到缓冲区头部,后续写数据不会覆盖,而是在这些数据之后写数据
mark():对position做出标记,配合reset使用
reset():将position置为标记值
3) 缓冲区的属性
capacity:缓冲区大小,无论是读模式还是写模式,此属性值不会变;
position:写数据时,position表示当前写的位置,每写一个数据,会向下移动一个数据单元,初始为0;最大为capacity - 1切换到读模式时,position会被置为0,表示当前读的位置
limit:写模式下,limit 相当于capacity 表示最多可以写多少数据,切换到读模式时,limit 等于原先的position,表示最多可以读多少数据。
3. 多路复用器Selector
多路复用器提供选择已经就绪任务的能力。简单来说:Selector会不断轮询注册在其上的Channel,如果某个Channel上发生读或写事件,这个Channel就处于就绪状态,就会被Selector轮询出来,然后通过SelectionKey就可以获取就绪的Channel集合,接着就可以进行或许的读写操作。
一个多路复用器可以同时轮询多个Channel,由于JDK使用了epool()代替传统的Select实现,所以他没有最大连接句柄1024/2048的限制,这意味着只需要一个线程负责Selector伦旭,就可以接入成千上万的客户端。
1) Selector支持的事件
SelectionKey.OP_CONNECT
SelectionKey.OP_ACCEPT
SelectionKey.OP_READ
SelectionKey.OP_WRITE
如果你对不止一种事件感兴趣,那么可以用“位或”操作符将常量连接起来,如:int interestSet = SelectionKey.OP_READ | SelectionKey.OP_WRITE;
可使用以下方法获取已就绪事件,返回值为boolean:
selectionKey.isAcceptable();
selectionKey.isConnectable();
selectionKey.isReadable();
selectionKey.isWritable();
可以将一个对象或者更多信息附着到SelectionKey上,即记录在附加对象上,方法如下:
selectionKey.attach(theObject);
Object attachedObj = selectionKey.attachment();
可以通过选择器的select方法获取是否有就绪的通道;
int select()
int select(long timeout)
int selectNow()
可以通过selectedKeySet获取已就绪的通道。返回值是SelectionKey的集合,处理完相应的通道之后,需要removed因为Selector不会自己removed。select阻塞后,可以用wakeup唤醒;执行wakeup时,如果没有阻塞的select那么执行完wakeup后下一个执行select就会立即返回。调用close() 方法关闭selector。
五、 NIO(IO多路复用)
Java 1.4中引入NIO的概念,本节内容主要讲述基于此版本(即IO多路复用模型)NIO实现,其使用的IO模型,请参考《IO模型》
1. 优点
客户端发起的连接操作是一步的,可以通过在多路复用器注册OP_CONNECT等待后续结果,不需要像之前的客户端那样被同步阻塞。
SocketChannel的读写操作都是异步的,如果没有可读写的数据,他不会等待直接返回,这样IO同学线程就可以处理其他的链路,不需要等待这个链路可用。
由于JDK的Selector在Linux等主流操作系统上通过epool实现,他没有连接句柄的限制(指受限于操作系统的最大句柄数或者对单个现成的句柄限制),这意味着一个Selector可以同时处理成千上万个客户端连接,而且性能不会随客户端的增加而线性下降。它适合做高性能、高负载的网络服务器。
2. NIO服务端序列图
3. NIO服务端序列分析
1) 打开ServerSocketChannel
serverSocketChannel = ServerSocketChannel.open();
2) 绑定监听地址InetSocketAddress
serverSocketChannel.socket().bind(newInetSocketAddress(port), 1024);
serverSocketChannel.configureBlocking(false);
3) 创建Selector,启动线程
selector = Selector.open();
//新建线程启动Server
new Thread(new NIOServer(), "NIO-Server").start();
4) 将ServerSocketChannel注册到Selector、监听
serverSocketChannel.register(selector,SelectionKey.OP_ACCEPT);
5) Selector轮询就绪的Key
while (true) {
selector.select(1000);
Set<SelectionKey> selectedKeys = selector.selectedKeys();
Iterator<SelectionKey> it = selectedKeys.iterator();
SelectionKey key = null;
while (it.hasNext()) {
key = it.next();
it.remove();
//处理IO时间
handleInput(key);
}
}
6) handlerAcceptor()处理新的客户端接入
// Accept the new connection
ServerSocketChannelssc = (ServerSocketChannel) key.channel();
SocketChannelsc = ssc.accept();
7) 设置新客户端连接的Socket参数
sc.configureBlocking(false);
8) 向Selector注册监听读操作SelectionKey.OP_Read
// Add the new connection to the selector
sc.register(selector, SelectionKey.OP_READ);
9) handlerRead()异步读取请求信息到ByteBuffer
SocketChannelsc = (SocketChannel) key.channel();
ByteBufferreadBuffer = ByteBuffer.allocate(1024);
int readBytes = sc.read(readBuffer);
10) decode请求消息
if (readBytes > 0) {
readBuffer.flip();
byte[] bytes = new byte[readBuffer.remaining()];
readBuffer.get(bytes);
String body= new String(bytes,"UTF-8");
}
11) 异步写ByteBuffer到SocketChannel
byte[] bytes = response.getBytes();
ByteBufferwriteBuffer = ByteBuffer.allocate(bytes.length);
writeBuffer.put(bytes);
writeBuffer.flip();
channel.write(writeBuffer);
4. NIO客户端序列图
5. NIO客户端序列分析
1) 打开SocketChannel
socketChannel = SocketChannel.open();
2) 设置SocketChannel为非阻塞模式,同时设置TCP参数
socketChannel.configureBlocking(false);
3) 异步连接服务器
socketChannel.connect(newInetSocketAddress(host, port))
4) 判断连接结果,如果连接成功,跳到10,否则到5
// 如果直接连接成功,则注册到多路复用器上,发送请求消息,读应答
if ( connected ) {
socketChannel.register(selector, SelectionKey.OP_READ);
doWrite(socketChannel);
}else {
socketChannel.register(selector,SelectionKey.OP_CONNECT);
}
5) 向Reactor线程的多路复用器注册OP_CONNECT事件
socketChannel.register(selector,SelectionKey.OP_CONNECT);
6) 创建Selector,启动线程
selector = Selector.open();
TimeClientHandle client = new TimeClientHandle("127.0.0.1", port);
new Thread(client, "TimeClient-001").start();
7) Selector轮询就绪的key
while (!stop) {
selector.select(1000);
Set<SelectionKey> selectedKeys = selector.selectedKeys();
Iterator<SelectionKey> it = selectedKeys.iterator();
SelectionKey key= null;
while (it.hasNext()) {
key =it.next();
it.remove();
handleInput(key);
}
}
8) 如果是CONNECT事件,则handlerConnect()
SocketChannelsc = (SocketChannel) key.channel();
if (key.isConnectable()) {
// connect
}
9) 判断连接是否完成,完成则执行10
if (sc.finishConnect()) {
sc.register(selector, SelectionKey.OP_READ);
doWrite(sc);
}
10) 向多路复用器注册读事件 OPEN_READ
sc.register(selector, SelectionKey.OP_READ);
11) handRead()异步渡请求消息到ByteBuffer
ByteBufferreadBuffer = ByteBuffer.allocate(1024);
int readBytes = sc.read(readBuffer);
12) 读取并decode请求消息
if (readBytes > 0) {
readBuffer.flip();
byte[] bytes = new byte[readBuffer.remaining()];
readBuffer.get(bytes);
String body= new String(bytes,"UTF-8");
}
13) 异步写ByteBuffer到SocketChannel
byte[] req = "HELLOWORLD ".getBytes();
ByteBufferwriteBuffer = ByteBuffer.allocate(req.length);
writeBuffer.put(req);
writeBuffer.flip();
sc.write(writeBuffer);
if (!writeBuffer.hasRemaining()){
System.out.println("Send2 server succeed.");
}