高并发Java:NIO和AIO(二)

简介: 高并发Java:NIO和AIO

3. Channel


多线程网络服务器的一般结构:

1.png

简单的多线程服务器:

public static void main(String[] args) throws Exception {
       ServerSocket echoServer = null;
       Socket clientSocket = null;
       try {
         echoServer = new ServerSocket(8000);
       } catch (IOException e) {
         System.out.println(e);
       }
       while (true) {
         try {
          clientSocket = echoServer.accept();
          System.out.println(clientSocket.getRemoteSocketAddress()
                 + " connect!");
          tp.execute(new HandleMsg(clientSocket));
         } catch (IOException e) {
          System.out.println(e);
         }
       }
    }

功能就是服务器端读到什么数据,就向客户端回写什么数据。

这里的tp是一个线程池,HandleMsg是处理消息的类。

static class HandleMsg implements Runnable{  
        省略部分信息                 
        public void run(){         
          try {         
           is = new BufferedReader(new InputStreamReader(clientSocket.getInputStream())); 
           os = new PrintWriter(clientSocket.getOutputStream(), true); 
           // 从InputStream当中读取客户端所发送的数据              
           String inputLine = null;                 
           long b=System. currentTimeMillis ();                 
           while ((inputLine = is.readLine()) != null)
           {           
               os.println(inputLine);                 
           }                 
           long e=System. currentTimeMillis ();                 
           System. out.println ("spend:"+(e - b)+" ms ");             
         } catch (IOException e) {                 
          e.printStackTrace();             
         }finally
         {  
          关闭资源 
         }     
       } 
     }

客户端:

public static void main(String[] args) throws Exception {
       Socket client = null;
       PrintWriter writer = null;
       BufferedReader reader = null;
       try {
         client = new Socket();
         client.connect(new InetSocketAddress("localhost", 8000));
         writer = new PrintWriter(client.getOutputStream(), true);
         writer.println("Hello!");
         writer.flush();
         reader = new BufferedReader(new InputStreamReader(
              client.getInputStream()));
         System.out.println("from server: " + reader.readLine());
       } catch (Exception e) {
       } finally {
         // 省略资源关闭
       }
    }

以上的网络编程是很基本的,使用这种方式,会有一些问题:

为每一个客户端使用一个线程,如果客户端出现延时等异常,线程可能会被占用很长时间。因为数据的准备和读取都在这个线程中。此时,如果客户端数量众多,可能会消耗大量的系统资源。


解决方案:

使用非阻塞的NIO (读取数据不等待,数据准备好了再工作)

为了体现NIO使用的高效。


这里先模拟一个低效的客户端来模拟因网络而延时的情况:

private static ExecutorService tp= Executors.newCachedThreadPool();  
       private static final int sleep_time=1000*1000*1000;  
       public static class EchoClient implements Runnable{   
         public void run(){          
          try {              
              client = new Socket();              
              client.connect(new InetSocketAddress("localhost", 8000)); 
              writer = new PrintWriter(client.getOutputStream(), true); 
              writer.print("H");              
              LockSupport.parkNanos(sleep_time);       
              writer.print("e");           
              LockSupport.parkNanos(sleep_time);      
              writer.print("l");       
              LockSupport.parkNanos(sleep_time);  
              writer.print("l");       
              LockSupport.parkNanos(sleep_time);  
              writer.print("o");     
              LockSupport.parkNanos(sleep_time);  
              writer.print("!");         
              LockSupport.parkNanos(sleep_time);    
              writer.println();      
              writer.flush(); 
          }catch(Exception e)
          {
          }
         }
       }

服务器端输出:

spend:6000ms 
spend:6000ms 
spend:6000ms 
spend:6001ms 
spend:6002ms 
spend:6002ms 
spend:6002ms 
spend:6002ms 
spend:6003ms 
spend:6003ms

因为

while ((inputLine = is.readLine()) != null)

是阻塞的,所以时间都花在等待中。

如果用NIO来处理这个问题会怎么做呢?

NIO有一个很大的特点就是:把数据准备好了再通知我

而Channel有点类似于流,一个Channel可以和文件或者网络Socket对应 。

1.png

selector是一个选择器,它可以选择某一个Channel,然后做些事情。


