深入理解 Netty-Pipeline组件 (一)

简介: 深入理解 Netty-Pipeline组件 (一)

首先我们知道,在NIO网络编程模型中,IO操作直接和channel相关,比如客户端的请求连接,或者向服务端发送数据, 服务端都要从客户端的channel获取这个数据

那么channelPipeline是什么?

其实,这个channelPepiline是Netty增加给原生的channel的组件,在ChannelPipeline接口的上的注解阐述了channelPipeline的作用,这个channelPipeline是高级过滤器的实现,netty将chanenl中数据导向channelPipeline,进而给了用户对channel中数据的百分百的控制权, 此外,channelPipeline数据结构是双向链表,每一个节点都是channelContext,channelContext里面维护了对应的handler和pipeline的引用, 大概总结一下: 通过chanelPipeline,用户客户轻松的往channel写数据,从channel读数据


创建pipeline#


通过前面几篇博客的追踪,我们知道无论我们是通过反射创建出服务端的channel也好,还是直接new创建客户端的channel也好,随着父类构造函数的逐层调用,最终我们都会在Channel体系的顶级抽象类AbstractChannel中,创建出Channel的一大组件 channelPipeline

于是我们程序的入口,AbstractChannelpipeline = newChannelPipeline(); ,跟进去,看到他的源码如下:


protected DefaultChannelPipeline newChannelPipeline() {
    // todo 跟进去
    return new DefaultChannelPipeline(this);
}


可以看到,它创建了一个DefaultChannelPipeline(thisChannel)

DefaultChannelPipeline是channelPipeline的默认实现,他有着举足轻重的作用,我们看一下下面的 ChannelChannelContextChannelPipeline的继承体系图,我们可以看到图中两个类,其实都很重要,



他们之间有什么关系呢?


当我们看完了DefaultChannelPipeline()构造中做了什么自然就知道了


// todo 来到这里
protected DefaultChannelPipeline(Channel channel) {
    this.channel = ObjectUtil.checkNotNull(channel, "channel");
    // todo 把当前的Channel 保存起来
    succeededFuture = new SucceededChannelFuture(channel, null);
    voidPromise =  new VoidChannelPromise(channel, true);
    // todo  这两方法很重要
    // todo 设置尾
    tail = new TailContext(this);
    // todo 设置头
    head = new HeadContext(this);
    // todo 双向链表关联
    head.next = tail;
    tail.prev = head;
}


主要做了如下几件事:

  • 初始化succeededFuture
  • 初始化voidPromise
  • 创建尾节点
  • 创建头节点
  • 关联头尾节点


其实,到现在为止,pipiline的初始化已经完成了,我们接着往下看

此外,我们看一下DefaultChannelPipeline的内部类和方法,如下图()



我们关注我圈出来的几部分

  • 两个重要的内部类
  • 头结点 HeaderContext
  • 尾节点 TailContext
  • PendingHandlerAddedTask 添加完handler之后处理的任务
  • PendingHandlerCallBack 添加完handler的回调
  • PengdingHandlerRemovedTask 移除Handler之后的任务
  • 大量的addXXX方法,


final AbstractChannelHandlerContext head;
 final AbstractChannelHandlerContext tail;


跟进它的封装方法:


TailContext(DefaultChannelPipeline pipeline) {
        super(pipeline, null, TAIL_NAME, true, false);
        setAddComplete();
    }
// todo 来到这里
AbstractChannelHandlerContext(DefaultChannelPipeline pipeline, EventExecutor executor, String name,
                              boolean inbound, boolean outbound) {
    this.name = ObjectUtil.checkNotNull(name, "name");
    // todo 为ChannelContext的pipeline附上值了
    this.pipeline = pipeline;
    this.executor = executor;
    this.inbound = inbound;
    this.outbound = outbound;
    // Its ordered if its driven by the EventLoop or the given Executor is an instanceof OrderedEventExecutor.
    ordered = executor == null || executor instanceof OrderedEventExecutor;
}


如下图是HeaderContext和TailContext的声明截图:




我们可以看到,这个tail节点是inbound类型的处理器,一开始确实很纳闷,难道header不应该是Inbound类型的吗?我也不买关子了,直接说为啥

是的,header确实是Inbound类型的处理器, 同时也是出站处理器 (评论区有个老哥说的也很清楚,可以瞅瞅)


因为,对netty来说用发送过来的数据,要就从header节点开始往后传播,怎么传播呢? 因为是双向链表,直接找后一个节点,什么类型的节点呢? inbound类型的,于是数据msg就从header之后的第一个结点往后传播,如果说,一直到最后,都只是传播数据而没有任何处理就会传播到tail节点,因为tail也是inbound类型的, tail节点会替我们释放掉这个msg,防止内存泄露,当然如果我们自己使用了msg,而没往后传播,也没有释放,内存泄露是早晚的时,这就是为啥tail是Inbound类型的, header节点和它相反,在下面说

ok,现在知道了ChannelPipeline的创建了吧


Channelpipeline与ChannelHandler和ChannelHandlerContext之间的关系#


它三者的关系也直接说了, 在上面pipeline的创建的过程中, DefaultChannelPipeline中的头尾节点都是ChannelHandlerContext, 这就意味着, 在pipeline双向链表的结构中,每一个节点都是一个ChannelHandlerContext, 而且每一个 ChannelHandlerContext维护一个handler,这一点不信可以看上图,ChannelHandlerContext的实现类DefaultChannelHandlerContext的实现类, 源码如下:


final class DefaultChannelHandlerContext extends AbstractChannelHandlerContext {
// todo  Context里面有了 handler的引用
private final ChannelHandler handler;
// todo 创建默认的 ChannelHandlerContext,
DefaultChannelHandlerContext(
        DefaultChannelPipeline pipeline, EventExecutor executor, String name, ChannelHandler handler) {
    super(pipeline, executor, name, isInbound(handler), isOutbound(handler));
    if (handler == null) {
        throw new NullPointerException("handler");
    }
    this.handler = handler;


ChannelHandlerContext 接口同时继承ChannelOutBoundInvoker和ChannelInBoundInvoker使得他同时拥有了传播入站事件和出站事件的能力, ChannelHandlerContext把事件传播之后,是谁处理的呢? 当然是handler 下面给出ChannelHandler的继承体系图,可以看到针对入站出来和出站处理ChannelHandler有不同的继承分支应对



添加一个新的节点:#


一般我们都是通过ChanelInitialezer动态的一次性添加多个handler, 下面就去看看,在服务端启动过程中,ServerBootStrapinit(),如下源码:解析我写在代码下面


// todo 这是ServerBootStrapt对 他父类初始化 channel的实现, 用于初始化 NioServerSocketChannel
@Override
void init(Channel channel) throws Exception {
// todo ChannelOption 是在配置 Channel 的 ChannelConfig 的信息
final Map<ChannelOption<?>, Object> options = options0();
synchronized (options) {
    // todo 把 NioserverSocketChannel 和 options Map传递进去, 给Channel里面的属性赋值
    // todo 这些常量值全是关于和诸如TCP协议相关的信息
    setChannelOptions(channel, options, logger);
}
    // todo 再次一波 给Channel里面的属性赋值  attrs0()是获取到用户自定义的业务逻辑属性 --  AttributeKey
final Map<AttributeKey<?>, Object> attrs = attrs0();
// todo 这个map中维护的是 程序运行时的 动态的 业务数据 , 可以实现让业务数据随着netty的运行原来存进去的数据还能取出来
synchronized (attrs) {
    for (Entry<AttributeKey<?>, Object> e : attrs.entrySet()) {
        @SuppressWarnings("unchecked")
        AttributeKey<Object> key = (AttributeKey<Object>) e.getKey();
        channel.attr(key).set(e.getValue());
    }
}
// todo-------   options   attrs :   都可以在创建BootStrap时动态的传递进去
// todo ChannelPipeline   本身 就是一个重要的组件, 他里面是一个一个的处理器, 说他是高级过滤器,交互的数据 会一层一层经过它
// todo 下面直接就调用了 p , 说明,在channel调用pipeline方法之前, pipeline已经被创建出来了!,
// todo 到底是什么时候创建出来的 ?  其实是在创建NioServerSocketChannel这个通道对象时,在他的顶级抽象父类(AbstractChannel)中创建了一个默认的pipeline对象
/// todo 补充: ChannelHandlerContext 是 ChannelHandler和Pipeline 交互的桥梁
ChannelPipeline p = channel.pipeline();
// todo  workerGroup 处理IO线程
final EventLoopGroup currentChildGroup = childGroup;
// todo 我们自己添加的 Initializer
final ChannelHandler currentChildHandler = childHandler;
final Entry<ChannelOption<?>, Object>[] currentChildOptions;
final Entry<AttributeKey<?>, Object>[] currentChildAttrs;
// todo 这里是我们在Server类中添加的一些针对新连接channel的属性设置, 这两者属性被acceptor使用到!!!
synchronized (childOptions) {
    currentChildOptions = childOptions.entrySet().toArray(newOptionArray(childOptions.size()));
}
synchronized (childAttrs) {
    currentChildAttrs = childAttrs.entrySet().toArray(newAttrArray(childAttrs.size()));
}
// todo 默认 往NioServerSocketChannel的管道里面添加了一个 ChannelInitializer  ,
// todo  ( 后来我们自己添加的ChildHandler 就继承了的这个ChannelInitializer , 而这个就继承了的这个ChannelInitializer 实现了ChannelHandler)
p.addLast(new ChannelInitializer<Channel>() { // todo 进入addlast
    // todo  这个ChannelInitializer 方便我们一次性往pipeline中添加多个处理器
    @Override
        public void initChannel(final Channel ch) throws Exception {
            final ChannelPipeline pipeline = ch.pipeline();
            // todo  获取bootStrap的handler 对象, 没有返回空
            // todo  这个handler 针对bossgroup的Channel  , 给他添加上我们在server类中添加的handler()里面添加处理器
            ChannelHandler handler = config.handler();
            if (handler != null) {
                pipeline.addLast(handler);
            }
            // todo ServerBootstrapAcceptor 接收器, 是一个特殊的chanelHandler
             ch.eventLoop().execute(new Runnable() {
                @Override
                public void run() {
                    // todo !!! --   这个很重要,在ServerBootStrap里面,netty已经为我们生成了接收器  --!!!
                    // todo 专门处理新连接的接入, 把新连接的channel绑定在 workerGroup中的某一条线程上
                    // todo 用于处理用户的请求, 但是还有没搞明白它是怎么触发执行的
                    pipeline.addLast(new ServerBootstrapAcceptor(
                            // todo 这些参数是用户自定义的参数
                            // todo NioServerSocketChannel, worker线程组  处理器   关系的事件
                            ch, currentChildGroup, currentChildHandler, currentChildOptions, currentChildAttrs));
                }
            });
    }
});


这个函数真的是好长,但是我们的重点放在ChannelInitializer身上, 现在的阶段, 当前的channel还没有注册上EventLoop上的Selector中

还有不是分析怎么添加handler? 怎么来这里了? 其实下面的 ServerBootstrapAcceptor就是一个handler

我们看一下上面的代码做了啥


ch.eventLoop().execute(new Runnable() {
    @Override
    public void run() {
        // todo !!! --   这个很重要,在ServerBootStrap里面,netty已经为我们生成了接收器  --!!!
        // todo 专门处理新连接的接入, 把新连接的channel绑定在 workerGroup中的某一条线程上
        // todo 用于处理用户的请求, 但是还有没搞明白它是怎么触发执行的
        pipeline.addLast(new ServerBootstrapAcceptor(
                // todo 这些参数是用户自定义的参数
                // todo NioServerSocketChannel, worker线程组  处理器   关系的事件
                ch, currentChildGroup, currentChildHandler, currentChildOptions, currentChildAttrs));
    }
});


懵逼不? 当时真的是给我整蒙圈了, 还没有关联上 EventLoop呢!!! 哪来的ch.eventLoop()....

后来整明白了,这其实是一个回调,netty提供给用户在任意时刻都可以往pipeline中添加handler的实现手段

那么在哪里回调呢? 其实是在 jdk原生的channel注册进EventLoop中的Selector后紧接着回调的,源码如下


private void register0(ChannelPromise promise) {
    try {
        // check if the channel is still open as it could be closed in the mean time when the register
        // call was outside of the eventLoop
        if (!promise.setUncancellable() || !ensureOpen(promise)) {
            return;
        }
        boolean firstRegistration = neverRegistered;
        // todo 进入这个方法doRegister()
        // todo 它把系统创建的ServerSocketChannel 注册进了选择器
        doRegister();
        neverRegistered = false;
        registered = true;
        // Ensure we call handlerAdded(...) before we actually notify the promise. This is needed as the
        // user may already fire events through the pipeline in the ChannelFutureListener.
        // todo 确保在 notify the promise前调用 handlerAdded(...)
        // todo 这是必需的,因为用户可能已经通过ChannelFutureListener中的管道触发了事件。
        // todo 如果需要的话,执行HandlerAdded()方法
        // todo 正是这个方法, 回调了前面我们添加 Initializer 中添加 Accpter的重要方法
        pipeline.invokeHandlerAddedIfNeeded();


回调函数在pipeline.invokeHandlerAddedIfNeeded();, 看它的命名, 如果需要的话,执行handler已经添加完成了操作 哈哈,我们现在当然需要,刚添加了个ServerBootstrapAcceptor

在跟进入看源码之间,注意,方法是pipeline调用的, 哪个pipeline呢? 就是上面我们说的DefaultChannelPipeline, ok,跟进源码,进入 DefaultChannelPipeline


// todo 执行handler的添加,如果 需要的话
final void invokeHandlerAddedIfNeeded() {
    assert channel.eventLoop().inEventLoop();
    if (firstRegistration) {
        firstRegistration = false;
        // todo 现在我们的channel已经注册在bossGroup中的eventLoop上了, 是时候回调执行那些在注册前添加的 handler了
        callHandlerAddedForAllHandlers();
    }
}


调用本类方法callHandlerAddedForAllHandlers(); 继续跟进下


// todo 回调原来在没有注册完成之前添加的handler
private void callHandlerAddedForAllHandlers() {
    final PendingHandlerCallback pendingHandlerCallbackHead;
    synchronized (this) {
        assert !registered;
        // This Channel itself was registered.
        registered = true;
        pendingHandlerCallbackHead = this.pendingHandlerCallbackHead;
        // Null out so it can be GC'ed.
        this.pendingHandlerCallbackHead = null;
    }
  PendingHandlerCallback task = pendingHandlerCallbackHead;
    while (task != null) {
        task.execute();
        task = task.next;
    }
}


我们它的动作task.execute();

其中的task是谁? pendingHandlerCallbackHead 这是DefaultChannelPipeline的内部类, 它的作用就是辅助完成 添加handler之后的回调, 源码如下:


private abstract static class PendingHandlerCallback implements Runnable {
    final AbstractChannelHandlerContext ctx;
    PendingHandlerCallback next;
    PendingHandlerCallback(AbstractChannelHandlerContext ctx) {
        this.ctx = ctx;
    }
    abstract void execute();
}


我们跟进上一步的task.execute()就会看到它的抽象方法,那么是谁实现的呢? 实现类是PendingHandlerAddedTask同样是DefaultChannelPipeline的内部类, 既然不是抽象类了, 就得同时实现他父类PendingHandlerCallback的抽象方法,其实有两个一是个excute()另一个是run() --Runable

我们进入看它是如何实现excute,源码如下:


@Override
void execute() {
EventExecutor executor = ctx.executor();
if (executor.inEventLoop()) {
    callHandlerAdded0(ctx);
} else {
    try {
        executor.execute(this);
    } catch (RejectedExecutionException e) {
        if (logger.isWarnEnabled()) {
            logger.warn(
                    "Can't invoke handlerAdded() as the EventExecutor {} rejected it, removing handler {}.",
                    executor, ctx.name(), e);
        }
        remove0(ctx);
        ctx.setRemoved();
    }
}


HandlerAdded()的回调时机#

我们往下追踪, 调用类本类方法callHandlerAdded0(ctx); 源码如下:


// todo 重点看看这个方法 , 入参是刚才添加的  Context
private void callHandlerAdded0(final AbstractChannelHandlerContext ctx) {
try {
    // todo 在channel关联上handler之后并且把Context添加到了 Pipeline之后进行调用!!!
    ctx.handler().handlerAdded(ctx); // todo 他是诸多的回调方法中第一个被调用的
    ctx.setAddComplete();  // todo 修改状态
}
...


继续往下追踪

  • ctx.handler() -- 获取到了当前的channel
  • 调用channel的.handlerAdded(ctx);

这个handlerAdded()是定义在ChannelHandler中的回调方法, 什么时候回调呢? 当handler添加后回调, 因为我们知道,当服务端的channel在启动时,会通过 channelInitializer 添加那个ServerBootstrapAcceptor,所以ServerBootstrapAcceptorhandlerAdded()的回调时机就在上面代码中的ctx.handler().handlerAdded(ctx);

如果直接点击去这个函数,肯定就是ChannelHandler接口中去; 那么 新的问题来了,谁是实现类? 答案是抽象类 ChannelInitializer`` 就在上面我们添加ServerBootstrapAcceptor就创建了一个ChannelInitializer的匿名对象

它的继承体系图如下:



介绍这个ChannelInitializer 他是Netty提供的辅助类,用于提供针对channel的初始化工作,什么工作呢? 批量初始化channel

这个中有三个重要方法,如下

  • 重写的channel的handlerAdded(), 这其实也是handlerAdded()的回调的体现
  • 自己的initChannel()
  • 自己的remove()

继续跟进我们上面的handlerAdded(ChannelHandlerContext ctx) 源码如下:


@Override
        public void handlerAdded(ChannelHandlerContext ctx) throws Exception {
                initChannel(ctx); // todo 这个方法在上面, 进入 可以在 finally中 找到移除Initializer的逻辑
            }
    }


调用本类的initChannel(ctx); 源码如下:


private boolean initChannel(ChannelHandlerContext ctx) throws Exception {
        if (initMap.putIfAbsent(ctx, Boolean.TRUE) == null) { // Guard against re-entrance.
            try {
                initChannel((C) ctx.channel());
            } catch (Throwable cause) {
                // Explicitly call exceptionCaught(...) as we removed the handler before calling initChannel(...).
                // We do so to prevent multiple calls to initChannel(...).
                exceptionCaught(ctx, cause);
            } finally {
                // todo    remove(ctx);  删除 ChannelInitializer
                remove(ctx);
            }
            return true;
        }
        return false;
    }


两个点

  • 第一: 继续调用本类的抽象方法initChannel((C) ctx.channel());
  • 第二: 移除了remove(ctx);

分开进行第一步

initChannel((C) ctx.channel()); 初始化channel,这个函数被设计成了抽象的, 问题来了, 实现类是谁? 实现类其实刚才说了,就是netty在添加ServerBootStrapAcceptor时创建的那个匿名内部类,我们跟进去看他的实现: 源码如下:


@Override
    public void initChannel(final Channel ch) throws Exception {
        final ChannelPipeline pipeline = ch.pipeline();
        // todo  获取bootStrap的handler 对象, 没有返回空
        // todo  这个handler 针对bossgroup的Channel  , 给他添加上我们在server类中添加的handler()里面添加处理器
        ChannelHandler handler = config.handler();
        if (handler != null) {
            pipeline.addLast(handler);
        }
        // todo ServerBootstrapAcceptor 接收器, 是一个特殊的chanelHandler
         ch.eventLoop().execute(new Runnable() {
            @Override
            public void run() {
                // todo !!! --   这个很重要,在ServerBootStrap里面,netty已经为我们生成了接收器  --!!!
                // todo 专门处理新连接的接入, 把新连接的channel绑定在 workerGroup中的某一条线程上
                // todo 用于处理用户的请求, 但是还有没搞明白它是怎么触发执行的
                pipeline.addLast(new ServerBootstrapAcceptor(
                        // todo 这些参数是用户自定义的参数
                        // todo NioServerSocketChannel, worker线程组  处理器   关系的事件
                        ch, currentChildGroup, currentChildHandler, currentChildOptions, currentChildAttrs));
            }
        });
}


实际上就是完成了一次方法的回调,成功添加了ServerBootstrapAcceptor处理器


删除一个节点#


回来看第二步


remove(ctx); 删除一个节点, 把Initializer删除了? 是的, 把这个初始化器删除了, 为啥要把它删除呢, 说了好多次, 其实他是一个辅助类, 目的就是通过他往channel中一次性添加多个handler, 现在handler也添加完成了, 留着他也没啥用,直接移除了

我们接着看它的源码


// todo 删除当前ctx 节点
    private void remove(ChannelHandlerContext ctx) {
        try {
            ChannelPipeline pipeline = ctx.pipeline();
            if (pipeline.context(this) != null) {
                pipeline.remove(this);
            }
        } finally {
            initMap.remove(ctx);
        }
    }


从pipeline中移除, 一路看过去,就会发现底层删除链表节点的操作


private static void remove0(AbstractChannelHandlerContext ctx) {
    AbstractChannelHandlerContext prev = ctx.prev;
    AbstractChannelHandlerContext next = ctx.next;
    prev.next = next;
    next.prev = prev;
}





相关文章
|
Java
由浅入深Netty组件实战3
由浅入深Netty组件实战3
74 0
|
前端开发 算法 Java
由浅入深Netty组件实战2
由浅入深Netty组件实战2
230 0
|
缓存 安全 Java
由浅入深Netty基础知识NIO三大组件原理实战 2
由浅入深Netty基础知识NIO三大组件原理实战
86 0
|
Java
由浅入深Netty基础知识NIO三大组件原理实战 1
由浅入深Netty基础知识NIO三大组件原理实战
107 0
|
前端开发 安全 Java
由浅入深Netty组件实战1
由浅入深Netty组件实战1
96 0
|
8月前
|
设计模式 前端开发 网络协议
面试官:说说Netty的核心组件?
Netty 核心组件是指 Netty 在执行过程中所涉及到的重要概念,这些核心组件共同组成了 Netty 框架,使 Netty 框架能够正常的运行。 Netty 核心组件包含以下内容: 1. 启动器 Bootstrap/ServerBootstrap 2. 事件循环器 EventLoopGroup/EventLoop 3. 通道 Channel 4. 通道处理器 ChannelHandler 5. 通道管道 ChannelPipeline 这些组件的交互流程如下: ![image.png](https://cdn.nlark.com/yuque/0/2024/png/92791/1716
58 0
面试官:说说Netty的核心组件?
|
8月前
|
前端开发 Java 网络安全
【Netty 网络通信】Netty 核心组件
【1月更文挑战第9天】【Netty 网络通信】Netty 核心组件
|
8月前
|
前端开发 网络协议 Java
Netty | 工作流程图分析 & 核心组件说明 & 代码案例实践
Netty | 工作流程图分析 & 核心组件说明 & 代码案例实践
472 0
|
8月前
|
网络协议 前端开发 Java
Netty Review - 核心组件扫盲
Netty Review - 核心组件扫盲
109 0
|
编解码 前端开发 Java
源码分析Netty:核心组件及启动过程分析
本篇从实例出发,了解Netty核心组件的概念、作用及串联过程。从概念到设计原理,再到深入了解实现细节,从而能够清晰地掌握Netty的技术细节甚至存在的问题,才能最终更好地支持我们实际的各项业务。
363 0