深入理解 Netty-Channel架构体系 (二)

简介: 深入理解 Netty-Channel架构体系 (二)

AbstractChanel#



接着往下看,下面来到Channel接口的直接实现类,AbstractChannel 他是个抽象类, AbstractChannel重写部分Channel接口预定义的方法, 它的抽象内部类AbstractUnsafe实现了Channel的内部接口unsafe

我们现在是从上往下看,但是当我们创建对象使用的时候其实是使用的特化的对象,创建特化的对象就难免会调层层往上调用父类的构造方法, 所以我们看看AbstractChannel的构造方法干了什么活? 源码如下:


protected AbstractChannel(Channel parent) {
    this.parent = parent;
    // todo channelId 代表Chanel唯一的身份标志
    id = newId();
    // todo 创建一个unsafe对象
    unsafe = newUnsafe();
    // todo 在这里初始化了每一个channel都会有的pipeline组件
    pipeline = newChannelPipeline();
}


我们看,AbstractChannel构造函数, 接受的子类传递进来的参数只有一个parent CHannel,而且,还不有可能为空, 所以在AbstractChannel是没有维护jdk底层的Channel的, 相反他会维护着Channel关联的EventLoop,我是怎么知道的呢? 首先,它的属性中存在这个字段,而且,将channel注册进selector的Register()方法是AbastractChannel重写的,Selector在哪呢? 在EventLoop里面,它怎么得到的呢? 它的子类传递了给了它


终于看出来点眉目,构造方法做了四件事

  • 设置parent
  • 如果当前创建的channel是客户端的channel,把parent初始化为他对应的parent
  • 如果为服务端的channel,这就是null
  • 创建唯一的id
  • 创建针对channel进行io读写的unsafe
  • 创建channel的处理器handler链 channelPipeline


AbstractChannel中维护着EventLoop


AbstractChanel的重要抽象内部类AbstractUnsafe 继承了Channel的内部接口Unsafe#


他的源码如下,我贴出来了两个重要的方法, 关于这两个方法的解析,我写在代码的下面


