三、Reactor与Proactor模型
3.1 Reactor模型(同步非阻塞)
┌─────────────────────────────────────────────────────────────────────────┐
│ Reactor模型架构图 │
├─────────────────────────────────────────────────────────────────────────┤
│ │
│ ┌─────────────────────────────────────────────────────────────┐ │
│ │ Reactor │ │
│ │ ┌─────────────┐ ┌─────────────┐ ┌─────────────┐ │ │
│ │ │ Selector │←──→│ 分发器 │←──→│ 事件队列 │ │ │
│ │ └─────────────┘ └──────┬──────┘ └─────────────┘ │ │
│ │ │ │ │
│ └──────────────────────────────┼─────────────────────────────────┘ │
│ │ │
│ ┌──────────────────────┼──────────────────────┐ │
│ │ │ │ │
│ ▼ ▼ ▼ │
│ ┌───────────────┐ ┌───────────────┐ ┌───────────────┐ │
│ │ Acceptor │ │ Handler │ │ Handler │ │
│ │ (处理连接) │ │ (读/写) │ │ (读/写) │ │
│ └───────────────┘ └───────────────┘ └───────────────┘ │
│ │
└─────────────────────────────────────────────────────────────────────────┘
Reactor三种模式:
1. 单Reactor单线程(Redis 6.0之前)
2. 单Reactor多线程(Netty Boss Group + Worker Group)
3. 主从Reactor多线程(Netty多Boss Group)
// 简易Reactor模型实现
public class SimpleReactor implements Runnable {
private final Selector selector;
private final ServerSocketChannel serverSocket;
public SimpleReactor(int port) throws IOException {
selector = Selector.open();
serverSocket = ServerSocketChannel.open();
serverSocket.bind(new InetSocketAddress(port));
serverSocket.configureBlocking(false);
// 注册ACCEPT事件
SelectionKey key = serverSocket.register(selector, SelectionKey.OP_ACCEPT);
key.attach(new Acceptor());
}
@Override
public void run() {
while (!Thread.interrupted()) {
try {
selector.select();
Set<SelectionKey> selected = selector.selectedKeys();
Iterator<SelectionKey> it = selected.iterator();
while (it.hasNext()) {
SelectionKey key = it.next();
it.remove();
dispatch(key);
}
} catch (IOException e) {
e.printStackTrace();
}
}
}
private void dispatch(SelectionKey key) {
Runnable handler = (Runnable) key.attachment();
if (handler != null) {
handler.run();
}
}
// Acceptor负责接受连接
private class Acceptor implements Runnable {
@Override
public void run() {
try {
SocketChannel client = serverSocket.accept();
if (client != null) {
new Handler(selector, client);
}
} catch (IOException e) {
e.printStackTrace();
}
}
}
// Handler负责读写
private class Handler implements Runnable {
private final SocketChannel channel;
private final SelectionKey key;
private ByteBuffer input = ByteBuffer.allocate(1024);
private ByteBuffer output = ByteBuffer.allocate(1024);
private static final int READING = 0, WRITING = 1;
private int state = READING;
Handler(Selector selector, SocketChannel channel) throws IOException {
this.channel = channel;
channel.configureBlocking(false);
key = channel.register(selector, 0);
key.attach(this);
key.interestOps(SelectionKey.OP_READ);
selector.wakeup();
}
@Override
public void run() {
try {
if (state == READING) {
read();
} else if (state == WRITING) {
write();
}
} catch (IOException e) {
e.printStackTrace();
close();
}
}
private void read() throws IOException {
int read = channel.read(input);
if (read == -1) {
close();
return;
}
if (read > 0) {
input.flip();
// 处理数据
byte[] data = new byte[input.remaining()];
input.get(data);
System.out.println("Received: " + new String(data));
// 切换到写状态
state = WRITING;
key.interestOps(SelectionKey.OP_WRITE);
}
}
private void write() throws IOException {
output.clear();
output.put("HTTP/1.1 200 OK\r\nContent-Length: 2\r\n\r\nOK".getBytes());
output.flip();
channel.write(output);
state = READING;
key.interestOps(SelectionKey.OP_READ);
}
private void close() {
try {
key.cancel();
channel.close();
} catch (IOException e) {
e.printStackTrace();
}
}
}
public static void main(String[] args) throws IOException {
new SimpleReactor(8080).run();
}
}
3.2 Netty架构解析
// Netty服务端完整示例
public class NettyServer {
public static void main(String[] args) {
// Boss Group:负责接受新连接(通常1个线程)
EventLoopGroup bossGroup = new NioEventLoopGroup(1);
// Worker Group:负责处理读写(通常CPU核心数*2)
EventLoopGroup workerGroup = new NioEventLoopGroup();
try {
ServerBootstrap bootstrap = new ServerBootstrap();
bootstrap.group(bossGroup, workerGroup)
.channel(NioServerSocketChannel.class)
// TCP参数优化
.option(ChannelOption.SO_BACKLOG, 1024) // listen队列大小
.option(ChannelOption.SO_REUSEADDR, true) // 端口复用
.childOption(ChannelOption.TCP_NODELAY, true) // 禁用Nagle算法
.childOption(ChannelOption.SO_KEEPALIVE, true) // 开启KeepAlive
.childOption(ChannelOption.SO_RCVBUF, 65536) // 接收缓冲区
.childOption(ChannelOption.SO_SNDBUF, 65536) // 发送缓冲区
// 内存分配优化
.childOption(ChannelOption.ALLOCATOR, PooledByteBufAllocator.DEFAULT)
.childHandler(new ChannelInitializer<SocketChannel>() {
@Override
protected void initChannel(SocketChannel ch) {
ChannelPipeline pipeline = ch.pipeline();
// 编解码器
pipeline.addLast(new LengthFieldBasedFrameDecoder(65536, 0, 4));
pipeline.addLast(new StringDecoder(StandardCharsets.UTF_8));
pipeline.addLast(new StringEncoder(StandardCharsets.UTF_8));
// 业务处理器
pipeline.addLast(new BusinessHandler());
// 空闲检测(读空闲30秒触发)
pipeline.addLast(new IdleStateHandler(30, 0, 0));
}
});
// 绑定端口
ChannelFuture future = bootstrap.bind(8080).sync();
System.out.println("Netty server started on port 8080");
future.channel().closeFuture().sync();
} catch (InterruptedException e) {
e.printStackTrace();
} finally {
bossGroup.shutdownGracefully();
workerGroup.shutdownGracefully();
}
}
// 业务处理器
@ChannelHandler.Sharable
static class BusinessHandler extends SimpleChannelInboundHandler<String> {
@Override
public void channelActive(ChannelHandlerContext ctx) {
System.out.println("New connection: " + ctx.channel().remoteAddress());
}
@Override
protected void channelRead0(ChannelHandlerContext ctx, String msg) {
// 业务处理
String response = handleRequest(msg);
// 异步写入(Netty默认使用ChannelFutureListener自动释放)
ctx.writeAndFlush(response)
.addListener(ChannelFutureListener.CLOSE_ON_FAILURE);
}
@Override
public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) {
cause.printStackTrace();
ctx.close();
}
private String handleRequest(String request) {
// 模拟业务处理
return "HTTP/1.1 200 OK\r\n\r\nHello Netty";
}
}
}
// Netty线程模型核心源码分析
// NioEventLoop.run() - 主循环
protected void run() {
for (;;) {
try {
// 1. 执行selector.select()
select();
// 2. 处理IO事件
processSelectedKeys();
// 3. 执行任务队列(普通任务 + 定时任务)
runAllTasks();
} catch (Throwable t) {
handleLoopException(t);
}
}
}
3.3 Proactor模型(异步I/O)
// AIO(Asynchronous I/O)示例 - Windows IOCP / Linux AIO
public class AIOServer {
public static void main(String[] args) throws IOException {
AsynchronousServerSocketChannel serverChannel =
AsynchronousServerSocketChannel.open();
serverChannel.bind(new InetSocketAddress(8080));
// 异步接受连接
serverChannel.accept(null, new CompletionHandler<AsynchronousSocketChannel, Void>() {
@Override
public void completed(AsynchronousSocketChannel clientChannel, Void attachment) {
// 继续接受下一个连接
serverChannel.accept(null, this);
// 分配缓冲区
ByteBuffer buffer = ByteBuffer.allocate(1024);
// 异步读取数据
clientChannel.read(buffer, buffer, new CompletionHandler<Integer, ByteBuffer>() {
@Override
public void completed(Integer result, ByteBuffer attachment) {
if (result == -1) {
close(clientChannel);
return;
}
attachment.flip();
byte[] data = new byte[attachment.remaining()];
attachment.get(data);
System.out.println("Received: " + new String(data));
// 异步写入响应
ByteBuffer response = ByteBuffer.wrap("OK".getBytes());
clientChannel.write(response, null, new CompletionHandler<Integer, Void>() {
@Override
public void completed(Integer result, Void attachment) {
close(clientChannel);
}
@Override
public void failed(Throwable exc, Void attachment) {
close(clientChannel);
}
});
}
@Override
public void failed(Throwable exc, ByteBuffer attachment) {
close(clientChannel);
}
});
}
@Override
public void failed(Throwable exc, Void attachment) {
exc.printStackTrace();
}
});
// 保持主线程运行
try {
Thread.currentThread().join();
} catch (InterruptedException e) {
e.printStackTrace();
}
}
private static void close(AsynchronousSocketChannel channel) {
try {
channel.close();
} catch (IOException e) {
e.printStackTrace();
}
}
}