传统的bio堵塞类型io传输由于每一次进行数据传输的时候都需要等待排队,因此效率低下,后期随着jdk的一步步发展,nio非堵塞技术开始变多越来越加广泛。
什么是Reactor
Reactor可以理解为反应器模式。当一个主体发生改变时,所有依属体都得到通知。不过,观察者模式与单个事件源关联,而反应器模式则与多个事件源关联 。
NIO 有一个主要的类Selector,这个类似一个观察者,只要我们把需要探知的SocketChannel告诉Selector,我们接着做别的事情,当有事件发生时,他会通知我们,传回一组SelectionKey,我们读取这些Key,就会获得我们刚刚注册过的SocketChannel,然后,我们从这个Channel中读取数据,接着我们可以处理这些数据。
单线程的Reactor****模式
Reactor里面的单线程模式的结构图:
当有多个请求发送到server的时候,会经过反应器对其进行处理,相应的代码如下所示:
import lombok.extern.slf4j.Slf4j; import java.io.IOException; import java.net.InetSocketAddress; import java.nio.ByteBuffer; import java.nio.channels.SelectionKey; import java.nio.channels.Selector; import java.nio.channels.ServerSocketChannel; import java.nio.channels.SocketChannel; import java.util.Iterator; import java.util.Set; /** * @author idea * @data 2019/4/11 */ @Slf4j public class NioServer { public static void main(String[] args) { try { Selector selector = Selector.open(); ServerSocketChannel serverSocketChannel = ServerSocketChannel.open(); serverSocketChannel.configureBlocking(false); serverSocketChannel.bind(new InetSocketAddress(9090)); serverSocketChannel.register(selector, SelectionKey.OP_ACCEPT); System.out.println("server is open!"); while (true) { if (selector.select() > 0) { Set<SelectionKey> keys = selector.selectedKeys(); Iterator<SelectionKey> iterator = keys.iterator(); while (iterator.hasNext()) { SelectionKey selectionKey = iterator.next(); if (selectionKey.isReadable()) { SocketChannel socketChannel = (SocketChannel) selectionKey.channel(); ByteBuffer byteBuffer = ByteBuffer.allocate(1024); int len = 0; //当管道的数据都读取完毕了 while ((len = (socketChannel.read(byteBuffer))) > 0) { byteBuffer.flip(); System.out.println(new String(byteBuffer.array(), 0, len)); byteBuffer.clear(); } } else if (selectionKey.isAcceptable()) { //第一次链接到server,需要构建一个通道 ServerSocketChannel acceptServerSocketChannel = (ServerSocketChannel) selectionKey.channel(); //开通通道 SocketChannel socketChannel = acceptServerSocketChannel.accept(); //设置为非堵塞 socketChannel.configureBlocking(false); //注册可读的监听事件 socketChannel.register(selector, SelectionKey.OP_READ); System.out.println("[server]接收到新的链接"); } iterator.remove(); } } } } catch (IOException e) { log.error("[server]异常出现,信息为{}", e); } } } 复制代码
单线程模式的Reactor有一个很明显的缺点,那就是在处理请求的时候,对于不同状态的通道处理,以及请求的监听全部都放在了单个线程上进行,(多个Channel可以注册到同一个Selector对象上,实现了一个线程同时监控多个请求状态(Channel))因此效率很低下。因此就会有了第二种Reactor模式。
多线程的Reactor****模式
在原先的单线程模式中,一个线程同时处理多个请求,但是所有的读写请求以及对于数据的处理都在同一线程中,无法充分利用多cpu的优势,因此诞生了这种多线程的Reactor模式。
多线程的Reactor模式基本结构图如下所示:
代码如下所示:
import lombok.extern.slf4j.Slf4j; import java.io.IOException; import java.net.InetSocketAddress; import java.nio.channels.SelectionKey; import java.nio.channels.Selector; import java.nio.channels.ServerSocketChannel; import java.nio.channels.SocketChannel; import java.util.Iterator; import java.util.Set; /** * @author idea * @data 2019/4/11 */ @Slf4j public class Server { public static void main(String[] args) { try { Selector selector = Selector.open(); ServerSocketChannel serverSocketChannel = ServerSocketChannel.open(); serverSocketChannel.configureBlocking(false); serverSocketChannel.bind(new InetSocketAddress(9090)); serverSocketChannel.register(selector, SelectionKey.OP_ACCEPT); System.out.println("[server]开始启动服务器"); while (true) { if (selector.selectNow() < 0) { continue; } Set<SelectionKey> keys = selector.selectedKeys(); Iterator<SelectionKey> iterator = keys.iterator(); while (iterator.hasNext()) { SelectionKey selectionKey = iterator.next(); if (selectionKey.isReadable()) { Processor processor = (Processor) selectionKey.attachment(); processor.process(selectionKey); } else if (selectionKey.isAcceptable()) { ServerSocketChannel acceptServerSocketChannel = (ServerSocketChannel) selectionKey.channel(); SocketChannel socketChannel = acceptServerSocketChannel.accept(); socketChannel.configureBlocking(false); SelectionKey key = socketChannel.register(selector, SelectionKey.OP_READ); //绑定处理器线程 key.attach(new Processor()); System.out.println("[server]接收到新的链接"); } iterator.remove(); } } } catch (IOException e) { log.error("[server]异常出现,信息为{}", e); } } } 复制代码
从代码可以看到,每次当系相应的channel注册完相应的OP_READ事件后,可以对相应的SelectionKey attach一个对象(本例中attach了一个Processor对象,该对象处理读请求),并且在获取到可读事件后,可以取出该对象。
再看到相应的Processor对象代码
import java.nio.ByteBuffer; import java.nio.channels.SelectionKey; import java.nio.channels.SocketChannel; import java.util.concurrent.ExecutorService; import java.util.concurrent.LinkedBlockingQueue; import java.util.concurrent.ThreadPoolExecutor; import java.util.concurrent.TimeUnit; /** * 处理器 * * @author idea * @data 2019/4/11 */ public class Processor { private static final ExecutorService service = new ThreadPoolExecutor(16, 16, 0L, TimeUnit.MILLISECONDS, new LinkedBlockingQueue<Runnable>()); public void process(SelectionKey selectionKey) { service.submit(() -> { ByteBuffer buffer = ByteBuffer.allocate(1024); SocketChannel socketChannel = (SocketChannel) selectionKey.channel(); int count = socketChannel.read(buffer); if (count < 0) { socketChannel.close(); selectionKey.cancel(); System.out.println("读取结束!"); return null; } else if (count == 0) { return null; } System.out.println("读取内容:" + new String(buffer.array())); return null; }); } } 复制代码
需要开启一个线程池来进行数据处理的任务。这里面就将数据处理的压力分担给了线程池来执行,充分利用了多线程的优势,将新线程的连接和数据的io操作分别放在了不同的线程中进行运行。
在上述的多线程Reactor模式中,有专门的nio-acceptor线程来用于监听服务器,接收客户端的tcp连接。然后又有专门的线程池来处理消息的读取,发送,编码解码等工作。一个nio同时处理N条链路,每个链路只对应一个NIO线程。(防止了并发操作的发生)。看似这样的安排很美好,也确实能解决大多数应用场景的问题。
但是在极端情况下仍然会有弊端,单独的NIO线程负责监听和处理所有的客户端连接可能会存在性能问题。例如并发百万客户端连接,或者服务端需要对客户端握手进行安全认证,但是认证本身非常损耗性能。在这类场景下,单独一个Acceptor线程可能会存在性能不足问题,为了解决性能问题,产生了第三种Reactor线程模型-主从Reactor多线程模型。
主从Reactor多线程模式
主从Reactor线程模型的特点是:服务端用于接收客户端连接的不再是个1个单独的NIO线程,而是一个独立的NIO线程池。Acceptor接收到客户端TCP连接请求处理完成后(可能包含接入认证等),将新创建的SocketChannel注册到IO线程池(subreactor线程池)的某个IO线程上,由它负责SocketChannel的读写和编解码工作。Acceptor线程池仅仅只用于客户端的登陆、握手和安全认证,一旦链路建立成功,就将链路注册到后端SubReactor线程池的IO线程上,由IO线程负责后续的IO操作。
.相应代码:
import java.io.IOException; import java.net.InetSocketAddress; import java.nio.channels.SelectionKey; import java.nio.channels.Selector; import java.nio.channels.ServerSocketChannel; import java.nio.channels.SocketChannel; import java.util.Iterator; import java.util.Set; /** * @author idea * @data 2019/4/11 */ public class Server { public static void main(String[] args) throws IOException { Selector selector = Selector.open(); ServerSocketChannel serverSocketChannel = ServerSocketChannel.open(); serverSocketChannel.configureBlocking(false); serverSocketChannel.bind(new InetSocketAddress(1234)); //初始化通道,标志为accept类型 serverSocketChannel.register(selector, SelectionKey.OP_ACCEPT); int coreNum = Runtime.getRuntime().availableProcessors(); Processor[] processors = new Processor[coreNum]; for (int i = 0; i < processors.length; i++) { processors[i] = new Processor(); } int index = 0; //一直处于堵塞的状态 while (selector.select() > 0) { //获取到selectionkey的集合 Set<SelectionKey> keys = selector.selectedKeys(); Iterator<SelectionKey> iterator = keys.iterator(); while (iterator.hasNext()) { SelectionKey key = iterator.next(); if (key.isAcceptable()) { ServerSocketChannel acceptServerSocketChannel = (ServerSocketChannel) key.channel(); SocketChannel socketChannel = acceptServerSocketChannel.accept(); socketChannel.configureBlocking(false); System.out.println("Accept request from {}" + socketChannel.getRemoteAddress()); Processor processor = processors[(int) ((index++) / coreNum)]; processor.addChannel(socketChannel); } iterator.remove(); } } } } 复制代码
处理器部分的代码:
import lombok.extern.slf4j.Slf4j; import java.io.IOException; import java.nio.ByteBuffer; import java.nio.channels.ClosedChannelException; import java.nio.channels.SelectionKey; import java.nio.channels.Selector; import java.nio.channels.SocketChannel; import java.nio.channels.spi.SelectorProvider; import java.util.Iterator; import java.util.Set; import java.util.concurrent.ExecutorService; import java.util.concurrent.Executors; @Slf4j public class Processor { private static final ExecutorService service = Executors.newFixedThreadPool(2 * Runtime.getRuntime().availableProcessors()); private Selector selector; public Processor() throws IOException { this.selector = SelectorProvider.provider().openSelector(); start(); } public void addChannel(SocketChannel socketChannel) throws ClosedChannelException { socketChannel.register(this.selector, SelectionKey.OP_READ); } public void start() { service.submit(() -> { while (true) { if (selector.selectNow() <= 0) { continue; } Set<SelectionKey> keys = selector.selectedKeys(); Iterator<SelectionKey> iterator = keys.iterator(); while (iterator.hasNext()) { SelectionKey key = iterator.next(); iterator.remove(); if (key.isReadable()) { ByteBuffer buffer = ByteBuffer.allocate(1024); SocketChannel socketChannel = (SocketChannel) key.channel(); int count = socketChannel.read(buffer); if (count < 0) { socketChannel.close(); key.cancel(); System.out.println("读取结束" + socketChannel); continue; } else if (count == 0) { System.out.println("客户端信息大小:" + socketChannel); continue; } else { System.out.println("客户端信息:" + new String(buffer.array())); } } } } }); } } 复制代码
通常在互联网公司中,对于一些高并发的应用场景里面都会使用到了Reactor模式,其代替了常用的多线程处理方式,节省系统的资源,提高系统的吞吐量。类似于一些netty框架的核心原理其实就是通过nio的Reactor模式来进行设计和开发。