程序员必备的十大技能(进阶版)之网络与高并发原理(二)

简介: 教程来源 http://hllft.cn/ Reactor与Proactor是高性能I/O的两大核心模型:Reactor基于同步非阻塞+事件就绪通知(如epoll),由用户线程主动读写;Proactor基于异步I/O+操作完成通知(如IOCP/AIO),内核代为完成数据搬运。二者分别支撑Netty与Windows服务器等高并发架构。

三、Reactor与Proactor模型

3.1 Reactor模型(同步非阻塞)

┌─────────────────────────────────────────────────────────────────────────┐
│                         Reactor模型架构图                               │
├─────────────────────────────────────────────────────────────────────────┤
│                                                                         │
│   ┌─────────────────────────────────────────────────────────────┐       │
│   │                      Reactor                                  │       │
│   │   ┌─────────────┐    ┌─────────────┐    ┌─────────────┐      │       │
│   │   │   Selector  │←──→│   分发器    │←──→│  事件队列   │      │       │
│   │   └─────────────┘    └──────┬──────┘    └─────────────┘      │       │
│   │                              │                                 │       │
│   └──────────────────────────────┼─────────────────────────────────┘       │
│                                  │                                         │
│           ┌──────────────────────┼──────────────────────┐                  │
│           │                      │                      │                  │
│           ▼                      ▼                      ▼                  │
│   ┌───────────────┐      ┌───────────────┐      ┌───────────────┐          │
│   │  Acceptor     │      │   Handler     │      │   Handler     │          │
│   │  (处理连接)   │      │   (读/写)     │      │   (读/写)     │          │
│   └───────────────┘      └───────────────┘      └───────────────┘          │
│                                                                         │
└─────────────────────────────────────────────────────────────────────────┘

Reactor三种模式:
1. 单Reactor单线程(Redis 6.0之前)
2. 单Reactor多线程(Netty Boss Group + Worker Group)
3. 主从Reactor多线程(Netty多Boss Group)
// 简易Reactor模型实现
public class SimpleReactor implements Runnable {

    private final Selector selector;
    private final ServerSocketChannel serverSocket;

    public SimpleReactor(int port) throws IOException {
        selector = Selector.open();
        serverSocket = ServerSocketChannel.open();
        serverSocket.bind(new InetSocketAddress(port));
        serverSocket.configureBlocking(false);

        // 注册ACCEPT事件
        SelectionKey key = serverSocket.register(selector, SelectionKey.OP_ACCEPT);
        key.attach(new Acceptor());
    }

    @Override
    public void run() {
        while (!Thread.interrupted()) {
            try {
                selector.select();
                Set<SelectionKey> selected = selector.selectedKeys();
                Iterator<SelectionKey> it = selected.iterator();

                while (it.hasNext()) {
                    SelectionKey key = it.next();
                    it.remove();
                    dispatch(key);
                }
            } catch (IOException e) {
                e.printStackTrace();
            }
        }
    }

    private void dispatch(SelectionKey key) {
        Runnable handler = (Runnable) key.attachment();
        if (handler != null) {
            handler.run();
        }
    }

    // Acceptor负责接受连接
    private class Acceptor implements Runnable {
        @Override
        public void run() {
            try {
                SocketChannel client = serverSocket.accept();
                if (client != null) {
                    new Handler(selector, client);
                }
            } catch (IOException e) {
                e.printStackTrace();
            }
        }
    }

    // Handler负责读写
    private class Handler implements Runnable {
        private final SocketChannel channel;
        private final SelectionKey key;
        private ByteBuffer input = ByteBuffer.allocate(1024);
        private ByteBuffer output = ByteBuffer.allocate(1024);
        private static final int READING = 0, WRITING = 1;
        private int state = READING;

        Handler(Selector selector, SocketChannel channel) throws IOException {
            this.channel = channel;
            channel.configureBlocking(false);
            key = channel.register(selector, 0);
            key.attach(this);
            key.interestOps(SelectionKey.OP_READ);
            selector.wakeup();
        }

