Reactor模式笔记

简介: Reactor模式笔记

为何要用Reactor


1BIO


3.png


下面是采用BIO的方式进行网络连接


{
    // 创建一个serverSocket对象,相当于服务器,并且自己设定端口,最好设置1024以后
    ServerSocket serverSocket = new ServerSocket(8888);
   while (true){
     // 调用accept方法监听访问的Socket
     Socket socket = serverSocket.accept();
      System.out.println("接受到新socket...");
      new Thread(
              new Runnable() {
                @Override
                public void run() {
                  try {
                    // 从socket中读数据
                    InputStream is = socket.getInputStream();
                    InputStreamReader isr = new InputStreamReader(is);
                    BufferedReader bs = new BufferedReader(isr);
                    String str = "";
                    while ((str = bs.readLine()) != null) {
                      System.out.println("我是服务器,客户端说:" + str);
                    }
                    // 关闭输入流
                    socket.shutdownInput();
                    // ---->下面是服务器响应客户端
                    // 获得输出流
                    OutputStream os = socket.getOutputStream();
                    // 写入数据
                    PrintWriter pw = new PrintWriter(os);
                    pw.write("欢迎您:" + new Date().toString());
                    pw.flush();
                    // 关闭输出流资源
                    socket.shutdownOutput();
                    pw.close();
                    os.close();
                    // 关闭输入流资源
                    bs.close();
                    isr.close();
                    is.close();
                  } catch (Exception e) {
                    e.printStackTrace();
                  }
                }
              })
          .start();
   }
  }


其中出现的问题就是  


1.同步阻塞IO,读写阻塞,线程等待时间过长


2.在制定线程策略的时候,只能根据CPU的数目来限定可用线程资源,不能根据连接并发数目来制定,也就是连接有限制。否则很难保证对客户端请求的高效和公平。


3.多线程之间的上下文切换,造成线程使用效率并不高,并且不易扩展


2NIO


{
    ServerSocketChannel serverSocketChannel = ServerSocketChannel.open();
    serverSocketChannel.configureBlocking(false);
    serverSocketChannel.bind(new InetSocketAddress(10022));
    Selector selector = Selector.open();
    serverSocketChannel.register(selector, SelectionKey.OP_ACCEPT);
    while (selector.select() > 0) {
      Iterator<SelectionKey> iterator = selector.selectedKeys().iterator();
      while (iterator.hasNext()) {
        SelectionKey next = iterator.next();
        if (next.isAcceptable()) {
          SocketChannel socketChannel = serverSocketChannel.accept();
          socketChannel.configureBlocking(false);
          socketChannel.register(selector, SelectionKey.OP_READ);
        } else if (next.isReadable()) {
          SocketChannel socketChannel = (SocketChannel) next.channel();
          // 分配指定大小缓冲区
          ByteBuffer buffer = ByteBuffer.allocate(1024 * 5);
          int len = 0;
          while ((len = socketChannel.read(buffer)) > 0) {
            buffer.flip();
            System.out.println(new String(buffer.array(), 0, len));
            buffer.clear();
          }
        }
        iterator.remove();
      }
    }
  }


  1. 1.非阻塞的IO读写

  2. 2.基于IO事件进行分发任务,同时支持对多个fd的监听


Reactor模式


请看参考里的视频,讲的比较仔细


4.png


Handle(句柄或是描述符):本质上表示一种资源,是由操作系统提供的;该资源用于一个个的事件,比如说文件描述符号,或事针对网络编程中的Socket描述符。事件即可以来自外部,也可以来自内部;外部事件比如说客户端的连接请求,客户端发送过来的数据等;内部事件比如说操作系统的定时器事件等,。它本质上就是一个文件描述符。Handle是事件产生的发源地。


Synchronous Event Demultiplexer(同步事件分离器):它本身是一个系统的调用,用于等待事件的发生(事件可能是一个,也可能是多个)。调用方在调用它的时候会阻塞,一直阻塞到同步事件分离器有事件产生为止。对于Linux来说,同步事件分离器指的就是常用的I/0多路复用机制,比如select、poll、epoll等。在Java NIO集合中,同步事件分离器对应的组件就是Selector;对应的阻塞方法就是select方法。


