基于NIO的三种Reactor模式

简介: 基于NIO的三种Reactor模式

传统的bio堵塞类型io传输由于每一次进行数据传输的时候都需要等待排队,因此效率低下,后期随着jdk的一步步发展,nio非堵塞技术开始变多越来越加广泛。


 

什么是Reactor


Reactor可以理解为反应器模式。当一个主体发生改变时,所有依属体都得到通知。不过,观察者模式与单个事件源关联,而反应器模式则与多个事件源关联 。


NIO 有一个主要的类Selector,这个类似一个观察者,只要我们把需要探知的SocketChannel告诉Selector,我们接着做别的事情,当有事件发生时,他会通知我们,传回一组SelectionKey,我们读取这些Key,就会获得我们刚刚注册过的SocketChannel,然后,我们从这个Channel中读取数据,接着我们可以处理这些数据。

 

单线程Reactor****模式


Reactor里面的单线程模式的结构图:


 

网络异常,图片无法展示
|


当有多个请求发送到server的时候,会经过反应器对其进行处理,相应的代码如下所示:


import lombok.extern.slf4j.Slf4j;
import java.io.IOException;
import java.net.InetSocketAddress;
import java.nio.ByteBuffer;
import java.nio.channels.SelectionKey;
import java.nio.channels.Selector;
import java.nio.channels.ServerSocketChannel;
import java.nio.channels.SocketChannel;
import java.util.Iterator;
import java.util.Set;
/**
 * @author idea
 * @data 2019/4/11
 */
@Slf4j
public class NioServer {
    public static void main(String[] args) {
        try {
            Selector selector = Selector.open();
            ServerSocketChannel serverSocketChannel = ServerSocketChannel.open();
            serverSocketChannel.configureBlocking(false);
            serverSocketChannel.bind(new InetSocketAddress(9090));
            serverSocketChannel.register(selector, SelectionKey.OP_ACCEPT);
            System.out.println("server is open!");
            while (true) {
                if (selector.select() > 0) {
                    Set<SelectionKey> keys = selector.selectedKeys();
                    Iterator<SelectionKey> iterator = keys.iterator();
                    while (iterator.hasNext()) {
                        SelectionKey selectionKey = iterator.next();
                        if (selectionKey.isReadable()) {
                            SocketChannel socketChannel = (SocketChannel) selectionKey.channel();
                            ByteBuffer byteBuffer = ByteBuffer.allocate(1024);
                            int len = 0;
                            //当管道的数据都读取完毕了
                            while ((len = (socketChannel.read(byteBuffer))) > 0) {
                                byteBuffer.flip();
                                System.out.println(new String(byteBuffer.array(), 0, len));
                                byteBuffer.clear();
                            }
                        } else if (selectionKey.isAcceptable()) {
                            //第一次链接到server,需要构建一个通道
                            ServerSocketChannel acceptServerSocketChannel = (ServerSocketChannel) selectionKey.channel();
                            //开通通道
                            SocketChannel socketChannel = acceptServerSocketChannel.accept();
                            //设置为非堵塞
                            socketChannel.configureBlocking(false);
                            //注册可读的监听事件
                            socketChannel.register(selector, SelectionKey.OP_READ);
                            System.out.println("[server]接收到新的链接");
                        }
                        iterator.remove();
                    }
                }
            }
        } catch (IOException e) {
            log.error("[server]异常出现,信息为{}", e);
        }
    }
}
复制代码


单线程模式的Reactor有一个很明显的缺点,那就是在处理请求的时候,对于不同状态的通道处理,以及请求的监听全部都放在了单个线程上进行,(多个Channel可以注册到同一个Selector对象上,实现了一个线程同时监控多个请求状态(Channel))因此效率很低下。因此就会有了第二种Reactor模式。

 

多线程Reactor****模式


在原先的单线程模式中,一个线程同时处理多个请求,但是所有的读写请求以及对于数据的处理都在同一线程中,无法充分利用多cpu的优势,因此诞生了这种多线程的Reactor模式。


多线程的Reactor模式基本结构图如下所示:


网络异常,图片无法展示
|


代码如下所示:


import lombok.extern.slf4j.Slf4j;
import java.io.IOException;
import java.net.InetSocketAddress;
import java.nio.channels.SelectionKey;
import java.nio.channels.Selector;
import java.nio.channels.ServerSocketChannel;
import java.nio.channels.SocketChannel;
import java.util.Iterator;
import java.util.Set;
/**
 * @author idea
 * @data 2019/4/11
 */
