从零手写实现 tomcat-06-servlet bio/thread/nio/netty 池化处理

简介: 该文介绍了逐步改进的网络服务器实现,从最初的 BIO 基础版到使用线程池的 BIO+Thread,再到 NIO 版本和 NIO+Thread,最后展示了一个使用 Netty 框架的简洁实现。文章旨在说明如何解决阻塞问题,并对比不同模型的优劣,最终推荐使用 Netty 以简化 NIO 编程。

拓展阅读

Netty 权威指南-01-BIO 案例

Netty 权威指南-02-NIO 案例

Netty 权威指南-03-AIO 案例

Netty 权威指南-04-为什么选择 Netty?Netty 入门教程

问题

现在的实现看起来一切都好,但是有一个问题,会导致阻塞。

为了一步步演示,我们把代码简化一下。

v1-bio

最基本的版本

package com.github.houbb.minicat.bs.servlet;

import com.github.houbb.log.integration.core.Log;
import com.github.houbb.log.integration.core.LogFactory;
import com.github.houbb.minicat.exception.MiniCatException;
import com.github.houbb.minicat.util.InnerHttpUtil;

import java.io.*;
import java.net.ServerSocket;
import java.net.Socket;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;

/**
 * @author 老马啸西风
 * @since 0.1.0
 */
public class MiniCatBootstrapBioSocket {
   

    private static final Log logger = LogFactory.getLog(MiniCatBootstrapBioSocket.class);

    /**
     * 启动端口号
     */
    private final int port;

    /**
     * 服务端 socket
     */
    private ServerSocket serverSocket;

    public MiniCatBootstrapBioSocket() {
   
        this.port = 8080;
    }

    public void start() {
   
        logger.info("[MiniCat] start listen on port {}", port);
        logger.info("[MiniCat] visit url http://{}:{}", "127.0.0.1", port);

        try {
   
            this.serverSocket = new ServerSocket(port);

            while (true) {
   
                Socket clientSocket = serverSocket.accept(); // 等待客户端连接

                // 从Socket获取输入流
                logger.info("readRequestString start");
                String requestString = readRequestString(clientSocket);
                logger.info("readRequestString end");

                // 这里模拟一下耗时呢
                TimeUnit.SECONDS.sleep(5);

                // 写回到客户端
                logger.info("writeToClient start");
                writeToClient(clientSocket, requestString);
                logger.info("writeToClient end");

                // 关闭连接
                clientSocket.close();
            }


        } catch (Exception e) {
   
            logger.error("[MiniCat] start meet ex", e);
            throw new MiniCatException(e);
        }
    }

    private void writeToClient(Socket clientSocket, String requestString) throws IOException {
   
        OutputStream outputStream = clientSocket.getOutputStream();
        String httpText = InnerHttpUtil.http200Resp("ECHO: \r\n" + requestString);
        outputStream.write(httpText.getBytes("UTF-8"));
    }

    private String readRequestString(Socket clientSocket) throws IOException {
   
        // 从Socket获取输入流
        BufferedReader reader = new BufferedReader(new InputStreamReader(clientSocket.getInputStream()));
        StringBuilder requestBuilder = new StringBuilder();
        String line;
        // 读取HTTP请求直到空行(表示HTTP请求结束)
        while ((line = reader.readLine()) != null && !line.isEmpty()) {
   
            requestBuilder.append(line).append("\n");
        }
        return requestBuilder.toString();
    }

}

这种实现方式每次只能处理一个请求。

当然,我们可以引入 thread 线程池。

v2-bio+thread

package com.github.houbb.minicat.bs.servlet;

import com.github.houbb.log.integration.core.Log;
import com.github.houbb.log.integration.core.LogFactory;
import com.github.houbb.minicat.exception.MiniCatException;
import com.github.houbb.minicat.util.InnerHttpUtil;

import java.io.BufferedReader;
import java.io.IOException;
import java.io.InputStreamReader;
import java.io.OutputStream;
import java.net.ServerSocket;
import java.net.Socket;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;

/**
 * 实际测试还是会阻塞
 *
 * @author 老马啸西风
 * @since 0.1.0
 */
public class MiniCatBootstrapBioThreadSocket {
   

    private static final Log logger = LogFactory.getLog(MiniCatBootstrapBioThreadSocket.class);

    /**
     * 启动端口号
     */
    private final int port;