一个线程可以对应一个selector,而一个selector可以轮询多个Channel,而每个Channel对应了一个Socket。


与上面一个线程对应一个Socket相比,使用NIO后,一个线程可以轮询多个Socket。


当selector调用select()时,会查看是否有客户端准备好了数据。当没有数据被准备好时,select()会阻塞。平时都说NIO是非阻塞的,但是如果没有数据被准备好还是会有阻塞现象。


当有数据被准备好时,调用完select()后,会返回一个SelectionKey,SelectionKey表示在某个selector上的某个Channel的数据已经被准备好了。


只有在数据准备好时,这个Channel才会被选择。


这样NIO实现了一个线程来监控多个客户端。


而刚刚模拟的网络延迟的客户端将不会影响NIO下的线程,因为某个Socket网络延迟时,数据还未被准备好,selector是不会选择它的,而会选择其他准备好的客户端。


selectNow()与select()的区别在于,selectNow()是不阻塞的,当没有客户端准备好数据时,selectNow()不会阻塞,将返回0,有客户端准备好数据时,selectNow()返回准备好的客户端的个数。


主要代码:

package test;
import java.net.InetAddress;
import java.net.InetSocketAddress;
import java.net.Socket;
import java.nio.ByteBuffer;
import java.nio.channels.SelectionKey;
import java.nio.channels.Selector;
import java.nio.channels.ServerSocketChannel;
import java.nio.channels.SocketChannel;
import java.nio.channels.spi.AbstractSelector;
import java.nio.channels.spi.SelectorProvider;
import java.util.HashMap;
import java.util.Iterator;
import java.util.LinkedList;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
public class MultiThreadNIOEchoServer {
    public static Map<Socket, Long> geym_time_stat = new HashMap<Socket, Long>();
    class EchoClient {
       private LinkedList<ByteBuffer> outq;
       EchoClient() {
         outq = new LinkedList<ByteBuffer>();
       }
       public LinkedList<ByteBuffer> getOutputQueue() {
         return outq;
       }
       public void enqueue(ByteBuffer bb) {
         outq.addFirst(bb);
       }
    }
    class HandleMsg implements Runnable {
       SelectionKey sk;
       ByteBuffer bb;
       public HandleMsg(SelectionKey sk, ByteBuffer bb) {
         super();
         this.sk = sk;
         this.bb = bb;
       }
       @Override
       public void run() {
         // TODO Auto-generated method stub
         EchoClient echoClient = (EchoClient) sk.attachment();
         echoClient.enqueue(bb);
         sk.interestOps(SelectionKey.OP_READ | SelectionKey.OP_WRITE);
         selector.wakeup();
       }
    }
    private Selector selector;
    private ExecutorService tp = Executors.newCachedThreadPool();
    private void startServer() throws Exception {
       selector = SelectorProvider.provider().openSelector();
       ServerSocketChannel ssc = ServerSocketChannel.open();
       ssc.configureBlocking(false);
       InetSocketAddress isa = new InetSocketAddress(8000);
       ssc.socket().bind(isa);
       // 注册感兴趣的事件,此处对accpet事件感兴趣
       SelectionKey acceptKey = ssc.register(selector, SelectionKey.OP_ACCEPT);
       for (;;) {
         selector.select();
         Set readyKeys = selector.selectedKeys();
         Iterator i = readyKeys.iterator();
         long e = 0;
         while (i.hasNext()) {
          SelectionKey sk = (SelectionKey) i.next();
          i.remove();
          if (sk.isAcceptable()) {
              doAccept(sk);
          } else if (sk.isValid() && sk.isReadable()) {
              if (!geym_time_stat.containsKey(((SocketChannel) sk
                   .channel()).socket())) {
                 geym_time_stat.put(
                    ((SocketChannel) sk.channel()).socket(),
                    System.currentTimeMillis());
              }
              doRead(sk);
          } else if (sk.isValid() && sk.isWritable()) {
              doWrite(sk);
              e = System.currentTimeMillis();
              long b = geym_time_stat.remove(((SocketChannel) sk
                   .channel()).socket());
              System.out.println("spend:" + (e - b) + "ms");
          }
         }
       }
    }
    private void doWrite(SelectionKey sk) {
       // TODO Auto-generated method stub
       SocketChannel channel = (SocketChannel) sk.channel();
       EchoClient echoClient = (EchoClient) sk.attachment();
       LinkedList<ByteBuffer> outq = echoClient.getOutputQueue();
       ByteBuffer bb = outq.getLast();
       try {
         int len = channel.write(bb);
         if (len == -1) {
          disconnect(sk);
          return;
         }
         if (bb.remaining() == 0) {
          outq.removeLast();
         }
       } catch (Exception e) {
         // TODO: handle exception
         disconnect(sk);
       }
       if (outq.size() == 0) {
         sk.interestOps(SelectionKey.OP_READ);
       }
    }
    private void doRead(SelectionKey sk) {
       // TODO Auto-generated method stub
       SocketChannel channel = (SocketChannel) sk.channel();
       ByteBuffer bb = ByteBuffer.allocate(8192);
       int len;
       try {
         len = channel.read(bb);
         if (len < 0) {
          disconnect(sk);
          return;
         }
       } catch (Exception e) {
         // TODO: handle exception
         disconnect(sk);
         return;
       }
       bb.flip();
       tp.execute(new HandleMsg(sk, bb));
    }
    private void disconnect(SelectionKey sk) {
       // TODO Auto-generated method stub
       //省略略干关闭操作
    }
    private void doAccept(SelectionKey sk) {
       // TODO Auto-generated method stub
       ServerSocketChannel server = (ServerSocketChannel) sk.channel();
       SocketChannel clientChannel;
       try {
         clientChannel = server.accept();
         clientChannel.configureBlocking(false);
         SelectionKey clientKey = clientChannel.register(selector,
              SelectionKey.OP_READ);
         EchoClient echoClinet = new EchoClient();
         clientKey.attach(echoClinet);
         InetAddress clientAddress = clientChannel.socket().getInetAddress();
         System.out.println("Accepted connection from "
              + clientAddress.getHostAddress());
       } catch (Exception e) {
         // TODO: handle exception
       }
    }
    public static void main(String[] args) {
       // TODO Auto-generated method stub
       MultiThreadNIOEchoServer echoServer = new MultiThreadNIOEchoServer();
       try {
         echoServer.startServer();
       } catch (Exception e) {
         // TODO: handle exception
       }
    }
}

