【BIO】在聊天室项目中的演化

简介: 【BIO】在聊天室项目中的演化

通过聊天室项目的演化。介绍BIO的基本用法与优缺点。


  • 提示: 注意阅读代码和注释。


# 提要:


  1. 第一版: echo聊天室
  • 服务器接收到客户端发送的消息,并打印
  • 服务端将客户端发送的消息经过包装后再次发送给客户端
  • 客户端断开连接
  • eg:
  • client: greet from socket.
  • server: echo from server: <greet from socket.>
  1. 第二版: 群聊聊天室
  • 服务器接收客户端发送的消息,并打印
  • 服务端将客户端发送的消息转发给其他在线的客户端
  • 客户端可一直保持在线状态


# 基础配置与工具类


  • 基础配置

/**
 * 常量
 *
 * @author futao
 * @date 2020/7/2
 */
public class Constants {
    /**
     * 服务器端口
     */
    public static final int SERVER_PORT = 9507;
    /**
     * 关键字-客户端退出系统
     */
    public static final String KEY_WORD_QUIT = "quit";
    /**
     * 使用的字符集编码
     */
    public static final Charset CHARSET = StandardCharsets.UTF_8;
}
  • IO工具类

/**
 * @author futao
 * @date 2020/7/2
 */
public class IOUtils {
    /**
     * 从输入流中读取字符串
     *
     * @param is 输入流
     * @return 读取到的字符串
     * @throws IOException
     */
    public static String readString(InputStream is) throws IOException {
        //使用带有缓冲区的BufferInputStream以提高读取性能
        BufferedInputStream bufferedInputStream = new BufferedInputStream(is);
        //从缓冲区中一次读取的数据
        byte[] buffer = new byte[1024 * 4];
        //读取的字符串
        StringBuilder fullMessage = new StringBuilder();
        //本次读取到的字节个数
        int curBufferSize;
        //循环将数据写入缓冲区buffer,并返回读取到的字节个数。当前数据读取完毕会返回-1。
        // curBufferSize这个参数的作用有两个
        //  1. 判断是否读取到了流的末尾(==-1?)
        //  2. 缓冲区字节数组buffer可能并没有写满,只写了curBufferSize,
        //  那么我们只需要将字节数组中前面curBufferSize个字节转换成字符串就行。
        while ((curBufferSize = bufferedInputStream.read(buffer)) != -1) {
            //将buffer中的数据转换成字符串,从buffer的第0个字节开始,读取curBufferSize个字节
            fullMessage.append(new String(buffer, 0, curBufferSize));
        }
        return fullMessage.toString();
    }
}

1. echo聊天室


  • 需求描述:


  • 服务器接收到客户端发送的消息,并打印
  • 服务端将客户端发送的消息经过包装后再次发送给客户端
  • 客户端断开连接
  • eg:
  • client: greet from socket.
  • server: echo from server: <greet from socket.>


  • 实现思路:
  • 创建服务端ServerSocket并绑定所监听的端口
  • 调用serverSocket.accept()阻塞监听客户端的接入
  • 客户端接入后获取到客户端Socket,并将该Socket上流的读写操作交给子线程去处理,主线程继续阻塞在accept()监听客户端的接入,否则同一时刻只能有一个客户端接入。
  • 服务器端 BioChatServer

/**
 * @author futao
 * @date 2020/7/2
 */
public class BioChatServer {
    private static final Logger logger = LoggerFactory.getLogger(BioChatServer.class);
    /**
     * 启动服务器
     */
    public void start() {
        //创建服务端ServerSocket,并监听端口
        try (ServerSocket serverSocket = new ServerSocket(Constants.SERVER_PORT)) {
            logger.debug("========== 基于BIO的聊天室在[{}]端口启动成功 ==========", Constants.SERVER_PORT);
            //循环accept()监听
            while (true) {
                //accept()将阻塞,直到有客户端Socket接入。并在服务端创建一个Socket与其对应
                Socket socket = serverSocket.accept();
                logger.debug("客户端[{}]成功接入", socket.getPort());
                //将获取到的客户端连接交给子线程去处理,不影响主线程继续监听,等待下一个客户端连接
                new Thread(() -> {
                    try (
                            //用于从客户端读取数据
                            InputStream inputStream = socket.getInputStream();
                            //用于将数据写给客户端
                            OutputStream outputStream = socket.getOutputStream()
                    ) {
                        //从输入流中读取数据
                        String fullMessage = IOUtils.readString(inputStream);
                        logger.info("接收到客户端【{}】发来的消息[{}]", socket.getPort(), fullMessage);
                        //编码与响应
                        outputStream.write(String.format("echo from server: <%s>", fullMessage).getBytes(Constants.CHARSET));
                    } catch (IOException e) {
                        logger.error("客户端异常", e);
                    }
                }).start();
            }
        } catch (IOException e) {
            logger.error("服务器启动失败", e);
            return;
        }
    }
    public static void main(String[] args) {
        new BioChatServer().start();
    }
}
  • 客户端 BioChatClient

