什么是新连接接入?以及新连接接入前,Netty处于什么状态#
netty的服务端NioServerSocketChannel
初始化,注册在BossGroup中的一条NioEventLoop
中,并且给NioServerSocketChannel
中维护的jdk原生的ServerSocketChannel
绑定好了端口后, EventLoop启动,开始轮询工作...
这时候 EventLoop 它在轮询什么? 其实它在轮询监听当初NioServerSocketChannel经过二次注册感兴趣的事件时, 告诉 Selector,让Selector关注自己身上可能会出现 OP_ACCEPT
事件, 这合情合理,因为对于Netty的主从Reactor线程模型中, BossGroup中的channel只关心OP_ACCEPT
也就是用户的请求建立连接事件
netty的新连接接入要做哪些工作?#
看上图,netty的新连接接入,对应这个线程模型中我圈出来的部分, 主要步骤如下
- 服务端Selector轮询到客户端请求建立连接
- 处理请求
- 从服务端维护的JDK 原生ServerSocketChannel中accept()客户端的channel
- 使用new的方法 将客户端的Channel封装成 NioSocketChannel
- 层层往上调用super(),初始化channel的组件
- 创建channel的配置类对象 config
- 向下传播channelRead事件
- 给客户端的channel设置相关参数
- 将客户端的channel注册在 workerGroup 中的轮询算法选出的 EventLoop
- 将jdk原生的SocketChanel注册进 EventLoop中的选择器中
- 传播channelregist事件
- 传播channelActive事件
- 给客户端的channel二次注册netty可以处理的感兴趣的事件
这是我总结的新连接接入的流程,从上面分析的开始检查新链接,终止的标志是,把客户端的NioSocketChannel二次注册在EventLoop上,成为Netty可以处理的chanel为止
入口:NioEventLoop
处理IO事件#
当服务端的事件循环检测到有io事件时,使用它的processSelectedKeys();
处理,源码如下:
private void processSelectedKeys() { // todo selectedKeys 就是经过优化后的keys(底层是数组) , 默认不为null if (selectedKeys != null) { processSelectedKeysOptimized(); } else { processSelectedKeysPlain(selector.selectedKeys()); } }
当有了新IO请求进来, jdk原生的Selector将SelectionKey放入存放感兴趣的key的集合中,而这个集合现在就是netty通过反射的方式强制替换为以数组为数据结构的selectedKeys
, 数组不为空, 跟进processSelectedKeysOptimized();
,源码如下: 解析写在源码下面:
private void processSelectedKeysOptimized() { for (int i = 0; i < selectedKeys.size; ++i) { final SelectionKey k = selectedKeys.keys[i]; // null out entry in the array to allow to have it GC'ed once the Channel close // todo 数组输出空项, 从而允许在channel 关闭时对其进行垃圾回收 // See https://github.com/netty/netty/issues/2363 // todo 数组中当前循环对应的keys质空, 这种感兴趣的事件只处理一次就行 selectedKeys.keys[i] = null; // todo 获取出 attachment,默认情况下就是注册进Selector时,传入的第三个参数 this===> NioServerSocketChannel // todo 一个Selector中可能被绑定上了成千上万个Channel, 通过K+attachment 的手段, 精确的取出发生指定事件的channel, 进而获取channel中的unsafe类进行下一步处理 final Object a = k.attachment(); // todo if (a instanceof AbstractNioChannel) { // todo 进入这个方法, 传进入 感兴趣的key + NioSocketChannel processSelectedKey(k, (AbstractNioChannel) a); } else { @SuppressWarnings("unchecked") NioTask<SelectableChannel> task = (NioTask<SelectableChannel>) a; processSelectedKey(k, task); } if (needsToSelectAgain) { // null out entries in the array to allow to have it GC'ed once the Channel close // See https://github.com/netty/netty/issues/2363 selectedKeys.reset(i + 1); selectAgain(); i = -1; } } }
处理感兴趣的事件, 想想,需要什么他才能进一步处理呢? 需要下面两点:
- 这个感兴趣的事件是啥?
- 在这了就是上面的 k
- 哪个channel出现的Selector感兴趣的事件?
- 在这里是通过 attachment拿到的 a ,其实不就是服务端的
NioServerSocketChannel
?
另外它把NioServerSocketChannel
向上强转成了AbstractNioChannel
这是为什么呢?
答:
第一点:
在我写的上一篇Chanel的架构体系中,我们知道,Netty的NioXXXChannel其实是netty的,基于原生的jdk的chanel的封装,而在他的整个继承体系中,这个AbstractNioChannel
就负责维护jdk原生的channel, 知道了这有啥用? 当然有用,我们要去给客户端channel接生了,原生服务端channel.accept()==客户端channel
第二点:
针对数据的读写都是unsafe中,回想是哪个类中定义了读取channel中IO数据的抽象模板函数呢? AbstractNioChannel
, 是它新增的内部接口,从而进客户端和服务对针对chanel的不同特化read进行不同的实现
好, 有了这两个条件,继续跟进processSelectedKey(k, (AbstractNioChannel) a);
看它是如何处理, 源码如下:
- 获取到服务端的unsafe对象(数据读写)
- 根据k的readOps,进行计算决定执行
unsafe.read();
// todo 服务端启动后,方法被用用处理新链接, 可以模拟 telnet localhost 8899 新链接的介入 // todo 处理selectedkey // todo netty底层对数据的读写都是 unsafe完成的 private void processSelectedKey(SelectionKey k, AbstractNioChannel ch) { // todo 这个unsafe 也是可channel 也是和Channel进行唯一绑定的对象 final AbstractNioChannel.NioUnsafe unsafe = ch.unsafe(); if (!k.isValid()) { // todo 确保Key的合法 final EventLoop eventLoop; try { eventLoop = ch.eventLoop(); } catch (Throwable ignored) { return; } if (eventLoop != this || eventLoop == null) { // todo 确保多线程下的安全性 return; } unsafe.close(unsafe.voidPromise()); return; } // todo NioServerSocketChannel和selectKey都合法的话, 就进入下面的 处理阶段 try { // todo 获取SelectedKey 的 关心的选项 int readyOps = k.readyOps(); // todo 在read() write()之前我们需要调用 finishConnect() 方法, 否则 NIO JDK抛出异常 if ((readyOps & SelectionKey.OP_CONNECT) != 0) { // remove OP_CONNECT as otherwise Selector.select(..) will always return without blocking // See https://github.com/netty/netty/issues/924 int ops = k.interestOps(); ops &= ~SelectionKey.OP_CONNECT; k.interestOps( ); unsafe.finishConnect(); } // Process OP_WRITE first as we may be able to write some queued buffers and so free memory. if ((readyOps & SelectionKey.OP_WRITE) != 0) { // Call forceFlush which will also take care of clear the OP_WRITE once there is nothing left to write ch.unsafe().forceFlush(); } // todo 同样是检查 readOps是否为零, 来检查是否出现了 jdk 空轮询的bug if ((readyOps & (SelectionKey.OP_READ | SelectionKey.OP_ACCEPT)) != 0 || readyOps == 0) { unsafe.read(); } } catch (CancelledKeyException ignored) { unsafe.close(unsafe.voidPromise()); } }
下面我们进入unsafe.read();
, 直接跳进去,直接进入到了AbstractNioChannel
的抽象内部类,因为上面说了,做了向上强制类型转换,我们源码如下:
/** * Special {@link Unsafe} sub-type which allows to access the underlying {@link SelectableChannel} */ public interface NioUnsafe extends Unsafe { /** * Return underlying {@link SelectableChannel} */ SelectableChannel ch(); /** * Finish connect */ void finishConnect(); void forceFlush(); }
具体的实现是谁? 因为我们是服务端的channel, 所以实现类是:NioMessageUnsafe
, 进入查看他的源码: 下面这段代码真的是挺长的, 它的解析我写在他的下面:
@Override public void read() { // todo 同样是断言, 当前的线程必须是在 EventLoop 里面的线程才有资格执行 assert eventLoop().inEventLoop( ); final ChannelConfig config = config(); final ChannelPipeline pipeline = pipeline(); // todo 用于查看服务端接受的速率, 说白了就是控制服务端是否接着read 客户端的IO事件 final RecvByteBufAllocator.Handle allocHandle = unsafe().recvBufAllocHandle(); allocHandle.reset(config); boolean closed = false; Throwable exception = null; try { try { do { // todo 进入 int localRead = doReadMessages(readBuf); if (localRead == 0) { break; } if (localRead < 0) { closed = true; break; } //todo 对读到的连接,进行简单的计数 allocHandle.incMessagesRead(localRead); } while (allocHandle.continueReading()); } catch (Throwable t) { exception = t; } int size = readBuf.size(); for (int i = 0; i < size; i ++) { readPending = false; // todo 处理新的连接的逻辑来到这, 意思是让pipeline中发生事件传播, // todo pipeline是谁的呢? 现在是NioMessageUnsafe 所以是服务端的, // todo 事件是如何传播的呢? head-->>ServerBootStraptAcceptor-->>tail 依次传播, // todo 传播的什么事件? ChannelRead, 也就是说,会去调用 ServerBootStraptAcceptor的ChannelRead方法,跟进去 pipeline.fireChannelRead(readBuf.get(i)); } readBuf.clear(); allocHandle.readComplete(); pipeline.fireChannelReadComplete(); if (exception != null) { closed = closeOnReadError(exception); pipeline.fireExceptionCaught(exception); } if (closed) { inputShutdown = true; if (isOpen()) { close(voidPromise()); } } } finally { // Check if there is a readPending which was not processed yet. // This could be for two reasons: // * The user called Channel.read() or ChannelHandlerContext.read() in channelRead(...) method // * The user called Channel.read() or ChannelHandlerContext.read() in channelReadComplete(...) method // // See https://github.com/netty/netty/issues/2254 if (!readPending && !config.isAutoRead()) { removeReadOp(); } } } }