2.3.4 buffer其他常用方法
rewind()方法
Buffer.rewind()将position设回0,所以你可以重读Buffer中的所有数据。limit保持不变,仍然表示能从Buffer中读取多少个元素(byte、char等)。
clear()与compact()方法
一旦读完Buffer中的数据,需要让Buffer准备好再次被写入。可以通过clear()或compact()方法来完成。
如果调用的是clear()方法,position将被设回0,limit被设置成 capacity的值。换句话说,Buffer 被清空了。Buffer中的数据并未清除,只是这些标记告诉我们可以从哪里开始往Buffer里写数据。
如果Buffer中有一些未读的数据,调用clear()方法,数据将“被遗忘”,意味着不再有任何标记会告诉你哪些数据被读过,哪些还没有。
如果Buffer中仍有未读的数据,且后续还需要这些数据,但是此时想要先先写些数据,那么使用compact()方法。
compact()方法将所有未读的数据拷贝到Buffer起始处。然后将position设到最后一个未读元素正后面。limit属性依然像clear()方法一样,设置成capacity。现在Buffer准备好写数据了,但是不会覆盖未读的数据。
mark()与reset()方法
通过调用Buffer.mark()方法,可以标记Buffer中的一个特定position。之后可以通过调用Buffer.reset()方法恢复到这个position。例如:
buffer.mark();//call buffer.get() a couple of times, e.g. during parsing.
buffer.reset(); //set position back to mark.
equals()与compareTo()方法
可以使用equals()和compareTo()方法两个Buffer。
equals()
当满足下列条件时,表示两个Buffer相等:
- 有相同的类型(byte、char、int等)。
- Buffer中剩余的byte、char等的个数相等。
- Buffer中所有剩余的byte、char等都相同。
如你所见,equals只是比较Buffer的一部分,不是每一个在它里面的元素都比较。实际上,它只比较Buffer中的剩余元素。
compareTo()方法
compareTo()方法比较两个Buffer的剩余元素(byte、char等), 如果满足下列条件,则认为一个Buffer“小于”另一个Buffer:
- 第一个不相等的元素小于另一个Buffer中对应的元素 。
- 所有元素都相等,但第一个Buffer比另一个先耗尽(第一个Buffer的元素个数比另一个少)。
Buffer方法总结
limit(), limit(10)等 |
其中读取和设置这4个属性的方法的命名和jQuery中的val(),val(10)类似,一个负责get,一个负责set |
reset() |
把position设置成mark的值,相当于之前做过一个标记,现在要退回到之前标记的地方 |
clear() |
position = 0;limit = capacity;mark = -1; 有点初始化的味道,但是并不影响底层byte数组的内容 |
flip() |
limit = position;position = 0;mark = -1; 翻转,也就是让flip之后的position到limit这块区域变成之前的0到position这块,翻转就是将一个处于存数据状态的缓冲区变为一个处于准备取数据的状态 |
rewind() |
把position设为0,mark设为-1,不改变limit的值 |
remaining() |
return limit - position;返回limit和position之间相对位置差 |
hasRemaining() |
return position < limit返回是否还有未读内容 |
compact() |
把从position到limit中的内容移到0到limit-position的区域内,position和limit的取值也分别变成limit-position、capacity。如果先将positon设置到limit,再compact,那么相当于clear() |
get() |
相对读,从position位置读取一个byte,并将position+1,为下次读写作准备 |
get(int index) |
绝对读,读取byteBuffer底层的bytes中下标为index的byte,不改变position |
get(byte[] dst, int offset, int length) |
从position位置开始相对读,读length个byte,并写入dst下标从offset到offset+length的区域 |
put(byte b) |
相对写,向position的位置写入一个byte,并将postion+1,为下次读写作准备 |
put(int index, byte b) |
绝对写,向byteBuffer底层的bytes中下标为index的位置插入byte b,不改变position |
put(ByteBuffer src) |
用相对写,把src中可读的部分(也就是position到limit)写入此byteBuffer |
put(byte[] src, int offset, int length) |
从src数组中的offset到offset+length区域读取数据并使用相对写写入此byteBuffer |
buffer方法演示
/** * @author DarkKing * 类说明:Buffer方法演示 */ public class BufferMethod { public static void main(String[] args) { System.out.println("------Test get-------------"); ByteBuffer buffer = ByteBuffer.allocate(32); buffer.put((byte) 'a')//0 .put((byte) 'b')//1 .put((byte) 'c')//2 .put((byte) 'd')//3 .put((byte) 'e')//4 .put((byte) 'f');//5 System.out.println("before flip()" + buffer); /* 转换为读取模式*/ buffer.flip(); System.out.println("before get():" + buffer); System.out.println((char) buffer.get()); System.out.println("after get():" + buffer); /* get(index)不影响position的值*/ System.out.println((char) buffer.get(2)); System.out.println("after get(index):" + buffer); byte[] dst = new byte[10]; /* position移动两位*/ buffer.get(dst, 0, 2); /*这里的buffer是 abcdef[pos=3 lim=6 cap=32]*/ System.out.println("after get(dst, 0, 2):" + buffer); System.out.println("dst:" + new String(dst)); System.out.println("--------Test put-------"); ByteBuffer bb = ByteBuffer.allocate(32); System.out.println("before put(byte):" + bb); System.out.println("after put(byte):" + bb.put((byte) 'z')); // put(2,(byte) 'c')不改变position的位置 bb.put(2, (byte) 'c'); System.out.println("after put(2,(byte) 'c'):" + bb); System.out.println(new String(bb.array())); // 这里的buffer是 abcdef[pos=3 lim=6 cap=32] bb.put(buffer); System.out.println("after put(buffer):" + bb); System.out.println(new String(bb.array())); System.out.println("--------Test reset----------"); buffer = ByteBuffer.allocate(20); System.out.println("buffer = " + buffer); buffer.clear(); buffer.position(5);//移动position到5 buffer.mark();//记录当前position的位置 buffer.position(10);//移动position到10 System.out.println("before reset:" + buffer); buffer.reset();//复位position到记录的地址 System.out.println("after reset:" + buffer); System.out.println("--------Test rewind--------"); buffer.clear(); buffer.position(10);//移动position到10 buffer.limit(15);//限定最大可写入的位置为15 System.out.println("before rewind:" + buffer); buffer.rewind();//将position设回0 System.out.println("before rewind:" + buffer); System.out.println("--------Test compact--------"); buffer.clear(); //放入4个字节,position移动到下个可写入的位置,也就是4 buffer.put("abcd".getBytes()); System.out.println("before compact:" + buffer); System.out.println(new String(buffer.array())); buffer.flip();//将position设回0,并将limit设置成之前position的值 System.out.println("after flip:" + buffer); //从Buffer中读取数据的例子,每读一次,position移动一次 System.out.println((char) buffer.get()); System.out.println((char) buffer.get()); System.out.println((char) buffer.get()); System.out.println("after three gets:" + buffer); System.out.println(new String(buffer.array())); //compact()方法将所有未读的数据拷贝到Buffer起始处。 // 然后将position设到最后一个未读元素正后面。 buffer.compact(); System.out.println("after compact:" + buffer); System.out.println(new String(buffer.array())); } }
三、NIO之Reactor模式
“反应”器名字中”反应“的由来:
“反应”即“倒置”,“控制逆转”,具体事件处理程序不调用反应器,而向反应器注册一个事件处理器,表示自己对某些事件感兴趣,有时间来了,具体事件处理程序通过事件处理器对某个指定的事件发生做出反应;这种控制逆转又称为“好莱坞法则”(不要调用我,让我来调用你)
NIO为实现Reactor模式提供了基础,上面的NIO图示其实就是Reactor模式的雏形,只是Reactor以OO的方式抽象出了几个概念,使得职责划分更加明确。
- Reactor:Reactor是IO事件的派发者,对应NIO的Selector;
- Acceptor:Acceptor接受client连接,建立对应client的Handler,并向Reactor注册此Handler,对应NIO中注册Channel和事件触发时的判断分支(上述NIO服务端示例代码的38-46行);
- Handler:IO处理类,对应NIO中Channel[使用socket]操作Buffer的过程。
3.1 单线程Reactor模式流程
- 服务器端的Reactor是一个线程对象,该线程会启动事件循环,并使用Selector(选择器)来实现IO的多路复用。注册一个Acceptor事件处理器到Reactor中,Acceptor事件处理器所关注的事件是ACCEPT事件,这样Reactor会监听客户端向服务器端发起的连接请求事件(ACCEPT事件)。
- 客户端向服务器端发起一个连接请求,Reactor监听到了该ACCEPT事件的发生并将该ACCEPT事件派发给相应的Acceptor处理器来进行处理。Acceptor处理器通过accept()方法得到与这个客户端对应的连接(SocketChannel),然后将该连接所关注的READ事件以及对应的READ事件处理器注册到Reactor中,这样一来Reactor就会监听该连接的READ事件了。
- 当Reactor监听到有读或者写事件发生时,将相关的事件派发给对应的处理器进行处理。比如,读处理器会通过SocketChannel的read()方法读取数据,此时read()操作可以直接读取到数据,而不会堵塞与等待可读的数据到来。
- 每当处理完所有就绪的感兴趣的I/O事件后,Reactor线程会再次执行select()阻塞等待新的事件就绪并将其分派给对应处理器进行处理。
注意,Reactor的单线程模式的单线程主要是针对于I/O操作而言,也就是所有的I/O的accept()、read()、write()以及connect()操作都在一个线程上完成的。
但在目前的单线程Reactor模式中,不仅I/O操作在该Reactor线程上,连非I/O的业务操作也在该线程上进行处理了,这可能会大大延迟I/O请求的响应。所以我们应该将非I/O的业务逻辑操作从Reactor线程上卸载,以此来加速Reactor线程对I/O请求的响应。
3.2 单线程Reactor,工作者线程池
与单线程Reactor模式不同的是,添加了一个工作者线程池,并将非I/O操作从Reactor线程中移出转交给工作者线程池来执行。这样能够提高Reactor线程的I/O响应,不至于因为一些耗时的业务逻辑而延迟对后面I/O请求的处理。
使用线程池的优势:
- 通过重用现有的线程而不是创建新线程,可以在处理多个请求时分摊在线程创建和销毁过程产生的巨大开销。
- 另一个额外的好处是,当请求到达时,工作线程通常已经存在,因此不会由于等待创建线程而延迟任务的执行,从而提高了响应性。
- 通过适当调整线程池的大小,可以创建足够多的线程以便使处理器保持忙碌状态。同时还可以防止过多线程相互竞争资源而使应用程序耗尽内存或失败。
改进的版本中,所以的I/O操作依旧由一个Reactor来完成,包括I/O的accept()、read()、write()以及connect()操作。
对于一些小容量应用场景,可以使用单线程模型。但是对于高负载、大并发或大数据量的应用场景却不合适,主要原因如下:
- 一个NIO线程同时处理成百上千的链路,性能上无法支撑,即便NIO线程的CPU负荷达到100%,也无法满足海量消息的读取和发送;
- 当NIO线程负载过重之后,处理速度将变慢,这会导致大量客户端连接超时,超时之后往往会进行重发,这更加重了NIO线程的负载,最终会导致大量消息积压和处理超时,成为系统的性能瓶颈;
3.3 多Reactor线程模式
Reactor线程池中的每一Reactor线程都会有自己的Selector、线程和分发的事件循环逻辑。
mainReactor可以只有一个,但subReactor一般会有多个。mainReactor线程主要负责接收客户端的连接请求,然后将接收到的SocketChannel传递给subReactor,由subReactor来完成和客户端的通信。
流程:
- 注册一个Acceptor事件处理器到mainReactor中,Acceptor事件处理器所关注的事件是ACCEPT事件,这样mainReactor会监听客户端向服务器端发起的连接请求事件(ACCEPT事件)。启动mainReactor的事件循环。
- 客户端向服务器端发起一个连接请求,mainReactor监听到了该ACCEPT事件并将该ACCEPT事件派发给Acceptor处理器来进行处理。Acceptor处理器通过accept()方法得到与这个客户端对应的连接(SocketChannel),然后将这个SocketChannel传递给subReactor线程池。
- subReactor线程池分配一个subReactor线程给这个SocketChannel,即,将SocketChannel关注的READ事件以及对应的READ事件处理器注册到subReactor线程中。当然你也注册WRITE事件以及WRITE事件处理器到subReactor线程中以完成I/O写操作。Reactor线程池中的每一Reactor线程都会有自己的Selector、线程和分发的循环逻辑。
- 当有I/O事件就绪时,相关的subReactor就将事件派发给响应的处理器处理。注意,这里subReactor线程只负责完成I/O的read()操作,在读取到数据后将业务逻辑的处理放入到线程池中完成,若完成业务逻辑后需要返回数据给客户端,则相关的I/O的write操作还是会被提交回subReactor线程来完成。
- 注意,所以的I/O操作(包括,I/O的accept()、read()、write()以及connect()操作)依旧还是在Reactor线程(mainReactor线程 或 subReactor线程)中完成的。Thread Pool(线程池)仅用来处理非I/O操作的逻辑。
多Reactor线程模式将“接受客户端的连接请求”和“与该客户端的通信”分在了两个Reactor线程来完成。mainReactor完成接收客户端连接请求的操作,它不负责与客户端的通信,而是将建立好的连接转交给subReactor线程来完成与客户端的通信,这样一来就不会因为read()数据量太大而导致后面的客户端连接请求得不到即时处理的情况。并且多Reactor线程模式在海量的客户端并发请求的情况下,还可以通过实现subReactor线程池来将海量的连接分发给多个subReactor线程,在多核的操作系统中这能大大提升应用的负载和吞吐量。
3.4 和观察者模式的区别
观察者模式:
也可以称为为 发布-订阅 模式,主要适用于多个对象依赖某一个对象的状态并,当某对象状态发生改变时,要通知其他依赖对象做出更新。是一种一对多的关系。当然,如果依赖的对象只有一个时,也是一种特殊的一对一关系。通常,观察者模式适用于消息事件处理,监听者监听到事件时通知事件处理者对事件进行处理(这一点上面有点像是回调,容易与反应器模式和前摄器模式的回调搞混淆)。
Reactor模式:
reactor模式,即反应器模式,是一种高效的异步IO模式,特征是回调,当IO完成时,回调对应的函数进行处理。这种模式并非是真正的异步,而是运用了异步的思想,当IO事件触发时,通知应用程序作出IO处理。模式本身并不调用系统的异步IO函数。
reactor模式与观察者模式有点像。不过,观察者模式与单个事件源关联,而反应器模式则与多个事件源关联 。当一个主体发生改变时,所有依属体都得到通知。
四、NIO使用举例
4.1、NioServerHandle
Nio通信服务端处理器
/** * @author DarkKing * 类说明:nio通信服务端处理器 */ public class NioServerHandle implements Runnable { private Selector selector; private ServerSocketChannel serverChannel; private volatile boolean started; /** * 构造方法 * * @param port 指定要监听的端口号 */ public NioServerHandle(int port) { try { //创建选择器 selector = Selector.open(); //打开监听通道 serverChannel = ServerSocketChannel.open(); //如果为 true,则此通道将被置于阻塞模式; // 如果为 false,则此通道将被置于非阻塞模式 serverChannel.configureBlocking(false);//开启非阻塞模式 serverChannel.socket().bind(new InetSocketAddress(port)); serverChannel.register(selector, SelectionKey.OP_ACCEPT); //标记服务器已开启 started = true; System.out.println("服务器已启动,端口号:" + port); } catch (IOException e) { e.printStackTrace(); System.exit(1); } } public void stop() { started = false; } @Override public void run() { //循环遍历selector while (started) { try { //阻塞,只有当至少一个注册的事件发生的时候才会继续. selector.select(); Set<SelectionKey> keys = selector.selectedKeys(); Iterator<SelectionKey> it = keys.iterator(); SelectionKey key = null; while (it.hasNext()) { key = it.next(); it.remove(); try { handleInput(key); } catch (Exception e) { if (key != null) { key.cancel(); if (key.channel() != null) { key.channel().close(); } } } } } catch (Throwable t) { t.printStackTrace(); } } //selector关闭后会自动释放里面管理的资源 if (selector != null) try { selector.close(); } catch (Exception e) { e.printStackTrace(); } } private void handleInput(SelectionKey key) throws IOException { if (key.isValid()) { //处理新接入的请求消息 if (key.isAcceptable()) { ServerSocketChannel ssc = (ServerSocketChannel) key.channel(); SocketChannel sc = ssc.accept(); System.out.println("=======建立连接==="); sc.configureBlocking(false); sc.register(selector, SelectionKey.OP_READ); } //读消息 if (key.isReadable()) { System.out.println("======socket channel 数据准备完成," + "可以去读==读取======="); SocketChannel sc = (SocketChannel) key.channel(); //创建ByteBuffer,并开辟一个1M的缓冲区 ByteBuffer buffer = ByteBuffer.allocate(1024); //读取请求码流,返回读取到的字节数 int readBytes = sc.read(buffer); //读取到字节,对字节进行编解码 if (readBytes > 0) { //将缓冲区当前的limit设置为position,position=0, // 用于后续对缓冲区的读取操作 buffer.flip(); //根据缓冲区可读字节数创建字节数组 byte[] bytes = new byte[buffer.remaining()]; //将缓冲区可读字节数组复制到新建的数组中 buffer.get(bytes); String message = new String(bytes, "UTF-8"); System.out.println("服务器收到消息:" + message); //处理数据 String result = Const.response(message); //发送应答消息 doWrite(sc, result); } //链路已经关闭,释放资源 else if (readBytes < 0) { key.cancel(); sc.close(); } } } } //发送应答消息 private void doWrite(SocketChannel channel, String response) throws IOException { //将消息编码为字节数组 byte[] bytes = response.getBytes(); //根据数组容量创建ByteBuffer ByteBuffer writeBuffer = ByteBuffer.allocate(bytes.length); //将字节数组复制到缓冲区 writeBuffer.put(bytes); //flip操作 writeBuffer.flip(); //发送缓冲区的字节数组 channel.write(writeBuffer); } }
4.2、NioServer
nio通信服务端
/** * @author DarkKing * 类说明:nio通信服务端 */ public class NioServer { private static NioServerHandle nioServerHandle; public static void start() { if (nioServerHandle != null) nioServerHandle.stop(); nioServerHandle = new NioServerHandle(Const.DEFAULT_PORT); new Thread(nioServerHandle, "Server").start(); } public static void main(String[] args) { start(); } }
4.3NioClientHandlenio
通信客户端处理器
/** * @author DarkKing * 类说明:nio通信客户端处理器 */ public class NioClientHandle implements Runnable { private String host; private int port; private volatile boolean started; private Selector selector; private SocketChannel socketChannel; public NioClientHandle(String ip, int port) { this.host = ip; this.port = port; try { /*创建选择器*/ this.selector = Selector.open(); /*打开监听通道*/ socketChannel = SocketChannel.open(); /*如果为 true,则此通道将被置于阻塞模式; * 如果为 false,则此通道将被置于非阻塞模式 * 缺省为true*/ socketChannel.configureBlocking(false); started = true; } catch (IOException e) { e.printStackTrace(); System.exit(-1); } } public void stop() { started = false; } @Override public void run() { //连接服务器 try { doConnect(); } catch (IOException e) { e.printStackTrace(); System.exit(-1); } /*循环遍历selector*/ while (started) { try { /*阻塞方法,当至少一个注册的事件发生的时候就会继续*/ selector.select(); /*获取当前有哪些事件可以使用*/ Set<SelectionKey> keys = selector.selectedKeys(); /*转换为迭代器*/ Iterator<SelectionKey> it = keys.iterator(); SelectionKey key = null; while (it.hasNext()) { key = it.next(); /*我们必须首先将处理过的 SelectionKey 从选定的键集合中删除。 如果我们没有删除处理过的键,那么它仍然会在事件集合中以一个激活 的键出现,这会导致我们尝试再次处理它。*/ it.remove(); try { handleInput(key); } catch (Exception e) { if (key != null) { key.cancel(); if (key.channel() != null) { key.channel().close(); } } } } } catch (IOException e) { e.printStackTrace(); System.exit(-1); } } if (selector != null) { try { selector.close(); } catch (IOException e) { e.printStackTrace(); } } } /*具体的事件处理方法*/ private void handleInput(SelectionKey key) throws IOException { if (key.isValid()) { /*获得关心当前事件的channel*/ SocketChannel sc = (SocketChannel) key.channel(); /*处理连接就绪事件 * 但是三次握手未必就成功了,所以需要等待握手完成和判断握手是否成功*/ if (key.isConnectable()) { /*finishConnect的主要作用就是确认通道连接已建立, 方便后续IO操作(读写)不会因连接没建立而 导致NotYetConnectedException异常。*/ if (sc.finishConnect()) { /*连接既然已经建立,当然就需要注册读事件, 写事件一般是不需要注册的。*/ socketChannel.register(selector, SelectionKey.OP_READ); } else System.exit(-1); } /*处理读事件,也就是当前有数据可读*/ if (key.isReadable()) { /*创建ByteBuffer,并开辟一个1k的缓冲区*/ ByteBuffer buffer = ByteBuffer.allocate(1024); /*将通道的数据读取到缓冲区,read方法返回读取到的字节数*/ int readBytes = sc.read(buffer); if (readBytes > 0) { buffer.flip(); byte[] bytes = new byte[buffer.remaining()]; buffer.get(bytes); String result = new String(bytes, "UTF-8"); System.out.println("客户端收到消息:" + result); } /*链路已经关闭,释放资源*/ else if (readBytes < 0) { key.cancel(); sc.close(); } } } } /*进行连接*/ private void doConnect() throws IOException { /*如果此通道处于非阻塞模式,则调用此方法将启动非阻塞连接操作。 如果连接马上建立成功,则此方法返回true。 否则,此方法返回false, 因此我们必须关注连接就绪事件, 并通过调用finishConnect方法完成连接操作。*/ if (socketChannel.connect(new InetSocketAddress(host, port))) { /*连接成功,关注读事件*/ socketChannel.register(selector, SelectionKey.OP_READ); } else { socketChannel.register(selector, SelectionKey.OP_CONNECT); } } /*写数据对外暴露的API*/ public void sendMsg(String msg) throws IOException { doWrite(socketChannel, msg); } private void doWrite(SocketChannel sc, String request) throws IOException { byte[] bytes = request.getBytes(); ByteBuffer writeBuffer = ByteBuffer.allocate(bytes.length); writeBuffer.put(bytes); writeBuffer.flip(); sc.write(writeBuffer); } }
4.4、NioClient
nio通信客户端
/** * @author DarkKing * 类说明:nio通信客户端 */ public class NioClient { private static NioClientHandle nioClientHandle; public static void start() { if (nioClientHandle != null) nioClientHandle.stop(); nioClientHandle = new NioClientHandle(Const.DEFAULT_SERVER_IP, Const.DEFAULT_PORT); new Thread(nioClientHandle, "Server").start(); } //向服务器发送消息 public static boolean sendMsg(String msg) throws Exception { nioClientHandle.sendMsg(msg); return true; } public static void main(String[] args) throws Exception { start(); Scanner scanner = new Scanner(System.in); while (NioClient.sendMsg(scanner.next())) ; } }
4.5 效果演示
执行Nioserver服务
执行Nioclient,启动客户端,服务端打印
客户端输入
服务端接收客户端数据,并返回消息
致此,JAVA IO相关的已经介绍的差不多了。当然还有AIO,但是因为linux系统还不支持异步IO,顾暂时不做多讲,下一节开始讲NIO的应用。