Netty学习二

简介: 前面我们已经了解了官方的Netty的example,知道要编写一个一个聊天demo或者一个简单的rpc,或者应答模式的demo,在Netty中通常需要写服务端和客户端的引导,而引导是启动服务用的,而服务端和客户端的Handler则是用于处理具体的业务逻辑。这个通常在RPC框架中比如Dubbo,通常会在服务进行暴露或者进行引用的时候,需要调用Netty服务进行启动,然后进行暴露或者调用的,此时采用协议适配的时候,采用适配器模式,而我们知道生产者端最重要的方法就是doBind方法,而在消费者端最重要的方法是doConnect方法。而在RocketMQ中,我们也可以看到服务端和消费端中produce

前面我们已经了解了官方的Netty的example,知道要编写一个一个聊天demo或者一个简单的rpc,或者应答模式的demo,在Netty中通常需要写服务端和客户端的引导,而引导是启动服务用的,而服务端和客户端的Handler则是用于处理具体的业务逻辑。这个通常在RPC框架中比如Dubbo,通常会在服务进行暴露或者进行引用的时候,需要调用Netty服务进行启动,然后进行暴露或者调用的,此时采用协议适配的时候,采用适配器模式,而我们知道生产者端最重要的方法就是doBind方法,而在消费者端最重要的方法是doConnect方法。而在RocketMQ中,我们也可以看到服务端和消费端中producer.start()和consumer.start()的时候都会调用Netty进行启动操作,也有类似的操作。

那么Netty的服务端又是怎样使用Nio、保证线程的充分使用的呢?

服务器端引导:

/*** Echoes back any received data from a client.* Netty的服务端引导*/publicfinalclassEchoServer {
staticfinalbooleanSSL=System.getProperty("ssl") !=null;
//端口staticfinalintPORT=Integer.parseInt(System.getProperty("port", "8007"));
publicstaticvoidmain(String[] args) throwsException {
// Configure SSL.finalSslContextsslCtx;
if (SSL) {
SelfSignedCertificatessc=newSelfSignedCertificate();
sslCtx=SslContextBuilder.forServer(ssc.certificate(), ssc.privateKey()).build();
        } else {
sslCtx=null;
        }
// Configure the server.boss和worker线程组EventLoopGroupbossGroup=newNioEventLoopGroup(1);
EventLoopGroupworkerGroup=newNioEventLoopGroup();
//创建服务端业务处理器对象,进行业务处理,添加线程组,同时开启通道添加so_backlog//同时添加childHandler,重写initChannel方法finalEchoServerHandlerserverHandler=newEchoServerHandler();
try {
//服务端引导ServerBootstrapb=newServerBootstrap();
b.group(bossGroup, workerGroup)
             .channel(NioServerSocketChannel.class)
             .option(ChannelOption.SO_BACKLOG, 100)
             .handler(newLoggingHandler(LogLevel.INFO))
             .childHandler(newChannelInitializer<SocketChannel>() {
@OverridepublicvoidinitChannel(SocketChannelch) throwsException {
//流水线ChannelPipelinep=ch.pipeline();
if (sslCtx!=null) {
p.addLast(sslCtx.newHandler(ch.alloc()));
                     }
//添加日志信息//p.addLast(new LoggingHandler(LogLevel.INFO));//添加serverHandlerp.addLast(serverHandler);
                 }
             });
// Start the server.//启动服务器ChannelFuturef=b.bind(PORT).sync();
// Wait until the server socket is closed.f.channel().closeFuture().sync();
        } finally {
// Shut down all event loops to terminate all threads.bossGroup.shutdownGracefully();
workerGroup.shutdownGracefully();
        }
    }
}

我们来看到bind操作,也是启动服务器的操作。

AbstractBootstrap#bind(int inetPort)#bind(SocketAddress localAddress)

/*** Create a new {@link Channel} and bind it.*/publicChannelFuturebind(intinetPort) {
returnbind(newInetSocketAddress(inetPort));
}
/*** Create a new {@link Channel} and bind it.*/publicChannelFuturebind(SocketAddresslocalAddress) {
//对参数信息进行校验:group和channelFactoryvalidate();
returndoBind(ObjectUtil.checkNotNull(localAddress, "localAddress"));
}

可以看到里面绑定的是websocket的端口信息。

