前言
前面我们对NIO的三大核心做了学习,这章我们来基于NIO来做一个聊天室案例。
聊天室案例
先来看下我们要实现的效果
对于服务端而言需要做如下事情
- selector监听客户端的链接
- 如果有“读”事件,就从通道读取数据
- 把数据转发给其他所有的客户端,要过滤掉发消息过来的客户端不用转发
对于客户端而言需要做如下事情
- selector监听服务端的“读”事件
- 如果有数据从通道中读取数据,打印到控制台
- 监听键盘输入,向服务端发送消息
服务端代码
publicclassGroupChatServer { //选择器privateSelectorselector ; //服务端通道ServerSocketChannelserverSocketChannel ; //初始化服务端publicGroupChatServer(){ try { //创建选择器selector=Selector.open(); //创建通道serverSocketChannel=ServerSocketChannel.open(); //绑定监听端口serverSocketChannel.bind(newInetSocketAddress("127.0.0.1",5000)); //配置为异步serverSocketChannel.configureBlocking(false); //注册通道到选择器serverSocketChannel.register(selector, SelectionKey.OP_ACCEPT); } catch (Exceptione) { e.printStackTrace(); } } //处理客户端监听publicvoidlisten(){ //轮询监听while(true){ //监听try { if(selector.select() >0){ //监听到事件,获取到有事件的keySet<SelectionKey>selectionKeys=selector.selectedKeys(); Iterator<SelectionKey>iterator=selectionKeys.iterator(); while(iterator.hasNext()){ SelectionKeyselectionKey=iterator.next(); //如果是“接收就绪”if(selectionKey.isAcceptable()){ //注册通道SocketChannelchannel=serverSocketChannel.accept(); //设置为异步channel.configureBlocking(false); //注册通道,监听类型设置为:“读” ,并指定装数据的Bufferchannel.register(selector,SelectionKey.OP_READ,ByteBuffer.allocate(1024)); System.out.println("客户端:"+channel.getRemoteAddress()+" 上线啦..."); } //如果是“读就绪”if(selectionKey.isReadable()){ //读取数据 , 转发数据给其他客户readAndForwardMessage(selectionKey); } //删除Key防止重复处理iterator.remove(); } }else{ System.out.println("等待客户端加入..."); } } catch (IOExceptione) { e.printStackTrace(); } } } //读取数据publicvoidreadAndForwardMessage(SelectionKeyselectionKey){ SocketChannelchannel=null; try { //得到通道channel= (SocketChannel) selectionKey.channel(); //异步channel.configureBlocking(false); //准备缓冲区ByteBufferbyteBuffer=ByteBuffer.allocate(1024); //把数据读取到缓冲区channel.read(byteBuffer); //打印结果Stringmessage=channel.getRemoteAddress()+" : "+newjava.lang.String(byteBuffer.array()); System.out.println(message); //转发消息forwardToOthers(channel , message); } catch (IOExceptione) { e.printStackTrace(); try { //出现异常,关闭通道selectionKey.channel(); if(channel!=null){ channel.close(); } } catch (IOExceptionex) { ex.printStackTrace(); } } } //转发数据publicvoidforwardToOthers(SocketChannelself,Stringmessage){ //转发给所有通道Set<SelectionKey>keys=selector.keys(); Iterator<SelectionKey>iterator=keys.iterator(); while(iterator.hasNext()){ //拿到某一个客户端的keySelectionKeyselectionKey=iterator.next(); SelectableChannelselectableChannel=selectionKey.channel(); //排除当前发消息的客户端不用转发if(selectableChannelinstanceofSocketChannel&&selectableChannel!=self){ //转发SocketChannelchannel= (SocketChannel) selectableChannel; try { channel.write(ByteBuffer.wrap(message.getBytes())); } catch (IOExceptione) { e.printStackTrace(); } } } } publicstaticvoidmain(String[] args) { newGroupChatServer().listen(); } }
客户端代码
publicclassGroupChatClient { //选择且privateSelectorselector ; //服务端通道SocketChannelsocketChannel ; //初始化服务端publicGroupChatClient(){ try { //创建选择器selector=Selector.open(); //创建通道socketChannel=SocketChannel.open(newInetSocketAddress("127.0.0.1",5000)); //配置为异步socketChannel.configureBlocking(false); //注册通道到选择器,客户端监听读事件socketChannel.register(selector, SelectionKey.OP_READ); System.out.println(socketChannel.getLocalAddress()+":准备完毕"); } catch (Exceptione) { e.printStackTrace(); } } //发送消息publicvoidsendMessage(Stringmessage){ try { socketChannel.write(ByteBuffer.wrap(message.getBytes())); } catch (IOExceptione) { e.printStackTrace(); } } //处理客户端监听publicvoidreadMessage(){ while (true){ try { if(selector.select() >0){ //监听到有事件Iterator<SelectionKey>iterator=selector.selectedKeys().iterator(); while(iterator.hasNext()){ SelectionKeykey=iterator.next(); //如果是可读事件if(key.isReadable()){ //获取通道SocketChannelchannel= (SocketChannel) key.channel(); channel.configureBlocking(false); //读取通道数据ByteBufferbyteBuffer=ByteBuffer.allocate(1024); channel.read(byteBuffer); //打印内容System.out.println(newString(byteBuffer.array())); } //防止重复操作iterator.remove(); } }else{ try { Thread.sleep(3000); } catch (InterruptedExceptione) { e.printStackTrace(); } } } catch (IOExceptione) { //e.printStackTrace();System.out.println("服务器关闭..."); } } } publicstaticvoidmain(String[] args) { GroupChatClientgroupChatClient=newGroupChatClient(); //新开线程,专门处理服务端发送过来的消息,不然会阻塞主线程newThread(()->{ //读取消息groupChatClient.readMessage(); }).start(); //写消息 , 键盘输入Scannerscanner=newScanner(System.in); while(scanner.hasNextLine()){ Stringmessage=scanner.nextLine(); //发消息groupChatClient.sendMessage(message); } } }
文章结束希望对你有所帮助