1. NioEventLoopGroup
MultithreadEventExecutorGroup maintains an internal children array of type EventExecutor whose size is nThreads; this array is what forms the thread pool:
public abstract class MultithreadEventExecutorGroup extends AbstractEventExecutorGroup {
private final EventExecutor[] children;
}
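When instantiating NioEventLoopGroup, you can specify the pool size nThreads; otherwise nThreads defaults to twice the number of CPU cores. This nThreads is the size of the children array inside MultithreadEventExecutorGroup: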
public NioEventLoopGroup(int nThreads) {
this(nThreads, (Executor) null);
}
The nThreads passed to the NioEventLoopGroup constructor above ultimately becomes the capacity of the children array in MultithreadEventExecutorGroup:
protected MultithreadEventExecutorGroup(int nThreads, Executor executor, EventExecutorChooserFactory chooserFactory, Object... args) {
...
children = new EventExecutor[nThreads];
for (int i = 0; i < nThreads; i ++) {
boolean success = false;
try {
children[i] = newChild(executor, args);
success = true;
} catch (Exception e) {...}
}
...
}
protected abstract EventExecutor newChild(Executor executor, Object... args) throws Exception;
MultithreadEventExecutorGroup calls the abstract method newChild to initialize each element of the children array. newChild is implemented in NioEventLoopGroup and returns a NioEventLoop instance:
@Override
protected EventLoop newChild(Executor executor, Object... args) throws Exception {
return new NioEventLoop(this, executor, (SelectorProvider) args[0],
((SelectStrategyFactory) args[1]).newSelectStrategy(), (RejectedExecutionHandler) args[2]);
}
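The relationship between the group, its children array and newChild can be sketched with plain JDK classes. This is a simplified model, not Netty's code: SketchLoop, SketchGroup and SketchNioGroup are hypothetical names, the "0 means default" rule and the round-robin next() are assumptions made for illustration.

```java
import java.util.concurrent.atomic.AtomicInteger;

// Hypothetical stand-in for EventExecutor: just a named worker.
final class SketchLoop {
    final String name;

    SketchLoop(String name) {
        this.name = name;
    }
}

// Simplified model of a multithreaded group: a fixed array of children
// filled through an abstract factory method (template-method pattern).
abstract class SketchGroup {
    private final SketchLoop[] children;
    private final AtomicInteger idx = new AtomicInteger();

    SketchGroup(int nThreads) {
        // Assumption for this sketch: 0 means "use the default", taken as cores * 2.
        int n = nThreads == 0
                ? Math.max(1, Runtime.getRuntime().availableProcessors() * 2)
                : nThreads;
        children = new SketchLoop[n];
        for (int i = 0; i < n; i++) {
            children[i] = newChild(i); // the subclass decides the concrete type
        }
    }

    protected abstract SketchLoop newChild(int index);

    int size() {
        return children.length;
    }

    // Assumed selection policy for this sketch: plain round-robin over the children.
    SketchLoop next() {
        return children[Math.floorMod(idx.getAndIncrement(), children.length)];
    }
}

// Plays the role of the concrete group: it only supplies newChild.
final class SketchNioGroup extends SketchGroup {
    SketchNioGroup(int nThreads) {
        super(nThreads);
    }

    @Override
    protected SketchLoop newChild(int index) {
        return new SketchLoop("loop-" + index);
    }
}
```

With new SketchNioGroup(3), successive calls to next() hand out loop-0, loop-1, loop-2 and then wrap around to loop-0.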
NioEventLoop obtains its selector in openSelector(), which calls provider.openSelector():
public final class NioEventLoop extends SingleThreadEventLoop {
private Selector selector;
private Selector unwrappedSelector;
private final SelectorProvider provider;
NioEventLoop(NioEventLoopGroup parent, Executor executor, SelectorProvider selectorProvider, SelectStrategy strategy, RejectedExecutionHandler rejectedExecutionHandler) {
...
provider = selectorProvider;
final SelectorTuple selectorTuple = openSelector();
selector = selectorTuple.selector;
unwrappedSelector = selectorTuple.unwrappedSelector;
selectStrategy = strategy;
}
private SelectorTuple openSelector() {
final Selector unwrappedSelector;
try {
unwrappedSelector = provider.openSelector();
} catch (IOException e) {}
...
}
}
This provider is the one passed to the NioEventLoop constructor in NioEventLoopGroup's newChild (as args[0]):
protected EventLoop newChild(Executor executor, Object... args) throws Exception {
return new NioEventLoop(this, executor, (SelectorProvider) args[0], ((SelectStrategyFactory) args[1]).newSelectStrategy(), (RejectedExecutionHandler) args[2]);
}
2. NioEventLoop
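2.1 The NioEventLoop hierarchy
NioEventLoop indirectly extends SingleThreadEventExecutor, which is Netty's abstraction of a native thread; its thread field holds a Java thread. A NioEventLoop is therefore bound to one specific thread, and that binding does not change during its lifetime.
AbstractScheduledEventExecutor implements the scheduling feature of NioEventLoop: calling schedule on a NioEventLoop instance runs a task after a delay: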
public abstract class AbstractScheduledEventExecutor extends AbstractEventExecutor {
@Override
public ScheduledFuture<?> schedule(Runnable command, long delay, TimeUnit unit) {
ObjectUtil.checkNotNull(command, "command");
ObjectUtil.checkNotNull(unit, "unit");
if (delay < 0) {
delay = 0;
}
validateScheduled(delay, unit);
return schedule(new ScheduledFutureTask<Void>(
this, command, null, ScheduledFutureTask.deadlineNanos(unit.toNanos(delay))));
}
<V> ScheduledFuture<V> schedule(final ScheduledFutureTask<V> task) {
if (inEventLoop()) {
scheduledTaskQueue().add(task);
} else {
execute(new Runnable() {
@Override
public void run() {
scheduledTaskQueue().add(task);
}
});
}
return task;
}
}
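Together with the task queue, you can call execute on a NioEventLoop instance to add a task to the queue, and the NioEventLoop will schedule and run it.
Generally, a NioEventLoop has two duties. First, as the IO thread, it performs Channel-related IO operations: calling select to wait for ready IO events, reading and writing data, and processing that data. Second, as the task runner, it executes the tasks in taskQueue.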
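The scheduling side can be illustrated with a deadline-ordered queue. The sketch below is a minimal model under stated assumptions, not Netty's ScheduledFutureTask machinery: SketchScheduledQueue and its methods are hypothetical, and the current time is passed in explicitly so the behaviour is deterministic. Like the queue in the excerpt above, it is meant to be touched by a single thread only, which is why schedule() in the excerpt hops onto the loop thread before adding a task.

```java
import java.util.PriorityQueue;
import java.util.concurrent.TimeUnit;

// Hypothetical single-threaded scheduled-task queue ordered by deadline.
final class SketchScheduledQueue {

    // A task paired with its absolute deadline in nanoseconds.
    private static final class Entry implements Comparable<Entry> {
        final Runnable task;
        final long deadlineNanos;

        Entry(Runnable task, long deadlineNanos) {
            this.task = task;
            this.deadlineNanos = deadlineNanos;
        }

        @Override
        public int compareTo(Entry other) {
            return Long.compare(deadlineNanos, other.deadlineNanos);
        }
    }

    private final PriorityQueue<Entry> queue = new PriorityQueue<>();

    // Negative delays are clamped to 0, as in the schedule() excerpt above.
    void schedule(Runnable task, long delay, TimeUnit unit, long nowNanos) {
        long clamped = Math.max(0L, delay);
        queue.add(new Entry(task, nowNanos + unit.toNanos(clamped)));
    }

    // Time until the nearest deadline, or -1 when nothing is scheduled.
    long nextDelayNanos(long nowNanos) {
        Entry head = queue.peek();
        return head == null ? -1L : Math.max(0L, head.deadlineNanos - nowNanos);
    }

    // Runs every task whose deadline has passed and returns how many ran.
    int runDue(long nowNanos) {
        int ran = 0;
        while (!queue.isEmpty() && queue.peek().deadlineNanos <= nowNanos) {
            queue.poll().task.run();
            ran++;
        }
        return ran;
    }
}
```

Tasks scheduled with delays of 30 ms, 10 ms and 20 ms come out in deadline order (10, 20, 30) regardless of insertion order.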
2.2 Binding a NioEventLoop to a thread
The thread field of SingleThreadEventExecutor is the native thread associated with that SingleThreadEventExecutor:
private void doStartThread() {
assert thread == null;
executor.execute(new Runnable() {
@Override
public void run() {
thread = Thread.currentThread();
if (interrupted) {
thread.interrupt();
}
try {
SingleThreadEventExecutor.this.run();
} catch (Throwable t) {}
...
}
});
}
All this thread does is call SingleThreadEventExecutor.this.run(). Because NioEventLoop overrides this method, polymorphism means NioEventLoop.run() is what actually executes. This run() fetches ready events from the selector:
protected void run() {
for (;;) {
try {
switch (selectStrategy.calculateStrategy(selectNowSupplier, hasTasks())) {
case SelectStrategy.CONTINUE:
continue;
case SelectStrategy.SELECT:
select(wakenUp.getAndSet(false));
if (wakenUp.get()) {
selector.wakeup();
}
default:
}
...
if (ioRatio == 100) {
try {
processSelectedKeys();
} finally {
// Ensure we always run tasks.
runAllTasks();
}
} else {
final long ioStartTime = System.nanoTime();
try {
processSelectedKeys();
} finally {
final long ioTime = System.nanoTime() - ioStartTime;
runAllTasks(ioTime * (100 - ioRatio) / ioRatio);
}
}
} catch (Throwable t) {}
}
}
private void select(boolean oldWakenUp) throws IOException {
Selector selector = this.selector;
try {
for (;;) {
...
int selectedKeys = selector.select(timeoutMillis);
selectCnt ++;
...
}
} catch (CancelledKeyException e) {...}
}
After ready events are selected, processSelectedKey is eventually called to handle each one:
private void processSelectedKey(SelectionKey k, AbstractNioChannel ch) {
final AbstractNioChannel.NioUnsafe unsafe = ch.unsafe();
...
try {
int readyOps = k.readyOps();
if ((readyOps & SelectionKey.OP_CONNECT) != 0) {
int ops = k.interestOps();
ops &= ~SelectionKey.OP_CONNECT;
k.interestOps(ops);
unsafe.finishConnect();
}
// Process OP_WRITE
if ((readyOps & SelectionKey.OP_WRITE) != 0) {
// Call forceFlush which will also take care of clear the OP_WRITE once there is nothing left to write
ch.unsafe().forceFlush();
}
if ((readyOps & (SelectionKey.OP_READ | SelectionKey.OP_ACCEPT)) != 0 || readyOps == 0) {
unsafe.read();
}
} catch (CancelledKeyException ignored) {}
}
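The branching in processSelectedKey is bit-mask testing on readyOps. The sketch below isolates that logic so it can be tried without a channel; SketchReadyOps and dispatch are hypothetical names, and the returned strings only label which branch of the excerpt would run.

```java
import java.nio.channels.SelectionKey;
import java.util.ArrayList;
import java.util.List;

// Hypothetical helper that mirrors the branch order of processSelectedKey.
final class SketchReadyOps {

    // Returns the actions that would be taken for a given readyOps mask.
    static List<String> dispatch(int readyOps) {
        List<String> actions = new ArrayList<>();
        if ((readyOps & SelectionKey.OP_CONNECT) != 0) {
            actions.add("finishConnect");
        }
        if ((readyOps & SelectionKey.OP_WRITE) != 0) {
            actions.add("forceFlush");
        }
        // readyOps == 0 is also routed to read(), as in the excerpt above.
        if ((readyOps & (SelectionKey.OP_READ | SelectionKey.OP_ACCEPT)) != 0 || readyOps == 0) {
            actions.add("read");
        }
        return actions;
    }
}
```

A key that is both readable and writable yields forceFlush followed by read, because several bits can be set in the same mask and the branches are not mutually exclusive.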
2.3 Associating an EventLoop with a Channel
A channel is registered with the EventLoop's selector, starting from register in SingleThreadEventLoop:
public abstract class SingleThreadEventLoop extends SingleThreadEventExecutor implements EventLoop {
...
public ChannelFuture register(Channel channel) {
return this.register((ChannelPromise)(new DefaultChannelPromise(channel, this)));
}
public ChannelFuture register(ChannelPromise promise) {
ObjectUtil.checkNotNull(promise, "promise");
promise.channel().unsafe().register(this, promise);
return promise;
}
}
Following the call chain leads to doRegister() in AbstractNioChannel:
public abstract class AbstractNioChannel extends AbstractChannel {
...
protected void doRegister() throws Exception {
boolean selected = false;
while(true) {
try {
this.selectionKey = this.javaChannel().register(this.eventLoop().unwrappedSelector(), 0, this);
return;
} catch (CancelledKeyException var3) {
if (selected) {
throw var3;
}
this.eventLoop().selectNow();
selected = true;
}
}
}
}
Here, the call
this.javaChannel().register(this.eventLoop().unwrappedSelector(), 0, this)
registers the NioSocketChannel instance with the selector of the current NioEventLoop. unwrappedSelector() returns the unwrappedSelector field of NioEventLoop, that is, the selector the NioEventLoop owns:
public final class NioEventLoop extends SingleThreadEventLoop {
...
Selector unwrappedSelector() {
return unwrappedSelector;
}
}
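The register(selector, 0, this) call pattern can be tried with plain Java NIO. In the sketch below a Pipe source channel stands in for a socket channel so that no network is needed; SketchRegister and demo are hypothetical names. It registers with an interest set of 0 and an attachment, then enables an interest bit afterwards.

```java
import java.io.IOException;
import java.io.UncheckedIOException;
import java.nio.channels.Pipe;
import java.nio.channels.SelectionKey;
import java.nio.channels.Selector;

// Hypothetical demo of registering a channel with interestOps = 0 and an attachment.
final class SketchRegister {

    // Returns "<interestOps at registration>,<interestOps afterwards>,<attachment kept>".
    static String demo() {
        try (Selector selector = Selector.open()) {
            Pipe pipe = Pipe.open();
            try (Pipe.SourceChannel source = pipe.source();
                 Pipe.SinkChannel sink = pipe.sink()) {
                source.configureBlocking(false); // selectable channels must be non-blocking
                Object owner = "owner";          // plays the role of `this` in doRegister()

                // Register with no interest at all; the key only links channel and selector.
                SelectionKey key = source.register(selector, 0, owner);
                int atRegistration = key.interestOps();

                // Interest is switched on later, once the channel is ready to be used.
                key.interestOps(SelectionKey.OP_READ);

                return atRegistration + "," + key.interestOps() + "," + (key.attachment() == owner);
            }
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }
}
```

The attachment is what lets the loop get from a ready SelectionKey back to the owning channel object when it later processes the key.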
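2.4 Running the EventLoop
NioEventLoop is a SingleThreadEventExecutor, and SingleThreadEventExecutor is Netty's abstraction of a native thread, so running a NioEventLoop means running the Java thread bound to it. In the code below that thread is not started by calling start() on the thread field; it is obtained from executor.execute(...) in doStartThread(), which records Thread.currentThread() in thread.
The entry point is doStartThread() in SingleThreadEventExecutor, reached through execute() and startThread():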
@Override
public void execute(Runnable task) {
...
boolean inEventLoop = inEventLoop();
if (inEventLoop) {
addTask(task);
} else {
startThread();
addTask(task);
...
}
if (!addTaskWakesUp && wakesUpForTask(task)) {
wakeup(inEventLoop);
}
}
private void startThread() {
if (state == ST_NOT_STARTED) {
if (STATE_UPDATER.compareAndSet(this, ST_NOT_STARTED, ST_STARTED)) {
try {
doStartThread();
} catch (Throwable cause) {...}
}
}
}
private void doStartThread() {
assert thread == null;
executor.execute(new Runnable() {
@Override
public void run() {
thread = Thread.currentThread();
if (interrupted) {
thread.interrupt();
}
boolean success = false;
updateLastExecutionTime();
try {
SingleThreadEventExecutor.this.run();
success = true;
} catch (Throwable t) {}
...
}
});
}
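state is the field inside SingleThreadEventExecutor that tracks the status of the thread, and its initial value is ST_NOT_STARTED. The first call to startThread() therefore passes the if check and goes on to doStartThread(). startThread() is called from execute() in the same class.
This execute() is called in register() of AbstractChannel's AbstractUnsafe, that is, when a channel is registered: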
@Override
public final void register(EventLoop eventLoop, final ChannelPromise promise) {
...
if (eventLoop.inEventLoop()) {
register0(promise);
} else {
try {
eventLoop.execute(new Runnable() {
@Override
public void run() {
register0(promise);
}
});
} catch (Throwable t) {...}
}
}
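When a channel is registered with an EventLoop for the first time, that is, on the path from Bootstrap's bind() or connect() down to AbstractUnsafe.register(), all the code runs on the main thread. eventLoop.inEventLoop() above is therefore false and the else branch is taken. That branch calls eventLoop.execute, which triggers startThread() and in turn starts the Java thread that backs the EventLoop.
The eventLoop in AbstractUnsafe.register() is a NioEventLoop instance. NioEventLoop does not override execute, so the call goes to SingleThreadEventExecutor.execute. This is where a NioEventLoop gets started: the first call to EventLoop.execute starts its thread.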
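The lazy start described in this section can be reproduced in a few lines. The sketch below is a simplified model, not Netty's implementation: SketchSingleThreadExecutor, startCount and executeAndWait are hypothetical, the thread is created directly rather than through an Executor, and the loop only drains a task queue.

```java
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;

// Hypothetical single-thread executor whose thread starts on the first execute().
final class SketchSingleThreadExecutor {
    private static final int ST_NOT_STARTED = 1;
    private static final int ST_STARTED = 2;

    private final AtomicInteger state = new AtomicInteger(ST_NOT_STARTED);
    private final AtomicInteger starts = new AtomicInteger();
    private final BlockingQueue<Runnable> taskQueue = new LinkedBlockingQueue<>();
    private volatile Thread thread;

    boolean inEventLoop() {
        return Thread.currentThread() == thread;
    }

    boolean isStarted() {
        return state.get() == ST_STARTED;
    }

    int startCount() {
        return starts.get();
    }

    void execute(Runnable task) {
        if (!inEventLoop()) {
            startThread(); // a no-op after the first successful call
        }
        taskQueue.add(task);
    }

    private void startThread() {
        // The CAS guarantees that only one caller ever starts the thread.
        if (state.get() == ST_NOT_STARTED && state.compareAndSet(ST_NOT_STARTED, ST_STARTED)) {
            starts.incrementAndGet();
            Thread t = new Thread(() -> {
                thread = Thread.currentThread(); // bind the loop to this thread
                runLoop();
            }, "sketch-loop");
            t.setDaemon(true); // lets the JVM exit even though the loop never ends
            t.start();
        }
    }

    private void runLoop() {
        try {
            for (;;) {
                taskQueue.take().run();
            }
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
    }

    // Test helper for callers outside the loop: submit a task and wait for it to finish.
    boolean executeAndWait(Runnable task, long timeoutMillis) {
        CountDownLatch done = new CountDownLatch(1);
        execute(() -> {
            try {
                task.run();
            } finally {
                done.countDown();
            }
        });
        try {
            return done.await(timeoutMillis, TimeUnit.MILLISECONDS);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            return false;
        }
    }
}
```

Before the first execute() the executor has no thread at all; afterwards inEventLoop() is true inside submitted tasks and false for the submitting thread, and further execute() calls reuse the same thread.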
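Netty's IO processing loop
An EventLoop in Netty is responsible for the following:
1. As the IO thread, it performs IO operations and passes TCP data from the Java NIO socket to the handlers;
2. As the task thread, it executes the tasks in taskQueue.
In a Java NIO application, the selector role repeatedly calls Selector.select() to query the IO events that are currently ready. In Netty, that role is played by the EventLoop.
Returning to doStartThread() in SingleThreadEventExecutor: SingleThreadEventExecutor.this.run() actually invokes NioEventLoop's run(), so starting a NioEventLoop means starting a thread that executes NioEventLoop.run():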
@Override
protected void run() {
for (;;) {
try {
switch (selectStrategy.calculateStrategy(selectNowSupplier, hasTasks())) {
case SelectStrategy.CONTINUE:
continue;
case SelectStrategy.SELECT:
select(wakenUp.getAndSet(false));
if (wakenUp.get()) {
selector.wakeup();
}
// fall through
default:
}
cancelledKeys = 0;
needsToSelectAgain = false;
final int ioRatio = this.ioRatio;
if (ioRatio == 100) {
try {
processSelectedKeys();
} finally {
// Ensure we always run tasks.
runAllTasks();
}
} else {
final long ioStartTime = System.nanoTime();
try {
processSelectedKeys();
} finally {
// Ensure we always run tasks.
final long ioTime = System.nanoTime() - ioStartTime;
runAllTasks(ioTime * (100 - ioRatio) / ioRatio);
}
}
} catch (Throwable t) {...}
...
}
}
private void select(boolean oldWakenUp) throws IOException {
Selector selector = this.selector;
try {
int selectCnt = 0;
for (;;) {
long timeoutMillis = (selectDeadLineNanos - currentTimeNanos + 500000L) / 1000000L;
if (timeoutMillis <= 0) {
if (selectCnt == 0) {
selector.selectNow();
selectCnt = 1;
}
break;
}
if (hasTasks() && wakenUp.compareAndSet(false, true)) {
selector.selectNow();
selectCnt = 1;
break;
}
int selectedKeys = selector.select(timeoutMillis);
selectCnt ++;
...
}
} catch (CancelledKeyException e) {...}
}
run() calls select inside a for(;;) loop; once ready events are selected, processSelectedKey is eventually called to handle each one:
private void processSelectedKey(SelectionKey k, AbstractNioChannel ch) {
final AbstractNioChannel.NioUnsafe unsafe = ch.unsafe();
...
try {
int readyOps = k.readyOps();
if ((readyOps & SelectionKey.OP_CONNECT) != 0) {
int ops = k.interestOps();
ops &= ~SelectionKey.OP_CONNECT;
k.interestOps(ops);
unsafe.finishConnect();
}
// Process OP_WRITE
if ((readyOps & SelectionKey.OP_WRITE) != 0) {
// Call forceFlush which will also take care of clear the OP_WRITE once there is nothing left to write
ch.unsafe().forceFlush();
}
if ((readyOps & (SelectionKey.OP_READ | SelectionKey.OP_ACCEPT)) != 0 || readyOps == 0) {
unsafe.read();
}
} catch (CancelledKeyException ignored) {}
}
The first step in select(boolean oldWakenUp) is:
long timeoutMillis = (selectDeadLineNanos - currentTimeNanos + 500000L) / 1000000L;
if (timeoutMillis <= 0) {
if (selectCnt == 0) {
selector.selectNow();
selectCnt = 1;
}
break;
}
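selectDeadLineNanos is currentTimeNanos plus delayNanos(currentTimeNanos). delayNanos peeks at the head of the scheduled-task queue and computes the remaining time from that task's deadline: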
protected long delayNanos(long currentTimeNanos) {
ScheduledFutureTask<?> scheduledTask = peekScheduledTask();
if (scheduledTask == null) {
return SCHEDULE_PURGE_INTERVAL;
}
return scheduledTask.delayNanos(currentTimeNanos);
}
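The result is the time remaining until the nearest scheduled task is due, rounded to the nearest millisecond. If less than 0.5 ms remains, timeoutMillis <= 0: the interval is considered too short to block on, so the loop exits, and if selectCnt is still 0 it performs one selectNow() first.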
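The rounding is easy to verify by hand. The sketch below repeats the timeoutMillis formula with the remaining delay as its only input; SketchSelectTimeout and shouldBlock are hypothetical names introduced for the example.

```java
// Hypothetical helper reproducing the timeoutMillis arithmetic from select().
final class SketchSelectTimeout {

    // Adding 500_000 ns before dividing rounds to the nearest millisecond
    // for non-negative delays (integer division truncates toward zero).
    static long timeoutMillis(long delayNanos) {
        return (delayNanos + 500_000L) / 1_000_000L;
    }

    // Blocking select is only worthwhile when at least about 0.5 ms remains.
    static boolean shouldBlock(long delayNanos) {
        return timeoutMillis(delayNanos) > 0;
    }
}
```

For example, 400,000 ns gives 0 ms (no blocking select), 500,000 ns gives 1 ms, 1,499,999 ns gives 1 ms and 1,500,000 ns gives 2 ms. A task that is already overdue by 600,000 ns also gives 0.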
Next, select(boolean oldWakenUp) calls hasTasks() to check whether the task queue currently holds any tasks:
protected boolean hasTasks() {
assert inEventLoop();
return !taskQueue.isEmpty();
}
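This method checks whether taskQueue, the list of tasks the current EventLoop must run, is empty. When taskQueue is not empty (and wakenUp can be switched from false to true), selectNow() is executed; when it is empty, select(timeoutMillis) is executed.
Both selectNow() and select(timeoutMillis) check whether any IO events are ready. The difference is that when nothing is ready, selectNow() returns immediately, whereas select(timeoutMillis) blocks for up to timeoutMillis.
When hasTasks() is true, there are tasks waiting and they should run as soon as possible, so selectNow() is called to avoid blocking the current thread. When hasTasks() is false, there is nothing to run, so the loop can call select(timeoutMillis) and block until IO events are ready.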
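The difference between the two calls can be observed with a selector that has no channels registered, so nothing ever becomes ready. This is a small experiment rather than Netty code; SketchSelectChoice and selectOnce are hypothetical names, and the measured times depend on the machine.

```java
import java.io.IOException;
import java.io.UncheckedIOException;
import java.nio.channels.Selector;

// Hypothetical experiment: pick selectNow() or select(timeout) based on pending tasks.
final class SketchSelectChoice {

    // Returns the elapsed time of the chosen select call in milliseconds.
    static long selectOnce(boolean hasTasks, long timeoutMillis) {
        try (Selector selector = Selector.open()) {
            long start = System.nanoTime();
            if (hasTasks) {
                selector.selectNow();           // never blocks
            } else {
                selector.select(timeoutMillis); // blocks until ready, woken up, or timed out
            }
            return (System.nanoTime() - start) / 1_000_000L;
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }
}
```

Calling selectOnce(true, 200) returns almost immediately, while selectOnce(false, 200) takes roughly the full timeout because no event arrives and nothing calls wakeup().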
Back to run() in NioEventLoop:
final int ioRatio = this.ioRatio;
if (ioRatio == 100) {
try {
processSelectedKeys();
} finally {
// Ensure we always run tasks.
runAllTasks();
}
} else {
final long ioStartTime = System.nanoTime();
try {
processSelectedKeys();
} finally {
// Ensure we always run tasks.
final long ioTime = System.nanoTime() - ioStartTime;
runAllTasks(ioTime * (100 - ioRatio) / ioRatio);
}
}
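processSelectedKeys() dispatches the ready IO events, and runAllTasks() runs the tasks in taskQueue. ioRatio is the share of each loop iteration's time allotted to IO, that is, to processSelectedKeys(). It defaults to 50, which gives IO and task execution equal time (1 : 1). When ioRatio is 100, runAllTasks() is called without a time limit.
From the time spent on IO and ioRatio, the approximate time budget for running tasks can be computed: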
final long ioTime = System.nanoTime() - ioStartTime;
runAllTasks(ioTime * (100 - ioRatio) / ioRatio);
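With ioRatio = 50, if IO took 100 ms, runAllTasks() is given a budget of about 100 ms. Once runAllTasks(long timeoutNanos) has run longer than that budget, it stops running tasks and returns; the deadline is only checked once every 64 tasks: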
/**
* Poll all tasks from the task queue and run them via {@link Runnable#run()} method. This method stops running
* the tasks in the task queue and returns if it ran longer than {@code timeoutNanos}.
*/
protected boolean runAllTasks(long timeoutNanos) {
...
final long deadline = ScheduledFutureTask.nanoTime() + timeoutNanos;
long runTasks = 0;
long lastExecutionTime;
for (;;) {
...
runTasks ++;
// Check timeout every 64 tasks because nanoTime() is relatively expensive.
// XXX: Hard-coded value - will make it configurable if it is really a problem.
if ((runTasks & 0x3F) == 0) {
lastExecutionTime = ScheduledFutureTask.nanoTime();
if (lastExecutionTime >= deadline) {
break;
}
}
}
...
}
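Both the budget formula and the check-every-64-tasks rule can be exercised in isolation. The following is a minimal sketch assuming a plain Queue of Runnables; SketchTaskBudget and its methods are hypothetical and leave out everything else the real runAllTasks does.

```java
import java.util.ArrayDeque;
import java.util.Queue;

// Hypothetical model of the ioRatio budget and the deadline check every 64 tasks.
final class SketchTaskBudget {

    // Time allowed for tasks, given the time just spent on IO. Requires 0 < ioRatio < 100.
    static long taskBudgetNanos(long ioTimeNanos, int ioRatio) {
        return ioTimeNanos * (100 - ioRatio) / ioRatio;
    }

    // Runs queued tasks until the queue is empty or the budget is used up.
    // Returns the number of tasks that ran.
    static int runAllTasks(Queue<Runnable> tasks, long timeoutNanos) {
        final long deadline = System.nanoTime() + timeoutNanos;
        int runTasks = 0;
        Runnable task;
        while ((task = tasks.poll()) != null) {
            task.run();
            runTasks++;
            // Only look at the clock every 64 tasks to keep nanoTime() calls cheap.
            if ((runTasks & 0x3F) == 0 && System.nanoTime() - deadline >= 0) {
                break;
            }
        }
        return runTasks;
    }

    // Convenience for experiments: a queue of n tasks that do nothing.
    static Queue<Runnable> noOpTasks(int n) {
        Queue<Runnable> q = new ArrayDeque<>();
        for (int i = 0; i < n; i++) {
            q.add(() -> { });
        }
        return q;
    }
}
```

With 100 ms of IO time the budget is 100 ms at ioRatio 50, 25 ms at ioRatio 80 and 400 ms at ioRatio 20. Because the clock is consulted only at multiples of 64, even a budget of 0 lets the first 64 queued tasks run before the loop stops.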
As for processSelectedKeys():
private void processSelectedKeys() {
if (selectedKeys != null) {
processSelectedKeysOptimized();
} else {
processSelectedKeysPlain(selector.selectedKeys());
}
}
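When openSelector() is called, selectedKeys ends up with a different value depending on the JVM platform. Depending on whether selectedKeys is null, either processSelectedKeysOptimized() or processSelectedKeysPlain() is called. Both methods eventually call processSelectedKey().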
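The plain path works on the set returned by selector.selectedKeys(). The sketch below shows the usual Java NIO idiom for walking that set: take each key, remove it from the set, then handle it. It is an illustration under assumptions, not Netty's method body: SketchPlainKeys and demo are hypothetical, and a Pipe is used so that a ready event can be produced without any network.

```java
import java.io.IOException;
import java.io.UncheckedIOException;
import java.nio.ByteBuffer;
import java.nio.channels.Pipe;
import java.nio.channels.SelectionKey;
import java.nio.channels.Selector;
import java.util.Iterator;

// Hypothetical demo of iterating the selected-key set and removing each handled key.
final class SketchPlainKeys {

    // Returns "<keys processed>,<keys left in the selected set>".
    static String demo() {
        try (Selector selector = Selector.open()) {
            Pipe pipe = Pipe.open();
            try (Pipe.SourceChannel source = pipe.source();
                 Pipe.SinkChannel sink = pipe.sink()) {
                source.configureBlocking(false);
                source.register(selector, SelectionKey.OP_READ);

                // Make the source readable so that select() has something to report.
                sink.write(ByteBuffer.wrap(new byte[] {1}));
                selector.select(1000);

                int processed = 0;
                Iterator<SelectionKey> it = selector.selectedKeys().iterator();
                while (it.hasNext()) {
                    SelectionKey key = it.next();
                    it.remove(); // the selector never clears this set by itself
                    if (key.isValid() && key.isReadable()) {
                        processed++; // a real loop would dispatch to the channel here
                    }
                }
                return processed + "," + selector.selectedKeys().size();
            }
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }
}
```

Removing each key while iterating matters because a key left in the set would be reported again on the next pass even if its channel had nothing new to offer.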