从零手写实现 tomcat-06-servlet bio/thread/nio/netty 池化处理

简介: 该文介绍了逐步改进的网络服务器实现,从最初的 BIO 基础版到使用线程池的 BIO+Thread,再到 NIO 版本和 NIO+Thread,最后展示了一个使用 Netty 框架的简洁实现。文章旨在说明如何解决阻塞问题,并对比不同模型的优劣,最终推荐使用 Netty 以简化 NIO 编程。

拓展阅读

Netty 权威指南-01-BIO 案例

Netty 权威指南-02-NIO 案例

Netty 权威指南-03-AIO 案例

Netty 权威指南-04-为什么选择 Netty?Netty 入门教程

问题

现在的实现看起来一切都好,但是有一个问题,会导致阻塞。

为了一步步演示,我们把代码简化一下。

v1-bio

最基本的版本

package com.github.houbb.minicat.bs.servlet;

import com.github.houbb.log.integration.core.Log;
import com.github.houbb.log.integration.core.LogFactory;
import com.github.houbb.minicat.exception.MiniCatException;
import com.github.houbb.minicat.util.InnerHttpUtil;

import java.io.*;
import java.net.ServerSocket;
import java.net.Socket;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;

/**
 * @author 老马啸西风
 * @since 0.1.0
 */
public class MiniCatBootstrapBioSocket {
   

    private static final Log logger = LogFactory.getLog(MiniCatBootstrapBioSocket.class);

    /**
     * 启动端口号
     */
    private final int port;

    /**
     * 服务端 socket
     */
    private ServerSocket serverSocket;

    public MiniCatBootstrapBioSocket() {
   
        this.port = 8080;
    }

    public void start() {
   
        logger.info("[MiniCat] start listen on port {}", port);
        logger.info("[MiniCat] visit url http://{}:{}", "127.0.0.1", port);

        try {
   
            this.serverSocket = new ServerSocket(port);

            while (true) {
   
                Socket clientSocket = serverSocket.accept(); // 等待客户端连接

                // 从Socket获取输入流
                logger.info("readRequestString start");
                String requestString = readRequestString(clientSocket);
                logger.info("readRequestString end");

                // 这里模拟一下耗时呢
                TimeUnit.SECONDS.sleep(5);

                // 写回到客户端
                logger.info("writeToClient start");
                writeToClient(clientSocket, requestString);
                logger.info("writeToClient end");

                // 关闭连接
                clientSocket.close();
            }


        } catch (Exception e) {
   
            logger.error("[MiniCat] start meet ex", e);
            throw new MiniCatException(e);
        }
    }

    private void writeToClient(Socket clientSocket, String requestString) throws IOException {
   
        OutputStream outputStream = clientSocket.getOutputStream();
        String httpText = InnerHttpUtil.http200Resp("ECHO: \r\n" + requestString);
        outputStream.write(httpText.getBytes("UTF-8"));
    }

    private String readRequestString(Socket clientSocket) throws IOException {
   
        // 从Socket获取输入流
        BufferedReader reader = new BufferedReader(new InputStreamReader(clientSocket.getInputStream()));
        StringBuilder requestBuilder = new StringBuilder();
        String line;
        // 读取HTTP请求直到空行(表示HTTP请求结束)
        while ((line = reader.readLine()) != null && !line.isEmpty()) {
   
            requestBuilder.append(line).append("\n");
        }
        return requestBuilder.toString();
    }

}

这种实现方式每次只能处理一个请求。

当然,我们可以引入 thread 线程池。

v2-bio+thread

package com.github.houbb.minicat.bs.servlet;

import com.github.houbb.log.integration.core.Log;
import com.github.houbb.log.integration.core.LogFactory;
import com.github.houbb.minicat.exception.MiniCatException;
import com.github.houbb.minicat.util.InnerHttpUtil;

import java.io.BufferedReader;
import java.io.IOException;
import java.io.InputStreamReader;
import java.io.OutputStream;
import java.net.ServerSocket;
import java.net.Socket;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;