代码仅作参考,主要的特点是,对不同事件的感兴趣来做不同的事。


当用之前模拟的那个延迟的客户端时,这次的时间消耗就在2ms到11ms之间了。性能提升是很明显的。


总结:

1. NIO会将数据准备好后,再交由应用进行处理,数据的读取/写入过程依然在应用线程中完成,只是将等待的时间剥离到单独的线程中去。

节省数据准备时间(因为Selector可以复用)


5. AIO


AIO的特点:

1. 读完了再通知我

2. 不会加快IO,只是在读完后进行通知

3. 使用回调函数,进行业务处理

AIO的相关代码:

AsynchronousServerSocketChannel

server = AsynchronousServerSocketChannel.open().bind( new InetSocketAddress (PORT));

使用server上的accept方法

public abstract <A> void accept(A attachment,CompletionHandler<AsynchronousSocketChannel,? super A> handler);

CompletionHandler为回调接口,当有客户端accept之后,就做handler中的事情。

示例代码:

server.accept(null,
          new CompletionHandler<AsynchronousSocketChannel, Object>() {
              final ByteBuffer buffer = ByteBuffer.allocate(1024);
              public void completed(AsynchronousSocketChannel result,
                   Object attachment) {
                 System.out.println(Thread.currentThread().getName());
                 Future<Integer> writeResult = null;
                 try {
                   buffer.clear();
                   result.read(buffer).get(100, TimeUnit.SECONDS);
                   buffer.flip();
                   writeResult = result.write(buffer);
                 } catch (InterruptedException | ExecutionException e) {
                   e.printStackTrace();
                 } catch (TimeoutException e) {
                   e.printStackTrace();
                 } finally {
                   try {
                    server.accept(null, this);
                    writeResult.get();
                    result.close();
                   } catch (Exception e) {
                    System.out.println(e.toString());
                   }
                 }
              }
              @Override
              public void failed(Throwable exc, Object attachment) {
                 System.out.println("failed: " + exc);
              }
          });

