Java NIO 综合案例
通过 Java NIO 完成一个多人聊天室的案例:
服务端代码:
// 服务端 public class ChatServer { // 服务启动 public void startServer() throws IOException, InterruptedException { // 1、创建 Selector 选择器 Selector selector = Selector.open(); // 2、创建 ServerSocketChannel 通道 ServerSocketChannel serverSocketChannel = ServerSocketChannel.open(); // 3、为 channel 通道绑定监听端口 serverSocketChannel.bind(new InetSocketAddress(25000)); // 设置非阻塞模式 serverSocketChannel.configureBlocking(false); // 4、 把 channel 注册到到 selector 选择器上 serverSocketChannel.register(selector, SelectionKey.OP_ACCEPT); System.out.println("服务器已经启动成功了"); // 5、循环,等待新的连接介入 for (; ; ) { // 获取 channel 数量 int readChannels = selector.select(); if (readChannels == 0) { continue; } // 获取可用的 channel Set<SelectionKey> selectionKeys = selector.selectedKeys(); Iterator<SelectionKey> iterator = selectionKeys.iterator(); while (iterator.hasNext()) { SelectionKey selectionKey = iterator.next(); // 移除 set 集合当前 selectionKey iterator.remove(); // 6、根据就绪状态,调用对应的方法实现具体的操作 // 6.1 如果 accept 状态 if (selectionKey.isAcceptable()) { acceptOperator(serverSocketChannel, selector); } // 6.2 如果可读状态 else if (selectionKey.isReadable()) { readOperator(selector, selectionKey); } } TimeUnit.SECONDS.sleep(1); } } // 处理可读状态操作 private void readOperator(Selector selector, SelectionKey selectionKey) throws IOException { //1 从 selectionKey 获取已经就绪的通道 SocketChannel channel = (SocketChannel) selectionKey.channel(); //2 创建 buffer ByteBuffer buffer = ByteBuffer.allocate(1024); //3 循环读取客户端发送过来的信息 int readLen = channel.read(buffer); String message = ""; if (readLen > 0) { buffer.flip(); // 读取内容 message += Charset.forName("UTF-8").decode(buffer); } //4 将 channel 再次注册到选择器上,监听可读状态。 channel.register(selector, SelectionKey.OP_READ); //5 把客户端发送的消息,广播到其他的客户端上 if (message != null && message.length() > 0) { // 广播到其他客户端 System.out.println("message: " + message); castOtherClient(message, selector, channel); } } // 广播到其他的客户端 private void castOtherClient(String message, Selector selector, SocketChannel channel) throws IOException { // 1 获取所有已经接入的客户端 Set<SelectionKey> keys = selector.keys(); // 2 循环向所有的 channel 广播消息 for (SelectionKey selectionKey : keys) { // 获取里面的每个通道 SelectableChannel otherChannel = selectionKey.channel(); // 不需要给自己发送 if (otherChannel instanceof SocketChannel && channel != otherChannel) { ((SocketChannel) otherChannel).write(Charset.forName("UTF-8").encode(message)); } } } // 处理接入状态操作 private void acceptOperator(ServerSocketChannel serverSocketChannel, Selector selector) throws IOException { // 1 接入状态,状态 创建 socketChannel SocketChannel socketChannel = serverSocketChannel.accept(); // 2 把 socketChannel 设置为非阻塞模式 socketChannel.configureBlocking(false); // 3 把 channel 注册到 selector 选择器上,监听可读状态 socketChannel.register(selector, SelectionKey.OP_READ); // 4 客户端回复信息 socketChannel.write(Charset.forName("UTF-8").encode("欢迎进入聊天室!")); } public static void main(String[] args) throws IOException, InterruptedException { ChatServer chatServer = new ChatServer(); chatServer.startServer(); } }
客户端代码
// 客户端 // 客户端 public class ChatClient { // 启动客户端 public void startClient(String name) throws IOException { // 连接服务器 SocketChannel socketChannel = SocketChannel.open( new InetSocketAddress("127.0.0.1", 25000)); //接收服务端响应数据 Selector selector = Selector.open(); socketChannel.configureBlocking(false); socketChannel.register(selector, SelectionKey.OP_READ); // 创建线线程 new Thread(new ClientThread(selector)).start(); // 向服务器发送消息 System.out.println("聊天室客户端启动成功!!"); Scanner scanner = new Scanner(System.in); while (scanner.hasNext()) { String msg = scanner.nextLine(); if (msg != null && msg.length() > 0) { socketChannel.write(Charset.forName("UTF-8").encode(name + " : " + msg)); } } // 接收服务器的消息 } } // 客户端处理线程 public class ClientThread implements Runnable { private Selector selector; public ClientThread(Selector selector) { this.selector = selector; } @Override public void run() { try { // 循环,等待新的连接介入 for (; ; ) { // 获取 channel 数量 int readChannels = 0; readChannels = selector.select(); if (readChannels == 0) { continue; } // 获取可用的 channel Set<SelectionKey> selectionKeys = selector.selectedKeys(); Iterator<SelectionKey> iterator = selectionKeys.iterator(); while (iterator.hasNext()) { SelectionKey selectionKey = iterator.next(); // 移除 set 集合当前 selectionKey iterator.remove(); // 根据就绪状态,调用对应的方法实现具体的操作 // 如果可读状态 if (selectionKey.isReadable()) { readOperator(selector, selectionKey); } } // TimeUnit.SECONDS.sleep(1); } } catch (Throwable e) { e.printStackTrace(); } } // 处理可读状态操作 private void readOperator(Selector selector, SelectionKey selectionKey) throws IOException { //1 从 selectionKey 获取已经就绪的通道 SocketChannel channel = (SocketChannel) selectionKey.channel(); //2 创建 buffer ByteBuffer buffer = ByteBuffer.allocate(1024); //3 循环读取客户端发送过来的信息 int readLen = channel.read(buffer); String message = ""; if (readLen > 0) { buffer.flip(); // 读取内容 message += Charset.forName("UTF-8").decode(buffer); } //4 将 channel 再次注册到选择器上,监听可读状态。 channel.register(selector, SelectionKey.OP_READ); if (message.length() > 0) { // 输出 System.out.println("收到 message: " + message); } } }