privateChannelFuturedoBind(finalSocketAddresslocalAddress) {
finalChannelFutureregFuture=initAndRegister();
finalChannelchannel=regFuture.channel();
if (regFuture.cause() !=null) {
returnregFuture;
    }
if (regFuture.isDone()) {
// At this point we know that the registration was complete and successful.ChannelPromisepromise=channel.newPromise();
doBind0(regFuture, channel, localAddress, promise);
returnpromise;
    } else {
// Registration future is almost always fulfilled already, but just in case it's not.finalPendingRegistrationPromisepromise=newPendingRegistrationPromise(channel);
regFuture.addListener(newChannelFutureListener() {
@OverridepublicvoidoperationComplete(ChannelFuturefuture) throwsException {
Throwablecause=future.cause();
if (cause!=null) {
// Registration on the EventLoop failed so fail the ChannelPromise directly to not cause an// IllegalStateException once we try to access the EventLoop of the Channel.promise.setFailure(cause);
                } else {
// Registration was successful, so set the correct executor to use.// See https://github.com/netty/netty/issues/2586promise.registered();
doBind0(regFuture, channel, localAddress, promise);
                }
            }
        });
returnpromise;
    }
}

进而

privatestaticvoiddoBind0(
finalChannelFutureregFuture, finalChannelchannel,
finalSocketAddresslocalAddress, finalChannelPromisepromise) {
// This method is invoked before channelRegistered() is triggered.  Give user handlers a chance to set up// the pipeline in its channelRegistered() implementation.channel.eventLoop().execute(newRunnable() {
@Overridepublicvoidrun() {
if (regFuture.isSuccess()) {
channel.bind(localAddress, promise).addListener(ChannelFutureListener.CLOSE_ON_FAILURE);
            } else {
promise.setFailure(regFuture.cause());
            }
        }
    });
}

查看bind方法

AbstractChannel#bind(SocketAddress localAddress, ChannelPromise promise)

ChannelFuturebind(SocketAddresslocalAddress, ChannelPromisepromise);
@OverridepublicChannelFuturebind(SocketAddresslocalAddress, ChannelPromisepromise) {
returnpipeline.bind(localAddress, promise);
    }

进行绑定:

@OverridepublicfinalChannelFuturebind(SocketAddresslocalAddress, ChannelPromisepromise) {
returntail.bind(localAddress, promise);
}

AbstractChannelHandlerContext

@OverridepublicChannelFuturebind(finalSocketAddresslocalAddress, finalChannelPromisepromise) {
ObjectUtil.checkNotNull(localAddress, "localAddress");
if (isNotValidPromise(promise, false)) {
// cancelledreturnpromise;
    }
finalAbstractChannelHandlerContextnext=findContextOutbound(MASK_BIND);
EventExecutorexecutor=next.executor();
if (executor.inEventLoop()) {
next.invokeBind(localAddress, promise);
    } else {
safeExecute(executor, newRunnable() {
@Overridepublicvoidrun() {
next.invokeBind(localAddress, promise);
            }
        }, promise, null, false);
    }
returnpromise;
}
privatevoidinvokeBind(SocketAddresslocalAddress, ChannelPromisepromise) {
if (invokeHandler()) {
try {
                ((ChannelOutboundHandler) handler()).bind(this, localAddress, promise);
            } catch (Throwablet) {
notifyOutboundHandlerException(t, promise);
            }
        } else {
bind(localAddress, promise);
        }
}

DefaultChannelPipeline

@Overridepublicvoidbind(
ChannelHandlerContextctx, SocketAddresslocalAddress, ChannelPromisepromise) {
unsafe.bind(localAddress, promise);
}
@Overridepublicfinalvoidbind(finalSocketAddresslocalAddress, finalChannelPromisepromise) {
assertEventLoop();
if (!promise.setUncancellable() ||!ensureOpen(promise)) {
return;
            }
// See: https://github.com/netty/netty/issues/576if (Boolean.TRUE.equals(config().getOption(ChannelOption.SO_BROADCAST)) &&localAddressinstanceofInetSocketAddress&&!((InetSocketAddress) localAddress).getAddress().isAnyLocalAddress() &&!PlatformDependent.isWindows() &&!PlatformDependent.maybeSuperUser()) {
// Warn a user about the fact that a non-root user can't receive a// broadcast packet on *nix if the socket is bound on non-wildcard address.logger.warn(
"A non-root user can't receive a broadcast packet if the socket "+"is not bound to a wildcard address; binding to a non-wildcard "+"address ("+localAddress+") anyway as requested.");
            }
booleanwasActive=isActive();
try {
doBind(localAddress);
            } catch (Throwablet) {
safeSetFailure(promise, t);
closeIfClosed();
return;
            }