这里使用了Future来实现即时返回,关于Future请参考[上一篇][Link 1]

在理解了NIO的基础上,看AIO,区别在于AIO是等读写过程完成后再去调用回调函数。

NIO是同步非阻塞的

AIO是异步非阻塞的

由于NIO的读写过程依然在应用线程里完成,所以对于那些读写过程时间长的,NIO就不太适合。

而AIO的读写过程完成后才被通知,所以AIO能够胜任那些重量级,读写过程长的任务。

相关文章
|
3天前
|
缓存 Java UED
BIO、NIO、AIO有什么区别
【8月更文挑战第16天】BIO、NIO、AIO有什么区别
15 4
|
1天前
|
Java
"揭秘Java IO三大模式:BIO、NIO、AIO背后的秘密!为何AIO成为高并发时代的宠儿,你的选择对了吗?"
【8月更文挑战第19天】在Java的IO编程中,BIO、NIO与AIO代表了三种不同的IO处理机制。BIO采用同步阻塞模型,每个连接需单独线程处理,适用于连接少且稳定的场景。NIO引入了非阻塞性质,利用Channel、Buffer与Selector实现多路复用,提升了效率与吞吐量。AIO则是真正的异步IO,在JDK 7中引入,通过回调或Future机制在IO操作完成后通知应用,适合高并发场景。选择合适的模型对构建高效网络应用至关重要。
|
20天前
|
安全 Java Linux
(七)Java网络编程-IO模型篇之从BIO、NIO、AIO到内核select、epoll剖析!
IO(Input/Output)方面的基本知识,相信大家都不陌生,毕竟这也是在学习编程基础时就已经接触过的内容,但最初的IO教学大多数是停留在最基本的BIO,而并未对于NIO、AIO、多路复用等的高级内容进行详细讲述,但这些却是大部分高性能技术的底层核心,因此本文则准备围绕着IO知识进行展开。
|
30天前
|
算法 Java 调度
高并发架构设计三大利器:缓存、限流和降级问题之使用Java代码实现令牌桶算法问题如何解决
高并发架构设计三大利器:缓存、限流和降级问题之使用Java代码实现令牌桶算法问题如何解决
|
1月前
|
监控 网络协议 Java
Java面试题:解释Java NIO与BIO的区别,以及NIO的优势和应用场景。如何在高并发应用中实现NIO?
Java面试题:解释Java NIO与BIO的区别,以及NIO的优势和应用场景。如何在高并发应用中实现NIO?
29 0
|
Java
Java NIO系列教程三
​ 今天主要给大家介绍的是Buffer的基本使用这个也是NIO里面最总要的概率之一,里面的操作也是有一些复杂的同时也是需要大家必须要重点掌握的知识点,同时也介绍了一下Selector的用法下一篇文章我们将为大家介绍Pipe管道以及FileLock文件锁这也是NIO里面最后的一分部内容了。
89 0
|
安全 Java API
Java NIO系列教程四【完】-管道-文件锁-异步写入
​ 到此位置NIO的所有的内容都结束了,对于NIO来说主要是各种概念需要大家去理解然后有很多的用法和api也需要大家去熟悉所以想把NIO学懂学好其实并不容易一定要多写案例去测试巩固,也预祝大家能把NIO的知识看懂理顺!!!
88 0
|
网络协议 Java
Java NIO系列教程一
今天主要给大家介绍的是NIO的基本的概念以及Channel中常用的FileChannel的基本的用法,算是对Channel有一个简单的介绍。下一篇文章我们将详细的为大家介绍其他的常用Channel。
106 0
Java NIO系列教程一
|
网络协议 Java
Java NIO系列教程二
​ 今天主要是为大家详细的介绍了常见的各种Channel以及他们的用法,本文编写了大量的案例还需要大家认真的去实践以后才能真正的掌握住。介绍完Channel那么下一篇文章我们就可以为大家介绍Buffer和Selector的具体使用了。
76 0
|
Java
Java NIO系列教程(8)-SocketChannel的最佳实践(下)
Java NIO系列教程(8)-SocketChannel的最佳实践
164 0