程序员必备的十大技能(进阶版)之网络与高并发原理(二)

简介: 教程来源 http://hllft.cn/ Reactor与Proactor是高性能I/O的两大核心模型:Reactor基于同步非阻塞+事件就绪通知(如epoll),由用户线程主动读写;Proactor基于异步I/O+操作完成通知(如IOCP/AIO),内核代为完成数据搬运。二者分别支撑Netty与Windows服务器等高并发架构。

三、Reactor与Proactor模型

3.1 Reactor模型(同步非阻塞)

┌─────────────────────────────────────────────────────────────────────────┐
│                         Reactor模型架构图                               │
├─────────────────────────────────────────────────────────────────────────┤
│                                                                         │
│   ┌─────────────────────────────────────────────────────────────┐       │
│   │                      Reactor                                  │       │
│   │   ┌─────────────┐    ┌─────────────┐    ┌─────────────┐      │       │
│   │   │   Selector  │←──→│   分发器    │←──→│  事件队列   │      │       │
│   │   └─────────────┘    └──────┬──────┘    └─────────────┘      │       │
│   │                              │                                 │       │
│   └──────────────────────────────┼─────────────────────────────────┘       │
│                                  │                                         │
│           ┌──────────────────────┼──────────────────────┐                  │
│           │                      │                      │                  │
│           ▼                      ▼                      ▼                  │
│   ┌───────────────┐      ┌───────────────┐      ┌───────────────┐          │
│   │  Acceptor     │      │   Handler     │      │   Handler     │          │
│   │  (处理连接)   │      │   (读/写)     │      │   (读/写)     │          │
│   └───────────────┘      └───────────────┘      └───────────────┘          │
│                                                                         │
└─────────────────────────────────────────────────────────────────────────┘

Reactor三种模式:
1. 单Reactor单线程(Redis 6.0之前)
2. 单Reactor多线程(Netty Boss Group + Worker Group)
3. 主从Reactor多线程(Netty多Boss Group)
// 简易Reactor模型实现
public class SimpleReactor implements Runnable {

    private final Selector selector;
    private final ServerSocketChannel serverSocket;

    public SimpleReactor(int port) throws IOException {
        selector = Selector.open();
        serverSocket = ServerSocketChannel.open();
        serverSocket.bind(new InetSocketAddress(port));
        serverSocket.configureBlocking(false);

        // 注册ACCEPT事件
        SelectionKey key = serverSocket.register(selector, SelectionKey.OP_ACCEPT);
        key.attach(new Acceptor());
    }

    @Override
    public void run() {
        while (!Thread.interrupted()) {
            try {
                selector.select();
                Set<SelectionKey> selected = selector.selectedKeys();
                Iterator<SelectionKey> it = selected.iterator();

                while (it.hasNext()) {
                    SelectionKey key = it.next();
                    it.remove();
                    dispatch(key);
                }
            } catch (IOException e) {
                e.printStackTrace();
            }
        }
    }

    private void dispatch(SelectionKey key) {
        Runnable handler = (Runnable) key.attachment();
        if (handler != null) {
            handler.run();
        }
    }

    // Acceptor负责接受连接
    private class Acceptor implements Runnable {
        @Override
        public void run() {
            try {
                SocketChannel client = serverSocket.accept();
                if (client != null) {
                    new Handler(selector, client);
                }
            } catch (IOException e) {
                e.printStackTrace();
            }
        }
    }

    // Handler负责读写
    private class Handler implements Runnable {
        private final SocketChannel channel;
        private final SelectionKey key;
        private ByteBuffer input = ByteBuffer.allocate(1024);
        private ByteBuffer output = ByteBuffer.allocate(1024);
        private static final int READING = 0, WRITING = 1;
        private int state = READING;

        Handler(Selector selector, SocketChannel channel) throws IOException {
            this.channel = channel;
            channel.configureBlocking(false);
            key = channel.register(selector, 0);
            key.attach(this);
            key.interestOps(SelectionKey.OP_READ);
            selector.wakeup();
        }

        @Override
        public void run() {
            try {
                if (state == READING) {
                    read();
                } else if (state == WRITING) {
                    write();
                }
            } catch (IOException e) {
                e.printStackTrace();
                close();
            }
        }

        private void read() throws IOException {
            int read = channel.read(input);
            if (read == -1) {
                close();
                return;
            }
            if (read > 0) {
                input.flip();
                // 处理数据
                byte[] data = new byte[input.remaining()];
                input.get(data);
                System.out.println("Received: " + new String(data));

                // 切换到写状态
                state = WRITING;
                key.interestOps(SelectionKey.OP_WRITE);
            }
        }

        private void write() throws IOException {
            output.clear();
            output.put("HTTP/1.1 200 OK\r\nContent-Length: 2\r\n\r\nOK".getBytes());
            output.flip();
            channel.write(output);
            state = READING;
            key.interestOps(SelectionKey.OP_READ);
        }

