服务端启动的最后一步,就是绑定本地端口,启动服务,下面我们来分析下这部分代码。
public abstract class AbstractBootstrap<B extends AbstractBootstrap<B, C>, C extends Channel> implements Cloneable { private ChannelFuture doBind(final SocketAddress localAddress) { final ChannelFuture regFuture = initAndRegister(); final Channel channel = regFuture.channel(); if (regFuture.cause() != null) { return regFuture; } if (regFuture.isDone()) { // At this point we know that the registration was complete and successful. ChannelPromise promise = channel.newPromise(); doBind0(regFuture, channel, localAddress, promise); return promise; } else { // Registration future is almost always fulfilled already, but just in case it's not. final PendingRegistrationPromise promise = new PendingRegistrationPromise(channel); regFuture.addListener(new ChannelFutureListener() { @Override public void operationComplete(ChannelFuture future) throws Exception { Throwable cause = future.cause(); if (cause != null) { // Registration on the EventLoop failed so fail the ChannelPromise directly to not cause an // IllegalStateException once we try to access the EventLoop of the Channel. promise.setFailure(cause); } else { // Registration was successful, so set the correct executor to use. // See https://github.com/netty/netty/issues/2586 promise.registered(); doBind0(regFuture, channel, localAddress, promise); } } }); return promise; } } }Copy to clipboardErrorCopied
先看下上述代码调用的 initAndRegister()方法。它首先实例化了一个 NioServerSocketChannel 类型 的 Channel 对象。相关代码如下。
final ChannelFuture initAndRegister() { Channel channel = null; try { channel = channelFactory.newChannel(); init(channel); } catch (Throwable t) { if (channel != null) { // channel can be null if newChannel crashed (eg SocketException("too many open files")) channel.unsafe().closeForcibly(); // as the Channel is not registered yet we need to force the usage of the GlobalEventExecutor return new DefaultChannelPromise(channel, GlobalEventExecutor.INSTANCE).setFailure(t); } // as the Channel is not registered yet we need to force the usage of the GlobalEventExecutor return new DefaultChannelPromise(new FailedChannel(), GlobalEventExecutor.INSTANCE).setFailure(t); } ChannelFuture regFuture = config().group().register(channel); if (regFuture.cause() != null) { if (channel.isRegistered()) { channel.close(); } else { channel.unsafe().closeForcibly(); } } return regFuture; }Copy to clipboardErrorCopied
NioServerSocketChannel 创建成功后,对它进行初始化,初始化工作主要有以下三点。
@Override void init(Channel channel) throws Exception { final Map<ChannelOption<?>, Object> options = options0(); synchronized (options) { setChannelOptions(channel, options, logger); } // 1、设置 Socket参数 和 NioServerSocketChannel 的附加属性 final Map<AttributeKey<?>, Object> attrs = attrs0(); synchronized (attrs) { for (Entry<AttributeKey<?>, Object> e: attrs.entrySet()) { @SuppressWarnings("unchecked") AttributeKey<Object> key = (AttributeKey<Object>) e.getKey(); channel.attr(key).set(e.getValue()); } } // 2、将 AbstractBootstrap 的 Handler 添加到 NioServerSocketChannel // 的 ChannelPipeline 中 ChannelPipeline p = channel.pipeline(); final EventLoopGroup currentChildGroup = childGroup; final ChannelHandler currentChildHandler = childHandler; final Entry<ChannelOption<?>, Object>[] currentChildOptions; final Entry<AttributeKey<?>, Object>[] currentChildAttrs; synchronized (childOptions) { currentChildOptions = childOptions.entrySet().toArray(newOptionArray(0)); } synchronized (childAttrs) { currentChildAttrs = childAttrs.entrySet().toArray(newAttrArray(0)); } // 3、将用于服务端注册的 Handler ServerBootstrapAcceptor 添加到 ChannelPipeline 中 p.addLast(new ChannelInitializer<Channel>() { @Override public void initChannel(final Channel ch) throws Exception { final ChannelPipeline pipeline = ch.pipeline(); ChannelHandler handler = config.handler(); if (handler != null) { pipeline.addLast(handler); } ch.eventLoop().execute(new Runnable() { @Override public void run() { pipeline.addLast(new ServerBootstrapAcceptor( ch, currentChildGroup, currentChildHandler, currentChildOptions, currentChildAttrs)); } }); } }); }Copy to clipboardErrorCopied
到此,Netty 服务端监听的相关资源已经初始化完毕,就剩下最后一步,注册 NioServerSocketChannel 到 Reactor 线程 的多路复用器上,然后轮询客户端连接事件。在分析注册代码之前,我们先通过下图,看看目前 NioServerSocketChannel 的 ChannelPipeline 的组成。 最后,我们看下 NioServerSocketChannel 的注册。当 NioServerSocketChannel 初始化完成之后,需要将它注册到 Reactor 线程 的多路复用器上监听新客户端的接入,代码如下。
public abstract class AbstractChannel extends DefaultAttributeMap implements Channel { protected abstract class AbstractUnsafe implements Unsafe { /** * 将完成初始化的 NioServerSocketChannel 注册到 Reactor线程 * 的多路复用器上,监听新客户端的接入 */ @Override public final void register(EventLoop eventLoop, final ChannelPromise promise) { ...... // 首先判断是否是 NioEventLoop 自身发起的操作。如果是,则不存在并发操作,直接 // 执行 Channel注册;如果由其他线程发起,则封装成一个 Task 放入消息队列中异步执行。 // 此处,由于是由 ServerBootstrap 所在线程执行的注册操作,所以会将其封装成 Task 投递 // 到 NioEventLoop 中执行 if (eventLoop.inEventLoop()) { register0(promise); } else { try { eventLoop.execute(new Runnable() { @Override public void run() { register0(promise); } }); } catch (Throwable t) { ...... } } } private void register0(ChannelPromise promise) { try { // check if the channel is still open as it could be closed in the mean time when the register // call was outside of the eventLoop if (!promise.setUncancellable() || !ensureOpen(promise)) { return; } boolean firstRegistration = neverRegistered; // 该方法在本类中是一个空实现,下面看一下它在子类 AbstractNioChannel 中的实现 doRegister(); neverRegistered = false; registered = true; pipeline.invokeHandlerAddedIfNeeded(); safeSetSuccess(promise); pipeline.fireChannelRegistered(); if (isActive()) { if (firstRegistration) { pipeline.fireChannelActive(); } else if (config().isAutoRead()) { beginRead(); } } } catch (Throwable t) { closeForcibly(); closeFuture.setClosed(); safeSetFailure(promise, t); } } } } public abstract class AbstractNioChannel extends AbstractChannel { @Override protected void doRegister() throws Exception { boolean selected = false; for (;;) { try { // 将 NioServerSocketChannel 注册到 NioEventLoop 的 多路复用器Selector 上 selectionKey = javaChannel().register(eventLoop().unwrappedSelector(), 0, this); return; } catch (CancelledKeyException e) { ...... } } } }Copy to clipboardErrorCopied
到此,服务端监听启动部分源码已经分析完成。
结合 Netty 源码 对客户端接入过程进行解析
负责处理网络读写、连接和客户端请求接入的 Reactor 线程 就是 NioEventLoop,下面我们看下 NioEventLoop 是如何处理新的客户端连接接入的。当 多路复用器 检测到新的准备就绪的 Channel 时,默认执行 processSelectedKeysOptimized()方法,代码如下。
public final class NioEventLoop extends SingleThreadEventLoop { private void processSelectedKeys() { if (selectedKeys != null) { processSelectedKeysOptimized(); } else { processSelectedKeysPlain(selector.selectedKeys()); } } private void processSelectedKeysOptimized() { for (int i = 0; i < selectedKeys.size; ++i) { final SelectionKey k = selectedKeys.keys[i]; selectedKeys.keys[i] = null; final Object a = k.attachment(); if (a instanceof AbstractNioChannel) { // 根据就绪的操作位 SelectionKey,执行不同的操作 processSelectedKey(k, (AbstractNioChannel) a); } else { @SuppressWarnings("unchecked") NioTask<SelectableChannel> task = (NioTask<SelectableChannel>) a; processSelectedKey(k, task); } if (needsToSelectAgain) { selectedKeys.reset(i + 1); selectAgain(); i = -1; } } } // 根据就绪的操作位 SelectionKey,执行不同的操作 private void processSelectedKey(SelectionKey k, AbstractNioChannel ch) { // 由于不同的 Channel 执行不同的操作,所以 NioUnsafe 被设计成接口 // 由不同的 Channel 内部的 NioUnsafe实现类 负责具体实现 final AbstractNioChannel.NioUnsafe unsafe = ch.unsafe(); if (!k.isValid()) { final EventLoop eventLoop; try { eventLoop = ch.eventLoop(); } catch (Throwable ignored) { return; } if (eventLoop != this || eventLoop == null) { return; } unsafe.close(unsafe.voidPromise()); return; } try { int readyOps = k.readyOps(); if ((readyOps & SelectionKey.OP_CONNECT) != 0) { int ops = k.interestOps(); ops &= ~SelectionKey.OP_CONNECT; k.interestOps(ops); unsafe.finishConnect(); } if ((readyOps & SelectionKey.OP_WRITE) != 0) { ch.unsafe().forceFlush(); } // read()方法 的实现有两个,分别是 NioByteUnsafe 和 NioMessageUnsafe, // 对于 NioServerSocketChannel,它使用的是 NioMessageUnsafe // 下面看一下 NioMessageUnsafe 对 read() 方法的实现 if ((readyOps & (SelectionKey.OP_READ | SelectionKey.OP_ACCEPT)) != 0 || readyOps == 0) { unsafe.read(); } } catch (CancelledKeyException ignored) { unsafe.close(unsafe.voidPromise()); } } } public abstract class AbstractNioMessageChannel extends AbstractNioChannel { private final class NioMessageUnsafe extends AbstractNioUnsafe { private final List<Object> readBuf = new ArrayList<Object>(); @Override public void read() { assert eventLoop().inEventLoop(); final ChannelConfig config = config(); final ChannelPipeline pipeline = pipeline(); final RecvByteBufAllocator.Handle allocHandle = unsafe().recvBufAllocHandle(); allocHandle.reset(config); boolean closed = false; Throwable exception = null; try { try { do { // 接收新的客户端连接并创建 NioSocketChannel int localRead = doReadMessages(readBuf); if (localRead == 0) { break; } if (localRead < 0) { closed = true; break; } allocHandle.incMessagesRead(localRead); } while (allocHandle.continueReading()); } catch (Throwable t) { exception = t; } int size = readBuf.size(); for (int i = 0; i < size; i ++) { readPending = false; // 接收到新的客户端连接后,触发 ChannelPipeline 的 channelRead方法。 // 事件在 ChannelPipeline 中传递,执行 ServerBootstrapAcceptor 的 // channelRead方法 pipeline.fireChannelRead(readBuf.get(i)); } ...... } } } } public class NioServerSocketChannel extends AbstractNioMessageChannel implements io.netty.channel.socket.ServerSocketChannel { /** * 接收新的客户端连接并创建 NioSocketChannel */ @Override protected int doReadMessages(List<Object> buf) throws Exception { SocketChannel ch = SocketUtils.accept(javaChannel()); try { if (ch != null) { buf.add(new NioSocketChannel(this, ch)); return 1; } } catch (Throwable t) { ...... } return 0; } } public class ServerBootstrap extends AbstractBootstrap<ServerBootstrap, ServerChannel> { private static class ServerBootstrapAcceptor extends ChannelInboundHandlerAdapter { /** * 该方法主要分为如下三个步骤。 */ @Override @SuppressWarnings("unchecked") public void channelRead(ChannelHandlerContext ctx, Object msg) { final Channel child = (Channel) msg; // 第一步:将启动时传入的 childHandler 加入到客户端 SocketChannel 的 ChannelPipeline 中 child.pipeline().addLast(childHandler); // 第二步:设置客户端 SocketChannel 的 TCP参数 setChannelOptions(child, childOptions, logger); for (Entry<AttributeKey<?>, Object> e: childAttrs) { child.attr((AttributeKey<Object>) e.getKey()).set(e.getValue()); } // 第三步:注册 SocketChannel 到多路复用器 try { childGroup.register(child).addListener(new ChannelFutureListener() { @Override public void operationComplete(ChannelFuture future) throws Exception { if (!future.isSuccess()) { forceClose(child, future.cause()); } } }); } catch (Throwable t) { forceClose(child, t); } } } }Copy to clipboardErrorCopied
下面我们展开看下 NioSocketChannel 的 register()方法。NioSocketChannel 的注册方法与 ServerSocketChannel 的一致, 也是将 Channel 注册到 Reactor 线程 的多路复用器上。由于注册的操作位是 0,所以,此时 NioSocketChannel 还不能读取客户端发送的消息,下面我们看看 是什么时候修改监听操作位为 OP_READ 的。
执行完注册操作之后,紧接着会触发 ChannelReadComplete 事件。我们继续分析 ChannelReadComplete 在 ChannelPipeline 中的处理流程:Netty 的 Header 和 Tail 本身不关注 ChannelReadComplete 事件 就直接透传,执行完 ChannelReadComplete 后,接着执行 PipeLine 的 read()方法,最终执行 HeadHandler 的 read()方法。
HeadHandler 的 read()方法用来将网络操作位修改为读操作。创建 NioSocketChannel 的时候已经将 AbstractNioChannel 的 readInterestOp 设置为 OP_ READ,这样,执行 selectionKey. interestOps(interestOps | readInterestOp)操作 时就会把操作位设置为 OP_READ。代码如下。
public abstract class AbstractNioByteChannel extends AbstractNioChannel { protected AbstractNioByteChannel(Channel parent, SelectableChannel ch) { super(parent, ch, SelectionKey.OP_READ); } }Copy to clipboardErrorCopied
到此,新接入的客户端连接处理完成,可以进行网络读写等 IO 操作。