/**
 * @author futao
 * @date 2020/7/2
 */
public class BioChatClient {
    private static final Logger logger = LoggerFactory.getLogger(BioChatClient.class);
    /**
     * 启动客户端
     */
    private void start() {
        try (
                //尝试连接到服务器
                Socket socket = new Socket("localhost", Constants.SERVER_PORT);
                //获取到输入流
                InputStream inputStream = socket.getInputStream();
                //输出流
                OutputStream outputStream = socket.getOutputStream()
        ) {
            logger.debug("========== 成功连接到聊天服务器 ==========");
            //获取到用户输入的字符串
            String userInputStr = new Scanner(System.in).nextLine();
            //将字符串转换成字节,写入输出流
            outputStream.write(userInputStr.getBytes(Constants.CHARSET));
            //刷新缓冲区
            outputStream.flush();
            //【重要】关闭输出流,通知服务器客户端消息已经发送完毕
            socket.shutdownOutput();
            //读取服务端的响应
            logger.info("接收到消息:[{}]", IOUtils.readString(inputStream));
        } catch (UnknownHostException e) {
            e.printStackTrace();
        } catch (IOException e) {
            e.printStackTrace();
        }
    }
    public static void main(String[] args) {
        new BioChatClient().start();
    }
}


  • 需要注意的代码为BioChatClient.start()socket.shutdownOutput();,如果不将客户端socket对应的输出流关闭,服务器端将不知道客户端消息是否发送完毕,服务端会一直阻塞在inputstream.read()
  • 测试
  • 启动服务端
  • 分别启动两个客户端,向服务端发送消息



image.png

两个客户端分别发送了一条消息,并接收到了服务器的响应。


image.png

  • 通过服务器端日志可以看出
  • 客户端接入事件都是在主线程main线程上发生的。
  • 而客户端消息的收发都是在新的子线程上发生的。而且每一个连接都需要一个全新的线程来处理。
  • 通过对线程运行状态的分析也可以看出,子线程在完成消息读取和发送之后立马就销毁了


image.png

  • 缺点:
  • 客户端每次接入只能发送一条消息就下线了,无法保持长期在线。
  • 每有一个客户端接入,就需要创建一个线程,如果有大量客户端接入,将对服务器产生较大压力。
  • 且每个创建的线程只执行了非常少量的任务就被销毁了,对资源的消耗比较大。


2. 群聊聊天室


  • 服务器接收客户端发送的消息,并打印
  • 服务端将客户端发送的消息转发给其他在线的客户端
  • 客户端可一直保持在线状态


2.1 常规思路


  • 服务端


image.png

image.png

image.png

客户端


image.png


image.png


  • 但是测试下来会发现,如果客户端不断开连接,服务端将一直阻塞在Inputstream.read(),因为服务端根本不知道客户端的数据已经发送完毕了。
  • 所以现在的问题是:如何告知对方数据已经发送完毕?


# 如何告知对方数据已经发送完毕?


客户端打开一个输出流,如果不做约定,也不关闭它,那么服务端永远不知道客户端是否发送完消息,那么服务端会一直等待下去,直到读取超时。


  1. 关闭Socket连接。
  • 缺点: 客户端Socket关闭后,将不能接受服务端发送的消息,也不能再次发送消息。
  1. 关闭Socket的输出流(而不是Socket)
  • 参照Echo聊天室的实现
  • 关闭输出流socket.shutdownOutput();而不是outputStream.close();,这样还能继续监听从服务端响应的输入流。
  • 缺点:还是不能再次发送消息给服务端。
  1. 通过约定的符号
  1. 通过Writer.writeLine()/Reader.readLine()
  • Reader.readLine()将会在读取到回车\r,换行\n或者回车紧跟着换行\r\n时返回读取到的数据。
  1. 通过指定长度告知对方已发送完命令
  • 先在输出流的第一个字节写入本次传输将会传递的数据的字节大小,接收方在获取到这个值之后,从输入流中读取指定个数的字节即可。


2.2 通过约定的符号\r\n,标识消息发送完毕。


  • 服务端代码

/**
 * 使用标记符号的方式通知消息发送完毕
 *
 * @author futao
 * @date 2020/7/2
 */