/**
 * 实际测试还是会阻塞
 *
 * @author 老马啸西风
 * @since 0.1.0
 */
public class MiniCatBootstrapBioThreadSocket {
   

    private static final Log logger = LogFactory.getLog(MiniCatBootstrapBioThreadSocket.class);

    /**
     * 启动端口号
     */
    private final int port;

    /**
     * 服务端 socket
     */
    private ServerSocket serverSocket;

    private final ExecutorService threadPool;

    public MiniCatBootstrapBioThreadSocket() {
   
        this.port = 8080;

        threadPool = Executors.newFixedThreadPool(10);
    }

    public void start() {
   
        logger.info("[MiniCat] start listen on port {}", port);
        logger.info("[MiniCat] visit url http://{}:{}", "127.0.0.1", port);

        try {
   
            this.serverSocket = new ServerSocket(port);

            while (true) {
   
                Socket clientSocket = serverSocket.accept(); // 等待客户端连接

                // 从Socket获取输入流
                threadPool.submit(new Runnable() {
   
                    @Override
                    public void run() {
   
                        handleSocket(clientSocket);
                    }
                });
            }


        } catch (Exception e) {
   
            logger.error("[MiniCat] start meet ex", e);
            throw new MiniCatException(e);
        }
    }

    private void handleSocket(Socket clientSocket) {
   
        try {
   
            logger.info("readRequestString start");
            String requestString = readRequestString(clientSocket);
            logger.info("readRequestString end");

            // 这里模拟一下耗时呢
            TimeUnit.SECONDS.sleep(5);

            // 写回到客户端
            logger.info("writeToClient start");
            writeToClient(clientSocket, requestString);
            logger.info("writeToClient end");

            // 关闭连接
            clientSocket.close();
        } catch (IOException | InterruptedException e) {
   
            logger.error("");
        }
    }

    private void writeToClient(Socket clientSocket, String requestString) throws IOException {
   
        OutputStream outputStream = clientSocket.getOutputStream();
        String httpText = InnerHttpUtil.http200Resp("ECHO: \r\n" + requestString);
        outputStream.write(httpText.getBytes("UTF-8"));
    }

    private String readRequestString(Socket clientSocket) throws IOException {
   
        // 从Socket获取输入流
        BufferedReader reader = new BufferedReader(new InputStreamReader(clientSocket.getInputStream()));
        StringBuilder requestBuilder = new StringBuilder();
        String line;
        // 读取HTTP请求直到空行(表示HTTP请求结束)
        while ((line = reader.readLine()) != null && !line.isEmpty()) {
   
            requestBuilder.append(line).append("\n");
        }
        return requestBuilder.toString();
    }

}

其实这个还是不够的,测试发现这里的 socket 其实还是阻塞的。

v3-nio

nio 可以让 socket 不再阻塞

package com.github.houbb.minicat.bs.servlet;

import com.github.houbb.log.integration.core.Log;
import com.github.houbb.log.integration.core.LogFactory;
import com.github.houbb.minicat.exception.MiniCatException;
import com.github.houbb.minicat.util.InnerHttpUtil;

import java.io.IOException;
import java.net.InetSocketAddress;
import java.nio.ByteBuffer;
import java.nio.channels.SelectionKey;
import java.nio.channels.Selector;
import java.nio.channels.ServerSocketChannel;
import java.nio.channels.SocketChannel;
import java.util.Iterator;
import java.util.Set;
import java.util.concurrent.TimeUnit;

public class MiniCatBootstrapNioSocket {
   

    private static final Log logger = LogFactory.getLog(MiniCatBootstrapNioSocket.class);

    private final int port;
    private ServerSocketChannel serverSocketChannel;
    private Selector selector;

    public MiniCatBootstrapNioSocket() {
   
        this.port = 8080;
    }

