1. 用户态和内核态
操作系统为了限制程序的数据访问,来防止获取其他进程或外围设备数据,将CPU划分为用户态和内核态:
内核态(Kernel Mode):cpu可以访问内存的所有数据,包括外围设备,例如硬盘,网卡,cpu也可以将自己从一个程序切换到另一个程序。
用户态(User Mode):只能受限的访问内存,且不允许访问外围设备,占用cpu的能力被剥夺,cpu资源可以被其他程序获取。
一个完整的I/O操作分为两步:
用户层API调用;
内核层完成系统调用,实际调用read发起I/O请求,读取socket中的内容。
通常讲的“异步/同步”的是指用户层API调用;“阻塞/非阻塞”是指内核完成I/O调用的模式:
nio是同步阻塞io,bio是同步非阻塞io,两者的区别在于是否阻塞,就是内核层io调用模式。并且阻塞是对于网络io来说的,对于磁盘io,总是视为非阻塞的。
同步是指API调用完成之前会一直等待;阻塞是指系统调用的时候,如果没有数据可供操作,那么线程就会阻塞,直到有有新的数据到来。例如当系统调用read从socket里读取数据,如果此时没有数据可读,线程阻塞,直到有数据可读。
2. Blocking IO
bio中,用户层调用API的读、写操作,之后内核层转化为read、write的I/O请求,直到I/O请求完成API调用才会完成。此时,在API调用期间用户程序是同步的,并且这个API调用会导致系统以阻塞的模式执行I/O,如果此时没有数据可供操作,那么该线程就放弃CPU并被挂起。
以read为例,内核层的两个阶段:
读取数据:如果还没有收到一个完整的TCP包(或者没有数据),系统内核就要等待足够的数据到来;
拷贝数据:数据准备好了,系统将数据从内核拷贝到用户内存中,然后系统内核返回结果。
BIO要等拷贝数据完成后,进程才接触阻塞状态。
举个例子,当内核层用read去读取网络的数据时,是无法预知此时是否有数据的,因此在收到数据之前,能做的只有等待,直到对方把数据发过来,或者等到网络超时。
BIO模式的socket,其socket.accept()、socket.read()、socket.write()都是阻塞型的,阻塞状态下,进程无法做任何事情。
BIO下,单线程的网络服务容易卡死,因此需要多线程来响应并发网络请求,每个线程处理一个网络请求。但仍存在问题:
线程越多,线程切换越多,而线程切换是耗CPU的重操作,会浪费大量的CPU资源;
每个线程都会分配操作栈,如果每个线程占用1MB内存,1000个线程就占用了1G的内存。
还可以使用线程池和连接池,复用已有的线程和连接,但是池也是有上限的。
2.1 BIO示例
server端监听在9000端口,当有连接接入时就新建一个线程去处理:
public class BIOServer {
private static int DEFAULT_PORT = 9000;
private static ServerSocket serverSocket;
public synchronized static void start() throws IOException {
if (serverSocket != null) return;
try {
serverSocket = new ServerSocket(DEFAULT_PORT);
while (true) {
Socket socket = serverSocket.accept();
new Thread(new ServerHandler(socket)).start();
}
} finally {
if (serverSocket != null) serverSocket.close();
}
}
}
ServerHandler负责处理连接:
public class ServerHandler implements Runnable {
private Socket socket;
public ServerHandler(Socket socket) {
this.socket = socket;
}
@Override
public void run() {
BufferedReader in = null;
PrintWriter out = null;
try {
in = new BufferedReader(new InputStreamReader(socket.getInputStream()));
out = new PrintWriter(socket.getOutputStream(), true);
String message;
while (true) {
if ((message = in.readLine()) == null) break;
System.out.println(("client:" + message));
out.println(message + 1);
}
} catch (Exception e) {
e.printStackTrace();
} finally {
if (in != null) {
try {in.close();} catch (IOException e) {e.printStackTrace();}
}
if (out != null) out.close();
if (socket != null) {
try {socket.close();} catch (IOException e) {e.printStackTrace();}
}
}
}
}
client端去连接server端:
public class Client {
//默认的端口号
private static int DEFAULT_SERVER_PORT = 9000;
private static String DEFAULT_SERVER_IP = "127.0.0.1";
public static void send(String expression) {
send(DEFAULT_SERVER_PORT, expression);
}
public static void send(int port, String message) {
Socket socket = null;
BufferedReader bufferedReader = null;
PrintWriter printWriter = null;
try {
socket = new Socket(DEFAULT_SERVER_IP, port);
bufferedReader = new BufferedReader(new InputStreamReader(socket.getInputStream()));
printWriter = new PrintWriter(socket.getOutputStream(), true);
printWriter.println(message);
System.out.println(("server:" + bufferedReader.readLine()));
System.out.println();
} catch (Exception e) {
} finally {
if (bufferedReader != null) {
try {bufferedReader.close();} catch (IOException e) {}
}
if (printWriter != null) printWriter.close();
if (socket != null) {
try {socket.close();} catch (IOException e) {}
}
}
}
}
程序启动入口:
public class Test {
public static void main(String[] args) throws Exception {
new Thread(new Runnable() {
@Override
public void run() {
try {BIOServer.start();} catch (IOException e) {e.printStackTrace();}
}
}).start();
Thread.sleep(100);
final Random random = new Random(System.currentTimeMillis());
new Thread(new Runnable() {
@Override
public void run() {
String message = "hello: " + random.nextInt(10);
Client.send(message);
try {
Thread.sleep(random.nextInt(1000));
}catch (InterruptedException e){
e.printStackTrace();
}
}
}).start();
}
}
3. Non-Blocking
从上面的阐述可知,系统I/O都分为两个阶段:等待就绪和操作。以read为例,分为等待可读和真正的读;同理,write分为等待网卡可写和真正的写。
这两个阶段都会导致阻塞,但是等待就绪的阻塞是不使用CPU的,是在“空等”;而真正的读写操作的阻塞是使用CPU的,真正在"干活",而且属于memory-copy,过程非常快。
以socket.read()为例子:
BIO的socket.read(),如果TCP-RecvBuffer里没有数据,会一直阻塞直到收到数据,读取并返回;
NIO下,如果TCP-RecvBuffer有数据,就把数据从网卡读到内存,并且返回给用户;反之则直接返回,永远不会阻塞;
AIO下,不但等待就绪是非阻塞的,就连数据从网卡到内存的过程也是异步的。
换句话说:
BIO两个阶段都会阻塞;
NIO等待就绪阶段不阻塞,真正操作的阶段会阻塞;
AIO两个阶段都不阻塞。
所以,NIO的重要的特点是:socket的read、write、accept在等待就绪阶段都是非阻塞的,真正的I/O操作虽然阻塞但是执行效率非常高。
回到NIO,没有取到数据后立即返回,但是返回后此时用户层实际上并没有获得想要的数据,解决方式就是用户层轮询,不断的询问内核层有没有数据到达,有了就处理,没有则仍然返回。
但这样会带来两个新问题:
如果有大量IO要处理,那么用户层会不断轮询,read、write是系统调用,每调用一次就得在用户态和核心态切换一次;
如果轮询设置间隙,没有数据就休息一会在继续轮询,此时轮询间隙不好设置,休息时间太长则程序响应延迟过大;时间太短就会造成频繁的重试,耗费CPU资源。
IO多路复用可以解决这个问题,IO多路复用是程序注册一组socket文件描述符给操作系统,让操作系统同时监听这多个socket的事件,表示“我要监视这些fd是否有IO事件发生,有了就告诉程序处理”。
IO多路复用通常配合NIO同时使用,尽管在操作系统级别,NIO和IO多路复用是两个相对独立的事情:NIO仅仅是指等待就绪阶段不会阻塞;而IO多路复用仅仅是操作系统提供的一种便利的通知机制。操作系统并不会强制这俩必须一起使用,—你可以单独用NIO;也可以让IO多路复用配合BIO。
但是,IO多路复用和NIO是要配合一起使用才有实际意义,单独用NIO无法避免轮询的副作用,而IO多路复用配合BIO的效果还是导致线程被卡住。
我们通常说的NIO,都是这种和IO多路复用结合后的NIO,例如java的NIO,之后我们说的也是这种结合后的NIO。
2.1 NIO主要组成
NIO主要有三大核心部分:
Channel(通道)
Buffer(缓冲区)
Selector(选择器)
NIO的主要监听事件有:
读就绪
写就绪
有新连接到来
传统IO基于字节流和字符流进行操作,而NIO基于Channel和Buffer进行操作,数据总是从通道读取到缓冲区中,或者从缓冲区写入到通道中。Selector用于监听多个通道的事件,比如连接打开、数据可读等,这种模式下,单个线程就可以监听多个数据通道Channel。
2.1.1 Channel
Channel和IO中的Stream(流)是差不多一个等级的。只不过Stream是单向的,譬如:InputStream, OutputStream.而Channel是双向的,既可以用来进行读操作,又可以用来进行写操作。
java的BIO创建的第一步是建立socket连接,NIO也是建立连接,只不过不是采用 new socket()的方式,而是引入了一个新的概念SocketChannel。它可以看作是socket的封装和扩展,除了提供 Socket 的相关功能外,还提供了许多其他特性,例如向选择器注册。
client端建立连接代码实现:
// 初始化 socket,建立 socket 与 channel 的绑定关系
SocketChannel socketChannel = SocketChannel.open();
// 初始化远程连接地址
SocketAddress remote = new InetSocketAddress(host, port);
// 设置I/O模式,是否阻塞
socketChannel.configureBlocking(false);
// 注册感兴趣的事件
socketChannel.register(selector, SelectionKey.OP_READ | SelectionKey.OP_WRITE, new Buffers(256, 256));
// 建立连接
socketChannel.connect(remote);
java中NIO的Channel的主要实现有:
FileChannel
DatagramChannel
SocketChannel
ServerSocketChannel
分别可以对应文件IO、UDP、TCP客户端和TCPServer端。
2.1.2 Buffer
通常情况下,操作系统的一次写操作分为两步:
将数据从用户空间拷贝到系统空间。
从系统空间往网卡写。
同理,读操作也分为两步:
将数据从网卡拷贝到系统空间;
将数据从系统空间拷贝到用户空间。
对于NIO来说,缓存的使用可以使用DirectByteBuffer和HeapByteBuffer。如果使用了DirectByteBuffer,一般来说可以减少一次系统空间到用户空间的拷贝。但Buffer创建和销毁的成本更高,更不宜维护,通常会用内存池来提高性能。
如果数据量比较小的中小应用情况下,可以考虑使用heapBuffer;反之可以用directBuffer。
ByteBuffer属性:
byte[] buff //buff即内部用于缓存的数组。
position //当前读写数据的位置,可以理解为指针指向的位置。
mark //为某一读过的位置做标记,便于某些时候回退到该位置。
capacity //初始化时候的容量。
limit //当写数据到buffer中时,limit一般和capacity相等,当读数据时,limit代表buffer中有效数据的长度。
这些属性总是满足以下条件:
0 <= mark <= position <= limit <= capacity
常用方法如下:
ByteBuffer allocate(int capacity) //创建一个指定capacity的HeapByteBuffer。
ByteBuffer allocateDirect(int capacity) //创建一个DirectByteBuffer
ByteBuffer wrap(byte [] array)//把一个byte数组或byte数组的一部分包装成ByteBuffer。
Buffer clear() 把position设为0,把limit设为capacity,一般在把数据写入Buffer前调用。
Buffer flip() 把limit设为当前position,把position设为0,同时丢弃mark值。
一般在读或写操作之后,调用本方法来准备写或读。
Buffer rewind() 把position设为0,limit不变,一般在把数据重写入Buffer前调用。
compact() 将 position 与 limit之间的数据复制到buffer的开始位置,复制后 position = limit -position,limit = capacity, 但如 果position 与limit 之间没有数据的话发,就不会进行复制。
mark() & reset() 通过调用Buffer.mark()方法,可以标记Buffer中的一个特定position。之后可以通过调用Buffer.reset()方法恢复到这个position。
使用示例:
public class BufferDemo {
public static void decode(String str) throws UnsupportedEncodingException {
ByteBuffer byteBuffer = ByteBuffer.allocate(128);
byteBuffer.put(str.getBytes("UTF-8"));
byteBuffer.flip();
/*对获取utf8的编解码器*/
Charset utf8 = Charset.forName("UTF-8");
CharBuffer charBuffer = utf8.decode(byteBuffer);/*对bytebuffer中的内容解码*/
/*array()返回的就是内部的数组引用,编码以后的有效长度是0~limit*/
char[] charArr = Arrays.copyOf(charBuffer.array(), charBuffer.limit());
System.out.println(charArr);
}
public static void encode(String str){
CharBuffer charBuffer = CharBuffer.allocate(128);
charBuffer.append(str);
charBuffer.flip();
/*对获取utf8的编解码器*/
Charset utf8 = Charset.forName("UTF-8");
ByteBuffer byteBuffer = utf8.encode(charBuffer); /*对charbuffer中的内容解码*/
/*array()返回的就是内部的数组引用,编码以后的有效长度是0~limit*/
byte[] bytes = Arrays.copyOf(byteBuffer.array(), byteBuffer.limit());
System.out.println(Arrays.toString(bytes));
}
public static void main(String[] args) throws UnsupportedEncodingException {
BufferDemo.decode("中国樽");
BufferDemo.encode("中国樽");
}
}
2.1.3 Selectors 选择器
Java NIO的选择器允许一个单独的线程同时监视多个通道,可以注册多个通道到同一个选择器上,然后使用一个单独的线程来“选择”已经就绪的通道。这种“选择”机制为一个单独线程管理多个通道提供了可能。
Selector运行单线程处理多个Channel,使用之前先向Selector注册Channel关心的事件,然后调用其select()。这个方法会一直阻塞直至某个注册的通道有事件就绪。一旦这个方法返回,线程就可以处理这些事件,事件的例子有如新的连接进来、数据接收等。
例如注册连接、可读、可写事件:
socketChannel.register(selector, SelectionKey.OP_CONNECT | SelectionKey.OP_READ | SelectionKey.OP_WRITE);
接收事件:
public void select() throws IOException {
// 获取就绪的 socket 个数
while (selector.select() > 0) {
// 获取符合的 socket 在选择器中对应的事件句柄 key
Set keys = selector.selectedKeys();
// 遍历所有的key
Iterator it = keys.iterator();
while (it.hasNext()) {
// 获取对应的 key,并从已选择的集合中移除
SelectionKey key = (SelectionKey)it.next();
it.remove();
if (key.isConnectable()) {
// 进行连接操作
connect(key);
}
else if (key.isWritable()) {
// 可写事件,进行写操作
write(key);
}
else if (key.isReadable()) {
// 可读事件,进行读操作
receive(key);
}
}
}
}
2.2 NIO示例
Server端:
public class NioServer {
private ServerSocketChannel serverSocketChannel;
private Selector selector;
public NioServer() throws IOException {
// 打开 Server Socket Channel
serverSocketChannel = ServerSocketChannel.open();
// 配置为非阻塞
serverSocketChannel.configureBlocking(false);
// 绑定 Server port
serverSocketChannel.socket().bind(new InetSocketAddress(8080));
// 创建 Selector
selector = Selector.open();
// 注册 Server Socket Channel 到 Selector
serverSocketChannel.register(selector, SelectionKey.OP_ACCEPT);
System.out.println("Server 启动完成");
handleKeys();
}
@SuppressWarnings("Duplicates")
private void handleKeys() throws IOException {
while (true) {
// 通过 Selector 选择 Channel
int selectNums = selector.select(30 * 1000L);
if (selectNums == 0) {
continue;
}
System.out.println("选择 Channel 数量:" + selectNums);
// 遍历可选择的 Channel 的 SelectionKey 集合
Iterator<SelectionKey> iterator = selector.selectedKeys().iterator();
while (iterator.hasNext()) {
SelectionKey key = iterator.next();
iterator.remove(); // 移除下面要处理的 SelectionKey
if (!key.isValid()) { // 忽略无效的 SelectionKey
continue;
}
handleKey(key);
}
}
}
private void handleKey(SelectionKey key) throws IOException {
// 接受连接就绪
if (key.isAcceptable()) {
handleAcceptableKey(key);
}
// 读就绪
if (key.isReadable()) {
handleReadableKey(key);
}
// 写就绪
if (key.isWritable()) {
handleWritableKey(key);
}
}
private void handleAcceptableKey(SelectionKey key) throws IOException {
// 接受 Client Socket Channel
SocketChannel clientSocketChannel = ((ServerSocketChannel) key.channel()).accept();
// 配置为非阻塞
clientSocketChannel.configureBlocking(false);
// log
System.out.println("接受新的 Channel");
// 注册 Client Socket Channel 到 Selector
clientSocketChannel.register(selector, SelectionKey.OP_READ, new ArrayList<String>());
}
private void handleReadableKey(SelectionKey key) throws IOException {
// Client Socket Channel
SocketChannel clientSocketChannel = (SocketChannel) key.channel();
// 读取数据
ByteBuffer readBuffer = CodecUtil.read(clientSocketChannel);
// 处理连接已经断开的情况
if (readBuffer == null) {
System.out.println("断开 Channel");
clientSocketChannel.register(selector, 0);
return;
}
// 打印数据
if (readBuffer.position() > 0) { // 写入模式下,
String content = CodecUtil.newString(readBuffer);
System.out.println("读取数据:" + content);
// 添加到响应队列
List<String> responseQueue = (ArrayList<String>) key.attachment();
responseQueue.add("响应:" + content);
// 注册 Client Socket Channel 到 Selector
clientSocketChannel.register(selector, SelectionKey.OP_WRITE, key.attachment());
}
}
@SuppressWarnings("Duplicates")
private void handleWritableKey(SelectionKey key) throws ClosedChannelException {
// Client Socket Channel
SocketChannel clientSocketChannel = (SocketChannel) key.channel();
// 遍历响应队列
List<String> responseQueue = (ArrayList<String>) key.attachment();
for (String content : responseQueue) {
// 打印数据
System.out.println("写入数据:" + content);
// 返回
CodecUtil.write(clientSocketChannel, content);
}
responseQueue.clear();
// 注册 Client Socket Channel 到 Selector
clientSocketChannel.register(selector, SelectionKey.OP_READ, responseQueue);
}
public static void main(String[] args) throws IOException {
new NioServer();
}
}
Client端:
public class NioClient {
private SocketChannel clientSocketChannel;
private Selector selector;
private final List<String> responseQueue = new ArrayList<String>();
private CountDownLatch connected = new CountDownLatch(1);
public NioClient() throws IOException, InterruptedException {
// 打开 Client Socket Channel
clientSocketChannel = SocketChannel.open();
// 配置为非阻塞
clientSocketChannel.configureBlocking(false);
// 创建 Selector
selector = Selector.open();
// 注册 Server Socket Channel 到 Selector
clientSocketChannel.register(selector, SelectionKey.OP_CONNECT);
// 连接服务器
clientSocketChannel.connect(new InetSocketAddress(8080));
new Thread(new Runnable() {
@Override
public void run() {
try {
handleKeys();
} catch (IOException e) {
e.printStackTrace();
}
}
}).start();
if (connected.getCount() != 0) {
connected.await();
}
System.out.println("Client 启动完成");
}
@SuppressWarnings("Duplicates")
private void handleKeys() throws IOException {
while (true) {
// 通过 Selector 选择 Channel
int selectNums = selector.select(30 * 1000L);
if (selectNums == 0) {
continue;
}
// 遍历可选择的 Channel 的 SelectionKey 集合
Iterator<SelectionKey> iterator = selector.selectedKeys().iterator();
while (iterator.hasNext()) {
SelectionKey key = iterator.next();
iterator.remove(); // 移除下面要处理的 SelectionKey
if (!key.isValid()) { // 忽略无效的 SelectionKey
continue;
}
handleKey(key);
}
}
}
private synchronized void handleKey(SelectionKey key) throws IOException {
// 接受连接就绪
if (key.isConnectable()) {
handleConnectableKey(key);
}
// 读就绪
if (key.isReadable()) {
handleReadableKey(key);
}
// 写就绪
if (key.isWritable()) {
handleWritableKey(key);
}
}
private void handleConnectableKey(SelectionKey key) throws IOException {
// 完成连接
if (!clientSocketChannel.isConnectionPending()) {
return;
}
clientSocketChannel.finishConnect();
// log
System.out.println("接受新的 Channel");
// 注册 Client Socket Channel 到 Selector
clientSocketChannel.register(selector, SelectionKey.OP_READ, responseQueue);
// 标记为已连接
connected.countDown();
}
@SuppressWarnings("Duplicates")
private void handleReadableKey(SelectionKey key) throws ClosedChannelException {
// Client Socket Channel
SocketChannel clientSocketChannel = (SocketChannel) key.channel();
// 读取数据
ByteBuffer readBuffer = CodecUtil.read(clientSocketChannel);
// 打印数据
if (readBuffer.position() > 0) { // 写入模式下,
String content = CodecUtil.newString(readBuffer);
System.out.println("读取数据:" + content);
}
}
@SuppressWarnings("Duplicates")
private void handleWritableKey(SelectionKey key) throws ClosedChannelException {
// Client Socket Channel
SocketChannel clientSocketChannel = (SocketChannel) key.channel();
// 遍历响应队列
List<String> responseQueue = (ArrayList<String>) key.attachment();
for (String content : responseQueue) {
// 打印数据
System.out.println("写入数据:" + content);
// 返回
CodecUtil.write(clientSocketChannel, content);
}
responseQueue.clear();
// 注册 Client Socket Channel 到 Selector
clientSocketChannel.register(selector, SelectionKey.OP_READ, responseQueue);
}
public synchronized void send(String content) throws ClosedChannelException {
// 添加到响应队列
responseQueue.add(content);
// 打印数据
System.out.println("写入数据:" + content);
// 注册 Client Socket Channel 到 Selector
clientSocketChannel.register(selector, SelectionKey.OP_WRITE, responseQueue);
selector.wakeup();
}
public static void main(String[] args) throws IOException, InterruptedException {
NioClient client = new NioClient();
for (int i = 0; i < 3; i++) {
client.send("hello: " + i);
Thread.sleep(1000L);
}
}
}
编解码工具:
public class CodecUtil {
public static ByteBuffer read(SocketChannel channel) {
// 注意,不考虑拆包的处理
ByteBuffer buffer = ByteBuffer.allocate(1024);
try {
int count = channel.read(buffer);
if (count == -1) {
return null;
}
} catch (IOException e) {
throw new RuntimeException(e);
}
return buffer;
}
public static void write(SocketChannel channel, String content) {
// 写入 Buffer
ByteBuffer buffer = ByteBuffer.allocate(1024);
try {
buffer.put(content.getBytes("UTF-8"));
} catch (UnsupportedEncodingException e) {
throw new RuntimeException(e);
}
// 写入 Channel
buffer.flip();
try {
// 注意,不考虑写入超过 Channel 缓存区上限。
channel.write(buffer);
} catch (IOException e) {
throw new RuntimeException(e);
}
}
public static String newString(ByteBuffer buffer) {
buffer.flip();
byte[] bytes = new byte[buffer.remaining()];
System.arraycopy(buffer.array(), buffer.position(), bytes, 0, buffer.remaining());
try {
return new String(bytes, "UTF-8");
} catch (UnsupportedEncodingException e) {
throw new RuntimeException(e);
}
}
}
3. Reactor
一般情况下,I/O 复用机制需要事件分发器(event dispatcher)。事件分发器的作用,即将那些事件分发给各读写事件的处理者,观察者们需要在开始的时候到分发器那里注册感兴趣的事件,并提供相应的处理者(event handler)。
涉及到事件分发器的两种模式称为:Reactor和Proactor。Reactor模式是基于同步I/O的,而Proactor模式是和异步I/O相关的。在Reactor模式中,事件分发器等待某个事件或者可应用或个操作的状态发生,比如文件描述符可读写,或者是socket可读写,事件分发器就把这个事件传给事先注册的事件处理者去做实际的读写操作。
随着模型的演变,Reactor也分为了如下几种。
3.1 经典Reactor
当前模式下,Reactor为单线程,并且事件处理也使用reactor线程。
这种模式的坏处是,当其中某个handler阻塞时,会导致其他所有的client的handler都得不到执行, 并且更严重的是,handler的阻塞也会导致整个服务不能接收新的client请求,因此单线程Reactor模型用的比较少。
示例:
public class NIOServer {
private static final Logger LOGGER = LoggerFactory.getLogger(NIOServer.class);
public static void main(String[] args) throws IOException {
Selector selector = Selector.open();
ServerSocketChannel serverSocketChannel = ServerSocketChannel.open();
serverSocketChannel.configureBlocking(false);
serverSocketChannel.bind(new InetSocketAddress(1234));
serverSocketChannel.register(selector, SelectionKey.OP_ACCEPT);
while (selector.select() > 0) {
Set<SelectionKey> keys = selector.selectedKeys();
Iterator<SelectionKey> iterator = keys.iterator();
while (iterator.hasNext()) {
SelectionKey key = iterator.next();
iterator.remove();
// reactor同时监听连接、读、写事件
if (key.isAcceptable()) {
ServerSocketChannel acceptServerSocketChannel = (ServerSocketChannel) key.channel();
SocketChannel socketChannel = acceptServerSocketChannel.accept();
socketChannel.configureBlocking(false);
LOGGER.info("Accept request from {}", socketChannel.getRemoteAddress());
socketChannel.register(selector, SelectionKey.OP_READ);
} else if (key.isReadable()) {
SocketChannel socketChannel = (SocketChannel) key.channel();
ByteBuffer buffer = ByteBuffer.allocate(1024);
int count = socketChannel.read(buffer);
if (count <= 0) {
socketChannel.close();
key.cancel();
LOGGER.info("Received invalide data, close the connection");
continue;
}
LOGGER.info("Received message {}", new String(buffer.array()));
}
keys.remove(key);
}
}
}
}
3.2 多工作线程Reactor模式
经典Reactor模式中,所有事件的处理器和Reactor共用一个线程,无法充分利用多CPU的优势,同时读/写操作也会阻塞对新连接请求的处理。因此可以用本模式将事件处理和Reactor分离:
本模式下,事件处理和reactor分离,且事件处理使用多线程方式,示例:
public class NIOServer {
private static final Logger LOGGER = LoggerFactory.getLogger(NIOServer.class);
public static void main(String[] args) throws IOException {
Selector selector = Selector.open();
ServerSocketChannel serverSocketChannel = ServerSocketChannel.open();
serverSocketChannel.configureBlocking(false);
serverSocketChannel.bind(new InetSocketAddress(1234));
serverSocketChannel.register(selector, SelectionKey.OP_ACCEPT);
while (true) {
if(selector.selectNow() < 0) {
continue;
}
Set<SelectionKey> keys = selector.selectedKeys();
Iterator<SelectionKey> iterator = keys.iterator();
while(iterator.hasNext()) {
SelectionKey key = iterator.next();
iterator.remove();
// reactor同时监听连接、读、写事件
if (key.isAcceptable()) {
ServerSocketChannel acceptServerSocketChannel = (ServerSocketChannel) key.channel();
SocketChannel socketChannel = acceptServerSocketChannel.accept();
socketChannel.configureBlocking(false);
LOGGER.info("Accept request from {}", socketChannel.getRemoteAddress());
SelectionKey readKey = socketChannel.register(selector, SelectionKey.OP_READ);
readKey.attach(new Processor());
} else if (key.isReadable()) {
Processor processor = (Processor) key.attachment();
processor.process(key);
}
}
}
}
}
class Processor {
private static final Logger LOGGER = LoggerFactory.getLogger(Processor2.class);
private static final ExecutorService service = Executors.newFixedThreadPool(16);
public void process(SelectionKey selectionKey) {
// 事件处理由多线程负责
service.submit(() -> {
ByteBuffer buffer = ByteBuffer.allocate(1024);
SocketChannel socketChannel = (SocketChannel) selectionKey.channel();
int count = socketChannel.read(buffer);
if (count < 0) {
socketChannel.close();
selectionKey.cancel();
LOGGER.info("{}\t Read ended", socketChannel);
return null;
} else if(count == 0) {
return null;
}
LOGGER.info("{}\t Read message {}", socketChannel, new String(buffer.array()));
return null;
});
}
}
3.3 Reactor主从多线程
更进一步的,多Reactor模式下,主Reactor负责监控所有的连接请求,多个子Reactor负责监控并处理读/写请求,减轻了主Reactor的压力,降低了主Reactor压力太大而造成的延迟,netty使用了这种模式。
本模式下,mainReactor只负责监听新连接,而读写事件分发则交给了subReactor,示例:
public class NIOServer {
private static final Logger LOGGER = LoggerFactory.getLogger(NIOServer.class);
public static void main(String[] args) throws IOException {
Selector selector = Selector.open();
ServerSocketChannel serverSocketChannel = ServerSocketChannel.open();
serverSocketChannel.configureBlocking(false);
serverSocketChannel.bind(new InetSocketAddress(1234));
serverSocketChannel.register(selector, SelectionKey.OP_ACCEPT);
int coreNum = Runtime.getRuntime().availableProcessors();
Processor[] processors = new Processor[2 * coreNum];
for (int i = 0; i < processors.length; i++) {
processors[i] = new Processor();
}
int index = 0;
while (selector.select() > 0) {
Set<SelectionKey> keys = selector.selectedKeys();
for (SelectionKey key : keys) {
keys.remove(key);
// mainReactor只负责监听新连接事件
if (key.isAcceptable()) {
ServerSocketChannel acceptServerSocketChannel = (ServerSocketChannel) key.channel();
SocketChannel socketChannel = acceptServerSocketChannel.accept();
socketChannel.configureBlocking(false);
LOGGER.info("Accept request from {}", socketChannel.getRemoteAddress());
Processor processor = processors[(int) ((index++) % coreNum)];
processor.addChannel(socketChannel);
processor.wakeup();
}
}
}
}
}
class Processor {
private static final Logger LOGGER = LoggerFactory.getLogger(Processor.class);
private static final ExecutorService service =
Executors.newFixedThreadPool(2 * Runtime.getRuntime().availableProcessors());
private Selector selector;
public Processor() throws IOException {
this.selector = SelectorProvider.provider().openSelector();
start();
}
// 读写事件监听由subReactor负责
public void addChannel(SocketChannel socketChannel) throws ClosedChannelException {
socketChannel.register(this.selector, SelectionKey.OP_READ);
}
public void wakeup() {
this.selector.wakeup();
}
public void start() {
// 事件处理由多线程处理
service.submit(() -> {
while (true) {
if (selector.select(500) <= 0) {
continue;
}
Set<SelectionKey> keys = selector.selectedKeys();
Iterator<SelectionKey> iterator = keys.iterator();
while (iterator.hasNext()) {
SelectionKey key = iterator.next();
iterator.remove();
if (key.isReadable()) {
ByteBuffer buffer = ByteBuffer.allocate(1024);
SocketChannel socketChannel = (SocketChannel) key.channel();
int count = socketChannel.read(buffer);
if (count < 0) {
socketChannel.close();
key.cancel();
LOGGER.info("{}\t Read ended", socketChannel);
continue;
} else if (count == 0) {
LOGGER.info("{}\t Message size is 0", socketChannel);
continue;
} else {
LOGGER.info("{}\t Read message {}", socketChannel, new String(buffer.array()));
}
}
}
}
});
}
}
在Processor中,同样创建了一个静态的线程池,且线程池的大小为机器核数的两倍。每个Processor实例均包含一个Selector实例。同时每次获取Processor实例时均提交一个任务到该线程池,并且该任务正常情况下一直循环处理,不会停止。而提交给该Processor的SocketChannel通过在其Selector注册事件,加入到相应的任务中。由此实现了每个子Reactor包含一个Selector对象,并由一个独立的线程处理。
参考链接:
https://tech.meituan.com/2016/11/04/nio.html
http://www.jasongj.com/java/nio_reactor/