    /**
     * 服务端 socket
     */
    private ServerSocket serverSocket;

    private final ExecutorService threadPool;

    public MiniCatBootstrapBioThreadSocket() {
   
        this.port = 8080;

        threadPool = Executors.newFixedThreadPool(10);
    }

    public void start() {
   
        logger.info("[MiniCat] start listen on port {}", port);
        logger.info("[MiniCat] visit url http://{}:{}", "127.0.0.1", port);

        try {
   
            this.serverSocket = new ServerSocket(port);

            while (true) {
   
                Socket clientSocket = serverSocket.accept(); // 等待客户端连接

                // 从Socket获取输入流
                threadPool.submit(new Runnable() {
   
                    @Override
                    public void run() {
   
                        handleSocket(clientSocket);
                    }
                });
            }


        } catch (Exception e) {
   
            logger.error("[MiniCat] start meet ex", e);
            throw new MiniCatException(e);
        }
    }

    private void handleSocket(Socket clientSocket) {
   
        try {
   
            logger.info("readRequestString start");
            String requestString = readRequestString(clientSocket);
            logger.info("readRequestString end");

            // 这里模拟一下耗时呢
            TimeUnit.SECONDS.sleep(5);

            // 写回到客户端
            logger.info("writeToClient start");
            writeToClient(clientSocket, requestString);
            logger.info("writeToClient end");

            // 关闭连接
            clientSocket.close();
        } catch (IOException | InterruptedException e) {
   
            logger.error("");
        }
    }

    private void writeToClient(Socket clientSocket, String requestString) throws IOException {
   
        OutputStream outputStream = clientSocket.getOutputStream();
        String httpText = InnerHttpUtil.http200Resp("ECHO: \r\n" + requestString);
        outputStream.write(httpText.getBytes("UTF-8"));
    }

    private String readRequestString(Socket clientSocket) throws IOException {
   
        // 从Socket获取输入流
        BufferedReader reader = new BufferedReader(new InputStreamReader(clientSocket.getInputStream()));
        StringBuilder requestBuilder = new StringBuilder();
        String line;
        // 读取HTTP请求直到空行(表示HTTP请求结束)
        while ((line = reader.readLine()) != null && !line.isEmpty()) {
   
            requestBuilder.append(line).append("\n");
        }
        return requestBuilder.toString();
    }

}

其实这个还是不够的,测试发现这里的 socket 其实还是阻塞的。

v3-nio

nio 可以让 socket 不再阻塞

package com.github.houbb.minicat.bs.servlet;

import com.github.houbb.log.integration.core.Log;
import com.github.houbb.log.integration.core.LogFactory;
import com.github.houbb.minicat.exception.MiniCatException;
import com.github.houbb.minicat.util.InnerHttpUtil;

import java.io.IOException;
import java.net.InetSocketAddress;
import java.nio.ByteBuffer;
import java.nio.channels.SelectionKey;
import java.nio.channels.Selector;
import java.nio.channels.ServerSocketChannel;
import java.nio.channels.SocketChannel;
import java.util.Iterator;
import java.util.Set;
import java.util.concurrent.TimeUnit;

public class MiniCatBootstrapNioSocket {
   

    private static final Log logger = LogFactory.getLog(MiniCatBootstrapNioSocket.class);

    private final int port;
    private ServerSocketChannel serverSocketChannel;
    private Selector selector;

    public MiniCatBootstrapNioSocket() {
   
        this.port = 8080;
    }

    public void start() {
   
        logger.info("[MiniCat] start listen on port {}", port);
        logger.info("[MiniCat] visit url http://{}:{}", "127.0.0.1", port);

        try {
   
            serverSocketChannel = ServerSocketChannel.open();
            serverSocketChannel.bind(new InetSocketAddress(port));
            serverSocketChannel.configureBlocking(false);

            selector = Selector.open();
            serverSocketChannel.register(selector, SelectionKey.OP_ACCEPT);

            while (true) {
   
                int readyChannels = selector.select();
                if (readyChannels == 0) {
   
                    continue;
                }

                Set<SelectionKey> selectedKeys = selector.selectedKeys();
                Iterator<SelectionKey> keyIterator = selectedKeys.iterator();

                while (keyIterator.hasNext()) {
   
                    SelectionKey key = keyIterator.next();

                    if (key.isAcceptable()) {
   
                        handleAccept(key);
                    } else if (key.isReadable()) {
   
                        handleRead(key);
                    }

                    keyIterator.remove();
                }
            }
        } catch (IOException | InterruptedException e) {
   
            logger.error("[MiniCat] start meet ex", e);
            throw new MiniCatException(e);
        }
    }