    public void start() {
   
        logger.info("[MiniCat] start listen on port {}", port);
        logger.info("[MiniCat] visit url http://{}:{}", "127.0.0.1", port);

        try {
   
            serverSocketChannel = ServerSocketChannel.open();
            serverSocketChannel.bind(new InetSocketAddress(port));
            serverSocketChannel.configureBlocking(false);

            selector = Selector.open();
            serverSocketChannel.register(selector, SelectionKey.OP_ACCEPT);

            while (true) {
   
                int readyChannels = selector.select();
                if (readyChannels == 0) {
   
                    continue;
                }

                Set<SelectionKey> selectedKeys = selector.selectedKeys();
                Iterator<SelectionKey> keyIterator = selectedKeys.iterator();

                while (keyIterator.hasNext()) {
   
                    SelectionKey key = keyIterator.next();

                    if (key.isAcceptable()) {
   
                        handleAccept(key);
                    } else if (key.isReadable()) {
   
                        handleRead(key);
                    }

                    keyIterator.remove();
                }
            }
        } catch (IOException | InterruptedException e) {
   
            logger.error("[MiniCat] start meet ex", e);
            throw new MiniCatException(e);
        }
    }

    private void handleAccept(SelectionKey key) throws IOException {
   
        ServerSocketChannel serverSocketChannel = (ServerSocketChannel) key.channel();
        SocketChannel socketChannel = serverSocketChannel.accept();
        socketChannel.configureBlocking(false);
        socketChannel.register(selector, SelectionKey.OP_READ);
    }

    private void handleRead(SelectionKey key) throws IOException, InterruptedException {
   
        logger.info("handle read start");
        SocketChannel socketChannel = (SocketChannel) key.channel();
        ByteBuffer buffer = ByteBuffer.allocate(1024);
        StringBuilder requestBuilder = new StringBuilder();

        int bytesRead = socketChannel.read(buffer);
        while (bytesRead > 0) {
   
            buffer.flip();
            while (buffer.hasRemaining()) {
   
                requestBuilder.append((char) buffer.get());
            }
            buffer.clear();
            bytesRead = socketChannel.read(buffer);
        }

        String requestString = requestBuilder.toString();
        logger.info("handle read requestString={}", requestString);

        TimeUnit.SECONDS.sleep(5); // 模拟耗时操作

        logger.info("start write");
        writeToClient(socketChannel, requestString);
        logger.info("end writeToClient");

        socketChannel.close();
    }

    private void writeToClient(SocketChannel socketChannel, String requestString) throws IOException {
   
        String httpText = InnerHttpUtil.http200Resp("ECHO: \r\n" + requestString);
        ByteBuffer buffer = ByteBuffer.wrap(httpText.getBytes("UTF-8"));
        socketChannel.write(buffer);
    }

}

v4-nio+thread

不过测试发现,依然会阻塞在 sleep 的地方。

调整如下:

package com.github.houbb.minicat.bs.servlet;

import com.github.houbb.log.integration.core.Log;
import com.github.houbb.log.integration.core.LogFactory;
import com.github.houbb.minicat.exception.MiniCatException;
import com.github.houbb.minicat.util.InnerHttpUtil;

import java.io.IOException;
import java.net.InetSocketAddress;
import java.nio.ByteBuffer;
import java.nio.channels.SelectionKey;
import java.nio.channels.Selector;
import java.nio.channels.ServerSocketChannel;
import java.nio.channels.SocketChannel;
import java.util.Iterator;
import java.util.Set;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;

public class MiniCatBootstrapNioThreadSocket {
   

    private static final Log logger = LogFactory.getLog(MiniCatBootstrapNioThreadSocket.class);

    private final int port;
    private ServerSocketChannel serverSocketChannel;
    private Selector selector;
    private ExecutorService threadPool;

    public MiniCatBootstrapNioThreadSocket() {
   
        this.port = 8080;
        this.threadPool = Executors.newFixedThreadPool(10); // 10个线程的线程池
    }

