声明:本文是《Netty 权威指南》的样章,感谢博文视点授权并发编程网站发布样章,禁止以任何形式转载此文。
异步非阻塞IO版本的时间服务器服务端已经介绍完毕,下面我们继续看客户端的实现。
首先看下客户端主函数的实现,AIO时间服务器客户端 TimeClient:
01 |
public class TimeClient { |
02 |
03 |
/** |
04 |
* @param args |
05 |
*/ |
06 |
public static void main(String[] args) { |
07 |
int port = 8080 ; |
08 |
if (args != null && args.length > 0 ) { |
09 |
try { |
10 |
port = Integer.valueOf(args[ 0 ]); |
11 |
} catch (NumberFormatException e) { |
12 |
// 采用默认值 |
13 |
} |
14 |
} |
15 |
new Thread( new AsyncTimeClientHandler( "127.0.0.1" , port), |
16 |
"AIO-AsyncTimeClientHandler-001" ).start(); |
17 |
18 |
} |
19 |
} |
第15行我们通过一个独立的IO线程创建异步时间服务器客户端handler,在实际项目中,我们不需要独立的线程创建异步连接对象,因为底层都是通过JDK的系统回调实现的,在后面运行时间服务器程序的时候,我们会抓取线程调用堆栈给大家展示。继续看代码, AsyncTimeClientHandler的实现类源码如下:
001 |
public class AsyncTimeClientHandler implements |
002 |
CompletionHandler<Void, AsyncTimeClientHandler>, Runnable { |
003 |
004 |
private AsynchronousSocketChannel client; |
005 |
private String host; |
006 |
private int port; |
007 |
private CountDownLatch latch; |
008 |
009 |
public AsyncTimeClientHandler(String host, int port) { |
010 |
this .host = host; |
011 |
this .port = port; |
012 |
try { |
013 |
client = AsynchronousSocketChannel.open(); |
014 |
} catch (IOException e) { |
015 |
e.printStackTrace(); |
016 |
} |
017 |
} |
018 |
019 |
@Override |
020 |
public void run() { |
021 |
latch = new CountDownLatch( 1 ); |
022 |
client.connect( new InetSocketAddress(host, port), this , this ); |
023 |
try { |
024 |
latch.await(); |
025 |
} catch (InterruptedException e1) { |
026 |
e1.printStackTrace(); |
027 |
} |
028 |
try { |
029 |
client.close(); |
030 |
} catch (IOException e) { |
031 |
e.printStackTrace(); |
032 |
} |
033 |
} |
034 |
035 |
@Override |
036 |
public void completed(Void result, AsyncTimeClientHandler attachment) { |
037 |
byte [] req = "QUERY TIME ORDER" .getBytes(); |
038 |
ByteBuffer writeBuffer = ByteBuffer.allocate(req.length); |
039 |
writeBuffer.put(req); |
040 |
writeBuffer.flip(); |
041 |
client.write(writeBuffer, writeBuffer, |
042 |
new CompletionHandler<Integer, ByteBuffer>() { |
043 |
@Override |
044 |
public void completed(Integer result, ByteBuffer buffer) { |
045 |
if (buffer.hasRemaining()) { |
046 |
client.write(buffer, buffer, this ); |
047 |
} else { |
048 |
ByteBuffer readBuffer = ByteBuffer.allocate( 1024 ); |
049 |
client.read( |
050 |
readBuffer, |
051 |
readBuffer, |
052 |
new CompletionHandler<Integer, ByteBuffer>() { |
053 |
@Override |
054 |
public void completed(Integer result, |
055 |
ByteBuffer buffer) { |
056 |
buffer.flip(); |
057 |
byte [] bytes = new byte [buffer |
058 |
.remaining()]; |
059 |
buffer.get(bytes); |
060 |
String body; |
061 |
try { |
062 |
body = new String(bytes, |
063 |
"UTF-8" ); |
064 |
System.out.println( "Now is : " |
065 |
+ body); |
066 |
latch.countDown(); |
067 |
} catch (UnsupportedEncodingException e) { |
068 |
e.printStackTrace(); |
069 |
} |
070 |
} |
071 |
072 |
@Override |
073 |
public void failed(Throwable exc, |
074 |
ByteBuffer attachment) { |
075 |
try { |
076 |
client.close(); |
077 |
latch.countDown(); |
078 |
} catch (IOException e) { |
079 |
// ingnore on close |
080 |
} |
081 |
} |
082 |
}); |
083 |
} |
084 |
} |
085 |
086 |
@Override |
087 |
public void failed(Throwable exc, ByteBuffer attachment) { |
088 |
try { |
089 |
client.close(); |
090 |
latch.countDown(); |
091 |
} catch (IOException e) { |
092 |
// ingnore on close |
093 |
} |
094 |
} |
095 |
}); |
096 |
} |
097 |
098 |
@Override |
099 |
public void failed(Throwable exc, AsyncTimeClientHandler attachment) { |
100 |
exc.printStackTrace(); |
101 |
try { |
102 |
client.close(); |
103 |
latch.countDown(); |
104 |
} catch (IOException e) { |
105 |
e.printStackTrace(); |
106 |
} |
107 |
} |
108 |
} |
由于在AsyncTimeClientHandler中大量使用了内部匿名类,所以代码看起来稍微有些复杂,下面我们就对主要代码进行详细解说。
9-17行是构造方法,首先通过AsynchronousSocketChannel的open方法创建一个新的AsynchronousSocketChannel对象。然后跳到第36行,创建CountDownLatch进行等待,防止异步操作没有执行完成线程就退出。第37行通过connect方法发起异步操作,它有两个参数,分别如下:
1) A attachment : AsynchronousSocketChannel的附件,用于回调通知时作为入参被传递,调用者可以自定义;
2) CompletionHandler<Void,? super A> handler:异步操作回调通知接口,由调用者实现。
在本例程中,我们的两个参数都使用AsyncTimeClientHandler类本身,因为它实现了CompletionHandler接口。
接下来我们看异步连接成功之后的方法回调completed方法,代码第39行,我们创建请求消息体,对其进行编码,然后拷贝到发送缓冲区writeBuffer中,调用AsynchronousSocketChannel的write方法进行异步写,与服务端类似,我们可以实现CompletionHandler<Integer, ByteBuffer>接口用于写操作完成后的回调,代码第45-47行,如果发送缓冲区中仍有尚未发送的字节,我们继续异步发送,如果已经发送完成,则执行异步读取操作。
代码第64-97行是客户端异步读取时间服务器服务端应答消息的处理逻辑,代码第49行我们调用AsynchronousSocketChannel的read方法异步读取服务端的响应消息,由于read操作是异步的,所以我们通过内部匿名类实现CompletionHandler<Integer, ByteBuffer>接口,当读取完成被JDK回调时,我们构造应答消息。第56-63行我们从CompletionHandler的ByteBuffer中读取应答消息,然后打印结果。
第197-96行,当读取发生异常时,我们关闭链路,同时调用CountDownLatch的countDown方法让AsyncTimeClientHandler线程执行完毕,客户端退出执行。
需要指出的是,正如之前的NIO例程,我们并没有完整的处理网络的半包读写,当对例程进行功能测试的时候没有问题,但是,如果对代码稍加改造,进行压力或者性能测试,就会发现输出结果存在问题。
由于半包的读写会作为专门的小节在Netty的应用和源码分析章节进行详细讲解,在NIO的入门章节我们就不详细展开介绍,以便读者能够将注意力集中在NIO的入门知识上来。
下面的小节我们运行AIO版本的时间服务器程序,并通过打印线程堆栈的方式看下JDK回调异步Channel CompletionHandler的调用情况。