        @Override
        public void run() {
            try {
                if (state == READING) {
                    read();
                } else if (state == WRITING) {
                    write();
                }
            } catch (IOException e) {
                e.printStackTrace();
                close();
            }
        }

        private void read() throws IOException {
            int read = channel.read(input);
            if (read == -1) {
                close();
                return;
            }
            if (read > 0) {
                input.flip();
                // 处理数据
                byte[] data = new byte[input.remaining()];
                input.get(data);
                System.out.println("Received: " + new String(data));

                // 切换到写状态
                state = WRITING;
                key.interestOps(SelectionKey.OP_WRITE);
            }
        }

        private void write() throws IOException {
            output.clear();
            output.put("HTTP/1.1 200 OK\r\nContent-Length: 2\r\n\r\nOK".getBytes());
            output.flip();
            channel.write(output);
            state = READING;
            key.interestOps(SelectionKey.OP_READ);
        }

        private void close() {
            try {
                key.cancel();
                channel.close();
            } catch (IOException e) {
                e.printStackTrace();
            }
        }
    }

    public static void main(String[] args) throws IOException {
        new SimpleReactor(8080).run();
    }
}

3.2 Netty架构解析

// Netty服务端完整示例
public class NettyServer {

    public static void main(String[] args) {
        // Boss Group:负责接受新连接(通常1个线程)
        EventLoopGroup bossGroup = new NioEventLoopGroup(1);
        // Worker Group:负责处理读写(通常CPU核心数*2)
        EventLoopGroup workerGroup = new NioEventLoopGroup();

        try {
            ServerBootstrap bootstrap = new ServerBootstrap();
            bootstrap.group(bossGroup, workerGroup)
                .channel(NioServerSocketChannel.class)
                // TCP参数优化
                .option(ChannelOption.SO_BACKLOG, 1024)           // listen队列大小
                .option(ChannelOption.SO_REUSEADDR, true)         // 端口复用
                .childOption(ChannelOption.TCP_NODELAY, true)     // 禁用Nagle算法
                .childOption(ChannelOption.SO_KEEPALIVE, true)    // 开启KeepAlive
                .childOption(ChannelOption.SO_RCVBUF, 65536)      // 接收缓冲区
                .childOption(ChannelOption.SO_SNDBUF, 65536)      // 发送缓冲区
                // 内存分配优化
                .childOption(ChannelOption.ALLOCATOR, PooledByteBufAllocator.DEFAULT)
                .childHandler(new ChannelInitializer<SocketChannel>() {
                    @Override
                    protected void initChannel(SocketChannel ch) {
                        ChannelPipeline pipeline = ch.pipeline();

                        // 编解码器
                        pipeline.addLast(new LengthFieldBasedFrameDecoder(65536, 0, 4));
                        pipeline.addLast(new StringDecoder(StandardCharsets.UTF_8));
                        pipeline.addLast(new StringEncoder(StandardCharsets.UTF_8));

                        // 业务处理器
                        pipeline.addLast(new BusinessHandler());

                        // 空闲检测(读空闲30秒触发)
                        pipeline.addLast(new IdleStateHandler(30, 0, 0));
                    }
                });

            // 绑定端口
            ChannelFuture future = bootstrap.bind(8080).sync();
            System.out.println("Netty server started on port 8080");
            future.channel().closeFuture().sync();

        } catch (InterruptedException e) {
            e.printStackTrace();
        } finally {
            bossGroup.shutdownGracefully();
            workerGroup.shutdownGracefully();
        }
    }

    // 业务处理器
    @ChannelHandler.Sharable
    static class BusinessHandler extends SimpleChannelInboundHandler<String> {

        @Override
        public void channelActive(ChannelHandlerContext ctx) {
            System.out.println("New connection: " + ctx.channel().remoteAddress());
        }