@Slf4j
public class Server {
    public static void main(String[] args) {
        try {
            Selector selector = Selector.open();
            ServerSocketChannel serverSocketChannel = ServerSocketChannel.open();
            serverSocketChannel.configureBlocking(false);
            serverSocketChannel.bind(new InetSocketAddress(9090));
            serverSocketChannel.register(selector, SelectionKey.OP_ACCEPT);
            System.out.println("[server]开始启动服务器");
            while (true) {
                if (selector.selectNow() < 0) {
                    continue;
                }
                Set<SelectionKey> keys = selector.selectedKeys();
                Iterator<SelectionKey> iterator = keys.iterator();
                while (iterator.hasNext()) {
                    SelectionKey selectionKey = iterator.next();
                    if (selectionKey.isReadable()) {
                        Processor processor = (Processor) selectionKey.attachment();
                        processor.process(selectionKey);
                    } else if (selectionKey.isAcceptable()) {
                        ServerSocketChannel acceptServerSocketChannel = (ServerSocketChannel) selectionKey.channel();
                        SocketChannel socketChannel = acceptServerSocketChannel.accept();
                        socketChannel.configureBlocking(false);
                        SelectionKey key = socketChannel.register(selector, SelectionKey.OP_READ);
                        //绑定处理器线程
                        key.attach(new Processor());
                        System.out.println("[server]接收到新的链接");
                    }
                    iterator.remove();
                }
            }
        } catch (IOException e) {
            log.error("[server]异常出现,信息为{}", e);
        }
    }
}
复制代码


从代码可以看到,每次当系相应的channel注册完相应的OP_READ事件后,可以对相应的SelectionKey attach一个对象(本例中attach了一个Processor对象,该对象处理读请求),并且在获取到可读事件后,可以取出该对象。


再看到相应的Processor对象代码


import java.nio.ByteBuffer;
import java.nio.channels.SelectionKey;
import java.nio.channels.SocketChannel;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
/**
 * 处理器
 *
 * @author idea
 * @data 2019/4/11
 */
public class Processor {
    private static final ExecutorService service = new ThreadPoolExecutor(16, 16,
            0L, TimeUnit.MILLISECONDS,
            new LinkedBlockingQueue<Runnable>());
    public void process(SelectionKey selectionKey) {
        service.submit(() -> {
            ByteBuffer buffer = ByteBuffer.allocate(1024);
            SocketChannel socketChannel = (SocketChannel) selectionKey.channel();
            int count = socketChannel.read(buffer);
            if (count < 0) {
                socketChannel.close();
                selectionKey.cancel();
                System.out.println("读取结束!");
                return null;
            } else if (count == 0) {
                return null;
            }
            System.out.println("读取内容:" + new String(buffer.array()));
            return null;
        });
    }
}
复制代码


需要开启一个线程池来进行数据处理的任务。这里面就将数据处理的压力分担给了线程池来执行,充分利用了多线程的优势,将新线程的连接和数据的io操作分别放在了不同的线程中进行运行。


在上述的多线程Reactor模式中,有专门的nio-acceptor线程来用于监听服务器,接收客户端的tcp连接。然后又有专门的线程池来处理消息的读取,发送,编码解码等工作。一个nio同时处理N条链路,每个链路只对应一个NIO线程。(防止了并发操作的发生)。看似这样的安排很美好,也确实能解决大多数应用场景的问题。


但是在极端情况下仍然会有弊端,单独的NIO线程负责监听和处理所有的客户端连接可能会存在性能问题。例如并发百万客户端连接,或者服务端需要对客户端握手进行安全认证,但是认证本身非常损耗性能。在这类场景下,单独一个Acceptor线程可能会存在性能不足问题,为了解决性能问题,产生了第三种Reactor线程模型-主从Reactor多线程模型。


主从Reactor多线程模式


网络异常,图片无法展示
|


 

主从Reactor线程模型的特点是:服务端用于接收客户端连接的不再是个1个单独的NIO线程,而是一个独立的NIO线程池。Acceptor接收到客户端TCP连接请求处理完成后(可能包含接入认证等),将新创建的SocketChannel注册到IO线程池(subreactor线程池)的某个IO线程上,由它负责SocketChannel的读写和编解码工作。Acceptor线程池仅仅只用于客户端的登陆、握手和安全认证,一旦链路建立成功,就将链路注册到后端SubReactor线程池的IO线程上,由IO线程负责后续的IO操作。


.相应代码:


import java.io.IOException;
import java.net.InetSocketAddress;
import java.nio.channels.SelectionKey;
import java.nio.channels.Selector;
import java.nio.channels.ServerSocketChannel;
import java.nio.channels.SocketChannel;
import java.util.Iterator;
import java.util.Set;
/**
 * @author idea
 * @data 2019/4/11
 */
public class Server {
    public static void main(String[] args) throws IOException {
        Selector selector = Selector.open();
        ServerSocketChannel serverSocketChannel = ServerSocketChannel.open();
        serverSocketChannel.configureBlocking(false);
        serverSocketChannel.bind(new InetSocketAddress(1234));
        //初始化通道,标志为accept类型
        serverSocketChannel.register(selector, SelectionKey.OP_ACCEPT);
        int coreNum = Runtime.getRuntime().availableProcessors();
        Processor[] processors = new Processor[coreNum];
        for (int i = 0; i < processors.length; i++) {
            processors[i] = new Processor();
        }
        int index = 0;
        //一直处于堵塞的状态
        while (selector.select() > 0) {
            //获取到selectionkey的集合
            Set<SelectionKey> keys = selector.selectedKeys();
            Iterator<SelectionKey> iterator = keys.iterator();
            while (iterator.hasNext()) {
                SelectionKey key = iterator.next();
                if (key.isAcceptable()) {
                    ServerSocketChannel acceptServerSocketChannel = (ServerSocketChannel) key.channel();
                    SocketChannel socketChannel = acceptServerSocketChannel.accept();
                    socketChannel.configureBlocking(false);
                    System.out.println("Accept request from {}" + socketChannel.getRemoteAddress());
                    Processor processor = processors[(int) ((index++) / coreNum)];
                    processor.addChannel(socketChannel);
                }
                iterator.remove();
            }
        }
    }
}
复制代码