if (!wasActive&&isActive()) {
invokeLater(newRunnable() {
@Overridepublicvoidrun() {
pipeline.fireChannelActive();
                    }
                });
            }
//关键地方,此时会一步一步的返回pipeline操作safeSetSuccess(promise);
        }

NioServerSocketChannel

@SuppressJava6Requirement(reason="Usage guarded by java version check")
@OverrideprotectedvoiddoBind(SocketAddresslocalAddress) throwsException {
if (PlatformDependent.javaVersion() >=7) {
javaChannel().bind(localAddress, config.getBacklog());
    } else {
javaChannel().socket().bind(localAddress, config.getBacklog());
    }
}

最终我们在NioServerSocketChannel中找到doBind方法,此时的说明其是Nio操作,同时可以看到分为两个分支,一个是jdk版本>=7的,另外一个是jdk版本<7的,开不开心,兴奋不兴奋,此时再debug一步就可以看到进入jdk的接口了,ServerSockerChannel接口。

jdk#ServerSocketChannel

publicServerSocketChannelbind(SocketAddressvar1, intvar2) throwsIOException {
Objectvar3=this.lock;
synchronized(this.lock) {
if (!this.isOpen()) {
thrownewClosedChannelException();
        } elseif (this.isBound()) {
thrownewAlreadyBoundException();
        } else {
InetSocketAddressvar4=var1==null?newInetSocketAddress(0) : Net.checkAddress(var1);
SecurityManagervar5=System.getSecurityManager();
if (var5!=null) {
var5.checkListen(var4.getPort());
            }
NetHooks.beforeTcpBind(this.fd, var4.getAddress(), var4.getPort());
Net.bind(this.fd, var4.getAddress(), var4.getPort());
Net.listen(this.fd, var2<1?50 : var2);
Objectvar6=this.stateLock;
synchronized(this.stateLock) {
this.localAddress=Net.localAddress(this.fd);
            }
returnthis;
        }
    }
}

也就是说它本质上是基于Nio的Socket实现的,最终调用sun公司写的net接口。

接着就会返回 safeSetSuccess(promise),从而进一步回调SingleThreadEventExecutor extends#runAllTasks方法,从而走到我们想看到的EventLoop操作。

/*** Poll all tasks from the task queue and run them via {@link Runnable#run()} method.  This method stops running* the tasks in the task queue and returns if it ran longer than {@code timeoutNanos}.*/protectedbooleanrunAllTasks(longtimeoutNanos) {
fetchFromScheduledTaskQueue();
Runnabletask=pollTask();
if (task==null) {
afterRunningAllTasks();
returnfalse;
    }
finallongdeadline=timeoutNanos>0?ScheduledFutureTask.nanoTime() +timeoutNanos : 0;
longrunTasks=0;
longlastExecutionTime;
for (;;) {
//执行execute操作safeExecute(task);
//对所有的线程进行++操作runTasks++;
// Check timeout every 64 tasks because nanoTime() is relatively expensive.// XXX: Hard-coded value - will make it configurable if it is really a problem.if ((runTasks&0x3F) ==0) {
lastExecutionTime=ScheduledFutureTask.nanoTime();
if (lastExecutionTime>=deadline) {
break;
            }
        }
task=pollTask();
if (task==null) {
lastExecutionTime=ScheduledFutureTask.nanoTime();
break;
        }
    }
afterRunningAllTasks();
this.lastExecutionTime=lastExecutionTime;
returntrue;
}

然后debug进行就是我们想要看到的run方法,也即最终的死循环操作,是不是很兴奋,看到这里:

@Overrideprotectedvoidrun() {
intselectCnt=0;
for (;;) {
try {
intstrategy;
try {
strategy=selectStrategy.calculateStrategy(selectNowSupplier, hasTasks());
switch (strategy) {
caseSelectStrategy.CONTINUE:
continue;
caseSelectStrategy.BUSY_WAIT:
// fall-through to SELECT since the busy-wait is not supported with NIOcaseSelectStrategy.SELECT:
longcurDeadlineNanos=nextScheduledTaskDeadlineNanos();
if (curDeadlineNanos==-1L) {
curDeadlineNanos=NONE; // nothing on the calendar                    }
nextWakeupNanos.set(curDeadlineNanos);
try {
if (!hasTasks()) {
strategy=select(curDeadlineNanos);
                        }
                    } finally {
// This update is just to help block unnecessary selector wakeups// so use of lazySet is ok (no race condition)nextWakeupNanos.lazySet(AWAKE);
                    }
// fall throughdefault:
                }
            } catch (IOExceptione) {
// If we receive an IOException here its because the Selector is messed up. Let's rebuild// the selector and retry. https://github.com/netty/netty/issues/8566rebuildSelector0();
selectCnt=0;
handleLoopException(e);
continue;
            }
selectCnt++;
cancelledKeys=0;
needsToSelectAgain=false;
finalintioRatio=this.ioRatio;
booleanranTasks;
if (ioRatio==100) {
try {
if (strategy>0) {
processSelectedKeys();
                    }
                } finally {
// Ensure we always run tasks.//运行task操作ranTasks=runAllTasks();
                }
            } elseif (strategy>0) {
finallongioStartTime=System.nanoTime();
try {
processSelectedKeys();
                } finally {
// Ensure we always run tasks.finallongioTime=System.nanoTime() -ioStartTime;
ranTasks=runAllTasks(ioTime* (100-ioRatio) /ioRatio);
                }
            } else {
ranTasks=runAllTasks(0); // This will run the minimum number of tasks            }
if (ranTasks||strategy>0) {
if (selectCnt>MIN_PREMATURE_SELECTOR_RETURNS&&logger.isDebugEnabled()) {
logger.debug("Selector.select() returned prematurely {} times in a row for Selector {}.",
selectCnt-1, selector);
                }
selectCnt=0;
            } elseif (unexpectedSelectorWakeup(selectCnt)) { // Unexpected wakeup (unusual case)selectCnt=0;
            }
        } catch (CancelledKeyExceptione) {
// Harmless exception - log anywayif (logger.isDebugEnabled()) {
logger.debug(CancelledKeyException.class.getSimpleName() +" raised by a Selector {} - JDK bug?",
selector, e);
            }
        } catch (Throwablet) {
handleLoopException(t);
        }
// Always handle shutdown even if the loop processing threw an exception.try {
if (isShuttingDown()) {
closeAll();
if (confirmShutdown()) {
return;
                }
            }
        } catch (Throwablet) {
handleLoopException(t);
        }
    }
}

接着我们来看

EventLoopGroupbossGroup=newNioEventLoopGroup(1);
EventLoopGroupworkerGroup=newNioEventLoopGroup();

这两句,其本质是一个死循环,可以从源码中可以看到

NioEventLoopGroup

publicNioEventLoopGroup() {
this(0);
}
publicNioEventLoopGroup(intnThreads) {
this(nThreads, (Executor) null);
 }
publicNioEventLoopGroup(intnThreads, Executorexecutor) {
this(nThreads, executor, SelectorProvider.provider());
}
publicNioEventLoopGroup(
intnThreads, Executorexecutor, finalSelectorProviderselectorProvider) {
this(nThreads, executor, selectorProvider, DefaultSelectStrategyFactory.INSTANCE);
}
publicNioEventLoopGroup(intnThreads, Executorexecutor, finalSelectorProviderselectorProvider,
finalSelectStrategyFactoryselectStrategyFactory) {
super(nThreads, executor, selectorProvider, selectStrategyFactory, RejectedExecutionHandlers.reject());
}

MultithreadEventLoopGroup

/*** @see MultithreadEventExecutorGroup#MultithreadEventExecutorGroup(int, Executor, Object...)*/protectedMultithreadEventLoopGroup(intnThreads, Executorexecutor, Object... args) {
super(nThreads==0?DEFAULT_EVENT_LOOP_THREADS : nThreads, executor, args);
}

MultithreadEventExecutorGroup

这个方法是我们需要关注的重点

/*** Create a new instance.** @param nThreads          the number of threads that will be used by this instance.* @param executor          the Executor to use, or {@code null} if the default should be used.* @param args              arguments which will passed to each {@link #newChild(Executor, Object...)} call*/protectedMultithreadEventExecutorGroup(intnThreads, Executorexecutor, Object... args) {
this(nThreads, executor, DefaultEventExecutorChooserFactory.INSTANCE, args);
}
/*** Create a new instance.** @param nThreads          the number of threads that will be used by this instance.* @param executor          the Executor to use, or {@code null} if the default should be used.* @param chooserFactory    the {@link EventExecutorChooserFactory} to use.* @param args              arguments which will passed to each {@link #newChild(Executor, Object...)} call*/protectedMultithreadEventExecutorGroup(intnThreads, Executorexecutor,
EventExecutorChooserFactorychooserFactory, Object... args) {
if (nThreads<=0) {
thrownewIllegalArgumentException(String.format("nThreads: %d (expected: > 0)", nThreads));
    }
//创建新的taskif (executor==null) {
executor=newThreadPerTaskExecutor(newDefaultThreadFactory());
    }
