深入理解 Netty-新连接接入(一)

简介: 深入理解 Netty-新连接接入(一)

什么是新连接接入?以及新连接接入前,Netty处于什么状态#


netty的服务端NioServerSocketChannel初始化,注册在BossGroup中的一条NioEventLoop中,并且给NioServerSocketChannel中维护的jdk原生的ServerSocketChannel绑定好了端口后, EventLoop启动,开始轮询工作...

这时候 EventLoop 它在轮询什么? 其实它在轮询监听当初NioServerSocketChannel经过二次注册感兴趣的事件时, 告诉 Selector,让Selector关注自己身上可能会出现 OP_ACCEPT 事件, 这合情合理,因为对于Netty的主从Reactor线程模型中, BossGroup中的channel只关心OP_ACCEPT 也就是用户的请求建立连接事件



netty的新连接接入要做哪些工作?#


看上图,netty的新连接接入,对应这个线程模型中我圈出来的部分, 主要步骤如下

  • 服务端Selector轮询到客户端请求建立连接
  • 处理请求
  • 从服务端维护的JDK 原生ServerSocketChannel中accept()客户端的channel
  • 使用new的方法 将客户端的Channel封装成 NioSocketChannel
  • 层层往上调用super(),初始化channel的组件
  • 创建channel的配置类对象 config
  • 向下传播channelRead事件
  • 给客户端的channel设置相关参数
  • 将客户端的channel注册在 workerGroup 中的轮询算法选出的 EventLoop
  • 将jdk原生的SocketChanel注册进 EventLoop中的选择器中
  • 传播channelregist事件
  • 传播channelActive事件
  • 给客户端的channel二次注册netty可以处理的感兴趣的事件


这是我总结的新连接接入的流程,从上面分析的开始检查新链接,终止的标志是,把客户端的NioSocketChannel二次注册在EventLoop上,成为Netty可以处理的chanel为止


入口:NioEventLoop处理IO事件#


当服务端的事件循环检测到有io事件时,使用它的processSelectedKeys();处理,源码如下:


private void processSelectedKeys() {
        // todo  selectedKeys 就是经过优化后的keys(底层是数组) , 默认不为null
        if (selectedKeys != null) {
            processSelectedKeysOptimized();
        } else {
            processSelectedKeysPlain(selector.selectedKeys());
        }
}


当有了新IO请求进来, jdk原生的Selector将SelectionKey放入存放感兴趣的key的集合中,而这个集合现在就是netty通过反射的方式强制替换为以数组为数据结构的selectedKeys, 数组不为空, 跟进processSelectedKeysOptimized();,源码如下: 解析写在源码下面:



private void processSelectedKeysOptimized() {
for (int i = 0; i < selectedKeys.size; ++i) {
    final SelectionKey k = selectedKeys.keys[i];
    // null out entry in the array to allow to have it GC'ed once the Channel close
    // todo 数组输出空项, 从而允许在channel 关闭时对其进行垃圾回收
    // See https://github.com/netty/netty/issues/2363
    // todo 数组中当前循环对应的keys质空, 这种感兴趣的事件只处理一次就行
    selectedKeys.keys[i] = null;
    // todo 获取出 attachment,默认情况下就是注册进Selector时,传入的第三个参数  this===>   NioServerSocketChannel
    // todo 一个Selector中可能被绑定上了成千上万个Channel,  通过K+attachment 的手段, 精确的取出发生指定事件的channel, 进而获取channel中的unsafe类进行下一步处理
    final Object a = k.attachment();
    // todo
    if (a instanceof AbstractNioChannel) {
        // todo 进入这个方法, 传进入 感兴趣的key + NioSocketChannel
        processSelectedKey(k, (AbstractNioChannel) a);
    } else {
        @SuppressWarnings("unchecked")
        NioTask<SelectableChannel> task = (NioTask<SelectableChannel>) a;
        processSelectedKey(k, task);
    }
    if (needsToSelectAgain) {
        // null out entries in the array to allow to have it GC'ed once the Channel close
        // See https://github.com/netty/netty/issues/2363
        selectedKeys.reset(i + 1);
        selectAgain();
        i = -1;
    }
}
}