Event Handler(事件处理器):本身由多个回调方法构成,这些回调方法构成了与应用相关的对于某个事件的反馈机制。Netty相比于Java NIO来说,在事件处理器这个角色进行了一个升级,它为我们开发者提供了大量的回调方法,供我们在特定的事件产生时实现相应的回调方法进行业务逻辑的处理。


Concrete Event Handler(具体事件处理器):是事件处理器的实现。它本身实现了事件处理器所提供的各个回调方法,从而实现了特定与业务的逻辑。它本质上就是我们所编写的一个个处理器实现。


Initiation Dispatcher(初始分发器):事件上就是Reactor角色。他本身定义了一些规范,这些规范用于控制事件的调度方式,同时又提供了应用进行事件处理器的注册、删除等设备。它本身是整个事件处理器的核心所在,Initiation Dispatcher会通过同步事件分离器来等待事件的发生。一旦事件发生,Initiation Dispatcher首先会分理处每一个事件,然后调用事件处理器,最后调用相关的回调方法来处理这些事件。


Reactor模式的流程


1.当应用向Initiation Dispatcher 注册具体的事件处理器时,应用会标识出事件处理器希望Initiation Dispatcher 在某个事件发生时向其通知的该事件,该事件与Handle关联。

2.Initiation Dispatcher会要求每个事件处理器向其传递内部的Handle。该Handle向操作系统标识了事件处理器。

3.当所有的事件处理器注册完毕后,应用会调用handle_events方法来启动Initiation Dispatcher的事件循环。这时,Initiation Dispatcher会将每个注册的事件管理器的Handle合并起来,并使用同步事件分离器等待这些事件的发生。比如,TCP协议层会使用select同步事件分离器操作来等待客户端发送的数据到达连接的socket handle上。

4.当与某个事件源对应的Handle变为ready状态时(比如说,TCP socket变为等待读状态时) ,同步事件分离器就会通知Initiation Dispatcher。

5. Initiation Dispatcher会触发事件处理器的回调方法,从而响应这个处于ready状态的Handle。当事件发生时,Initiation Dispatcher 会将被事件源激活的Handle作为key 来寻找并分发恰当的事件处理器回调方法。

6. Initiation Dispatcher会回调事件处理器的handle_ events回调方法来执行特定于应用的功能(开发者自己所编写的功能),从而响应这个事件。所发生的事件类型可以作为该方法参数并被该方法内部使用来执行额外的特定于服务的分离与分发。


两种主流Reactor图片比较


5.png


6.png


一般百度Reactor模式就是上面的两张图片,其实两张图片的内容差不多,现在从比较这两张图片中进行分析这两张图片。


左图中的Initiation Dispatcher就是右边的Reactor(mainReactor和subReactor)


左图中的handle就是有图中的Handle


左图中的Synchronous Event Demultiplexer其实属于Initiation Dispatcher的一部分,所以实际也相当于在右图中的Reactor中。


左图中的Event Handler对应右图中的read和send


左图中的Concrete Event Handler是Event Handler的实现。


Reactor与Netty的关系


下面是Netty服务器端的一个HelloWorld


EventLoopGroup bossGroup = new NioEventLoopGroup();
    EventLoopGroup workerGroup = new NioEventLoopGroup();
    try {
      ServerBootstrap b = new ServerBootstrap();
      b.group(bossGroup, workerGroup)
          .channel(NioServerSocketChannel.class)
          .option(ChannelOption.SO_BACKLOG, 1024)
          // 保持连接
          .childOption(ChannelOption.SO_KEEPALIVE, true)
          .childHandler(
              new ChannelInitializer<SocketChannel>() {
                @Override
                protected void initChannel(SocketChannel socketChannel) throws Exception {
                  // TimeClientHandler是自己定义的方法
                  socketChannel.pipeline().addLast(new StringDecoder(CharsetUtil.UTF_8));
                  socketChannel.pipeline().addLast(new StringEncoder(CharsetUtil.UTF_8));
                  socketChannel.pipeline().addLast(new MyServerHandler());
                }
              });
      // 绑定端口
      ChannelFuture f = b.bind(8888).sync();
      // 等待服务端监听端口关闭
      f.channel().closeFuture().sync();
    } catch (Exception e) {
    } finally {
      // 优雅关闭,释放线程池资源
      bossGroup.shutdownGracefully();
      workerGroup.shutdownGracefully();
    }