protected abstract class AbstractUnsafe implements Unsafe {
@Override
// todo 入参 eventLoop == SingleThreadEventLoop   promise == NioServerSocketChannel + Executor
public final void register(EventLoop eventLoop, final ChannelPromise promise) {
    if (eventLoop == null) {
        throw new NullPointerException("eventLoop");
    }
    if (isRegistered()) {
        promise.setFailure(new IllegalStateException("registered to an event loop already"));
        return;
    }
    if (!isCompatible(eventLoop)) {
        promise.setFailure(
                new IllegalStateException("incompatible event loop type: " + eventLoop.getClass().getName()));
        return;
    }
    // todo 赋值给自己的 事件循环, 把当前的eventLoop赋值给当前的Channel上  作用是标记后续的所有注册的操作都得交给我这个eventLoop处理, 正好对应着下面的判断
    // todo 保证了 即便是在多线程的环境下一条channel 也只能注册关联上唯一的eventLoop,唯一的线程
    AbstractChannel.this.eventLoop = eventLoop;
    // todo 下面的分支判断里面执行的代码是一样的!!, 为什么? 这是netty的重点, 它大量的使用线程, 线程之间就会产生同步和并发的问题
    // todo 下面的分支,目的就是把线程可能带来的问题降到最低限度
    // todo 进入inEventLoop() --> 判断当前执行这行代码的线程是否就是 SingleThreadEventExecutor里面维护的那条唯一的线程
    // todo 解释下面分支的必要性, 一个eventLoop可以注册多个channel, 但是channel的整个生命周期中所有的IO事件,仅仅和它关联上的thread有关系
    // todo 而且,一个eventLoop在他的整个生命周期中,只和唯一的线程进行绑定,
    //
    // todo 当我们注册channel的时候就得确保给他专属它的thread,
    // todo 如果是新的连接到了,
    if (eventLoop.inEventLoop()) {
        // todo 进入regist0()
        register0(promise);
    } else {
        try {
            // todo 如果不是,它以一个任务的形式提交  事件循环 , 新的任务在新的线程开始,  规避了多线程的并发
            // todo 他是SimpleThreadEventExucutor中execute()实现的,把任务添加到执行队列执行
            eventLoop.execute(new Runnable() {
                @Override
                public void run() {
                    register0(promise);
                }
            });
        } catch (Throwable t) {
            logger.warn(
                    "Force-closing a channel whose registration task was not accepted by an event loop: {}",
                    AbstractChannel.this, t);
            closeForcibly();
            closeFuture.setClosed();
            safeSetFailure(promise, t);
        }
    }
}
private void register0(ChannelPromise promise) {
    try {
        // check if the channel is still open as it could be closed in the mean time when the register
        // call was outside of the eventLoop
        if (!promise.setUncancellable() || !ensureOpen(promise)) {
            return;
        }
        boolean firstRegistration = neverRegistered;
        // todo 进入这个方法doRegister()
        // todo 它把系统创建的ServerSocketChannel 注册进了选择器
        doRegister();
        neverRegistered = false;
        registered = true;
        // Ensure we call handlerAdded(...) before we actually notify the promise. This is needed as the
        // user may already fire events through the pipeline in the ChannelFutureListener.
        // todo 确保在 notify the promise前调用 handlerAdded(...)
        // todo 这是必需的,因为用户可能已经通过ChannelFutureListener中的管道触发了事件。
        // todo 如果需要的话,执行HandlerAdded()方法
        // todo 正是这个方法, 回调了前面我们添加 Initializer 中添加 Accpter的重要方法
        pipeline.invokeHandlerAddedIfNeeded();
        // todo  !!!!!!!  观察者模式!!!!!!  通知观察者,谁是观察者?  暂时理解ChannelHandler 是观察者
        safeSetSuccess(promise);
        // todo 传播行为, 传播什么行为呢?   在head---> ServerBootStraptAccptor ---> tail传播事件ChannelRegistered  , 也就是挨个调用它们的ChannelRegisted函数
        pipeline.fireChannelRegistered();
        // Only fire a channelActive if the channel has never been registered. This prevents firing
        // multiple channel actives if the channel is deregistered and re-registered.
        // todo 对于服务端:  javaChannel().socket().isBound(); 即  当Channel绑定上了端口   isActive()才会返回true
        // todo 对于客户端的连接 ch.isOpen() && ch.isConnected(); 返回true , 就是说, Channel是open的 打开状态的就是true
        if (isActive()) {
            if (firstRegistration) {
                // todo 在pipeline中传播ChannelActive的行为,跟进去
                pipeline.fireChannelActive();
            } else if (config().isAutoRead()) {
                // This channel was registered before and autoRead() is set. This means we need to begin read
                // again so that we process inbound data.
                //
                // See https://github.com/netty/netty/issues/4805
                // todo 可以接受客户端的数据了
                beginRead();
            }
        }
    } catch (Throwable t) {
        // Close the channel directly to avoid FD leak.
        closeForcibly();
        closeFuture.setClosed();
        safeSetFailure(promise, t);
    }
}
@Override
public final void bind(final SocketAddress localAddress, final ChannelPromise promise) {
    assertEventLoop();
    if (!promise.setUncancellable() || !ensureOpen(promise)) {
        return;
    }
    // See: https://github.com/netty/netty/issues/576
    if (Boolean.TRUE.equals(config().getOption(ChannelOption.SO_BROADCAST)) &&
        localAddress instanceof InetSocketAddress &&
        !((InetSocketAddress) localAddress).getAddress().isAnyLocalAddress() &&
        !PlatformDependent.isWindows() && !PlatformDependent.maybeSuperUser()) {
        // Warn a user about the fact that a non-root user can't receive a
        // broadcast packet on *nix if the socket is bound on non-wildcard address.
        logger.warn(
                "A non-root user can't receive a broadcast packet if the socket " +
                "is not bound to a wildcard address; binding to a non-wildcard " +
                "address (" + localAddress + ") anyway as requested.");
    }
    boolean wasActive = isActive();
    // todo 由于端口的绑定未完成,所以 wasActive是 false
    try {
        // todo 绑定端口, 进去就是NIO原生JDK绑定端口的代码
        doBind(localAddress);
        // todo 端口绑定完成  isActive()是true
    } catch (Throwable t) {
        safeSetFailure(promise, t);
        closeIfClosed();
        return;
    }
    // todo 根据上面的逻辑判断, 结果为 true
    if (!wasActive && isActive()) {
        invokeLater(new Runnable() {
            // todo 来到这里很重要, 向下传递事件行为, 传播行为的时候, 从管道的第一个节点开始传播, 第一个节点被封装成 HeadContext的对象
           // todo 进入方法, 去 HeadContext里面查看做了哪些事情
            // todo 她会触发channel的read, 最终重新为 已经注册进selector 的 chanel, 二次注册添加上感性趣的accept事件
            @Override
            public void run() {
                pipeline.fireChannelActive();
            }
        });
    }
    // todo 观察者模式, 设置改变状态, 通知观察者的方法回调
    safeSetSuccess(promise);
}


AbstractChannel抽象内部类的register(EventLoop,channelPromise)方法#


这个方法,是将channel注册进EventLoop的Selector, 它的调用顺序如下:


本类方法 regist()--> 本类方法 register0() --> 本类抽象方法doRegister()

doRegister() 在这里设计成抽象方法,等着子类去具体的实现, 为啥这样做呢?

刚才说了,AbstractChannel本身就是个模板,而且它仅仅维护了EventLoop,没有拿到channel引用的它根本不可能进行注册的逻辑,那谁有jdk原生channel的引用呢? 它的直接子类AbstractNioChannel下面是AbstractNioChannel的构造方法, 它自己维护jdk原生的Channel,所以由他重写doRegister(),


*/ // todo 无论是服务端的channel 还是客户端的channel都会使用这个方法进行初始化
// // TODO: 2019/6/23                null        ServerSocketChannel       accept
// todo  如果是在创建NioSocketChannel  parent==NioServerSocketChannel  ch == SocketChanel
protected AbstractNioChannel(Channel parent, SelectableChannel ch, int readInterestOp) {
    super(parent);// todo  继续向上跟,创建基本的组件
    // todo 如果是创建NioSocketChannel   这就是在保存原生的jdkchannel
    // todo 如果是创建NioServerSocketChannel   这就是在保存ServerSocketChannel
    this.ch = ch;
    // todo 设置上感兴趣的事件
    this.readInterestOp = readInterestOp;
    try {
        // todo 作为服务端, ServerSocketChannel 设置为非阻塞的
        // todo 作为客户端   SocketChannel 设置为非阻塞的
        ch.configureBlocking(false);
    } catch (IOException e) {


AbstractChannel抽象内部类的bind()方法#


bind()方法的调用顺序, 本类方法 bind()--> 本类的抽象方法 dobind()

方法的目的是给Channel绑定上属于它的端口,同样有一个抽象方法,等着子类去实现,因为我们已经知道了AbstractChannel不维护channel的引用,于是我就去找dobind()这个抽象函数的实现, 结果发现,AbstractChannel的直接子类AbstractNioChannel中根本不没有他的实现,这是被允许的,因为AbstractNioChannel本身也是抽象类, 到底是谁实现呢? 如下图:在NioServerSocketChannel中获取出 Jdk原生的channel, 客户端和服务端的channel又不同,所以绑定端口这中特化的任务,交给他们自己实现



AbstractChannel的beginRead()()方法#


上面完成注册之后,就去绑定端口,当端口绑定完成,就会channel处于active状态,下一步就是执行beginRead() ,执行的流程如下

本类抽象方法 beginRead() --> 本类抽象方法doBeginRead()

这个read() 就是从已经绑定好端口的channel中读取IO数据,和上面的方法一样,对于没有channel引用的AbstractChannel来说,netty把它设计成抽象方法,交给拥有jdk 原生channel引用的AbstractNioChannel实现


小结:


AbstractChannel作为Channel的直接实现类,本身又是抽象类,于是它实现了Channel的预留的一些抽象方法, 初始化了channel的四个组件 id pipeline unsafe parent, 更为重要的是它的抽象内部类 实现了 关于nettyChannel的注册,绑定,读取数据的逻辑,而且以抽象类的方法,挖好了填空题等待子类的特化实现

相关文章
|
7月前
|
前端开发 网络协议 Dubbo
Netty - 回顾Netty高性能原理和框架架构解析
Netty - 回顾Netty高性能原理和框架架构解析
295 0
|
设计模式 缓存 网络协议
Netty整体介绍和架构认知(一)
Netty整体介绍和架构认知
12366 3
|
编解码 前端开发 网络协议
Netty整体介绍和架构认知(二)
Netty整体介绍和架构认知
176 0
|
存储 安全 大数据
谈谈如何构建现代数据体系架构(数据湖+数据仓库)
如何构建当前企业数据体系架构呢?其实与许多其他技术一样,它实际上取决于企业要实现目标。
谈谈如何构建现代数据体系架构(数据湖+数据仓库)
|
存储 Kubernetes 监控
kubernete架构体系介绍
kubernete架构体系介绍
233 1
kubernete架构体系介绍
|
网络协议 Java 程序员
Netty网络编程(二):架构概述
Netty网络编程(二):架构概述
113 0
|
监控 网络协议 NoSQL
Netty 高性能架构设计
Netty 高性能架构设计
198 0
|
编解码 弹性计算 前端开发
太详细了!终于有人把Netty原理架构讲解清楚了
本文基于 Netty 4.1 展开介绍相关理论模型,使用场景,基本组件、整体架构,知其然且知其所以然,希望给大家在实际开发实践、学习开源项目方面提供参考。
472 1
太详细了!终于有人把Netty原理架构讲解清楚了
|
SQL 存储 缓存
一图搞定MySQL体系架构
要了解mysql的运行机制,那么首先要对mysql的体系结构有一定的了解。
1043 0
一图搞定MySQL体系架构
|
消息中间件 编解码 网络协议
Netty 的三层架构设计
《读尽源码》
308 0