        private void close() {
            try {
                key.cancel();
                channel.close();
            } catch (IOException e) {
                e.printStackTrace();
            }
        }
    }

    public static void main(String[] args) throws IOException {
        new SimpleReactor(8080).run();
    }
}

3.2 Netty架构解析

// Netty服务端完整示例
public class NettyServer {

    public static void main(String[] args) {
        // Boss Group:负责接受新连接(通常1个线程)
        EventLoopGroup bossGroup = new NioEventLoopGroup(1);
        // Worker Group:负责处理读写(通常CPU核心数*2)
        EventLoopGroup workerGroup = new NioEventLoopGroup();

        try {
            ServerBootstrap bootstrap = new ServerBootstrap();
            bootstrap.group(bossGroup, workerGroup)
                .channel(NioServerSocketChannel.class)
                // TCP参数优化
                .option(ChannelOption.SO_BACKLOG, 1024)           // listen队列大小
                .option(ChannelOption.SO_REUSEADDR, true)         // 端口复用
                .childOption(ChannelOption.TCP_NODELAY, true)     // 禁用Nagle算法
                .childOption(ChannelOption.SO_KEEPALIVE, true)    // 开启KeepAlive
                .childOption(ChannelOption.SO_RCVBUF, 65536)      // 接收缓冲区
                .childOption(ChannelOption.SO_SNDBUF, 65536)      // 发送缓冲区
                // 内存分配优化
                .childOption(ChannelOption.ALLOCATOR, PooledByteBufAllocator.DEFAULT)
                .childHandler(new ChannelInitializer<SocketChannel>() {
                    @Override
                    protected void initChannel(SocketChannel ch) {
                        ChannelPipeline pipeline = ch.pipeline();

                        // 编解码器
                        pipeline.addLast(new LengthFieldBasedFrameDecoder(65536, 0, 4));
                        pipeline.addLast(new StringDecoder(StandardCharsets.UTF_8));
                        pipeline.addLast(new StringEncoder(StandardCharsets.UTF_8));

                        // 业务处理器
                        pipeline.addLast(new BusinessHandler());

                        // 空闲检测(读空闲30秒触发)
                        pipeline.addLast(new IdleStateHandler(30, 0, 0));
                    }
                });

            // 绑定端口
            ChannelFuture future = bootstrap.bind(8080).sync();
            System.out.println("Netty server started on port 8080");
            future.channel().closeFuture().sync();

        } catch (InterruptedException e) {
            e.printStackTrace();
        } finally {
            bossGroup.shutdownGracefully();
            workerGroup.shutdownGracefully();
        }
    }

    // 业务处理器
    @ChannelHandler.Sharable
    static class BusinessHandler extends SimpleChannelInboundHandler<String> {

        @Override
        public void channelActive(ChannelHandlerContext ctx) {
            System.out.println("New connection: " + ctx.channel().remoteAddress());
        }

        @Override
        protected void channelRead0(ChannelHandlerContext ctx, String msg) {
            // 业务处理
            String response = handleRequest(msg);

            // 异步写入(Netty默认使用ChannelFutureListener自动释放)
            ctx.writeAndFlush(response)
                .addListener(ChannelFutureListener.CLOSE_ON_FAILURE);
        }

        @Override
        public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) {
            cause.printStackTrace();
            ctx.close();
        }

        private String handleRequest(String request) {
            // 模拟业务处理
            return "HTTP/1.1 200 OK\r\n\r\nHello Netty";
        }
    }
}

// Netty线程模型核心源码分析
// NioEventLoop.run() - 主循环
protected void run() {
    for (;;) {
        try {
            // 1. 执行selector.select()
            select();

            // 2. 处理IO事件
            processSelectedKeys();

            // 3. 执行任务队列(普通任务 + 定时任务)
            runAllTasks();
        } catch (Throwable t) {
            handleLoopException(t);
        }
    }
}

3.3 Proactor模型(异步I/O)

// AIO(Asynchronous I/O)示例 - Windows IOCP / Linux AIO
public class AIOServer {

