声明:本文是《Netty 权威指南》的样章,感谢博文视点授权并发编程网站发布样章,禁止以任何形式转载此文。
我们首先还是看下如何对TimeClient进行改造:
01 |
public class TimeClient { |
02 |
03 |
/** |
04 |
* @param args |
05 |
*/ |
06 |
public static void main(String[] args) { |
07 |
08 |
int port = 8080 ; |
09 |
if (args != null && args.length > 0 ) { |
10 |
try { |
11 |
port = Integer.valueOf(args[ 0 ]); |
12 |
} catch (NumberFormatException e) { |
13 |
// 采用默认值 |
14 |
} |
15 |
} |
16 |
new Thread( new TimeClientHandle( "127.0.0.1" , port), "TimeClient-001" ) |
17 |
.start(); |
18 |
} |
19 |
} |
与之前唯一不同的就是我们通过创建TimeClientHandle线程来处理异步连接、读写操作,由于TimeClient非常简单且变更不大,我们重点分析TimeClientHandle,代码如下:
001 |
public class TimeClientHandle implements Runnable { |
002 |
private String host; |
003 |
private int port; |
004 |
private Selector selector; |
005 |
private SocketChannel socketChannel; |
006 |
private volatile boolean stop; |
007 |
008 |
public TimeClientHandle(String host, int port) { |
009 |
this .host = host == null ? "127.0.0.1" : host; |
010 |
this .port = port; |
011 |
try { |
012 |
selector = Selector.open(); |
013 |
socketChannel = SocketChannel.open(); |
014 |
socketChannel.configureBlocking( false ); |
015 |
} catch (IOException e) { |
016 |
e.printStackTrace(); |
017 |
System.exit( 1 ); |
018 |
} |
019 |
} |
020 |
021 |
/* |
022 |
* (non-Javadoc) |
023 |
* |
024 |
* @see java.lang.Runnable#run() |
025 |
*/ |
026 |
@Override |
027 |
public void run() { |
028 |
try { |
029 |
doConnect(); |
030 |
} catch (IOException e) { |
031 |
e.printStackTrace(); |
032 |
System.exit( 1 ); |
033 |
} |
034 |
while (!stop) { |
035 |
try { |
036 |
selector.select( 1000 ); |
037 |
Set<SelectionKey> selectedKeys = selector.selectedKeys(); |
038 |
Iterator<SelectionKey> it = selectedKeys.iterator(); |
039 |
SelectionKey key = null ; |
040 |
while (it.hasNext()) { |
041 |
key = it.next(); |
042 |
it.remove(); |
043 |
try { |
044 |
handleInput(key); |
045 |
} catch (Exception e) { |
046 |
if (key != null ) { |
047 |
key.cancel(); |
048 |
if (key.channel() != null ) |
049 |
key.channel().close(); |
050 |
} |
051 |
} |
052 |
} |
053 |
} catch (Exception e) { |
054 |
e.printStackTrace(); |
055 |
System.exit( 1 ); |
056 |
} |
057 |
} |
058 |
059 |
// 多路复用器关闭后,所有注册在上面的Channel和Pipe等资源都会被自动去注册并关闭,所以不需要重复释放资源 |
060 |
if (selector != null ) |
061 |
try { |
062 |
selector.close(); |
063 |
} catch (IOException e) { |
064 |
e.printStackTrace(); |
065 |
} |
066 |
} |
067 |
068 |
private void handleInput(SelectionKey key) throws IOException { |
069 |
070 |
if (key.isValid()) { |
071 |
// 判断是否连接成功 |
072 |
SocketChannel sc = (SocketChannel) key.channel(); |
073 |
if (key.isConnectable()) { |
074 |
if (sc.finishConnect()) { |
075 |
sc.register(selector, SelectionKey.OP_READ); |
076 |
doWrite(sc); |
077 |
} else |
078 |
System.exit( 1 ); // 连接失败,进程退出 |
079 |
} |
080 |
if (key.isReadable()) { |
081 |
ByteBuffer readBuffer = ByteBuffer.allocate( 1024 ); |
082 |
int readBytes = sc.read(readBuffer); |
083 |
if (readBytes > 0 ) { |
084 |
readBuffer.flip(); |
085 |
byte [] bytes = new byte [readBuffer.remaining()]; |
086 |
readBuffer.get(bytes); |
087 |
String body = new String(bytes, "UTF-8" ); |
088 |
System.out.println( "Now is : " + body); |
089 |
this .stop = true ; |
090 |
} else if (readBytes < 0 ) { |
091 |
// 对端链路关闭 |
092 |
key.cancel(); |
093 |
sc.close(); |
094 |
} else |
095 |
; // 读到0字节,忽略 |
096 |
} |
097 |
} |
098 |
099 |
} |
100 |
101 |
private void doConnect() throws IOException { |
102 |
// 如果直接连接成功,则注册到多路复用器上,发送请求消息,读应答 |
103 |
if (socketChannel.connect( new InetSocketAddress(host, port))) { |
104 |
socketChannel.register(selector, SelectionKey.OP_READ); |
105 |
doWrite(socketChannel); |
106 |
} else |
107 |
socketChannel.register(selector, SelectionKey.OP_CONNECT); |
108 |
} |
109 |
110 |
private void doWrite(SocketChannel sc) throws IOException { |
111 |
byte [] req = "QUERY TIME ORDER" .getBytes(); |
112 |
ByteBuffer writeBuffer = ByteBuffer.allocate(req.length); |
113 |
writeBuffer.put(req); |
114 |
writeBuffer.flip(); |
115 |
sc.write(writeBuffer); |
116 |
if (!writeBuffer.hasRemaining()) |
117 |
System.out.println( "Send order 2 server succeed." ); |
118 |
} |
119 |
} |
与服务端类似,我们通过对关键步骤的源码进行分析和解读,让大家深入了解如何创建NIO客户端以及如何使用NIO的API。
8-19行构造函数用于初始化NIO的多路复用器和SocketChannel对象,需要注意的是创建SocketChannel之后,需要将其设置为异步非阻塞模式。就像在2.3.3章节中所讲的,我们可以设置SocketChannel的TCP参数,例如接收和发送的TCP缓冲区大小
28-33行用于发送连接请求,作为示例,连接是成功的,所以不需要做重连操作,因此将其放到循环之前。下面我们具体看看doConnect的实现,代码跳到第116-123行,首先对SocketChannel的connect()操作进行判断,如果连接成功,则将SocketChannel注册到多路复用器Selector上,注册SelectionKey.OP_READ,如果没有直接连接成功,说明服务端没有返回TCP握手应答消息,这并不代表连接失败,我们需要将SocketChannel注册到多路复用器Selector上,注册SelectionKey.OP_CONNECT,当服务端返回TCP syn-ack消息后,Selector就能够轮询到这个SocketChannel处于连接就绪状态
4-72行在循环体中轮询多路复用器Selector,当有就绪的Channel时,执行第59行的handleInput(key)方法,下面我们就对handleInput方法进行分析。
跳到第68行,我们首先对SelectionKey进行判断,看它处于什么状态。如果是处于连接状态,说明服务端已经返回ACK应答消息,我们需要对连接结果进行判断,调用SocketChannel的finishConnect()方法,如果返回值为true,说明客户端连接成功,如果返回值为false或者直接抛出IOException,说明连接失败。在本例程中,返回值为true,说明连接成功。将SocketChannel注册到多路复用器上,注册SelectionKey.OP_READ操作位,监听网络读操作。然后发送请求消息给服务端,下面我们对doWrite(sc)进行分析。代码跳到110行,我们构造请求消息体,然后对其编码,写入到发送缓冲区中,最后调用SocketChannel的write方法进行发送,由于发送是异步的,所以会存在“半包写”问题,此处不再赘述。最后通过hasRemaining()方法对发送结果进行判断,如果缓冲区中的消息全部发送完成,打印”Send order 2 server succeed.
代码返回第80行,我们继续分析下客户端是如何读取时间服务器应答消息的。如果客户端接收到了服务端的应答消息,则SocketChannel是可读的,由于无法事先判断应答码流的大小,我们就预分配1M的接收缓冲区用于读取应答消息,调用SocketChannel的read()方法进行异步读取操作,由于是异步操作,所以必须对读取的结果进行判断,这部分的处理逻辑已经在2.3.3章节详细介绍过,此处不再赘述。如果读取到了消息,则对消息进行解码,最后打印结果。执行完成后将stop置为true,线程退出循环
线程退出循环后,我们需要对连接资源进行释放,以实现“优雅退出”。60-66行用于多路复用器的资源释放,由于多路复用器上可能注册成千上万的Channel或者pipe,如果一一对这些资源进行释放显然不合适。因此,JDK底层会自动释放所有跟此多路复用器关联的资源,JDK的API DOC如下:
到此为止,我们已经将时间服务器通过NIO完成了改造,并对源码进行了分析和解读,下面分别执行时间服务器的服务端和客户端,看执行结果。
服务端执行结果:
客户端执行结果:
通过源码对比分析,我们发现NIO编程难度确实比同步阻塞BIO大很多,我们的NIO例程并没有考虑“半包读”和“半包写”,如果加上这些,代码将会更加复杂。NIO代码既然这么复杂,为什么它的应用却越来越广泛呢,使用NIO编程的优点总结如下:
1) 客户端发起的连接操作是异步的,可以通过在多路复用器注册OP_CONNECT等待后续结果,不需要像之前的客户端那样被同步阻塞;
2) SocketChannel的读写操作都是异步的,如果没有可读写的数据它不会同步等待,直接返回,这样IO通信线程就可以处理其它的链路,不需要同步等待这个链路可用;
3) 线程模型的优化:由于JDK的Selector在Linux等主流操作系统上通过epoll实现,它没有连接句柄数的限制(只受限于操作系统的最大句柄数或者对单个进程的句柄限制),这意味着一个Selector线程可以同时处理成千上万个客户端连接,而且性能不会随着客户端的增加而线性下降,因此,它非常适合做高性能、高负载的网络服务器。
JDK1.7升级了NIO类库,升级后的NIO类库被称为NIO2.0,引人注目的是Java正式提供了异步文件IO操作,同时提供了与Unix网络编程事件驱动IO对应的AIO,下面的2.4章节我们学习下如何利用NIO2.0编写AIO程序,我们还是以时间服务器为例进行讲解。
文章转自 并发编程网-ifeve.com