Netty Study Notes, Part 3

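As the earlier parts showed, the most important step when a Netty server starts is the bind operation. Bind does more than bind the port: it starts the event loop's run() method, which spins in an infinite loop, and it adds tasks to the task queue, which runAllTasks then executes.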

First, let's look at Netty's architecture diagram (image from 即时通讯网):

[Image: 微信图片_20221214023656.jpg]

The reactor threading model (image from 即时通讯网):

[Image: 微信图片_20221214023700.jpg]

The call chain below was obtained by tracing through the source code:

AbstractBootstrap#bind(int inetPort)
-> AbstractBootstrap#bind(SocketAddress localAddress)
-> AbstractBootstrap#doBind(final SocketAddress localAddress)
-> AbstractBootstrap#doBind0(final ChannelFuture regFuture, final Channel channel, final SocketAddress localAddress, final ChannelPromise promise) (note this method: it performs the bind first and then adds a listener)
-> ChannelOutboundInvoker#bind(SocketAddress localAddress, ChannelPromise promise)
-> (important) AbstractChannel#bind(SocketAddress localAddress, ChannelPromise promise)
-> DefaultChannelPipeline#bind(SocketAddress localAddress, ChannelPromise promise)
-> AbstractChannelHandlerContext#bind(final SocketAddress localAddress, final ChannelPromise promise)
-> AbstractChannelHandlerContext#invokeBind(SocketAddress localAddress, ChannelPromise promise)
-> ChannelOutboundHandler#bind(ChannelHandlerContext ctx, SocketAddress localAddress, ChannelPromise promise)
Entering LoggingHandler:
-> (important) LoggingHandler#bind(ChannelHandlerContext ctx, SocketAddress localAddress, ChannelPromise promise)#ctx.bind(localAddress, promise)
-> AbstractChannelHandlerContext#bind(final SocketAddress localAddress, final ChannelPromise promise)#next.invokeBind(localAddress, promise)
-> AbstractChannelHandlerContext#invokeBind(SocketAddress localAddress, ChannelPromise promise)#bind(this, localAddress, promise)
-> ChannelOutboundInvoker#bind(SocketAddress localAddress, ChannelPromise promise)
-> (important) DefaultChannelPipeline.HeadContext#bind(ChannelHandlerContext ctx, SocketAddress localAddress, ChannelPromise promise)#unsafe.bind(localAddress, promise)
-> (important) AbstractChannel.AbstractUnsafe#bind(final SocketAddress localAddress, final ChannelPromise promise)
-> javaChannel() -> NioServerSocketChannel#doBind(SocketAddress localAddress)
-> AbstractChannel#invokeLater(Runnable task)#eventLoop().execute(task)
-> SingleThreadEventExecutor#execute(Runnable task)#execute(Runnable task, boolean immediate)#addTask(Runnable task)#wakeup(inEventLoop)#safeSetSuccess(promise)
-> AbstractChannel#safeSetSuccess(ChannelPromise promise)
-> at this point the result propagates back up through AbstractChannelHandlerContext -> LoggingHandler#bind(ChannelHandlerContext ctx, SocketAddress localAddress, ChannelPromise promise)
-> DefaultChannelPromise#addListener(GenericFutureListener<? extends Future<? super Void>> listener)
-> DefaultPromise#addListener(GenericFutureListener<? extends Future<? super V>> listener)#isDone()#notifyListeners()#executor()
-> AbstractBootstrap#executor()#super.executor()
-> DefaultChannelPromise#channel().eventLoop()
-> AbstractNioChannel#eventLoop()
-> AbstractChannel#eventLoop()
-> notifyListenersNow()
-> FastThreadLocalRunnable#run()
-> ThreadExecutorMap#apply(final Runnable command, final EventExecutor eventExecutor)#command.run()
-> SingleThreadEventExecutor#doStartThread()#SingleThreadEventExecutor.this.run()
-> (important) NioEventLoop#run()#runAllTasks(long timeoutNanos)#runTasks++#task = pollTask()#pollTaskFrom(Queue<Runnable> taskQueue)
-> NioEventLoop#safeExecute(Runnable task)#fireChannelActive() (important) #invokeChannelActive(), which calls ((ChannelInboundHandler) handler()).channelActive(this);

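With the trace complete, recall the overall flow: Netty starts the server first. At this point the server is running, but it still has to accept connections.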

The key steps of the bind operation:

@Override
public final void bind(final SocketAddress localAddress, final ChannelPromise promise) {
    assertEventLoop();
    if (!promise.setUncancellable() || !ensureOpen(promise)) {
        return;
    }
    // See: https://github.com/netty/netty/issues/576
    if (Boolean.TRUE.equals(config().getOption(ChannelOption.SO_BROADCAST)) &&
        localAddress instanceof InetSocketAddress &&
        !((InetSocketAddress) localAddress).getAddress().isAnyLocalAddress() &&
        !PlatformDependent.isWindows() && !PlatformDependent.maybeSuperUser()) {
        // Warn a user about the fact that a non-root user can't receive a
        // broadcast packet on *nix if the socket is bound on non-wildcard address.
        logger.warn(
                "A non-root user can't receive a broadcast packet if the socket " +
                "is not bound to a wildcard address; binding to a non-wildcard " +
                "address (" + localAddress + ") anyway as requested.");
    }
    boolean wasActive = isActive();
    try {
        doBind(localAddress);
    } catch (Throwable t) {
        safeSetFailure(promise, t);
        closeIfClosed();
        return;
    }
    if (!wasActive && isActive()) {
        invokeLater(new Runnable() {
            @Override
            public void run() {
                // Important: fire the channelActive event
                pipeline.fireChannelActive();
            }
        });
    }
    // Key point: from here the call returns step by step back through the pipeline
    safeSetSuccess(promise);
}

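From this we can see that EventLoop is a central class: most of the work is done inside NioEventLoop. An EventLoop is essentially an infinite loop, and each iteration of that loop does the following:

Conditionally waits for NIO events

Handles the NIO events

Runs the tasks in the task queue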
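Before reading Netty's real run() implementation, it may help to see the three responsibilities in a stripped-down form. The following is only a sketch built on the plain JDK Selector API. The class MiniEventLoop, its runOnce method, and its counters are hypothetical names invented for illustration and are not part of Netty.

```java
import java.io.IOException;
import java.io.UncheckedIOException;
import java.nio.channels.SelectionKey;
import java.nio.channels.Selector;
import java.util.Iterator;
import java.util.Queue;
import java.util.concurrent.ConcurrentLinkedQueue;

// Hypothetical, simplified event loop; NOT Netty's NioEventLoop.
class MiniEventLoop {
    private final Selector selector;
    private final Queue<Runnable> taskQueue = new ConcurrentLinkedQueue<>();
    int handledKeys = 0;    // illustration-only counters
    int executedTasks = 0;

    MiniEventLoop() {
        try {
            this.selector = Selector.open();
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }

    // Queue a task and wake the selector so a blocked select() returns.
    void execute(Runnable task) {
        taskQueue.add(task);
        selector.wakeup();
    }

    // One iteration of the loop; a real event loop would call this forever.
    void runOnce(long timeoutMillis) {
        try {
            // 1. Conditionally wait for NIO events: block only when no task is pending.
            if (taskQueue.isEmpty()) {
                selector.select(timeoutMillis);
            } else {
                selector.selectNow();
            }
            // 2. Handle the NIO events that are ready.
            Iterator<SelectionKey> it = selector.selectedKeys().iterator();
            while (it.hasNext()) {
                SelectionKey key = it.next();
                it.remove();
                handledKeys++;
                Object attachment = key.attachment();
                if (attachment instanceof Runnable) {
                    ((Runnable) attachment).run();
                }
            }
            // 3. Run the tasks in the task queue.
            Runnable task;
            while ((task = taskQueue.poll()) != null) {
                task.run();
                executedTasks++;
            }
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }

    void close() {
        try {
            selector.close();
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }

    public static void main(String[] args) {
        MiniEventLoop loop = new MiniEventLoop();
        loop.execute(() -> System.out.println("task ran on " + Thread.currentThread().getName()));
        loop.runOnce(100);
        loop.close();
    }
}
```

Unlike this sketch, Netty also bounds the time spent on tasks relative to I/O (ioRatio) and guards against premature selector wakeups, as the real code shows.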

@Override
protected void run() {
    int selectCnt = 0;
    // Spin loop
    for (;;) {
        try {
            int strategy;
            try {
                // Calculate the select strategy
                strategy = selectStrategy.calculateStrategy(selectNowSupplier, hasTasks());
                // Dispatch on the strategy
                switch (strategy) {
                case SelectStrategy.CONTINUE:
                    continue;
                case SelectStrategy.BUSY_WAIT:
                    // fall-through to SELECT since the busy-wait is not supported with NIO
                case SelectStrategy.SELECT:
                    long curDeadlineNanos = nextScheduledTaskDeadlineNanos();
                    if (curDeadlineNanos == -1L) {
                        curDeadlineNanos = NONE; // nothing on the calendar
                    }
                    nextWakeupNanos.set(curDeadlineNanos);
                    try {
                        if (!hasTasks()) {
                            strategy = select(curDeadlineNanos);
                        }
                    } finally {
                        // This update is just to help block unnecessary selector wakeups
                        // so use of lazySet is ok (no race condition)
                        nextWakeupNanos.lazySet(AWAKE);
                    }
                    // fall through
                default:
                }
            } catch (IOException e) {
                // If we receive an IOException here its because the Selector is messed up. Let's rebuild
                // the selector and retry. https://github.com/netty/netty/issues/8566
                rebuildSelector0();
                selectCnt = 0;
                handleLoopException(e);
                continue;
            }
            selectCnt++;
            cancelledKeys = 0;
            needsToSelectAgain = false;
            final int ioRatio = this.ioRatio;
            boolean ranTasks;
            if (ioRatio == 100) {
                try {
                    if (strategy > 0) {
                        processSelectedKeys();
                    }
                } finally {
                    // Ensure we always run tasks.
                    ranTasks = runAllTasks();
                }
            } else if (strategy > 0) {
                final long ioStartTime = System.nanoTime();
                try {
                    processSelectedKeys();
                } finally {
                    // Ensure we always run tasks.
                    final long ioTime = System.nanoTime() - ioStartTime;
                    // Important: run the queued tasks
                    ranTasks = runAllTasks(ioTime * (100 - ioRatio) / ioRatio);
                }
            } else {
                ranTasks = runAllTasks(0); // This will run the minimum number of tasks
            }
            if (ranTasks || strategy > 0) {
                if (selectCnt > MIN_PREMATURE_SELECTOR_RETURNS && logger.isDebugEnabled()) {
                    logger.debug("Selector.select() returned prematurely {} times in a row for Selector {}.",
                            selectCnt - 1, selector);
                }
                selectCnt = 0;
            } else if (unexpectedSelectorWakeup(selectCnt)) { // Unexpected wakeup (unusual case)
                selectCnt = 0;
            }
        } catch (CancelledKeyException e) {
            // Harmless exception - log anyway
            if (logger.isDebugEnabled()) {
                logger.debug(CancelledKeyException.class.getSimpleName() + " raised by a Selector {} - JDK bug?",
                        selector, e);
            }
        } catch (Throwable t) {
            handleLoopException(t);
        }
        // Always handle shutdown even if the loop processing threw an exception.
        try {
            if (isShuttingDown()) {
                closeAll();
                if (confirmShutdown()) {
                    return;
                }
            }
        } catch (Throwable t) {
            handleLoopException(t);
        }
    }
}
// Run the queued tasks within the given time budget
protected boolean runAllTasks(long timeoutNanos) {
    fetchFromScheduledTaskQueue();
    Runnable task = pollTask();
    if (task == null) {
        afterRunningAllTasks();
        return false;
    }
    final long deadline = timeoutNanos > 0 ? ScheduledFutureTask.nanoTime() + timeoutNanos : 0;
    long runTasks = 0;
    long lastExecutionTime;
    for (;;) {
        safeExecute(task);
        runTasks++;
        // Check timeout every 64 tasks because nanoTime() is relatively expensive.
        // XXX: Hard-coded value - will make it configurable if it is really a problem.
        if ((runTasks & 0x3F) == 0) {
            lastExecutionTime = ScheduledFutureTask.nanoTime();
            if (lastExecutionTime >= deadline) {
                break;
            }
        }
        task = pollTask();
        if (task == null) {
            lastExecutionTime = ScheduledFutureTask.nanoTime();
            break;
        }
    }
    afterRunningAllTasks();
    this.lastExecutionTime = lastExecutionTime;
    return true;
}
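Two details in the code above are easy to miss: the task time budget is derived from the I/O time as ioTime * (100 - ioRatio) / ioRatio, and the deadline is only checked once every 64 tasks ((runTasks & 0x3F) == 0). The sketch below isolates just those two ideas. TaskBudgetSketch and its method names are hypothetical, and it uses System.nanoTime() directly instead of Netty's ScheduledFutureTask.nanoTime().

```java
import java.util.ArrayDeque;
import java.util.Queue;

// Hypothetical helper that isolates the budget arithmetic; not Netty code.
class TaskBudgetSketch {

    // Time allowed for tasks, given the time just spent on I/O.
    // ioRatio is assumed to be in 1..99 (100 is handled separately in run()).
    static long taskBudgetNanos(long ioTimeNanos, int ioRatio) {
        return ioTimeNanos * (100 - ioRatio) / ioRatio;
    }

    // Runs tasks until the queue is empty or the deadline has passed.
    // The clock is consulted only after every 64th task, so at least 64
    // tasks run (if available) even with a zero budget.
    static int runTasks(Queue<Runnable> tasks, long timeoutNanos) {
        final long deadline = System.nanoTime() + timeoutNanos;
        int ran = 0;
        Runnable task;
        while ((task = tasks.poll()) != null) {
            task.run();
            ran++;
            if ((ran & 0x3F) == 0 && System.nanoTime() - deadline >= 0) {
                break;
            }
        }
        return ran;
    }

    public static void main(String[] args) {
        // With ioRatio = 80, 1000 ns of I/O leaves 250 ns for tasks.
        System.out.println(taskBudgetNanos(1000, 80));

        Queue<Runnable> queue = new ArrayDeque<>();
        for (int i = 0; i < 100; i++) {
            queue.add(() -> { });
        }
        // Zero budget: the first deadline check happens after task 64.
        System.out.println(runTasks(queue, 0) + " ran, " + queue.size() + " left");
    }
}
```

Checking the clock only every 64 tasks trades precision for speed: the budget can be overshot by up to 63 tasks, but most iterations avoid a nanoTime() call.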

Accepting requests: processing the multiplexed selection keys

NioEventLoop#processSelectedKey(SelectionKey k, AbstractNioChannel ch)#unsafe.read() -> AbstractNioMessageChannel#read()
-> NioServerSocketChannel#doReadMessages(List<Object> buf)#SocketChannel ch = SocketUtils.accept(javaChannel())#SocketUtils#accept(final ServerSocketChannel serverSocketChannel)#ServerSocketChannel#accept() -> Sun's ServerSocketChannelImpl#accept(), which completes the connection -> note the pipeline.fireChannelRead(readBuf.get(i)) call that read() makes after doReadMessages(List<Object> buf) returns; this call carries out the read operation

In the end, AbstractNioChannel's doBeginRead method is called.

Let's see this in action. Open http://localhost:8007/ in a browser; the client sends a request, and a breakpoint is hit at the unsafe.read() call:

private void processSelectedKey(SelectionKey k, AbstractNioChannel ch) {
    final AbstractNioChannel.NioUnsafe unsafe = ch.unsafe();
    if (!k.isValid()) {
        final EventLoop eventLoop;
        try {
            eventLoop = ch.eventLoop();
        } catch (Throwable ignored) {
            // If the channel implementation throws an exception because there is no event loop, we ignore this
            // because we are only trying to determine if ch is registered to this event loop and thus has authority
            // to close ch.
            return;
        }
        // Only close ch if ch is still registered to this EventLoop. ch could have deregistered from the event loop
        // and thus the SelectionKey could be cancelled as part of the deregistration process, but the channel is
        // still healthy and should not be closed.
        // See https://github.com/netty/netty/issues/5125
        if (eventLoop == this) {
            // close the channel if the key is not valid anymore
            unsafe.close(unsafe.voidPromise());
        }
        return;
    }
    try {
        int readyOps = k.readyOps();
        // We first need to call finishConnect() before try to trigger a read(...) or write(...) as otherwise
        // the NIO JDK channel implementation may throw a NotYetConnectedException.
        if ((readyOps & SelectionKey.OP_CONNECT) != 0) {
            // remove OP_CONNECT as otherwise Selector.select(..) will always return without blocking
            // See https://github.com/netty/netty/issues/924
            int ops = k.interestOps();
            ops &= ~SelectionKey.OP_CONNECT;
            k.interestOps(ops);
            unsafe.finishConnect();
        }
        // Process OP_WRITE first as we may be able to write some queued buffers and so free memory.
        if ((readyOps & SelectionKey.OP_WRITE) != 0) {
            // Call forceFlush which will also take care of clear the OP_WRITE once there is nothing left to write
            ch.unsafe().forceFlush();
        }
        // Also check for readOps of 0 to workaround possible JDK bug which may otherwise lead
        // to a spin loop
        if ((readyOps & (SelectionKey.OP_READ | SelectionKey.OP_ACCEPT)) != 0 || readyOps == 0) {
            // Important
            unsafe.read();
        }
    } catch (CancelledKeyException ignored) {
        unsafe.close(unsafe.voidPromise());
    }
}
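The dispatch in processSelectedKey is plain bit-testing on readyOps, in a fixed order: connect first, then write, then read/accept. To make that order concrete, here is a small sketch that maps a readyOps value to the names of the actions the method above would take. ReadyOpsSketch and actionsFor are hypothetical names; only the SelectionKey constants come from the JDK.

```java
import java.nio.channels.SelectionKey;
import java.util.ArrayList;
import java.util.List;

// Hypothetical helper mirroring the order of the bit tests; not Netty code.
class ReadyOpsSketch {

    // Returns, in order, the actions that would be triggered for readyOps.
    static List<String> actionsFor(int readyOps) {
        List<String> actions = new ArrayList<>();
        if ((readyOps & SelectionKey.OP_CONNECT) != 0) {
            actions.add("finishConnect");
        }
        if ((readyOps & SelectionKey.OP_WRITE) != 0) {
            actions.add("forceFlush");
        }
        // OP_READ and OP_ACCEPT share one branch; readyOps == 0 is included
        // as the workaround mentioned in the original comment.
        if ((readyOps & (SelectionKey.OP_READ | SelectionKey.OP_ACCEPT)) != 0 || readyOps == 0) {
            actions.add("read");
        }
        return actions;
    }

    public static void main(String[] args) {
        System.out.println(actionsFor(SelectionKey.OP_ACCEPT));
        System.out.println(actionsFor(SelectionKey.OP_WRITE | SelectionKey.OP_READ));
    }
}
```

For a server channel the ready bit is OP_ACCEPT, which is why a new connection ends up in unsafe.read() even though nothing is "read" in the byte sense.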

Execution then enters the following method, which is an important one:

private final class NioMessageUnsafe extends AbstractNioUnsafe {
    private final List<Object> readBuf = new ArrayList<Object>();

    @Override
    public void read() {
        assert eventLoop().inEventLoop();
        final ChannelConfig config = config();
        final ChannelPipeline pipeline = pipeline();
        final RecvByteBufAllocator.Handle allocHandle = unsafe().recvBufAllocHandle();
        allocHandle.reset(config);
        boolean closed = false;
        Throwable exception = null;
        try {
            try {
                do {
                    int localRead = doReadMessages(readBuf);
                    if (localRead == 0) {
                        break;
                    }
                    if (localRead < 0) {
                        closed = true;
                        break;
                    }
                    allocHandle.incMessagesRead(localRead);
                } while (allocHandle.continueReading());
            } catch (Throwable t) {
                exception = t;
            }
            int size = readBuf.size();
            for (int i = 0; i < size; i++) {
                readPending = false;
                pipeline.fireChannelRead(readBuf.get(i));
            }
            readBuf.clear();
            allocHandle.readComplete();
            pipeline.fireChannelReadComplete();
            if (exception != null) {
                closed = closeOnReadError(exception);
                pipeline.fireExceptionCaught(exception);
            }
            if (closed) {
                inputShutdown = true;
                if (isOpen()) {
                    close(voidPromise());
                }
            }
        } finally {
            // Check if there is a readPending which was not processed yet.
            // This could be for two reasons:
            // * The user called Channel.read() or ChannelHandlerContext.read() in channelRead(...) method
            // * The user called Channel.read() or ChannelHandlerContext.read() in channelReadComplete(...) method
            //
            // See https://github.com/netty/netty/issues/2254
            if (!readPending && !config.isAutoRead()) {
                removeReadOp();
            }
        }
    }
}

This in turn enters:

// NioServerSocketChannel#doReadMessages
@Override
protected int doReadMessages(List<Object> buf) throws Exception {
    SocketChannel ch = SocketUtils.accept(javaChannel());
    try {
        if (ch != null) {
            buf.add(new NioSocketChannel(this, ch));
            return 1;
        }
    } catch (Throwable t) {
        logger.warn("Failed to create a new channel from an accepted socket.", t);
        try {
            ch.close();
        } catch (Throwable t2) {
            logger.warn("Failed to close a socket.", t2);
        }
    }
    return 0;
}

// SocketUtils#accept
public static SocketChannel accept(final ServerSocketChannel serverSocketChannel) throws IOException {
    try {
        return AccessController.doPrivileged(new PrivilegedExceptionAction<SocketChannel>() {
            @Override
            public SocketChannel run() throws IOException {
                return serverSocketChannel.accept();
            }
        });
    } catch (PrivilegedActionException e) {
        throw (IOException) e.getCause();
    }
}

// ServerSocketChannelImpl#accept (decompiled)
public SocketChannel accept() throws IOException {
    Object var1 = this.lock;
    synchronized(this.lock) {
        if (!this.isOpen()) {
            throw new ClosedChannelException();
        } else if (!this.isBound()) {
            throw new NotYetBoundException();
        } else {
            SocketChannelImpl var2 = null;
            int var3 = 0;
            FileDescriptor var4 = new FileDescriptor();
            InetSocketAddress[] var5 = new InetSocketAddress[1];
            InetSocketAddress var6;
            try {
                this.begin();
                if (!this.isOpen()) {
                    var6 = null;
                    return var6;
                }
                this.thread = NativeThread.current();
                do {
                    var3 = this.accept(this.fd, var4, var5);
                } while(var3 == -3 && this.isOpen());
            } finally {
                this.thread = 0L;
                this.end(var3 > 0);
                assert IOStatus.check(var3);
            }
            if (var3 < 1) {
                return null;
            } else {
                IOUtil.configureBlocking(var4, true);
                var6 = var5[0];
                var2 = new SocketChannelImpl(this.provider(), var4, var6);
                SecurityManager var7 = System.getSecurityManager();
                if (var7 != null) {
                    try {
                        var7.checkAccept(var6.getAddress().getHostAddress(), var6.getPort());
                    } catch (SecurityException var13) {
                        var2.close();
                        throw var13;
                    }
                }
                return var2;
            }
        }
    }
}

At this point the JDK's channel API is invoked, and the call ultimately lands in the accept operation of Sun's ServerSocketChannelImpl.
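The null check in doReadMessages exists because a non-blocking ServerSocketChannel#accept() returns null when no connection is pending. The sketch below demonstrates that behaviour with the JDK alone, on a loopback address and an ephemeral port. AcceptSketch and demo are hypothetical names, and the 5-second select timeout is an arbitrary choice.

```java
import java.io.IOException;
import java.io.UncheckedIOException;
import java.net.InetAddress;
import java.net.InetSocketAddress;
import java.nio.channels.SelectionKey;
import java.nio.channels.Selector;
import java.nio.channels.ServerSocketChannel;
import java.nio.channels.SocketChannel;

// Hypothetical JDK-only demo of non-blocking accept; not Netty code.
class AcceptSketch {

    // Returns { accepted before any client connected, accepted after a client connected }.
    static boolean[] demo() {
        try (Selector selector = Selector.open();
             ServerSocketChannel server = ServerSocketChannel.open()) {
            server.configureBlocking(false);
            // Port 0 asks the OS for any free port.
            server.bind(new InetSocketAddress(InetAddress.getLoopbackAddress(), 0));
            server.register(selector, SelectionKey.OP_ACCEPT);

            // No client yet: a non-blocking accept() returns null immediately.
            boolean before = server.accept() != null;

            // A blocking client connect, then wait until OP_ACCEPT is ready.
            try (SocketChannel client = SocketChannel.open(server.getLocalAddress())) {
                selector.select(5000);
                try (SocketChannel accepted = server.accept()) {
                    return new boolean[] { before, accepted != null };
                }
            }
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }

    public static void main(String[] args) {
        boolean[] result = demo();
        System.out.println("before connect: " + result[0] + ", after connect: " + result[1]);
    }
}
```

In Netty the two outcomes correspond to doReadMessages returning 0 (nothing accepted) or 1 (one NioSocketChannel added to the buffer).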

Next, let's analyze this loop:

for (int i = 0; i < size; i++) {
    readPending = false;
    pipeline.fireChannelRead(readBuf.get(i));
}

It calls:

static void invokeChannelRead(final AbstractChannelHandlerContext next, Object msg) {
    final Object m = next.pipeline.touch(ObjectUtil.checkNotNull(msg, "msg"), next);
    EventExecutor executor = next.executor();
    if (executor.inEventLoop()) {
        next.invokeChannelRead(m);
    } else {
        executor.execute(new Runnable() {
            @Override
            public void run() {
                next.invokeChannelRead(m);
            }
        });
    }
}

private void invokeChannelRead(Object msg) {
    if (invokeHandler()) {
        try {
            ((ChannelInboundHandler) handler()).channelRead(this, msg);
        } catch (Throwable t) {
            invokeExceptionCaught(t);
        }
    } else {
        fireChannelRead(msg);
    }
}
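The static invokeChannelRead above uses a pattern that recurs throughout Netty: if the caller is already on the handler's event-loop thread, run the action directly; otherwise hand it to that thread as a task. The following sketch reproduces only that pattern with a JDK single-thread executor. InEventLoopSketch, invoke, and demo are hypothetical names, not Netty API.

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;

// Hypothetical illustration of the "inEventLoop or execute" pattern; not Netty code.
class InEventLoopSketch {
    private volatile Thread loopThread;
    private final ExecutorService executor = Executors.newSingleThreadExecutor(r -> {
        Thread t = new Thread(r, "sketch-event-loop");
        t.setDaemon(true);
        loopThread = t; // remember which thread plays the role of the event loop
        return t;
    });

    boolean inEventLoop() {
        return Thread.currentThread() == loopThread;
    }

    // Run directly when already on the loop thread, otherwise enqueue.
    void invoke(Runnable action) {
        if (inEventLoop()) {
            action.run();
        } else {
            executor.execute(action);
        }
    }

    void shutdown() {
        executor.shutdown();
    }

    // Returns { inEventLoop() seen by the caller, inEventLoop() seen inside the action }.
    static boolean[] demo() {
        InEventLoopSketch loop = new InEventLoopSketch();
        boolean[] result = new boolean[2];
        CountDownLatch done = new CountDownLatch(1);
        result[0] = loop.inEventLoop();
        loop.invoke(() -> {
            result[1] = loop.inEventLoop();
            done.countDown();
        });
        try {
            done.await(5, TimeUnit.SECONDS);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
        loop.shutdown();
        return result;
    }

    public static void main(String[] args) {
        boolean[] result = demo();
        System.out.println("caller in loop: " + result[0] + ", action in loop: " + result[1]);
    }
}
```

Funnelling every handler invocation onto one thread is what lets handler code run without locks.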

This invokes channelRead, here the implementation in ServerBootstrap's ServerBootstrapAcceptor:

@Override
@SuppressWarnings("unchecked")
public void channelRead(ChannelHandlerContext ctx, Object msg) {
    final Channel child = (Channel) msg;
    child.pipeline().addLast(childHandler);
    setChannelOptions(child, childOptions, logger);
    setAttributes(child, childAttrs);
    try {
        // Register the child channel with the child event loop group
        childGroup.register(child).addListener(new ChannelFutureListener() {
            @Override
            public void operationComplete(ChannelFuture future) throws Exception {
                if (!future.isSuccess()) {
                    forceClose(child, future.cause());
                }
            }
        });
    } catch (Throwable t) {
        forceClose(child, t);
    }
}

Enter AbstractChannel's register:

@Override
public final void register(EventLoop eventLoop, final ChannelPromise promise) {
    ObjectUtil.checkNotNull(eventLoop, "eventLoop");
    if (isRegistered()) {
        promise.setFailure(new IllegalStateException("registered to an event loop already"));
        return;
    }
    if (!isCompatible(eventLoop)) {
        promise.setFailure(
                new IllegalStateException("incompatible event loop type: " + eventLoop.getClass().getName()));
        return;
    }
    AbstractChannel.this.eventLoop = eventLoop;
    if (eventLoop.inEventLoop()) {
        register0(promise);
    } else {
        try {
            eventLoop.execute(new Runnable() {
                @Override
                public void run() {
                    register0(promise);
                }
            });
        } catch (Throwable t) {
            logger.warn(
                    "Force-closing a channel whose registration task was not accepted by an event loop: {}",
                    AbstractChannel.this, t);
            closeForcibly();
            closeFuture.setClosed();
            safeSetFailure(promise, t);
        }
    }
}

This leads into register0, which is on the path to the doBeginRead we want to examine:

private void register0(ChannelPromise promise) {
    try {
        // check if the channel is still open as it could be closed in the mean time when the register
        // call was outside of the eventLoop
        if (!promise.setUncancellable() || !ensureOpen(promise)) {
            return;
        }
        boolean firstRegistration = neverRegistered;
        // Perform the actual registration
        doRegister();
        neverRegistered = false;
        registered = true;
        // Ensure we call handlerAdded(...) before we actually notify the promise. This is needed as the
        // user may already fire events through the pipeline in the ChannelFutureListener.
        pipeline.invokeHandlerAddedIfNeeded();
        safeSetSuccess(promise);
        pipeline.fireChannelRegistered();
        // Only fire a channelActive if the channel has never been registered. This prevents firing
        // multiple channel actives if the channel is deregistered and re-registered.
        if (isActive()) {
            if (firstRegistration) {
                pipeline.fireChannelActive();
            } else if (config().isAutoRead()) {
                // This channel was registered before and autoRead() is set. This means we need to begin read
                // again so that we process inbound data.
                //
                // See https://github.com/netty/netty/issues/4805
                // Important
                beginRead();
            }
        }
    } catch (Throwable t) {
        // Close the channel directly to avoid FD leak.
        closeForcibly();
        closeFuture.setClosed();
        safeSetFailure(promise, t);
    }
}

Enter beginRead, which delegates to doBeginRead:

@Override
public final void beginRead() {
    assertEventLoop();
    if (!isActive()) {
        return;
    }
    try {
        doBeginRead();
    } catch (final Exception e) {
        invokeLater(new Runnable() {
            @Override
            public void run() {
                pipeline.fireExceptionCaught(e);
            }
        });
        close(voidPromise());
    }
}

Enter the doBeginRead method:

@Override
protected void doBeginRead() throws Exception {
    // Channel.read() or ChannelHandlerContext.read() was called
    final SelectionKey selectionKey = this.selectionKey;
    if (!selectionKey.isValid()) {
        return;
    }
    readPending = true;
    final int interestOps = selectionKey.interestOps();
    if ((interestOps & readInterestOp) == 0) {
        // Register interest in the read operation
        selectionKey.interestOps(interestOps | readInterestOp);
    }
}

In the JDK's SelectionKeyImpl we can see:

public SelectionKey interestOps(int var1) {
    this.ensureValid();
    return this.nioInterestOps(var1);
}

This completes the doBeginRead operation.

In short: startup centers on the bind operation; once the server is up, accept is performed, and then read.
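What doBeginRead does to the selection key is ordinary bit arithmetic on the interest set: OR a bit in only if it is not already present. The inverse, ops &= ~SelectionKey.OP_CONNECT, appeared earlier in processSelectedKey. The sketch below shows both operations on plain ints. InterestOpsSketch and its method names are hypothetical; only the SelectionKey constants come from the JDK.

```java
import java.nio.channels.SelectionKey;

// Hypothetical helper showing interest-set bit arithmetic; not Netty code.
class InterestOpsSketch {

    // Adds op to the interest set, leaving the set unchanged if op is already present.
    static int addInterest(int interestOps, int op) {
        if ((interestOps & op) == 0) {
            return interestOps | op;
        }
        return interestOps;
    }

    // Clears op from the interest set.
    static int removeInterest(int interestOps, int op) {
        return interestOps & ~op;
    }

    public static void main(String[] args) {
        int ops = 0;
        ops = addInterest(ops, SelectionKey.OP_READ);   // now interested in reads
        ops = addInterest(ops, SelectionKey.OP_WRITE);  // reads and writes
        ops = removeInterest(ops, SelectionKey.OP_WRITE);
        System.out.println(ops == SelectionKey.OP_READ);
    }
}
```

In Netty the bit that gets added is the channel's readInterestOp, which as far as I know is OP_ACCEPT for NioServerSocketChannel and OP_READ for NioSocketChannel.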
