本文主要讲解NIO的简介、NIO和传统阻塞I/O有什么区别、NIO模型和传统I/O模型之间的对比、以及围绕NIO的三大组件来讲解,理论代码相结合。
很喜欢一句话:
"沉下去,再浮上来"
。我想我们会变的不一样。
一、Java NIO 简介
在 Java 1.4 中引入了 NIO 框架(java.nio 包),提供了 Channel、Selector、Buffer 等新的抽象,可以构建多路复用的、同步非阻塞 IO 程序,同时提供了更接近操作系统底层的高性能数据操作方式
同步非阻塞:
Java NIO
的非阻塞模式:
- 非阻塞读:一个线程从某一个通道发送请求或者读取数据时,它仅能得到目前可用的数据,如果目前没有数据可用时,就什么都不会获取,但是并不会像原生IO一样,阻塞等待着,反而是直到数据变得可以读取之前,此线程可以去继续做其他事情。
- 非阻塞写:和非阻塞读一样,一个线程发送请求写入一些数据到某通道,在等待它完成写入这段时间中,无需阻塞等待,这个线程可以同时去做其他的事情。
- 总结起来就是
NIO
是可以做到用一个线程来处理多个操作的。 - 通俗理解:
NIO
是可以做到用一个线程来处理多个操作的。假设有10000
个请求过来,根据实际情况,可以分配50
或者100
个线程来处理。不像之前的阻塞IO
那样,非得分配10000
个。
Java NIO 由以下几个核心部分组成:
- Channels (通道)
- Buffers (缓冲区)
- Selector (选择器)
虽然Java NIO除此之外,仍有很多类和组件,但是总的来说仍然是靠Channel,Buffer 和 Selector
构成了核心的API。其余更多的是为了能让这三个核心组件更方便的使用。之后的讲解也会偏向于此。三个核心部分模型图:
简单说明:
- 一个
Thread
(线程)对应一个Selector
选择器,每个选择器对应着多个Channel
通道,每个通道又都对应着一个缓冲区(Buffer) Selector
会根据不同的事件,在各个通道上切换,但是程序切换到哪个Channel
是由事件(比如:连接请求、数据到达)决定的。- 数据的读取写入是通过
Buffer
,这个和BIO
是不同的,BIO
中要么是输入流,或者是输出流,不能双向,但是NIO
的Buffer
是可以读也可以写,需要flip
方法切换Channel
是双向的,可以返回底层操作系统的情况,比如Linux
,底层的操作系统通道就是双向的
二、传统阻塞BIO存在的问题
我们先来回忆一下传统阻塞IO的经典编程模型:
public static void main(String[] args) throws Exception { //1. 创建一个线程池 ExecutorService newCachedThreadPool = new ThreadPoolExecutor(0, Integer.MAX_VALUE, 60L, TimeUnit.SECONDS, new SynchronousQueue<>(), Executors.defaultThreadFactory(), new ThreadPoolExecutor.AbortPolicy()); //2、创建ServerSocket ServerSocket serverSocket = new ServerSocket(8888); while (true) { //3.侦听要与此套接字建立的连接并接受它。 该方法阻塞,直到建立连接。 final Socket socket = serverSocket.accept(); //4、就创建一个线程,与之通讯(单独写一个方法) newCachedThreadPool.execute(() -> { //可以和客户端通讯 handler(socket); }); } } /** * 编写一个handler方法,和客户端通讯,读取客户端发过来的信息 * @param socket */ public static void handler(Socket socket) { try { byte[] bytes = new byte[1024]; //通过socket获取输入流 InputStream inputStream = socket.getInputStream(); //循环的读取客户端发送的数据 while (true) { int read = inputStream.read(bytes); if (read != -1) { //输出客户端发送的数据 System.out.println(new String(bytes, 0, read)); } else { break; } } } catch (Exception e) { e.printStackTrace(); } finally { System.out.println("关闭和client的连接"); try { socket.close(); } catch (Exception e) { e.printStackTrace(); } } }
这是一个经典的一个连接一个线程的模型,使用多线程的主要原因在于socket.accept()、socket.read()、socket.write()
三个主要函数都是同步阻塞的,即没有成功连接或没有数据读、写时,系统都是阻塞,在这种情况下如果是单线程的话就会一直阻塞在哪里;但CPU是被释放出来的,开启多线程,就可以让CPU去处理更多的事情。
不过,这个模型是存在问题的,线程池,虽然可以让线程的创建和回收成本变低。但是线程池本生是有局限的的,在并发数并不是特别高的时候(小于单机1000),使用线程池搭用这种模型还是可行的,并竟线程池是一个天然的漏斗,不用过多考虑负载、限流等问题,编程也简单。但是仍然是解决不了根本问题的,根本问题在于严重的依赖于线程。但是大家都知道哈,线程是个特别贵的资源。
为什么说线程贵呢?
主要表现在以下几个方面:
- 首先线程的创建和销毁的成本相当高,并不是我们简单的看到的
new Thread().start()
即可了,它之后是去调用本地函数private native void start0()
,然后靠操作系统来分配资源创建线程,用完还要销毁。(要用到底层的操作系统的,就没有便宜的操作啦。)像在Liunx这样的操作系统中,一个个线程本质上就是一个个进程,创建、销毁都是属于重量级的函数,所以说线程的创建和销毁成本真的蛮高啦。 - 其次线程本身占用较大内存的,在Java中,线程的线程栈所占用的内存在Java堆外,不受Java程序控制,只受系统资源限制(如若系统给资源不足,可能创建失败),默认一个线程的线程栈大小是1M。如果每个请求都新建线程,1024个线程就会占用1个G内存,那样恐怕整个JVM的内存都会被干掉一半,而且也容易会引起系统的崩溃。
- 另外线程的切换成本也是很高的。当一个cpu从一个线程切换到另一个线程时,cpu需要保存当前线程的本地数据,程序当前的指针等,然后加载下一个等待执行的线程的本地数据,程序指针等,然后执行系统调用。这种切换也被称之为上下文切换。但是如果线程数过高,可能执行线程切换的时间甚至会大于线程执行的时间,就会导致系统load偏高,CPU使用率偏高,易导致系统陷入崩溃状态。
所以说,使用BIO面临十万或百万请求时,是无能为力的。所以必然需要更加高效的I/O处理模型。
三、NIO和BIO的区别
BIO
是阻塞的,NIO
则是非阻塞的。BIO
以流的方式处理数据,而NIO
以块的方式处理数据,块I/O
的效率比流I/O
高很多。BIO
基于字节流和字符流进行操作,而NIO
基于Channel
(通道)和Buffer
(缓冲区)进行操作,数据总是从通道读取到缓冲区中,或者从缓冲区写入到通道中。Selector
(选择器)用于监听多个通道的事件(比如:连接请求,数据到达等),因此使用单个线程就可以监听多个客户端通道。- BIO适用于连接数目比较小且固定的架构,这种方式对服务器资源要求比较高,并发局限于应用中,编程比较简单JDK1.4以前唯一的选择。 NIO适用于连接数目多且连接比较短的架构,比如聊天服务器,弹幕系统,服务器间通讯等,编程比较复杂,JDK1.4开始支持。
四、I/O模型之间的对比
学习I/O,I/O模型是必须知道的一点。I/O模型共有五种,分别是:
- 同步阻塞IO(Blocking IO):即传统的IO模型。
- 同步非阻塞IO(Non-blocking IO):默认创建的socket都是阻塞的,非阻塞IO要求socket被设置为NONBLOCK。
注意这里所说的NIO并非Java的NIO(New IO)库
。 - 多路复用IO(IO Multiplexing):即经典的Reactor设计模式,有时也称为异步阻塞IO,Java中的Selector和Linux中的epoll都是这种模型。
- 信号驱动IO:信号驱动式I/O是指进程预先告知内核,使得当某个描述符上发生某事时,内核使用信号通知相关进程。
- 异步IO(Asynchronous IO):即经典的Proactor设计模式,也称为异步非阻塞IO。
不过在这只描述了前三种。
传统阻塞式I/O
模型图如下:Java中原生的IO便是这种。
特点:
当用户线程发出IO请求之后,内核会去查看数据是否就绪,如果没有就绪就会等待数据就绪,而用户线程就会处于阻塞状态,用户线程交出CPU。当数据就绪之后,内核会将数据拷贝到用户线程,并返回结果给用户线程,用户线程才解除block状态。
非阻塞式I/O
模型图如下:
特点:
当用户线程发起一个read操作后,并不需要等待,而是马上就得到了一个结果。如果结果是一个error时,它就知道数据还没有准备好,于是就返回到用户进程去执行其他任务,等过一段时间后在去查看数据是否准备好。一旦内核中的数据准备好了,并且又再次收到了用户线程的请求,那么它马上就将数据拷贝到了用户线程,然后返回。所以事实上,在非阻塞IO模型中,用户线程需要不断地询问内核数据是否就绪,也就说非阻塞IO不会交出CPU,而会一直占用CPU。
注意
:对于非阻塞IO也有一个非常严重的问题,在while循环中需要不断地去询问内核数据是否就绪,这样会导致CPU占用率非常高,因此一般情况下很少使用while循环方式来读取数据。
多路复用I/O模型
Java NIO使用的即是这种模型。
多路复用IO主要用于处理多个IO连接时候的场景。在多路复用IO模型中,会有一个线程不断去轮询多个socket的状态,只有当socket真正有读写事件时,才会真正调用实际的IO读写操作。因为在多路复用IO模型中,只需使用一个线程就可以管理多个socket,系统无需重复建立新的线程和销毁线程,也不必维护等,另一方面同时也避免了多线程之间的上下文切换导致的开销。并且只有在真正有socket读写事件进行时,才会使用IO资源,大大减少了资源占用,极大的提高了效率。
I/O多路复用比传统I/O好在哪里?
也许有小伙伴会说,我可以采用多线程+ 阻塞IO达到类似的效果,但是你需要考虑到多线程+阻塞IO,没有改变的是每个socket还是对应着一个线程,仍然无法解决根本问题,并且尤其是对于长连接来说,线程的资源一直不释放,如果后面陆续还有很多连接的话,就会造成系统的极大压力。
而使用多路复用IO模式,通过一个线程就可以去管理多个socket,并且只有当socket真正有读写事件发生才会占用资源来进行实际的读写操作。所以多路复用IO模式一方面减少了创建和销毁线程的操作,同时也避免了多线程之间切换的开销,并且提高了并发数。
I/O多路复用为何比非阻塞式I/O模型效率高?
因为在非阻塞IO中,不断地询问socket状态是通过用户线程去进行的,但是在多路复用IO模型中,轮询每个socket状态是内核在进行的,这个效率要比用户线程要高的多。
注意事项:
多路复用IO模型是通过轮询的方式来检测是否有事件到达,并且对到达的事件逐一进行响应。因此对于多路复用IO模型来说,如果一旦事件响应体很大,那么就有可能会导致后续的事件迟迟得不到处理,并且会影响到新的事件轮询。
在Java NIO中的理解是:
Selector就是一个socket选择器,它不停地查看所有与他绑定的socket是否准备完成,哪一个io准备完成,它就会处理对应的channel。
但是终究选择什么样的IO模型,还是需要根据实际问题实际分析的。
五、三大核心组件 | Buffer
基本介绍
Java NIO中的Buffer用于和NIO通道进行交互。数据是从通道读入缓冲区,从缓冲区写入到通道中的。
缓冲区本质上是一块可以写入数据,然后可以从中读取数据的内存。这块内存被包装成NIO Buffer对象,并提供了一组方法,用来方便的访问该块内存。
Buffer结构
Buffer是一个顶级的抽象类,底下有很多的实现类,这里是小小的一个图哈。从这张图可用看出Java
中的基本数据类型(boolean
除外),都有一个Buffer对应,不过其中ByteBuffer使用的最多。
Buffer API:
public abstract class Buffer { public final int capacity() ;//返回此缓冲区的容量。 public final int position();//返回此缓冲区的位置。 public Buffer position(int newPosition) ;//设置此缓冲区的位置。 如果标记已定义且大于新位置,则将其丢弃。 public final int limit();//返回此缓冲区的限制。 public Buffer limit(int newLimit) ;//设置此缓冲区的限制。 public Buffer mark() ;//在其位置设置此缓冲区的标记。 public Buffer reset();//将此缓冲区的位置重置为先前标记的位置。 public Buffer clear() ;//清除此缓冲区。 将位置设置为零,将限制设置为容量,并丢弃标记。 public Buffer flip();//翻转这个缓冲区。 将限制设置为当前位置,然后将位置设置为零。 如果定义了标记,则将其丢弃。 public Buffer rewind();//倒带此缓冲区。 位置设置为零,标记被丢弃。 public final int remaining() ;//返回当前位置和限制之间的元素数。 public final boolean hasRemaining();//告诉当前位置和限制之间是否有任何元素。 public abstract boolean isReadOnly();//告诉这个缓冲区是否是只读的。 // -----------jdk1.6引入---------- public abstract boolean hasArray();//告诉此缓冲区是否由可访问数组支持。 public abstract Object array();//返回支持此缓冲区的数组(可选操作) 。 public abstract int arrayOffset();//返回缓冲区第一个元素在此缓冲区的后备数组中的偏移量 public abstract boolean isDirect();//判断此缓冲区是否为direct 。 }
六、三大核心组件 | Channel
基本介绍
Java NIO的通道类似流,但又有些不同:
- 既可以从通道中读取数据,又可以写数据到通道。但流的读写通常是单向的。
- 通道可以异步地读写。
- 通道中的数据总是要先读到一个Buffer,或者总是要从一个Buffer中写入。
常用Channel的实现类
以下是Java NIO中最常用Channel的通道的实现:
FileChannel
从文件中读写数据。DatagramChannel
能通过UDP读写网络中的数据。SocketChannel
能通过TCP读写网络中的数据。ServerSocketChannel
可以监听新进来的TCP连接,像Web服务器那样。对每一个新进来的连接都会创建一个SocketChannel。
结构图:
小小的应用案例:
场景:使用一个Buffer
和FileChannel
完成文件读取、写入。
my.txt--->you.txt
public class NIOFileChannel { public static void main(String[] args) throws Exception { FileInputStream fileInputStream = new FileInputStream("my.txt"); FileChannel fileChannel01 = fileInputStream.getChannel(); FileOutputStream fileOutputStream = new FileOutputStream("you.txt"); FileChannel fileChannel02 = fileOutputStream.getChannel(); ByteBuffer byteBuffer = ByteBuffer.allocate(512); //循环读取 while (true) { //清空 buffer 缓冲区 byteBuffer.clear(); int read = fileChannel01.read(byteBuffer); System.out.println("read = " + read); //表示读完 if (read == -1) { break; } //将 buffer 中的数据写入到 fileChannel02--2.txt byteBuffer.flip(); fileChannel02.write(byteBuffer); } //关闭相关的流 fileInputStream.close(); fileOutputStream.close(); } }
七、三大核心组件 | Selector
基本介绍
Selector(选择器)是Java NIO中能够检测一到多个NIO通道,并能够知晓通道是否为诸如读写事件做好准备的组件。这样,才使得一个单独的线程可以管理多个channel,从而管理多个网络连接。
在这里只要知道使用Selector能够处理多个通道就足够了,其他的暂时先不去瞄了哈。
模型图:
Selector | API:
public abstract class Selector implements Closeable { public static Selector open() ;//打开一个选择器。得到一个选择器对象 public abstract boolean isOpen();//告诉这个选择器是否打开 public abstract SelectorProvider provider(); //返回创建此频道的提供者。 public abstract Set<SelectionKey> keys(); //返回此选择器的键集 public abstract Set<SelectionKey> selectedKeys();//返回此选择器的选定键集。 public abstract int selectNow() throws IOExcep tion;//不阻塞,立马返还 ,其对应的通道已准备好进行 I/O 操作。 public abstract int select(long timeout) throws IOException;//监控所有注册的通道,当其中有IO操作时,将对应的SelectionKey加入到内部集合中并返回,参数用来设置超时时间。 public abstract Selector wakeup();//使尚未返回的第一个选择操作立即返回。如果另一个线程当前在选择操作中被阻塞,那么该调用将立即返回。 public abstract void close() throws IOException;//关闭此选择器。 }
因为要结合其他组件才能体现出Selector
,所以案例放在后文中了。
八、NIO 网络编程分析图
模型图如下:
流程分析:
- 服务器端启动,打开服务器的ServerSocketChannel,创建并且打开Selector,将ServerSocketChannel注册到selector上
ServerSocketChannel serverSocketChannel = ServerSocketChannel.open(); Selector selector = Selector.open(); serverSocketChannel.register(selector, SelectionKey.OP_ACCEPT);
- Selector 进行监听 select 方法, 返回有事件发生的通道的个数
if (selector.select(1000) == 0) //这里我写的是每秒刷新一次
- 客户端启动,打开客户端的SocketChannel,绑定服务端地址,向服务端发送连接请求
SocketChannel socketChannel = SocketChannel.open(); socketChannel.connect(inetSocketAddress)
- 服务端Selector监听到注册事件发生后,通过
selector.selectedKeys()
方法拿到SelectionKey
,会和该 Selector 关联(集合)
//通过selector得到selectionKey Set<SelectionKey> selectionKeys = selector.selectedKeys();
- 再通过 java.nio.channels.SelectionKey 反向获取 SocketChannel , 方法 channel()
SocketChannel channel = (SocketChannel) selectionKey.channel();
- 可以通过得到的
channel
, 完成业务处理
九、快速入门案例(一)
一直写理论,不写代码,看着发愁。所以一起来看看这个简单的入门案例吧。
场景:NIO
入门案例,实现服务器端和客户端之间的数据简单通讯(非阻塞)
import java.net.InetSocketAddress; import java.nio.ByteBuffer; import java.nio.channels.SelectionKey; import java.nio.channels.Selector; import java.nio.channels.ServerSocketChannel; import java.nio.channels.SocketChannel; import java.util.Set; public class NioServerTest { public static void main(String[] args) throws Exception{ //1、创建一个ServerSocketChannel对象,绑定端口并配置成非阻塞模式。 ServerSocketChannel serverSocketChannel = ServerSocketChannel.open(); serverSocketChannel.bind(new InetSocketAddress(8888), 1024); //2、下面这句必需要,否则ServerSocketChannel会使用阻塞的模式,那就不是NIO了 serverSocketChannel.configureBlocking(false); //3、把ServerSocketChannel交给Selector监听 Selector selector = Selector.open(); //4、注册进Selector serverSocketChannel.register(selector, SelectionKey.OP_ACCEPT); //5、循环,不断的从Selector中获取准备就绪的Channel,最开始的时候Selector只监听了一个ServerSocketChannel //但是后续有客户端连接时,会把客户端对应的Channel也交给Selector对象 while (true) { //6、一直监听,每秒刷新一次,等待客户端的连接 if (selector.select(1000) == 0){ continue; } //获取所有的准备就绪的Channel,SelectionKey中包含中Channel信息 Set<SelectionKey> selectionKeySet = selector.selectedKeys(); //遍历,每个Channel都可处理 for (SelectionKey selectionKey : selectionKeySet) { //如果Channel已经无效了,则跳过(如Channel已经关闭了) if(!selectionKey.isValid()) { continue; } //判断Channel具体的就绪事件,如果是有客户端连接,则建立连接 if (selectionKey.isAcceptable()) { System.out.println("连接成功!!!"); //当确定有selectionKey时,说明必有socketChannel,Server接收后得到SocketChannel SocketChannel socketChannel = serverSocketChannel.accept(); socketChannel.configureBlocking(false); //把客户端的Channel交给Selector监控,之后如果有数据可以读取时,会被select出来 socketChannel.register(selector, SelectionKey.OP_READ); } //如果有客户端可以读取请求了,则读取请求然后返回数据 if (selectionKey.isReadable()) { System.out.println(readFromSelectionKey(selectionKey)); } } //处理完成后把返回的Set清空,如果不清空下次还会再返回这些Key,导致重复处理 selectionKeySet.clear(); } } //从客户端读取数据的庐江 private static String readFromSelectionKey(SelectionKey selectionKey) throws Exception{ //从SelectionKey中包含选取出来的Channel的信息把Channel获取出来 SocketChannel socketChannel = ((SocketChannel) selectionKey.channel()); //读取数据到ByteBuffer中 ByteBuffer byteBuffer = ByteBuffer.allocate(1024); int len = socketChannel.read(byteBuffer); //如果读到-1,说明数据已经传输完成了,可以并闭 if (len < 0) { socketChannel.close(); selectionKey.cancel(); return ""; } else if(len == 0) { //什么都没读到 return ""; } byteBuffer.flip(); doWrite(selectionKey, "Hello Nio"); return new String(byteBuffer.array(), 0, len); } private static void doWrite(SelectionKey selectionKey, String responseMessage) throws Exception{ System.err.println("Output message..."); SocketChannel socketChannel = ((SocketChannel) selectionKey.channel()); ByteBuffer byteBuffer = ByteBuffer.allocate(responseMessage.getBytes().length); byteBuffer.put(responseMessage.getBytes()); byteBuffer.flip(); socketChannel.write(byteBuffer); } }
客户端:
package com.crush.nio.nio2;import java.net.InetSocketAddress;import java.nio.ByteBuffer;import java.nio.channels.SelectionKey;import java.nio.channels.Selector;import java.nio.channels.SocketChannel;import java.util.Set;/** * @Author: crush * @Date: 2021-08-24 16:01 * version 1.0 */public class NioClientTest { public static void main(String[] args) throws Exception { //创建一个SocketChannel对象,配置成非阻塞模式 SocketChannel socketChannel = SocketChannel.open(); socketChannel.configureBlocking(false); //创建一个选择器,并把SocketChannel交给selector对象 Selector selector = Selector.open(); socketChannel.register(selector, SelectionKey.OP_CONNECT); //发起建立连接的请求,这里会立即返回,当连接建立完成后,SocketChannel就会被选取出来 socketChannel.connect(new InetSocketAddress("localhost", 8888)); //遍历,不段的从Selector中选取出已经就绪的Channel,在这个例子中,Selector只监控了一个SocketChannel while (true) { selector.select(1000); Set<SelectionKey> selectionKeySet = selector.selectedKeys(); for (SelectionKey selectionKey : selectionKeySet) { if (!selectionKey.isValid()) { continue; } //连接建立完成后的操作:直接发送请求数据 if (selectionKey.isConnectable()) { if (socketChannel.finishConnect()) { socketChannel.register(selector, SelectionKey.OP_READ); doWriteRequest(((SocketChannel) selectionKey.channel())); } } //如果当前已经可以读数据了,说明服务端已经响应完了,读取数据 if (selectionKey.isReadable()) { doRead(selectionKey); } } //最后同样要清除所有的Key selectionKeySet.removeAll(selectionKeySet); } } //发送请求 private static void doWriteRequest(SocketChannel socketChannel) throws Exception { System.err.println("start connect..."); //创建ByteBuffer对象,会放入数据 ByteBuffer byteBuffer = ByteBuffer.allocate("Hello Server!".getBytes().length); byteBuffer.put("Hello Server!".getBytes()); byteBuffer.flip(); //写数据 socketChannel.write(byteBuffer); if (!byteBuffer.hasRemaining()) { System.err.println("Send request success..."); } } //读取服务端的响应 private static void doRead(SelectionKey selectionKey) throws Exception { SocketChannel socketChannel = ((SocketChannel) selectionKey.channel()); ByteBuffer byteBuffer = ByteBuffer.allocate(1024); int len = socketChannel.read(byteBuffer); System.out.println("Recv:" + new String(byteBuffer.array(), 0, len)); }}
先启动服务器端,再启动客户端,直接可以在客户端看到输出消息。
十、快速入门案例(二)
/** * @Author: crush * @Date: 2021-08-24 16:33 * version 1.0 */ public class GroupChatServer { //定义属性 private Selector selector; private ServerSocketChannel listenChannel; private static final int PORT = 6667; //构造器 //初始化工作 public GroupChatServer() { try { //得到选择器 selector = Selector.open(); //ServerSocketChannel listenChannel = ServerSocketChannel.open(); //绑定端口 listenChannel.socket().bind(new InetSocketAddress(PORT)); //设置非阻塞模式 listenChannel.configureBlocking(false); //将该 listenChannel 注册到 selector listenChannel.register(selector, SelectionKey.OP_ACCEPT); } catch (IOException e) { e.printStackTrace(); } } public void listen() { try { //循环处理 while (true) { int count = selector.select(); if (count > 0) { //有事件处理 // 遍历得到 selectionKey 集合 Iterator<SelectionKey> iterator = selector.selectedKeys().iterator(); while (iterator.hasNext()) { //取出 selectionkey SelectionKey key = iterator.next(); //监听到 accept if (key.isAcceptable()) { SocketChannel sc = listenChannel.accept(); sc.configureBlocking(false); //将该 sc 注册到 seletor sc.register(selector, SelectionKey.OP_READ); //提示 System.out.println(sc.getRemoteAddress() + " 上线 "); } if (key.isReadable()) {//通道发送read事件,即通道是可读的状态 // 处理读(专门写方法..) readData(key); } //当前的 key 删除,防止重复处理 iterator.remove(); } } else { System.out.println("等待...."); } } } catch (Exception e) { e.printStackTrace(); } finally { //发生异常处理.... } } //读取客户端消息 public void readData(SelectionKey key) { SocketChannel channel = null; try { //得到 channel channel = (SocketChannel) key.channel(); //创建 buffer ByteBuffer buffer = ByteBuffer.allocate(1024); int count = channel.read(buffer); //根据 count 的值做处理 if (count > 0) { //把缓存区的数据转成字符串 String msg = new String(buffer.array()); //输出该消息 System.out.println("form客户端:" + msg); //向其它的客户端转发消息(去掉自己),专门写一个方法来处理 sendInfoToOtherClients(msg, channel); } } catch (IOException e) { try { System.out.println(channel.getRemoteAddress() + "离线了.."); //取消注册 key.cancel(); //关闭通道 channel.close(); } catch (IOException e2) { e2.printStackTrace(); } } } //转发消息给其它客户(通道) private void sendInfoToOtherClients(String msg, SocketChannel self) throws IOException { System.out.println("服务器转发消息中..."); //遍历所有注册到 selector 上的 SocketChannel,并排除 self for (SelectionKey key : selector.keys()) { //通过 key 取出对应的 SocketChannel Channel targetChannel = key.channel(); //排除自己 if (targetChannel instanceof SocketChannel && targetChannel != self) { //转型 SocketChannel dest = (SocketChannel) targetChannel; //将 msg 存储到 buffer ByteBuffer buffer = ByteBuffer.wrap(msg.getBytes()); //将 buffer 的数据写入通道 dest.write(buffer); } } } public static void main(String[] args) { //创建服务器对象 GroupChatServer groupChatServer = new GroupChatServer(); groupChatServer.listen(); } }
客户端:
/** * @Author: crush * @Date: 2021-08-24 16:33 * version 1.0 */ public class GroupChatClient { //定义相关的属性 private final String HOST = "127.0.0.1";//服务器的ip private final int PORT = 6667;//服务器端口 private Selector selector; private SocketChannel socketChannel; private String username; //构造器,完成初始化工作 public GroupChatClient() throws IOException { selector = Selector.open(); //连接服务器 socketChannel = SocketChannel.open(new InetSocketAddress(HOST, PORT)); //设置非阻塞 socketChannel.configureBlocking(false); //将 channel 注册到selector socketChannel.register(selector, SelectionKey.OP_READ); //得到 username username = socketChannel.getLocalAddress().toString().substring(1); System.out.println(username + " is ok..."); } //向服务器发送消息 public void sendInfo(String info) { info = username + " 说:" + info; try { socketChannel.write(ByteBuffer.wrap(info.getBytes())); } catch (IOException e) { e.printStackTrace(); } } //读取从服务器端回复的消息 public void readInfo() { try { int readChannels = selector.select(); if (readChannels > 0) {//有可以用的通道 Iterator<SelectionKey> iterator = selector.selectedKeys().iterator(); while (iterator.hasNext()) { SelectionKey key = iterator.next(); if (key.isReadable()) { //得到相关的通道 SocketChannel sc = (SocketChannel) key.channel(); //得到一个 Buffer ByteBuffer buffer = ByteBuffer.allocate(1024); //读取 sc.read(buffer); //把读到的缓冲区的数据转成字符串 String msg = new String(buffer.array()); System.out.println(msg.trim()); } } iterator.remove(); //删除当前的 selectionKey,防止重复操作 } else { //System.out.println("没有可以用的通道..."); } } catch (Exception e) { e.printStackTrace(); } } public static void main(String[] args) throws Exception { //启动我们客户端 GroupChatClient chatClient = new GroupChatClient(); //启动一个线程,每个 3 秒,读取从服务器发送数据 new Thread() { public void run() { while (true) { chatClient.readInfo(); try { Thread.currentThread().sleep(3000); } catch (InterruptedException e) { e.printStackTrace(); } } } }.start(); //发送数据给服务器端 Scanner scanner = new Scanner(System.in); while (scanner.hasNextLine()) { String s = scanner.nextLine(); chatClient.sendInfo(s); } } }
测试:
十一、自言自语
个人感受:越学习到后面,就越感觉基础的重要性。很多技术都只会去用,很多时候甚至都不懂它的设计理念也不懂为什么要那样做。
最近在持续更新中,如果你觉得对你有所帮助,也感兴趣的话,关注我吧,让我们一起学习,一起讨论吧。
在学习路上充满好奇心,明白思考的重要性,是支持我一直学习下去的积极推动力吧。希望你也能喜欢上编程!😁
热爱生活,享受生活!!!无论在哪里,无论此时的生活多么艰难,都要记得热爱生活!!!相信总会有光来的。
你好,我是博主宁在春
,Java学习路上的一颗小小的种子,也希望有一天能扎根长成苍天大树。
希望与君共勉
😁
我们:待别时相见时,都已有所成。
本文是个人的学习笔记,夹杂着个人理解,如有不对,请不啬赐教。课程是来自于尚硅谷-韩顺平老师Netty课程
参考: 五种IO模型详解及优缺点