java Nio 异步操作(三)

简介: 异步channel API 主要引入三个异步类: AsynchronousFileChannel,AsynchronousSocketChannel, and AsynchronousServerSocketChannel. AsynchronousFileChannel跟FileChannel区别:不保存全局的position和offset,可以制定访问位置,也支持并发访问文件不

异步channel API

主要引入三个异步类: AsynchronousFileChannel,AsynchronousSocketChannel, and AsynchronousServerSocketChannel.

AsynchronousFileChannel跟FileChannel区别:不保存全局的position和offset,可以制定访问位置,也支持并发访问文件不同。
AsynchronousServerSocketChannel AsynchronousSocketChannel:能够绑定到一个制定线程池的组中,这个线程池能够用future或者CompletionHandler来对执行结果进行处理,
AsynchronousChannelGroup:执行异步IO的java线程池的组类,
AsynchronousChannelGroup.java:
public static AsynchronousChannelGroup withFixedThreadPool(int nThreads, ThreadFactory threadFactory)
public static AsynchronousChannelGroup withCachedThreadPool(ExecutorService executor,int initialSize)
public static AsynchronousChannelGroup withThreadPool(ExecutorService executor)​​​

我们看使用示例
  1. package com.mime;  
  2.   
  3. import java.io.IOException;  
  4. import java.net.InetSocketAddress;  
  5. import java.net.StandardSocketOptions;  
  6. import java.nio.ByteBuffer;  
  7. import java.nio.channels.AsynchronousChannelGroup;  
  8. import java.nio.channels.AsynchronousFileChannel;  
  9. import java.nio.channels.AsynchronousServerSocketChannel;  
  10. import java.nio.channels.AsynchronousSocketChannel;  
  11. import java.nio.channels.CompletionHandler;  
  12. import java.nio.channels.FileLock;  
  13. import java.nio.charset.Charset;  
  14. import java.nio.file.Path;  
  15. import java.nio.file.Paths;  
  16. import java.nio.file.StandardOpenOption;  
  17. import java.util.ArrayList;  
  18. import java.util.List;  
  19. import java.util.Set;  
  20. import java.util.TreeSet;  
  21. import java.util.concurrent.Callable;  
  22. import java.util.concurrent.ExecutionException;  
  23. import java.util.concurrent.ExecutorService;  
  24. import java.util.concurrent.Executors;  
  25. import java.util.concurrent.Future;  
  26. import java.util.concurrent.ThreadLocalRandom;  
  27.   
  28. public class NIO2AsynchronousFileChannel {  
  29.     public static void main(String[] args) {  
  30.           
  31.         asyFile();  
  32.         asyFileChannel2();  
  33.         asyServerSocketChannel();  
  34.     }  
  35.   
  36.     // 异步文件读写示例  
  37.     public static void asyFile() {  
  38.         ByteBuffer buffer = ByteBuffer.allocate(100);  
  39.         String encoding = System.getProperty("file.encoding");  
  40.         Path path = Paths.get("/tmp""store.txt");  
  41.         try (AsynchronousFileChannel asynchronousFileChannel = AsynchronousFileChannel  
  42.                 .open(path, StandardOpenOption.READ)) {  
  43.             Future<Integer> result = asynchronousFileChannel.read(buffer, 0);  
  44.             // 读超时控制  
  45.             // int count = result.get(100, TimeUnit.NANOSECONDS);  
  46.   
  47.             while (!result.isDone()) {  
  48.                 System.out.println("Do something else while reading ...");  
  49.             }  
  50.             System.out.println("Read done: " + result.isDone());  
  51.             System.out.println("Bytes read: " + result.get());  
  52.   
  53.             // 使用CompletionHandler回调接口异步读取文件  
  54.             final Thread current = Thread.currentThread();  
  55.             asynchronousFileChannel.read(buffer, 0,  
  56.                     "Read operation status ...",  
  57.                     new CompletionHandler<Integer, Object>() {  
  58.                         @Override  
  59.                         public void completed(Integer result, Object attachment) {  
  60.                             System.out.println(attachment);  
  61.                             System.out.print("Read bytes: " + result);  
  62.                             current.interrupt();  
  63.                         }  
  64.   
  65.                         @Override  
  66.                         public void failed(Throwable exc, Object attachment) {  
  67.                             System.out.println(attachment);  
  68.                             System.out.println("Error:" + exc);  
  69.                             current.interrupt();  
  70.                         }  
  71.                     });  
  72.   
  73.         } catch (Exception ex) {  
  74.             System.err.println(ex);  
  75.         }  
  76.         buffer.flip();  
  77.         System.out.print(Charset.forName(encoding).decode(buffer));  
  78.         buffer.clear();  
  79.   
  80.         // 异步文件写示例  
  81.         ByteBuffer buffer1 = ByteBuffer  
  82.                 .wrap("The win keeps Nadal at the top of the heap in men's"  
  83.                         .getBytes());  
  84.         Path path1 = Paths.get("/tmp""store.txt");  
  85.         try (AsynchronousFileChannel asynchronousFileChannel = AsynchronousFileChannel  
  86.                 .open(path1, StandardOpenOption.WRITE)) {  
  87.             Future<Integer> result = asynchronousFileChannel  
  88.                     .write(buffer1, 100);  
  89.             while (!result.isDone()) {  
  90.                 System.out.println("Do something else while writing ...");  
  91.             }  
  92.             System.out.println("Written done: " + result.isDone());  
  93.             System.out.println("Bytes written: " + result.get());  
  94.   
  95.             // file lock  
  96.             Future<FileLock> featureLock = asynchronousFileChannel.lock();  
  97.             System.out.println("Waiting for the file to be locked ...");  
  98.             FileLock lock = featureLock.get();  
  99.             if (lock.isValid()) {  
  100.                 Future<Integer> featureWrite = asynchronousFileChannel.write(  
  101.                         buffer, 0);  
  102.                 System.out.println("Waiting for the bytes to be written ...");  
  103.                 int written = featureWrite.get();  
  104.                 // or, use shortcut  
  105.                 // int written = asynchronousFileChannel.write(buffer,0).get();  
  106.                 System.out.println("I’ve written " + written + " bytes into "  
  107.                         + path.getFileName() + " locked file!");  
  108.                 lock.release();  
  109.             }  
  110.   
  111.             // asynchronousFileChannel.lock("Lock operation status:", new  
  112.             // CompletionHandler<FileLock, Object>() ;  
  113.   
  114.         } catch (Exception ex) {  
  115.             System.err.println(ex);  
  116.         }  
  117.     }  
  118.   
  119.     // public static AsynchronousFileChannel open(Path file, Set<? extends  
  120.     // OpenOption> options,ExecutorService executor, FileAttribute<?>... attrs)  
  121.     // throws IOException  
  122.     private static Set withOptions() {  
  123.         final Set options = new TreeSet<>();  
  124.         options.add(StandardOpenOption.READ);  
  125.         return options;  
  126.     }  
  127.   
  128.     // 使用AsynchronousFileChannel.open(path, withOptions(),  
  129.     // taskExecutor))这个API对异步文件IO的处理  
  130.     public static void asyFileChannel2() {  
  131.         final int THREADS = 5;  
  132.         ExecutorService taskExecutor = Executors.newFixedThreadPool(THREADS);  
  133.         String encoding = System.getProperty("file.encoding");  
  134.         List<Future<ByteBuffer>> list = new ArrayList<>();  
  135.         int sheeps = 0;  
  136.         Path path = Paths.get("/tmp",  
  137.                 "store.txt");  
  138.         try (AsynchronousFileChannel asynchronousFileChannel = AsynchronousFileChannel  
  139.                 .open(path, withOptions(), taskExecutor)) {  
  140.             for (int i = 0; i < 50; i++) {  
  141.                 Callable<ByteBuffer> worker = new Callable<ByteBuffer>() {  
  142.                     @Override  
  143.                     public ByteBuffer call() throws Exception {  
  144.                         ByteBuffer buffer = ByteBuffer  
  145.                                 .allocateDirect(ThreadLocalRandom.current()  
  146.                                         .nextInt(100200));  
  147.                         asynchronousFileChannel.read(buffer, ThreadLocalRandom  
  148.                                 .current().nextInt(0100));  
  149.                         return buffer;  
  150.                     }  
  151.                 };  
  152.                 Future<ByteBuffer> future = taskExecutor.submit(worker);  
  153.                 list.add(future);  
  154.             }  
  155.             // this will make the executor accept no new threads  
  156.             // and finish all existing threads in the queue  
  157.             taskExecutor.shutdown();  
  158.             // wait until all threads are finished  
  159.             while (!taskExecutor.isTerminated()) {  
  160.                 // do something else while the buffers are prepared  
  161.                 System.out  
  162.                         .println("Counting sheep while filling up some buffers!So far I counted: "  
  163.                                 + (sheeps += 1));  
  164.             }  
  165.             System.out.println("\nDone! Here are the buffers:\n");  
  166.             for (Future<ByteBuffer> future : list) {  
  167.                 ByteBuffer buffer = future.get();  
  168.                 System.out.println("\n\n" + buffer);  
  169.                 System.out  
  170.                         .println("______________________________________________________");  
  171.                 buffer.flip();  
  172.                 System.out.print(Charset.forName(encoding).decode(buffer));  
  173.                 buffer.clear();  
  174.             }  
  175.         } catch (Exception ex) {  
  176.             System.err.println(ex);  
  177.         }  
  178.     }  
  179.   
  180.     //异步server socket channel io处理示例  
  181.     public static void asyServerSocketChannel() {  
  182.           
  183.         //使用threadGroup  
  184. //      AsynchronousChannelGroup threadGroup = null;  
  185. //      ExecutorService executorService = Executors  
  186. //      .newCachedThreadPool(Executors.defaultThreadFactory());  
  187. //      try {  
  188. //      threadGroup = AsynchronousChannelGroup.withCachedThreadPool(executorService, 1);  
  189. //      } catch (IOException ex) {  
  190. //      System.err.println(ex);  
  191. //      }  
  192. //      AsynchronousServerSocketChannel asynchronousServerSocketChannel =  
  193. //              AsynchronousServerSocketChannel.open(threadGroup);  
  194.           
  195.         final int DEFAULT_PORT = 5555;  
  196.         final String IP = "127.0.0.1";  
  197.         ExecutorService taskExecutor = Executors.newCachedThreadPool(Executors  
  198.                 .defaultThreadFactory());  
  199.         // create asynchronous server socket channel bound to the default group  
  200.         try (AsynchronousServerSocketChannel asynchronousServerSocketChannel = AsynchronousServerSocketChannel  
  201.                 .open()) {  
  202.             if (asynchronousServerSocketChannel.isOpen()) {  
  203.                 // set some options  
  204.                 asynchronousServerSocketChannel.setOption(  
  205.                         StandardSocketOptions.SO_RCVBUF, 4 * 1024);  
  206.                 asynchronousServerSocketChannel.setOption(  
  207.                         StandardSocketOptions.SO_REUSEADDR, true);  
  208.                 // bind the server socket channel to local address  
  209.                 asynchronousServerSocketChannel.bind(new InetSocketAddress(IP,  
  210.                         DEFAULT_PORT));  
  211.                 // display a waiting message while ... waiting clients  
  212.                 System.out.println("Waiting for connections ...");  
  213.                 while (true) {  
  214.                     Future<AsynchronousSocketChannel> asynchronousSocketChannelFuture = asynchronousServerSocketChannel.accept();  
  215.                     //使用CompletionHandler来处理IO事件  
  216. //                  asynchronousServerSocketChannel.accept(null, new CompletionHandler<AsynchronousSocketChannel, Void>()   
  217.                     //client使用CompletionHandler来处理IO事件  
  218.                     //asynchronousSocketChannel.connect(new InetSocketAddress(IP, DEFAULT_PORT), null,new CompletionHandler<Void, Void>()   
  219.                     try {  
  220.                         final AsynchronousSocketChannel asynchronousSocketChannel = asynchronousSocketChannelFuture  
  221.                                 .get();  
  222.                         Callable<String> worker = new Callable<String>() {  
  223.                             @Override  
  224.                             public String call() throws Exception {  
  225.                                 String host = asynchronousSocketChannel  
  226.                                         .getRemoteAddress().toString();  
  227.                                 System.out.println("Incoming connection from: "  
  228.                                         + host);  
  229.                                 final ByteBuffer buffer = ByteBuffer  
  230.                                         .allocateDirect(1024);  
  231.                                 // transmitting data  
  232.                                 while (asynchronousSocketChannel.read(buffer)  
  233.                                         .get() != -1) {  
  234.                                     buffer.flip();  
  235.                                 }  
  236.                                 asynchronousSocketChannel.write(buffer).get();  
  237.                                 if (buffer.hasRemaining()) {  
  238.                                     buffer.compact();  
  239.                                 } else {  
  240.                                     buffer.clear();  
  241.                                 }  
  242.                                 asynchronousSocketChannel.close();  
  243.                                 System.out.println(host  
  244.                                         + " was successfully served!");  
  245.                                 return host;  
  246.                             }  
  247.                         };  
  248.                         taskExecutor.submit(worker);  
  249.                     } catch (InterruptedException | ExecutionException ex) {  
  250.                         System.err.println(ex);  
  251.                         System.err.println("\n Server is shutting down ...");  
  252.                         // this will make the executor accept no new threads  
  253.                         // and finish all existing threads in the queue  
  254.                         taskExecutor.shutdown();  
  255.                         // wait until all threads are finished  
  256.                         while (!taskExecutor.isTerminated()) {  
  257.                         }  
  258.                         break;  
  259.                     }  
  260.                 }  
  261.             } else {  
  262.                 System.out  
  263.                         .println("The asynchronous server-socket channel cannot be opened!");  
  264.             }  
  265.         } catch (IOException ex) {  
  266.             System.err.println(ex);  
  267.         }  
  268.   
  269.     }  
  270. }  

