通过聊天室项目的演化。介绍BIO的基本用法与优缺点。
- 提示: 注意阅读代码和注释。
# 提要:
- 第一版:
echo聊天室
- 服务器接收到客户端发送的消息,并打印
- 服务端将客户端发送的消息经过包装后再次发送给客户端
- 客户端断开连接
- eg:
- client:
greet from socket.
- server:
echo from server: <greet from socket.>
- 第二版:
群聊聊天室
- 服务器接收客户端发送的消息,并打印
- 服务端将客户端发送的消息转发给其他在线的客户端
- 客户端可一直保持在线状态
# 基础配置与工具类
- 基础配置
/** * 常量 * * @author futao * @date 2020/7/2 */ public class Constants { /** * 服务器端口 */ public static final int SERVER_PORT = 9507; /** * 关键字-客户端退出系统 */ public static final String KEY_WORD_QUIT = "quit"; /** * 使用的字符集编码 */ public static final Charset CHARSET = StandardCharsets.UTF_8; }
- IO工具类
/** * @author futao * @date 2020/7/2 */ public class IOUtils { /** * 从输入流中读取字符串 * * @param is 输入流 * @return 读取到的字符串 * @throws IOException */ public static String readString(InputStream is) throws IOException { //使用带有缓冲区的BufferInputStream以提高读取性能 BufferedInputStream bufferedInputStream = new BufferedInputStream(is); //从缓冲区中一次读取的数据 byte[] buffer = new byte[1024 * 4]; //读取的字符串 StringBuilder fullMessage = new StringBuilder(); //本次读取到的字节个数 int curBufferSize; //循环将数据写入缓冲区buffer,并返回读取到的字节个数。当前数据读取完毕会返回-1。 // curBufferSize这个参数的作用有两个 // 1. 判断是否读取到了流的末尾(==-1?) // 2. 缓冲区字节数组buffer可能并没有写满,只写了curBufferSize, // 那么我们只需要将字节数组中前面curBufferSize个字节转换成字符串就行。 while ((curBufferSize = bufferedInputStream.read(buffer)) != -1) { //将buffer中的数据转换成字符串,从buffer的第0个字节开始,读取curBufferSize个字节 fullMessage.append(new String(buffer, 0, curBufferSize)); } return fullMessage.toString(); } }
1. echo聊天室
- 需求描述:
- 服务器接收到客户端发送的消息,并打印
- 服务端将客户端发送的消息经过包装后再次发送给客户端
- 客户端断开连接
- eg:
- client:
greet from socket.
- server:
echo from server: <greet from socket.>
- 实现思路:
- 创建服务端
ServerSocket
并绑定所监听的端口 - 调用
serverSocket.accept()
阻塞监听客户端的接入 - 客户端接入后获取到客户端
Socket
,并将该Socket
上流的读写操作交给子线程去处理,主线程继续阻塞在accept()
监听客户端的接入,否则同一时刻只能有一个客户端接入。
- 服务器端
BioChatServer
/** * @author futao * @date 2020/7/2 */ public class BioChatServer { private static final Logger logger = LoggerFactory.getLogger(BioChatServer.class); /** * 启动服务器 */ public void start() { //创建服务端ServerSocket,并监听端口 try (ServerSocket serverSocket = new ServerSocket(Constants.SERVER_PORT)) { logger.debug("========== 基于BIO的聊天室在[{}]端口启动成功 ==========", Constants.SERVER_PORT); //循环accept()监听 while (true) { //accept()将阻塞,直到有客户端Socket接入。并在服务端创建一个Socket与其对应 Socket socket = serverSocket.accept(); logger.debug("客户端[{}]成功接入", socket.getPort()); //将获取到的客户端连接交给子线程去处理,不影响主线程继续监听,等待下一个客户端连接 new Thread(() -> { try ( //用于从客户端读取数据 InputStream inputStream = socket.getInputStream(); //用于将数据写给客户端 OutputStream outputStream = socket.getOutputStream() ) { //从输入流中读取数据 String fullMessage = IOUtils.readString(inputStream); logger.info("接收到客户端【{}】发来的消息[{}]", socket.getPort(), fullMessage); //编码与响应 outputStream.write(String.format("echo from server: <%s>", fullMessage).getBytes(Constants.CHARSET)); } catch (IOException e) { logger.error("客户端异常", e); } }).start(); } } catch (IOException e) { logger.error("服务器启动失败", e); return; } } public static void main(String[] args) { new BioChatServer().start(); } }
- 客户端
BioChatClient
/** * @author futao * @date 2020/7/2 */ public class BioChatClient { private static final Logger logger = LoggerFactory.getLogger(BioChatClient.class); /** * 启动客户端 */ private void start() { try ( //尝试连接到服务器 Socket socket = new Socket("localhost", Constants.SERVER_PORT); //获取到输入流 InputStream inputStream = socket.getInputStream(); //输出流 OutputStream outputStream = socket.getOutputStream() ) { logger.debug("========== 成功连接到聊天服务器 =========="); //获取到用户输入的字符串 String userInputStr = new Scanner(System.in).nextLine(); //将字符串转换成字节,写入输出流 outputStream.write(userInputStr.getBytes(Constants.CHARSET)); //刷新缓冲区 outputStream.flush(); //【重要】关闭输出流,通知服务器客户端消息已经发送完毕 socket.shutdownOutput(); //读取服务端的响应 logger.info("接收到消息:[{}]", IOUtils.readString(inputStream)); } catch (UnknownHostException e) { e.printStackTrace(); } catch (IOException e) { e.printStackTrace(); } } public static void main(String[] args) { new BioChatClient().start(); } }
- 需要注意的代码为
BioChatClient.start()
的socket.shutdownOutput();
,如果不将客户端socket
对应的输出流关闭,服务器端将不知道客户端消息是否发送完毕,服务端会一直阻塞在inputstream.read()
。 - 测试
- 启动服务端
- 分别启动两个客户端,向服务端发送消息
两个客户端分别发送了一条消息,并接收到了服务器的响应。
- 通过服务器端日志可以看出
- 客户端接入事件都是在主线程
main
线程上发生的。 - 而客户端消息的收发都是在新的子线程上发生的。而且每一个连接都需要一个全新的线程来处理。
- 通过对线程运行状态的分析也可以看出,子线程在完成消息读取和发送之后立马就销毁了
- 缺点:
- 客户端每次接入只能发送一条消息就下线了,无法保持长期在线。
- 每有一个客户端接入,就需要创建一个线程,如果有大量客户端接入,将对服务器产生较大压力。
- 且每个创建的线程只执行了非常少量的任务就被销毁了,对资源的消耗比较大。
2. 群聊聊天室
- 服务器接收客户端发送的消息,并打印
- 服务端将客户端发送的消息转发给其他在线的客户端
- 客户端可一直保持在线状态
2.1 常规思路
- 服务端
客户端
- 但是测试下来会发现,如果客户端不断开连接,服务端将一直阻塞在
Inputstream.read()
,因为服务端根本不知道客户端的数据已经发送完毕了。 - 所以现在的问题是:如何告知对方数据已经发送完毕?
# 如何告知对方数据已经发送完毕?
客户端打开一个输出流,如果不做约定,也不关闭它,那么服务端永远不知道客户端是否发送完消息,那么服务端会一直等待下去,直到读取超时。
- 关闭Socket连接。
- 缺点: 客户端Socket关闭后,将不能接受服务端发送的消息,也不能再次发送消息。
- 关闭Socket的输出流(而不是Socket)
- 参照Echo聊天室的实现
- 关闭输出流
socket.shutdownOutput();
而不是outputStream.close();
,这样还能继续监听从服务端响应的输入流。
- 缺点:还是不能再次发送消息给服务端。
- 通过约定的符号
- 通过
Writer.writeLine()/Reader.readLine()
Reader.readLine()
将会在读取到回车\r
,换行\n
或者回车紧跟着换行\r\n
时返回读取到的数据。
- 通过指定长度告知对方已发送完命令
- 先在输出流的第一个字节写入本次传输将会传递的数据的字节大小,接收方在获取到这个值之后,从输入流中读取指定个数的字节即可。
2.2 通过约定的符号\r\n
,标识消息发送完毕。
- 服务端代码
/** * 使用标记符号的方式通知消息发送完毕 * * @author futao * @date 2020/7/2 */ public class BioChatServer { private static final Logger logger = LoggerFactory.getLogger(BioChatServer.class); private static final Set<Socket> CLIENT_SOCKET_SET = new HashSet<Socket>() { @Override public synchronized boolean add(Socket o) { return super.add(o); } @Override public synchronized boolean remove(Object o) { return super.remove(o); } }; public void start() { //创建服务端ServerSocket,并监听端口 try (ServerSocket serverSocket = new ServerSocket(Constants.SERVER_PORT)) { logger.debug("========== 基于BIO的聊天室在[{}]端口启动成功 ==========", Constants.SERVER_PORT); //循环accept()监听 while (true) { //accept()将阻塞,直到有客户端Socket接入。并在服务端创建一个Socket与其对应 Socket socket = serverSocket.accept(); logger.debug("客户端[{}]上线", socket.getPort()); CLIENT_SOCKET_SET.add(socket); //将获取到的客户端连接交给子线程去处理,不影响主线程继续监听,等待下一个客户端连接 new Thread(() -> { try { //用于从客户端读取数据(将字节流转换成字符流) BufferedReader bufferedReader = new BufferedReader(new InputStreamReader(new BufferedInputStream(socket.getInputStream()))); while (true) { //从输入流中读取数据 String message = bufferedReader.readLine(); if (StringUtils.isNotBlank(message)) { logger.info("接收到客户端【{}】发来的消息[{}]", socket.getPort(), message); } else { isQuit(message, socket); } //判断是否为下线 boolean isQuit = isQuit(message, socket); //转发消息 if (!isQuit) { forwardMessage(socket.getPort(), String.format("<from %s>", socket.getPort()) + message); } else { break; } } } catch (IOException e) { logger.error("客户端异常", e); } }).start(); } } catch (IOException e) { logger.error("服务器启动失败", e); return; } } public boolean isQuit(String message, Socket socket) throws IOException { boolean isQuit = Constants.KEY_WORD_QUIT.equals(message); if (isQuit) { CLIENT_SOCKET_SET.remove(socket); int port = socket.getPort(); socket.close(); logger.debug("客户端[{}]下线", port); } return isQuit; } /** * 转发消息 * * @param curSocketPort 当前发送消息的客户端Socket的端口 * @param message 需要转发的消息 */ public void forwardMessage(int curSocketPort, String message) { message += "\r\n"; if (StringUtils.isBlank(message)) { return; } for (Socket socket : CLIENT_SOCKET_SET) { if (socket.isClosed() || socket.getPort() == curSocketPort) { continue; } if (socket.getPort() != curSocketPort) { try { OutputStream outputStream = socket.getOutputStream(); //将字符串编码之后写入客户端 outputStream.write(message.getBytes(Constants.CHARSET)); //刷新缓冲区 outputStream.flush(); } catch (IOException e) { logger.error("消息转发失败", e); } } } } public static void main(String[] args) { new BioChatServer().start(); } }
- 客户端
/** * @author futao * @date 2020/7/2 */ public class BioChatClient { private static final Logger logger = LoggerFactory.getLogger(BioChatClient.class); /** * 开启这个线程的目的是,当用户输入了退出指令,需要通知监听响应的线程也结束, * 否则如果监听响应的线程还处于阻塞状态的话,客户端应用是无法停止的 */ private static final ExecutorService executorService = Executors.newSingleThreadExecutor(); private void start() { try { Socket socket = new Socket("localhost", Constants.SERVER_PORT); BufferedReader reader = new BufferedReader(new InputStreamReader(socket.getInputStream(), Constants.CHARSET)); OutputStream outputStream = socket.getOutputStream(); logger.debug("========== 成功连接到聊天服务器 =========="); new Thread(() -> { BufferedReader bufferedReader = new BufferedReader(new InputStreamReader(System.in, Constants.CHARSET)); while (true) { try { String userInputStr = bufferedReader.readLine(); //需要加上换行符 outputStream.write((userInputStr + "\n").getBytes(Constants.CHARSET)); outputStream.flush(); if (Constants.KEY_WORD_QUIT.equals(userInputStr)) { reader.close(); outputStream.close(); socket.close(); //通知监听响应的线程也结束 executorService.shutdownNow(); break; } } catch (IOException e) { e.printStackTrace(); } } logger.debug("========== 退出聊天 =========="); }).start(); executorService.execute(() -> { //线程一直监听服务端发送的消息 String message; try { while (!socket.isInputShutdown() && (message = reader.readLine()) != null) { logger.info("接收到消息:[{}]", message); } } catch (IOException e) { e.printStackTrace(); } }); } catch (UnknownHostException e) { e.printStackTrace(); } catch (IOException e) { e.printStackTrace(); } } public static void main(String[] args) { new BioChatClient().start(); } }
- 测试
- 启动服务端与客户端(1个服务端,3个客户端)
客户端发送消息与转发
- 测试通过,达成目标~
# 对线程的考虑
- 从前面对BIO的实现可以看出,需要大量用到线程,现在来测试一下关于线程的问题。
启动50个客户端,观察服务器端线程的情况。
- 测试代码
/** * @author futao * @date 2020/7/5 */ public class ClientRunner { public static void main(String[] args) { ExecutorService executorService = Executors.newFixedThreadPool(50); for (int i = 0; i < 50; i++) { executorService.execute(() -> { new BioChatClient().start(); }); } } }
- 日志
线程
服务端有50个线程与客户端对应。
# 伪异步IO
服务端在accept()与客户端建立Socket连接之后,将该任务交给线程池去处理,而不是每次都开启一个新的线程。
- 改动服务端代码的两行代码
再次测试
- 服务端的线程数维持在了10个,保护了服务端的安全~