java Nio 异步操作(四)channel

简介: Java NIO 的核心组成部分: 1.Channels 2.Buffers 3.Selectors   我们首先来学习Channels(java.nio.channels): 通道   1)通道基础   通道(Channel)是java.nio的第二个主要创新。它们既不是一个扩展也不是一项增强,而是全新、极好的Java I/O示例,提供与I/O服务的直接连接。Channel

Java NIO 的核心组成部分:

1.Channels

2.Buffers

3.Selectors

  我们首先来学习Channels(java.nio.channels):

通道

  1)通道基础

  通道(Channel)是java.nio的第二个主要创新。它们既不是一个扩展也不是一项增强,而是全新、极好的Java I/O示例,提供与I/O服务的直接连接。Channel用于在字节缓冲区和位于通道另一侧的实体(通常是一个文件或套接字)之间有效地传输数据。

  channel的jdk源码:

1
2
3
4
5
6
package java.nio.channels;
     public interface Channel;
     {
         public boolean isOpen();
         public void close() throws IOException;
     }

  与缓冲区不同,通道API主要由接口指定。不同的操作系统上通道实现(Channel Implementation)会有根本性的差异,所以通道API仅仅描述了可以做什么。因此很自然地,通道实现经常使用操作系统的本地代码。通道接口允许您以一种受控且可移植的方式来访问底层的I/O服务。

  Channel是一个对象,可以通过它读取和写入数据。拿 NIO 与原来的 I/O 做个比较,通道就像是流。所有数据都通过 Buffer 对象来处理。您永远不会将字节直接写入通道中,相反,您是将数据写入包含一个或者多个字节的缓冲区。同样,您不会直接从通道中读取字节,而是将数据从通道读入缓冲区,再从缓冲区获取这个字节。

Java NIO 的通道类似流,但又有些不同:

  • 既可以从通道中读取数据,又可以写数据到通道。但流的读写通常是单向的。
  • 通道可以异步地读写。
  • 通道中的数据总是要先读到一个 Buffer,或者总是要从一个 Buffer 中写入。

基本上,所有的 IO 在NIO 中都从一个Channel 开始。Channel 有点象流。 数据可以从Channel读到Buffer中,也可以从Buffer 写到Channel中。这里有个图示: 
                                                      

下面是JAVA NIO中的一些主要Channel的实现: java.nio.channels包中的Channel接口的已知实现类

  • FileChannel:从文件中读写数据。
  • DatagramChannel:能通过UDP读写网络中的数据。
  • SocketChannel:能通过TCP读写网络中的数据。
  • ServerSocketChannel:可以监听新进来的TCP连接,像Web服务器那样。对每一个新进来的连接都会创建一个SocketChannel。

正如你所看到的,这些通道涵盖了UDP 和 TCP 网络IO,以及文件IO。 

  2)通道API
  1.文件通道API
  FileChannel类可以实现常用的read,write以及scatter/gather操作,同时它也提供了很多专用于文件的新方法。这些方法中的许多都是我们所熟悉的文件操作。
  FileChannel类的JDK源码:
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
package java.nio.channels;
     public abstract class FileChannel extends AbstractChannel implements ByteChannel, GatheringByteChannel, ScatteringByteChannel
     {
         // This is a partial API listing
         // All methods listed here can throw java.io.IOException
         public abstract int read (ByteBuffer dst, long position);
         public abstract int write (ByteBuffer src, long position);
         public abstract long size();
         public abstract long position();
         public abstract void position ( long newPosition);
         public abstract void truncate ( long size);
         public abstract void force ( boolean metaData);
         public final FileLock lock();
         public abstract FileLock lock ( long position, long size, boolean shared);
         public final FileLock tryLock();
         public abstract FileLock tryLock ( long position, long size, boolean shared);
         public abstract MappedByteBuffer map (MapMode mode, long position, long size);
         public static class MapMode;
         public static final MapMode READ_ONLY;
         public static final MapMode READ_WRITE;
         public static final MapMode PRIVATE;
         public abstract long transferTo ( long position, long count, WritableByteChannel target);
         public abstract long transferFrom (ReadableByteChannel src, long position, long count);
     } 

  文件通道总是阻塞式的,因此不能被置于非阻塞模式。现代操作系统都有复杂的缓存和预取机制,使得本地磁盘I/O操作延迟很少。网络文件系统一般而言延迟会多些,不过却也因该优化而受益。面向流的I/O的非阻塞范例对于面向文件的操作并无多大意义,这是由文件I/O本质上的不同性质造成的。对于文件I/O,最强大之处在于异步I/O(asynchronous I/O),它允许一个进程可以从操作系统请求一个或多个I/O操作而不必等待这些操作的完成。发起请求的进程之后会收到它请求的I/O操作已完成的通知。

  FileChannel对象是线程安全(thread-safe)的。多个进程可以在同一个实例上并发调用方法而不会引起任何问题,不过并非所有的操作都是多线程的(multithreaded)。影响通道位置或者影响文件大小的操作都是单线程的(single-threaded)。如果有一个线程已经在执行会影响通道位置或文件大小的操作,那么其他尝试进行此类操作之一的线程必须等待。并发行为也会受到底层的操作系统或文件系统影响。

  每个FileChannel对象都同一个文件描述符(file descriptor)有一对一的关系,所以上面列出的API方法与在您最喜欢的POSIX(可移植操作系统接口)兼容的操作系统上的常用文件I/O系统调用紧密对应也就不足为怪了。本质上讲,RandomAccessFile类提供的是同样的抽象内容。在通道出现之前,底层的文件操作都是通过RandomAccessFile类的方法来实现的。FileChannel模拟同样的I/O服务,因此它的API自然也是很相似的。

  三者之间的方法对比:

  
