【Netty 从成神到升仙系列 二】你真的懂 NIOEventLoop 嘛?

简介: 【Netty 从成神到升仙系列 二】你真的懂 NIOEventLoop 嘛?

Netty 源码

二、NIOEventLoop

从全局来看,我们的 NioEventLoop 主要包括一下几方面:

  • Selector
public final class NioEventLoop extends SingleThreadEventLoop {
    private Selector selector;
    private Selector unwrappedSelector;
}

线程

/**
 * NioEventLoop - SingleThreadEventLoop - SingleThreadEventExecutor
 */
public abstract class SingleThreadEventExecutor extends AbstractScheduledEventExecutor implements OrderedEventExecutor {
    // 一个 NioEventLoop 只包括一个线程
    private volatile Thread thread;
    // 使用的是一个线程,不过这里可以提交一些任务
    private final Executor executor;
}

任务队列

public abstract class SingleThreadEventExecutor extends AbstractScheduledEventExecutor implements OrderedEventExecutor {
    // 缓存的任务队列
    // 当任务执行过多时,会将多余的任务缓存至该队列中
    private final Queue<Runnable> taskQueue;
    // 在其父类中,还存在着定时队列,执行定时任务
    PriorityQueue<ScheduledFutureTask<?>> scheduledTaskQueue;
}

1. Selector 何时创建?

先给结论:当我们调用 NIOEventLoop 的构造方法时,进行创建

源码:

NioEventLoop(NioEventLoopGroup parent, Executor executor, SelectorProvider selectorProvider,
                 SelectStrategy strategy, RejectedExecutionHandler rejectedExecutionHandler,
                 EventLoopTaskQueueFactory queueFactory) {
        // 执行 openSelector 方法
        final SelectorTuple selectorTuple = openSelector();
        this.selector = selectorTuple.selector;
        this.unwrappedSelector = selectorTuple.unwrappedSelector;
}
 private SelectorTuple openSelector() {
     final Selector unwrappedSelector;
     // 通过 provider.openSelector 进行创建
     unwrappedSelector = provider.openSelector();
 }

从上述我们可以得知,NioEventLoop 中的 Selector 是通过 provider.openSelector() 进行的创建

所以,我们只要验证 provider.openSelector() == Selector.open()


从上述代码逻辑可看出,我们的 provider.openSelector() == Selector.open() 结论成立,故 Selector 的创建时机成立。

1.1 EventLoop 里面为什么有两个 selector 成员变量?

我们上面讲到了,真正调用 provider.openSelector() 的是 unwrappedSelector 该成员变量

正常来说,我们只有一个 Selector 就足够了,为什么还需要另外的呢?

这里主要和 SelectionKey 优化有关,我们 Java 自己封装的 SelectionKey 是一个 Set 的类型,如下:public abstract Set<SelectionKey> selectedKeys();

这样当我们去遍历的时候,其实效率是比较低的,而 Netty 在此基础进行改良,将 Set 类型修改为 数组 类型,查询效率大大提升

整体源码如下所示:

private SelectorTuple openSelector() {
        //  unwrappedSelector:原始的 selector 创建
        final Selector unwrappedSelector;
        unwrappedSelector = provider.openSelector();
        // Netty 利用数组魔改的 selectedKey 集合
        final SelectedSelectionKeySet selectedKeySet = new SelectedSelectionKeySet();
        Object maybeException = AccessController.doPrivileged(new PrivilegedAction<Object>() {
            @Override
            public Object run() {
                try {
                    // 填充数据
                    selectedKeysField.set(unwrappedSelector, selectedKeySet);
                    publicSelectedKeysField.set(unwrappedSelector, selectedKeySet);
                    return null;
                }
            }
        };
        selectedKeys = selectedKeySet;
        // 1.创建 SelectorTuple 实例
        // 1.1 unwrappedSelector 正常的 selector 实例
        // 1.2 new SelectedSelectionKeySetSelector(unwrappedSelector, selectedKeySet) 魔改的 selector 实例
        return new SelectorTuple(unwrappedSelector,new SelectedSelectionKeySetSelector(unwrappedSelector, selectedKeySet));
}
SelectorTuple(Selector unwrappedSelector, Selector selector) {
    this.unwrappedSelector = unwrappedSelector;
    this.selector = selector;
}
                                                             NioEventLoop(NioEventLoopGroup parent, Executor executor, SelectorProvider selectorProvider,
                 SelectStrategy strategy, RejectedExecutionHandler rejectedExecutionHandler,
                 EventLoopTaskQueueFactory queueFactory) {
        final SelectorTuple selectorTuple = openSelector();
        // 魔改的
        this.selector = selectorTuple.selector;
        // 正常的
        this.unwrappedSelector = selectorTuple.unwrappedSelector;
}

这样,我们基本就了解了两个 Seletor 是为了让我们系统的性能更加的迅速,另外我们看看 SelectedSelectionKeySet 的一些信息

final class SelectedSelectionKeySet extends AbstractSet<SelectionKey> {
    SelectionKey[] keys;
    int size;
    SelectedSelectionKeySet() {
        keys = new SelectionKey[1024];
    }
}

2. EventLoop 何时启动?

结论:当我们第一次调用 execute 的方法时,就会启动该线程。通过状态位(state)控制线程只会启动一次。

我们先写个测试用例,一步一步 Debug 来看一下:

public class TestEventLoop {
    public static void main(String[] args) {
        EventLoop eventLoop = new NioEventLoopGroup().next();
        eventLoop.execute(new Runnable() {
            @Override
            public void run() {
                System.out.println("hello");
            }
        });
    }
}

首先,是我们的 execute 实现:

@Override
public void execute(Runnable task) {
    // 判断当前任务是否为空
    ObjectUtil.checkNotNull(task, "task");
    // 执行任务
    // 此处判断了任务是否为懒加载任务(详细可见懒加载的描述),wakesUpForTask的返回值只会为true
    execute(task, !(task instanceof LazyRunnable) && wakesUpForTask(task));
}

进入 execute 方法:

private void execute(Runnable task, boolean immediate) {
         // 判断当前线程是否为 NIO 线程
        // 判断方法为 return thread == this.thread;
         // this.thread 即为NIO线程,首次执行任务时,其为null
        boolean inEventLoop = inEventLoop();
       // 向我们上面讲的往任务队列(taskQueue)中添加任务 
        addTask(task);
        if (!inEventLoop) {
            // 启动线程
            startThread();
            if (isShutdown()) {
                boolean reject = false;
                try {
                    if (removeTask(task)) {
                        reject = true;
                    }
                }
                if (reject) {
                    reject();
                }
            }
        }
        if (!addTaskWakesUp && immediate) {
            wakeup(inEventLoop);
        }
    }

进入 startThread 方法:

private void startThread() {
    // 判断当前的状态:线程是否被创建
    // private static final int ST_NOT_STARTED = 1(默认是1)
    if (state == ST_NOT_STARTED) {
        // 利用 CAS 来完成数值的变更,用来表示当前线程已被创建
        if (STATE_UPDATER.compareAndSet(this, ST_NOT_STARTED, ST_STARTED)) {
            boolean success = false;
            try {
                doStartThread();
                success = true;
            } finally {
                if (!success) {
                    STATE_UPDATER.compareAndSet(this, ST_STARTED, ST_NOT_STARTED);
                }
            }
        }
    }
}

进入 doStartThread 方法:

 private void doStartThread() {
        assert thread == null;
        executor.execute(new Runnable() {
            @Override
            public void run() {
                // 当前线程赋值为NIO线程
                thread = Thread.currentThread();
                if (interrupted) {
                    thread.interrupt();
                }
                boolean success = false;
                updateLastExecutionTime();
                try {
                    SingleThreadEventExecutor.this.run();
                    success = true;
                }   
}

进入 ingleThreadEventExecutor.this.run()

@Override
protected void run() {
    int selectCnt = 0;
    // 死循环,不断地从任务队列中获取各种任务来执行
    for (;;) {
        // 执行各种任务
    }
}

3.提交普通任务会不会结束 select 阻塞?

结论:提交任务可以结束 select 阻塞,我们会使用 wakeup 进行唤醒操作。

通过上面我们继续往下追,可以看到这么一行代码:

if (!hasTasks()) {
    strategy = select(curDeadlineNanos);
}
private int select(long deadlineNanos) throws IOException {
    // 如果当前没有指定阻塞事件,则调用 select() 方法即可
    if (deadlineNanos == NONE) {
        return selector.select();
    }
    // 否则调用一个带有时间参数的 select() 方法,代表经过多长时间会停止阻塞
    long timeoutMillis = deadlineToDelayNanos(deadlineNanos + 995000L) / 1000000L;
    return timeoutMillis <= 0 ? selector.selectNow() : selector.select(timeoutMillis);
}

但这里 selector.select() 为什么要阻塞?

  • 如果让他不断的空轮询会大量的浪费CPU的资源,我们将其阻塞住。

既然阻塞,那必然有唤醒,当我们 有任务需要被执行时,会执行唤醒的操作

// 有任务需要被执行时,唤醒阻塞的NIO线程
if (!addTaskWakesUp && immediate) {
    wakeup(inEventLoop);
}
protected void wakeup(boolean inEventLoop) {
    // 只有当其他线程给当前的 NIO 线程提交任务时(比如执行execute),才会被唤醒
    // 通过 AtomicLong 进行更新,保证只有一个线程,底层使用的是 unsafe 的 CAS 操作
     if (!inEventLoop && nextWakeupNanos.getAndSet(AWAKE) != AWAKE) {
         // 唤醒被selector.select方法阻塞的NIO线程
         selector.wakeup();
     }
}

当然,唤醒也需要做一些判断:

  • 判断当前提交任务的是不是NIO线程
  • 若是其他线程,则唤醒 NIO 线程
  • 若是 NIO 线程自己,则不能唤醒
  • 通过AtomicLong保证有多个线程同时提交任务时,只有一个线程能够唤醒 NIO 线程(防止多个线程同时唤醒 select,提升性能)

4. 什么时候会进入 SelectStrategy.SELECT 分支?

结论:当我们的任务队列中没有任务时,就会进入该分支,执行上述的阻塞逻辑。

// 主要根据当前有无任务来处理
strategy = selectStrategy.calculateStrategy(selectNowSupplier, hasTasks());
switch (strategy) {
    case SelectStrategy.CONTINUE:
        continue;
    case SelectStrategy.BUSY_WAIT:
        // fall-through to SELECT since the busy-wait is not supported with NIO
    case SelectStrategy.SELECT:
        long curDeadlineNanos = nextScheduledTaskDeadlineNanos();
        nextWakeupNanos.set(curDeadlineNanos);
        if (!hasTasks()) {
            strategy = select(curDeadlineNanos);
        }
}
// 当前的任务队列是否有任务
protected boolean hasTasks() {
    return super.hasTasks() || !tailTasks.isEmpty();
}

从上述我们可以看到,是否走阻塞的逻辑取决于 selectStrategy.calculateStrategy(selectNowSupplier, hasTasks()) 的返回结果

我们来看看这个方法的作用是什么

// 有任务时,调用 selectSupplier.get
// 无任务时,调用 SelectStrategy.SELECT,也就是我们的阻塞的 select
public int calculateStrategy(IntSupplier selectSupplier, boolean hasTasks) throws Exception {
        return hasTasks ? selectSupplier.get() : SelectStrategy.SELECT;
}

selectSupplier.get 到底是什么呢?

private final IntSupplier selectNowSupplier = new IntSupplier() {
    @Override
    public int get() throws Exception {
        // 直接调用 selector.selectNow
        return selector.selectNow();
    }
};

这里可能有的读者不太清楚,罗列一下:


// 不阻塞,立即返回


selector.selectNow()


// 一直阻塞等待


selector.select()


// 监控注册通道,参数用来设置超时时间,在时间内会阻塞


selector.select(int longTime)

如果当前任务队列存在任务,调用 selectNow 方法,拿到对应的 IO 事件,进行响应的执行。

如果当前任务队列没有任务,则设置超时时间,进行阻塞。等待任务的到来。

4.1 何时会 select 阻塞,阻塞多久

结论:大概 1s 左右

long curDeadlineNanos = nextScheduledTaskDeadlineNanos();
// Timeout will only be 0 if deadline is within 5 microsecs
// 如果截止时间在5微秒之内,超时将仅为0
long timeoutMillis = deadlineToDelayNanos(deadlineNanos + 995000L) / 1000000L;
return timeoutMillis <= 0 ? selector.selectNow() : selector.select(timeoutMillis);

5. 空轮询 bug

Linux 场景下会出现 NIOepoll 空轮询问题

我们观察下面的代码,可以发现,在我们的 NIOEventLoop 中,存在一个死循环

@Override
protected void run() {
    int selectCnt = 0;
    for(;;){
        // 可能发生空轮询,无法阻塞NIO线程
        strategy = select(curDeadlineNanos);    
      if(...) {
      ...
      } else if (unexpectedSelectorWakeup(selectCnt) ){
            // 通过unexpectedSelectorWakeup方法中的rebuildSelector重建selector
            // 并将selectCnt重置为0
            selectCnt = 0;
        }
  }
}

Netty中通过selectCnt变量来检测select方法是否发生空轮询BUG

若发生 空轮询BUG,那么 selectCnt 的值会增长是十分迅速。当selectCnt的值大于等于SELECTOR_AUTO_REBUILD_THRESHOLD

默认512)时,Netty 则判断其出现了 空轮询BUG,进行如下处理

if (SELECTOR_AUTO_REBUILD_THRESHOLD > 0 && selectCnt >= SELECTOR_AUTO_REBUILD_THRESHOLD) {
    // 重建selector,将原selector的配置信息传给新selector
    // 再用新selector覆盖旧selector
    rebuildSelector();
    return true;
}

通过rebuildSelector方法重建selector,将原selector的配置信息传给新selector,再用新selector覆盖旧selector。同时将selectCnt的值设置为0

6. ioRatio 控制什么,设置成 100 有何作用?

结论:控制处理 io 事件所占用的时间比例

我们来看看源码:

if (ioRatio == 100) {
    try {
        if (strategy > 0) {
            processSelectedKeys();
        }
    } finally {
        // Ensure we always run tasks.
        ranTasks = runAllTasks();
    }
} else if (strategy > 0) {
    final long ioStartTime = System.nanoTime();
    try {
        processSelectedKeys();
    } finally {
        // Ensure we always run tasks.
        final long ioTime = System.nanoTime() - ioStartTime;
        ranTasks = runAllTasks(ioTime * (100 - ioRatio) / ioRatio);
    }
} else {
    ranTasks = runAllTasks(0); // This will run the minimum number of tasks
}

如果我们将 ioRatio 设置成 100,那么它将会处理所有的 IO 事件,然后处理所有的任务队列的任务。

但这种不是我们想要的,我们想要 io任务 的消耗时间存在比例性

如果我们将 ioRatio 设置成 80

// 假设执行 IO 的时间为 8s,ioTime = 8s
// 8 * (100 - 80)/ 80 = 2s
// IO执行时间 :任务执行时间 = 8 :2 = 4 :1
final long ioTime = System.nanoTime() - ioStartTime;
ranTasks = runAllTasks(ioTime * (100 - ioRatio) / ioRatio);

8. 在哪里进行事件类型的处理

我们之前写 Java NIO 的时候,最终会获取客户端的 channel 对其进行各种事件的处理,同样,我们的 Netty 也实现了上述。

private void processSelectedKeys() {
    // 新的 selectedKeys 也就是使用数组改良的
    if (selectedKeys != null) {
        processSelectedKeysOptimized();
    } else {
        // 原始的
        processSelectedKeysPlain(selector.selectedKeys());
    }
}

我们去看看新增的如何进行事件类型的

private void processSelectedKeysOptimized() {
    // 遍历数组
    for (int i = 0; i < selectedKeys.size; ++i) {
        final SelectionKey k = selectedKeys.keys[i];
        selectedKeys.keys[i] = null;
      // 拿到我们前面所说的附件,也就是NioServerSocketChannel
        final Object a = k.attachment();
        if (a instanceof AbstractNioChannel) {
            // 执行对应的key
            processSelectedKey(k, (AbstractNioChannel) a);
        } else {
            NioTask<SelectableChannel> task = (NioTask<SelectableChannel>) a;
            processSelectedKey(k, task);
        }
        if (needsToSelectAgain) {
            // null out entries in the array to allow to have it GC'ed once the Channel close
            // See https://github.com/netty/netty/issues/2363
            selectedKeys.reset(i + 1);
            selectAgain();
            i = -1;
        }
    }
}

进入 processSelectedKey(k, (AbstractNioChannel) a) 执行对应的事件

private void processSelectedKey(SelectionKey k, AbstractNioChannel ch) {
        final AbstractNioChannel.NioUnsafe unsafe = ch.unsafe();
        if (!k.isValid()) {
            final EventLoop eventLoop;
            eventLoop = ch.eventLoop();
            if (eventLoop == this) 
                unsafe.close(unsafe.voidPromise());
            }
            return;
        }
        try {
            int readyOps = k.readyOps();
            // 连接事件
            if ((readyOps & SelectionKey.OP_CONNECT) != 0) {
                int ops = k.interestOps();
                ops &= ~SelectionKey.OP_CONNECT;
                k.interestOps(ops);
                unsafe.finishConnect();
            }
            // 写事件
            if ((readyOps & SelectionKey.OP_WRITE) != 0) {
                ch.unsafe().forceFlush();
            }
            // 读事件
            if ((readyOps & (SelectionKey.OP_READ | SelectionKey.OP_ACCEPT)) != 0 || readyOps == 0) {
                unsafe.read();
            }
        } catch (CancelledKeyException ignored) {
            unsafe.close(unsafe.voidPromise());
        }
    }

三、Accept

我们需要先理解原来 NIO 的流程:

  • selector.select() 阻塞直到事件发生
  • 遍历处理 selectedKeys
  • 拿到一个 key,判断该事件是否为  accept
  • 创建一个 ScoketChannel,设置非阻塞
  • ScoketChannel 注册至 selector
  • 关注 selectedKeysread 事件
// 阻塞直到事件发生
selector.select();
Iterator<SelectionKey> iter = selector.selectionKeys().iterator();
while (iter.hasNext()) {    
    // 拿到一个事件
    SelectionKey key = iter.next();
    // 如果是 accept 事件
    if (key.isAcceptable()) {
        // 执行accept,获得SocketChannel
        SocketChannel channel = serverSocketChannel.accept();
        channel.configureBlocking(false);
        // 将SocketChannel注册到selector中,并关注read事件
        channel.register(selector, SelectionKey.OP_READ);
    }
    // ...
}

我们之前已经讲过了,怎么拿到的 key,怎么将其进行判断,我们这一节主要关心后面的 read

1. SocketChannel的创建与注册

当发生 OP_ACCEPT 事件时,会触发一下操作:

if ((readyOps & (SelectionKey.OP_READ | SelectionKey.OP_ACCEPT)) != 0 || readyOps == 0) 
    unsafe.read();
}

当客户端连接的时候,会进入 unsafe.read() 该方法:

public void read() {
    // 创建一个 SocketChannel 并创建了一个 NIOSocketChannel,设置成非阻塞
    int localRead = doReadMessages(readBuf); 
     for (int i = 0; i < size; i ++) {
         readPending = false;
         // 将我们刚刚创建的 NIOSocketChannel 放到流水线上
         // 并由我们的 acceptorhandler 执行
         pipeline.fireChannelRead(readBuf.get(i));
     }
}

这里我们关注一下 acceptorhandler 里面执行了什么操作(类的名称:ServerBootstrapAcceptor):

public void channelRead(ChannelHandlerContext ctx, Object msg) {
    // 我们上面创建的 NIOSocketChannel
    final Channel child = (Channel) msg;
    child.pipeline().addLast(childHandler);
    setChannelOptions(child, childOptions, logger);
    setAttributes(child, childAttrs);
    try {
        childGroup.register(child).addListener(new ChannelFutureListener() {
            @Override
            public void operationComplete(ChannelFuture future) throws Exception {
                if (!future.isSuccess()) {
                    forceClose(child, future.cause());
                }
            }
        });
    }
}

所以,重要的是 childGroup.register(child) 的方法,我们追进去看看:

public final void register(EventLoop eventLoop, final ChannelPromise promise) {
    AbstractChannel.this.eventLoop = eventLoop;
   // 这是执行的线程是我们的 NIOServerSocketChannel
    // 需要新开一个线程,让我们的 NIOSocketChannel 执行
    if (eventLoop.inEventLoop()) {
        register0(promise);
    } else {
        try {
            eventLoop.execute(new Runnable() {
                @Override
                public void run() {
                    register0(promise);
                }
            });
        }
    }
}

进入我们的 register0,看看做了什么

private void register0(ChannelPromise promise) {
    try {
        boolean firstRegistration = neverRegistered;
        // selectionKey = javaChannel().register(eventLoop().unwrappedSelector(), 0, this);
        // 主要将该 NIOSocketChannel 注册到我们的 selector
        doRegister();
        neverRegistered = false;
        registered = true;
      // 触发新的 handler 初始化事件
        pipeline.invokeHandlerAddedIfNeeded();
        if (isActive()) {
            if (firstRegistration) {
                // 最终将 NIOSocketChannel 关注 read 事件
                pipeline.fireChannelActive();
            } else if (config().isAutoRead()) {
                beginRead();
            }
        }
    }
}

我们看一下 pipeline.fireChannelActive 做了什么

protected void doBeginRead() throws Exception {
    // Channel.read() or ChannelHandlerContext.read() was called
    final SelectionKey selectionKey = this.selectionKey;
    if (!selectionKey.isValid()) {
        return;
    }
    readPending = true;
  // 这时候 interestOps是0
    final int interestOps = selectionKey.interestOps();
    if ((interestOps & readInterestOp) == 0) {
        // 关注read事件
        selectionKey.interestOps(interestOps | readInterestOp);
    }
}






相关文章
|
消息中间件 缓存 网络协议
【Netty 从成神到升仙系列 大结局】全网一图流死磕解析 Netty 源码
【Netty 从成神到升仙系列 大结局】全网一图流死磕解析 Netty 源码
【Netty 从成神到升仙系列 大结局】全网一图流死磕解析 Netty 源码
|
消息中间件 缓存 安全
【Netty 从成神到升仙系列 大结局】全网一图流死磕解析 Netty 源码
【Netty 从成神到升仙系列 大结局】全网一图流死磕解析 Netty 源码
【Netty 从成神到升仙系列 大结局】全网一图流死磕解析 Netty 源码
|
设计模式 安全 Java
【Netty 从成神到升仙系列 五】Netty 的责任链真有这么神奇吗?
【Netty 从成神到升仙系列 五】Netty 的责任链真有这么神奇吗?
【Netty 从成神到升仙系列 五】Netty 的责任链真有这么神奇吗?
|
XML 缓存 JSON
【Netty 从成神到升仙系列 四】让我们一起探索 Netty 中的零拷贝
【Netty 从成神到升仙系列 四】让我们一起探索 Netty 中的零拷贝
【Netty 从成神到升仙系列 四】让我们一起探索 Netty 中的零拷贝
|
缓存 JSON 网络协议
【Netty 从成神到升仙系列 三】Netty 凭什么成为国内最流行的网络通信框架?
【Netty 从成神到升仙系列 三】Netty 凭什么成为国内最流行的网络通信框架?
【Netty 从成神到升仙系列 三】Netty 凭什么成为国内最流行的网络通信框架?
|
存储 前端开发 Java
【Netty 从成神到升仙系列 一】Netty 服务端的启动源码剖析(一)
【Netty 从成神到升仙系列 一】Netty 服务端的启动源码剖析(一)
【Netty 从成神到升仙系列 一】Netty 服务端的启动源码剖析(一)
|
8月前
|
监控 Java Linux
由浅入深Netty基础知识NIO网络编程1
由浅入深Netty基础知识NIO网络编程
43 0
|
8月前
|
缓存 安全 Java
由浅入深Netty基础知识NIO三大组件原理实战 2
由浅入深Netty基础知识NIO三大组件原理实战
48 0
|
8月前
|
Java
由浅入深Netty基础知识NIO三大组件原理实战 1
由浅入深Netty基础知识NIO三大组件原理实战
62 0