    public void start() {
   
        logger.info("[MiniCat] start listen on port {}", port);
        logger.info("[MiniCat] visit url http://{}:{}", "127.0.0.1", port);

        try {
   
            serverSocketChannel = ServerSocketChannel.open();
            serverSocketChannel.bind(new InetSocketAddress(port));
            serverSocketChannel.configureBlocking(false);

            selector = Selector.open();
            serverSocketChannel.register(selector, SelectionKey.OP_ACCEPT);

            while (true) {
   
                int readyChannels = selector.select();
                if (readyChannels == 0) {
   
                    continue;
                }

                Set<SelectionKey> selectedKeys = selector.selectedKeys();
                Iterator<SelectionKey> keyIterator = selectedKeys.iterator();

                while (keyIterator.hasNext()) {
   
                    SelectionKey key = keyIterator.next();

                    if (key.isAcceptable()) {
   
                        handleAccept(key);
                    } else if (key.isReadable()) {
   
                        handleRead(key);
                    }

                    keyIterator.remove();
                }
            }
        } catch (IOException e) {
   
            logger.error("[MiniCat] start meet ex", e);
            throw new MiniCatException(e);
        }
    }

    private void handleAccept(SelectionKey key) throws IOException {
   
        ServerSocketChannel serverSocketChannel = (ServerSocketChannel) key.channel();
        SocketChannel socketChannel = serverSocketChannel.accept();
        socketChannel.configureBlocking(false);
        socketChannel.register(selector, SelectionKey.OP_READ);
    }

    private void handleRead(SelectionKey key) throws IOException {
   
        threadPool.execute(() -> {
   
            try {
   
                SocketChannel socketChannel = (SocketChannel) key.channel();
                ByteBuffer buffer = ByteBuffer.allocate(1024);
                StringBuilder requestBuilder = new StringBuilder();

                int bytesRead = socketChannel.read(buffer);
                while (bytesRead > 0) {
   
                    buffer.flip();
                    while (buffer.hasRemaining()) {
   
                        requestBuilder.append((char) buffer.get());
                    }
                    buffer.clear();
                    bytesRead = socketChannel.read(buffer);
                }

                String requestString = requestBuilder.toString();
                logger.info("read requestString={}", requestString);

                TimeUnit.SECONDS.sleep(5); // 模拟耗时操作
                writeToClient(socketChannel, requestString);
                logger.info("writeToClient done");
                socketChannel.close();
            } catch (InterruptedException | IOException e) {
   
                logger.error("[MiniCat] error processing request", e);
            }
        });
    }

    private void writeToClient(SocketChannel socketChannel, String requestString) throws IOException {
   
        String httpText = InnerHttpUtil.http200Resp("ECHO: \r\n" + requestString);
        ByteBuffer buffer = ByteBuffer.wrap(httpText.getBytes("UTF-8"));
        socketChannel.write(buffer);
    }

    public void shutdown() {
   
        try {
   
            threadPool.shutdown();
            threadPool.awaitTermination(5, TimeUnit.SECONDS);
        } catch (InterruptedException e) {
   
            logger.error("[MiniCat] error shutting down thread pool", e);
            Thread.currentThread().interrupt();
        } finally {
   
            try {
   
                selector.close();
                serverSocketChannel.close();
            } catch (IOException e) {
   
                logger.error("[MiniCat] error closing server socket", e);
            }
        }
    }

}

v5-netty

看的出来,我们废了很大的精力才实现了 nio。

其实 netty 就是针对 nio api 设计的过于复杂的问题,做了大量的改进和优化。

我们来一起欣赏一下 netty 的版本:

package com.github.houbb.minicat.bs.servlet;

import com.github.houbb.log.integration.core.Log;
import com.github.houbb.log.integration.core.LogFactory;
import com.github.houbb.minicat.exception.MiniCatException;
import io.netty.bootstrap.ServerBootstrap;
import io.netty.channel.ChannelFuture;
import io.netty.channel.ChannelInitializer;
import io.netty.channel.ChannelOption;
import io.netty.channel.EventLoopGroup;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.SocketChannel;
import io.netty.channel.socket.nio.NioServerSocketChannel;