处理感兴趣的事件, 想想,需要什么他才能进一步处理呢? 需要下面两点:

  • 这个感兴趣的事件是啥?
  • 在这了就是上面的 k
  • 哪个channel出现的Selector感兴趣的事件?
  • 在这里是通过 attachment拿到的 a ,其实不就是服务端的NioServerSocketChannel ?


另外它把NioServerSocketChannel向上强转成了AbstractNioChannel这是为什么呢?


答:


第一点:

在我写的上一篇Chanel的架构体系中,我们知道,Netty的NioXXXChannel其实是netty的,基于原生的jdk的chanel的封装,而在他的整个继承体系中,这个AbstractNioChannel就负责维护jdk原生的channel, 知道了这有啥用? 当然有用,我们要去给客户端channel接生了,原生服务端channel.accept()==客户端channel


第二点:

针对数据的读写都是unsafe中,回想是哪个类中定义了读取channel中IO数据的抽象模板函数呢? AbstractNioChannel, 是它新增的内部接口,从而进客户端和服务对针对chanel的不同特化read进行不同的实现

好, 有了这两个条件,继续跟进processSelectedKey(k, (AbstractNioChannel) a);看它是如何处理, 源码如下:

  • 获取到服务端的unsafe对象(数据读写)
  • 根据k的readOps,进行计算决定执行unsafe.read();


// todo 服务端启动后,方法被用用处理新链接,  可以模拟 telnet localhost 8899 新链接的介入
// todo 处理selectedkey
// todo netty底层对数据的读写都是  unsafe完成的
private void processSelectedKey(SelectionKey k, AbstractNioChannel ch) {
// todo 这个unsafe 也是可channel 也是和Channel进行唯一绑定的对象
final AbstractNioChannel.NioUnsafe unsafe = ch.unsafe();
if (!k.isValid()) {   // todo 确保Key的合法
    final EventLoop eventLoop;
    try {
        eventLoop = ch.eventLoop();
    } catch (Throwable ignored) {
        return;
    }
    if (eventLoop != this || eventLoop == null) { // todo 确保多线程下的安全性
        return;
    }
    unsafe.close(unsafe.voidPromise());
    return;
}
// todo NioServerSocketChannel和selectKey都合法的话, 就进入下面的 处理阶段
try {
    // todo 获取SelectedKey 的 关心的选项
    int readyOps = k.readyOps();
    // todo 在read()   write()之前我们需要调用 finishConnect()  方法, 否则  NIO JDK抛出异常
    if ((readyOps & SelectionKey.OP_CONNECT) != 0) {
        // remove OP_CONNECT as otherwise Selector.select(..) will always return without blocking
        // See https://github.com/netty/netty/issues/924
        int ops = k.interestOps();
        ops &= ~SelectionKey.OP_CONNECT;
        k.interestOps( );
        unsafe.finishConnect();
    }
    // Process OP_WRITE first as we may be able to write some queued buffers and so free memory.
    if ((readyOps & SelectionKey.OP_WRITE) != 0) {
        // Call forceFlush which will also take care of clear the OP_WRITE once there is nothing left to  write
        ch.unsafe().forceFlush();
    }
    // todo 同样是检查 readOps是否为零, 来检查是否出现了  jdk  空轮询的bug
    if ((readyOps & (SelectionKey.OP_READ | SelectionKey.OP_ACCEPT)) != 0 || readyOps == 0) {
        unsafe.read();
    }
} catch (CancelledKeyException ignored) {
    unsafe.close(unsafe.voidPromise());
}
}


下面我们进入unsafe.read();, 直接跳进去,直接进入到了AbstractNioChannel的抽象内部类,因为上面说了,做了向上强制类型转换,我们源码如下:


/**
     * Special {@link Unsafe} sub-type which allows to access the underlying {@link SelectableChannel}
     */
    public interface NioUnsafe extends Unsafe {
        /**
         * Return underlying {@link SelectableChannel}
         */
        SelectableChannel ch();
        /**
         * Finish connect
         */
        void finishConnect();
        void forceFlush();
    }


