前面我们已经简单的学习了channel,知道channel作为通道,可以在通道中进行读写操作,同时知道ByteChannel是双向的。对于NIO的优势在于多路复用选择器上,在Nginx、Redis、Netty中都有多路复用的体现。因此学习Selector是有必要的。
1.使用多路复用选择器的方式
/*** selector 选择器 多路复用,选择器结合selectable-channel实现非阻塞效果,提高效率* 可以将通道注册进选择器中,其主要注意是使用一个线程来对多个通道中的已就绪进行选择,然后就可以对选择* 的通道进行数据处理,属于一对多的关系*/publicclassSelectorTest { publicstaticvoidmain(String[] args) throwsIOException { //创建serverSocketChannel对象ServerSocketChannelserverSocketChannel=ServerSocketChannel.open(); //设置websocket通道为非阻塞方式serverSocketChannel.configureBlocking(false); //获取websocketServerSocketserverSocket=serverSocketChannel.socket(); //进行绑定操作serverSocket.bind(newInetSocketAddress("localhost", 8888)); //核心代码开始Selectorselector=Selector.open(); SelectionKeykey=serverSocketChannel.register(selector, SelectionKey.OP_ACCEPT); //核心代码结束System.out.println("selector="+selector); System.out.println("key="+key); serverSocket.close(); serverSocketChannel.close(); } }
通常的步骤是:打开ServerSocket通道,然后将通道配置成非阻塞模式,同时拿到socket进行绑定操作。然后打开选择器,将通道注册到选择器中,进行业务处理操作,然后关闭socket,如果需要长连接,此时就不关闭了。
2.判断当前是否向任何选择器进行了注册
/*** 判断注册的状态:判断当前是否向任何选择器进行了注册。可以看到新创建的通道总是未注册的*/publicclassSelectorTest1 { publicstaticvoidmain(String[] args) throwsIOException { //打开serverSocket通道,同时设置为非阻塞,拿到serverSocket,进行ip和端口绑定//将选择器打开,将选择器key进行注册,关闭socket和socket通道ServerSocketChannelserverSocketChannel=ServerSocketChannel.open(); serverSocketChannel.configureBlocking(false); //需要部分,通常需要将其设置为非阻塞ServerSocketserverSocket=serverSocketChannel.socket(); serverSocket.bind(newInetSocketAddress("localhost", 8888)); System.out.println("A isRegistered="+serverSocketChannel.isRegistered()); Selectorselector=Selector.open(); SelectionKeykey=serverSocketChannel.register(selector, SelectionKey.OP_ACCEPT); System.out.println("B isRegistered="+serverSocketChannel.isRegistered()); serverSocket.close(); serverSocketChannel.close(); } }
3.获取支持的socketOption列表
/*** 获取支持的socketOption列表* Set<SocketOption<?> supportedOption()方法:返回通道支持的Socket Option*/publicclassSelectorTest2 { publicstaticvoidmain(String[] args) throwsIOException { Threadt=newThread() { publicvoidrun() { try { Thread.sleep(2000); Socketsocket=newSocket("localhost", 8088); socket.close(); } catch (Exceptione) { e.printStackTrace(); } } }; t.start(); ServerSocketChannelserverSocketChannel=ServerSocketChannel.open(); serverSocketChannel.bind(newInetSocketAddress("localhost", 8088)); SocketChannelsocketChannel=serverSocketChannel.accept(); Set<SocketOption<?>>set1=serverSocketChannel.supportedOptions(); Set<SocketOption<?>>set2=socketChannel.supportedOptions(); Iteratoriterator1=set1.iterator(); Iteratoriterator2=set2.iterator(); System.out.println("ServerSocketChannel supportedOptions:"); while (iterator1.hasNext()) { SocketOptioneach= (SocketOption) iterator1.next(); System.out.println(each.name() +" "+each.getClass().getName()); } System.out.println(); System.out.println(); System.out.println("SocketChannel supportedOptions:"); while (iterator2.hasNext()) { SocketOptioneach1= (SocketOption) iterator2.next(); System.out.println(each1.name() +" "+each1.getClass().getName()); } socketChannel.close(); serverSocketChannel.close(); } }
4.进行socket地址获取、设置阻塞模式
/*** 进行socket地址获取、设置阻塞模式*/publicclassSocketAddressTest { publicstaticvoidmain(String[] args) throwsIOException { ServerSocketChannelserverSocketChannel=ServerSocketChannel.open(); serverSocketChannel.bind(newInetSocketAddress("localhost",8888)); InetSocketAddressaddress= (InetSocketAddress)serverSocketChannel.getLocalAddress(); //获取ip和端口System.out.println(address.getHostString()); System.out.println(address.getPort()); //查看阻塞模式System.out.println(serverSocketChannel.isBlocking()); serverSocketChannel.configureBlocking(false); System.out.println(serverSocketChannel.isBlocking()); //获取选择器Selectorselector=Selector.open(); SelectionKeyselectionKey=serverSocketChannel.register(selector,SelectionKey.OP_ACCEPT); System.out.println("A = "+selectionKey+" "+selectionKey.hashCode()); SelectionKeyselectionKey1=serverSocketChannel.keyFor(selector); System.out.println("B = "+selectionKey1.hashCode()); serverSocketChannel.close(); } }
5.SelectionKey不是同一个对象
/*** 相同的通道可以注册不同的选择器,返回的SelectionKey不是同一个对象*/publicclassSelectorKeyDemo { publicstaticvoidmain(String[] args) throwsIOException { //相同的通道可以注册不同的选择器,返回的SelectionKey不是同一个对象selectionKeyTest1(); selectionKeyTest2(); } privatestaticvoidselectionKeyTest1() throwsIOException { //打开ServerSocketChannelServerSocketChannelserverSocketChannel=ServerSocketChannel.open(); //进行ip和端口绑定serverSocketChannel.bind(newInetSocketAddress("localhost",8888)); //配置非阻塞状态serverSocketChannel.configureBlocking(false); //打开选择器Selectorselector1=Selector.open(); Selectorselector2=Selector.open(); //将通道注册到选择器中,返回keySelectionKeyselectionKey1=serverSocketChannel.register(selector1,SelectionKey.OP_ACCEPT); System.out.println("SelectionKey1="+selectionKey1.hashCode()); SelectionKeyselectionKey2=serverSocketChannel.register(selector2,SelectionKey.OP_ACCEPT); System.out.println("SelectionKey2="+selectionKey2.hashCode()); serverSocketChannel.close(); } //不同的通道注册到相同的选择器中,返回的SelectionKey不是同一个对象privatestaticvoidselectionKeyTest2() throwsIOException { ServerSocketChannelserverSocketChannel1=ServerSocketChannel.open(); serverSocketChannel1.bind(newInetSocketAddress("localhost",8888)); serverSocketChannel1.configureBlocking(false); ServerSocketChannelserverSocketChannel2=ServerSocketChannel.open(); serverSocketChannel2.bind(newInetSocketAddress("localhost",8888)); serverSocketChannel2.configureBlocking(false); Selectorselector=Selector.open(); SelectionKeyselectionKey1=serverSocketChannel1.register(selector,SelectionKey.OP_ACCEPT); System.out.println("SelectionKey1="+selectionKey1.hashCode()); SelectionKeyselectionKey2=serverSocketChannel2.register(selector,SelectionKey.OP_ACCEPT); System.out.println("SelectionKey2="+selectionKey2.hashCode()); serverSocketChannel1.close(); serverSocketChannel2.close(); } }
6.获取selectorProvider
/*** 获取selectorProvider*/publicclassSelectorProviderTest { publicstaticvoidmain(String[] args) throwsIOException { SelectorProviderselectorProvider=SelectorProvider.provider(); System.out.println(selectorProvider); ServerSocketChannelserverSocketChannel=null; serverSocketChannel=serverSocketChannel.open(); SelectorProviderprovider=SelectorProvider.provider(); System.out.println(provider); serverSocketChannel.close(); } }
学习了Selector,我们来学习应答模式案例
BIO模式下的客户端:
/*** BIO服务端*/publicclassBIOServer { publicstaticvoidmain(String[] args) throwsIOException { //创建一个ServerSocket对象,带端口ServerSocketserverSocket=newServerSocket(8888); while(true){ //监听客户端,阻塞Socketsocket=serverSocket.accept(); //从serverSocket中拿到输入流,进行消息的接收,阻塞InputStreamis=socket.getInputStream(); byte[] b=newbyte[20]; is.read(b); StringclientIp=socket.getInetAddress().getHostAddress(); System.out.println(clientIp+"说:"+newString(b).trim()); //从serverScoket中拿到输出流,进行消息的响应OutputStreamos=socket.getOutputStream(); os.write("你好,客户端".getBytes()); //关闭socketsocket.close(); } } }
BIO模式下的客户端
/*** BIO客户端*/publicclassBIOClient { publicstaticvoidmain(String[] args) throwsIOException { while (true){ //创建客户端socketSocketsocket=newSocket("localhost",8888); //从客户端socket中拿到输出流,进行消息发送OutputStreamos=socket.getOutputStream(); System.out.println("输入信息:"); //你好,服务端Scannersc=newScanner(System.in); Stringmsg=sc.nextLine(); os.write(msg.getBytes()); //从客户端socket中拿到输入流,进行消息回复InputStreamis=socket.getInputStream(); byte[] b=newbyte[20]; is.read(b); System.out.println("服务端说:"+newString(b).trim()); } } }
运行:客户端输入
可以看到服务端
NIO的服务端
/*** NIO服务端*/publicclassNIOServer { publicstaticvoidmain(String[] args) throwsIOException { //开启ServerScoketChannelServerSocketChannelserverSocketChannel=ServerSocketChannel.open(); //开启selectorSelectorselector=Selector.open(); //绑定端口号serverSocketChannel.bind(newInetSocketAddress(8888)); //设置非阻塞模式serverSocketChannel.configureBlocking(false); //将serverSocketChannel对象注册给Selector对象serverSocketChannel.register(selector, SelectionKey.OP_ACCEPT); //进行操作while(true){ //如果在限定时间没有客户端的请求,则进行别的操作if(selector.select(2000)==0){ System.out.println("server:没有客户端信息需要处理,做别的事情"); continue; } //拿到所以的selectionkey,进行迭代,获取SelectorKey,判断通道里的时间Iterator<SelectionKey>keyIterator=selector.selectedKeys().iterator(); while (keyIterator.hasNext()){ SelectionKeykey=keyIterator.next(); //可接收if(key.isAcceptable()){ System.out.println("OP_ACCEPT"); SocketChannelsocketChannel=serverSocketChannel.accept(); socketChannel.configureBlocking(false); socketChannel.register(selector,SelectionKey.OP_READ, ByteBuffer.allocate(1024)); } //可读if (key.isReadable()){ SocketChannelchannel= (SocketChannel) key.channel(); ByteBufferbuffer= (ByteBuffer) key.attachment(); channel.read(buffer); System.out.println("客户端发来请求:"+newString(buffer.array())); } //移除所有的keykeyIterator.remove(); } } } }
NIO的客户端
/*** NIO客户端*/publicclassNIOClient { publicstaticvoidmain(String[] args) throwsIOException { //开启网络通道SocketChannelchannel=SocketChannel.open();////设置非阻塞channel.configureBlocking(false); //绑定ip和端口InetSocketAddressaddress=newInetSocketAddress("localhost",8888); if(!channel.connect(address)){ while (!channel.finishConnect()){ System.out.println("连接服务器socket进行对话,做别的事情"); } //获取缓冲区并存入数据Stringmsg="hello,l'm Client"; ByteBufferwiterBuffer=ByteBuffer.wrap(msg.getBytes()); //发送数据信息channel.write(witerBuffer); System.in.read(); } } }
基于NIO的聊天:
服务器端
/*** 聊天室服务端*/publicclassChatServer { privateServerSocketChannellistenerChannel; //监听通道 老大privateSelectorselector;//选择器对象 间谍privatestaticfinalintPORT=9999; //服务器端口//构造方法publicChatServer() { try { // 1. 得到监听通道listenerChannel=ServerSocketChannel.open(); // 2. 得到选择器selector=Selector.open(); // 3. 绑定端口listenerChannel.bind(newInetSocketAddress(PORT)); // 4. 设置为非阻塞模式listenerChannel.configureBlocking(false); // 5. 将选择器绑定到监听通道并监听accept事件listenerChannel.register(selector, SelectionKey.OP_ACCEPT); printInfo("Chat Server is ready......."); } catch (IOExceptione) { e.printStackTrace(); } } //6.业务处理,首先匹配selectorkey的状态,是连接请求事件还是读取数据事件//如果是连接请求事件,则进行key的迭代,进行连接请求操作,否者进行数据的读取//读取完成或者请求之后,将selectorkey进行删除,避免重复处理publicvoidstart() throwsException{ try { while (true) { //不停监控if (selector.select(2000) ==0) { System.out.println("Server:没有客户端找我, 我就干别的事情"); continue; } Iterator<SelectionKey>iterator=selector.selectedKeys().iterator(); while (iterator.hasNext()) { SelectionKeykey=iterator.next(); if (key.isAcceptable()) { //连接请求事件SocketChannelsc=listenerChannel.accept(); sc.configureBlocking(false); sc.register(selector,SelectionKey.OP_READ); System.out.println(sc.getRemoteAddress().toString().substring(1)+"上线了..."); } if (key.isReadable()) { //读取数据事件readMsg(key); } //一定要把当前key删掉,防止重复处理iterator.remove(); } } } catch (IOExceptione) { e.printStackTrace(); } } //读取客户端发来的消息并广播出去publicvoidreadMsg(SelectionKeykey) throwsException{ SocketChannelchannel=(SocketChannel) key.channel(); ByteBufferbuffer=ByteBuffer.allocate(1024); intcount=channel.read(buffer); if(count>0){ Stringmsg=newString(buffer.array()); printInfo(msg); //发广播broadCast(channel,msg); } } //给所有的客户端发广播publicvoidbroadCast(SocketChannelexcept,Stringmsg) throwsException{ System.out.println("服务器发送了广播..."); for(SelectionKeykey:selector.keys()){ ChanneltargetChannel=key.channel(); if(targetChannelinstanceofSocketChannel&&targetChannel!=except){ SocketChanneldestChannel=(SocketChannel)targetChannel; ByteBufferbuffer=ByteBuffer.wrap(msg.getBytes()); destChannel.write(buffer); } } } privatevoidprintInfo(Stringstr) { //往控制台打印消息SimpleDateFormatsdf=newSimpleDateFormat("yyyy-MM-dd HH:mm:ss"); System.out.println("["+sdf.format(newDate()) +"] -> "+str); } publicstaticvoidmain(String[] args) throwsException { newChatServer().start(); } }
客户端
//聊天程序客户端publicclassChatClient { privatefinalStringHOST="127.0.0.1"; //服务器地址privateintPORT=9999; //服务器端口privateSocketChannelsocketChannel; //网络通道privateStringuserName; //聊天用户名//构造方法publicChatClient() throwsIOException { //1. 得到一个网络通道socketChannel=SocketChannel.open(); //2. 设置非阻塞方式socketChannel.configureBlocking(false); //3. 提供服务器端的IP地址和端口号InetSocketAddressaddress=newInetSocketAddress(HOST,PORT); //4. 连接服务器端if(!socketChannel.connect(address)){ while(!socketChannel.finishConnect()){ //nio作为非阻塞式的优势System.out.println("Client:连接服务器端的同时,我还可以干别的一些事情"); } } //5. 得到客户端IP地址和端口信息,作为聊天用户名使用userName=socketChannel.getLocalAddress().toString().substring(1); System.out.println("---------------Client("+userName+") is ready---------------"); } //向服务器端发送数据publicvoidsendMsg(Stringmsg) throwsException{ if(msg.equalsIgnoreCase("bye")){ socketChannel.close(); return; } msg=userName+"说:"+msg; ByteBufferbuffer=ByteBuffer.wrap(msg.getBytes()); socketChannel.write(buffer); } //从服务器端接收数据publicvoidreceiveMsg() throwsException{ ByteBufferbuffer=ByteBuffer.allocate(1024); intsize=socketChannel.read(buffer); if(size>0){ Stringmsg=newString(buffer.array()); System.out.println(msg.trim()); } } }
//启动聊天程序客户端 public class TestChat { public static void main(String[] args) throws Exception { ChatClient chatClient=new ChatClient(); new Thread(){ public void run(){ while(true){ try { chatClient.receiveMsg(); Thread.sleep(2000); }catch (Exception e){ e.printStackTrace(); } } } }.start(); Scanner scanner=new Scanner(System.in); while (scanner.hasNextLine()){ String msg=scanner.nextLine(); chatClient.sendMsg(msg); } } }
启动运行:
客户端输入信息和服务端看到的信息