public class MiniCatBootstrapNetty {
   

    private static final Log logger = LogFactory.getLog(MiniCatBootstrapNetty.class);

    /**
     * 启动端口号
     */
    private final int port;

    public MiniCatBootstrapNetty() {
   
        this.port = 8080;
    }

    public void start() {
   
        logger.info("[MiniCat] start listen on port {}", port);
        logger.info("[MiniCat] visit url http://{}:{}", "127.0.0.1", port);

        EventLoopGroup bossGroup = new NioEventLoopGroup();
        //worker 线程池的数量默认为 CPU 核心数的两倍
        EventLoopGroup workerGroup = new NioEventLoopGroup();

        try {
   
            ServerBootstrap serverBootstrap = new ServerBootstrap();
            serverBootstrap.group(bossGroup, workerGroup)
                    .channel(NioServerSocketChannel.class)
                    .childHandler(new ChannelInitializer<SocketChannel>() {
   
                        @Override
                        protected void initChannel(SocketChannel ch) throws Exception {
   
                            ch.pipeline().addLast(new MiniCatNettyServerHandler());
                        }
                    })
                    .option(ChannelOption.SO_BACKLOG, 128)
                    .childOption(ChannelOption.SO_KEEPALIVE, true);

            // Bind and start to accept incoming connections.
            ChannelFuture future = serverBootstrap.bind(port).sync();

            // Wait until the server socket is closed.
            future.channel().closeFuture().sync();
        } catch (InterruptedException e) {
   
            logger.error("[MiniCat] start meet ex", e);
            throw new MiniCatException(e);
        } finally {
   
            workerGroup.shutdownGracefully();
            bossGroup.shutdownGracefully();
        }
    }

}
package com.github.houbb.minicat.bs.servlet;

import com.github.houbb.log.integration.core.Log;
import com.github.houbb.log.integration.core.LogFactory;
import com.github.houbb.minicat.util.InnerHttpUtil;
import io.netty.buffer.ByteBuf;
import io.netty.buffer.Unpooled;
import io.netty.channel.ChannelFutureListener;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.ChannelInboundHandlerAdapter;

import java.nio.charset.Charset;
import java.util.concurrent.TimeUnit;

public class MiniCatNettyServerHandler extends ChannelInboundHandlerAdapter {
   

    private static final Log logger = LogFactory.getLog(MiniCatNettyServerHandler.class);

    @Override
    public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
   
        ByteBuf buf = (ByteBuf) msg;
        byte[] bytes = new byte[buf.readableBytes()];
        buf.readBytes(bytes);
        String requestString = new String(bytes, Charset.defaultCharset());
        logger.info("channelRead requestString={}", requestString);

        // Simulating some processing time
        try {
   
            TimeUnit.SECONDS.sleep(5);
        } catch (InterruptedException e) {
   
            throw new RuntimeException(e);
        }

        String respText = InnerHttpUtil.http200Resp("ECHO: \r\n" + requestString);;
        ByteBuf responseBuf = Unpooled.copiedBuffer(respText.getBytes());
        ctx.writeAndFlush(responseBuf)
                .addListener(ChannelFutureListener.CLOSE); // Close the channel after sending the response
        logger.info("channelRead writeAndFlush DONE");
    }

    @Override
    public void channelReadComplete(ChannelHandlerContext ctx) throws Exception {
   
        ctx.flush();
    }

    @Override
    public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
   
        logger.error("exceptionCaught", cause);
        ctx.close();
    }
}

开源地址

 /\_/\  
( o.o ) 
 > ^ <

mini-cat 是简易版本的 tomcat 实现。别称【嗅虎】(心有猛虎,轻嗅蔷薇。)

开源地址:https://github.com/houbb/minicat

