一、基本概念描述
什么是NIO
NIO即New IO,这个库是在JDK1.4中才引入的。NIO和IO有相同的作用和目的,但实现方式不同,NIO主要用到的是块,所以NIO的效率要比IO高很多。
流与块的比较
NIO和IO最大的区别是数据打包和传输方式。IO是以流的方式处理数据,而NIO是以块的方式处理数据。
面向流的IO一次一个字节的处理数据,一个输入流产生一个字节,一个输出流就消费一个字节。为流式数据创建过滤器就变得非常容易,链接几个过滤器,以便对数据进行处理非常方便而简单,但是面向流的IO通常处理的很慢。
面向块的IO系统以块的形式处理数据。每一个操作都在一步中产生或消费一个数据块。按块要比按流快的多,但面向块的IO缺少了面向流IO所具有的有雅兴和简单性
NIO与传统IO的对比
NIO
IO
面向缓冲区Buffer
面向流Stream
双向(基于通道Channel)
单向(分别建立输入流、输出流)
同步非阻塞(non-blocking)
同步阻塞
选择器(Selector,多路复用)
无
支持字符集编码解码解决方案,支持锁,支持内存映射文件的文件访问接口
无
NIO基础
缓冲区(Buffer)、通道(Channel)和选择器(Selector)、字符集(Charset)
缓冲区Buffer
Buffer是一个对象,它包含一些要写入或读出的数据。在NIO中,数据是放入buffer对象的,而在IO中,数据是直接写入或者读到Stream对象的。应用程序不能直接对 Channel 进行读写操作,而必须通过 Buffer 来进行,即 Channel 是通过 Buffer 来读写数据的
Buffer 读写数据步骤:
- 写入数据到 Buffer;
- 调用 flip() 方法;
- 从 Buffer 中读取数据;
- 调用 clear() 方法或者 compact() 方法。
通道Channel
Channel表示到IO设备(如:文件、套接字)的连接,即用于源节点与目标节点的连接,在java NIO中Channel本身不负责存储数据,主要是配合缓冲区,负责数据的传输。
通道的主要实现类
FileChannel类:本地文件IO通道,用于读取、写入、映射和操作文件的通道。 SocketChannel类:网络套接字IO通道,TCP协议,针对面向流的连接套接字的可选择通道(一般用在客户端)。 ServerSocketChannel类:网络通信IO操作,TCP协议,针对面向流的监听套接字的可选择通道(一般用于服务端)。 DatagramChannel类:针对面向数据报套接字的可选择通道,能够发送和接受UDP数据包的Channel。UDP协议,由于UDP是一种无连接的网络协议,只能发送和接受数据包。 以上几个类都实现了java.nio.channels.Channel接口。
选择器Selector
Selector是selectableChannel的多路复用器,用于监控SelectableChannel的IO状况。利用selector可以实现在一个线程中管理多个通道Channel,selector是非阻塞IO的核心。
Selector常用方法
方法
描述
Set< SelectionKey > keys()
所有的SelectionKey集合,代表注册在该Selector上的Channel
selectedKeys()
被选择的SelectionKey集合。返回此Selector的已选择键集
int select()
监控所有注册的Channel,当它们中间有需要处理的IO操作时,该方法返回,并将对应的SelectionKey加入被选择的SelectionKey集合中,该方法返回这些Channel的数量。
int select(long timeout)
可以设置超时时长的select()操作
int selectNow()
执行一个立即返回的select()操作,该方法不会阻塞线程
Selector wakeUp()
使一个还未返回的select()方法立即返回
字符集Charset
CharSet是对java nio编码解码的解决方案,专门负责字符的编码和解码。
编码:字符数组、字符串 ===> 字节数组。 解码:字节数组 ==> 字符数组、字符串
NIO中的读和写示例
public static void copyFileUseNIO(String src,String dst) throws IOException{ FileInputStream fi=new FileInputStream(new File(src)); FileOutputStream fo=new FileOutputStream(new File(dst)); //获得传输通道channel FileChannel inChannel=fi.getChannel(); FileChannel outChannel=fo.getChannel(); //获得容器buffer ByteBuffer buffer=ByteBuffer.allocate(1024); while(true){ //判断是否读完文件 int eof =inChannel.read(buffer); if(eof==-1){ break; } buffer.flip(); //开始写 outChannel.write(buffer); //写完要重置buffer buffer.clear(); } inChannel.close(); outChannel.close(); fi.close(); fo.close(); }
NIO中的聊天室示例
服务端
private ServerSocketChannel listenChannel; //监听通道 private Selector selector; //选择器对象 private static final int PORT = 9999; public static void main(String[] args) { ServerDemo demo = new ServerDemo(); demo.listen(); } public ServerDemo() { try { // 1得到监听通道 listenChannel = ServerSocketChannel.open(); // 2得到选择器 selector = Selector.open(); // 3绑定端口 listenChannel.bind(new InetSocketAddress(PORT)); // 4设置为非阻塞模式 listenChannel.configureBlocking(false); // 5将选择器绑定到监听通道并监听accept事件 listenChannel.register(selector, SelectionKey.OP_ACCEPT); printInfo("server ok ..."); } catch (Exception e) { e.printStackTrace(); } } // 6干活 public void listen() { try { while (true) { if (selector.select(3000) == 0) { continue; } Iterator<SelectionKey> iterator = selector.selectedKeys().iterator(); while(iterator.hasNext()) { SelectionKey key = iterator.next(); if (key.isAcceptable()) { // 连接请求事件 SocketChannel sc = listenChannel.accept(); sc.configureBlocking(false); sc.register(selector, SelectionKey.OP_READ); System.out.println(sc.getRemoteAddress().toString().substring(1) + "上线了..."); } if (key.isReadable()) { // 读取数据事件 readMsg(key); } // 把keys删掉,防止重复处理 iterator.remove(); } } } catch (Exception e) { e.printStackTrace(); } } // 读取到客户端发送过来的消息并广播出去 public void readMsg(SelectionKey key) throws Exception { SocketChannel sc = (SocketChannel) key.channel(); ByteBuffer buffer = ByteBuffer.allocate(1024); if (sc.read(buffer) > 0) { // 打印接收到的消息 String msg = new String(buffer.array()); printInfo(msg.trim()); // 发广播 broadCast(sc, msg); } } public void broadCast(SocketChannel sc, String msg) throws Exception { printInfo("服务器发送了广播..."); for (SelectionKey key : selector.keys()) { SelectableChannel channel; if ((channel = key.channel()) instanceof SocketChannel && key.channel() != sc) { ByteBuffer buffer = ByteBuffer.wrap(msg.getBytes()); ((SocketChannel) channel).write(buffer); } } } private void printInfo(String str) { SimpleDateFormat sdf = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss"); System.out.println("["+sdf.format(new Date()) + "]->" + str); }
客户端
private final String HOST = "127.0.0.1"; private int PORT = 9999; private SocketChannel socketChannel; private String userName; public ClientDemo() { try { // 1得到一个网络通道 socketChannel = SocketChannel.open(); // 2设置非阻塞方式 socketChannel.configureBlocking(false); // 3服务端IP和端口 InetSocketAddress inetSocketAddress = new InetSocketAddress(HOST, PORT); // 4连接服务器 if (!socketChannel.connect(inetSocketAddress)) { while (!socketChannel.finishConnect()) { System.out.println("客户端一直在连接....."); } } // 5得到客户端IP和端口,作为聊天用户名 userName = socketChannel.getLocalAddress().toString().substring(1); System.out.println("-------------client: " + userName + " is ready-------------"); } catch (Exception e) { e.printStackTrace(); } } // 向服务器发送数据 public void sendMsg(String msg) throws Exception { if (msg.equalsIgnoreCase("bye")) { socketChannel.close(); return; } msg = userName + "say: " + msg; ByteBuffer buffer = ByteBuffer.wrap(msg.getBytes()); socketChannel.write(buffer); } // 从服务端接收数据 public void receiveMsg() throws Exception { ByteBuffer buffer = ByteBuffer.allocate(1024); if (socketChannel.read(buffer) > 0) { String msg = new String(buffer.array()); System.out.println(msg.trim()); } } public static void main(String[] args) throws Exception { ClientDemo chatClient = new ClientDemo(); new Thread(new Runnable() { @Override public void run() { while (true) { try { chatClient.receiveMsg(); Thread.sleep(3000); } catch (Exception e) { e.printStackTrace(); } } } }).start(); Scanner scanner = new Scanner(System.in); while(scanner.hasNextLine()) { chatClient.sendMsg(scanner.nextLine()); } }