深入理解 Netty-新连接接入(二)

简介: 深入理解 Netty-新连接接入(二)

read()三部曲:#


针对这段代码,我们值关心下面几部分, 这三部分结束, 整个新链接的建立就完成了,

下面三部曲的 大前提都是,当前我们是在AbstractNioMessageChannel


  • doReadMessages(readBuf)
  • allocHandle.incMessagesRead(localRead);
  • pipeline.fireChannelRead(readBuf.get(i));


第一步:#


如何创建出jdk原生的 客户端channel,对它做了什么?#


第一步doReadMessages(readBuf) 这是AbstractNioMessageChannel的抽象方法,从chanel读取内容我们需要一个维护特化chanenl引用的对象,谁呢? 它的子类NioServerSocketChannel, 源码如下: 解析依然写在代码下面


//todo doReadMessage  其实就是 doChannel
// todo 处理新连接, 现在是在 NioServReaderSocketChannel里面
@Override
protected int doReadMessages(List<Object> buf) throws Exception {
    // todo java Nio底层在这里 创建jdk底层的 原生channel
    SocketChannel ch = SocketUtils.accept(javaChannel());
    try {
        if (ch != null) {
            // todo  把java原生的channel, 封装成 Netty自定义的封装的channel , 这里的buf是list集合对象,由上一层传递过来的
            // todo  this  --  NioServerSocketChannel
            // todo  ch --     SocketChnnel
            buf.add(new NioSocketChannel(this, ch));
            return 1;
        }
    } catch (Throwable t) {
        logger.warn("Failed to create a new channel from an accepted socket.", t);
        try {
            ch.close();
        } catch (Throwable t2) {
            logger.warn("Failed to close a socket.", t2);
        }
    }
    return 0;
}


这是个跨越性的操作, 上面的代码主要进行如下面几步工作:

  • 从原生的jdk ServerSocketChannel中 accept出 jdk原生的 SocketChanel
  • 将jdk原生的 Socket封装成Netty对它的封装类型 NioChannel


为啥,服务端的channel需要反射创建,而客户的的channel直接new?#


我的理解是,netty不仅可以做 NIO编程模型的服务器, 传统的阻塞式IO,或者其他类型的服务器他也可以做, 我们传递进入的服务端Chanel的类型决定了他可以成为的服务器的类型, netty的设计者是不知道,用户想用netty做些什么的,于是设计成通过反射创建

但是,一旦服务端的channel类型确定了,对应的客户端的channel也一定知道了,直接new 就好了


NioSocketChannel的创建过程#


我们跟进new NioSocketChannel(this, ch) ,继续阅读, 其中的 this,是服务端的NioServerSocketChannel , ch 是 jdk原生的 SocketChannel, 方法调用链 的源码如下:


public NioSocketChannel(Channel parent, SocketChannel socket) {
    // todo 向上传递
    super(parent, socket);
    // todo 主要是设置 禁用了 NoDelay算法
    config = new NioSocketChannelConfig(this, socket.socket());
}


跟进去, 看, 他把SelectionKey.OP_READ,传递给了他的父类, 稍后 会用这个参数进行 cannel的二次注册,使得NioSocketChannel可以被netty处理它发生的感兴趣的事件, 我们发现,和服务端的chanel明显不同的是, 服务端的NioChannel关注用户的accept,而这里的客户端的channel关注的是read事件,它标志着,服务端的Selector会关心它当中传递进客户端发送的数据,告诉Selector应该读


protected AbstractNioByteChannel(Channel parent, SelectableChannel ch) {
    super(parent, ch, SelectionKey.OP_READ);
}


继续跟进,到AbstractNioChannel, 他做了如下工作:

  • super(parent) 把NioServerSocketChannel设置为NioSokcetChannel的父parent
  • 自己维护原生的JDK SocketChannel
  • 保存感性趣的选项
  • 设置为非阻塞


源码如下:


*/ // todo 无论是服务端的channel 还是客户端的channel都会使用这个方法进行初始化
// // TODO: 2019/6/23                null        ServerSocketChannel       accept
// todo  如果是在创建NioSocketChannel  parent==NioServerSocketChannel  ch == SocketChanel
protected AbstractNioChannel(Channel parent, SelectableChannel ch, int readInterestOp) {
    super(parent);// todo  继续向上跟,创建基本的组件
    // todo 如果是创建NioSocketChannel   这就是在保存原生的jdkchannel
    // todo 如果是创建NioServerSocketChannel   这就是在保存ServerSocketChannel
    this.ch = ch;
    // todo 设置上感兴趣的事件
    this.readInterestOp = readInterestOp;
    try {
        // todo 作为服务端, ServerSocketChannel 设置为非阻塞的
        // todo 作为客户端   SocketChannel 设置为非阻塞的
        ch.configureBlocking(false);


第二步:#


现在NioSocketChannel已经创建完成了,代码的调用栈重新返回上面的NioMessageUnsafe.read()方法,我们接着往下看


//todo 对读到的连接,进行简单的计数
    allocHandle.incMessagesRead(localRead);


第三步 pipeline.fireChannelRead(readBuf.get(i));#


往下传播channelRead(), 在管道中传递事件 channel, 对于服务端来说, 现在他的pipeline是怎么个状态呢?


Header --> ServerBootStraptAcceptor --> tail

channel的pipeline组件是基于双向链表实现,其中head和tail是默认的链表头和尾, 中间的ServerBootStraptAcceptor是什么呢? 其实他是在创建服务端的NioServerSocketChannel时,是在channel注册完毕之后,通过回调,将ServerBootStrapinit()函数,给channel添加channelInitializer时添加进去的; ServerBootStraptAcceptor本质上就是handler, 回顾第一个图, 他就是图中的Acceptor

ok,现在我们去直接去ServerBootStraptAcceptor中,他是ServerBootStrap的内部类,我们看它的channelRead()方法,源码如下:


public void channelRead(ChannelHandlerContext ctx, Object msg) {
    final Channel child = (Channel) msg;
    // todo 给这个来连接的通道添加 childHandler,是我在Server中添加的childHandler, 实际上是那个MyChannelInitializer , 最终目的是添加handler
    child.pipeline().addLast(childHandler);
    // todo 给新来的Channel设置 options 选项
    setChannelOptions(child, childOptions, logger);
    // todo 给新来的Channel设置 attr属性
    for (Entry<AttributeKey<?>, Object> e : childAttrs) {
        child.attr((AttributeKey<Object>) e.getKey()).set(e.getValue());
    }
    try {
        //todo 这里这!!   把新的channel注册进 childGroup
        childGroup.register(child).addListener(new ChannelFutureListener() {
            @Override
            public void operationComplete(ChannelFuture future) throws Exception {


我们可以看到,如下工作:

  • 初始化属性
  • 把客户端的channel的注册进childGroup中的EventLoop



在这里补一张channelGroup的继承图

我们看这个childGroup.regist()方法, 我们知道childGroup是workerGroup,在本类中,它的类型是EventLoopGroup,这是个接口类型的变量, 我们用点进去查看源码自然跳转进接口中,但是我们需要找他的实现类, 那么,是谁重写的它的方法呢?

大家去看看上面的图,它的直接实现类只有一个MultiThreadEventGroup, 其实大家想想看,现在的任务是将原生的客户端channel,注册进WorkerGroup中的EventLoop,那第一步是啥呢? 不得先从这个 事件循环组中拿出一个事件循环吗? 好,进去看MultiThreadEventGroup是如何实现的, 源码如下:


@Override
public ChannelFuture register(Channel channel) {
    // todo  next()  -- 就在上面->  根据轮询算法获取一个事件的执行器  EventExecutor
    // todo, 而每一个EventLoop对应一个EventExecutor   这里之所以是个组, 是因为, 我的机器内核决定我的  事件循环组有八个线程,
    //  todo ?? ????
    // todo 但是一会的责任并没有一直循环, 难道有效的bossGroup只有一条
    // todo 再进去就是SingleThreadEventLoop对此方法的实现了
    return next().register(channel);
}


是的,确实在获取事件循环,我们进行跟进next().register(channel), 现在是 eventloop.regist() ,当我们进入方法时,再次来到EventLoopGroup对这个方法的实现, ok,大家重新去看上面的图,一个eventloop.regist(),现在不再是 循环组.regist 而是 事件循环.regist, 而在图上,我们可以很轻松的看到 , 对EventLoopGroup接口的实现就是 SingleThreadEventLoop, 好,接着进去看它的实现, 源码如下:


// todo register来到这里
@Override
public ChannelFuture register(Channel channel) {
    // todo  ChannelPromise == channel+Executor    跟进去
    // todo   再次调用 register, 就在下面
    return register(new DefaultChannelPromise(channel, this));
}


调用本类的register(new DefaultChannelPromise(channel, this) ,接着进去,源码如下: 同样解析写在源码下面


@Override
public ChannelFuture register(final ChannelPromise promise) {
    ObjectUtil.checkNotNull(promise, "promise");
    // todo 重点来了
    // todo channel() 获取通道对象
    // todo unsafe()  获取仅供内部使用的unsafe对象   它定义在Channel接口中, 具体的对象是  Channel的子类, AbstractNioChannel
    // todo unsafe对象进行下一步注册 register
    * promise.channel().unsafe().register(this, promise);
    return promise;
}


  • promise.channel() 取出的是客户端的 NioSocketChanenl
  • promise.channel().unsafe()AbstractUnsafe

来到regist()的实现类

方法调用链:

  • 本类方法register
  • 本类方法register0()
  • 本类抽象方法doRegister()
  • pipeline.fireChannelRegistered();传播channel注册事件
  • pipeline.fireChannelActive();传播channel Active事件
  • 二次注册事件


其中,上面的doRegister()是真正的将jdk原生的channel注册进原生的selector

pipeline.fireChannelRegistered();是在 header --> ServerBootStraptAccptor --> 用户自己添加的handler --> tail 中,挨个传递 ChannelRegistered, 就是从头开始调用它们的函数, 我们着重看下面的第三个

pipeline.fireChannelActive();其实是比较绕的,涉及到了pipeline中事件的传递,但是它的作用很大,通过传播channelActive挨个回调他们的状态,netty成功的给这条客户端的新连接注册上了netty能处理的感兴趣的事件

整体源码太长了我不一一贴出来了, 直接看关于pipeline.fireChannelActive();的源码,如下:


if (isActive()) {
    if (firstRegistration) {
        // todo 在pipeline中传播ChannelActive的行为,跟进去
        pipeline.fireChannelActive();
    } else if (config().isAutoRead()) {
        // This channel was registered before and autoRead() is set. This means we need to begin read
        // again so that we process inbound data.
        //
        // See https://github.com/netty/netty/issues/4805
        // todo 可以接受客户端的数据了
        beginRead();
    }


第一个判断, if (isActive())针对两个channel,存在两种情况

  • 如果是服务端的channel, 只有在channel绑定完端口后,才会处于active的状态
  • 如果是客户端的channel, 注册到selector+处于连接状态, 他就是active状态

满足条件,进入第一个分支判断,同样满足第一次注册的条件,开始传播事件



回想一下,现在程序进行到什么状态? 看上图的subReactor每一个蓝色的箭头都是一个客户端的channel, 问题是netty还处理不了这些channel上的会发生的感兴趣的事件,因为第一步我们只是把jdk原生的chanel和原生的selector之间进行了关联, 而netty对他们的封装类还没有关联,于是下一步就通过传播active的行为去二次注册关联感兴趣的事件

关于pipeline中的事件传递太多内容了,在下篇博客中写,连载

现在直接给结果,

传递到header的read()源码如下


@Override
    public void read(ChannelHandlerContext ctx) {
    // todo 如果是服务端: NioMessageUnsafe
    // todo 如果是客户端: NioSocketChannelUnsafe
    unsafe.beginRead();
}


接着跟进


@Override
protected void doBeginRead() throws Exception {
// Channel.read() or ChannelHandlerContext.read() was called
//todo 如果是服务端: 这里的SelectionKey就是我们在把NioServerSocketChannel 注册进BoosGroup中的eventLoop时,返回的selectionKey , 就在上面
//todo 如果是客户端: 这里的SelectionKey就是我们在把NioSocketChannel 注册进BoosGroup中的eventLoop时,返回的selectionKey , 就在上面
final SelectionKey selectionKey = this.selectionKey;
// todo 这SelectionKey 就是我们把 NioServerSocketChannel中的ServerSocketChannel注册进BossGroup时, 附加的第三个参数 0
if (!selectionKey.isValid()) {
    return;
}
readPending = true;
// todo 获取这个Selection 的感兴趣的事件,实际就是当时注册时的第二个参数 0
final int interestOps = selectionKey.interestOps();
// todo 如果是服务端, readInterestOp是创建服务端channel时设置的 op_accept
// todo 如果是客户端的新连接,readInterestOp是创建客户端channel时设置的 op_read
if ((interestOps & readInterestOp) == 0) {
    // todo interestOps | readInterestOp两者进行或运算,原来是0事件 , 现在又增加了一个事件, accept事件或者是read
    // todo 进而 从新注册到SelectionKey上面去。。。 0P_Accept 或者 OP_Read
    selectionKey.interestOps(interestOps | readInterestOp);
}
}


ok, 到这里netty的新链接接入就完成了....

连载下一篇, pipeline中的事件传播

相关文章
|
网络协议
netty编程实战02-创建一个带有连接重试的tcp客户端程序
netty编程实战02-创建一个带有连接重试的tcp客户端程序
222 0
|
8月前
|
前端开发 Java Maven
【Netty 网络通信】启动客户端连接服务端实现通信
【1月更文挑战第9天】【Netty 网络通信】启动客户端连接服务端实现通信
|
存储 缓存 编解码
一文搞定Netty,打造单机百万连接测试!3
一文搞定Netty,打造单机百万连接测试!
|
设计模式 缓存 前端开发
一文搞定Netty,打造单机百万连接测试!2
一文搞定Netty,打造单机百万连接测试!
|
缓存 监控 网络协议
一文搞定Netty,打造单机百万连接测试!1
一文搞定Netty,打造单机百万连接测试!
|
网络协议
Netty之第一次 TCP 连接时发生了什么
Netty之第一次 TCP 连接时发生了什么
189 0
|
算法 Java
深入理解 Netty-新连接接入(一)
深入理解 Netty-新连接接入(一)
219 0
手把手教你调试Netty创建连接流程源码
手把手教你调试Netty创建连接流程源码
109 0
手把手教你调试Netty创建连接流程源码
|
网络协议 前端开发 Linux
netty系列之:让TCP连接快一点,再快一点
netty系列之:让TCP连接快一点,再快一点
netty系列之:搭建客户端使用http1.1的方式连接http2服务器
netty系列之:搭建客户端使用http1.1的方式连接http2服务器