69.【nio】Reactor模式

本文涉及的产品
数据传输服务 DTS,数据迁移 small 3个月
推荐场景:
MySQL数据库上云
数据传输服务 DTS,数据同步 small 3个月
推荐场景:
数据库上云
数据传输服务 DTS,数据同步 1个月
简介: 【nio】Reactor模式

文档参考: 《Java高并发核心编程 卷1:NIO、Netty、Redis、ZooKeeper》

68.【nio】四种主要的IO模型

前文如上:



为什么学习Reactor模式?


为什么在学习Netty之前首先要学习Reactor模式呢?


资深程序员都知道,Java程序不是按照顺序执行的逻辑来组织的。代码中所用到的设计模式在一定程度上已经演变成代码的组织方式。越是高水平的Java代码,抽象的层次越高,到处都是高度抽象和面向接口的调用,大量用到继承、多态、设计模式。


在阅读别人的源代码时,如果不了解代码所使用的设计模式,往往会晕头转向,不知身在何处,对代码跟踪和阅读都很成问题。反过来,如果先掌握到代码的设计模式,再去阅读代码,其过程就会变得很轻松,代码也就不会那么难懂了。当然,在编写代码时,如果不能熟练地掌握设计模式,也很难写出高水平的Java代码。


在学习和掌握高并发通信过程时,会学到很多高并发通信的框架,比如Netty, Netty本身很抽象,大量应用了设计模式。所以,学习像Netty这样的“精品中的精品”框架也是需要先从设计模式入手的,而Netty的整体架构是基于Reactor模式的


什么是Reactor模式?


简介


站在巨人的肩膀上,首先引用一下Doug Lea大师在文章“Scalable IO in Java”中对Reactor模式的定义

Reactor模式由Reactor线程、Handlers处理器两大角色组成,两大角色的职责分别如下:

(1)Reactor线程的职责:负责响应IO事件,并且分发到Handlers处理器。

(2)Handlers处理器的职责:非阻塞的执行业务处理逻辑。

从上面的Reactor模式定义中看不出这种模式有什么神奇的地方。当然,从简单到复杂,Reactor模式也有很多版本,前面的定义仅仅是最为简单的一个版本。如果需要彻底了解Reactor模式,还得从最原始的OIO编程开始讲起。


多线程OIO的致命缺陷


在Java的OIO编程中,原始的网络服务器程序一般使用一个while循环不断地监听端口是否有新的连接。如果有,就调用一个处理函数来完成传输处理。示例代码如下:

while(true){    

socket = accept();

//阻塞,接收连接    

handle(socket) ;  

//读取数据、业务处理、写入结果

}

这种方法的最大问题是:如果前一个网络连接的handle(socket)没有处理完,那么后面的新连接无法被服务端接收,于是后面的请求就会被阻塞,导致服务器的吞吐量太低。这对于服务器来说是一个严重的问题。


为了解决这个严重的连接阻塞问题,出现了一个极为经典的模式:Connection Per Thread(一个线程处理一个连接)模式。(参照前文提到的四种io模型:同步阻塞io)示例代码如下:


package com.crazymakercircle.iodemo.OIO;

//省略import导入的Java类

class ConnectionPerThread implements Runnable {

   public void run() {

       try {

           //服务器监听socket

           ServerSocketserverSocket =

                   new ServerSocket(NioDemoConfig.SOCKET_SERVER_PORT);

           while (!Thread.interrupted()) {

               Socket socket = serverSocket.accept();

               //接收一个连接后,为socket连接,新建一个专属的处理器对象

               Handler handler = new Handler(socket);

               //创建新线程,专门负责一个连接的处理

               new Thread(handler).start();

           }

       } catch (IOException ex) { /* 处理异常 */ }

   }

   //处理器,这里将内容回显到客户端

   static class Handler implements Runnable {

       final Socket socket;

       Handler(Socket s) {

           socket = s;

       }

       public void run() {

           while (true) {

               try {

                   byte[] input = new byte[1024];

                   /* 读取数据 */

                   socket.getInputStream().read(input);

                   /* 处理业务逻辑,获取处理结果*/

                   byte[] output =null;

                   /* 写入结果 */

                   socket.getOutputStream().write(output);

               } catch (IOException ex) { /*处理异常*/ }

           }

       }

   }

}

以上示例代码中,对于每一个新的网络连接都分配给一个线程。每个线程都独自处理自己负责的socket连接的输入和输出。当然,服务器的监听线程也是独立的,任何socket连接的输入和输出处理都不会阻塞到后面新socket连接的监听和建立,这样服务器的吞吐量就得到了提升。早期版本的Tomcat服务器就是这样实现的。


