阿里P8架构师教你Netty源码面试解析实战(二)- 基本组件

简介: 阿里P8架构师教你Netty源码面试解析实战(二)- 基本组件

1 传统socket编程

1.1 实战

服务端:ServerBoot

/**
 * @author JavaEdge
 */
public class ServerBoot {
    private static final int PORT = 8000;
    public static void main(String[] args) {
        Server server = new Server(PORT);
        server.start();
    }
}

Server

/**
 * @author JavaEdge
 */
public class Server {
    private ServerSocket serverSocket;
    public Server(int port) {
        try {
            this.serverSocket = new ServerSocket(port);
            System.out.println("Server starts success,端口:" + port);
        } catch (IOException exception) {
            System.out.println("Server starts failed");
        }
    }
    public void start() {
        new Thread(() -> doStart()).start();
    }
    private void doStart() {
        while (true) {
            try {
                Socket client = serverSocket.accept();
                new ClientHandler(client).start();
            } catch (IOException e) {
                System.out.println("Server failure");
            }
        }
    }
}

ClientHandler

/**
 * @author JavaEdge
 */
public class ClientHandler {
    public static final int MAX_DATA_LEN = 1024;
    private final Socket socket;
    public ClientHandler(Socket socket) {
        this.socket = socket;
    }
    public void start() {
        System.out.println("新客户端接入");
        new Thread(() -> doStart()).start();
    }
    private void doStart() {
        try {
            InputStream inputStream = socket.getInputStream();
            while (true) {
                byte[] data = new byte[MAX_DATA_LEN];
                int len;
                while ((len = inputStream.read(data)) != -1) {
                    String message = new String(data, 0, len);
                    System.out.println("客户端传来消息: " + message);
                    socket.getOutputStream().write(data);
                }
            }
        } catch (IOException e) {
            e.printStackTrace();
        }
    }
}

Client

/**
 * @author JavaEdge
 */
public class Client {
    private static final String HOST = "127.0.0.1";
    private static final int PORT = 8000;
    private static final int SLEEP_TIME = 5000;
    public static void main(String[] args) throws IOException {
        final Socket socket = new Socket(HOST, PORT);
        new Thread(() -> {
            System.out.println("客户端启动成功!");
            while (true) {
                try {
                    String message = "hello world";
                    System.out.println("客户端发送数据: " + message);
                    socket.getOutputStream().write(message.getBytes());
                } catch (Exception e) {
                    System.out.println("写数据出错!");
                }
                sleep();
            }
        }).start();
    }
    private static void sleep() {
        try {
            Thread.sleep(SLEEP_TIME);
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
    }
}

先后启动 ServerBootClient,输出

Server starts success,端口:8000
新客户端接入
客户端传来消息: hello worldhello world
客户端传来消息: hello world
客户端传来消息: hello world
客户端传来消息: hello world
客户端传来消息: hello world
客户端启动成功!
客户端发送数据: hello world
客户端发送数据: hello world
客户端发送数据: hello world

1.2 传统HTTP服务器原理

  1. 创建一个ServerSocket
  2. 监听并绑定一个端口一系列客户端来请求这个端口服务器使用Accept,获得一个来自客户端的Socket连接对象
  3. 启动一个新线程处理连接读Socket,
  • 得到字节流解码协议
  • 得到Http请求对象处理Http请求
  • 得到一个结果
  • 封装成一个HttpResponse对象编码协议
  • 将结果序列化字节流写Socket,
  • 将字节流发给客户端
  • 继续循环步骤3

1.3 C/S 交互流程

2  Netty版socket编程

3 Netty核心组件

3.1 NioEventLoop

3.1.1 简介

① EventLoop

一个 EventLoop 就是一个 eventexecutor:

package io.netty.channel;
import io.netty.util.concurrent.OrderedEventExecutor;
/**
 * 一旦Channel注册了,将处理所有的  I/O 操作
 * 一个 EventLoop 实例通常处理一个以上的Channel ,但这取决于执行的细节和内幕
 */
public interface EventLoop extends OrderedEventExecutor, EventLoopGroup {
    @Override
    EventLoopGroup parent();
}

NioEventLoopGroup ,Netty 框架提供的一个基于 NIO 的实现。是一个处理 I/O 操作的多线程事件循环的组。Netty 为不同类型的传输提供各种 EventLoopGroup 实现。

示例代码中实现服务器端应用程序,将使用两个NioEventLoopGroup:

  • boss,接受传入的连接。因为accept事件只需建立一次连接,连接是可以复用的,accept只接受一次
  • work,在上司接受连接并登记到工作人员后,处理接受连接的流量。使用多少线程及如何映射到创建的通道,取决于 EventLoopGroup 实现,甚至可能通过构造函数配置

Netty的发动机:

Server端

Client端

while(true)就对应一个 run 方法。

NioEventLoop#run

@Override
    protected void run() {
        for (;;) {
            oldWakenUp = wakenUp.getAndSet(false);
            try {
                if (hasTasks()) {
                    selectNow();
                } else {
                    select();
                    // 'wakenUp.compareAndSet(false, true)' is always evaluated
                    // before calling 'selector.wakeup()' to reduce the wake-up
                    // overhead. (Selector.wakeup() is an expensive operation.)
                    //
                    // However, there is a race condition in this approach.
                    // The race condition is triggered when 'wakenUp' is set to
                    // true too early.
                    //
                    // 'wakenUp' is set to true too early if:
                    // 1) Selector is waken up between 'wakenUp.set(false)' and
                    //    'selector.select(...)'. (BAD)
                    // 2) Selector is waken up between 'selector.select(...)' and
                    //    'if (wakenUp.get()) { ... }'. (OK)
                    //
                    // In the first case, 'wakenUp' is set to true and the
                    // following 'selector.select(...)' will wake up immediately.
                    // Until 'wakenUp' is set to false again in the next round,
                    // 'wakenUp.compareAndSet(false, true)' will fail, and therefore
                    // any attempt to wake up the Selector will fail, too, causing
                    // the following 'selector.select(...)' call to block
                    // unnecessarily.
                    //
                    // To fix this problem, we wake up the selector again if wakenUp
                    // is true immediately after selector.select(...).
                    // It is inefficient in that it wakes up the selector for both
                    // the first case (BAD - wake-up required) and the second case
                    // (OK - no wake-up required).
                    if (wakenUp.get()) {
                        selector.wakeup();
                    }
                }
                cancelledKeys = 0;
                needsToSelectAgain = false;
                final int ioRatio = this.ioRatio;
                if (ioRatio == 100) {
                    processSelectedKeys();
                    runAllTasks();
                } else {
                    final long ioStartTime = System.nanoTime();
                  
                   // 打断点
                    processSelectedKeys();
                    final long ioTime = System.nanoTime() - ioStartTime;
                    runAllTasks(ioTime * (100 - ioRatio) / ioRatio);
                }
                if (isShuttingDown()) {
                    closeAll();
                    if (confirmShutdown()) {
                        break;
                    }
                }
            } catch (Throwable t) {
                logger.warn("Unexpected exception in the selector loop.", t);
                // Prevent possible consecutive immediate failures that lead to
                // excessive CPU consumption.
                try {
                    Thread.sleep(1000);
                } catch (InterruptedException e) {
                    // Ignore.
                }
            }
        }
    }

netty有不同I/O编程模型实现。以NIO为例,对IO事件的处理是在NioEventLoop里,事件的注册:

private void processSelectedKey(SelectionKey k, AbstractNioChannel ch) {
        final AbstractNioChannel.NioUnsafe unsafe = ch.unsafe();
        if (!k.isValid()) {
            final EventLoop eventLoop;
            try {
                eventLoop = ch.eventLoop();
            } catch (Throwable ignored) {
                // If the channel implementation throws an exception because there is no event loop, we ignore this
                // because we are only trying to determine if ch is registered to this event loop and thus has authority
                // to close ch.
                return;
            }
            // Only close ch if ch is still registered to this EventLoop. ch could have deregistered from the event loop
            // and thus the SelectionKey could be cancelled as part of the deregistration process, but the channel is
            // still healthy and should not be closed.
            // See https://github.com/netty/netty/issues/5125
            if (eventLoop == this) {
                // close the channel if the key is not valid anymore
                unsafe.close(unsafe.voidPromise());
            }
            return;
        }
        try {
            int readyOps = k.readyOps();
            // We first need to call finishConnect() before try to trigger a read(...) or write(...) as otherwise
            // the NIO JDK channel implementation may throw a NotYetConnectedException.
            if ((readyOps & SelectionKey.OP_CONNECT) != 0) {
                // remove OP_CONNECT as otherwise Selector.select(..) will always return without blocking
                // See https://github.com/netty/netty/issues/924
                int ops = k.interestOps();
                ops &= ~SelectionKey.OP_CONNECT;
                k.interestOps(ops);
                unsafe.finishConnect();
            }
            // 首先处理 OP_WRITE,因为我们可写一些排队的缓冲区,从而释放内存
            if ((readyOps & SelectionKey.OP_WRITE) != 0) {
                // 调用 forceFlush,一旦无剩余可写内容,它也将清除 OP_WRITE
                // 可见,注册 OP_WRITE 事件,要执行的就是 flush 操作.
                ch.unsafe().forceFlush();
            }
            // 还要校验 readOps 为0以解决可能的JDK bug,否则可能导致 spin loop
            // 处理读请求(断开连接)或接入连接
            if ((readyOps & (SelectionKey.OP_READ | SelectionKey.OP_ACCEPT)) != 0 || readyOps == 0) {
                unsafe.read();
            }
        } catch (CancelledKeyException ignored) {
            unsafe.close(unsafe.voidPromise());
        }
    }
private void processSelectedKeys() {
   if (selectedKeys != null) {
     // 不用 JDK 的 selector.selectedKeys,性能更好(%1-2%),GC更少
     processSelectedKeysOptimized();
   } else {
     processSelectedKeysPlain(selector.selectedKeys());
   }
 }

不同事件调用unsafe的不同方法,Netty对底层socket的操作都通过

unsafe
  1. NioMessageUnsafeNioServerSocketChannel使用NioMessageUnsafe做socket操作
  2. NioByteUnsafeNioSocketChannel使用NioByteUnsafe做socket操作

处理每个连接:

private void processSelectedKeysOptimized() {
    for (int i = 0; i < selectedKeys.size; ++i) {
        final SelectionKey k = selectedKeys.keys[i];
        // 数组中的空节点允许通道关闭后对其GC
        // 在SelectedSelectionKeySet中使用单数组
        // 动机:SelectedSelectionKeySet当前在内部使用2个数组,并且期望用户调用flip访问基础数组并切换活动数组。
        // 但是,我们不能同时使用2个数组,如果在重置数组元素时格外小心,就可以摆脱使用单数组。
        // 修改: 介绍包装了Selector的SelectedSelectionKeySetSelector并确保我们在选择之前重置基础的SelectedSelectionKeySet数据结构-
        // NioEventLoop#processSelectedKeysOptimized中的循环边界可以更精确地定义,因为我们知道基础数组的实际大小
        selectedKeys.keys[i] = null;
        // attachment 就是 NioServerSocketChannel
        final Object a = k.attachment();
       // 打断点!!!
        if (a instanceof AbstractNioChannel) {
                    processSelectedKey(k, (AbstractNioChannel) a);
        } else {
            @SuppressWarnings("unchecked")
            NioTask<SelectableChannel> task = (NioTask<SelectableChannel>) a;
            processSelectedKey(k, task);
        }
        if (needsToSelectAgain) {
            // null out entries in the array to allow to have it GC'ed once the Channel close
            // See https://github.com/netty/netty/issues/2363
            selectedKeys.reset(i + 1);
            selectAgain();
            i = -1;
        }
    }
}
② EventExecutorGroup

负责:

  • 经由其使用next()方法,提供EventExecutor
  • 处理自己的生命周期,并允许在全局模式中关闭它们
③ EventExecutor

特殊的EventExecutorGroup,附带一些快捷方法,看是否有Thread在事件循环执行。

④ EventLoopGroup
public interface EventLoopGroup extends EventExecutorGroup

特殊的 EventExecutorGroup,允许注册 Channel,即事件循环期间可执行 channel 操作,得到处理,供以后选用。

/**
 * EventExecutorGroup实现的抽象基类,它同时处理多个线程的任务
 */
public abstract class MultithreadEventExecutorGroup extends AbstractEventExecutorGroup {
  // 底层数组存储
    private final EventExecutor[] children;

3.2 Channel

以服务端的NioMessageUnsafe为例来看下read()方法的实现,对应是否有新连接进来的情况

直接把底层的 channel 封装成 NioSocketChannel

3.3 ByteBuf

3.4 Pipeline

小样例中对应内容,实际非常复杂

netty 将其抽象成逻辑链,netty怎么把每个 pipeline 加入到客户端连接的呢?

AbstractChannel

protected DefaultChannelPipeline newChannelPipeline() {
        return new DefaultChannelPipeline(this);
    }

DefaultChannelPipeline

protected DefaultChannelPipeline(Channel channel) {
        this.channel = ObjectUtil.checkNotNull(channel, "channel");
        succeededFuture = new SucceededChannelFuture(channel, null);
        voidPromise =  new VoidChannelPromise(channel, true);
        tail = new TailContext(this);
        head = new HeadContext(this);
        head.next = tail;
        tail.prev = head;
    }

3.5 ChannelHandler

// 主要入参的ChannelHandler
@Override
public final ChannelPipeline addAfter(
        EventExecutorGroup group, String baseName, String name, ChannelHandler handler) {
    final AbstractChannelHandlerContext newCtx;
    final AbstractChannelHandlerContext ctx;
    synchronized (this) {
        checkMultiplicity(handler);
        name = filterName(name, handler);
        ctx = getContextOrDie(baseName);
        newCtx = newContext(group, name, handler);
        addAfter0(ctx, newCtx);
        // If the registered is false it means that the channel was not registered on an eventLoop yet.
        // In this case we remove the context from the pipeline and add a task that will call
        // ChannelHandler.handlerRemoved(...) once the channel is registered.
        if (!registered) {
            newCtx.setAddPending();
            callHandlerCallbackLater(newCtx, true);
            return this;
        }
        EventExecutor executor = newCtx.executor();
        if (!executor.inEventLoop()) {
            callHandlerAddedInEventLoop(newCtx, executor);
            return this;
        }
    }
    callHandlerAdded0(newCtx);
    return this;
}
目录
相关文章
|
7月前
|
存储 缓存 NoSQL
Redis常见面试题全解析
Redis面试高频考点全解析:从过期删除、内存淘汰策略,到缓存雪崩、击穿、穿透及BigKey问题,深入原理与实战解决方案,助你轻松应对技术挑战,提升系统性能与稳定性。(238字)
|
8月前
|
机器学习/深度学习 人工智能 搜索推荐
从零构建短视频推荐系统:双塔算法架构解析与代码实现
短视频推荐看似“读心”,实则依赖双塔推荐系统:用户塔与物品塔分别将行为与内容编码为向量,通过相似度匹配实现精准推送。本文解析其架构原理、技术实现与工程挑战,揭秘抖音等平台如何用AI抓住你的注意力。
2121 7
从零构建短视频推荐系统:双塔算法架构解析与代码实现
|
7月前
|
机器学习/深度学习 人工智能 自然语言处理
34_GPT系列:从1到5的架构升级_深度解析
大型语言模型(LLM)的发展历程中,OpenAI的GPT系列无疑扮演着至关重要的角色。自2018年GPT-1问世以来,每一代GPT模型都在架构设计、预训练策略和性能表现上实现了质的飞跃。本专题将深入剖析GPT系列从1.17亿参数到能够处理百万级token上下文的技术演进,特别关注2025年8月8日发布的GPT-5如何引领大模型技术迈向通用人工智能(AGI)的重要一步。
828 2
|
7月前
|
监控 Java 关系型数据库
面试性能测试总被刷?学员真实遇到的高频问题全解析!
面试常被性能测试题难住?其实考的不是工具,而是分析思维。从脚本编写到瓶颈定位,企业更看重系统理解与实战能力。本文拆解高频面试题,揭示背后考察逻辑,并通过真实项目训练,帮你构建性能测试完整知识体系,实现从“会操作”到“能解决问题”的跨越。
|
7月前
|
存储 监控 安全
132_API部署:FastAPI与现代安全架构深度解析与LLM服务化最佳实践
在大语言模型(LLM)部署的最后一公里,API接口的设计与安全性直接决定了模型服务的可用性、稳定性与用户信任度。随着2025年LLM应用的爆炸式增长,如何构建高性能、高安全性的REST API成为开发者面临的核心挑战。FastAPI作为Python生态中最受青睐的Web框架之一,凭借其卓越的性能、强大的类型安全支持和完善的文档生成能力,已成为LLM服务化部署的首选方案。
1263 3
|
8月前
|
存储 监控 NoSQL
Redis高可用架构全解析:从主从复制到集群方案
Redis高可用确保服务持续稳定,避免单点故障导致数据丢失或业务中断。通过主从复制实现数据冗余,哨兵模式支持自动故障转移,Cluster集群则提供分布式数据分片与水平扩展,三者层层递进,保障读写分离、容灾切换与大规模数据存储,构建高性能、高可靠的Redis架构体系。
|
7月前
|
Cloud Native Serverless API
微服务架构实战指南:从单体应用到云原生的蜕变之路
🌟蒋星熠Jaxonic,代码为舟的星际旅人。深耕微服务架构,擅以DDD拆分服务、构建高可用通信与治理体系。分享从单体到云原生的实战经验,探索技术演进的无限可能。
微服务架构实战指南:从单体应用到云原生的蜕变之路

热门文章

最新文章

推荐镜像

更多
  • DNS