输出:
  1. Do something else while reading ...  
  2. Do something else while reading ...  
  3. Do something else while reading ...  
  4. Do something else while reading ...  
  5. Do something else while reading ...  
  6. Do something else while reading ...  
  7. Do something else while reading ...  
  8. Do something else while reading ...  
  9. Do something else while reading ...  
  10. Do something else while reading ...  
  11. Do something else while reading ...  
  12. Do something else while reading ...  
  13. Do something else while reading ...  
  14. Do something else while reading ...  
  15. Do something else while reading ...  
  16. Do something else while reading ...  
  17. Do something else while reading ...  
  18. Do something else while reading ...  
  19. Do something else while reading ...  
  20. Do something else while reading ...  
  21. Do something else while reading ...  
  22. Do something else while reading ...  
  23. Do something else while reading ...  
  24. Do something else while reading ...  
  25. Do something else while reading ...  
  26. Do something else while reading ...  
  27. Do something else while reading ...  
  28. Do something else while reading ...  
  29. Do something else while reading ...  
  30. Do something else while reading ...  
  31. Do something else while reading ...  
  32. Do something else while reading ...  
  33. Do something else while reading ...  
  34. Do something else while reading ...  
  35. Do something else while reading ...  
  36. Do something else while reading ...  
  37. Do something else while reading ...  
  38. Do something else while reading ...  
  39. Do something else while reading ...  
  40. Do something else while reading ...  
  41. Do something else while reading ...  
  42. Do something else while reading ...  
  43. Do something else while reading ...  
  44. Do something else while reading ...  
  45. Do something else while reading ...  
  46. Do something else while reading ...  
  47. Do something else while reading ...  
  48. Do something else while reading ...  
  49. Do something else while reading ...  
  50. Do something else while reading ...  
  51. Do something else while reading ...  
  52. Do something else while reading ...  
  53. Do something else while reading ...  
  54. Do something else while reading ...  
  55. Do something else while reading ...  
  56. Do something else while reading ...  
  57. Do something else while reading ...  
  58. Do something else while reading ...  
  59. Do something else while reading ...  
  60. Do something else while reading ...  
  61. Do something else while reading ...  
  62. Do something else while reading ...  
  63. Do something else while reading ...  
  64. Do something else while reading ...  
  65. Do something else while reading ...  
  66. Do something else while reading ...  
  67. Do something else while reading ...  
  68. Do something else while reading ...  
  69. Do something else while reading ...  
  70. Do something else while reading ...  
  71. Do something else while reading ...  
  72. Do something else while reading ...  
  73. Do something else while reading ...  
  74. Do something else while reading ...  
  75. Do something else while reading ...  
  76. Do something else while reading ...  
  77. Do something else while reading ...  
  78. Do something else while reading ...  
  79. Do something else while reading ...  
  80. Do something else while reading ...  
  81. Do something else while reading ...  
  82. Do something else while reading ...  
  83. Do something else while reading ...  
  84. Do something else while reading ...  
  85. Do something else while reading ...  
  86. Do something else while reading ...  
  87. Do something else while reading ...  
  88. Do something else while reading ...  
  89. Do something else while reading ...  
  90. Do something else while reading ...  
  91. Do something else while reading ...  
  92. Read done: true  
  93. Bytes read: 18  
  94. hello,filechannel  
  95. Read operation status ...  
  96. Error:java.nio.channels.AsynchronousCloseException  
  97. Do something else while writing ...  
  98. Do something else while writing ...  
  99. Do something else while writing ...  
  100. Written done: true  
  101. Bytes written: 51  
  102. Waiting for the file to be locked ...  
  103. Waiting for the bytes to be written ...  
  104. I’ve written 100 bytes into store.txt locked file!  
  105. Counting sheep while filling up some buffers!So far I counted: 1  
  106. Counting sheep while filling up some buffers!So far I counted: 2  
  107. Counting sheep while filling up some buffers!So far I counted: 3  
  108. Counting sheep while filling up some buffers!So far I counted: 4  
  109. Counting sheep while filling up some buffers!So far I counted: 5  
  110. Counting sheep while filling up some buffers!So far I counted: 6  
  111. Counting sheep while filling up some buffers!So far I counted: 7  
  112. Counting sheep while filling up some buffers!So far I counted: 8  
  113. Counting sheep while filling up some buffers!So far I counted: 9  
  114. Counting sheep while filling up some buffers!So far I counted: 10  
  115. Counting sheep while filling up some buffers!So far I counted: 11  
  116.   
  117. Done! Here are the buffers:  
  118.   
  119. java.lang.InterruptedException  
  120. Waiting for connections ... 
