本博客是《Netty权威指南》的读书笔记,如有错误环境指正、探讨,谢谢!此书源码见附件。
此博客涉及的代码地址:https://gitee.com/wuzhengfei/great-truth;参考com.wzf.greattruth.aio包中的代码。
IO模型参考:https://yq.aliyun.com/articles/277102
JDK1.7升级了NIO类库,升级后的NIO类库被称为NIO2.0,此版本正式提供了异步IO操作,即AIO。
1. AIO服务端序列
2. AIO服务端序列分析
1)打开AsynchronousServerSocketChannel
asynServerSocketChannel = AsynchronousServerSocketChannel.open();
2) 绑定监听地址InetSocketAddress
asynServerSocketChannel.bind(new InetSocketAddress(port));
3) 创建线程并启动
AsyncServerHandler timeServer = new AsyncServerHandler(port);
new Thread(timeServer, "AIOServerHandler").start();
4)注册接收数据的Handler
asynServerSocketChannel.accept(this, new ServerAcceptCompletionHandler());
5)接收数据,实现ServerAcceptCompletionHandler的completed、failed方法
public void completed(AsynchronousSocketChannel channel, AsyncServerHandler attachment) {
/**
* 为什么需要再次调用accept方法?
* 因为如果有新的客户端连接接入,系统将回调我们传入的CompletionHandler示例的complete方法,表示新的客户端接入成功
* 因为一个AsynchronousServerSocketChannel可以接收成千上万个客户端,所以需要继续调用他的accept方法,
* 接收其他客户端连接,最终形成一个循环。每当接收一个客户连接成功后,再异步接收新的客户端连接
*
*/
attachment.asynServerSocketChannel.accept(attachment, this);
ByteBuffer buffer = ByteBuffer.allocate(1024);
/**
* ByteBuffer:接收缓冲区,用于从异步的Channel中读取数据包
* Attachment:异步Channel携带的附件,通知回调的时候作为入参使用
* CompletionHandler:接收通知回调的业务Handler
*/
channel.read(buffer, buffer, new ServerReadCompletionHandler(channel));
}
public void failed(Throwable exc, AsyncServerHandler attachment) {
exc.printStackTrace();
attachment.latch.countDown();
}
6)读取数据,实现ServerReadCompletionHandler的Complete、faild方法
public void completed(Integer result, ByteBuffer attachment) {
// handler with data
}
public void failed(Throwable exc, ByteBuffer attachment) {
this.channel.close();
}
7)decode数据
attachment.flip();
byte[] body = new byte[attachment.remaining()];
attachment.get(body);
String req = new String(body, "UTF-8");
8)异步写数据到Channel
byte[] bytes = (response).getBytes();
ByteBuffer writeBuffer = ByteBuffer.allocate(bytes.length);
writeBuffer.put(bytes);
writeBuffer.flip();
channel.write(writeBuffer, writeBuffer, new CompletionHandler<Integer, ByteBuffer>() {
@Override
public void completed(Integer result, ByteBuffer buffer) {
// 如果没有发送完成,继续发送
if (buffer.hasRemaining())
channel.write(buffer, buffer, this);
}
@Override
public void failed(Throwable exc, ByteBuffer attachment) {
try {
channel.close();
} catch (IOException e) {
// ingnore on close
}
}
});
3.AIO客户端序列
4. AIO客户端序列分析
1)打开AsynchronousSocketChannel
asynSocketChannel = AsynchronousSocketChannel.open();
2)异步连接服务器
asynSocketChannel.connect(new InetSocketAddress(host, port), this, this);
3)创建线程并启动
AsyncClientHandler asyncClientHandler = new AsyncClientHandler("127.0.0.1", port);
new Thread(asyncClientHandler, "AIOClientHandler").start();
4)注册连接Server成功的Handler
ClientConnectCompletionHandler connectCompletionHandler = new ClientConnectCompletionHandler(asynSocketChannel, latch) ;
asynSocketChannel.connect(new InetSocketAddress(host, port), connectCompletionHandler, connectCompletionHandler);
5)连接成功后,注册向Server写数据的Handler,实现Completed、Failed方法
public void completed(Void result, AsyncClientHandler attachment) {
byte[] req = "timestemp".getBytes();
ByteBuffer writeBuffer = ByteBuffer.allocate(req.length);
writeBuffer.put(req);
writeBuffer.flip();
ClientWriteCompletionHandler writeCompletionHandler = new ClientWriteCompletionHandler(
attachment.asynSocketChannel, attachment.latch);
attachment.asynSocketChannel.write(writeBuffer, writeBuffer, writeCompletionHandler);
}
@Override
public void failed(Throwable exc, AsyncClientHandler attachment) {
exc.printStackTrace();
try {
attachment.asynSocketChannel.close();
attachment.latch.countDown();
} catch (IOException e) {
e.printStackTrace();
}
}
6)向Server发送数据,实现CompletionHandler的Completed、faild方法
public void completed(Integer result, ByteBuffer buffer) {
if (buffer.hasRemaining()) {
asynSocketChannel.write(buffer, buffer, this);
}
}
public void failed(Throwable exc, ByteBuffer attachment) {
try {
asynSocketChannel.close();
latch.countDown();
} catch (IOException e) {
// ingnore on close
}
}
7)注册读数据的Handler,实现completed、failed方法
asynSocketChannel.read(readBuffer, readBuffer, new CompletionHandler<Integer, ByteBuffer>() {
@Override
public void completed(Integer result, ByteBuffer buffer) {
buffer.flip();
byte[] bytes = new byte[buffer.remaining()];
buffer.get(bytes);
String body;
try {
body = new String(bytes, "UTF-8");
System.out.println("Now is : " + body);
latch.countDown();
} catch (UnsupportedEncodingException e) {
e.printStackTrace();
}
}
@Override
public void failed(Throwable exc, ByteBuffer attachment) {
try {
asynSocketChannel.close();
latch.countDown();
} catch (IOException e) {
// ingnore on close
}
}
});
8)读取并decode数据
buffer.flip();
byte[] bytes = new byte[buffer.remaining()];
buffer.get(bytes);
String body = new String(bytes, "UTF-8");