处理器部分的代码:


import lombok.extern.slf4j.Slf4j;
import java.io.IOException;
import java.nio.ByteBuffer;
import java.nio.channels.ClosedChannelException;
import java.nio.channels.SelectionKey;
import java.nio.channels.Selector;
import java.nio.channels.SocketChannel;
import java.nio.channels.spi.SelectorProvider;
import java.util.Iterator;
import java.util.Set;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
@Slf4j
public class Processor {
    private static final ExecutorService service =
            Executors.newFixedThreadPool(2 * Runtime.getRuntime().availableProcessors());
    private Selector selector;
    public Processor() throws IOException {
        this.selector = SelectorProvider.provider().openSelector();
        start();
    }
    public void addChannel(SocketChannel socketChannel) throws ClosedChannelException {
        socketChannel.register(this.selector, SelectionKey.OP_READ);
    }
    public void start() {
        service.submit(() -> {
            while (true) {
                if (selector.selectNow() <= 0) {
                    continue;
                }
                Set<SelectionKey> keys = selector.selectedKeys();
                Iterator<SelectionKey> iterator = keys.iterator();
                while (iterator.hasNext()) {
                    SelectionKey key = iterator.next();
                    iterator.remove();
                    if (key.isReadable()) {
                        ByteBuffer buffer = ByteBuffer.allocate(1024);
                        SocketChannel socketChannel = (SocketChannel) key.channel();
                        int count = socketChannel.read(buffer);
                        if (count < 0) {
                            socketChannel.close();
                            key.cancel();
                            System.out.println("读取结束" + socketChannel);
                            continue;
                        } else if (count == 0) {
                            System.out.println("客户端信息大小:" + socketChannel);
                            continue;
                        } else {
                            System.out.println("客户端信息:" + new String(buffer.array()));
                        }
                    }
                }
            }
        });
    }
}
复制代码


通常在互联网公司中,对于一些高并发的应用场景里面都会使用到了Reactor模式,其代替了常用的多线程处理方式,节省系统的资源,提高系统的吞吐量。类似于一些netty框架的核心原理其实就是通过nio的Reactor模式来进行设计和开发。

目录
相关文章
|
8月前
|
网络协议 Java
【JAVA基础】- 同步非阻塞模式NIO详解
【JAVA基础】- 同步非阻塞模式NIO详解
90 0
|
9月前
|
网络协议 Java API
JAVA IO模式 —— BIO、NIO、AIO
JAVA IO模式 —— BIO、NIO、AIO
137 0
|
11月前
|
消息中间件 存储 网络协议
Linux五种I/O模式 NIO BIO AIO IO多路复用 信号驱动 I/O
Linux五种I/O模式 NIO BIO AIO IO多路复用 信号驱动 I/O
145 0
|
缓存 中间件 Java
Web中间件——Tomcat中的BIO、NIO、APR模式
Tomcat在我们日常开发B/S项目时常常进行使用,当然在Spring全家桶中我们现在也是使用的内置tomcat,但是很多人可能不会对于tomcat进行深入性研究,其实在我们的产品进行性能提升时,web中间件的优化也是占有很大一部分,而tomcat中采用不同的模式对应的使用场景下性能也是不一样的,因此本篇文章将对于这几个模式进行简单讲解,后续我们会对于不同的模式进行配置方式讲解。
128 0
Web中间件——Tomcat中的BIO、NIO、APR模式
|
缓存 网络协议 安全
【Netty】Netty 简介 ( 原生 NIO 弊端 | Netty 框架 | Netty 版本 | 线程模型 | 线程 阻塞 IO 模型 | Reactor 模式引入 )
【Netty】Netty 简介 ( 原生 NIO 弊端 | Netty 框架 | Netty 版本 | 线程模型 | 线程 阻塞 IO 模型 | Reactor 模式引入 )
197 0
【Netty】Netty 简介 ( 原生 NIO 弊端 | Netty 框架 | Netty 版本 | 线程模型 | 线程 阻塞 IO 模型 | Reactor 模式引入 )
|
Java 数据处理 分布式计算
|
算法 Java
JAVA并发处理经验(四)并行模式与算法6:NIO网络编程
一、前言 首先我们必须了解NIO的一些基本概念 channel:是NIO中的一个通道,类似我们说的流。
870 0
|
Java Unix Android开发
Java NIO中的Glob模式详解
版权声明:本文为博主chszs的原创文章,未经博主允许不得转载。 https://blog.csdn.net/chszs/article/details/46482571 Java NIO中的Glob模式详解 作者:chszs,转载需注明。
1139 0
|
2月前
|
存储 Java 数据处理