服务端启动模板
public class MyChatServer { public static void main(String[] args) { //步骤1 EventLoopGroup bossGroup = new NioEventLoopGroup(1); EventLoopGroup workerGroup = new NioEventLoopGroup(); try { //步骤2 ServerBootstrap b = new ServerBootstrap(); b.group(bossGroup, workerGroup) .channel(NioServerSocketChannel.class) .handler(new LoggingHandler(LogLevel.INFO)) .option(ChannelOption.SO_BACKLOG, 1024) .childHandler(new ChannelInitializer<SocketChannel>() { @Override protected void initChannel(SocketChannel socketChannel) throws Exception { socketChannel.pipeline().addLast(new MyChatServerHandler()); } }); //步骤3 ChannelFuture f = b.bind(8888).sync(); //等待服务端监听端口关闭 f.channel().closeFuture().sync(); } catch ( Exception e) { } finally { //优雅关闭,释放线程池资源 bossGroup.shutdownGracefully(); workerGroup.shutdownGracefully(); } } }
步骤1:创建NioEventLoopGroup
EventLoopGroup workerGroup = new NioEventLoopGroup();
跟
//NioEventLoopGroup public NioEventLoopGroup() { this(0); }
跟
//NioEventLoopGroup public NioEventLoopGroup(int nThreads) { this(nThreads, (Executor) null); }
跟
参数多传入了一个selector
//NioEventLoopGroup public NioEventLoopGroup(int nThreads, Executor executor) { this(nThreads, executor, SelectorProvider.provider()); }
跟
参数多传了一个DefaultSelectStrategyFactory.INSTANCE ,一个工厂类
//NioEventLoopGroup public NioEventLoopGroup( int nThreads, Executor executor, final SelectorProvider selectorProvider) { this(nThreads, executor, selectorProvider, DefaultSelectStrategyFactory.INSTANCE); }
跟
参数多传了一个RejectedExecutionHandlers.reject()
//NioEventLoopGroup public NioEventLoopGroup(int nThreads, Executor executor, final SelectorProvider selectorProvider, final SelectStrategyFactory selectStrategyFactory) { super(nThreads, executor, selectorProvider, selectStrategyFactory, RejectedExecutionHandlers.reject()); }
跟
DEFAULT_EVENT_LOOP_THREADS是一个默认的数(电脑的处理器核数*2(超线程)*2)(数不是重点)
//MultithreadEventLoopGroup protected MultithreadEventLoopGroup(int nThreads, Executor executor, Object... args) { super(nThreads == 0 ? DEFAULT_EVENT_LOOP_THREADS : nThreads, executor, args); }
跟
//MultithreadEventExecutorGroup protected MultithreadEventExecutorGroup(int nThreads, Executor executor, Object... args) { this(nThreads, executor, DefaultEventExecutorChooserFactory.INSTANCE, args); }
跟
//MultithreadEventExecutorGroup protected MultithreadEventExecutorGroup(int nThreads, Executor executor, EventExecutorChooserFactory chooserFactory, Object... args) { //if 检查,过 if (nThreads <= 0) { } //初始化,这是传的参数,参数就是null if (executor == null) { executor = new ThreadPerTaskExecutor(newDefaultThreadFactory()); } //定义数组长度 children = new EventExecutor[nThreads]; //初始化数组 for (int i = 0; i < nThreads; i ++) { boolean success = false; try { //对每一个EventExecutor初始化,并且传入args参数 children[i] = newChild(executor, args); success = true; } catch (Exception e) { } finally { if (!success) {//用debug发现不走这里,略过 } } } //初始化chooser,并且把上面的children传入 chooser = chooserFactory.newChooser(children); //创建一个监听器 final FutureListener<Object> terminationListener = new FutureListener<Object>() { @Override public void operationComplete(Future<Object> future) throws Exception { if (terminatedChildren.incrementAndGet() == children.length) { terminationFuture.setSuccess(null); } } }; //对每一个EventExecutor添加监听器 for (EventExecutor e: children) { e.terminationFuture().addListener(terminationListener); } Set<EventExecutor> childrenSet = new LinkedHashSet<EventExecutor>(children.length); Collections.addAll(childrenSet, children); readonlyChildren = Collections.unmodifiableSet(childrenSet); }
跟到这里基本算是跟完了
步骤2:创建并且初始化ServerBootstrap,花式赋值
ServerBootstrap b = new ServerBootstrap(); b.group(bossGroup, workerGroup) .channel(NioServerSocketChannel.class) .handler(new LoggingHandler(LogLevel.INFO)) .option(ChannelOption.SO_BACKLOG, 1024) .childHandler(new ChannelInitializer<SocketChannel>() { @Override protected void initChannel(SocketChannel socketChannel) throws Exception { //MyChatServerHandler是自己定义的方法 socketChannel.pipeline().addLast(new MyChatServerHandler()); } });
1跟
//ServerBootstrap public ServerBootstrap() { }
2跟group方法
//ServerBootstrap public ServerBootstrap group(EventLoopGroup parentGroup, EventLoopGroup childGroup) { //给ServerBootstrap的父类AbstractBootstrap里的成员变量赋值 super.group(parentGroup); //if省略 //给自己的成员变量赋值 this.childGroup = childGroup; return this; }
3跟channel方法
//AbstractBootstrap public B channel(Class<? extends C> channelClass) { if (channelClass == null) { throw new NullPointerException("channelClass"); } return channelFactory(new ReflectiveChannelFactory<C>(channelClass)); }
3.1跟 new ReflectiveChannelFactory(channelClass) 方法
//ReflectiveChannelFactory,看名字就是反射工厂类 public ReflectiveChannelFactory(Class<? extends T> clazz) { if (clazz == null) { throw new NullPointerException("clazz"); } //将NioServerSocketChannel.class赋值给clazz成员变量 this.clazz = clazz; }
3.2跟channelFactory(new ReflectiveChannelFactory(channelClass)) 方法
//AbstractBootstrap public B channelFactory(io.netty.channel.ChannelFactory<? extends C> channelFactory) { return channelFactory((ChannelFactory<C>) channelFactory); }
//AbstractBootstrap public B channelFactory(ChannelFactory<? extends C> channelFactory) { //if省略 //给成员变量赋值 this.channelFactory = channelFactory; //AbstractBootstrap定义的参数泛型如下 //public abstract class AbstractBootstrap<B extends AbstractBootstrap<B, C>, C extends Channel> implements Cloneable //在ServerBootStrap继承AbstractBootstrap中给B的赋值为ServerBootStrap //public class ServerBootstrap extends AbstractBootstrap<ServerBootstrap, ServerChannel> //能成功返回 return (B) this; }
4 跟handler()
//AbstractBootstrap public B handler(ChannelHandler handler) { //if省略 //给成员变量赋值 this.handler = handler; return (B) this; }
5跟option()
//AbstractBootstrap public <T> B option(ChannelOption<T> option, T value) { if (option == null) { throw new NullPointerException("option"); } if (value == null) { synchronized (options) { options.remove(option); } } else { synchronized (options) { //添加参数,添加参数,添加参数 options.put(option, value); } } return (B) this; }
6跟childHandler方法
//ServerBootStrap public ServerBootstrap childHandler(ChannelHandler childHandler) { //if省略 //给成员变量赋值 this.childHandler = childHandler; return this; }
步骤3:一个bind(8888)方法,要你命三千
ChannelFuture f = b.bind(8888).sync();
步骤3.1:一直跟bind方法.
//AbstractBootstrap private ChannelFuture doBind(final SocketAddress localAddress) { //initAndRegister重点代码 final ChannelFuture regFuture = initAndRegister(); final Channel channel = regFuture.channel(); if (regFuture.cause() != null) { return regFuture; } if (regFuture.isDone()) { //debug我的没走,略 } else { final PendingRegistrationPromise promise = new PendingRegistrationPromise(channel); //给注册号的channel添加监听器 regFuture.addListener(new ChannelFutureListener() { @Override public void operationComplete(ChannelFuture future) throws Exception { Throwable cause = future.cause(); if (cause != null) { //出现异常 promise.setFailure(cause); } else { //没有出现异常 promise.registered(); doBind0(regFuture, channel, localAddress, promise); } } }); return promise; }
步骤3.2跟initAndRegister方法
//AbstractBootstrap final ChannelFuture initAndRegister() { Channel channel = null; try { //步骤3.2.1 channel = channelFactory.newChannel(); //步骤3.2.2 init(channel); } catch (Throwable t) {} 步骤3.2.3 ChannelFuture regFuture = config().group().register(channel); if (regFuture.cause() != null) { //debug我的没走。略 } return regFuture; }
步骤3.2.1:创建NioServerSocketChannel
channel = channelFactory.newChannel();
跟进去发现是一个接口
public interface ChannelFactory<T extends Channel> { T newChannel(); }
然后发现上面接口的实现类有ReflectiveChannelFactory(这个类在上面步骤2的3中有创建过)
//ReflectiveChannelFactory public T newChannel() { try { //反射、反射、反射 //clazz在上面赋值过NioServerSocketChannel return clazz.getConstructor().newInstance(); } catch (Throwable t) { throw new ChannelException("Unable to create Channel from class " + clazz, t); } }
此时应该调用NioServerSocketChannel的无参方法
//NioServerSocketChannel public NioServerSocketChannel() { this(newSocket(DEFAULT_SELECTOR_PROVIDER)); }
跟newSocket(DEFAULT_SELECTOR_PROVIDER) 方法
//NioServerSocketChannel private static ServerSocketChannel newSocket(SelectorProvider provider) { try { //返回一个ServerSocketChannel return provider.openServerSocketChannel(); } catch (IOException e) { } }
跟this(newSocket(provider))的this方法(☆)
//NioServerSocketChannel public NioServerSocketChannel(ServerSocketChannel channel) { //SelectionKey.OP_ACCEPT,SelectionKey.OP_ACCEPT,SelectionKey.OP_ACCEPT super(null, channel, SelectionKey.OP_ACCEPT); //config中存入了当前NioServerSocketChannel config = new NioServerSocketChannelConfig(this, javaChannel().socket()); }
跟上面代码的super方法,一直跟
//AbstractNioChannel protected AbstractNioChannel(Channel parent, SelectableChannel ch, int readInterestOp) { super(parent); //将channel赋值给成员变量 this.ch = ch; //Nio感兴趣的键 this.readInterestOp = readInterestOp; try { //Nio知识,设置为非阻塞 ch.configureBlocking(false); } catch (IOException e) { //略 } }
跟上面的super方法
//AbstractChannel protected AbstractChannel(Channel parent) { this.parent = parent; id = newId(); unsafe = newUnsafe(); //初始化pipline对象 pipeline = newChannelPipeline(); }
此时NioServerSocketChannel已经创建完了,并且channel里带有ChannelPipeline对象
步骤3.2.2:init初始化NioServerSocketChannel
AbstractBootstrap abstract void init(Channel channel) throws Exception;
找方法的实现类
//ServerBootStrap //给bossGroup(parentGroup)的channel添加参数,并且添加了一个关键、关键、关键的ChannelInitializer void init(Channel channel) throws Exception { //获取bossGroup(parentGroup)的option(ChannelOption.SO_BACKLOG, 1024)这种参数并且传给channel(不是重点) final Map<ChannelOption<?>, Object> options = options0(); synchronized (options) { setChannelOptions(channel, options, logger); } //获取bossGroup的attr这种参数并且传给channel(不是重点) final Map<AttributeKey<?>, Object> attrs = attrs0(); synchronized (attrs) { for (Entry<AttributeKey<?>, Object> e: attrs.entrySet()) { @SuppressWarnings("unchecked") AttributeKey<Object> key = (AttributeKey<Object>) e.getKey(); channel.attr(key).set(e.getValue()); } } //获取channel中的ChannelPipeline ChannelPipeline p = channel.pipeline(); //获取workerGroup(childGroup)及其一些参数childHandler、ChildOption、ChildAttrs,这些参数都在创建ServerBootStrap的时候赋过值 final EventLoopGroup currentChildGroup = childGroup; final ChannelHandler currentChildHandler = childHandler; final Entry<ChannelOption<?>, Object>[] currentChildOptions; final Entry<AttributeKey<?>, Object>[] currentChildAttrs; synchronized (childOptions) { currentChildOptions = childOptions.entrySet().toArray(newOptionArray(childOptions.size())); } synchronized (childAttrs) { currentChildAttrs = childAttrs.entrySet().toArray(newAttrArray(childAttrs.size())); } //重点、重点、重点\重点、重点、重点 //下面会重点解释这个地方 //P是ServerSocketChannnel中的ChannelPipeline,添加了一个ChannelInitializer整体(不用看里面的内容) p.addLast(new ChannelInitializer<Channel>() { @Override public void initChannel(final Channel ch) throws Exception { final ChannelPipeline pipeline = ch.pipeline(); ChannelHandler handler = config.handler(); if (handler != null) { //对应handler(new LoggingHandler(LogLevel.INFO)) pipeline.addLast(handler); } ch.eventLoop().execute(new Runnable() { @Override public void run() { //重点、重点、重点(后面会解释) pipeline.addLast(new ServerBootstrapAcceptor( ch, currentChildGroup, currentChildHandler, currentChildOptions, currentChildAttrs)); } }); } }); }
一直跟上面的的 p.addLast(new ChannelInitializer() 的addLast方法
//DefaultChannelPipeline //ChannelPipeline.addList虽然看似添加handler,其实是添加包含handler的ChannelHandlerContext public final ChannelPipeline addLast(EventExecutorGroup group, String name, ChannelHandler handler) { final AbstractChannelHandlerContext newCtx; synchronized (this) { //检查、省略 checkMultiplicity(handler); //创建一个ChannelHandlerContext并且把hander传入进去 newCtx = newContext(group, filterName(name, handler), handler); //将ChannelHandlerContext添加到创建的NioServerSocketChannel的Pipeline中 addLast0(newCtx); //走这个方法就返回 if (!registered) { newCtx.setAddPending(); callHandlerCallbackLater(newCtx, true); return this; } //debug后面的没走,下面的略 EventExecutor executor = newCtx.executor(); if (!executor.inEventLoop()) { newCtx.setAddPending(); executor.execute(new Runnable() { @Override public void run() { callHandlerAdded0(newCtx); } }); return this; } } callHandlerAdded0(newCtx); return this; }
跟上面代码中的 addLast0(newCtx) 方法
//DefaultChannelPipeline //双向链表操作,将ctx添加到链表中 private void addLast0(AbstractChannelHandlerContext newCtx) { AbstractChannelHandlerContext prev = tail.prev; newCtx.prev = prev; newCtx.next = tail; prev.next = newCtx; tail.prev = newCtx; }
此时NioServerSocketChannel已经初始化完毕,主要的工作就是给channel里的成员变量赋值,并且添加了一ChannelInitializer类 ,并且吧包含handler的ChannelHandlerContext加到双向链表中
步骤3.2.3:注册NioServerSocketChannel
跟AbstractBootstrap中的initAndRegister方法里的
ChannelFuture regFuture = config().group().register(channel);
3.2.3.1跟config()方法
//ServerBootStrap //将自己传给ServerBootstrapConfig private final ServerBootstrapConfig config = new ServerBootstrapConfig(this); public final ServerBootstrapConfig config() { return config; }
3.2.3.2一直跟group()方法
//AbstractBootstrap //group的成员变量的赋值在b.group(bossGroup, workerGroup),其实就是bossGroup volatile EventLoopGroup group; //返回成员变量 public final EventLoopGroup group() { return group; }
3.2.3.3跟register()方法
//MultithreadEventExecutorGroup public ChannelFuture register(Channel channel) { //next方法返回一个EventLoop,自己看 //跟register方法 return next().register(channel); }
跟register方法
//SingleThreadEventLoop public ChannelFuture register(Channel channel) { //DefaultChannelPromise传入了一个channel和this(EventExcuter) return register(new DefaultChannelPromise(channel, this)); }
跟regiser方法
//SingleThreadEventLoop public ChannelFuture register(final ChannelPromise promise) { //检查,省略 ObjectUtil.checkNotNull(promise, "promise"); //重点关注register promise.channel().unsafe().register(this, promise); return promise; }
跟register方法
//AbstractChannel public final void register(EventLoop eventLoop, final ChannelPromise promise) { //if省略 AbstractChannel.this.eventLoop = eventLoop; //下面会执行register0(promise) if (eventLoop.inEventLoop()) { register0(promise); } else { try { eventLoop.execute(new Runnable() { @Override public void run() { register0(promise); } }); } catch (Throwable t) { } } }
跟register0方法
//AbstractChannel private void register0(ChannelPromise promise) { try { //if省略 boolean firstRegistration = neverRegistered; //真正的注册方法 doRegister(); neverRegistered = false; registered = true; //调用handlerAdd方法 pipeline.invokeHandlerAddedIfNeeded(); //这个方法中有一个触发监听器的功能 safeSetSuccess(promise); //调用ChannelRegiser方法 pipeline.fireChannelRegistered(); if (isActive()) {//debug没有执行,省略 } } catch (Throwable t) { } }
跟上面代码的doRegister方法,实现真正的注册
//AbstractNioChannel protected void doRegister() throws Exception { boolean selected = false; for (;;) { try { //java Nio 的知识,注册 selectionKey = javaChannel().register(eventLoop().unwrappedSelector(), 0, this); return; } catch (CancelledKeyException e) { } } }
跟AbstractChannel的register方法里的
//AbstractChannel的register方法里的 pipeline.invokeHandlerAddedIfNeeded();
//DefaultChannelPipeline final void invokeHandlerAddedIfNeeded() { assert channel.eventLoop().inEventLoop(); if (firstRegistration) { firstRegistration = false; callHandlerAddedForAllHandlers(); } }
跟callHandlerAddedForAllHandlers()方法
//DefaultChannelPipeline private void callHandlerAddedForAllHandlers() { final PendingHandlerCallback pendingHandlerCallbackHead; synchronized (this) { assert !registered; registered = true; pendingHandlerCallbackHead = this.pendingHandlerCallbackHead; this.pendingHandlerCallbackHead = null; } PendingHandlerCallback task = pendingHandlerCallbackHead; while (task != null) { //跟这个方法 task.execute(); task = task.next; } }
跟task.excute()方法
//DefaultChannelPipeline void execute() { EventExecutor executor = ctx.executor(); if (executor.inEventLoop()) { callHandlerAdded0(ctx); } else { try { executor.execute(this); } catch (RejectedExecutionException e) { } } }
跟callHandlerAdd0(ctx)方法
//DefaultChannelPipeline private void callHandlerAdded0(final AbstractChannelHandlerContext ctx) { try { ctx.handler().handlerAdded(ctx); ctx.setAddComplete(); } catch (Throwable t) { //省略 } }
其中看ctx.handler(),ctx就是ChannelHandlerContext创建的时候就会传入hander,所以一个ChannelHandlerContext对应一个handler
下面跟 ctx.handler().handlerAdded(ctx) handlerAdded方法 (☆☆☆☆☆)
此时你应该懂你的NioServerSocket中的handler有哪些???
还记得ServerBootStrap中的init方法吗?如下所示
p.addLast(new ChannelInitializer<Channel>() { @Override public void initChannel(final Channel ch) throws Exception { final ChannelPipeline pipeline = ch.pipeline(); ChannelHandler handler = config.handler(); if (handler != null) { pipeline.addLast(handler); } ch.eventLoop().execute(new Runnable() { @Override public void run() { pipeline.addLast(new ServerBootstrapAcceptor( ch, currentChildGroup, currentChildHandler, currentChildOptions, currentChildAttrs)); } }); } });
p.addLast(new ChannelInitializer<Channel>() pipeline中添加了ChannelInitializer ,所以要调用ChannelInitializer的handlerAdd方法
跟ChannelInitializer里的handlerAdded方法
//ChannelInitializer public void handlerAdded(ChannelHandlerContext ctx) throws Exception { if (ctx.channel().isRegistered()) { //跟下面方法 initChannel(ctx); } }
跟
//ChannelInitializer private boolean initChannel(ChannelHandlerContext ctx) throws Exception { if (initMap.putIfAbsent(ctx, Boolean.TRUE) == null) { // Guard against re-entrance. try { //这个initChannel方法就是自定义的方法 initChannel((C) ctx.channel()); } catch (Throwable cause) { } finally { //最后将ctx移除,实现批量处理,因为这个类的作用是添加自定义的Handler,本身没处理的能力,留着干嘛,删了 remove(ctx); } return true; } return false; }
在跟这个 initChannel((C) ctx.channel()) 方法,这个方法的内容不能在跟了,在跟就跟丢了,内容如下
//ServerBootStrap里的init方法的一部分 p.addLast(new ChannelInitializer<Channel>() { @Override public void initChannel(final Channel ch) throws Exception { final ChannelPipeline pipeline = ch.pipeline(); ChannelHandler handler = config.handler(); if (handler != null) { pipeline.addLast(handler); } ch.eventLoop().execute(new Runnable() { @Override public void run() { pipeline.addLast(new ServerBootstrapAcceptor( ch, currentChildGroup, currentChildHandler, currentChildOptions, currentChildAttrs)); } }); } });
initChannel方法添加了
1)handler(handler(new LoggingHandler(LogLevel.INFO)))
2)new ServerBootstrapAcceptor(ch, currentChildGroup, currentChildHandler, currentChildOptions, currentChildAttrs)☆☆☆☆☆
跟AbstractChannel的registero方法里的safeSetSuccess(promise)代码
//AbstractChannel protected final void safeSetSuccess(ChannelPromise promise) { if (!(promise instanceof VoidChannelPromise) && !promise.trySuccess()) { logger.warn("Failed to mark a promise as success because it is done already: {}", promise); } }
跟 promise.trySuccess()方法
//DefaultChannelPipeline @Override public boolean trySuccess() { return trySuccess(null); }
跟
//DefaultChannelPipeline @Override public boolean trySuccess(V result) { if (setSuccess0(result)) { //通知监听器 notifyListeners(); return true; } return false; }
跟AbstractChannel的registero方法里的pipeline.fireChannelRegistered()代码
他的运行原理和调用handerAdd方法差不多
到此为止,服务端启动完成
channel.active没有调用