FILECHANNEL RANDOMACCESSFILE POSIX SYSTEM CALL
read( ) read( ) read( )
write( ) write( ) write( )
size( ) length( ) fstat( )
position( ) getFilePointer( ) lseek( )
position (long newPosition) seek( ) lseek( )
truncate( ) setLength( ) ftruncate( )
force( ) getFD().sync( ) fsync( )

 

  2.Socket通道
  新的socket通道类可以运行非阻塞模式并且是可选择的。这两个性能可以激活大程序(如网络服务器和中间件组件)巨大的可伸缩性和灵活性。本节中我们会看到,再也没有为每个socket连接使用一个线程的必要了,也避免了管理大量线程所需的上下文交换总开销。借助新的NIO类,一个或几个线程就可以管理成百上千的活动socket连接了并且只有很少甚至可能没有性能损失。所有的socket通道类(DatagramChannel、SocketChannel和ServerSocketChannel)都继承了位于java.nio.channels.spi包中的AbstractSelectableChannel。这意味着我们可以用一个Selector对象来执行socket通道的就绪选择(readiness selection)。

  请注意DatagramChannel和SocketChannel实现定义读和写功能的接口而ServerSocketChannel不实现。ServerSocketChannel负责监听传入的连接和创建新的SocketChannel对象,它本身从不传输数据。

  在我们具体讨论每一种socket通道前,您应该了解socket和socket通道之间的关系。之前的章节中有写道,通道是一个连接I/O服务导管并提供与该服务交互的方法。就某个socket而言,它不会再次实现与之对应的socket通道类中的socket协议API,而java.net中已经存在的socket通道都可以被大多数协议操作重复使用。

  全部socket通道类(DatagramChannel、SocketChannel和ServerSocketChannel)在被实例化时都会创建一个对等socket对象。这些是我们所熟悉的来自java.net的类(Socket、ServerSocket和DatagramSocket),它们已经被更新以识别通道。对等socket可以通过调用socket( )方法从一个通道上获取。此外,这三个java.net类现在都有getChannel( )方法。

  Socket通道将与通信协议相关的操作委托给相应的socket对象。socket的方法看起来好像在通道类中重复了一遍,但实际上通道类上的方法会有一些新的或者不同的行为。

  要把一个socket通道置于非阻塞模式,我们要依靠所有socket通道类的公有超级类:SelectableChannel。就绪选择(readiness selection)是一种可以用来查询通道的机制,该查询可以判断通道是否准备好执行一个目标操作,如读或写。非阻塞I/O和可选择性是紧密相连的,那也正是管理阻塞模式的API代码要在SelectableChannel超级类中定义的原因。

  设置或重新设置一个通道的阻塞模式是很简单的,只要调用configureBlocking( )方法即可,传递参数值为true则设为阻塞模式,参数值为false值设为非阻塞模式。真的,就这么简单!您可以通过调用isBlocking( )方法来判断某个socket通道当前处于哪种模式。

  非阻塞socket通常被认为是服务端使用的,因为它们使同时管理很多socket通道变得更容易。但是,在客户端使用一个或几个非阻塞模式的socket通道也是有益处的,例如,借助非阻塞socket通道,GUI程序可以专注于用户请求并且同时维护与一个或多个服务器的会话。在很多程序上,非阻塞模式都是有用的。

  偶尔地,我们也会需要防止socket通道的阻塞模式被更改。API中有一个blockingLock( )方法,该方法会返回一个非透明的对象引用。返回的对象是通道实现修改阻塞模式时内部使用的。只有拥有此对象的锁的线程才能更改通道的阻塞模式。

2.2.1 ServerSocketChannel
让我们从最简单的ServerSocketChannel来开始对socket通道类的讨论。以下是ServerSocketChannel的完整API:

1
2
3
4
5
6
7
public abstract class ServerSocketChannel extends AbstractSelectableChannel
   {
       public static ServerSocketChannel open() throws IOException;
       public abstract ServerSocket socket();
       public abstract ServerSocket accept() throws IOException;
       public final int validOps();
   }  

ServerSocketChannel是一个基于通道的socket监听器。它同我们所熟悉的java.net.ServerSocket执行相同的基本任务,不过它增加了通道语义,因此能够在非阻塞模式下运行。

由于ServerSocketChannel没有bind( )方法,因此有必要取出对等的socket并使用它来绑定到一个端口以开始监听连接。我们也是使用对等ServerSocket的API来根据需要设置其他的socket选项。

同它的对等体java.net.ServerSocket一样,ServerSocketChannel也有accept( )方法。一旦您创建了一个ServerSocketChannel并用对等socket绑定了它,然后您就可以在其中一个上调用accept( )。如果您选择在ServerSocket上调用accept( )方法,那么它会同任何其他的ServerSocket表现一样的行为:总是阻塞并返回一个java.net.Socket对象。如果您选择在ServerSocketChannel上调用accept( )方法则会返回SocketChannel类型的对象,返回的对象能够在非阻塞模式下运行。

如果以非阻塞模式被调用,当没有传入连接在等待时,ServerSocketChannel.accept( )会立即返回null。正是这种检查连接而不阻塞的能力实现了可伸缩性并降低了复杂性。可选择性也因此得到实现。我们可以使用一个选择器实例来注册一个ServerSocketChannel对象以实现新连接到达时自动通知的功能。以下代码演示了如何使用一个非阻塞的accept( )方法:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
package com.ronsoft.books.nio.channels;
    import java.nio.ByteBuffer;
    import java.nio.channels.ServerSocketChannel;
    import java.nio.channels.SocketChannel;
    import java.net.InetSocketAddress;
 
    public class ChannelAccept
    {
        public static final String GREETING = "Hello I must be going.\r\n" ;
        public static void main (String [] argv) throws Exception
        {
            int port = 1234 ; // default
            if (argv.length > 0 ) {
                port = Integer.parseInt (argv [ 0 ]);
            }
            ByteBuffer buffer = ByteBuffer.wrap (GREETING.getBytes());
            ServerSocketChannel ssc = ServerSocketChannel.open();
            ssc.socket().bind ( new InetSocketAddress (port));
            ssc.configureBlocking ( false );
            while ( true ) {
                System.out.println ( "Waiting for connections" );
                SocketChannel sc = ssc.accept();
                if (sc == null ) {
                    Thread.sleep ( 2000 );
                } else {
                    System.out.println ( "Incoming connection from: " + sc.socket().getRemoteSocketAddress());
                    buffer.rewind();
                    sc.write (buffer);
                    sc.close();
                }
            }
        }
    }

2.2.2 SocketChannel

下面开始学习SocketChannel,它是使用最多的socket通道类:

Java NIO中的SocketChannel是一个连接到TCP网络套接字的通道。可以通过以下2种方式创建SocketChannel:

  1. 打开一个SocketChannel并连接到互联网上的某台服务器。
  2. 一个新连接到达ServerSocketChannel时,会创建一个SocketChannel。

打开 SocketChannel

下面是SocketChannel的打开方式:

1
2
SocketChannel socketChannel = SocketChannel.open();
socketChannel.connect( new InetSocketAddress( "http://jenkov.com" , 80 ));

关闭 SocketChannel

当用完SocketChannel之后调用SocketChannel.close()关闭SocketChannel:

1
socketChannel.close();  

从 SocketChannel 读取数据

要从SocketChannel中读取数据,调用一个read()的方法之一。以下是例子:

1
2
ByteBuffer buf = ByteBuffer.allocate( 48 );
int bytesRead = socketChannel.read(buf);

  首先,分配一个Buffer。从SocketChannel读取到的数据将会放到这个Buffer中。然后,调用SocketChannel.read()。该方法将数据从SocketChannel 读到Buffer中。read()方法返回的int值表示读了多少字节进Buffer里。如果返回的是-1,表示已经读到了流的末尾(连接关闭了)。

写入 SocketChannel

写数据到SocketChannel用的是SocketChannel.write()方法,该方法以一个Buffer作为参数。示例如下:

1
2
3
4
5
6
7
8
9
10
11
String newData = "New String to write to file..." + System.currentTimeMillis();
 
ByteBuffer buf = ByteBuffer.allocate( 48 );
buf.clear();
buf.put(newData.getBytes());
 
buf.flip();
 
while (buf.hasRemaining()) {
     channel.write(buf);
}

  注意SocketChannel.write()方法的调用是在一个while循环中的。Write()方法无法保证能写多少字节到SocketChannel。所以,我们重复调用write()直到Buffer没有要写的字节为止。