children=newEventExecutor[nThreads];
for (inti=0; i<nThreads; i++) {
booleansuccess=false;
try {
//重点关注children[i] =newChild(executor, args);
success=true;
        } catch (Exceptione) {
// TODO: Think about if this is a good exception typethrownewIllegalStateException("failed to create a child event loop", e);
        } finally {
if (!success) {
for (intj=0; j<i; j++) {
children[j].shutdownGracefully();
                }
for (intj=0; j<i; j++) {
EventExecutore=children[j];
try {
while (!e.isTerminated()) {
e.awaitTermination(Integer.MAX_VALUE, TimeUnit.SECONDS);
                        }
                    } catch (InterruptedExceptioninterrupted) {
// Let the caller handle the interruption.Thread.currentThread().interrupt();
break;
                    }
                }
            }
        }
    }
chooser=chooserFactory.newChooser(children);
finalFutureListener<Object>terminationListener=newFutureListener<Object>() {
@OverridepublicvoidoperationComplete(Future<Object>future) throwsException {
if (terminatedChildren.incrementAndGet() ==children.length) {
terminationFuture.setSuccess(null);
            }
        }
    };
for (EventExecutore: children) {
//添加监听e.terminationFuture().addListener(terminationListener);
    }
//放入到LinkedHashSetSet<EventExecutor>childrenSet=newLinkedHashSet<EventExecutor>(children.length);
Collections.addAll(childrenSet, children);
readonlyChildren=Collections.unmodifiableSet(childrenSet);
}

继续回到原来的NioEventLoopGroup,可以看到里面是一个NioEventLoop:

@OverrideprotectedEventLoopnewChild(Executorexecutor, Object... args) throwsException {
EventLoopTaskQueueFactoryqueueFactory=args.length==4? (EventLoopTaskQueueFactory) args[3] : null;
//重点关注returnnewNioEventLoop(this, executor, (SelectorProvider) args[0],
        ((SelectStrategyFactory) args[1]).newSelectStrategy(), (RejectedExecutionHandler) args[2], queueFactory);
}

从而我们就可以看到我们需要关注的NioEventLoop:

NioEventLoop(NioEventLoopGroupparent, Executorexecutor, SelectorProviderselectorProvider,
SelectStrategystrategy, RejectedExecutionHandlerrejectedExecutionHandler,
EventLoopTaskQueueFactoryqueueFactory) {
super(parent, executor, false, newTaskQueue(queueFactory), newTaskQueue(queueFactory),
rejectedExecutionHandler);
this.provider=ObjectUtil.checkNotNull(selectorProvider, "selectorProvider");
this.selectStrategy=ObjectUtil.checkNotNull(strategy, "selectStrategy");
finalSelectorTupleselectorTuple=openSelector();
this.selector=selectorTuple.selector;
this.unwrappedSelector=selectorTuple.unwrappedSelector;
}

除此之外,还可以关注一下initChannel操作。


目录
相关文章
|
6月前
|
开发工具 git
网络编程(三)netty学习demo和笔记和推荐的4本书
网络编程(三)netty学习demo和笔记和推荐的4本书
128 0
|
6月前
|
存储 网络协议 Java
Netty应用实例学习
Netty应用实例学习
45 0
|
6月前
|
编解码 缓存 网络协议
Netty核心功能学习
Netty核心功能学习
58 0
|
6月前
|
编解码 网络协议 Java
Netty基础入门学习
Netty基础入门学习
154 0
|
前端开发 网络协议 API
学习Netty BootStrap的核心知识,成为网络编程高手!
学习Netty BootStrap的核心知识,成为网络编程高手!
186 0
|
Rust Dubbo 网络协议
通过 HTTP/2 协议案例学习 Java & Netty 性能调优:工具、技巧与方法论
通过 HTTP/2 协议案例学习 Java & Netty 性能调优:工具、技巧与方法论
12650 10
|
弹性计算 缓存 网络协议
netty学习(三)
Java NIO 编程
110 0
|
Java
netty学习(二)
Java BIO 编程
123 0
netty学习(二)
|
分布式计算 Dubbo 网络协议
netty学习(一)
netty的介绍和应用场景
165 0
netty学习(一)
|
消息中间件 Java
Netty学习三
前面我们已经知道Netty服务端启动的时候最重要的是进行bind操作,这个操作不仅进行了run()操作进行死循环,而且将线程任务添加到队列中,进行runAllTasks操作。 首先,我们可以看Netty的架构图,图片来自即时通讯网:
106 1
Netty学习三