    private void handleAccept(SelectionKey key) throws IOException {
   
        ServerSocketChannel serverSocketChannel = (ServerSocketChannel) key.channel();
        SocketChannel socketChannel = serverSocketChannel.accept();
        socketChannel.configureBlocking(false);
        socketChannel.register(selector, SelectionKey.OP_READ);
    }

    private void handleRead(SelectionKey key) throws IOException, InterruptedException {
   
        logger.info("handle read start");
        SocketChannel socketChannel = (SocketChannel) key.channel();
        ByteBuffer buffer = ByteBuffer.allocate(1024);
        StringBuilder requestBuilder = new StringBuilder();

        int bytesRead = socketChannel.read(buffer);
        while (bytesRead > 0) {
   
            buffer.flip();
            while (buffer.hasRemaining()) {
   
                requestBuilder.append((char) buffer.get());
            }
            buffer.clear();
            bytesRead = socketChannel.read(buffer);
        }

        String requestString = requestBuilder.toString();
        logger.info("handle read requestString={}", requestString);

        TimeUnit.SECONDS.sleep(5); // 模拟耗时操作

        logger.info("start write");
        writeToClient(socketChannel, requestString);
        logger.info("end writeToClient");

        socketChannel.close();
    }

    private void writeToClient(SocketChannel socketChannel, String requestString) throws IOException {
   
        String httpText = InnerHttpUtil.http200Resp("ECHO: \r\n" + requestString);
        ByteBuffer buffer = ByteBuffer.wrap(httpText.getBytes("UTF-8"));
        socketChannel.write(buffer);
    }

}

v4-nio+thread

不过测试发现,依然会阻塞在 sleep 的地方。

调整如下:

package com.github.houbb.minicat.bs.servlet;

import com.github.houbb.log.integration.core.Log;
import com.github.houbb.log.integration.core.LogFactory;
import com.github.houbb.minicat.exception.MiniCatException;
import com.github.houbb.minicat.util.InnerHttpUtil;

import java.io.IOException;
import java.net.InetSocketAddress;
import java.nio.ByteBuffer;
import java.nio.channels.SelectionKey;
import java.nio.channels.Selector;
import java.nio.channels.ServerSocketChannel;
import java.nio.channels.SocketChannel;
import java.util.Iterator;
import java.util.Set;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;

public class MiniCatBootstrapNioThreadSocket {
   

    private static final Log logger = LogFactory.getLog(MiniCatBootstrapNioThreadSocket.class);

    private final int port;
    private ServerSocketChannel serverSocketChannel;
    private Selector selector;
    private ExecutorService threadPool;

    public MiniCatBootstrapNioThreadSocket() {
   
        this.port = 8080;
        this.threadPool = Executors.newFixedThreadPool(10); // 10个线程的线程池
    }

    public void start() {
   
        logger.info("[MiniCat] start listen on port {}", port);
        logger.info("[MiniCat] visit url http://{}:{}", "127.0.0.1", port);

        try {
   
            serverSocketChannel = ServerSocketChannel.open();
            serverSocketChannel.bind(new InetSocketAddress(port));
            serverSocketChannel.configureBlocking(false);

            selector = Selector.open();
            serverSocketChannel.register(selector, SelectionKey.OP_ACCEPT);

            while (true) {
   
                int readyChannels = selector.select();
                if (readyChannels == 0) {
   
                    continue;
                }

                Set<SelectionKey> selectedKeys = selector.selectedKeys();
                Iterator<SelectionKey> keyIterator = selectedKeys.iterator();

                while (keyIterator.hasNext()) {
   
                    SelectionKey key = keyIterator.next();

                    if (key.isAcceptable()) {
   
                        handleAccept(key);
                    } else if (key.isReadable()) {
   
                        handleRead(key);
                    }

                    keyIterator.remove();
                }
            }
        } catch (IOException e) {
   
            logger.error("[MiniCat] start meet ex", e);
            throw new MiniCatException(e);
        }
    }

