Netty 基础 java NIO
Selector
三个元素: Selector选择器、SelectableChannel可选择的通道、SelectionKey选择键
本质上,Selector是监听器,监听的是通道是否有我们关心的操作产生,操作对应的是事件(连接、接收、读/写),使用SelectionKey代表具体的事件,在确保通道是可选择的情况下,将通道注册进选择器中,此时Selector维护的是,通道和事件之间的关联关系。
Selector,管理被注册的通道集合,以及他们的状态
SelectableChannel,是一个抽象类,提供了通道可被选择需要实现的api。
FileChannel就不是可选择的,Socket相关的通道都是可选择的
一个通道可以被注册到多个选择器上吗? 可以的
多个通道可以注册到一个选择器上,但一个通道只能在一个选择器中注册一次
SelectionKey,封装了要监听的事件,连接、接收、读、写。
一方面,Selector关心通道要处理哪些事件
另一方面,当事件触发时,通道要处理哪些事件
使用 NIO 实现 server与 clinet 通信(没有使用 selector的)
服务端
public static void main(String[] args) throws Exception { //创建服务端通道,用 open 获取 ServerSocketChannel ServerChannel = ServerSocketChannel.open(); //设置 ip 和端口号 SocketAddress address = new InetSocketAddress("127.0.0.1", 4321); //绑定到 服务通道 的 socket ServerChannel.socket().bind(address); //等待客户端连接 SocketChannel socketChannel = ServerChannel.accept(); // 处理数据 通过 buffer 来 ByteBuffer writebuffer = ByteBuffer.allocate(128); writebuffer.put("hello client i am server".getBytes()); writebuffer.flip(); socketChannel.write(writebuffer); // 读取客户端的数据 ByteBuffer readbuffer = ByteBuffer.allocate(128); StringBuffer stringBuffer = new StringBuffer(); socketChannel.read(readbuffer); readbuffer.flip(); while (readbuffer.hasRemaining()) { stringBuffer.append((char) readbuffer.get()); } System.out.println("client data :" + stringBuffer.toString()); socketChannel.close(); ServerChannel.close(); }
客户端
public static void main(String[] args) throws Exception { //开启一个 socket 通道 SocketChannel socketChannel = SocketChannel.open(); //设置 ip 和端口号 SocketAddress address = new InetSocketAddress("127.0.0.1", 4321); //连接 这个 address socketChannel.connect(address); // 先写后读 // 处理数据 通过 buffer 来 ByteBuffer writebuffer = ByteBuffer.allocate(128); writebuffer.put("hello server i am client ".getBytes()); writebuffer.flip(); socketChannel.write(writebuffer); // 读取客户端的数据 ByteBuffer readbuffer = ByteBuffer.allocate(128); StringBuffer stringBuffer = new StringBuffer(); socketChannel.read(readbuffer); readbuffer.flip(); while (readbuffer.hasRemaining()) { stringBuffer.append((char) readbuffer.get()); } System.out.println("server data :" + stringBuffer.toString()); socketChannel.close(); }
这里我们并没有使用 selector 接下来我们编写一个 Nio selector server 来对比学习
Nio selector server
SelectionKey 中我们常用判断的几种操作类型
isAcceptable() : 连接
isConnectable() : 就绪
isReadable() : 读取
isWritable() : 写入
代码执行和编写细节 : 见注释
public static void main(String[] args) throws Exception { ServerSocketChannel serverSocketChannel = ServerSocketChannel.open(); //设置 ip 和端口号 SocketAddress address = new InetSocketAddress("127.0.0.1", 4321); //绑定到 服务通道 的 socket serverSocketChannel.socket().bind(address); // 将这个 channel 设置成非阻塞的 serverSocketChannel.configureBlocking(false); // 打开一个选择器 Selector selector = Selector.open(); // 将通道注册到选择其中 声明选择器监听事件 // 通常来说,我们监听的是 连接事件 serverSocketChannel.register(selector, SelectionKey.OP_ACCEPT); /* * 通过选择器来管理通道 * 需要感知, 被监听的通道 有没有事件触发 * 当 select 方法 返回值 >0 的时候就代表 当前有多少个操作要处理 * 所以我们需要一直轮询 它是否有时间要处理 * */ while (true) { int ready = selector.select(); if (ready == 0) { continue; } // 通过 Selected keys 获取到操作集合 Set<SelectionKey> set = selector.selectedKeys(); Iterator<SelectionKey> iterator = set.iterator(); while (iterator.hasNext()) { SelectionKey key = iterator.next(); // 为了 避免重复操作 我们处理一个就移出一个 iterator.remove(); //我们通过 key 中的方法 来判断要处理什么操作 /* * isAcceptable() : 连接 * isConnectable() : 就绪 * isReadable() : 读取 * isWritable() : 写入 * */ if (key.isAcceptable()) { //处理 accpt 事件 //获得客户端连接 并且注册 写事件 SocketChannel socketChannel = serverSocketChannel.accept(); socketChannel.configureBlocking(false); // 处理完 连接状态之后 后续将写操作加入到选择其中 socketChannel.register(selector, SelectionKey.OP_WRITE); } else if (key.isWritable()) { // 我们处理完连接之后,可以用key来获得对应的事件通道 SocketChannel socketChannel = (SocketChannel) key.channel(); //处理 write 事件 ByteBuffer writebuffer = ByteBuffer.allocate(128); writebuffer.put("hello client i am server from 4321".getBytes()); writebuffer.flip(); socketChannel.write(writebuffer); //我们可以通过 key 来注册接下来发生在这个通道的事件 key.interestOps(SelectionKey.OP_READ); } else if (key.isReadable()) { //处理 read 事件 SocketChannel socketChannel = (SocketChannel) key.channel(); // 读取客户端的数据 ByteBuffer readbuffer = ByteBuffer.allocate(128); //读取数据 int read = socketChannel.read(readbuffer); //read = -1 代表着已经读完了 if (read == -1) { key.cancel(); } //刷新 buffer readbuffer.flip(); //用string buffer 来拼接读取的数据 StringBuffer stringBuffer = new StringBuffer(); while (readbuffer.hasRemaining()) { stringBuffer.append((char) readbuffer.get()); } System.out.println("client data :" + stringBuffer.toString()); } else if (key.isConnectable()) { } } } }
【使用方式】
a、首先通过open方法,获取通道,将通道设置为非阻塞的
b、通过open方法,获取选择器,将通道注册进选择器中,伴随设置通道要处理的事件(OP_ACCEPT)
c、轮询选择器,当前是否有要处理的操作 select() > 0?
如果有,要获取,待处理操作的集合Set<SelectionKey> , 进行遍历
遍历到SelectionKey时,判断对应哪种操作,不同的操作设置不同的处理方式
如OP_ACCEPT,接收客户端通道并进行注册,监听后续处理的事件,如OP_WRITE
如OP_WRITE,通过key的方法获取通道本身,读取数据并继续监听事件,如OP_READ