具体的实现是谁? 因为我们是服务端的channel, 所以实现类是:NioMessageUnsafe, 进入查看他的源码: 下面这段代码真的是挺长的, 它的解析我写在他的下面:


@Override
public void read() {
    // todo 同样是断言, 当前的线程必须是在 EventLoop  里面的线程才有资格执行
    assert eventLoop().inEventLoop( );
    final ChannelConfig config = config();
    final ChannelPipeline pipeline = pipeline();
    // todo 用于查看服务端接受的速率, 说白了就是控制服务端是否接着read 客户端的IO事件
    final RecvByteBufAllocator.Handle allocHandle = unsafe().recvBufAllocHandle();
    allocHandle.reset(config);
    boolean closed = false;
    Throwable exception = null;
    try {
        try {
            do {
                // todo 进入
                int localRead = doReadMessages(readBuf);
                if (localRead == 0) {
                    break;
                }
                if (localRead < 0) {
                    closed = true;
                    break;
                }
                //todo 对读到的连接,进行简单的计数
                allocHandle.incMessagesRead(localRead);
            } while (allocHandle.continueReading());
        } catch (Throwable t) {
            exception = t;
        }
        int size = readBuf.size();
        for (int i = 0; i < size; i ++) {
            readPending = false;
            // todo 处理新的连接的逻辑来到这, 意思是让pipeline中发生事件传播,
            // todo pipeline是谁的呢?  现在是NioMessageUnsafe  所以是服务端的,
            // todo 事件是如何传播的呢?  head-->>ServerBootStraptAcceptor-->>tail 依次传播,
            // todo 传播的什么事件?  ChannelRead,  也就是说,会去调用 ServerBootStraptAcceptor的ChannelRead方法,跟进去
            pipeline.fireChannelRead(readBuf.get(i));
        }
        readBuf.clear();
        allocHandle.readComplete();
        pipeline.fireChannelReadComplete();
        if (exception != null) {
            closed = closeOnReadError(exception);
            pipeline.fireExceptionCaught(exception);
        }
        if (closed) {
            inputShutdown = true;
            if (isOpen()) {
                close(voidPromise());
            }
        }
    } finally {
        // Check if there is a readPending which was not processed yet.
        // This could be for two reasons:
        // * The user called Channel.read() or ChannelHandlerContext.read() in channelRead(...) method
        // * The user called Channel.read() or ChannelHandlerContext.read() in channelReadComplete(...) method
        //
        // See https://github.com/netty/netty/issues/2254
        if (!readPending && !config.isAutoRead()) {
            removeReadOp();
        }
    }
}
}



相关文章
|
网络协议
netty编程实战02-创建一个带有连接重试的tcp客户端程序
netty编程实战02-创建一个带有连接重试的tcp客户端程序
221 0
|
8月前
|
前端开发 Java Maven
【Netty 网络通信】启动客户端连接服务端实现通信
【1月更文挑战第9天】【Netty 网络通信】启动客户端连接服务端实现通信
|
存储 缓存 编解码
一文搞定Netty,打造单机百万连接测试!3
一文搞定Netty,打造单机百万连接测试!
|
设计模式 缓存 前端开发
一文搞定Netty,打造单机百万连接测试!2
一文搞定Netty,打造单机百万连接测试!
|
缓存 监控 网络协议
一文搞定Netty,打造单机百万连接测试!1
一文搞定Netty,打造单机百万连接测试!
|
网络协议
Netty之第一次 TCP 连接时发生了什么
Netty之第一次 TCP 连接时发生了什么
184 0
|
Java
深入理解 Netty-新连接接入(二)
深入理解 Netty-新连接接入(二)
152 0
手把手教你调试Netty创建连接流程源码
手把手教你调试Netty创建连接流程源码
109 0
手把手教你调试Netty创建连接流程源码
|
网络协议 前端开发 Linux
netty系列之:让TCP连接快一点,再快一点
netty系列之:让TCP连接快一点,再快一点
netty系列之:搭建客户端使用http1.1的方式连接http2服务器
netty系列之:搭建客户端使用http1.1的方式连接http2服务器