    private void handleAccept(SelectionKey key) throws IOException {
   
        ServerSocketChannel serverSocketChannel = (ServerSocketChannel) key.channel();
        SocketChannel socketChannel = serverSocketChannel.accept();
        socketChannel.configureBlocking(false);
        socketChannel.register(selector, SelectionKey.OP_READ);
    }

    private void handleRead(SelectionKey key) throws IOException {
   
        threadPool.execute(() -> {
   
            try {
   
                SocketChannel socketChannel = (SocketChannel) key.channel();
                ByteBuffer buffer = ByteBuffer.allocate(1024);
                StringBuilder requestBuilder = new StringBuilder();

                int bytesRead = socketChannel.read(buffer);
                while (bytesRead > 0) {
   
                    buffer.flip();
                    while (buffer.hasRemaining()) {
   
                        requestBuilder.append((char) buffer.get());
                    }
                    buffer.clear();
                    bytesRead = socketChannel.read(buffer);
                }

                String requestString = requestBuilder.toString();
                logger.info("read requestString={}", requestString);

                TimeUnit.SECONDS.sleep(5); // 模拟耗时操作
                writeToClient(socketChannel, requestString);
                logger.info("writeToClient done");
                socketChannel.close();
            } catch (InterruptedException | IOException e) {
   
                logger.error("[MiniCat] error processing request", e);
            }
        });
    }

    private void writeToClient(SocketChannel socketChannel, String requestString) throws IOException {
   
        String httpText = InnerHttpUtil.http200Resp("ECHO: \r\n" + requestString);
        ByteBuffer buffer = ByteBuffer.wrap(httpText.getBytes("UTF-8"));
        socketChannel.write(buffer);
    }

    public void shutdown() {
   
        try {
   
            threadPool.shutdown();
            threadPool.awaitTermination(5, TimeUnit.SECONDS);
        } catch (InterruptedException e) {
   
            logger.error("[MiniCat] error shutting down thread pool", e);
            Thread.currentThread().interrupt();
        } finally {
   
            try {
   
                selector.close();
                serverSocketChannel.close();
            } catch (IOException e) {
   
                logger.error("[MiniCat] error closing server socket", e);
            }
        }
    }

}

v5-netty

看的出来,我们废了很大的精力才实现了 nio。

其实 netty 就是针对 nio api 设计的过于复杂的问题,做了大量的改进和优化。

我们来一起欣赏一下 netty 的版本:

package com.github.houbb.minicat.bs.servlet;

import com.github.houbb.log.integration.core.Log;
import com.github.houbb.log.integration.core.LogFactory;
import com.github.houbb.minicat.exception.MiniCatException;
import io.netty.bootstrap.ServerBootstrap;
import io.netty.channel.ChannelFuture;
import io.netty.channel.ChannelInitializer;
import io.netty.channel.ChannelOption;
import io.netty.channel.EventLoopGroup;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.SocketChannel;
import io.netty.channel.socket.nio.NioServerSocketChannel;


public class MiniCatBootstrapNetty {
   

    private static final Log logger = LogFactory.getLog(MiniCatBootstrapNetty.class);

    /**
     * 启动端口号
     */
    private final int port;

    public MiniCatBootstrapNetty() {
   
        this.port = 8080;
    }

    public void start() {
   
        logger.info("[MiniCat] start listen on port {}", port);
        logger.info("[MiniCat] visit url http://{}:{}", "127.0.0.1", port);

        EventLoopGroup bossGroup = new NioEventLoopGroup();
        //worker 线程池的数量默认为 CPU 核心数的两倍
        EventLoopGroup workerGroup = new NioEventLoopGroup();

        try {
   
            ServerBootstrap serverBootstrap = new ServerBootstrap();
            serverBootstrap.group(bossGroup, workerGroup)
                    .channel(NioServerSocketChannel.class)
                    .childHandler(new ChannelInitializer<SocketChannel>() {
   
                        @Override
                        protected void initChannel(SocketChannel ch) throws Exception {
   
                            ch.pipeline().addLast(new MiniCatNettyServerHandler());
                        }
                    })
                    .option(ChannelOption.SO_BACKLOG, 128)
                    .childOption(ChannelOption.SO_KEEPALIVE, true);

            // Bind and start to accept incoming connections.
            ChannelFuture future = serverBootstrap.bind(port).sync();

            // Wait until the server socket is closed.
            future.channel().closeFuture().sync();
        } catch (InterruptedException e) {
   
            logger.error("[MiniCat] start meet ex", e);
            throw new MiniCatException(e);
        } finally {
   
            workerGroup.shutdownGracefully();
            bossGroup.shutdownGracefully();
        }
    }

}
package com.github.houbb.minicat.bs.servlet;

