拓展阅读
Netty 权威指南-04-为什么选择 Netty?Netty 入门教程
问题
现在的实现看起来一切都好,但是有一个问题,会导致阻塞。
为了一步步演示,我们把代码简化一下。
v1-bio
最基本的版本
package com.github.houbb.minicat.bs.servlet;
import com.github.houbb.log.integration.core.Log;
import com.github.houbb.log.integration.core.LogFactory;
import com.github.houbb.minicat.exception.MiniCatException;
import com.github.houbb.minicat.util.InnerHttpUtil;
import java.io.*;
import java.net.ServerSocket;
import java.net.Socket;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
/**
* @author 老马啸西风
* @since 0.1.0
*/
public class MiniCatBootstrapBioSocket {
private static final Log logger = LogFactory.getLog(MiniCatBootstrapBioSocket.class);
/**
* 启动端口号
*/
private final int port;
/**
* 服务端 socket
*/
private ServerSocket serverSocket;
public MiniCatBootstrapBioSocket() {
this.port = 8080;
}
public void start() {
logger.info("[MiniCat] start listen on port {}", port);
logger.info("[MiniCat] visit url http://{}:{}", "127.0.0.1", port);
try {
this.serverSocket = new ServerSocket(port);
while (true) {
Socket clientSocket = serverSocket.accept(); // 等待客户端连接
// 从Socket获取输入流
logger.info("readRequestString start");
String requestString = readRequestString(clientSocket);
logger.info("readRequestString end");
// 这里模拟一下耗时呢
TimeUnit.SECONDS.sleep(5);
// 写回到客户端
logger.info("writeToClient start");
writeToClient(clientSocket, requestString);
logger.info("writeToClient end");
// 关闭连接
clientSocket.close();
}
} catch (Exception e) {
logger.error("[MiniCat] start meet ex", e);
throw new MiniCatException(e);
}
}
private void writeToClient(Socket clientSocket, String requestString) throws IOException {
OutputStream outputStream = clientSocket.getOutputStream();
String httpText = InnerHttpUtil.http200Resp("ECHO: \r\n" + requestString);
outputStream.write(httpText.getBytes("UTF-8"));
}
private String readRequestString(Socket clientSocket) throws IOException {
// 从Socket获取输入流
BufferedReader reader = new BufferedReader(new InputStreamReader(clientSocket.getInputStream()));
StringBuilder requestBuilder = new StringBuilder();
String line;
// 读取HTTP请求直到空行(表示HTTP请求结束)
while ((line = reader.readLine()) != null && !line.isEmpty()) {
requestBuilder.append(line).append("\n");
}
return requestBuilder.toString();
}
}
这种实现方式每次只能处理一个请求。
当然,我们可以引入 thread 线程池。
v2-bio+thread
package com.github.houbb.minicat.bs.servlet;
import com.github.houbb.log.integration.core.Log;
import com.github.houbb.log.integration.core.LogFactory;
import com.github.houbb.minicat.exception.MiniCatException;
import com.github.houbb.minicat.util.InnerHttpUtil;
import java.io.BufferedReader;
import java.io.IOException;
import java.io.InputStreamReader;
import java.io.OutputStream;
import java.net.ServerSocket;
import java.net.Socket;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
/**
* 实际测试还是会阻塞
*
* @author 老马啸西风
* @since 0.1.0
*/
public class MiniCatBootstrapBioThreadSocket {
private static final Log logger = LogFactory.getLog(MiniCatBootstrapBioThreadSocket.class);
/**
* 启动端口号
*/
private final int port;
/**
* 服务端 socket
*/
private ServerSocket serverSocket;
private final ExecutorService threadPool;
public MiniCatBootstrapBioThreadSocket() {
this.port = 8080;
threadPool = Executors.newFixedThreadPool(10);
}
public void start() {
logger.info("[MiniCat] start listen on port {}", port);
logger.info("[MiniCat] visit url http://{}:{}", "127.0.0.1", port);
try {
this.serverSocket = new ServerSocket(port);
while (true) {
Socket clientSocket = serverSocket.accept(); // 等待客户端连接
// 从Socket获取输入流
threadPool.submit(new Runnable() {
@Override
public void run() {
handleSocket(clientSocket);
}
});
}
} catch (Exception e) {
logger.error("[MiniCat] start meet ex", e);
throw new MiniCatException(e);
}
}
private void handleSocket(Socket clientSocket) {
try {
logger.info("readRequestString start");
String requestString = readRequestString(clientSocket);
logger.info("readRequestString end");
// 这里模拟一下耗时呢
TimeUnit.SECONDS.sleep(5);
// 写回到客户端
logger.info("writeToClient start");
writeToClient(clientSocket, requestString);
logger.info("writeToClient end");
// 关闭连接
clientSocket.close();
} catch (IOException | InterruptedException e) {
logger.error("");
}
}
private void writeToClient(Socket clientSocket, String requestString) throws IOException {
OutputStream outputStream = clientSocket.getOutputStream();
String httpText = InnerHttpUtil.http200Resp("ECHO: \r\n" + requestString);
outputStream.write(httpText.getBytes("UTF-8"));
}
private String readRequestString(Socket clientSocket) throws IOException {
// 从Socket获取输入流
BufferedReader reader = new BufferedReader(new InputStreamReader(clientSocket.getInputStream()));
StringBuilder requestBuilder = new StringBuilder();
String line;
// 读取HTTP请求直到空行(表示HTTP请求结束)
while ((line = reader.readLine()) != null && !line.isEmpty()) {
requestBuilder.append(line).append("\n");
}
return requestBuilder.toString();
}
}
其实这个还是不够的,测试发现这里的 socket 其实还是阻塞的。
v3-nio
nio 可以让 socket 不再阻塞
package com.github.houbb.minicat.bs.servlet;
import com.github.houbb.log.integration.core.Log;
import com.github.houbb.log.integration.core.LogFactory;
import com.github.houbb.minicat.exception.MiniCatException;
import com.github.houbb.minicat.util.InnerHttpUtil;
import java.io.IOException;
import java.net.InetSocketAddress;
import java.nio.ByteBuffer;
import java.nio.channels.SelectionKey;
import java.nio.channels.Selector;
import java.nio.channels.ServerSocketChannel;
import java.nio.channels.SocketChannel;
import java.util.Iterator;
import java.util.Set;
import java.util.concurrent.TimeUnit;
public class MiniCatBootstrapNioSocket {
private static final Log logger = LogFactory.getLog(MiniCatBootstrapNioSocket.class);
private final int port;
private ServerSocketChannel serverSocketChannel;
private Selector selector;
public MiniCatBootstrapNioSocket() {
this.port = 8080;
}
public void start() {
logger.info("[MiniCat] start listen on port {}", port);
logger.info("[MiniCat] visit url http://{}:{}", "127.0.0.1", port);
try {
serverSocketChannel = ServerSocketChannel.open();
serverSocketChannel.bind(new InetSocketAddress(port));
serverSocketChannel.configureBlocking(false);
selector = Selector.open();
serverSocketChannel.register(selector, SelectionKey.OP_ACCEPT);
while (true) {
int readyChannels = selector.select();
if (readyChannels == 0) {
continue;
}
Set<SelectionKey> selectedKeys = selector.selectedKeys();
Iterator<SelectionKey> keyIterator = selectedKeys.iterator();
while (keyIterator.hasNext()) {
SelectionKey key = keyIterator.next();
if (key.isAcceptable()) {
handleAccept(key);
} else if (key.isReadable()) {
handleRead(key);
}
keyIterator.remove();
}
}
} catch (IOException | InterruptedException e) {
logger.error("[MiniCat] start meet ex", e);
throw new MiniCatException(e);
}
}
private void handleAccept(SelectionKey key) throws IOException {
ServerSocketChannel serverSocketChannel = (ServerSocketChannel) key.channel();
SocketChannel socketChannel = serverSocketChannel.accept();
socketChannel.configureBlocking(false);
socketChannel.register(selector, SelectionKey.OP_READ);
}
private void handleRead(SelectionKey key) throws IOException, InterruptedException {
logger.info("handle read start");
SocketChannel socketChannel = (SocketChannel) key.channel();
ByteBuffer buffer = ByteBuffer.allocate(1024);
StringBuilder requestBuilder = new StringBuilder();
int bytesRead = socketChannel.read(buffer);
while (bytesRead > 0) {
buffer.flip();
while (buffer.hasRemaining()) {
requestBuilder.append((char) buffer.get());
}
buffer.clear();
bytesRead = socketChannel.read(buffer);
}
String requestString = requestBuilder.toString();
logger.info("handle read requestString={}", requestString);
TimeUnit.SECONDS.sleep(5); // 模拟耗时操作
logger.info("start write");
writeToClient(socketChannel, requestString);
logger.info("end writeToClient");
socketChannel.close();
}
private void writeToClient(SocketChannel socketChannel, String requestString) throws IOException {
String httpText = InnerHttpUtil.http200Resp("ECHO: \r\n" + requestString);
ByteBuffer buffer = ByteBuffer.wrap(httpText.getBytes("UTF-8"));
socketChannel.write(buffer);
}
}
v4-nio+thread
不过测试发现,依然会阻塞在 sleep 的地方。
调整如下:
package com.github.houbb.minicat.bs.servlet;
import com.github.houbb.log.integration.core.Log;
import com.github.houbb.log.integration.core.LogFactory;
import com.github.houbb.minicat.exception.MiniCatException;
import com.github.houbb.minicat.util.InnerHttpUtil;
import java.io.IOException;
import java.net.InetSocketAddress;
import java.nio.ByteBuffer;
import java.nio.channels.SelectionKey;
import java.nio.channels.Selector;
import java.nio.channels.ServerSocketChannel;
import java.nio.channels.SocketChannel;
import java.util.Iterator;
import java.util.Set;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
public class MiniCatBootstrapNioThreadSocket {
private static final Log logger = LogFactory.getLog(MiniCatBootstrapNioThreadSocket.class);
private final int port;
private ServerSocketChannel serverSocketChannel;
private Selector selector;
private ExecutorService threadPool;
public MiniCatBootstrapNioThreadSocket() {
this.port = 8080;
this.threadPool = Executors.newFixedThreadPool(10); // 10个线程的线程池
}
public void start() {
logger.info("[MiniCat] start listen on port {}", port);
logger.info("[MiniCat] visit url http://{}:{}", "127.0.0.1", port);
try {
serverSocketChannel = ServerSocketChannel.open();
serverSocketChannel.bind(new InetSocketAddress(port));
serverSocketChannel.configureBlocking(false);
selector = Selector.open();
serverSocketChannel.register(selector, SelectionKey.OP_ACCEPT);
while (true) {
int readyChannels = selector.select();
if (readyChannels == 0) {
continue;
}
Set<SelectionKey> selectedKeys = selector.selectedKeys();
Iterator<SelectionKey> keyIterator = selectedKeys.iterator();
while (keyIterator.hasNext()) {
SelectionKey key = keyIterator.next();
if (key.isAcceptable()) {
handleAccept(key);
} else if (key.isReadable()) {
handleRead(key);
}
keyIterator.remove();
}
}
} catch (IOException e) {
logger.error("[MiniCat] start meet ex", e);
throw new MiniCatException(e);
}
}
private void handleAccept(SelectionKey key) throws IOException {
ServerSocketChannel serverSocketChannel = (ServerSocketChannel) key.channel();
SocketChannel socketChannel = serverSocketChannel.accept();
socketChannel.configureBlocking(false);
socketChannel.register(selector, SelectionKey.OP_READ);
}
private void handleRead(SelectionKey key) throws IOException {
threadPool.execute(() -> {
try {
SocketChannel socketChannel = (SocketChannel) key.channel();
ByteBuffer buffer = ByteBuffer.allocate(1024);
StringBuilder requestBuilder = new StringBuilder();
int bytesRead = socketChannel.read(buffer);
while (bytesRead > 0) {
buffer.flip();
while (buffer.hasRemaining()) {
requestBuilder.append((char) buffer.get());
}
buffer.clear();
bytesRead = socketChannel.read(buffer);
}
String requestString = requestBuilder.toString();
logger.info("read requestString={}", requestString);
TimeUnit.SECONDS.sleep(5); // 模拟耗时操作
writeToClient(socketChannel, requestString);
logger.info("writeToClient done");
socketChannel.close();
} catch (InterruptedException | IOException e) {
logger.error("[MiniCat] error processing request", e);
}
});
}
private void writeToClient(SocketChannel socketChannel, String requestString) throws IOException {
String httpText = InnerHttpUtil.http200Resp("ECHO: \r\n" + requestString);
ByteBuffer buffer = ByteBuffer.wrap(httpText.getBytes("UTF-8"));
socketChannel.write(buffer);
}
public void shutdown() {
try {
threadPool.shutdown();
threadPool.awaitTermination(5, TimeUnit.SECONDS);
} catch (InterruptedException e) {
logger.error("[MiniCat] error shutting down thread pool", e);
Thread.currentThread().interrupt();
} finally {
try {
selector.close();
serverSocketChannel.close();
} catch (IOException e) {
logger.error("[MiniCat] error closing server socket", e);
}
}
}
}
v5-netty
看的出来,我们废了很大的精力才实现了 nio。
其实 netty 就是针对 nio api 设计的过于复杂的问题,做了大量的改进和优化。
我们来一起欣赏一下 netty 的版本:
package com.github.houbb.minicat.bs.servlet;
import com.github.houbb.log.integration.core.Log;
import com.github.houbb.log.integration.core.LogFactory;
import com.github.houbb.minicat.exception.MiniCatException;
import io.netty.bootstrap.ServerBootstrap;
import io.netty.channel.ChannelFuture;
import io.netty.channel.ChannelInitializer;
import io.netty.channel.ChannelOption;
import io.netty.channel.EventLoopGroup;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.SocketChannel;
import io.netty.channel.socket.nio.NioServerSocketChannel;
public class MiniCatBootstrapNetty {
private static final Log logger = LogFactory.getLog(MiniCatBootstrapNetty.class);
/**
* 启动端口号
*/
private final int port;
public MiniCatBootstrapNetty() {
this.port = 8080;
}
public void start() {
logger.info("[MiniCat] start listen on port {}", port);
logger.info("[MiniCat] visit url http://{}:{}", "127.0.0.1", port);
EventLoopGroup bossGroup = new NioEventLoopGroup();
//worker 线程池的数量默认为 CPU 核心数的两倍
EventLoopGroup workerGroup = new NioEventLoopGroup();
try {
ServerBootstrap serverBootstrap = new ServerBootstrap();
serverBootstrap.group(bossGroup, workerGroup)
.channel(NioServerSocketChannel.class)
.childHandler(new ChannelInitializer<SocketChannel>() {
@Override
protected void initChannel(SocketChannel ch) throws Exception {
ch.pipeline().addLast(new MiniCatNettyServerHandler());
}
})
.option(ChannelOption.SO_BACKLOG, 128)
.childOption(ChannelOption.SO_KEEPALIVE, true);
// Bind and start to accept incoming connections.
ChannelFuture future = serverBootstrap.bind(port).sync();
// Wait until the server socket is closed.
future.channel().closeFuture().sync();
} catch (InterruptedException e) {
logger.error("[MiniCat] start meet ex", e);
throw new MiniCatException(e);
} finally {
workerGroup.shutdownGracefully();
bossGroup.shutdownGracefully();
}
}
}
package com.github.houbb.minicat.bs.servlet;
import com.github.houbb.log.integration.core.Log;
import com.github.houbb.log.integration.core.LogFactory;
import com.github.houbb.minicat.util.InnerHttpUtil;
import io.netty.buffer.ByteBuf;
import io.netty.buffer.Unpooled;
import io.netty.channel.ChannelFutureListener;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.ChannelInboundHandlerAdapter;
import java.nio.charset.Charset;
import java.util.concurrent.TimeUnit;
public class MiniCatNettyServerHandler extends ChannelInboundHandlerAdapter {
private static final Log logger = LogFactory.getLog(MiniCatNettyServerHandler.class);
@Override
public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
ByteBuf buf = (ByteBuf) msg;
byte[] bytes = new byte[buf.readableBytes()];
buf.readBytes(bytes);
String requestString = new String(bytes, Charset.defaultCharset());
logger.info("channelRead requestString={}", requestString);
// Simulating some processing time
try {
TimeUnit.SECONDS.sleep(5);
} catch (InterruptedException e) {
throw new RuntimeException(e);
}
String respText = InnerHttpUtil.http200Resp("ECHO: \r\n" + requestString);;
ByteBuf responseBuf = Unpooled.copiedBuffer(respText.getBytes());
ctx.writeAndFlush(responseBuf)
.addListener(ChannelFutureListener.CLOSE); // Close the channel after sending the response
logger.info("channelRead writeAndFlush DONE");
}
@Override
public void channelReadComplete(ChannelHandlerContext ctx) throws Exception {
ctx.flush();
}
@Override
public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
logger.error("exceptionCaught", cause);
ctx.close();
}
}
开源地址
/\_/\
( o.o )
> ^ <
mini-cat 是简易版本的 tomcat 实现。别称【嗅虎】(心有猛虎,轻嗅蔷薇。)