From the official Netty examples we have already seen that to write a chat demo, a simple RPC, or a request-response demo, you normally write a server-side bootstrap and a client-side bootstrap plus their handlers: the bootstraps start the service, while the server-side and client-side handlers carry the concrete business logic. RPC frameworks such as Dubbo follow the same pattern: when a service is exported or referenced, Netty is started first (an adapter pattern is used to adapt between protocols), and only then does the export or the invocation happen on top of it. On the provider side the key method is doBind; on the consumer side it is doConnect. RocketMQ behaves similarly: on both the producer and the consumer side, producer.start() and consumer.start() end up starting Netty under the hood.
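Before diving into the server side, here is, for reference, a minimal sketch of the client side, modeled on the official EchoClient example (the host 127.0.0.1 and port 8007 are only illustrative values, and a LoggingHandler stands in for a real business handler): the client uses Bootstrap plus connect(), which is exactly where the doConnect-style logic of the consumer side lives.

import io.netty.bootstrap.Bootstrap;
import io.netty.channel.ChannelFuture;
import io.netty.channel.ChannelInitializer;
import io.netty.channel.EventLoopGroup;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.SocketChannel;
import io.netty.channel.socket.nio.NioSocketChannel;
import io.netty.handler.logging.LogLevel;
import io.netty.handler.logging.LoggingHandler;

public final class EchoClientSketch {
    public static void main(String[] args) throws Exception {
        // One group is enough on the client: it handles both the connect and the subsequent I/O
        EventLoopGroup group = new NioEventLoopGroup();
        try {
            Bootstrap b = new Bootstrap();                      // client-side bootstrap, no boss/worker split
            b.group(group)
             .channel(NioSocketChannel.class)
             .handler(new ChannelInitializer<SocketChannel>() {
                 @Override
                 protected void initChannel(SocketChannel ch) {
                     // Business handlers go here; a LoggingHandler keeps the sketch self-contained
                     ch.pipeline().addLast(new LoggingHandler(LogLevel.INFO));
                 }
             });
            // connect() is the client-side counterpart of the server's bind()
            ChannelFuture f = b.connect("127.0.0.1", 8007).sync();
            f.channel().closeFuture().sync();
        } finally {
            group.shutdownGracefully();
        }
    }
}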
So how does the Netty server side use NIO, and how does it keep its threads fully utilized?
Server-side bootstrap:
/**
 * Echoes back any received data from a client.
 * Netty server-side bootstrap.
 */
public final class EchoServer {

    static final boolean SSL = System.getProperty("ssl") != null;
    // port
    static final int PORT = Integer.parseInt(System.getProperty("port", "8007"));

    public static void main(String[] args) throws Exception {
        // Configure SSL.
        final SslContext sslCtx;
        if (SSL) {
            SelfSignedCertificate ssc = new SelfSignedCertificate();
            sslCtx = SslContextBuilder.forServer(ssc.certificate(), ssc.privateKey()).build();
        } else {
            sslCtx = null;
        }

        // Configure the server: boss and worker event loop groups.
        EventLoopGroup bossGroup = new NioEventLoopGroup(1);
        EventLoopGroup workerGroup = new NioEventLoopGroup();
        // Create the server-side business handler; the bootstrap below attaches the groups,
        // the channel type, SO_BACKLOG, and a childHandler that overrides initChannel.
        final EchoServerHandler serverHandler = new EchoServerHandler();
        try {
            // server-side bootstrap
            ServerBootstrap b = new ServerBootstrap();
            b.group(bossGroup, workerGroup)
             .channel(NioServerSocketChannel.class)
             .option(ChannelOption.SO_BACKLOG, 100)
             .handler(new LoggingHandler(LogLevel.INFO))
             .childHandler(new ChannelInitializer<SocketChannel>() {
                 @Override
                 public void initChannel(SocketChannel ch) throws Exception {
                     // the channel's pipeline
                     ChannelPipeline p = ch.pipeline();
                     if (sslCtx != null) {
                         p.addLast(sslCtx.newHandler(ch.alloc()));
                     }
                     // optional logging handler
                     //p.addLast(new LoggingHandler(LogLevel.INFO));
                     // add the business handler
                     p.addLast(serverHandler);
                 }
             });

            // Start the server.
            ChannelFuture f = b.bind(PORT).sync();

            // Wait until the server socket is closed.
            f.channel().closeFuture().sync();
        } finally {
            // Shut down all event loops to terminate all threads.
            bossGroup.shutdownGracefully();
            workerGroup.shutdownGracefully();
        }
    }
}
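The EchoServerHandler referenced by the bootstrap is where the business logic lives. A sketch of such a handler, following the official echo example, simply writes every received message back to the client:

import io.netty.channel.ChannelHandler.Sharable;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.ChannelInboundHandlerAdapter;

// Business handler used above, modeled on the official echo example.
@Sharable // required because the same instance is added to every child channel's pipeline
public class EchoServerHandler extends ChannelInboundHandlerAdapter {

    @Override
    public void channelRead(ChannelHandlerContext ctx, Object msg) {
        ctx.write(msg);                 // echo the received buffer back; flushed below
    }

    @Override
    public void channelReadComplete(ChannelHandlerContext ctx) {
        ctx.flush();                    // flush everything written in channelRead
    }

    @Override
    public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) {
        cause.printStackTrace();        // close the connection when something goes wrong
        ctx.close();
    }
}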
Let's look at the bind operation, which is also what actually starts the server.
AbstractBootstrap#bind(int inetPort) → bind(SocketAddress localAddress)
/**
 * Create a new {@link Channel} and bind it.
 */
public ChannelFuture bind(int inetPort) {
    return bind(new InetSocketAddress(inetPort));
}

/**
 * Create a new {@link Channel} and bind it.
 */
public ChannelFuture bind(SocketAddress localAddress) {
    // validate the configuration: group and channelFactory must be set
    validate();
    return doBind(ObjectUtil.checkNotNull(localAddress, "localAddress"));
}
As you can see, bind(int) simply wraps the port into an InetSocketAddress (an ordinary TCP socket address), validates the configuration (group and channelFactory), and then delegates to doBind.
private ChannelFuture doBind(final SocketAddress localAddress) {
    final ChannelFuture regFuture = initAndRegister();
    final Channel channel = regFuture.channel();
    if (regFuture.cause() != null) {
        return regFuture;
    }

    if (regFuture.isDone()) {
        // At this point we know that the registration was complete and successful.
        ChannelPromise promise = channel.newPromise();
        doBind0(regFuture, channel, localAddress, promise);
        return promise;
    } else {
        // Registration future is almost always fulfilled already, but just in case it's not.
        final PendingRegistrationPromise promise = new PendingRegistrationPromise(channel);
        regFuture.addListener(new ChannelFutureListener() {
            @Override
            public void operationComplete(ChannelFuture future) throws Exception {
                Throwable cause = future.cause();
                if (cause != null) {
                    // Registration on the EventLoop failed so fail the ChannelPromise directly to not cause an
                    // IllegalStateException once we try to access the EventLoop of the Channel.
                    promise.setFailure(cause);
                } else {
                    // Registration was successful, so set the correct executor to use.
                    // See https://github.com/netty/netty/issues/2586
                    promise.registered();
                    doBind0(regFuture, channel, localAddress, promise);
                }
            }
        });
        return promise;
    }
}
doBind then hands off to doBind0, which runs the actual bind on the channel's EventLoop:
private static void doBind0(
        final ChannelFuture regFuture, final Channel channel,
        final SocketAddress localAddress, final ChannelPromise promise) {

    // This method is invoked before channelRegistered() is triggered. Give user handlers a chance to set up
    // the pipeline in its channelRegistered() implementation.
    channel.eventLoop().execute(new Runnable() {
        @Override
        public void run() {
            if (regFuture.isSuccess()) {
                channel.bind(localAddress, promise).addListener(ChannelFutureListener.CLOSE_ON_FAILURE);
            } else {
                promise.setFailure(regFuture.cause());
            }
        }
    });
}
Let's follow the bind method on the channel:
AbstractChannel#bind(SocketAddress localAddress, ChannelPromise promise)
ChannelFuture bind(SocketAddress localAddress, ChannelPromise promise);

@Override
public ChannelFuture bind(SocketAddress localAddress, ChannelPromise promise) {
    return pipeline.bind(localAddress, promise);
}
The pipeline forwards the bind starting from its tail context:
@Override
public final ChannelFuture bind(SocketAddress localAddress, ChannelPromise promise) {
    return tail.bind(localAddress, promise);
}
AbstractChannelHandlerContext
@Override
public ChannelFuture bind(final SocketAddress localAddress, final ChannelPromise promise) {
    ObjectUtil.checkNotNull(localAddress, "localAddress");
    if (isNotValidPromise(promise, false)) {
        // cancelled
        return promise;
    }

    final AbstractChannelHandlerContext next = findContextOutbound(MASK_BIND);
    EventExecutor executor = next.executor();
    if (executor.inEventLoop()) {
        next.invokeBind(localAddress, promise);
    } else {
        safeExecute(executor, new Runnable() {
            @Override
            public void run() {
                next.invokeBind(localAddress, promise);
            }
        }, promise, null, false);
    }
    return promise;
}

private void invokeBind(SocketAddress localAddress, ChannelPromise promise) {
    if (invokeHandler()) {
        try {
            ((ChannelOutboundHandler) handler()).bind(this, localAddress, promise);
        } catch (Throwable t) {
            notifyOutboundHandlerException(t, promise);
        }
    } else {
        bind(localAddress, promise);
    }
}
DefaultChannelPipeline.HeadContext, which delegates to AbstractChannel.AbstractUnsafe
@Override
public void bind(
        ChannelHandlerContext ctx, SocketAddress localAddress, ChannelPromise promise) {
    unsafe.bind(localAddress, promise);
}

@Override
public final void bind(final SocketAddress localAddress, final ChannelPromise promise) {
    assertEventLoop();

    if (!promise.setUncancellable() || !ensureOpen(promise)) {
        return;
    }

    // See: https://github.com/netty/netty/issues/576
    if (Boolean.TRUE.equals(config().getOption(ChannelOption.SO_BROADCAST)) &&
        localAddress instanceof InetSocketAddress &&
        !((InetSocketAddress) localAddress).getAddress().isAnyLocalAddress() &&
        !PlatformDependent.isWindows() && !PlatformDependent.maybeSuperUser()) {
        // Warn a user about the fact that a non-root user can't receive a
        // broadcast packet on *nix if the socket is bound on non-wildcard address.
        logger.warn(
                "A non-root user can't receive a broadcast packet if the socket " +
                "is not bound to a wildcard address; binding to a non-wildcard " +
                "address (" + localAddress + ") anyway as requested.");
    }

    boolean wasActive = isActive();
    try {
        doBind(localAddress);
    } catch (Throwable t) {
        safeSetFailure(promise, t);
        closeIfClosed();
        return;
    }

    if (!wasActive && isActive()) {
        invokeLater(new Runnable() {
            @Override
            public void run() {
                pipeline.fireChannelActive();
            }
        });
    }

    // Key point: from here the result is propagated back step by step through the pipeline
    safeSetSuccess(promise);
}
NioServerSocketChannel
reason="Usage guarded by java version check") (protectedvoiddoBind(SocketAddresslocalAddress) throwsException { if (PlatformDependent.javaVersion() >=7) { javaChannel().bind(localAddress, config.getBacklog()); } else { javaChannel().socket().bind(localAddress, config.getBacklog()); } }
We finally reach the doBind method in NioServerSocketChannel, which shows that the underlying operation is plain NIO. There are two branches: for JDK >= 7 it binds the ServerSocketChannel directly, and for JDK < 7 it binds the underlying ServerSocket. Exciting, right? One more debug step takes us into the JDK itself, the ServerSocketChannel implementation.
JDK: ServerSocketChannelImpl#bind (decompiled)
public ServerSocketChannel bind(SocketAddress var1, int var2) throws IOException {
    Object var3 = this.lock;
    synchronized(this.lock) {
        if (!this.isOpen()) {
            throw new ClosedChannelException();
        } else if (this.isBound()) {
            throw new AlreadyBoundException();
        } else {
            InetSocketAddress var4 = var1 == null ? new InetSocketAddress(0) : Net.checkAddress(var1);
            SecurityManager var5 = System.getSecurityManager();
            if (var5 != null) {
                var5.checkListen(var4.getPort());
            }

            NetHooks.beforeTcpBind(this.fd, var4.getAddress(), var4.getPort());
            Net.bind(this.fd, var4.getAddress(), var4.getPort());
            Net.listen(this.fd, var2 < 1 ? 50 : var2);
            Object var6 = this.stateLock;
            synchronized(this.stateLock) {
                this.localAddress = Net.localAddress(this.fd);
            }

            return this;
        }
    }
}
In other words, the bind is ultimately implemented on top of the NIO socket classes and ends up calling Sun's internal Net class (sun.nio.ch.Net), which wraps the native bind and listen calls.
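Stripped of all the Netty layers, what NioServerSocketChannel#doBind ends up doing is conceptually the same as the plain JDK NIO calls below (a simplified sketch for illustration, not Netty code; the port and backlog mirror the values used in EchoServer):

import java.net.InetSocketAddress;
import java.nio.channels.ServerSocketChannel;

public class PlainNioBindSketch {
    public static void main(String[] args) throws Exception {
        // Conceptually what NioServerSocketChannel#doBind boils down to at the JDK level
        ServerSocketChannel serverChannel = ServerSocketChannel.open(); // backed by ServerSocketChannelImpl
        serverChannel.configureBlocking(false);                         // Netty's NIO channels are non-blocking
        serverChannel.bind(new InetSocketAddress(8007), 100);           // bind + listen, backlog 100 (cf. SO_BACKLOG)
        System.out.println("bound to " + serverChannel.getLocalAddress());
    }
}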
After doBind succeeds, safeSetSuccess(promise) completes the promise. Remember that the whole bind was submitted to the channel's EventLoop via execute() in doBind0, so it actually runs inside SingleThreadEventExecutor#runAllTasks, which brings us to the EventLoop machinery we want to see.
/**
 * Poll all tasks from the task queue and run them via {@link Runnable#run()} method. This method stops running
 * the tasks in the task queue and returns if it ran longer than {@code timeoutNanos}.
 */
protected boolean runAllTasks(long timeoutNanos) {
    fetchFromScheduledTaskQueue();
    Runnable task = pollTask();
    if (task == null) {
        afterRunningAllTasks();
        return false;
    }

    final long deadline = timeoutNanos > 0 ? ScheduledFutureTask.nanoTime() + timeoutNanos : 0;
    long runTasks = 0;
    long lastExecutionTime;
    for (;;) {
        // run the task
        safeExecute(task);

        // count the executed task
        runTasks++;

        // Check timeout every 64 tasks because nanoTime() is relatively expensive.
        // XXX: Hard-coded value - will make it configurable if it is really a problem.
        if ((runTasks & 0x3F) == 0) {
            lastExecutionTime = ScheduledFutureTask.nanoTime();
            if (lastExecutionTime >= deadline) {
                break;
            }
        }

        task = pollTask();
        if (task == null) {
            lastExecutionTime = ScheduledFutureTask.nanoTime();
            break;
        }
    }

    afterRunningAllTasks();
    this.lastExecutionTime = lastExecutionTime;
    return true;
}
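The tasks drained here come from the event loop's task queue; user code fills that queue through execute() or schedule() on the channel's EventLoop, exactly as doBind0 did above. A small sketch, assuming an already registered io.netty.channel.Channel named channel:

// Anything handed to the channel's EventLoop via execute() lands in the same task queue
// that runAllTasks() drains, and therefore runs on that channel's I/O thread.
channel.eventLoop().execute(new Runnable() {
    @Override
    public void run() {
        // runs on the event loop thread, serialized with this channel's I/O work
        System.out.println("inside " + Thread.currentThread().getName());
    }
});

// Scheduled work goes to the scheduled task queue and is pulled over by fetchFromScheduledTaskQueue()
channel.eventLoop().schedule(new Runnable() {
    @Override
    public void run() {
        System.out.println("runs 5 seconds later on the same thread");
    }
}, 5, java.util.concurrent.TimeUnit.SECONDS);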
Debugging one step further takes us to the run() method we have been looking for, NioEventLoop#run, the endless for (;;) loop at the heart of the event loop. Exciting, isn't it? Take a look:
@Override
protected void run() {
    int selectCnt = 0;
    for (;;) {
        try {
            int strategy;
            try {
                strategy = selectStrategy.calculateStrategy(selectNowSupplier, hasTasks());
                switch (strategy) {
                case SelectStrategy.CONTINUE:
                    continue;

                case SelectStrategy.BUSY_WAIT:
                    // fall-through to SELECT since the busy-wait is not supported with NIO

                case SelectStrategy.SELECT:
                    long curDeadlineNanos = nextScheduledTaskDeadlineNanos();
                    if (curDeadlineNanos == -1L) {
                        curDeadlineNanos = NONE; // nothing on the calendar
                    }
                    nextWakeupNanos.set(curDeadlineNanos);
                    try {
                        if (!hasTasks()) {
                            strategy = select(curDeadlineNanos);
                        }
                    } finally {
                        // This update is just to help block unnecessary selector wakeups
                        // so use of lazySet is ok (no race condition)
                        nextWakeupNanos.lazySet(AWAKE);
                    }
                    // fall through
                default:
                }
            } catch (IOException e) {
                // If we receive an IOException here its because the Selector is messed up. Let's rebuild
                // the selector and retry. https://github.com/netty/netty/issues/8566
                rebuildSelector0();
                selectCnt = 0;
                handleLoopException(e);
                continue;
            }

            selectCnt++;
            cancelledKeys = 0;
            needsToSelectAgain = false;
            final int ioRatio = this.ioRatio;
            boolean ranTasks;
            if (ioRatio == 100) {
                try {
                    if (strategy > 0) {
                        processSelectedKeys();
                    }
                } finally {
                    // Ensure we always run tasks.
                    // run queued tasks
                    ranTasks = runAllTasks();
                }
            } else if (strategy > 0) {
                final long ioStartTime = System.nanoTime();
                try {
                    processSelectedKeys();
                } finally {
                    // Ensure we always run tasks.
                    final long ioTime = System.nanoTime() - ioStartTime;
                    ranTasks = runAllTasks(ioTime * (100 - ioRatio) / ioRatio);
                }
            } else {
                ranTasks = runAllTasks(0); // This will run the minimum number of tasks
            }

            if (ranTasks || strategy > 0) {
                if (selectCnt > MIN_PREMATURE_SELECTOR_RETURNS && logger.isDebugEnabled()) {
                    logger.debug("Selector.select() returned prematurely {} times in a row for Selector {}.",
                            selectCnt - 1, selector);
                }
                selectCnt = 0;
            } else if (unexpectedSelectorWakeup(selectCnt)) { // Unexpected wakeup (unusual case)
                selectCnt = 0;
            }
        } catch (CancelledKeyException e) {
            // Harmless exception - log anyway
            if (logger.isDebugEnabled()) {
                logger.debug(CancelledKeyException.class.getSimpleName() + " raised by a Selector {} - JDK bug?",
                        selector, e);
            }
        } catch (Throwable t) {
            handleLoopException(t);
        }
        // Always handle shutdown even if the loop processing threw an exception.
        try {
            if (isShuttingDown()) {
                closeAll();
                if (confirmShutdown()) {
                    return;
                }
            }
        } catch (Throwable t) {
            handleLoopException(t);
        }
    }
}
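The ioRatio branch above controls how each iteration splits its time between I/O (processSelectedKeys) and queued tasks (runAllTasks). It defaults to 50 and can be tuned on the group; a small sketch:

// Tuning the I/O-to-task time ratio of every NioEventLoop in the group.
// The default is 50; 100 means "run I/O first, then drain all tasks without a time budget".
NioEventLoopGroup workerGroup = new NioEventLoopGroup();
workerGroup.setIoRatio(50);   // each loop spends roughly equal time on I/O and on queued tasks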
Next, let's come back to these two lines:
EventLoopGroup bossGroup = new NioEventLoopGroup(1);
EventLoopGroup workerGroup = new NioEventLoopGroup();
Each NioEventLoop created by these two lines ends up running the infinite loop shown above, as we can trace through the source:
NioEventLoopGroup
public NioEventLoopGroup() {
    this(0);
}

public NioEventLoopGroup(int nThreads) {
    this(nThreads, (Executor) null);
}

public NioEventLoopGroup(int nThreads, Executor executor) {
    this(nThreads, executor, SelectorProvider.provider());
}

public NioEventLoopGroup(
        int nThreads, Executor executor, final SelectorProvider selectorProvider) {
    this(nThreads, executor, selectorProvider, DefaultSelectStrategyFactory.INSTANCE);
}

public NioEventLoopGroup(int nThreads, Executor executor, final SelectorProvider selectorProvider,
                         final SelectStrategyFactory selectStrategyFactory) {
    super(nThreads, executor, selectorProvider, selectStrategyFactory, RejectedExecutionHandlers.reject());
}
MultithreadEventLoopGroup
/**
 * @see MultithreadEventExecutorGroup#MultithreadEventExecutorGroup(int, Executor, Object...)
 */
protected MultithreadEventLoopGroup(int nThreads, Executor executor, Object... args) {
    super(nThreads == 0 ? DEFAULT_EVENT_LOOP_THREADS : nThreads, executor, args);
}
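The DEFAULT_EVENT_LOOP_THREADS used when nThreads == 0 (i.e. for the no-arg new NioEventLoopGroup()) is derived from the number of available processors. Simplified, the static initializer of MultithreadEventLoopGroup amounts to:

// Simplified from MultithreadEventLoopGroup's static initializer:
// the default worker count is twice the number of available cores,
// unless overridden via -Dio.netty.eventLoopThreads
DEFAULT_EVENT_LOOP_THREADS = Math.max(1, SystemPropertyUtil.getInt(
        "io.netty.eventLoopThreads", NettyRuntime.availableProcessors() * 2));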
MultithreadEventExecutorGroup
This constructor is the part we really need to focus on:
/**
 * Create a new instance.
 *
 * @param nThreads the number of threads that will be used by this instance.
 * @param executor the Executor to use, or {@code null} if the default should be used.
 * @param args     arguments which will passed to each {@link #newChild(Executor, Object...)} call
 */
protected MultithreadEventExecutorGroup(int nThreads, Executor executor, Object... args) {
    this(nThreads, executor, DefaultEventExecutorChooserFactory.INSTANCE, args);
}

/**
 * Create a new instance.
 *
 * @param nThreads       the number of threads that will be used by this instance.
 * @param executor       the Executor to use, or {@code null} if the default should be used.
 * @param chooserFactory the {@link EventExecutorChooserFactory} to use.
 * @param args           arguments which will passed to each {@link #newChild(Executor, Object...)} call
 */
protected MultithreadEventExecutorGroup(int nThreads, Executor executor,
                                        EventExecutorChooserFactory chooserFactory, Object... args) {
    if (nThreads <= 0) {
        throw new IllegalArgumentException(String.format("nThreads: %d (expected: > 0)", nThreads));
    }

    // if no executor was supplied, create one that starts a new thread per task
    if (executor == null) {
        executor = new ThreadPerTaskExecutor(newDefaultThreadFactory());
    }

    children = new EventExecutor[nThreads];

    for (int i = 0; i < nThreads; i++) {
        boolean success = false;
        try {
            // key point: create each child event loop
            children[i] = newChild(executor, args);
            success = true;
        } catch (Exception e) {
            // TODO: Think about if this is a good exception type
            throw new IllegalStateException("failed to create a child event loop", e);
        } finally {
            if (!success) {
                for (int j = 0; j < i; j++) {
                    children[j].shutdownGracefully();
                }

                for (int j = 0; j < i; j++) {
                    EventExecutor e = children[j];
                    try {
                        while (!e.isTerminated()) {
                            e.awaitTermination(Integer.MAX_VALUE, TimeUnit.SECONDS);
                        }
                    } catch (InterruptedException interrupted) {
                        // Let the caller handle the interruption.
                        Thread.currentThread().interrupt();
                        break;
                    }
                }
            }
        }
    }

    chooser = chooserFactory.newChooser(children);

    final FutureListener<Object> terminationListener = new FutureListener<Object>() {
        @Override
        public void operationComplete(Future<Object> future) throws Exception {
            if (terminatedChildren.incrementAndGet() == children.length) {
                terminationFuture.setSuccess(null);
            }
        }
    };

    for (EventExecutor e : children) {
        // register a listener that fires once all children have terminated
        e.terminationFuture().addListener(terminationListener);
    }

    // expose the children as an unmodifiable LinkedHashSet
    Set<EventExecutor> childrenSet = new LinkedHashSet<EventExecutor>(children.length);
    Collections.addAll(childrenSet, children);
    readonlyChildren = Collections.unmodifiableSet(childrenSet);
}
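The chooser created by chooserFactory.newChooser(children) decides which child EventLoop the next channel is registered with. DefaultEventExecutorChooserFactory uses a power-of-two optimized round-robin when it can; roughly simplified, its two strategies look like this (a sketch of the idea, not a line-for-line copy):

// Simplified sketch of DefaultEventExecutorChooserFactory's two round-robin strategies
private static final class PowerOfTwoEventExecutorChooser implements EventExecutorChooser {
    private final AtomicInteger idx = new AtomicInteger();
    private final EventExecutor[] executors;

    PowerOfTwoEventExecutorChooser(EventExecutor[] executors) {
        this.executors = executors;
    }

    @Override
    public EventExecutor next() {
        // bit-mask works because executors.length is a power of two
        return executors[idx.getAndIncrement() & executors.length - 1];
    }
}

private static final class GenericEventExecutorChooser implements EventExecutorChooser {
    private final AtomicInteger idx = new AtomicInteger();
    private final EventExecutor[] executors;

    GenericEventExecutorChooser(EventExecutor[] executors) {
        this.executors = executors;
    }

    @Override
    public EventExecutor next() {
        // fall back to modulo when the length is not a power of two
        return executors[Math.abs(idx.getAndIncrement() % executors.length)];
    }
}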
Going back to NioEventLoopGroup, its newChild method shows that each child is a NioEventLoop:
@Override
protected EventLoop newChild(Executor executor, Object... args) throws Exception {
    EventLoopTaskQueueFactory queueFactory = args.length == 4 ? (EventLoopTaskQueueFactory) args[3] : null;
    // key point: each child is a NioEventLoop
    return new NioEventLoop(this, executor, (SelectorProvider) args[0],
        ((SelectStrategyFactory) args[1]).newSelectStrategy(),
        (RejectedExecutionHandler) args[2], queueFactory);
}
which brings us to the NioEventLoop constructor we care about:
NioEventLoop(NioEventLoopGroup parent, Executor executor, SelectorProvider selectorProvider,
             SelectStrategy strategy, RejectedExecutionHandler rejectedExecutionHandler,
             EventLoopTaskQueueFactory queueFactory) {
    super(parent, executor, false, newTaskQueue(queueFactory), newTaskQueue(queueFactory),
            rejectedExecutionHandler);
    this.provider = ObjectUtil.checkNotNull(selectorProvider, "selectorProvider");
    this.selectStrategy = ObjectUtil.checkNotNull(strategy, "selectStrategy");
    final SelectorTuple selectorTuple = openSelector();
    this.selector = selectorTuple.selector;
    this.unwrappedSelector = selectorTuple.unwrappedSelector;
}
Beyond that, the initChannel operation of ChannelInitializer is also worth a closer look.
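ChannelInitializer is itself a one-shot inbound handler: when the channel is registered it calls your initChannel(ch) exactly once and then removes itself from the pipeline. A stripped-down sketch of that behavior (the real class also deals with re-entrancy, exceptions and sharing across channels):

import io.netty.channel.Channel;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.ChannelInboundHandlerAdapter;

// Sketch only: illustrates the "initialize once, then remove yourself" idea behind ChannelInitializer
public abstract class ChannelInitializerSketch<C extends Channel> extends ChannelInboundHandlerAdapter {

    protected abstract void initChannel(C ch) throws Exception;

    @Override
    @SuppressWarnings("unchecked")
    public void channelRegistered(ChannelHandlerContext ctx) throws Exception {
        initChannel((C) ctx.channel());          // let the user populate the pipeline once
        ctx.pipeline().remove(this);             // then remove the initializer itself
        ctx.fireChannelRegistered();             // and let the event continue down the pipeline
    }
}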