import com.github.houbb.log.integration.core.Log;
import com.github.houbb.log.integration.core.LogFactory;
import com.github.houbb.minicat.util.InnerHttpUtil;
import io.netty.buffer.ByteBuf;
import io.netty.buffer.Unpooled;
import io.netty.channel.ChannelFutureListener;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.ChannelInboundHandlerAdapter;

import java.nio.charset.Charset;
import java.util.concurrent.TimeUnit;

public class MiniCatNettyServerHandler extends ChannelInboundHandlerAdapter {
   

    private static final Log logger = LogFactory.getLog(MiniCatNettyServerHandler.class);

    @Override
    public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
   
        ByteBuf buf = (ByteBuf) msg;
        byte[] bytes = new byte[buf.readableBytes()];
        buf.readBytes(bytes);
        String requestString = new String(bytes, Charset.defaultCharset());
        logger.info("channelRead requestString={}", requestString);

        // Simulating some processing time
        try {
   
            TimeUnit.SECONDS.sleep(5);
        } catch (InterruptedException e) {
   
            throw new RuntimeException(e);
        }

        String respText = InnerHttpUtil.http200Resp("ECHO: \r\n" + requestString);;
        ByteBuf responseBuf = Unpooled.copiedBuffer(respText.getBytes());
        ctx.writeAndFlush(responseBuf)
                .addListener(ChannelFutureListener.CLOSE); // Close the channel after sending the response
        logger.info("channelRead writeAndFlush DONE");
    }

    @Override
    public void channelReadComplete(ChannelHandlerContext ctx) throws Exception {
   
        ctx.flush();
    }

    @Override
    public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
   
        logger.error("exceptionCaught", cause);
        ctx.close();
    }
}

开源地址

 /\_/\  
( o.o ) 
 > ^ <

mini-cat 是简易版本的 tomcat 实现。别称【嗅虎】(心有猛虎,轻嗅蔷薇。)

开源地址:https://github.com/houbb/minicat

相关文章
|
7天前
|
监控 网络协议 Java
Java面试题:解释Java NIO与BIO的区别,以及NIO的优势和应用场景。如何在高并发应用中实现NIO?
Java面试题:解释Java NIO与BIO的区别,以及NIO的优势和应用场景。如何在高并发应用中实现NIO?
11 0
|
1月前
|
Java 视频直播 数据库连接
Java I/O 模型详解:BIO、NIO 与 AIO 的特性与应用
Java I/O 模型详解:BIO、NIO 与 AIO 的特性与应用
27 2
|
21天前
|
存储 监控 Java
深入探索Java BIO与NIO输入输出模型:基于文件复制和socket通信
深入探索Java BIO与NIO输入输出模型:基于文件复制和socket通信
|
26天前
|
Scala
scala 读取文件(中文)异常 thread "main" java.nio.charset.MalformedInputException: Input length = 1
scala 读取文件(中文)异常 thread "main" java.nio.charset.MalformedInputException: Input length = 1
17 0
|
1月前
|
Java
谈谈NIO和BIO区别
谈谈NIO和BIO区别
16 0
|
2月前
|
编解码 网络协议 Java
用Java的BIO和NIO、Netty实现HTTP服务器(一) BIO与绪论
用Java的BIO和NIO、Netty实现HTTP服务器(一) BIO与绪论
|
2月前
|
移动开发 编解码 网络协议
用Java的BIO和NIO、Netty来实现HTTP服务器(三) 用Netty实现
用Java的BIO和NIO、Netty来实现HTTP服务器(三) 用Netty实现
|
2月前
|
网络协议 Java Linux
用Java来实现BIO和NIO模型的HTTP服务器(二) NIO的实现
用Java来实现BIO和NIO模型的HTTP服务器(二) NIO的实现
|
2月前
|
消息中间件 网络协议 Java
一文彻底理解BIO、NIO、AIO
一文彻底理解BIO、NIO、AIO
129 0
|
2月前
|
Java 应用服务中间件 Linux
java中的NIO,BIO,AIO
java中的NIO,BIO,AIO
32 0