前言
上一章节我们理解了Java NIO三大核心,以及重点讲解了Buffer的原理和几个使用场景,其中也用到了channel。这一章我们来理解一下selector,结合channel来做一个c/s通信。
理解Selector 和 Channel
Selector 选择器,也叫多路复用器,可以同时处理多个客户端连接,多路复用器采用轮询机制来选择有读写事件的客户端链接进行处理。
通过 Selector ,一个 I/O 线程可以并发处理 N 个客户端连接和读写操作,这解决了传统同步阻塞 I/O 一连接一线程模型,架构的性能、弹性伸缩能力和可靠性都得到了极大的提升。
由于它的读写操作都是非阻塞的,这就可以充分提升 IO 线程的运行效率,避免由于频繁 I/O 阻塞导致的线程挂起。
这里在整理一下它的工作原理,如图:
Selector :多路复用器(也叫选择器)的作用就是提供给SocketChannel通道来注册,然后Selector会轮询的去监听通道通道的读写事件从而做出相应的IO处理,它 的工作流程如下:
- 首先需要创建一个ServerSocketChannel,它类似于(ServerSocket)需要指定一个监听的IP和Port。需要注意的是ServerSocketChannel为了兼容BIO,默认是阻塞的,可以通过ServerSocketChannel#configureBlocking(false)来指定为NIO模式。
通过ServerSocketChannel 可以获取一个客户端的SocketChannel ,客户端的SocketChannel 需要注册到Selector上,然后每个通道都会对应一个SelectionKey - 选择器可以通过Selector.open() 创建,然后将 ServerSocketChannel 注册给Selector 。选择器的 Selector#select() 方法 可以选择有事件的通道(SocketChannel ),并返回已就绪的通道数量.事件类型包括:“连接”,“接收” ,“读”,“写”。
- 如果 Selector#select() 返回值大于0代表某些通道有事件发生,可以通过 selector.selectedKeys() 来得到所有有事件通道的SelectionKey。
然后可以通过SelectionKey方向拿到SocketChannel , 从而将SocketChannel 中的数据读取到Buffer中,完成IO操作。
ServerSocketChannel
ServerSocketChannel 是服务端用来用来监听客户端Socket链接,通过accept方法可以获取客户端SocketChannel,从而将 SocketChannel 注册到Selector,相关方法如下
- open : 创建一个 ServerSocketChannel 通道
- bind(SocketAddress local):设置服务器端监听的地址和端口号
- configureBlocking(boolean block) : 设置阻塞或非阻塞模式,取值 false 表示采用非阻塞模式
- accept() :接受一个连接,返回代表这个连接的通道对象SocketChannel
- register(Selector sel, int ops): 把当前通道注册到选择器,并设置监听事件
SocketChannel
一个客户端链接服务端就会产生通道:ServerChannel ,需要注册到Selector,被Selector监听通道的读写事件。ServerChannel 负责具体的读写,把缓冲区的数据写入通道,或者把通道中的数据写入缓冲区。
- open() : 得到一个 SocketChannel 通道
- configureBlocking(boolean block):设置阻塞或非阻塞模式,取值 false 表示采用非阻塞模式
- connect(SocketAddress remote) :连接远程服务器,通过SocketAddress 指定IP和端口
- finishConnect(): 如果connect连接失败,就要通过finishConnect方法完成连接操作
- write(ByteBuffer src) : 把ByteBuffer中的数据,往通道里写
- read(ByteBuffer dst) :从通道里读数据,写入ByteBuffer
- register(Selector sel, int ops, Object att): 把该通道注册到selector,并设置监听事件(OPS),最后一个参数可以设置共享数据,该方法会返回一个 SelectionKey ,这个key对应了该通道。
- close() :关闭通道
SelectionKey
当 ServerChannel 注册到Selector就会产生SelectionKey , 通过SelectionKey可以反向获得SocketChannel通道对象,从而进行IO读写操作。
- selector(): 通过SelectionKey 获取得到与之关联的 Selector 对象
- channel():得到与之关联的通道
- attachment():得到与之关联的共享数据
- interestOps(int ops):设置或改变监听事件
- isAcceptable():是否可以 accept,“接收就绪”事件
- isReadable():是否可以读
- isWritable():是否可以写
事件包括:
SelectionKey.OP_CONNECT : 值16,连接就绪,比如:一个channel连接到一个服务器SelectionKey.OP_ACCEPT:值8,接收就绪,比如:ServerSocketChannel准备好接入新的连接SelectionKey.OP_READ:值1,“读就绪”,通道有可以读数据可以说是SelectionKey.OP_WRITE:值4,“写就绪”,等待写数据的通道可以说
Selector
负责采用轮询方式监听通道,当通道有读写事件就进行IO操作。
- open() :得到一个选择器对象
- select(long timeout):选择有事件的通道,将其对应的 SelectionKey 加入到内部集合中,返回通道的数量
- selectedKeys() : 从内部集合中得到所有有事件 SelectionKey
- keys() : 从内部集合中得到所有SelectionKey
- close:关闭选择器
综合案例
使用selector ,ServerSocketChannel,SocketChannel完成一个 C/S 案例,
服务端代码
//通道publicvoidserverSocketChannelTest() throwsIOException { //创建服务端通道ServerSocketChannelserverSocketChannel=ServerSocketChannel.open(); //socket监听地址和端口SocketAddresssocketAddress=newInetSocketAddress("127.0.0.1",5000); //和某个SocketAddress绑定serverSocketChannel.bind(socketAddress); //NIO默认采用阻塞,为了兼容BIOserverSocketChannel.configureBlocking(false); //创建选择器Selectorselector=Selector.open(); //通道注册到选择器,事件类型为:OP_ACCEPT “接受”serverSocketChannel.register(selector, SelectionKey.OP_ACCEPT); //==选择器轮询=======================================================================while(true){ //select,选择有事件的通道,返回有事件发生通道的key的个数 ,超时时间 1sif(selector.select(1000) ==0){ System.out.println("无连接...轮询等待...\");continue; } //有事件发生,得到有事件的通道的key的集合Set<SelectionKey>selectionKeys=selector.selectedKeys(); Iterator<SelectionKey>iterator=selectionKeys.iterator(); //遍历key的集合while (iterator.hasNext()){ //拿到每个通道的keySelectionKeykey=iterator.next(); //如果当前通道事件是: OP_ACCEPT ,就注册通道if(key.isAcceptable()){ //接收一个socketChannelSocketChannelsocketChannel=serverSocketChannel.accept(); System.out.println("客户端链接成功..."); socketChannel.configureBlocking(false); //把socketChannel注册到选择器 ,并给通道绑定一个buffersocketChannel.register(selector, SelectionKey.OP_READ,ByteBuffer.allocate(1024)); } //如果通道事件是: OP_READ,说明通道有数据if(key.isReadable()){ //通过key得到SocketChannelSocketChannelchannel= (SocketChannel)key.channel(); //得到channel绑定的bufferByteBufferbyteBuffer= (ByteBuffer)key.attachment(); //从通道把数据读取到bufferchannel.read(byteBuffer); System.out.println(newString(byteBuffer.array())); } //删除当前keyiterator.remove(); } } }
客户端代码
//通道publicvoidsocketChannelTest() throwsIOException { //创建一个SocketChannelSocketChannelsocketChannel=SocketChannel.open(); //使用非阻塞模式socketChannel.configureBlocking(false); //链接的地址和端口InetSocketAddressinetSocketAddress=newInetSocketAddress("127.0.0.1", 5000); //尝试链接,如果使用的异步,那么需要使用 socketChannel.finishConnect() 来确保连接成功。if(!socketChannel.connect(inetSocketAddress)){ //如果没链接成功,会通过while循环,直到 finishConnect 链接成功,跳出whilewhile(!socketChannel.finishConnect()){ System.out.println("还未完成链接...等待中..."); } } //链接成功,把数据写出去socketChannel.write(ByteBuffer.wrap("你好".getBytes())); System.out.println("向服务端发送数据..."); //防止客户端结束,所以使用read()阻塞System.in.read(); }
总结
本篇文章介绍了Selector ,ServerSocketChannel ,SocketChannel的作用,场景API,和三种的工作原理,并通过一个C/S综合案例来演示三者之间的关系。