Netty 源码
二、NIOEventLoop
从全局来看,我们的 NioEventLoop
主要包括一下几方面:
- Selector
public final class NioEventLoop extends SingleThreadEventLoop { private Selector selector; private Selector unwrappedSelector; }
线程
/** * NioEventLoop - SingleThreadEventLoop - SingleThreadEventExecutor */ public abstract class SingleThreadEventExecutor extends AbstractScheduledEventExecutor implements OrderedEventExecutor { // 一个 NioEventLoop 只包括一个线程 private volatile Thread thread; // 使用的是一个线程,不过这里可以提交一些任务 private final Executor executor; }
任务队列
public abstract class SingleThreadEventExecutor extends AbstractScheduledEventExecutor implements OrderedEventExecutor { // 缓存的任务队列 // 当任务执行过多时,会将多余的任务缓存至该队列中 private final Queue<Runnable> taskQueue; // 在其父类中,还存在着定时队列,执行定时任务 PriorityQueue<ScheduledFutureTask<?>> scheduledTaskQueue; }
1. Selector 何时创建?
先给结论:当我们调用 NIOEventLoop 的构造方法时,进行创建
源码:
NioEventLoop(NioEventLoopGroup parent, Executor executor, SelectorProvider selectorProvider, SelectStrategy strategy, RejectedExecutionHandler rejectedExecutionHandler, EventLoopTaskQueueFactory queueFactory) { // 执行 openSelector 方法 final SelectorTuple selectorTuple = openSelector(); this.selector = selectorTuple.selector; this.unwrappedSelector = selectorTuple.unwrappedSelector; } private SelectorTuple openSelector() { final Selector unwrappedSelector; // 通过 provider.openSelector 进行创建 unwrappedSelector = provider.openSelector(); }
从上述我们可以得知,NioEventLoop
中的 Selector
是通过 provider.openSelector()
进行的创建
所以,我们只要验证 provider.openSelector()
== Selector.open()
从上述代码逻辑可看出,我们的 provider.openSelector()
== Selector.open()
结论成立,故 Selector
的创建时机成立。
1.1 EventLoop 里面为什么有两个 selector 成员变量?
我们上面讲到了,真正调用 provider.openSelector()
的是 unwrappedSelector
该成员变量
正常来说,我们只有一个 Selector
就足够了,为什么还需要另外的呢?
这里主要和 SelectionKey
优化有关,我们 Java
自己封装的 SelectionKey
是一个 Set
的类型,如下:public abstract Set<SelectionKey> selectedKeys();
这样当我们去遍历的时候,其实效率是比较低的,而 Netty
在此基础进行改良,将 Set
类型修改为 数组
类型,查询效率大大提升
整体源码如下所示:
private SelectorTuple openSelector() { // unwrappedSelector:原始的 selector 创建 final Selector unwrappedSelector; unwrappedSelector = provider.openSelector(); // Netty 利用数组魔改的 selectedKey 集合 final SelectedSelectionKeySet selectedKeySet = new SelectedSelectionKeySet(); Object maybeException = AccessController.doPrivileged(new PrivilegedAction<Object>() { @Override public Object run() { try { // 填充数据 selectedKeysField.set(unwrappedSelector, selectedKeySet); publicSelectedKeysField.set(unwrappedSelector, selectedKeySet); return null; } } }; selectedKeys = selectedKeySet; // 1.创建 SelectorTuple 实例 // 1.1 unwrappedSelector 正常的 selector 实例 // 1.2 new SelectedSelectionKeySetSelector(unwrappedSelector, selectedKeySet) 魔改的 selector 实例 return new SelectorTuple(unwrappedSelector,new SelectedSelectionKeySetSelector(unwrappedSelector, selectedKeySet)); } SelectorTuple(Selector unwrappedSelector, Selector selector) { this.unwrappedSelector = unwrappedSelector; this.selector = selector; } NioEventLoop(NioEventLoopGroup parent, Executor executor, SelectorProvider selectorProvider, SelectStrategy strategy, RejectedExecutionHandler rejectedExecutionHandler, EventLoopTaskQueueFactory queueFactory) { final SelectorTuple selectorTuple = openSelector(); // 魔改的 this.selector = selectorTuple.selector; // 正常的 this.unwrappedSelector = selectorTuple.unwrappedSelector; }
这样,我们基本就了解了两个 Seletor
是为了让我们系统的性能更加的迅速,另外我们看看 SelectedSelectionKeySet
的一些信息
final class SelectedSelectionKeySet extends AbstractSet<SelectionKey> { SelectionKey[] keys; int size; SelectedSelectionKeySet() { keys = new SelectionKey[1024]; } }
2. EventLoop 何时启动?
结论:当我们第一次调用 execute 的方法时,就会启动该线程。通过状态位(state)控制线程只会启动一次。
我们先写个测试用例,一步一步 Debug
来看一下:
public class TestEventLoop { public static void main(String[] args) { EventLoop eventLoop = new NioEventLoopGroup().next(); eventLoop.execute(new Runnable() { @Override public void run() { System.out.println("hello"); } }); } }
首先,是我们的 execute
实现:
@Override public void execute(Runnable task) { // 判断当前任务是否为空 ObjectUtil.checkNotNull(task, "task"); // 执行任务 // 此处判断了任务是否为懒加载任务(详细可见懒加载的描述),wakesUpForTask的返回值只会为true execute(task, !(task instanceof LazyRunnable) && wakesUpForTask(task)); }
进入 execute
方法:
private void execute(Runnable task, boolean immediate) { // 判断当前线程是否为 NIO 线程 // 判断方法为 return thread == this.thread; // this.thread 即为NIO线程,首次执行任务时,其为null boolean inEventLoop = inEventLoop(); // 向我们上面讲的往任务队列(taskQueue)中添加任务 addTask(task); if (!inEventLoop) { // 启动线程 startThread(); if (isShutdown()) { boolean reject = false; try { if (removeTask(task)) { reject = true; } } if (reject) { reject(); } } } if (!addTaskWakesUp && immediate) { wakeup(inEventLoop); } }
进入 startThread
方法:
private void startThread() { // 判断当前的状态:线程是否被创建 // private static final int ST_NOT_STARTED = 1(默认是1) if (state == ST_NOT_STARTED) { // 利用 CAS 来完成数值的变更,用来表示当前线程已被创建 if (STATE_UPDATER.compareAndSet(this, ST_NOT_STARTED, ST_STARTED)) { boolean success = false; try { doStartThread(); success = true; } finally { if (!success) { STATE_UPDATER.compareAndSet(this, ST_STARTED, ST_NOT_STARTED); } } } } }
进入 doStartThread
方法:
private void doStartThread() { assert thread == null; executor.execute(new Runnable() { @Override public void run() { // 当前线程赋值为NIO线程 thread = Thread.currentThread(); if (interrupted) { thread.interrupt(); } boolean success = false; updateLastExecutionTime(); try { SingleThreadEventExecutor.this.run(); success = true; } }
进入 ingleThreadEventExecutor.this.run()
@Override protected void run() { int selectCnt = 0; // 死循环,不断地从任务队列中获取各种任务来执行 for (;;) { // 执行各种任务 } }
3.提交普通任务会不会结束 select 阻塞?
结论:提交任务可以结束 select 阻塞,我们会使用 wakeup 进行唤醒操作。
通过上面我们继续往下追,可以看到这么一行代码:
if (!hasTasks()) { strategy = select(curDeadlineNanos); } private int select(long deadlineNanos) throws IOException { // 如果当前没有指定阻塞事件,则调用 select() 方法即可 if (deadlineNanos == NONE) { return selector.select(); } // 否则调用一个带有时间参数的 select() 方法,代表经过多长时间会停止阻塞 long timeoutMillis = deadlineToDelayNanos(deadlineNanos + 995000L) / 1000000L; return timeoutMillis <= 0 ? selector.selectNow() : selector.select(timeoutMillis); }
但这里 selector.select()
为什么要阻塞?
- 如果让他不断的空轮询会大量的浪费CPU的资源,我们将其阻塞住。
既然阻塞,那必然有唤醒,当我们 有任务需要被执行时,会执行唤醒的操作
// 有任务需要被执行时,唤醒阻塞的NIO线程 if (!addTaskWakesUp && immediate) { wakeup(inEventLoop); } protected void wakeup(boolean inEventLoop) { // 只有当其他线程给当前的 NIO 线程提交任务时(比如执行execute),才会被唤醒 // 通过 AtomicLong 进行更新,保证只有一个线程,底层使用的是 unsafe 的 CAS 操作 if (!inEventLoop && nextWakeupNanos.getAndSet(AWAKE) != AWAKE) { // 唤醒被selector.select方法阻塞的NIO线程 selector.wakeup(); } }
当然,唤醒也需要做一些判断:
- 判断当前提交任务的是不是
NIO
线程
- 若是其他线程,则唤醒
NIO
线程 - 若是
NIO
线程自己,则不能唤醒
- 通过AtomicLong保证有多个线程同时提交任务时,只有一个线程能够唤醒
NIO
线程(防止多个线程同时唤醒 select,提升性能)
4. 什么时候会进入 SelectStrategy.SELECT 分支?
结论:当我们的任务队列中没有任务时,就会进入该分支,执行上述的阻塞逻辑。
// 主要根据当前有无任务来处理 strategy = selectStrategy.calculateStrategy(selectNowSupplier, hasTasks()); switch (strategy) { case SelectStrategy.CONTINUE: continue; case SelectStrategy.BUSY_WAIT: // fall-through to SELECT since the busy-wait is not supported with NIO case SelectStrategy.SELECT: long curDeadlineNanos = nextScheduledTaskDeadlineNanos(); nextWakeupNanos.set(curDeadlineNanos); if (!hasTasks()) { strategy = select(curDeadlineNanos); } } // 当前的任务队列是否有任务 protected boolean hasTasks() { return super.hasTasks() || !tailTasks.isEmpty(); }
从上述我们可以看到,是否走阻塞的逻辑取决于 selectStrategy.calculateStrategy(selectNowSupplier, hasTasks())
的返回结果
我们来看看这个方法的作用是什么
// 有任务时,调用 selectSupplier.get // 无任务时,调用 SelectStrategy.SELECT,也就是我们的阻塞的 select public int calculateStrategy(IntSupplier selectSupplier, boolean hasTasks) throws Exception { return hasTasks ? selectSupplier.get() : SelectStrategy.SELECT; }
那 selectSupplier.get
到底是什么呢?
private final IntSupplier selectNowSupplier = new IntSupplier() { @Override public int get() throws Exception { // 直接调用 selector.selectNow return selector.selectNow(); } };
这里可能有的读者不太清楚,罗列一下:
// 不阻塞,立即返回
selector.selectNow()
// 一直阻塞等待
selector.select()
// 监控注册通道,参数用来设置超时时间,在时间内会阻塞
selector.select(int longTime)
如果当前任务队列存在任务,调用 selectNow
方法,拿到对应的 IO
事件,进行响应的执行。
如果当前任务队列没有任务,则设置超时时间,进行阻塞。等待任务的到来。
4.1 何时会 select 阻塞,阻塞多久
结论:大概 1s 左右
long curDeadlineNanos = nextScheduledTaskDeadlineNanos(); // Timeout will only be 0 if deadline is within 5 microsecs // 如果截止时间在5微秒之内,超时将仅为0 long timeoutMillis = deadlineToDelayNanos(deadlineNanos + 995000L) / 1000000L; return timeoutMillis <= 0 ? selector.selectNow() : selector.select(timeoutMillis);
5. 空轮询 bug
在 Linux
场景下会出现 NIO
的 epoll
空轮询问题
我们观察下面的代码,可以发现,在我们的 NIOEventLoop
中,存在一个死循环
@Override protected void run() { int selectCnt = 0; for(;;){ // 可能发生空轮询,无法阻塞NIO线程 strategy = select(curDeadlineNanos); if(...) { ... } else if (unexpectedSelectorWakeup(selectCnt) ){ // 通过unexpectedSelectorWakeup方法中的rebuildSelector重建selector // 并将selectCnt重置为0 selectCnt = 0; } } }
Netty中通过selectCnt
变量来检测select
方法是否发生空轮询BUG
若发生 空轮询BUG,那么 selectCnt
的值会增长是十分迅速。当selectCnt
的值大于等于SELECTOR_AUTO_REBUILD_THRESHOLD
默认512)时,Netty
则判断其出现了 空轮询BUG,进行如下处理
if (SELECTOR_AUTO_REBUILD_THRESHOLD > 0 && selectCnt >= SELECTOR_AUTO_REBUILD_THRESHOLD) { // 重建selector,将原selector的配置信息传给新selector // 再用新selector覆盖旧selector rebuildSelector(); return true; }
通过rebuildSelector
方法重建selector,将原selector的配置信息传给新selector,再用新selector覆盖旧selector。同时将selectCnt的值设置为0
6. ioRatio 控制什么,设置成 100 有何作用?
结论:控制处理 io 事件所占用的时间比例
我们来看看源码:
if (ioRatio == 100) { try { if (strategy > 0) { processSelectedKeys(); } } finally { // Ensure we always run tasks. ranTasks = runAllTasks(); } } else if (strategy > 0) { final long ioStartTime = System.nanoTime(); try { processSelectedKeys(); } finally { // Ensure we always run tasks. final long ioTime = System.nanoTime() - ioStartTime; ranTasks = runAllTasks(ioTime * (100 - ioRatio) / ioRatio); } } else { ranTasks = runAllTasks(0); // This will run the minimum number of tasks }
如果我们将 ioRatio
设置成 100
,那么它将会处理所有的 IO
事件,然后处理所有的任务队列的任务。
但这种不是我们想要的,我们想要 io
和 任务
的消耗时间存在比例性
如果我们将 ioRatio
设置成 80
// 假设执行 IO 的时间为 8s,ioTime = 8s // 8 * (100 - 80)/ 80 = 2s // IO执行时间 :任务执行时间 = 8 :2 = 4 :1 final long ioTime = System.nanoTime() - ioStartTime; ranTasks = runAllTasks(ioTime * (100 - ioRatio) / ioRatio);
8. 在哪里进行事件类型的处理
我们之前写 Java NIO
的时候,最终会获取客户端的 channel
对其进行各种事件的处理,同样,我们的 Netty
也实现了上述。
private void processSelectedKeys() { // 新的 selectedKeys 也就是使用数组改良的 if (selectedKeys != null) { processSelectedKeysOptimized(); } else { // 原始的 processSelectedKeysPlain(selector.selectedKeys()); } }
我们去看看新增的如何进行事件类型的
private void processSelectedKeysOptimized() { // 遍历数组 for (int i = 0; i < selectedKeys.size; ++i) { final SelectionKey k = selectedKeys.keys[i]; selectedKeys.keys[i] = null; // 拿到我们前面所说的附件,也就是NioServerSocketChannel final Object a = k.attachment(); if (a instanceof AbstractNioChannel) { // 执行对应的key processSelectedKey(k, (AbstractNioChannel) a); } else { NioTask<SelectableChannel> task = (NioTask<SelectableChannel>) a; processSelectedKey(k, task); } if (needsToSelectAgain) { // null out entries in the array to allow to have it GC'ed once the Channel close // See https://github.com/netty/netty/issues/2363 selectedKeys.reset(i + 1); selectAgain(); i = -1; } } }
进入 processSelectedKey(k, (AbstractNioChannel) a)
执行对应的事件
private void processSelectedKey(SelectionKey k, AbstractNioChannel ch) { final AbstractNioChannel.NioUnsafe unsafe = ch.unsafe(); if (!k.isValid()) { final EventLoop eventLoop; eventLoop = ch.eventLoop(); if (eventLoop == this) unsafe.close(unsafe.voidPromise()); } return; } try { int readyOps = k.readyOps(); // 连接事件 if ((readyOps & SelectionKey.OP_CONNECT) != 0) { int ops = k.interestOps(); ops &= ~SelectionKey.OP_CONNECT; k.interestOps(ops); unsafe.finishConnect(); } // 写事件 if ((readyOps & SelectionKey.OP_WRITE) != 0) { ch.unsafe().forceFlush(); } // 读事件 if ((readyOps & (SelectionKey.OP_READ | SelectionKey.OP_ACCEPT)) != 0 || readyOps == 0) { unsafe.read(); } } catch (CancelledKeyException ignored) { unsafe.close(unsafe.voidPromise()); } }
三、Accept
我们需要先理解原来 NIO
的流程:
selector.select()
阻塞直到事件发生- 遍历处理
selectedKeys
- 拿到一个 key,判断该事件是否为
accept
- 创建一个
ScoketChannel
,设置非阻塞 - 将
ScoketChannel
注册至selector
- 关注
selectedKeys
的read
事件
// 阻塞直到事件发生 selector.select(); Iterator<SelectionKey> iter = selector.selectionKeys().iterator(); while (iter.hasNext()) { // 拿到一个事件 SelectionKey key = iter.next(); // 如果是 accept 事件 if (key.isAcceptable()) { // 执行accept,获得SocketChannel SocketChannel channel = serverSocketChannel.accept(); channel.configureBlocking(false); // 将SocketChannel注册到selector中,并关注read事件 channel.register(selector, SelectionKey.OP_READ); } // ... }
我们之前已经讲过了,怎么拿到的 key,怎么将其进行判断,我们这一节主要关心后面的 read
1. SocketChannel的创建与注册
当发生 OP_ACCEPT
事件时,会触发一下操作:
if ((readyOps & (SelectionKey.OP_READ | SelectionKey.OP_ACCEPT)) != 0 || readyOps == 0) unsafe.read(); }
当客户端连接的时候,会进入 unsafe.read()
该方法:
public void read() { // 创建一个 SocketChannel 并创建了一个 NIOSocketChannel,设置成非阻塞 int localRead = doReadMessages(readBuf); for (int i = 0; i < size; i ++) { readPending = false; // 将我们刚刚创建的 NIOSocketChannel 放到流水线上 // 并由我们的 acceptorhandler 执行 pipeline.fireChannelRead(readBuf.get(i)); } }
这里我们关注一下 acceptorhandler
里面执行了什么操作(类的名称:ServerBootstrapAcceptor
):
public void channelRead(ChannelHandlerContext ctx, Object msg) { // 我们上面创建的 NIOSocketChannel final Channel child = (Channel) msg; child.pipeline().addLast(childHandler); setChannelOptions(child, childOptions, logger); setAttributes(child, childAttrs); try { childGroup.register(child).addListener(new ChannelFutureListener() { @Override public void operationComplete(ChannelFuture future) throws Exception { if (!future.isSuccess()) { forceClose(child, future.cause()); } } }); } }
所以,重要的是 childGroup.register(child)
的方法,我们追进去看看:
public final void register(EventLoop eventLoop, final ChannelPromise promise) { AbstractChannel.this.eventLoop = eventLoop; // 这是执行的线程是我们的 NIOServerSocketChannel // 需要新开一个线程,让我们的 NIOSocketChannel 执行 if (eventLoop.inEventLoop()) { register0(promise); } else { try { eventLoop.execute(new Runnable() { @Override public void run() { register0(promise); } }); } } }
进入我们的 register0
,看看做了什么
private void register0(ChannelPromise promise) { try { boolean firstRegistration = neverRegistered; // selectionKey = javaChannel().register(eventLoop().unwrappedSelector(), 0, this); // 主要将该 NIOSocketChannel 注册到我们的 selector doRegister(); neverRegistered = false; registered = true; // 触发新的 handler 初始化事件 pipeline.invokeHandlerAddedIfNeeded(); if (isActive()) { if (firstRegistration) { // 最终将 NIOSocketChannel 关注 read 事件 pipeline.fireChannelActive(); } else if (config().isAutoRead()) { beginRead(); } } } }
我们看一下 pipeline.fireChannelActive
做了什么
protected void doBeginRead() throws Exception { // Channel.read() or ChannelHandlerContext.read() was called final SelectionKey selectionKey = this.selectionKey; if (!selectionKey.isValid()) { return; } readPending = true; // 这时候 interestOps是0 final int interestOps = selectionKey.interestOps(); if ((interestOps & readInterestOp) == 0) { // 关注read事件 selectionKey.interestOps(interestOps | readInterestOp); } }