目录
相关文章
|
22天前
|
Java 应用服务中间件 Linux
java中的NIO,BIO,AIO
java中的NIO,BIO,AIO
10 0
|
1月前
|
设计模式 网络协议 Java
Java NIO 网络编程 | Netty前期知识(二)
Java NIO 网络编程 | Netty前期知识(二)
48 0
|
1月前
|
Java 索引
📌 Java NIO Buffer
Java NIO缓冲区在与NIO通道交互时使用。数据从通道读取到缓冲区,然后从缓冲区写入通道。 缓冲区本质上是一块内存,可以在其中写入数据,然后再进行读取。这个内存块被封装在一个NIOBuffer对象中,该对象提供了一组方法,可以更容易地使用内存块。
|
1月前
|
缓存 网络协议 Java
📌 Java NIO Channel
Java NIOChannel和传统的流相似,但是也存在一些差异: • 在同一个Channel通道中,既可以进行 读操作 也可以进行 写操作,但是 流 只能进行 读 或者 写 其中一种操作。 • Channel通道可以进行异步读写。 • Channel可以从 Buffer中进行读写操作。将数据从Channel通道读取到Buffer缓冲区,并将数据从Buffer缓冲区写入Channel通道。
|
1月前
|
Java API 容器
📌 Java NIO
Java NIO(New IO或 Non Blocking IO)是从Java 1.4版本开始引入的一个新的IOAPI,可以替代标准的Java IO API。NIO支持面向缓冲区的、基于通道的IO操作。NIO将以更加高效的方式进行文件的读写操作。(NIO非阻塞 IO操作)。Java NIO包含了如下三个核心组件:Channel、Buffers、Selectors。
|
2月前
|
存储 监控 Java
Java输入输出:什么是NIO(New I/O)?
Java输入输出:什么是NIO(New I/O)?
23 1
|
2月前
|
存储 缓存 监控
Java NIO三大核心组件
用户程序进行IO的读写,依赖于底层的IO读写,基本上会用到底层的read&write两大系统调用。在不同的操作系统中,IO读写的系统调用的名称可能完全不一样,但是基本功能是一样的。 read系统调用并不是直接从物理设备把数据读取到内存中,write系统调用也不是直接把数据写入到物理设备。上层应用无论是调用操作系统的read还是write,都会涉及缓冲区。**具体来说,调用操作系统的read,是把数据从内核缓冲区复制到进程缓冲区;而调用系统调用的write,是把数据从进程缓冲区复制到内核缓冲区。**因为外部设备的读写设计到操作系统的中断,引入缓冲区可以减少频繁地与设备之间的物理交换,操作系统会
|
2月前
|
存储 Java 容器
Java 中的 java.nio.FloatBuffer 类
Java 中的 java.nio.FloatBuffer 类
37 0
|
2月前
|
Java
Java 中的 java.nio.IntBuffer 类
Java 中的 java.nio.IntBuffer 类
22 0
|
3月前
|
存储 Java API
使用Java NIO进行文件操作、网络通信和多路复用的案例
使用Java NIO进行文件操作、网络通信和多路复用的案例

相关产品

  • 云迁移中心