Connection Per Thread模式(一个线程处理一个连接)的优点是解决了前面的新连接被严重阻塞的问题,在一定程度上较大地提高了服务器的吞吐量。


onnection Per Thread模式的缺点是对应于大量的连接,需要耗费大量的线程资源,对线程资源要求太高。在系统中,线程是比较昂贵的系统资源。如果线程的数量太多,系统将无法承受。而且,线程的反复创建、销毁、切换也需要代价。因此,在高并发的应用场景下,多线程OIO的缺陷是致命的。


新的问题来了:如何减少线程数量?比如说让一个线程同时负责处理多个socket连接的输入和输出,行不行?看上去没有什么不可以,实际上作用不大。因为在传统OIO编程中每一次socket传输的IO读写处理都是阻塞的。在同一时刻,一个线程里只能处理一个socket的读写操作,前一个socket操作被阻塞了,其他连接的IO操作同样无法被并行处理。所以,在OIO中,即使是一个线程同时负责处理多个socket连接的输入和输出,同一时刻该线程也只能处理一个连接的IO操作。 (所以就需要这个线程去不断轮询一个连接内io读写状态,比如前文中提到的 同步非阻塞IO,每个用户线程需要不断地进行IO系统调用,轮询数据是否已经准备好,但是这么做还是有问题,就是同一时刻该用户线程也只能处理这一个用户连接的IO操作,如何让某个线程去处理多个io的读写状态呢?


如何解决Connection Per Thread模式的巨大缺陷呢?一个有效途径是使用Reactor模式。用Reactor模式对线程的数量进行控制,做到一个线程处理大量的连接。那么它是如何做到的呢?首先来看一个简单的版本——单线程的Reactor模式。


单线程Reactor模式


体来说,Reactor模式有点类似事件驱动模式

在事件驱动模式中,当有事件触发时,事件源会将事件分发到Handler(处理器),由Handler负责事件处理。


Reactor模式中的反应器角色类似于事件驱动模式中的事件分发器(Dispatcher)角色(有点servlet dispatcher 的意思。具体来说,在Reactor模式中有Reactor和Handler两个重要的组件:


(1)Reactor:负责查询IO事件,当检测到一个IO事件时将其发送给相应的Handler处理器去处理。这里的IO事件就是NIO中选择器查询出来的通道IO事件。

(2)Handler:与IO事件(或者选择键)绑定,负责IO事件的处理,完成真正的连接建立、通道的读取、处理业务逻辑、负责将结果写到通道等。


什么是单线程Reactor


什么是单线程版本的Reactor模式呢?简单地说,Reactor和Handlers处于一个线程中执行。这是最简单的Reactor模型,如图所示。

image.png

基于Java NIO如何实现简单的单线程版本的Reactor模式呢?需要用到SelectionKey(选择键)的几个重要的成员方法:


(1)void attach(Object o):将对象附加到选择键。此方法可以将任何Java POJO对象作为附件添加到SelectionKey实例。此方法非常重要,因为在单线程版本的Reactor模式实现中可以将Handler实例作为附件添加到SelectionKey实例。


(2)Object attachment():从选择键获取附加对象。此方法与attach(Object o)是配套使用的,其作用是取出之前通过attach(Object o)方法添加到SelectionKey实例的附加对象。这个方法同样非常重要,当IO事件发生时,选择键将被select方法查询出来,可以直接将选择键的附件对象取出。


在Reactor模式实现中,通过attachment()方法所取出的是之前通过attach(Object o)方法绑定的Handler实例,然后通过该Handler实例完成相应的传输处理。


总之,在Reactor模式中,需要将attach和attachment结合使用:在选择键注册完成之后调用attach()方法,将Handler实例绑定到选择键;当IO事件发生时调用attachment()方法,可以从选择键取出Handler实例,将事件分发到Handler处理器中完成业务处理。


单线程Reactor的参考代码

Doug Lea在“Scalable IO in Java”一文中实现了一个单线程Reactor模式的参考代码。这里,我们站在巨人的肩膀上,借鉴Doug Lea的实现,对Reactor模式进行介绍。为了方便说明,本书对Doug Lea的参考代码进行一些适当的修改。具体的参考代码如下:


package com.crazymakercircle.ReactorModel;

//省略import

//单线程Reactor

class EchoServerReactor implements Runnable {

   Selector selector;

   ServerSocketChannel serverSocket;

   //构造函数

   EchoServerReactor() throws IOException {

       //省略:打开选择器、serverSocket连接监听通道

       //注册serverSocket的accept新连接接收事件

       SelectionKey sk =serverSocket.register(selector,

SelectionKey.OP_ACCEPT);

       //将新连接处理器作为附件,绑定到sk选择键

       sk.attach(new AcceptorHandler());

   }

   public void run() {

       //选择器轮询

       try {

           while (!Thread.interrupted()) {

               selector.select();

               Set selected = selector.selectedKeys();

               Iterator it = selected.iterator();

               while (it.hasNext()) {

                  //反应器负责dispatch收到的事件

                   SelectionKey sk=it.next();

                   dispatch(sk);

               }

               selected.clear();

           }

       } catch (IOException ex) { ex.printStackTrace(); }

   }

   //反应器的分发事件

   void dispatch(SelectionKey k) {

       Runnable handler = (Runnable) (k.attachment());

       //调用之前绑定到选择键的handler对象

       if (handler != null) {

           handler.run();

       }

   }

   //处理器:处理新连接

   class AcceptorHandler implements Runnable {

       public void run() {

           //接受新连接

           SocketChannel channel = serverSocket.accept();

           //需要为新连接创建一个输入输出的handler

            if (channel != null)

                   new IOHandler(selector, channel);

 }

   }

   //…

}


在上面的代码中设计了一个Handler,叫作AcceptorHandler处理器,它是一个内部类。在注册serverSocket服务监听连接的接受事件之后,创建一个AcceptorHandler新连接处理器的实例作为附件,被附加(attach)到SelectionKey中。


       //注册serverSocket的accept新连接接收事件

       SelectionKey sk =serverSocket.register(selector,

SelectionKey.OP_ACCEPT);

       //将新连接处理器作为附件,绑定到sk选择键

      sk.attach(new AcceptorHandler());

当新连接事件发生后,取出之前附加到SelectionKey中的Handler业务处理器进行socket的各种IO处理。

   //反应器的分发事件

   void dispatch(SelectionKey k) {

       Runnable handler = (Runnable) (k.attachment());

       //调用之前绑定到选择键的handler对象

       if (handler != null) {

           handler.run();

       }

   }

处理器AcceptorHandler的两大职责是完成新连接的接收工作、为新连接创建一个负责数据传输的Handler(称之为IOHandler)。

   //处理器:处理新连接

   class AcceptorHandler implements Runnable {

       public void run() {

           //接受新连接

           SocketChannel channel = serverSocket.accept();

           //需要为新连接创建一个输入输出的handler

            if (channel != null)

                   new IOHandler(selector, channel);

 }

   }

   //…

}


顾名思义,IOHandler就是负责socket连接的数据输入、业务处理、结果输出。该处理器的示例代码大致如下:


package com.crazymakercircle.ReactorModel;

//负责数据传输的Handler

class IOHandler implements Runnable {

   final SocketChannel channel;

   final SelectionKey sk;

   IOHandler (Selector selector, SocketChannel c) {

       channel = c;

       c.configureBlocking(false);

       //与之前的注册方式不同,先仅仅取得选择键,之后再单独设置感兴趣的IO事件

       sk = channel.register(selector, 0);  //仅仅取得选择键

       //将Handler处理器作为选择键的附件

       sk.attach(this);

       //注册读写就绪事件

       sk.interestOps(SelectionKey.OP_READ|SelectionKey.OP_WRITE);

   }

   public void run()  {

   //…处理输入和输出

  }

}

在传输处理器IOHandler的构造器中,有两点比较重要:

(1)将新的SocketChannel传输通道注册到Reactor类的同一个选择器中。这样保证了Reactor在查询IO事件时能查询到Handler注册到选择器的IO事件(数据传输事件)。

(2)Channel传输通道注册完成后,将IOHandler实例自身作为附件附加到选择键中。这样,在Reactor类分发事件(选择键)时,能执行到IOHandler的run()方法,完成数据传输处理。


如果由于上面的示例代码过于复杂而导致不能被快速理解,可以参考下面的EchoServer回显服务器实例,自己动手开发一个可以执行的单线程反应器实例。


单线程Reactor模式的EchoServer的实战案例

EchoServer的功能很简单:读取客户端的输入并回显到客户端,所以也叫回显服务器。基于Reactor模式来实现,设计三个重要的类:


(1)设计一个反应器类:EchoServerReactor类。


(2)设计两个处理器类:AcceptorHandler新连接处理器、EchoHandler回显处理器。


反应器类EchoServerReactor的实现思路和前面的示例代码基本上相同,具体如下:


package com.crazymakercircle.ReactorModel;

//省略import

//反应器

public class EchoServerReactor implements Runnable {

   Selector selector;

   ServerSocketChannel serverSocket;

   EchoServerReactor() throws IOException {

        //...获取选择器、开启serverSocket服务监听通道
       selector = Selector.open();

       serverSocket = ServerSocketChannel.open();

       serverSocket.configureBlocking(false);

       serverSocket.bind(new InetSocketAddress(9000));

       serverSocket.register(selector, SelectionKey.OP_ACCEPT);

       //...绑定AcceptorHandler新连接处理器到selectKey
       SelectionKey sk =serverSocket.register(selector,SelectionKey.OP_ACCEPT);

       sk.attach(new AcceptorHandler());

   }

   //轮询和分发事件
   @Override
   public void run() {

       try {

           while (!Thread.interrupted()) {

               selector.select();

               Set<SelectionKey> selected = selector.selectedKeys();

               Iterator<SelectionKey> it = selected.iterator();

               while (it.hasNext()) {

                   //反应器负责dispatch收到的事件
                   SelectionKey sk = it.next();

                   dispatch(sk);

               }

               selected.clear();

           }

       } catch (IOException ex) {

           ex.printStackTrace();

         }

   }

   void dispatch(SelectionKey sk) {

       Runnable handler = (Runnable) sk.attachment();

       //调用之前attach绑定到选择键的handler处理器对象
       if (handler != null) {

           handler.run();

       }

   }

   // Handler:新连接处理器
   class AcceptorHandler implements Runnable {

       @Override
       public void run() {

           try {

               //只有一个线程接收,accept来了连接,EchoHandler才能处理
               SocketChannel channel = serverSocket.accept();


               System.out.println("收到客户端连接"+channel.socket().getLocalAddress()+"--"+channel.socket().getPort());

               if (channel != null) {

                   new EchoHandler(selector, channel);

               }

           } catch (IOException e) {

               e.printStackTrace();

            }

       }

   }

   public static void main(String[] args) throws IOException {

       new Thread(new EchoServerReactor()).start();

   }

}

第二个处理器为EchoHandler回显处理器,也是一个传输处理器,主要是完成客户端的内容读取和回显,具体如下:

import com.crazymakercircle.util.Logger;

//省略import

public class EchoHandler implements Runnable {

   final SocketChannel channel;

   final SelectionKey sk;

   final ByteBuffer byteBuffer = ByteBuffer.allocate(1024);

   static final int RECIEVING = 0, SENDING = 1;

   int state = RECIEVING;

   EchoHandler(Selector selector, SocketChannel c) throws IOException {

       channel = c;

       c.configureBlocking(false);

       //取得选择键,再设置感兴趣的IO事件
       sk = channel.register(selector, 0);

       //Handler自身作为选择键的附件
       sk.attach(this);

       //注册Read就绪事件
       sk.interestOps(SelectionKey.OP_READ);

       selector.wakeup();

   }


   @Override
   public void run() {

       try {

           if (state == SENDING) {

               //写入通道
               channel.write(byteBuffer);

               //写完后,准备开始从通道读,byteBuffer切换成写入模式
               byteBuffer.clear();

               //写完后,注册read就绪事件
               sk.interestOps(SelectionKey.OP_READ);

               //写完后,进入接收的状态
               state = RECIEVING;

           } else if (state == RECIEVING) {

               //从通道读-->写入到buffer
               int length = 0;

               while ((length = channel.read(byteBuffer)) > 0) {

                   System.out.println(new String(byteBuffer.array(), 0, length));

               }

               //读完后,准备开始写入通道,byteBuffer切换成读取模式
               byteBuffer.flip();

               //读完后,注册write就绪事件
               sk.interestOps(SelectionKey.OP_WRITE);

               //读完后,进入发送的状态
               state = SENDING;

           }

           //处理结束了, 这里不能关闭select key,需要重复使用
           //sk.cancel();
       } catch (IOException ex) {

           ex.printStackTrace();

         }

   }

}

以上代码是一个基于Reactor模式的EchoServer回显服务器的完整实现。它是一个单线程版本的Reactor模式,Reactor和所有的Handler实例都在同一条线程中执行。


运行EchoServerReactor类中的main()方法,可以启动回显服务器。如果要看到具体的回显输出,还需要启动客户端程序。客户端的代码在同一个包下,其类名为EchoClient,它的主要职责为数据的发送。打开源代码工程,直接运行即可。


以上代码是一个基于Reactor模式的EchoServer回显服务器的完整实现。它是一个单线程版本的Reactor模式,Reactor和所有的Handler实例都在同一条线程中执行。运行EchoServerReactor类中的main()方法,可以启动回显服务器。如果要看到具体的回显输出,还需要启动客户端程序。客户端的代码在同一个包下,其类名为EchoClient,它的主要职责为数据的发送。

public class EchoClientReactor {

   private String SOCKET_SERVER_IP="127.0.0.1";

   private Integer SOCKET_SERVER_PORT=9000;

   public void start() throws IOException {


       InetSocketAddress address =

               new InetSocketAddress(SOCKET_SERVER_IP,SOCKET_SERVER_PORT);


       // 1、获取通道(channel
       SocketChannel socketChannel = SocketChannel.open(address);

       // 2、切换成非阻塞模式
       socketChannel.configureBlocking(false);

       //不断的自旋、等待连接完成,或者做一些其他的事情
       while (!socketChannel.finishConnect()) {


       }

       System.out.println("客户端启动成功!");


       //启动接受线程
       Processer processer = new Processer(socketChannel);

       new Thread(processer).start();


   }


   static class Processer implements Runnable {

       final Selector selector;

       final SocketChannel channel;

       private Integer SEND_BUFFER_SIZE =1000;

       Processer(SocketChannel channel) throws IOException {

           //Reactor初始化
           selector = Selector.open();

           this.channel = channel;

           channel.register(selector,

                   SelectionKey.OP_READ | SelectionKey.OP_WRITE);

       }


       @Override
       public void run() {

           try {

               while (!Thread.interrupted()) {

                   selector.select();

                   Set<SelectionKey> selected = selector.selectedKeys();

                   Iterator<SelectionKey> it = selected.iterator();

                   while (it.hasNext()) {

                       SelectionKey sk = it.next();

                       if (sk.isWritable()) {

                           ByteBuffer buffer = ByteBuffer.allocate(SEND_BUFFER_SIZE);


                           Scanner scanner = new Scanner(System.in);

                           System.out.println("请输入发送内容:");

                           if (scanner.hasNext()) {

                               SocketChannel socketChannel = (SocketChannel) sk.channel();

                               String next = scanner.next();

                               buffer.put((new Date() + " >>" + next).getBytes());

                               buffer.flip();

                               // 操作三:通过DatagramChannel数据报通道发送数据
                               socketChannel.write(buffer);

                               buffer.clear();

                           }


                       }

                       if (sk.isReadable()) {

                           // 若选择键的IO事件是可读事件,读取数据
                           SocketChannel socketChannel = (SocketChannel) sk.channel();


                           //读取数据
                           ByteBuffer byteBuffer = ByteBuffer.allocate(1024);

                           int length = 0;

                           while ((length = socketChannel.read(byteBuffer)) > 0) {

                               byteBuffer.flip();

                               System.out.println("server echo:" + new String(byteBuffer.array(), 0, length));

                               byteBuffer.clear();

                           }


                       }

                       //处理结束了, 这里不能关闭select key,需要重复使用
                       //selectionKey.cancel();
                   }

                   selected.clear();

               }

           } catch (IOException ex) {

               ex.printStackTrace();

           }

       }

   }


   public static void main(String[] args) throws IOException {

       new EchoClientReactor().start();

   }


单线程Reactor模式的缺点

单线程Reactor模式是基于Java的NIO实现的。相对于传统的多线程OIO,Reactor模式不再需要启动成千上万条线程,避免了线程上下文的频繁切换,服务端的效率自然是大大提升了。在单线程Reactor模式中,Reactor和Handler都在同一条线程中执行。

这样,带来了一个问题:当其中某个Handler阻塞时,会导致其他所有的Handler都得不到执行。在这种场景下,被阻塞的Handler不仅仅负责输入和输出处理的传输处理器,还包括负责新连接监听的AcceptorHandler处理器,可能导致服务器无响应。这是一个非常严重的缺陷,导致单线程反应器模型在生产场景中使用得比较少。除此之外,目前的服务器都是多核的,单线程Reactor模式模型不能充分利用多核资源。总之,在高性能服务器应用场景中,单线程Reactor模式实际使用的很少。



多线程Reactor模式


Reactor和Handler挤在单个线程中会造成非常严重的性能缺陷,可以使用多线程来对基础的Reactor模式进行改造和演进。


多线程版本的Reactor模式演进


多线程Reactor的演进分为两个方面:

(1)升级Handler。既要使用多线程,又要尽可能高效率,则可以考虑使用线程池。

(2)升级Reactor。可以考虑引入多个Selector(选择器),提升选择大量通道的能力。

总体来说,多线程版本的Reactor模式大致如下:

(1)将负责数据传输处理的IOHandler处理器的执行放入独立的线程池中。这样,业务处理线程与负责新连接监听的反应器线程就能相互隔离,避免服务器的连接监听受到阻塞。

(2)如果服务器为多核的CPU,可以将反应器线程拆分为多个子反应器(SubReactor)线程;同时,引入多个选择器,并且为每一个SubReactor引入一个线程,一个线程负责一个选择器的事件轮询。这样充分释放了系统资源的能力,也大大提升了反应器管理大量连接或者监听大量传输通道的能力。


多线程版本Reactor的实战案例

在前面的“回显服务器”(EchoServerReactor)的基础上完成多线程反应器的升级。多线程反应器的实战案例设计如下:

(1)引入多个选择器。

(2)设计一个新的子反应器(SubReactor)类,子反应器负责查询一个选择器。

(3)开启多个处理线程,一个处理线程负责执行一个子反应器。

为了提升效率,可以让SubReactor的数量和选择器的数量一致,避免多个线程负责一个选择器,导致需要进行线程同步,引起效率降低。多线程版本反应器MultiThreadEchoServerReactor的逻辑模型如图所示。


image.png

多线程版本反应器MultiThreadEchoServerReactor的参考代码大致如下:


//....反应器
public class MultiThreadEchoServerReactor {

   ServerSocketChannel serverSocket;

   AtomicInteger next = new AtomicInteger(0);

   private String SOCKET_SERVER_IP="127.0.0.1";

   private Integer SOCKET_SERVER_PORT=9000;

   //选择器集合,引入多个选择器
   Selector[] selectors = new Selector[2];

   //引入多个子反应器
   SubReactor[] subReactors = null;

   MultiThreadEchoServerReactor() throws IOException {

       //初始化多个选择器
       selectors[0] = Selector.open();

       selectors[1] = Selector.open();

       serverSocket = ServerSocketChannel.open();

       InetSocketAddress address =

               new InetSocketAddress(SOCKET_SERVER_IP,

       SOCKET_SERVER_PORT);

       serverSocket.socket().bind(address);

       //非阻塞
       serverSocket.configureBlocking(false);

       //第一个选择器,负责监控新连接事件
       SelectionKey sk =

               serverSocket.register(selectors[0], SelectionKey.OP_ACCEPT);

       //绑定Handlerattach新连接监控handler处理器到SelectionKey(选择键)
       sk.attach(new AcceptorHandler());

       //第一个子反应器,一子反应器负责一个选择器
       SubReactor subReactor1 = new SubReactor(selectors[0]);

       //第二个子反应器,一子反应器负责一个选择器
       SubReactor subReactor2 = new SubReactor(selectors[1]);

       subReactors = new SubReactor[]{subReactor1, subReactor2};

   }

   private void startService() {

       // 一子反应器对应一个线程
       new Thread(subReactors[0]).start();

       new Thread(subReactors[1]).start();

   }

   //子反应器
   class SubReactor implements Runnable {

       //每个线程负责一个选择器的查询和选择
       final Selector selector;

       public SubReactor(Selector selector) {

           this.selector = selector;

       }

       public void run() {

           try {

               while (!Thread.interrupted()) {

                   selector.select();

                   Set<SelectionKey> keySet = selector.selectedKeys();

                   Iterator<SelectionKey> it = keySet.iterator();

                   while (it.hasNext()) {

                       //反应器负责dispatch收到的事件
                       SelectionKey sk = it.next();

                       dispatch(sk);

                   }

               keySet.clear();

               }

           } catch (IOException ex) {

               ex.printStackTrace();

             }

       }

       void dispatch(SelectionKey sk) {

           Runnable handler = (Runnable) sk.attachment();

           //调用之前attach绑定到选择键的handler处理器对象
           if (handler != null) {

               handler.run();

           }

       }

   }

   // Handler:新连接处理器
   class AcceptorHandler implements Runnable {

       public void run() {

           try {

               SocketChannel channel = serverSocket.accept();

               if (channel != null)

                   new MultiThreadEchoHandler(selectors[next.get()], channel);

           } catch (IOException e) {

               e.printStackTrace();

             }

           if (next.incrementAndGet() == selectors.length) {

               next.set(0);

           }

       }

   }

   public static void main(String[] args) throws IOException {

       MultiThreadEchoServerReactor server =

                  new MultiThreadEchoServerReactor();

       server.startService();

   }

}


上面是反应器的多线程版本演进代码,创建了两个子反应器,负责查询和分发两个选择器的事件。


总共有两个选择器:第一个选择器专门负责查询和分发新连接事件,第二个选择器专门负责查询和分发IO传输事件。


总共有两条事件轮询线程:第一条线程为新连接事件轮询线程,专门轮询第一个选择器;第二条线程为IO事件轮询线程,专门轮询第二个选择器。

服务端的监听通道注册到第一个选择器,而所有的Socket传输通道都注册到第二个选择器,从而实现了新连接监听和IO读写事件监听的线程分离。

接下来为大家演示一下Handler的多线程演进。

多线程版本Handler的实战案例

仍然基于前面的单线程Reactor模式的回显处理器的程序代码加以改进,新的回显处理器为MultiThreadEchoHandler,主要的升级是引入了一个线程池(ThreadPool),使得数据传输和业务处理的代码执行在独立的线程池中,彻底地做到IO处理以及业务处理线程和反应器IO事件轮询线程的完全隔离。这个实战案例的代码如下:

//多线程版本Handler的实战案例
public class MultiThreadEchoHandler implements Runnable {

   final SocketChannel channel;

   final SelectionKey sk;

   final ByteBuffer byteBuffer = ByteBuffer.allocate(1024);

   static final int RECIEVING = 0, SENDING = 1;

   int state = RECIEVING;

   //引入线程池
   static ExecutorService pool = Executors.newFixedThreadPool(4);

   MultiThreadEchoHandler(Selector selector, SocketChannel c) throws IOException {

       channel = c;

       c.configureBlocking(false);

       //取得选择键,、再设置感兴趣的IO事件
       sk = channel.register(selector, 0);

       //将本Handler作为sk选择键的附件,方便事件分发(dispatch
       sk.attach(this);

       //sk选择键注册Read就绪事件
       sk.interestOps(SelectionKey.OP_READ);

       selector.wakeup();

   }

   @Override
   public void run() {

       //异步任务,在独立的线程池中执行
       pool.execute(new AsyncTask());

   }

   //业务处理,不在反应器线程中执行
   public synchronized void asyncRun() {

       try {

           if (state == SENDING) {

               //写入通道
               channel.write(byteBuffer);

               //写完后,准备开始从通道读,byteBuffer切换成写入模式
               byteBuffer.clear();

               //写完后,注册read就绪事件
               sk.interestOps(SelectionKey.OP_READ);

               //写完后,进入接收的状态
               state = RECIEVING;

           } else if (state == RECIEVING) {

               //从通道读
               int length = 0;

               while ((length = channel.read(byteBuffer)) > 0) {

                   System.out.println(new String(byteBuffer.array(), 0, length));

               }

               //读完后,准备开始写入通道,byteBuffer切换成读取模式
               byteBuffer.flip();

               //读完后,注册write就绪事件
               sk.interestOps(SelectionKey.OP_WRITE);

               //读完后,进入发送的状态
               state = SENDING;

           }

           //处理结束了, 这里不能关闭select key,需要重复使用
           //sk.cancel();
       } catch (IOException ex) {

           ex.printStackTrace();

         }

   }

   //异步任务的内部类
   class AsyncTask implements Runnable {

       @Override
       public void run() {

           MultiThreadEchoHandler.this.asyncRun();

       }

   }

}


以上代码中,IO操作和业务处理被提交到线程池中异步执行,为了避免发送和读取的状态混乱,需要进行线程安全处理,这里在asyncRun()方法的前面加上synchronized同步修饰符。至此,多线程版本的Reactor模式实战案例的代码介绍完毕,可以开始执行新版本的多线程MultiThreadEchoServerReactor服务器。当然,也可以执行之前的EchoClient客户端程序,完成整个回显的通信演示。


由于演示程序的输出结果与前面单线程版本的EchoServer运行输出是一模一样的,因此这里不再贴出程序的执行结果。


Reactor模式的优缺点

在总结Reactor模式的优点和缺点之前,先看看Reactor模式和其他模式的对比,加强一下对它的理解。(1)Reactor模式和生产者消费者模式对比二者的相似之处:在一定程度上,Reactor模式有点类似生产者消费者模式。在生产者消费者模式中,一个或多个生产者将事件加入一个队列中,一个或多个消费者主动从这个队列中拉取(Pull)事件来处理。

二者的不同之处:Reactor模式是基于查询的,没有专门的队列去缓冲存储IO事件,查询到IO事件之后,反应器会根据不同IO选择键(事件)将其分发给对应的Handler来处理。

(2)Reactor模式和观察者模式对比二者的相似之处:在Reactor模式中,当查询到IO事件后,服务处理程序使用单路/多路分发(Dispatch)策略,同步分发这些IO事件。


观察者模式(Observer Pattern)也被称作发布/订阅模式,它定义了一种依赖关系,让多个观察者同时监听某一个主题(Topic)。这个主题对象在状态发生变化时会通知所有观察者,它们能够执行相应的处理。


二者的不同之处:在Reactor模式中,Handler实例和IO事件(选择键)的订阅关系基本上是一个事件绑定到一个Handler,每一个IO事件(选择键)被查询后,反应器会将事件分发给所绑定的Handler,也就是一个事件只能被一个Handler处理;在观察者模式中,同一时刻、同一主题可以被订阅过的多个观察者处理。


最后,总结一下Reactor模式的优点和缺点。作为高性能的IO模式,

Reactor模式的优点如下:

响应快,虽然同一反应器线程本身是同步的,但是不会被单个连接的IO操作所阻塞。编程相对简单,最大限度避免了复杂的多线程同步,也避免了多线程各个进程之间切换的开销。

可扩展,可以方便地通过增加反应器线程的个数来充分利用CPU资源。

Reactor模式的缺点如下:

Reactor模式增加了一定的复杂性,因而有一定的门槛,并且不易于调试。Reactor模式依赖于操作系统底层的IO多路复用系统调用的支持,如Linux中的epoll系统调用。如果操作系统的底层不支持IO多路复用,Reactor模式不会那么高效

同一个Handler业务线程中,如果出现一个长时间的数据读写,就会影响这个反应器中其他通道的IO处理。例如,在大文件传输时,IO操作就会影响其他客户端的响应时间。对于这种操作,还需要进一步对Reactor模式进行改进。


相关实践学习
如何在云端创建MySQL数据库
开始实验后,系统会自动创建一台自建MySQL的 源数据库 ECS 实例和一台 目标数据库 RDS。
Sqoop 企业级大数据迁移方案实战
Sqoop是一个用于在Hadoop和关系数据库服务器之间传输数据的工具。它用于从关系数据库(如MySQL,Oracle)导入数据到Hadoop HDFS,并从Hadoop文件系统导出到关系数据库。 本课程主要讲解了Sqoop的设计思想及原理、部署安装及配置、详细具体的使用方法技巧与实操案例、企业级任务管理等。结合日常工作实践,培养解决实际问题的能力。本课程由黑马程序员提供。
相关文章
|
5月前
|
消息中间件 Kubernetes NoSQL
Reactor 和 Proactor 区别
Reactor 和 Proactor 区别
|
Java 调度
【Netty】主从反应器 ( Reactor ) 多线程模型
【Netty】主从反应器 ( Reactor ) 多线程模型
776 0
【Netty】主从反应器 ( Reactor ) 多线程模型
|
17天前
|
Java
Reactor模式
通过一个具体的Java代码示例展示了如何在NIO框架下实现Reactor模式,用于处理网络IO事件,包括事件的接收、分发和处理。
27 4
Reactor模式
|
5月前
|
Java 调度
【Netty 网络通信】Reactor模式
【1月更文挑战第9天】【Netty 网络通信】Reactor模式
|
5月前
|
监控 Java 应用服务中间件
Reactor反应器模式
在Java的OIO编程中,最初和最原始的网络服务器程序使用一个while循环,不断地监听端口是否有新的连接,如果有就调用一个处理函数来处理。这种方法最大的问题就是如果前一个网络连接的处理没有结束,那么后面的连接请求没法被接收,于是后面的请求统统会被阻塞住,服务器的吞吐量就太低了。 为了解决这个严重的连接阻塞问题,出现了一个即为经典模式:Connection Per Thread。即对于每一个新的网络连接都分配一个线程,每个线程都独自处理自己负责的输入和输出,任何socket连接的输入和输出处理不会阻塞到后面新socket连接的监听和建立。早期版本的Tomcat服务器就是这样实现的。
|
网络协议 数据处理
Reactor模式(二)
Reactor模式
78 0
|
设计模式 网络协议 数据处理
Reactor模式(一)
Reactor模式
111 0
|
Java
Netty「基石」之Reactor模式
Netty「基石」之Reactor模式
189 0
|
JavaScript 安全 Java
深入Netty逻辑架构,从Reactor线程模型开始(一)
深入Netty逻辑架构,从Reactor线程模型开始(一)
393 0
深入Netty逻辑架构,从Reactor线程模型开始(一)
|
消息中间件 Java 调度
深入Netty逻辑架构,从Reactor线程模型开始(二)
深入Netty逻辑架构,从Reactor线程模型开始(二)
213 0
深入Netty逻辑架构,从Reactor线程模型开始(二)