Netty学习三
前面我们已经知道Netty服务端启动的时候最重要的是进行bind操作,这个操作不仅进行了run()操作进行死循环,而且将线程任务添加到队列中,进行runAllTasks操作。
首先,我们可以看Netty的架构图,图片来自即时通讯网:
reactor线程模型图,图片来自即时通讯网:
下面是跟踪源码的流程操作:
AbstractBootstrap#bind(intinetPort)->AbstractBootstrap#bind(SocketAddresslocalAddress)->AbstractBootstrap#doBind(finalSocketAddresslocalAddress)->AbstractBootstrap#doBind0( finalChannelFutureregFuture, finalChannelchannel, finalSocketAddresslocalAddress, finalChannelPromisepromise) 注意这个方法,里面先进行绑定,然后添加监听->ChannelOutboundInvoker#bind(SocketAddresslocalAddress, ChannelPromisepromise)->重要AbstractChannel#bind(SocketAddresslocalAddress, ChannelPromisepromise)->DefaultChannelPipeline#bind(SocketAddresslocalAddress, ChannelPromisepromise)->AbstractChannelHandlerContext#bind(finalSocketAddresslocalAddress, finalChannelPromisepromise)->AbstractChannelHandlerContext#invokeBind(SocketAddresslocalAddress, ChannelPromisepromise) ——>ChannelOutboundInvoker#bind(ChannelHandlerContextctx, SocketAddresslocalAddress, ChannelPromisepromise) 进入LoggingHandler的操作->重要,调用LoggingHandler#bind(ChannelHandlerContextctx, SocketAddresslocalAddress, ChannelPromisepromise)#ctx.bind(localAddress, promise) ->AbstractChannelHandlerContext#bind(finalSocketAddresslocalAddress, finalChannelPromisepromise)#next.invokeBind(localAddress, promise) ->AbstractChannelHandlerContext#invokeBind(SocketAddresslocalAddress, ChannelPromisepromise)#bind(this, localAddress, promise) ->ChannelOutboundInvoker#bind(SocketAddresslocalAddress, ChannelPromisepromise) ->重要DefaultChannelPipeline#bind(ChannelHandlerContextctx, SocketAddresslocalAddress, ChannelPromisepromise)#unsafe.bind(localAddress, promise) ->重要AbstractChannel#bind(finalSocketAddresslocalAddress, finalChannelPromisepromise) ->javaChannel()->NioServerSocketChannel#doBind(SocketAddresslocalAddress) ->AbstractChannel#invokeLater(Runnabletask)#eventLoop().execute(task) ->SingleThreadEventExecutor#execute(Runnabletask) #execute(Runnabletask, booleanimmediate)#addTask(Runnabletask)#wakeup(inEventLoop)#safeSetSuccess(promise) ->AbstractChannel#safeSetSuccess(ChannelPromisepromise) ->此时会将结果往回抛AbstractChannelHandlerContext->LoggingHandler#bind(ChannelHandlerContextctx, SocketAddresslocalAddress, ChannelPromisepromise) ->DefaultChannelPromise#addListener(GenericFutureListener<?extendsFuture<?superVoid>>listener) ->DefaultPromise#addListener(GenericFutureListener<?extendsFuture<?superV>>listener)#isDone()#notifyListeners() #executor() ->AbstractBootstrap#executor()#super.executor() ->DefaultChannelPromise#channel().eventLoop() ->AbstractNioChannel#eventLoop() ->AbstractChannel#eventLoop() ->notifyListenersNow() ->FastThreadLocalRunnable#run() ->ThreadExecutorMap#apply(finalRunnablecommand, finalEventExecutoreventExecutor)#command.run() ->SingleThreadEventExecutor#doStartThread()#SingleThreadEventExecutor.this.run() ->重要NioEventLoop#run()#runAllTasks(longtimeoutNanos)#runTasks++#task=pollTask()#pollTaskFrom(Queue<Runnable>taskQueue) ->NioEventLoop#safeExecute(Runnabletask)#fireChannelActive() 重要#invokeChannelActive()的((ChannelInboundHandler) handler()).channelActive(this);
跟踪完之后,回忆一下,Netty的操作,首先启动服务,此时已经启动了服务,还需要建立连接。
进行绑定操作的重要步骤:
publicfinalvoidbind(finalSocketAddresslocalAddress, finalChannelPromisepromise) { assertEventLoop(); if (!promise.setUncancellable() ||!ensureOpen(promise)) { return; } // See: https://github.com/netty/netty/issues/576if (Boolean.TRUE.equals(config().getOption(ChannelOption.SO_BROADCAST)) &&localAddressinstanceofInetSocketAddress&&!((InetSocketAddress) localAddress).getAddress().isAnyLocalAddress() &&!PlatformDependent.isWindows() &&!PlatformDependent.maybeSuperUser()) { // Warn a user about the fact that a non-root user can't receive a// broadcast packet on *nix if the socket is bound on non-wildcard address.logger.warn( "A non-root user can't receive a broadcast packet if the socket "+"is not bound to a wildcard address; binding to a non-wildcard "+"address ("+localAddress+") anyway as requested."); } booleanwasActive=isActive(); try { doBind(localAddress); } catch (Throwablet) { safeSetFailure(promise, t); closeIfClosed(); return; } if (!wasActive&&isActive()) { invokeLater(newRunnable() { publicvoidrun() { //重要,进行active操作pipeline.fireChannelActive(); } }); } //关键地方,此时会一步一步的返回pipeline操作safeSetSuccess(promise); }
此时我们可以看到EventLoop是一个重要的类,我们的大部分操作都是在NioEventLoop中完成的操作。EventLoop的作用是一个死循环,而这个循环做了下面几件事:
有条件的等待Nio事件
处理Nio事件
处理消息队列中的任务
protectedvoidrun() { intselectCnt=0; //自旋操作for (;;) { try { intstrategy; try { //计算策略strategy=selectStrategy.calculateStrategy(selectNowSupplier, hasTasks()); //匹配策略switch (strategy) { caseSelectStrategy.CONTINUE: continue; caseSelectStrategy.BUSY_WAIT: // fall-through to SELECT since the busy-wait is not supported with NIOcaseSelectStrategy.SELECT: longcurDeadlineNanos=nextScheduledTaskDeadlineNanos(); if (curDeadlineNanos==-1L) { curDeadlineNanos=NONE; // nothing on the calendar } nextWakeupNanos.set(curDeadlineNanos); try { if (!hasTasks()) { strategy=select(curDeadlineNanos); } } finally { // This update is just to help block unnecessary selector wakeups// so use of lazySet is ok (no race condition)nextWakeupNanos.lazySet(AWAKE); } // fall throughdefault: } } catch (IOExceptione) { // If we receive an IOException here its because the Selector is messed up. Let's rebuild// the selector and retry. https://github.com/netty/netty/issues/8566rebuildSelector0(); selectCnt=0; handleLoopException(e); continue; } selectCnt++; cancelledKeys=0; needsToSelectAgain=false; finalintioRatio=this.ioRatio; booleanranTasks; if (ioRatio==100) { try { if (strategy>0) { processSelectedKeys(); } } finally { // Ensure we always run tasks.ranTasks=runAllTasks(); } } elseif (strategy>0) { finallongioStartTime=System.nanoTime(); try { processSelectedKeys(); } finally { // Ensure we always run tasks.finallongioTime=System.nanoTime() -ioStartTime; //运行task 重要ranTasks=runAllTasks(ioTime* (100-ioRatio) /ioRatio); } } else { ranTasks=runAllTasks(0); // This will run the minimum number of tasks } if (ranTasks||strategy>0) { if (selectCnt>MIN_PREMATURE_SELECTOR_RETURNS&&logger.isDebugEnabled()) { logger.debug("Selector.select() returned prematurely {} times in a row for Selector {}.", selectCnt-1, selector); } selectCnt=0; } elseif (unexpectedSelectorWakeup(selectCnt)) { // Unexpected wakeup (unusual case)selectCnt=0; } } catch (CancelledKeyExceptione) { // Harmless exception - log anywayif (logger.isDebugEnabled()) { logger.debug(CancelledKeyException.class.getSimpleName() +" raised by a Selector {} - JDK bug?", selector, e); } } catch (Throwablet) { handleLoopException(t); } // Always handle shutdown even if the loop processing threw an exception.try { if (isShuttingDown()) { closeAll(); if (confirmShutdown()) { return; } } } catch (Throwablet) { handleLoopException(t); } } } //运行AllTasks操作protectedbooleanrunAllTasks(longtimeoutNanos) { fetchFromScheduledTaskQueue(); Runnabletask=pollTask(); if (task==null) { afterRunningAllTasks(); returnfalse; } finallongdeadline=timeoutNanos>0?ScheduledFutureTask.nanoTime() +timeoutNanos : 0; longrunTasks=0; longlastExecutionTime; for (;;) { safeExecute(task); runTasks++; // Check timeout every 64 tasks because nanoTime() is relatively expensive.// XXX: Hard-coded value - will make it configurable if it is really a problem.if ((runTasks&0x3F) ==0) { lastExecutionTime=ScheduledFutureTask.nanoTime(); if (lastExecutionTime>=deadline) { break; } } task=pollTask(); if (task==null) { lastExecutionTime=ScheduledFutureTask.nanoTime(); break; } } afterRunningAllTasks(); this.lastExecutionTime=lastExecutionTime; returntrue; }
接收请求,进行多路复用的key
NioEventLoop#processSelectedKey(SelectionKeyk, AbstractNioChannelch)#unsafe.read()->AbstractNioMessageChannel#read() ->NioServerSocketChannel#doReadMessages(List<Object>buf)#SocketChannelch=SocketUtils.accept(javaChannel())#SocketUtils#accept(finalServerSocketChannelserverSocketChannel)#ServerSocketChannel#accept()->sun公司的ServerSocketChannelImpl#accept(),此方法完成连接操作->关注doReadMessages(List<Object>buf)中的pipeline.fireChannelRead(readBuf.get(i))方法,此方法完成了read操作
最终,会调用到AbstractNioChannel的doBeginRead方法。
下面我们来看一下,首先我们在网页中输入http://localhost:8007/,会看到客户端发出请求,断掉会进入到unsafe.read()操作:
privatevoidprocessSelectedKey(SelectionKeyk, AbstractNioChannelch) { finalAbstractNioChannel.NioUnsafeunsafe=ch.unsafe(); if (!k.isValid()) { finalEventLoopeventLoop; try { eventLoop=ch.eventLoop(); } catch (Throwableignored) { // If the channel implementation throws an exception because there is no event loop, we ignore this// because we are only trying to determine if ch is registered to this event loop and thus has authority// to close ch.return; } // Only close ch if ch is still registered to this EventLoop. ch could have deregistered from the event loop// and thus the SelectionKey could be cancelled as part of the deregistration process, but the channel is// still healthy and should not be closed.// See https://github.com/netty/netty/issues/5125if (eventLoop==this) { // close the channel if the key is not valid anymoreunsafe.close(unsafe.voidPromise()); } return; } try { intreadyOps=k.readyOps(); // We first need to call finishConnect() before try to trigger a read(...) or write(...) as otherwise// the NIO JDK channel implementation may throw a NotYetConnectedException.if ((readyOps&SelectionKey.OP_CONNECT) !=0) { // remove OP_CONNECT as otherwise Selector.select(..) will always return without blocking// See https://github.com/netty/netty/issues/924intops=k.interestOps(); ops&=~SelectionKey.OP_CONNECT; k.interestOps(ops); unsafe.finishConnect(); } // Process OP_WRITE first as we may be able to write some queued buffers and so free memory.if ((readyOps&SelectionKey.OP_WRITE) !=0) { // Call forceFlush which will also take care of clear the OP_WRITE once there is nothing left to writech.unsafe().forceFlush(); } // Also check for readOps of 0 to workaround possible JDK bug which may otherwise lead// to a spin loopif ((readyOps& (SelectionKey.OP_READ|SelectionKey.OP_ACCEPT)) !=0||readyOps==0) { //重要unsafe.read(); } } catch (CancelledKeyExceptionignored) { unsafe.close(unsafe.voidPromise()); } }
进入到,这个方法是重要的:
privatefinalclassNioMessageUnsafeextendsAbstractNioUnsafe { privatefinalList<Object>readBuf=newArrayList<Object>(); publicvoidread() { asserteventLoop().inEventLoop(); finalChannelConfigconfig=config(); finalChannelPipelinepipeline=pipeline(); finalRecvByteBufAllocator.HandleallocHandle=unsafe().recvBufAllocHandle(); allocHandle.reset(config); booleanclosed=false; Throwableexception=null; try { try { do { intlocalRead=doReadMessages(readBuf); if (localRead==0) { break; } if (localRead<0) { closed=true; break; } allocHandle.incMessagesRead(localRead); } while (allocHandle.continueReading()); } catch (Throwablet) { exception=t; } intsize=readBuf.size(); for (inti=0; i<size; i++) { readPending=false; pipeline.fireChannelRead(readBuf.get(i)); } readBuf.clear(); allocHandle.readComplete(); pipeline.fireChannelReadComplete(); if (exception!=null) { closed=closeOnReadError(exception); pipeline.fireExceptionCaught(exception); } if (closed) { inputShutdown=true; if (isOpen()) { close(voidPromise()); } } } finally { // Check if there is a readPending which was not processed yet.// This could be for two reasons:// * The user called Channel.read() or ChannelHandlerContext.read() in channelRead(...) method// * The user called Channel.read() or ChannelHandlerContext.read() in channelReadComplete(...) method//// See https://github.com/netty/netty/issues/2254if (!readPending&&!config.isAutoRead()) { removeReadOp(); } } } }
进而进入:
protectedintdoReadMessages(List<Object>buf) throwsException { SocketChannelch=SocketUtils.accept(javaChannel()); try { if (ch!=null) { buf.add(newNioSocketChannel(this, ch)); return1; } } catch (Throwablet) { logger.warn("Failed to create a new channel from an accepted socket.", t); try { ch.close(); } catch (Throwablet2) { logger.warn("Failed to close a socket.", t2); } } return0; } publicstaticSocketChannelaccept(finalServerSocketChannelserverSocketChannel) throwsIOException { try { returnAccessController.doPrivileged(newPrivilegedExceptionAction<SocketChannel>() { publicSocketChannelrun() throwsIOException { returnserverSocketChannel.accept(); } }); } catch (PrivilegedActionExceptione) { throw (IOException) e.getCause(); } } publicSocketChannelaccept() throwsIOException { Objectvar1=this.lock; synchronized(this.lock) { if (!this.isOpen()) { thrownewClosedChannelException(); } elseif (!this.isBound()) { thrownewNotYetBoundException(); } else { SocketChannelImplvar2=null; intvar3=0; FileDescriptorvar4=newFileDescriptor(); InetSocketAddress[] var5=newInetSocketAddress[1]; InetSocketAddressvar6; try { this.begin(); if (!this.isOpen()) { var6=null; returnvar6; } this.thread=NativeThread.current(); do { var3=this.accept(this.fd, var4, var5); } while(var3==-3&&this.isOpen()); } finally { this.thread=0L; this.end(var3>0); assertIOStatus.check(var3); } if (var3<1) { returnnull; } else { IOUtil.configureBlocking(var4, true); var6=var5[0]; var2=newSocketChannelImpl(this.provider(), var4, var6); SecurityManagervar7=System.getSecurityManager(); if (var7!=null) { try { var7.checkAccept(var6.getAddress().getHostAddress(), var6.getPort()); } catch (SecurityExceptionvar13) { var2.close(); throwvar13; } } returnvar2; } } } }
此时会调用jdk的SocketChannel,最终调用sun公司的ServerSocketChannel的accept操作。
接着来分析
for (inti=0; i<size; i++) { readPending=false; pipeline.fireChannelRead(readBuf.get(i)); }
会调用:
staticvoidinvokeChannelRead(finalAbstractChannelHandlerContextnext, Objectmsg) { finalObjectm=next.pipeline.touch(ObjectUtil.checkNotNull(msg, "msg"), next); EventExecutorexecutor=next.executor(); if (executor.inEventLoop()) { next.invokeChannelRead(m); } else { executor.execute(newRunnable() { publicvoidrun() { next.invokeChannelRead(m); } }); } } privatevoidinvokeChannelRead(Objectmsg) { if (invokeHandler()) { try { ((ChannelInboundHandler) handler()).channelRead(this, msg); } catch (Throwablet) { invokeExceptionCaught(t); } } else { fireChannelRead(msg); } }
调用ChannelRead:
"unchecked") (publicvoidchannelRead(ChannelHandlerContextctx, Objectmsg) { finalChannelchild= (Channel) msg; child.pipeline().addLast(childHandler); setChannelOptions(child, childOptions, logger); setAttributes(child, childAttrs); try { //执行register操作childGroup.register(child).addListener(newChannelFutureListener() { publicvoidoperationComplete(ChannelFuturefuture) throwsException { if (!future.isSuccess()) { forceClose(child, future.cause()); } } }); } catch (Throwablet) { forceClose(child, t); } }
进入AbstactChannel的register:
publicfinalvoidregister(EventLoopeventLoop, finalChannelPromisepromise) { ObjectUtil.checkNotNull(eventLoop, "eventLoop"); if (isRegistered()) { promise.setFailure(newIllegalStateException("registered to an event loop already")); return; } if (!isCompatible(eventLoop)) { promise.setFailure( newIllegalStateException("incompatible event loop type: "+eventLoop.getClass().getName())); return; } AbstractChannel.this.eventLoop=eventLoop; if (eventLoop.inEventLoop()) { register0(promise); } else { try { eventLoop.execute(newRunnable() { publicvoidrun() { register0(promise); } }); } catch (Throwablet) { logger.warn( "Force-closing a channel whose registration task was not accepted by an event loop: {}", AbstractChannel.this, t); closeForcibly(); closeFuture.setClosed(); safeSetFailure(promise, t); } } }
进而进入我们需要看的doBeginRead:
privatevoidregister0(ChannelPromisepromise) { try { // check if the channel is still open as it could be closed in the mean time when the register// call was outside of the eventLoopif (!promise.setUncancellable() ||!ensureOpen(promise)) { return; } booleanfirstRegistration=neverRegistered; //进行注册doRegister(); neverRegistered=false; registered=true; // Ensure we call handlerAdded(...) before we actually notify the promise. This is needed as the// user may already fire events through the pipeline in the ChannelFutureListener.pipeline.invokeHandlerAddedIfNeeded(); safeSetSuccess(promise); pipeline.fireChannelRegistered(); // Only fire a channelActive if the channel has never been registered. This prevents firing// multiple channel actives if the channel is deregistered and re-registered.if (isActive()) { if (firstRegistration) { pipeline.fireChannelActive(); } elseif (config().isAutoRead()) { // This channel was registered before and autoRead() is set. This means we need to begin read// again so that we process inbound data.//// See https://github.com/netty/netty/issues/4805//重要beginRead(); } } } catch (Throwablet) { // Close the channel directly to avoid FD leak.closeForcibly(); closeFuture.setClosed(); safeSetFailure(promise, t); } }
进入doBeginRead:
publicfinalvoidbeginRead() { assertEventLoop(); if (!isActive()) { return; } try { doBeginRead(); } catch (finalExceptione) { invokeLater(newRunnable() { publicvoidrun() { pipeline.fireExceptionCaught(e); } }); close(voidPromise()); } }
进入doBeginRead方法:
protectedvoiddoBeginRead() throwsException { // Channel.read() or ChannelHandlerContext.read() was calledfinalSelectionKeyselectionKey=this.selectionKey; if (!selectionKey.isValid()) { return; } readPending=true; finalintinterestOps=selectionKey.interestOps(); if ((interestOps&readInterestOp) ==0) { //完成感兴趣操作selectionKey.interestOps(interestOps|readInterestOp); } }
可以在jdk的SelectionKeyImpl看到:
publicSelectionKeyinterestOps(intvar1) { this.ensureValid(); returnthis.nioInterestOps(var1); }
此时完成了doBeginRead操作。
也即首先启动关注bind操作,完成启动之后,进行accept操作,然后进行read操作。