public class BioChatServer {
    private static final Logger logger = LoggerFactory.getLogger(BioChatServer.class);
    private static final Set<Socket> CLIENT_SOCKET_SET = new HashSet<Socket>() {
        @Override
        public synchronized boolean add(Socket o) {
            return super.add(o);
        }
        @Override
        public synchronized boolean remove(Object o) {
            return super.remove(o);
        }
    };
    public void start() {
        //创建服务端ServerSocket,并监听端口
        try (ServerSocket serverSocket = new ServerSocket(Constants.SERVER_PORT)) {
            logger.debug("========== 基于BIO的聊天室在[{}]端口启动成功 ==========", Constants.SERVER_PORT);
            //循环accept()监听
            while (true) {
                //accept()将阻塞,直到有客户端Socket接入。并在服务端创建一个Socket与其对应
                Socket socket = serverSocket.accept();
                logger.debug("客户端[{}]上线", socket.getPort());
                CLIENT_SOCKET_SET.add(socket);
                //将获取到的客户端连接交给子线程去处理,不影响主线程继续监听,等待下一个客户端连接
                new Thread(() -> {
                    try {
                        //用于从客户端读取数据(将字节流转换成字符流)
                        BufferedReader bufferedReader = new BufferedReader(new InputStreamReader(new BufferedInputStream(socket.getInputStream())));
                        while (true) {
                            //从输入流中读取数据
                            String message = bufferedReader.readLine();
                            if (StringUtils.isNotBlank(message)) {
                                logger.info("接收到客户端【{}】发来的消息[{}]", socket.getPort(), message);
                            } else {
                                isQuit(message, socket);
                            }
                            //判断是否为下线
                            boolean isQuit = isQuit(message, socket);
                            //转发消息
                            if (!isQuit) {
                                forwardMessage(socket.getPort(), String.format("<from %s>", socket.getPort()) + message);
                            } else {
                                break;
                            }
                        }
                    } catch (IOException e) {
                        logger.error("客户端异常", e);
                    }
                }).start();
            }
        } catch (IOException e) {
            logger.error("服务器启动失败", e);
            return;
        }
    }
    public boolean isQuit(String message, Socket socket) throws IOException {
        boolean isQuit = Constants.KEY_WORD_QUIT.equals(message);
        if (isQuit) {
            CLIENT_SOCKET_SET.remove(socket);
            int port = socket.getPort();
            socket.close();
            logger.debug("客户端[{}]下线", port);
        }
        return isQuit;
    }
    /**
     * 转发消息
     *
     * @param curSocketPort 当前发送消息的客户端Socket的端口
     * @param message       需要转发的消息
     */
    public void forwardMessage(int curSocketPort, String message) {
        message += "\r\n";
        if (StringUtils.isBlank(message)) {
            return;
        }
        for (Socket socket : CLIENT_SOCKET_SET) {
            if (socket.isClosed() || socket.getPort() == curSocketPort) {
                continue;
            }
            if (socket.getPort() != curSocketPort) {
                try {
                    OutputStream outputStream = socket.getOutputStream();
                    //将字符串编码之后写入客户端
                    outputStream.write(message.getBytes(Constants.CHARSET));
                    //刷新缓冲区
                    outputStream.flush();
                } catch (IOException e) {
                    logger.error("消息转发失败", e);
                }
            }
        }
    }
    public static void main(String[] args) {
        new BioChatServer().start();
    }
}
  • 客户端

/**
 * @author futao
 * @date 2020/7/2
 */
public class BioChatClient {
    private static final Logger logger = LoggerFactory.getLogger(BioChatClient.class);
    /**
     * 开启这个线程的目的是,当用户输入了退出指令,需要通知监听响应的线程也结束,
     * 否则如果监听响应的线程还处于阻塞状态的话,客户端应用是无法停止的
     */
    private static final ExecutorService executorService = Executors.newSingleThreadExecutor();
    private void start() {
        try {
            Socket socket = new Socket("localhost", Constants.SERVER_PORT);
            BufferedReader reader = new BufferedReader(new InputStreamReader(socket.getInputStream(), Constants.CHARSET));
            OutputStream outputStream = socket.getOutputStream();
            logger.debug("========== 成功连接到聊天服务器 ==========");
            new Thread(() -> {
                BufferedReader bufferedReader = new BufferedReader(new InputStreamReader(System.in, Constants.CHARSET));
                while (true) {
                    try {
                        String userInputStr = bufferedReader.readLine();
                        //需要加上换行符
                        outputStream.write((userInputStr + "\n").getBytes(Constants.CHARSET));
                        outputStream.flush();
                        if (Constants.KEY_WORD_QUIT.equals(userInputStr)) {
                            reader.close();
                            outputStream.close();
                            socket.close();
                            //通知监听响应的线程也结束
                            executorService.shutdownNow();
                            break;
                        }
                    } catch (IOException e) {
                        e.printStackTrace();
                    }
                }
                logger.debug("========== 退出聊天 ==========");
            }).start();
            executorService.execute(() -> {
                //线程一直监听服务端发送的消息
                String message;
                try {
                    while (!socket.isInputShutdown() && (message = reader.readLine()) != null) {
                        logger.info("接收到消息:[{}]", message);
                    }
                } catch (IOException e) {
                    e.printStackTrace();
                }
            });
        } catch (UnknownHostException e) {
            e.printStackTrace();
        } catch (IOException e) {
            e.printStackTrace();
        }
    }
    public static void main(String[] args) {
        new BioChatClient().start();
    }
}


  • 测试
  • 启动服务端与客户端(1个服务端,3个客户端)