图一


5.png


其中BossEventLoopGroup和WorkBossEventLoopGroup都是Initiation Dispatcher,里面有一个Synchronous Event Demultiplexer(Selector)


Concrete Event Handler是Event Handler的实现类,则上面的代码中new MyServerHandler()就是实现类,而SimpleChannelInboundHandler和SimpleChannelOutboundHandler就是接口,即Event Handler


图二


6.png


其中BossEventLoopGroup和WorkBossEventLoopGroup就是上图中的mainRector和subReactor,


acceptor其实也是一个Event Handler,在源码中有显示


//public class ServerBootstrap extends AbstractBootstrap<ServerBootstrap, ServerChannel>
@Override
    void init(Channel channel) throws Exception {
         //省略
        p.addLast(new ChannelInitializer<Channel>() {
            @Override
            public void initChannel(final Channel ch) throws Exception {
                final ChannelPipeline pipeline = ch.pipeline();
                ChannelHandler handler = config.handler();
                if (handler != null) {
                    pipeline.addLast(handler);
                }
                ch.eventLoop().execute(new Runnable() {
                    @Override
                    public void run() {
                       //acceptor
                       //acceptor
                       //acceptor
                        pipeline.addLast(new ServerBootstrapAcceptor(
                                ch, currentChildGroup, currentChildHandler, currentChildOptions, currentChildAttrs));
                    }
                });
            }
        });
    }
目录
相关文章
|
6月前
|
消息中间件 Kubernetes NoSQL
Reactor 和 Proactor 区别
Reactor 和 Proactor 区别
|
1月前
|
Java
Reactor模式
通过一个具体的Java代码示例展示了如何在NIO框架下实现Reactor模式,用于处理网络IO事件,包括事件的接收、分发和处理。
36 4
Reactor模式
|
1月前
|
NoSQL Java Redis
Reactor实战,创建一个简单的单线程Reactor(理解了就相当于理解了多线程的Reactor)
本文通过一个简单的单线程Reactor模式的Java代码示例,展示了如何使用NIO创建一个服务端,处理客户端的连接和数据读写,帮助理解Reactor模式的核心原理。
32 0
Reactor实战,创建一个简单的单线程Reactor(理解了就相当于理解了多线程的Reactor)
|
6月前
|
设计模式
深入浅出Reactor和Proactor模式
深入浅出Reactor和Proactor模式
|
6月前
|
监控 安全 Linux
reactor的原理与实现
前情回顾 网络IO,会涉及到两个系统对象:   一个是用户空间调用的进程或线程   一个是内核空间的内核系统 如果发生IO操作read时,会奖励两个阶段:
71 1
|
6月前
|
缓存
2.1.2事件驱动reactor的原理与实现
2.1.2事件驱动reactor的原理与实现
|
6月前
|
监控 Java 应用服务中间件
Reactor反应器模式
在Java的OIO编程中,最初和最原始的网络服务器程序使用一个while循环,不断地监听端口是否有新的连接,如果有就调用一个处理函数来处理。这种方法最大的问题就是如果前一个网络连接的处理没有结束,那么后面的连接请求没法被接收,于是后面的请求统统会被阻塞住,服务器的吞吐量就太低了。 为了解决这个严重的连接阻塞问题,出现了一个即为经典模式:Connection Per Thread。即对于每一个新的网络连接都分配一个线程,每个线程都独自处理自己负责的输入和输出,任何socket连接的输入和输出处理不会阻塞到后面新socket连接的监听和建立。早期版本的Tomcat服务器就是这样实现的。
|
设计模式 网络协议 数据处理
Reactor模式(一)
Reactor模式
121 0
|
网络协议 数据处理
Reactor模式(二)
Reactor模式
83 0
灵魂一击!Netty系列笔记之Reactor模式(建议收藏)
一、什么是 Reactor 三种 IO 模式和对应的开发模式如下: BIONIOAIOThread-Per-ConnectionReactorProactor Reactor 是一种开发模式,核心流程为: 1、注册感兴趣的事件 2、扫描是否有感兴趣的事件发生 3、事件发生后做相应的处理