相关文章
|
9天前
|
网络协议 Dubbo Java
一文搞懂NIO、AIO、BIO的核心区别(建议收藏)
本文详细解析了NIO、AIO、BIO的核心区别,NIO的三个核心概念,以及NIO在Java框架中的应用等。关注【mikechen的互联网架构】,10年+BAT架构经验倾囊相授。
一文搞懂NIO、AIO、BIO的核心区别(建议收藏)
|
3月前
|
设计模式
Lettuce的特性和内部实现问题之Netty NIO的性能优于BIO的问题如何解决
Lettuce的特性和内部实现问题之Netty NIO的性能优于BIO的问题如何解决
|
13天前
|
消息中间件 缓存 Java
java nio,netty,kafka 中经常提到“零拷贝”到底是什么?
零拷贝技术 Zero-Copy 是指计算机执行操作时,可以直接从源(如文件或网络套接字)将数据传输到目标缓冲区, 而不需要 CPU 先将数据从某处内存复制到另一个特定区域,从而减少上下文切换以及 CPU 的拷贝时间。
java nio,netty,kafka 中经常提到“零拷贝”到底是什么?
|
2月前
|
Java
Netty BIO/NIO/AIO介绍
Netty BIO/NIO/AIO介绍
|
1月前
|
Java Linux 应用服务中间件
【编程进阶知识】高并发场景下Bio与Nio的比较及原理示意图
本文介绍了在Linux系统上使用Tomcat部署Java应用程序时,BIO(阻塞I/O)和NIO(非阻塞I/O)在网络编程中的实现和性能差异。BIO采用传统的线程模型,每个连接请求都会创建一个新线程进行处理,导致在高并发场景下存在严重的性能瓶颈,如阻塞等待和线程创建开销大等问题。而NIO则通过事件驱动机制,利用事件注册、事件轮询器和事件通知,实现了更高效的连接管理和数据传输,避免了阻塞和多级数据复制,显著提升了系统的并发处理能力。
58 0
|
3月前
|
缓存 Java UED
BIO、NIO、AIO有什么区别
【8月更文挑战第16天】BIO、NIO、AIO有什么区别
77 4
|
3月前
|
Java
"揭秘Java IO三大模式:BIO、NIO、AIO背后的秘密!为何AIO成为高并发时代的宠儿,你的选择对了吗?"
【8月更文挑战第19天】在Java的IO编程中,BIO、NIO与AIO代表了三种不同的IO处理机制。BIO采用同步阻塞模型,每个连接需单独线程处理,适用于连接少且稳定的场景。NIO引入了非阻塞性质,利用Channel、Buffer与Selector实现多路复用,提升了效率与吞吐量。AIO则是真正的异步IO,在JDK 7中引入,通过回调或Future机制在IO操作完成后通知应用,适合高并发场景。选择合适的模型对构建高效网络应用至关重要。
83 2
|
4月前
|
安全 Java Linux
(七)Java网络编程-IO模型篇之从BIO、NIO、AIO到内核select、epoll剖析!
IO(Input/Output)方面的基本知识,相信大家都不陌生,毕竟这也是在学习编程基础时就已经接触过的内容,但最初的IO教学大多数是停留在最基本的BIO,而并未对于NIO、AIO、多路复用等的高级内容进行详细讲述,但这些却是大部分高性能技术的底层核心,因此本文则准备围绕着IO知识进行展开。
165 1
|
5月前
|
Java 视频直播 数据库连接
Java I/O 模型详解:BIO、NIO 与 AIO 的特性与应用
Java I/O 模型详解:BIO、NIO 与 AIO 的特性与应用
66 2
|
4月前
|
监控 网络协议 Java
Java面试题:解释Java NIO与BIO的区别,以及NIO的优势和应用场景。如何在高并发应用中实现NIO?
Java面试题:解释Java NIO与BIO的区别,以及NIO的优势和应用场景。如何在高并发应用中实现NIO?
75 0