非阻塞模式

可以设置 SocketChannel 为非阻塞模式(non-blocking mode).设置之后,就可以在异步模式下调用connect(), read() 和write()了。

connect()

如果SocketChannel在非阻塞模式下,此时调用connect(),该方法可能在连接建立之前就返回了。为了确定连接是否建立,可以调用finishConnect()的方法。像这样:

1
2
3
4
5
6
socketChannel.configureBlocking( false );
socketChannel.connect( new InetSocketAddress( "http://jenkov.com" , 80 ));
 
while (! socketChannel.finishConnect() ){
     //wait, or do something else...
}

write()

非阻塞模式下,write()方法在尚未写出任何内容时可能就返回了。所以需要在循环中调用write()。前面已经有例子了,这里就不赘述了。

read()

非阻塞模式下,read()方法在尚未读取到任何数据时可能就返回了。所以需要关注它的int返回值,它会告诉你读取了多少字节。

非阻塞模式与选择器

非阻塞模式与选择器搭配会工作的更好,通过将一或多个SocketChannel注册到Selector,可以询问选择器哪个通道已经准备好了读取,写入等。Selector与SocketChannel的搭配使用会在后面详讲。

2.2.3 DatagramChannel
最后一个socket通道是DatagramChannel。正如SocketChannel对应Socket,ServerSocketChannel对应ServerSocket,每一个DatagramChannel对象也有一个关联的DatagramSocket对象。不过原命名模式在此并未适用:“DatagramSocketChannel”显得有点笨拙,因此采用了简洁的“DatagramChannel”名称。

正如SocketChannel模拟连接导向的流协议(如TCP/IP),DatagramChannel则模拟包导向的无连接协议(如UDP/IP)。

DatagramChannel是无连接的。每个数据报(datagram)都是一个自包含的实体,拥有它自己的目的地址及不依赖其他数据报的数据负载。与面向流的的socket不同,DatagramChannel可以发送单独的数据报给不同的目的地址。同样,DatagramChannel对象也可以接收来自任意地址的数据包。每个到达的数据报都含有关于它来自何处的信息(源地址)。

最后给出一个基本的channel实例:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
RandomAccessFile aFile = new RandomAccessFile( "data/nio-data.txt" , "rw" );
FileChannel inChannel = aFile.getChannel();
ByteBuffer buf = ByteBuffer.allocate( 48 );
int bytesRead = inChannel.read(buf);
//读取的字节数,可能为零,如果该通道已到达流的末尾,则返回 -1
while (bytesRead != - 1 ) { System.out.println( "Read " + bytesRead);
buf.flip();
//反转缓冲区
while (buf.hasRemaining()){
System.out.print(( char ) buf.get());
//读取此缓冲区当前位置的字节,然后该位置递增。
} buf.clear();
bytesRead = inChannel.read(buf);
//从缓冲区读取数据
} aFile.close();
目录
相关文章
|
12天前
|
Java 大数据
解析Java中的NIO与传统IO的区别与应用
解析Java中的NIO与传统IO的区别与应用
|
13天前
|
消息中间件 Java Kafka
如何在Java中实现异步消息处理?
如何在Java中实现异步消息处理?
|
13天前
|
安全 Java
【Java】已解决java.nio.channels.OverlappingFileLockException异常
【Java】已解决java.nio.channels.OverlappingFileLockException异常
12 1
|
13天前
|
Java
【Java】已解决java.nio.channels.ClosedChannelException异常
【Java】已解决java.nio.channels.ClosedChannelException异常
13 1
|
13天前
|
Java
【Java】已解决java.nio.channels.FileLockInterruptionException异常
【Java】已解决java.nio.channels.FileLockInterruptionException异常
15 1
|
16天前
|
Java
Java中的NIO编程详解
Java中的NIO编程详解
|
11天前
|
监控 网络协议 Java
Java面试题:解释Java NIO与BIO的区别,以及NIO的优势和应用场景。如何在高并发应用中实现NIO?
Java面试题:解释Java NIO与BIO的区别,以及NIO的优势和应用场景。如何在高并发应用中实现NIO?
13 0
|
14天前
|
Java 大数据
解析Java中的NIO与传统IO的区别与应用
解析Java中的NIO与传统IO的区别与应用
|
17天前
|
消息中间件 Java 中间件
Java中的消息中间件与异步通信实现
Java中的消息中间件与异步通信实现
|
11天前
|
Java 调度
Java线程的六种状态
Java线程有六种状态: 初始(NEW)、运行(RUNNABLE)、阻塞(BLOCKED)、等待(WAITING)、超时等待(TIMED_WAITING)、终止(TERMINATED)。
27 1