    public static void main(String[] args) throws IOException {
        AsynchronousServerSocketChannel serverChannel = 
            AsynchronousServerSocketChannel.open();
        serverChannel.bind(new InetSocketAddress(8080));

        // 异步接受连接
        serverChannel.accept(null, new CompletionHandler<AsynchronousSocketChannel, Void>() {
            @Override
            public void completed(AsynchronousSocketChannel clientChannel, Void attachment) {
                // 继续接受下一个连接
                serverChannel.accept(null, this);

                // 分配缓冲区
                ByteBuffer buffer = ByteBuffer.allocate(1024);

                // 异步读取数据
                clientChannel.read(buffer, buffer, new CompletionHandler<Integer, ByteBuffer>() {
                    @Override
                    public void completed(Integer result, ByteBuffer attachment) {
                        if (result == -1) {
                            close(clientChannel);
                            return;
                        }

                        attachment.flip();
                        byte[] data = new byte[attachment.remaining()];
                        attachment.get(data);
                        System.out.println("Received: " + new String(data));

                        // 异步写入响应
                        ByteBuffer response = ByteBuffer.wrap("OK".getBytes());
                        clientChannel.write(response, null, new CompletionHandler<Integer, Void>() {
                            @Override
                            public void completed(Integer result, Void attachment) {
                                close(clientChannel);
                            }

                            @Override
                            public void failed(Throwable exc, Void attachment) {
                                close(clientChannel);
                            }
                        });
                    }

                    @Override
                    public void failed(Throwable exc, ByteBuffer attachment) {
                        close(clientChannel);
                    }
                });
            }

            @Override
            public void failed(Throwable exc, Void attachment) {
                exc.printStackTrace();
            }
        });

        // 保持主线程运行
        try {
            Thread.currentThread().join();
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
    }

    private static void close(AsynchronousSocketChannel channel) {
        try {
            channel.close();
        } catch (IOException e) {
            e.printStackTrace();
        }
    }
}

来源:
http://tmywi.cn/

相关文章
|
8天前
|
Shell API 开发工具
Claude Code 快速上手指南(新手友好版)
AI编程工具卷疯啦!Claude Code凭借任务驱动+终端原生的特性,成了开发者的效率搭子。本文从安装、登录、切换国产模型到常用命令,手把手带新手快速上手,全程避坑,30分钟独立用起来。
2763 15
|
6天前
|
人工智能 开发工具 iOS开发
Claude Code 新手完全上手指南:安装、国产模型配置与常用命令全解
Claude Code 是一款运行在终端环境中的 AI 编程助手,能够直接在命令行中完成代码生成、项目分析、文件修改、命令执行、Git 管理等开发全流程工作。它最大的特点是**任务驱动、终端原生、轻量高效、多模型兼容**,无需图形界面、不依赖 IDE 插件,能够深度融入开发者日常工作流。
2303 4
|
21天前
|
人工智能 JSON 供应链
畅用7个月无影 JVS Claw |手把手教你把JVS改造成「科研与产业地理情报可视化大师」
LucianaiB分享零成本畅用JVS Claw教程(学生认证享7个月使用权),并开源GeoMind项目——将JVS改造为科研与产业地理情报可视化AI助手,支持飞书文档解析、地理编码与腾讯地图可视化,助力产业关系图谱构建。
23554 13
畅用7个月无影 JVS Claw |手把手教你把JVS改造成「科研与产业地理情报可视化大师」
|
8天前
|
人工智能 JSON BI
DeepSeek V4-Pro 接入 Claude Code 完全实战:体验、测试与关键避坑指南
Claude Code 作为当前主流的 AI 编程辅助工具,凭借强大的代码理解、工程执行与自动化能力深受开发者喜爱,但原生模型的使用成本相对较高。为了在保持能力的同时进一步降低开销,不少开发者开始寻找兼容度高、价格更友好的替代模型。DeepSeek V4 系列的发布带来了新的选择,该系列包含 V4-Pro 与 V4-Flash 两款模型,并提供了与 Anthropic 完全兼容的 API 接口,理论上只需简单修改配置,即可让 Claude Code 无缝切换为 DeepSeek 引擎。
2055 1
|
2天前
|
人工智能 Linux BI
国内用 Claude Code 终于不用翻墙了:一行命令搞定,自动接 DeepSeek
JeecgBoot AI专题研究 一键脚本:Claude Code + JeecgBoot Skills + DeepSeek 全平台接入 一行命令装好 Claude Code + JeecgBoot Skills + DeepSeek 接入,无需翻墙使用 Claude Code,支持 Wind
1306 1
国内用 Claude Code 终于不用翻墙了:一行命令搞定,自动接 DeepSeek
|
14天前
|
人工智能 缓存 Shell
Claude Code 全攻略:命令大全 + 实战工作流(完整版)
Claude Code 是一款运行在终端环境下的 AI 编码助手,能够直接在项目目录中理解代码结构、编辑文件、执行命令、执行开发计划,并支持持久化记忆、上下文压缩、后台任务、多模型切换等专业能力。对于日常开发、项目维护、快速重构、代码审查等场景,它可以大幅减少手动操作、提升编码效率。本文从常用命令、界面模式、核心指令、记忆机制、图片处理、进阶工作流等维度完整说明,帮助开发者快速上手并稳定使用。
3456 5
|
7天前
|
人工智能 安全 开发工具
Claude Code 官方工作原理与使用指南
Claude Code 不是传统代码补全工具,而是 Anthropic 推出的终端 AI 代理,具备代理循环、双驱动架构(模型+工具)、全局项目感知、6 种权限模式等核心能力,本文基于官方文档系统解析其工作原理与高效使用技巧。
1095 0