image.png

客户端发送消息与转发


image.png

image.png

image.png


  • 测试通过,达成目标~


# 对线程的考虑


  • 从前面对BIO的实现可以看出,需要大量用到线程,现在来测试一下关于线程的问题。


启动50个客户端,观察服务器端线程的情况。


  • 测试代码

/**
 * @author futao
 * @date 2020/7/5
 */
public class ClientRunner {
    public static void main(String[] args) {
        ExecutorService executorService = Executors.newFixedThreadPool(50);
        for (int i = 0; i < 50; i++) {
            executorService.execute(() -> {
                new BioChatClient().start();
            });
        }
    }
}


  • 日志


image.png

image.png

线程


image.png

服务端有50个线程与客户端对应。


# 伪异步IO


服务端在accept()与客户端建立Socket连接之后,将该任务交给线程池去处理,而不是每次都开启一个新的线程。


  • 改动服务端代码的两行代码


image.png

再次测试


image.png


  • 服务端的线程数维持在了10个,保护了服务端的安全~


# 参考


相关文章
|
14天前
|
存储 Java
BIO的工作流程
BIO(Blocking I/O)工作流程是指在进行输入输出操作时,线程会一直阻塞直到操作完成。具体流程包括:客户端发起请求,服务器接收后开始处理,期间服务器线程处于等待状态,直至数据准备完毕,响应返回给客户端,线程才继续执行其他任务。
29 5
|
4月前
|
前端开发 网络协议
Netty实战巅峰:从零构建高性能IM即时通讯系统,解锁并发通信新境界
【8月更文挑战第3天】Netty是一款高性能、异步事件驱动的网络框架,适用于开发高并发网络应用,如即时通讯(IM)系统。本文将指导你利用Netty从零构建高性能IM程序,介绍Netty基础及服务器/客户端设计。服务器端使用`ServerBootstrap`启动,客户端通过`Bootstrap`连接服务器。示例展示了简单的服务器启动过程。通过深入学习,可进一步实现用户认证等功能,打造出更完善的IM系统。
183 1
|
4月前
|
Java 应用服务中间件 Linux
(九)Java网络编程无冕之王-这回把大名鼎鼎的Netty框架一网打尽!
现如今的开发环境中,分布式/微服务架构大行其道,而分布式/微服务的根基在于网络编程,而Netty恰恰是Java网络编程领域的无冕之王。Netty这个框架相信大家定然听说过,其在Java网络编程中的地位,好比JavaEE中的Spring。
157 3
|
6月前
|
Java
“解密Netty中的BossGroup与WorkerGroup:他们之间的默契和配合“
“解密Netty中的BossGroup与WorkerGroup:他们之间的默契和配合“
562 0
|
7月前
Netty Review - 借助SimpleTalkRoom初体验异步网络编程的魅力
Netty Review - 借助SimpleTalkRoom初体验异步网络编程的魅力
82 0
|
程序员
难倒了80%程序员的题,谈谈你对Netty中,Pipeline工作原理的理解
1位工作8年的小伙伴,去某东面试IM部门,被问到这样一道面试题。说,请你谈一谈你对Netty Pipeline设计原理的理解。当时,他说只是用过Netty的Pipline,原理没有深入了解过,然后就没有然后了。
84 0
|
存储 缓存 负载均衡
计网 - 怎样实现 RPC 框架
计网 - 怎样实现 RPC 框架
109 0
|
存储 编解码 安全
基于Netty的IM聊天加密技术学习:一文理清常见的加密概念、术语等
本文正好借此机会,以Netty编写的IM聊天加密为例,为入门者理清什么是PKI体系、什么是SSL、什么是OpenSSL、以及各类证书和它们间的关系等,并在文末附上简短的Netty代码实示例,希望能助你通俗易懂地快速理解这些知识和概念!
223 0
基于Netty的IM聊天加密技术学习:一文理清常见的加密概念、术语等
|
存储 JSON 编解码
06、Netty学习笔记—(聊天业务优化:扩展序列化算法)
06、Netty学习笔记—(聊天业务优化:扩展序列化算法)
06、Netty学习笔记—(聊天业务优化:扩展序列化算法)