        @Override
        protected void channelRead0(ChannelHandlerContext ctx, String msg) {
            // 业务处理
            String response = handleRequest(msg);

            // 异步写入(Netty默认使用ChannelFutureListener自动释放)
            ctx.writeAndFlush(response)
                .addListener(ChannelFutureListener.CLOSE_ON_FAILURE);
        }

        @Override
        public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) {
            cause.printStackTrace();
            ctx.close();
        }

        private String handleRequest(String request) {
            // 模拟业务处理
            return "HTTP/1.1 200 OK\r\n\r\nHello Netty";
        }
    }
}

// Netty线程模型核心源码分析
// NioEventLoop.run() - 主循环
protected void run() {
    for (;;) {
        try {
            // 1. 执行selector.select()
            select();

            // 2. 处理IO事件
            processSelectedKeys();

            // 3. 执行任务队列(普通任务 + 定时任务)
            runAllTasks();
        } catch (Throwable t) {
            handleLoopException(t);
        }
    }
}

3.3 Proactor模型(异步I/O)

// AIO(Asynchronous I/O)示例 - Windows IOCP / Linux AIO
public class AIOServer {

    public static void main(String[] args) throws IOException {
        AsynchronousServerSocketChannel serverChannel = 
            AsynchronousServerSocketChannel.open();
        serverChannel.bind(new InetSocketAddress(8080));

        // 异步接受连接
        serverChannel.accept(null, new CompletionHandler<AsynchronousSocketChannel, Void>() {
            @Override
            public void completed(AsynchronousSocketChannel clientChannel, Void attachment) {
                // 继续接受下一个连接
                serverChannel.accept(null, this);

                // 分配缓冲区
                ByteBuffer buffer = ByteBuffer.allocate(1024);

                // 异步读取数据
                clientChannel.read(buffer, buffer, new CompletionHandler<Integer, ByteBuffer>() {
                    @Override
                    public void completed(Integer result, ByteBuffer attachment) {
                        if (result == -1) {
                            close(clientChannel);
                            return;
                        }

                        attachment.flip();
                        byte[] data = new byte[attachment.remaining()];
                        attachment.get(data);
                        System.out.println("Received: " + new String(data));

                        // 异步写入响应
                        ByteBuffer response = ByteBuffer.wrap("OK".getBytes());
                        clientChannel.write(response, null, new CompletionHandler<Integer, Void>() {
                            @Override
                            public void completed(Integer result, Void attachment) {
                                close(clientChannel);
                            }

                            @Override
                            public void failed(Throwable exc, Void attachment) {
                                close(clientChannel);
                            }
                        });
                    }

                    @Override
                    public void failed(Throwable exc, ByteBuffer attachment) {
                        close(clientChannel);
                    }
                });
            }

            @Override
            public void failed(Throwable exc, Void attachment) {
                exc.printStackTrace();
            }
        });

        // 保持主线程运行
        try {
            Thread.currentThread().join();
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
    }

    private static void close(AsynchronousSocketChannel channel) {
        try {
            channel.close();
        } catch (IOException e) {
            e.printStackTrace();
        }
    }
}

来源:
http://tmywi.cn/

相关文章
|
21小时前
|
缓存 弹性计算 应用服务中间件
高端网站搭建:Nginx 反向代理与动静分离架构配置详解
在现代企业级 Web 架构中,Nginx 凭借其极低的内存消耗和超强的高并发处理能力,成为了不可或缺的流量网关。特别是在阿里云 ECS 实例搭配 Alibaba Cloud Linux 3 的环境下,Nginx 能够充分利用操作系统的网络栈优化,实现惊人的吞吐量。 本文将详细介绍如何配置 Nginx 的反向代理与动静分离,将静态资源请求与动态接口请求完美剥离,从而大幅提升网站的整体响应速度。
|
21小时前
|
人工智能 安全 Shell
Harness Engineering 被讲烂之后,Agent 工程真正难的是什么?
看 Anthropic、OpenAI、Gemini 的 Harness 都在做啥?
251 0
|
21小时前
|
存储 人工智能 弹性计算
阿里云正式推出首个 OPC 专属产品套餐,护航 OPC 从起步到规模化全阶段
2026年,AI驱动“一人公司”(OPC)兴起。阿里云首发OPC创新助力计划,推出Starter/Lite/Pro三档全栈云套餐,覆盖验证、增长到成熟全周期:低成本试错、高稳架构、全球加速与安全防护,并提供Token补贴、1V1技术护航及生态资源支持。(239字)
阿里云正式推出首个 OPC 专属产品套餐,护航 OPC 从起步到规模化全阶段
|
21小时前
|
人工智能 缓存 自然语言处理
阿里云AI大模型节省计划解读:优势、使用、资源包、折扣信息及问题解答FAQ
阿里云AI模型节省计划是百炼平台推出的折扣方案,支持按量付费用户通过承诺周期内月消费金额,享最高5.3折优惠。覆盖千问、向量、语音、图像等全系阿里直供模型,自动抵扣,灵活开通,显著降低大模型调用成本。阿里云百炼AI大模型平台:https://t.aliyun.com/U/fPVHqY
159 0
|
21小时前
|
存储 人工智能 自然语言处理
知识库为谁而建 ?
随着 Agent 的逐步广泛应用,知识库的使用者正在从人变成 Agent。 知识库的设计逻辑、维护方式、甚至存在的意义,都需要重新思考。
236 10
知识库为谁而建 ?
|
21小时前
|
缓存 人工智能 JavaScript
Markstream-VUE:构建高性能流式 Markdown 渲染器
在 AI 对话、实时协作文档、知识库等场景中,Markdown 内容的流式渲染已成为刚需。传统方案面临"闪烁重绘"、"内存暴涨"、"大文档卡顿"三大痛点。本文将深度剖析开源项目https://github.com/Simon-He95/markstream-vue的技术架构,从流式解析算法、虚拟化渲染策略、Monaco 增量更新、渐进式图表渲染四个维度,揭示其实现"零闪烁、低内存、高响应"流式体验的核心原理,并提供可直接落地的性能调优方案。
226 8
Markstream-VUE:构建高性能流式 Markdown 渲染器
|
21小时前
|
数据采集 人工智能 缓存
字节面试官:别再直接让 AI 写代码了,先学会 SDD 规格驱动开发
AI编程虽快,但需求模糊易致代码失控。SDD(规格驱动开发)主张先明确定义目标、边界、行为、约束与验收标准,再让AI编码。对测试开发尤为关键——它将模糊需求转化为可测、可验、可追溯的质量规格,推动测试前置、风险可控、回归有据。
|
21小时前
|
机器学习/深度学习 人工智能 网络架构
深度解析:Transformer 的“灵魂”——QKV 变换的物理直觉
本文用图书馆检索等生活隐喻,从物理意义与认知科学角度解析Transformer中QKV设计的精妙本质:解耦查询(q)、键(k)、值(v)三重角色,实现语义分离、避免自注意力“自恋”,模拟人类动态信息路由的认知过程。(239字)
217 13
|
27天前
|
XML 前端开发 程序员
初级程序员必备的十大技能之 API 接口与前后端联调(一)
教程来源 http://qeext.cn/ 本文系统讲解API设计规范(RESTful/GraphQL)、HTTP协议核心(方法、状态码、头信息)、前后端联调流程及调试工具,助你打造标准化、高可用接口,打破前后端协作孤岛。
|
2月前
|
人工智能 芯片
万相2.7,模型使用指南
万相2.7,拥有全面的创作控制力,将AI的能力从单一素材生成扩至创作全链路,从“演”迈向“导” 。
万相2.7,模型使用指南